You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by ri...@apache.org on 2022/07/13 18:21:05 UTC

[ozone] branch master updated: HDDS-6909. [Multi-Tenant] Use RangerClient for Ranger operations (#3576)

This is an automated email from the ASF dual-hosted git repository.

ritesh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 3f3577a452 HDDS-6909. [Multi-Tenant] Use RangerClient for Ranger operations (#3576)
3f3577a452 is described below

commit 3f3577a45290a2a989eb310d56577544c60b8225
Author: Siyao Meng <50...@users.noreply.github.com>
AuthorDate: Wed Jul 13 11:21:00 2022 -0700

    HDDS-6909. [Multi-Tenant] Use RangerClient for Ranger operations (#3576)
---
 .../docs/content/feature/S3-Multi-Tenancy-Setup.md |  27 +-
 .../main/java/org/apache/hadoop/ozone/OmUtils.java |   5 +
 .../hadoop/ozone/om/multitenant/AccessPolicy.java  |   2 +-
 .../hadoop/ozone/om/multitenant/OzoneTenant.java   |  10 +-
 .../ozone/om/multitenant/RangerAccessPolicy.java   |   4 +-
 .../apache/hadoop/ozone/om/multitenant/Tenant.java |  12 +-
 .../ozone/om/protocol/OzoneManagerProtocol.java    |  12 +
 ...OzoneManagerProtocolClientSideTranslatorPB.java |  18 +
 .../main/compose/ozonesecure/docker-compose.yaml   |  14 -
 .../src/main/compose/ozonesecure/docker-config     |  26 +-
 .../ozonesecure/mockserverInitialization.json      |  98 ---
 .../smoketest/security/ozone-secure-tenant.robot   |   4 +
 .../ozone/om/multitenant/RangerUserRequest.java    | 278 +++++++
 ...estMultiTenantAccessAuthorizerRangerPlugin.java | 269 -------
 .../om/multitenant/TestRangerBGSyncService.java    | 349 ++++-----
 .../hadoop/ozone/shell/TestOzoneTenantShell.java   |  27 +-
 .../src/main/proto/OmClientProtocol.proto          |  11 +
 .../hadoop/ozone/om/OMMultiTenantManager.java      | 112 ++-
 .../hadoop/ozone/om/OMMultiTenantManagerImpl.java  | 301 ++++----
 .../org/apache/hadoop/ozone/om/OzoneManager.java   |  51 +-
 .../InMemoryMultiTenantAccessController.java       |  49 +-
 .../multitenant/MultiTenantAccessAuthorizer.java   | 282 -------
 .../MultiTenantAccessAuthorizerDummyPlugin.java    | 194 -----
 .../MultiTenantAccessAuthorizerRangerPlugin.java   | 854 ---------------------
 .../multitenant/MultiTenantAccessController.java   | 190 +++--
 .../om/multitenant/OMRangerBGSyncService.java      | 190 +++--
 .../RangerClientMultiTenantAccessController.java   | 261 +++++--
 .../RangerRestMultiTenantAccessController.java     |  31 +-
 .../tenant/OMTenantAssignUserAccessIdRequest.java  |   2 +-
 .../protocolPB/OzoneManagerRequestHandler.java     |  15 +
 .../hadoop/ozone/om/TestOMMultiTenantManager.java  |  32 +-
 .../TestMultiTenantAccessController.java           |  97 ++-
 .../org/apache/hadoop/ozone/admin/om/OMAdmin.java  |   5 +-
 .../ozone/admin/om/UpdateRangerSubcommand.java     |  96 +++
 34 files changed, 1617 insertions(+), 2311 deletions(-)

diff --git a/hadoop-hdds/docs/content/feature/S3-Multi-Tenancy-Setup.md b/hadoop-hdds/docs/content/feature/S3-Multi-Tenancy-Setup.md
index 0965b23fd5..24ecedfb43 100644
--- a/hadoop-hdds/docs/content/feature/S3-Multi-Tenancy-Setup.md
+++ b/hadoop-hdds/docs/content/feature/S3-Multi-Tenancy-Setup.md
@@ -42,17 +42,34 @@ Follow [this guide]({{< ref "interface/S3.md" >}}) the cluster to set up at leas
 First make sure ACL is enabled, and `RangerOzoneAuthorizer` is the effective ACL authorizer implementation in-use for Ozone.
 If that is not the case, [follow this]({{< ref "security/SecurityWithRanger.md" >}}). 
 
-Then add the following configs to all Ozone Managers' `ozone-site.xml`:
+Add the following configs to all Ozone Managers' `ozone-site.xml`:
 
 ```xml
 <property>
-   <name>ozone.om.multitenancy.enabled</name>
-   <value>true</value>
+	<name>ozone.om.multitenancy.enabled</name>
+	<value>true</value>
 </property>
 <property>
 	<name>ozone.om.ranger.https-address</name>
-	<value>https://RANGER_HOSTNAME:6182</value>
+	<value>https://RANGER_HOST:6182</value>
 </property>
+<property>
+	<name>ozone.om.ranger.service</name>
+	<value>RANGER_OZONE_SERVICE_NAME</value>
+</property>
+```
+
+The value of `ozone.om.ranger.service` should match Ozone's "Service Name" as configured under "Service Manager" page in Ranger Admin Server Web UI. e.g. `cm_ozone`.
+
+To authenticate to Apache Ranger Admin Server, `ozone.om.kerberos.principal` and `ozone.om.kerberos.keytab.file` will be picked up from the existing configs used for Kerberos security setup.
+
+- Note: Make sure the user behind the Kerberos principal (e.g. `om`) has Admin privilege in Ranger, otherwise some functionality will break.
+This is a limitation of Apache Ranger at the moment.
+e.g. background sync won't be able to get the policyVersion to function properly, and create/update/delete Ranger role will fail.
+
+In addition, if one wants to test Ranger with user name and clear text password login (not recommended in production), add the following configs to Ozone Manager:
+
+```xml
 <property>
 	<name>ozone.om.ranger.https.admin.api.user</name>
 	<value>RANGER_ADMIN_USERNAME</value>
@@ -63,6 +80,8 @@ Then add the following configs to all Ozone Managers' `ozone-site.xml`:
 </property>
 ```
 
+Note if both Ranger user name and password are configured, it will be chosen over the (default and recommended) Kerberos keytab authentication method.
+
 Finally restart all OzoneManagers to apply the new configs.
 
 Now you can follow the [Multi-Tenancy CLI command]({{< ref "feature/S3-Tenant-Commands.md" >}}) guide to try the commands. 
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java
index 853a14aa1e..a5bfb81c7b 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java
@@ -267,6 +267,11 @@ public final class OmUtils {
     case ListTenant:
     case TenantGetUserInfo:
     case TenantListUser:
+    case RangerBGSync:
+      // RangerBGSync is a read operation in the sense that it doesn't directly
+      // write to OM DB. And therefore it doesn't need a OMClientRequest.
+      // Although indirectly the Ranger sync service task could invoke write
+      // operation SetRangerServiceVersion.
       return true;
     case CreateVolume:
     case SetVolumeProperty:
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/multitenant/AccessPolicy.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/multitenant/AccessPolicy.java
index c1f3f945c8..ee64d5ae09 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/multitenant/AccessPolicy.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/multitenant/AccessPolicy.java
@@ -89,7 +89,7 @@ public interface AccessPolicy {
    * @param id This would be policy-ID that an external/native authorizer
    *           could return.
    */
-  void setPolicyID(String id);
+  void setPolicyName(String id);
 
   String getPolicyID();
 
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/multitenant/OzoneTenant.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/multitenant/OzoneTenant.java
index 3e26f9d8f0..d4364a606a 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/multitenant/OzoneTenant.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/multitenant/OzoneTenant.java
@@ -29,7 +29,7 @@ import org.apache.hadoop.ozone.om.multitenant.impl.SingleVolumeTenantNamespace;
 public class OzoneTenant implements Tenant {
   private final String tenantId;
   private final List<String> tenantRoleNames;
-  private final List<AccessPolicy> accessPolicies;
+  private final List<String> accessPolicies;
   private final AccountNameSpace accountNameSpace;
   private final BucketNameSpace bucketNameSpace;
 
@@ -42,7 +42,7 @@ public class OzoneTenant implements Tenant {
   }
 
   @Override
-  public String getTenantId() {
+  public String getTenantName() {
     return tenantId;
   }
 
@@ -57,17 +57,17 @@ public class OzoneTenant implements Tenant {
   }
 
   @Override
-  public List<AccessPolicy> getTenantAccessPolicies() {
+  public List<String> getTenantAccessPolicies() {
     return accessPolicies;
   }
 
   @Override
-  public void addTenantAccessPolicy(AccessPolicy policy) {
+  public void addTenantAccessPolicy(String policy) {
     accessPolicies.add(policy);
   }
 
   @Override
-  public void removeTenantAccessPolicy(AccessPolicy policy) {
+  public void removeTenantAccessPolicy(String policy) {
     accessPolicies.remove(policy);
   }
 
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/multitenant/RangerAccessPolicy.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/multitenant/RangerAccessPolicy.java
index c4b80c7e17..cebb540ba6 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/multitenant/RangerAccessPolicy.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/multitenant/RangerAccessPolicy.java
@@ -55,7 +55,7 @@ public class RangerAccessPolicy implements AccessPolicy {
     roleList = new HashSet<>();
   }
 
-  public void setPolicyID(String id) {
+  public void setPolicyName(String id) {
     policyID = id;
   }
 
@@ -89,7 +89,7 @@ public class RangerAccessPolicy implements AccessPolicy {
 
   @Override
   public String deserializePolicyFromJsonString(JsonObject jsonObject) {
-    setPolicyID(jsonObject.get("id").getAsString());
+    setPolicyName(jsonObject.get("id").getAsString());
     try {
       JsonArray policyItems = jsonObject
           .getAsJsonArray("policyItems");
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/multitenant/Tenant.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/multitenant/Tenant.java
index 30424fe8fb..5f02c28706 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/multitenant/Tenant.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/multitenant/Tenant.java
@@ -32,7 +32,7 @@ public interface Tenant {
    * A tenant is represented by a globally unique tenant name.
    * @return tenant name.
    */
-  String getTenantId();
+  String getTenantName();
 
   /**
    * Return the AccountNameSpace for the Tenant.
@@ -45,15 +45,15 @@ public interface Tenant {
    */
   BucketNameSpace getTenantBucketNameSpace();
 
-  List<AccessPolicy> getTenantAccessPolicies();
+  List<String> getTenantAccessPolicies();
 
-  void addTenantAccessPolicy(AccessPolicy policy);
+  void addTenantAccessPolicy(String policy);
 
-  void removeTenantAccessPolicy(AccessPolicy policy);
+  void removeTenantAccessPolicy(String policy);
 
-  void addTenantAccessRole(String groupID);
+  void addTenantAccessRole(String roleName);
 
-  void removeTenantAccessRole(String groupID);
+  void removeTenantAccessRole(String roleName);
 
   List<String> getTenantRoles();
 }
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java
index 6017121706..171f515bbb 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java
@@ -391,6 +391,18 @@ public interface OzoneManagerProtocol
 
   ServiceInfoEx getServiceInfo() throws IOException;
 
+  /**
+   * Triggers Ranger background sync task immediately.
+   *
+   * Requires Ozone administrator privilege.
+   *
+   * @param noWait set to true if client won't wait for the result.
+   * @return true if noWait is true or when task completed successfully,
+   *         false otherwise.
+   * @throws IOException OMException (e.g. PERMISSION_DENIED)
+   */
+  boolean triggerRangerBGSync(boolean noWait) throws IOException;
+
   /**
    * Initiate metadata upgrade finalization.
    * This method when called, initiates finalization of Ozone Manager metadata
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
index b24fa238c2..5bc7784f8e 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
@@ -141,6 +141,8 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMReque
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OzoneAclInfo;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OzoneFileStatusProto;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.RangerBGSyncRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.RangerBGSyncResponse;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.RecoverTrashRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.RecoverTrashResponse;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.RemoveAclRequest;
@@ -1447,6 +1449,22 @@ public final class OzoneManagerProtocolClientSideTranslatorPB
         resp.getCaCertificate(), resp.getCaCertsList());
   }
 
+  @Override
+  public boolean triggerRangerBGSync(boolean noWait) throws IOException {
+    RangerBGSyncRequest req = RangerBGSyncRequest.newBuilder()
+        .setNoWait(noWait)
+        .build();
+
+    OMRequest omRequest = createOMRequest(Type.RangerBGSync)
+        .setRangerBGSyncRequest(req)
+        .build();
+
+    RangerBGSyncResponse resp = handleError(submitRequest(omRequest))
+        .getRangerBGSyncResponse();
+
+    return resp.getRunSuccess();
+  }
+
   @Override
   public StatusAndMessages finalizeUpgrade(String upgradeClientID)
       throws IOException {
diff --git a/hadoop-ozone/dist/src/main/compose/ozonesecure/docker-compose.yaml b/hadoop-ozone/dist/src/main/compose/ozonesecure/docker-compose.yaml
index edfa6e6ccf..5e3c0c4be3 100644
--- a/hadoop-ozone/dist/src/main/compose/ozonesecure/docker-compose.yaml
+++ b/hadoop-ozone/dist/src/main/compose/ozonesecure/docker-compose.yaml
@@ -109,17 +109,3 @@ services:
       OZONE-SITE.XML_hdds.scm.safemode.min.datanode: "${OZONE_SAFEMODE_MIN_DATANODES:-1}"
       OZONE_OPTS:
     command: ["/opt/hadoop/bin/ozone","scm"]
-  ranger:
-    image: mockserver/mockserver:mockserver-5.13.2
-    hostname: ranger
-    volumes:
-      - ./mockserverInitialization.json:/config/mockserverInitialization.json
-    ports:
-      - "6080:6080"
-      - "6182:6182"
-    environment:
-      MOCKSERVER_MAX_EXPECTATIONS: 100
-      MOCKSERVER_MAX_HEADER_SIZE: 8192
-      MOCKSERVER_WATCH_INITIALIZATION_JSON: "true"
-      MOCKSERVER_INITIALIZATION_JSON_PATH: /config/mockserverInitialization.json
-    command: -logLevel DEBUG -serverPort 6080
diff --git a/hadoop-ozone/dist/src/main/compose/ozonesecure/docker-config b/hadoop-ozone/dist/src/main/compose/ozonesecure/docker-config
index 5b69b8454b..030cd79ec2 100644
--- a/hadoop-ozone/dist/src/main/compose/ozonesecure/docker-config
+++ b/hadoop-ozone/dist/src/main/compose/ozonesecure/docker-config
@@ -132,15 +132,31 @@ OZONE_LOG_DIR=/var/log/hadoop
 
 no_proxy=om,scm,recon,s3g,kdc,localhost,127.0.0.1
 
+# Multi-Tenancy configs
 OZONE-SITE.XML_ozone.om.multitenancy.enabled=true
-OZONE-SITE.XML_ozone.om.ranger.https-address=http://ranger:6080
+OZONE-SITE.XML_ozone.om.ranger.service=cm_ozone
 
+# Note: Ranger address and credentials here doesn't matter when OM uses
+# InMemoryMultiTenantAccessController (used when dev flag is set).
+# But the values can't be empty otherwise OM config check would report failure.
+OZONE-SITE.XML_ozone.om.ranger.https-address=https://ranger:6182
 OZONE-SITE.XML_ozone.om.ranger.https.admin.api.user=admin
-OZONE-SITE.XML_ozone.om.ranger.https.admin.api.passwd=passwd
+OZONE-SITE.XML_ozone.om.ranger.https.admin.api.passwd=Passwd1
 
-# Note: ozone.om.kerberos.principal and ozone.om.kerberos.keytab.file
-# (which are required for the Multi-Tenancy Ranger Java client) are already
-# properly defined above.
+# ozone.om.kerberos.principal and ozone.om.kerberos.keytab.file
+# (can be used for the RangerClient) are already defined above.
 
 OZONE-SITE.XML_ozone.om.multitenancy.ranger.sync.interval=30s
 OZONE-SITE.XML_ozone.om.multitenancy.ranger.sync.timeout=10s
+
+# Use InMemoryMultiTenantAccessController as we don't have Ranger Admin Server here.
+# This is fine with one OM. But for OM HA, each OM would have its own in-memory
+# "Ranger" state as a result.
+# New OM leader's in-memory "Ranger" state won't be consistent with OM DB until
+# a BG sync run has successfully finished because only previously leader OM
+# pushes updates to Ranger.
+#
+# Potential TODO: We could trigger BG sync automatically during OM leadership
+# change or let all OMs write to AccessController if this dev flag is set.
+#
+OZONE-SITE.XML_ozone.om.tenant.dev.skip.ranger=true
diff --git a/hadoop-ozone/dist/src/main/compose/ozonesecure/mockserverInitialization.json b/hadoop-ozone/dist/src/main/compose/ozonesecure/mockserverInitialization.json
deleted file mode 100644
index 8aa8048b0f..0000000000
--- a/hadoop-ozone/dist/src/main/compose/ozonesecure/mockserverInitialization.json
+++ /dev/null
@@ -1,98 +0,0 @@
-[
-  {
-    "httpRequest": {
-      "path": "/"
-    },
-    "httpResponse": {
-      "body": "{}"
-    }
-  },
-  {
-    "httpRequest": {
-      "path": "/service/xusers/secure/users*"
-    },
-    "httpResponse": {
-      "body": "{id: 111}"
-    }
-  },
-  {
-    "httpRequest": {
-      "path": "/service/roles/roles"
-    },
-    "httpResponse": {
-      "body": "{id: 222}"
-    }
-  },
-  {
-    "httpRequest": {
-      "path": "/service/roles/roles/name/tenantone-UserRole"
-    },
-    "httpResponse": {
-      "body": "{id: 222, users: []}"
-    }
-  },
-  {
-    "httpRequest": {
-      "path": "/service/roles/roles/name/tenantone-AdminRole"
-    },
-    "httpResponse": {
-      "body": "{id: 222, users: []}"
-    }
-  },
-  {
-    "httpRequest": {
-      "path": "/service/roles/roles/222"
-    },
-    "httpResponse": {
-      "body": "{id: 222}"
-    }
-  },
-  {
-    "httpRequest": {
-      "path": "/service/public/v2/api/policy"
-    },
-    "httpResponse": {
-      "body": "{id: 333}"
-    }
-  },
-  {
-    "httpRequest": {
-      "path": "/service/public/v2/api/policy/*"
-    },
-    "httpResponse": {
-      "body": "[{id: 444}]"
-    }
-  },
-  {
-    "httpRequest": {
-      "path": "/service/plugins/services/"
-    },
-    "httpResponse": {
-      "body": "{\"startIndex\":0,\"pageSize\":200,\"totalCount\":13,\"resultSize\":13,\"sortType\":\"asc\",\"sortBy\":\"serviceId\",\"queryTimeMS\":1651104831041,\"services\":[{\"id\":7,\"guid\":\"b6cbaf6c-3911-4fa6-aeed-60dece4b111b\",\"isEnabled\":true,\"createdBy\":\"Admin\",\"updatedBy\":\"Admin\",\"createTime\":1651040438000,\"updateTime\":1651040438000,\"version\":1,\"type\":\"ozone\",\"name\":\"cm_ozone\",\"displayName\":\"cm_ozone\",\"description\":\"Ozone repo\",\"tagService\":\ [...]
-    }
-  },
-  {
-    "httpRequest": {
-      "path": "/service/plugins/services/7"
-    },
-    "httpResponse": {
-      "body": "{\"id\":7,\"guid\":\"2a83c846-31ed-4882-b987-57a4c7c28867\",\"isEnabled\":true,\"createdBy\":\"Admin\",\"updatedBy\":\"Admin\",\"createTime\":1649339219000,\"updateTime\":1649339219000,\"version\":1,\"type\":\"ozone\",\"name\":\"cm_ozone\",\"displayName\":\"cm_ozone\",\"description\":\"Ozone repo\",\"tagService\":\"cm_tag\",\"configs\":{\"setup.additional.default.policies\":\"true\",\"hadoop.security.authentication\":\"kerberos\",\"ozone.om.http-address\":\"http://localhos [...]
-    }
-  },
-  {
-    "httpRequest": {
-      "path": "/service/plugins/policies/service/7"
-    },
-    "httpResponse": {
-      "body": "{\"id\":7,\"guid\":\"2a83c846-31ed-4882-b987-57a4c7c28867\",\"isEnabled\":true,\"createdBy\":\"Admin\",\"updatedBy\":\"Admin\",\"createTime\":1649339219000,\"updateTime\":1649339219000,\"version\":1,\"type\":\"ozone\",\"name\":\"cm_ozone\",\"displayName\":\"cm_ozone\",\"description\":\"Ozone repo\",\"tagService\":\"cm_tag\",\"configs\":{\"setup.additional.default.policies\":\"true\",\"hadoop.security.authentication\":\"kerberos\",\"ozone.om.http-address\":\"http://localhos [...]
-    }
-  },
-  {
-    "httpRequest": {
-      "path": "/service/plugins/policies/444"
-    },
-    "httpResponse": {
-      "body": "{id: 444}"
-    }
-  }
-]
diff --git a/hadoop-ozone/dist/src/main/smoketest/security/ozone-secure-tenant.robot b/hadoop-ozone/dist/src/main/smoketest/security/ozone-secure-tenant.robot
index babc010cd6..41237b84cb 100644
--- a/hadoop-ozone/dist/src/main/smoketest/security/ozone-secure-tenant.robot
+++ b/hadoop-ozone/dist/src/main/smoketest/security/ozone-secure-tenant.robot
@@ -94,6 +94,10 @@ Delete Tenant Failure Tenant Not Empty
     ${rc}  ${output} =  Run And Return Rc And Output  ozone tenant delete tenantone
                         Should contain   ${output}         TENANT_NOT_EMPTY Tenant 'tenantone' is not empty. All accessIds associated to this tenant must be revoked before the tenant can be deleted. See `ozone tenant user revoke`
 
+Trigger and wait for background Sync to recover Policies and Roles in Authorizer
+    ${rc}  ${output} =  Run And Return Rc And Output  ozone admin om updateranger -host=om
+                        Should contain   ${output}         Operation completed successfully
+
 Create Tenant Failure with Regular User
     Run Keyword         Kinit test user     testuser2    testuser2.keytab
     ${rc}  ${output} =  Run And Return Rc And Output  ozone tenant create tenanttwo
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/multitenant/RangerUserRequest.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/multitenant/RangerUserRequest.java
new file mode 100644
index 0000000000..b3327e47d1
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/multitenant/RangerUserRequest.java
@@ -0,0 +1,278 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.om.multitenant;
+
+import com.google.gson.JsonArray;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParseException;
+import com.google.gson.JsonParser;
+import org.apache.kerby.util.Base64;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.net.ssl.HttpsURLConnection;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.TrustManager;
+import javax.net.ssl.X509TrustManager;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+
+import static java.net.HttpURLConnection.HTTP_OK;
+// TODO: MOVE THIS HERE.
+import static org.apache.hadoop.ozone.OzoneConsts.OZONE_OM_RANGER_ADMIN_CREATE_USER_HTTP_ENDPOINT;
+import static org.apache.hadoop.ozone.OzoneConsts.OZONE_OM_RANGER_ADMIN_DELETE_USER_HTTP_ENDPOINT;
+import static org.apache.hadoop.ozone.OzoneConsts.OZONE_OM_RANGER_ADMIN_GET_USER_HTTP_ENDPOINT;
+
+/**
+ * Helper class to create and delete users in Ranger because RangerClient
+ * doesn't support it yet.
+ */
+public class RangerUserRequest {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(RangerUserRequest.class);
+
+  private String rangerEndpoint;
+
+  // Value of HTTP auth header
+  private String authHeaderValue;
+
+  private int connectionTimeout = 5000;
+  private int connectionRequestTimeout = 5000;
+
+  RangerUserRequest(String rangerHttpsAddress, String userName, String passwd) {
+
+    // Trim trailing slash
+    if (rangerHttpsAddress.endsWith("/")) {
+      rangerHttpsAddress =
+          rangerHttpsAddress.substring(0, rangerHttpsAddress.length() - 1);
+    }
+    this.rangerEndpoint = rangerHttpsAddress;
+
+    String auth = userName + ":" + passwd;
+    byte[] encodedAuth =
+        Base64.encodeBase64(auth.getBytes(StandardCharsets.UTF_8));
+    authHeaderValue = "Basic " +
+        new String(encodedAuth, StandardCharsets.UTF_8);
+
+    setupRangerIgnoreServerCertificate();
+  }
+
+  private void setupRangerIgnoreServerCertificate() {
+    // Create a trust manager that does not validate certificate chains
+    TrustManager[] trustAllCerts = new TrustManager[]{
+        new X509TrustManager() {
+          public java.security.cert.X509Certificate[] getAcceptedIssuers() {
+            return null;
+          }
+          public void checkClientTrusted(
+              java.security.cert.X509Certificate[] certs, String authType) {
+          }
+          public void checkServerTrusted(
+              java.security.cert.X509Certificate[] certs, String authType) {
+          }
+        }
+    };
+
+    try {
+      SSLContext sc = SSLContext.getInstance("SSL");
+      sc.init(null, trustAllCerts, new java.security.SecureRandom());
+      HttpsURLConnection.setDefaultSSLSocketFactory(sc.getSocketFactory());
+    } catch (Exception e) {
+      LOG.info("Setting DefaultSSLSocketFactory failed.");
+    }
+  }
+
+  private HttpURLConnection openURLConnection(URL url) throws IOException {
+    final HttpURLConnection urlConnection;
+    if (url.getProtocol().equals("https")) {
+      urlConnection = (HttpsURLConnection) url.openConnection();
+    } else if (url.getProtocol().equals("http")) {
+      urlConnection = (HttpURLConnection) url.openConnection();
+    } else {
+      throw new IOException("Unsupported protocol: " + url.getProtocol() +
+          "URL: " + url);
+    }
+    return urlConnection;
+  }
+
+  /**
+   * Can make either http or https request.
+   */
+  private HttpURLConnection makeHttpCall(String urlString,
+      String jsonInputString, String method, boolean isSpnego)
+      throws IOException {
+
+    URL url = new URL(urlString);
+    final HttpURLConnection urlConnection = openURLConnection(url);
+
+    urlConnection.setRequestMethod(method);
+    // Timeout in ms
+    urlConnection.setConnectTimeout(connectionTimeout);
+    // Timeout in ms
+    urlConnection.setReadTimeout(connectionRequestTimeout);
+    urlConnection.setRequestProperty("Accept", "application/json");
+    urlConnection.setRequestProperty("Authorization", authHeaderValue);
+
+    if ((jsonInputString != null) && !jsonInputString.isEmpty()) {
+      urlConnection.setDoOutput(true);
+      urlConnection.setRequestProperty("Content-Type", "application/json;");
+      try (OutputStream os = urlConnection.getOutputStream()) {
+        byte[] input = jsonInputString.getBytes(StandardCharsets.UTF_8);
+        os.write(input, 0, input.length);
+        os.flush();
+      }
+    }
+
+    return urlConnection;
+  }
+
+  private String getCreateUserJsonStr(String userName, String password) {
+    return "{"
+        + "  \"name\":\"" +  userName + "\","
+        + "  \"password\":\"" +  password + "\","
+        + "  \"firstName\":\"" +  userName + "\","
+        + "  \"userRoleList\":[\"ROLE_USER\"]"
+        + "}";
+  }
+
+  private String getResponseData(HttpURLConnection urlConnection)
+      throws IOException {
+    StringBuilder response = new StringBuilder();
+    try (BufferedReader br = new BufferedReader(
+        new InputStreamReader(urlConnection.getInputStream(),
+            StandardCharsets.UTF_8))) {
+      String responseLine;
+      while ((responseLine = br.readLine()) != null) {
+        response.append(responseLine.trim());
+      }
+      LOG.debug("Got response: {}", response);
+      // TODO: throw if urlConnection code is 400?
+    } catch (IOException e) {
+      // Common exceptions:
+      // 1. Server returned HTTP response code: 401
+      //   - Possibly incorrect Ranger credentials
+      // 2. Server returned HTTP response code: 400
+      //   - Policy or role does not exist
+      switch (urlConnection.getResponseCode()) {
+      case 400:
+        LOG.error("The policy or role likely does not exist in Ranger");
+        return null;
+      case 401:
+        LOG.error("Check Ranger credentials");
+        //        break;
+      default:
+        e.printStackTrace();
+        throw e;
+      }
+    }
+    return response.toString();
+  }
+
+  private HttpURLConnection makeHttpGetCall(String urlString,
+      String method, boolean isSpnego) throws IOException {
+
+    URL url = new URL(urlString);
+    final HttpURLConnection urlConnection = openURLConnection(url);
+
+    urlConnection.setRequestMethod(method);
+    urlConnection.setConnectTimeout(connectionTimeout);
+    urlConnection.setReadTimeout(connectionRequestTimeout);
+    urlConnection.setRequestProperty("Accept", "application/json");
+    urlConnection.setRequestProperty("Authorization", authHeaderValue);
+
+    return urlConnection;
+  }
+
+  public String getUserId(String userPrincipal) throws IOException {
+    String rangerAdminUrl =
+        rangerEndpoint + OZONE_OM_RANGER_ADMIN_GET_USER_HTTP_ENDPOINT +
+            userPrincipal;
+
+    HttpURLConnection conn = makeHttpGetCall(rangerAdminUrl,
+        "GET", false);
+    String response = getResponseData(conn);
+    String userIDCreated = null;
+    try {
+      JsonObject jResonse = JsonParser.parseString(response).getAsJsonObject();
+      JsonArray userinfo = jResonse.get("vXUsers").getAsJsonArray();
+      int numIndex = userinfo.size();
+      for (int i = 0; i < numIndex; ++i) {
+        if (userinfo.get(i).getAsJsonObject().get("name").getAsString()
+            .equals(userPrincipal)) {
+          userIDCreated =
+              userinfo.get(i).getAsJsonObject().get("id").getAsString();
+          break;
+        }
+      }
+      LOG.debug("User ID is: {}", userIDCreated);
+    } catch (JsonParseException e) {
+      e.printStackTrace();
+      throw e;
+    }
+    return userIDCreated;
+  }
+
+  public String createUser(String userName, String password)
+      throws IOException {
+
+    String endpointUrl =
+        rangerEndpoint + OZONE_OM_RANGER_ADMIN_CREATE_USER_HTTP_ENDPOINT;
+
+    String jsonData = getCreateUserJsonStr(userName, password);
+
+    final HttpURLConnection conn = makeHttpCall(endpointUrl,
+        jsonData, "POST", false);
+    if (conn.getResponseCode() != HTTP_OK) {
+      throw new IOException("Ranger REST API failure: " + conn.getResponseCode()
+          + " " + conn.getResponseMessage()
+          + ". User name '" + userName + "' likely already exists in Ranger");
+    }
+    String userInfo = getResponseData(conn);
+    String userId;
+    try {
+      assert userInfo != null;
+      JsonObject jObject = JsonParser.parseString(userInfo).getAsJsonObject();
+      userId = jObject.get("id").getAsString();
+      LOG.debug("Ranger returned userId: {}", userId);
+    } catch (JsonParseException e) {
+      e.printStackTrace();
+      throw e;
+    }
+    return userId;
+  }
+
+  public void deleteUser(String userId) throws IOException {
+
+    String rangerAdminUrl =
+        rangerEndpoint + OZONE_OM_RANGER_ADMIN_DELETE_USER_HTTP_ENDPOINT
+            + userId + "?forceDelete=true";
+
+    HttpURLConnection conn = makeHttpCall(rangerAdminUrl, null,
+        "DELETE", false);
+    int respnseCode = conn.getResponseCode();
+    if (respnseCode != 200 && respnseCode != 204) {
+      throw new IOException("Couldn't delete user " + userId);
+    }
+  }
+}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/multitenant/TestMultiTenantAccessAuthorizerRangerPlugin.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/multitenant/TestMultiTenantAccessAuthorizerRangerPlugin.java
deleted file mode 100644
index 0f9ccb44e5..0000000000
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/multitenant/TestMultiTenantAccessAuthorizerRangerPlugin.java
+++ /dev/null
@@ -1,269 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.hadoop.ozone.om.multitenant;
-
-import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RANGER_HTTPS_ADMIN_API_PASSWD;
-import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RANGER_HTTPS_ADMIN_API_USER;
-import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_RANGER_HTTPS_ADDRESS_KEY;
-import static org.apache.hadoop.ozone.om.multitenant.AccessPolicy.AccessGrantType.ALLOW;
-import static org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLType.CREATE;
-import static org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLType.LIST;
-import static org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLType.NONE;
-import static org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLType.READ;
-import static org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLType.READ_ACL;
-import static org.apache.hadoop.ozone.security.acl.OzoneObj.ResourceType.BUCKET;
-import static org.apache.hadoop.ozone.security.acl.OzoneObj.ResourceType.KEY;
-import static org.apache.hadoop.ozone.security.acl.OzoneObj.ResourceType.VOLUME;
-import static org.apache.hadoop.ozone.security.acl.OzoneObj.StoreType.OZONE;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.ozone.om.OMMultiTenantManager;
-import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLType;
-import org.apache.hadoop.ozone.security.acl.OzoneObjInfo;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Ignore;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.Timeout;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Tests TestMultiTenantGateKeeperImplWithRanger.
- * Marking it as Ignore because it needs Ranger access point.
- */
-public class TestMultiTenantAccessAuthorizerRangerPlugin {
-  private static final Logger LOG = LoggerFactory
-      .getLogger(TestMultiTenantAccessAuthorizerRangerPlugin.class);
-
-  /**
-   * Set a timeout for each test.
-   */
-  @Rule
-  public Timeout timeout = new Timeout(300000);
-
-  // The following values need to be set before this test can be enabled.
-  private static final String RANGER_ENDPOINT = "";
-  private static final String RANGER_ENDPOINT_USER = "";
-  private static final String RANGER_ENDPOINT_USER_PASSWD = "";
-
-  private List<String> usersIdsCreated = new ArrayList<String>();
-  private List<String> groupIdsCreated = new ArrayList<String>();
-  private List<String> policyIdsCreated = new ArrayList<String>();
-
-  private static OzoneConfiguration conf;
-
-  @BeforeClass
-  public static void init() throws Exception {
-    conf = new OzoneConfiguration();
-    simulateOzoneSiteXmlConfig();
-  }
-
-  @AfterClass
-  public static void shutdown() {
-  }
-
-  private static void simulateOzoneSiteXmlConfig() {
-    conf.setStrings(OZONE_RANGER_HTTPS_ADDRESS_KEY, RANGER_ENDPOINT);
-    conf.setStrings(OZONE_OM_RANGER_HTTPS_ADMIN_API_USER, RANGER_ENDPOINT_USER);
-    conf.setStrings(OZONE_OM_RANGER_HTTPS_ADMIN_API_PASSWD,
-        RANGER_ENDPOINT_USER_PASSWD);
-  }
-
-  @Test
-  @Ignore("TODO:Requires (mocked) Ranger endpoint")
-  public void testMultiTenantAccessAuthorizerRangerPlugin() throws Exception {
-    simulateOzoneSiteXmlConfig();
-    final MultiTenantAccessAuthorizer omm =
-        new MultiTenantAccessAuthorizerRangerPlugin();
-    omm.init(conf);
-
-    try {
-      OzoneTenantRolePrincipal adminRole =
-          new OzoneTenantRolePrincipal("tenant1-AdminRole");
-      OzoneTenantRolePrincipal userRole =
-          new OzoneTenantRolePrincipal("tenant1-UserRole");
-
-      String userPrincipal = "user1Test";
-      usersIdsCreated.add(
-          omm.assignUserToRole(userPrincipal, userRole.getName(), false));
-      usersIdsCreated.add(
-          omm.assignUserToRole(userPrincipal, adminRole.getName(), true));
-
-      AccessPolicy tenant1VolumeAccessPolicy = createVolumeAccessPolicy(
-          "vol1", "tenant1");
-      policyIdsCreated.add(omm.createAccessPolicy(tenant1VolumeAccessPolicy));
-
-      AccessPolicy tenant1BucketCreatePolicy = allowCreateBucketPolicy(
-          "vol1", "tenant1");
-      policyIdsCreated.add(omm.createAccessPolicy(tenant1BucketCreatePolicy));
-
-      AccessPolicy tenant1BucketAccessPolicy = allowAccessBucketPolicy(
-          "vol1", "bucket1", "tenant1");
-      policyIdsCreated.add(omm.createAccessPolicy(tenant1BucketAccessPolicy));
-
-      AccessPolicy tenant1KeyAccessPolicy = allowAccessKeyPolicy(
-          "vol1", "bucket1", "tenant1");
-      policyIdsCreated.add(omm.createAccessPolicy(tenant1KeyAccessPolicy));
-
-    } catch (Exception e) {
-      Assert.fail(e.getMessage());
-    } finally {
-      for (String id : policyIdsCreated) {
-        omm.deletePolicyById(id);
-      }
-      for (String id : usersIdsCreated) {
-        omm.deleteUser(id);
-      }
-      for (String id : groupIdsCreated) {
-        omm.deleteRoleById(id);
-      }
-    }
-  }
-
-  @Test
-  @Ignore("TODO:Requires (mocked) Ranger endpoint")
-  public void testMultiTenantAccessAuthorizerRangerPluginWithoutIds()
-      throws Exception {
-    String userPrincipal = null;
-    simulateOzoneSiteXmlConfig();
-    final MultiTenantAccessAuthorizer omm =
-        new MultiTenantAccessAuthorizerRangerPlugin();
-    omm.init(conf);
-
-    try {
-      Assert.assertTrue(policyIdsCreated.size() == 0);
-      OzoneTenantRolePrincipal group1Principal = new OzoneTenantRolePrincipal(
-          OMMultiTenantManager.getDefaultAdminRoleName("tenant1"));
-      OzoneTenantRolePrincipal group2Principal = new OzoneTenantRolePrincipal(
-          OMMultiTenantManager.getDefaultUserRoleName("tenant1"));
-      omm.createRole(group1Principal.getName(), null);
-      groupIdsCreated.add(omm.getRole(group1Principal));
-      omm.createRole(group2Principal.getName(), group1Principal.getName());
-      groupIdsCreated.add(omm.getRole(group2Principal));
-
-      userPrincipal = "user1Test";
-      omm.assignUserToRole(userPrincipal, group2Principal.getName(), false);
-
-      AccessPolicy tenant1VolumeAccessPolicy = createVolumeAccessPolicy(
-          "vol1", "tenant1");
-      omm.createAccessPolicy(tenant1VolumeAccessPolicy);
-      policyIdsCreated.add(tenant1VolumeAccessPolicy.getPolicyName());
-
-      AccessPolicy tenant1BucketCreatePolicy = allowCreateBucketPolicy(
-          "vol1", "tenant1");
-      omm.createAccessPolicy(tenant1BucketCreatePolicy);
-      policyIdsCreated.add(tenant1BucketCreatePolicy.getPolicyName());
-
-      AccessPolicy tenant1BucketAccessPolicy = allowAccessBucketPolicy(
-          "vol1", "bucket1", "tenant1");
-      omm.createAccessPolicy(tenant1BucketAccessPolicy);
-      policyIdsCreated.add(tenant1BucketAccessPolicy.getPolicyName());
-
-      AccessPolicy tenant1KeyAccessPolicy = allowAccessKeyPolicy(
-          "vol1", "bucket1", "tenant1");
-      omm.createAccessPolicy(tenant1KeyAccessPolicy);
-      policyIdsCreated.add(tenant1KeyAccessPolicy.getPolicyName());
-
-    } catch (Exception e) {
-      Assert.fail(e.getMessage());
-    } finally {
-      for (String name : policyIdsCreated) {
-        omm.deletePolicyByName(name);
-      }
-      String userId = omm.getUserId(userPrincipal);
-      omm.deleteUser(userId);
-      for (String id : groupIdsCreated) {
-        omm.deleteRoleById(id);
-      }
-    }
-  }
-
-  private AccessPolicy createVolumeAccessPolicy(String vol, String tenantId)
-      throws IOException {
-    OzoneTenantRolePrincipal principal = new OzoneTenantRolePrincipal(
-        OMMultiTenantManager.getDefaultUserRoleName(tenantId));
-    AccessPolicy tenantVolumeAccessPolicy = new RangerAccessPolicy(
-        OMMultiTenantManager.getDefaultBucketNamespacePolicyName(tenantId));
-    OzoneObjInfo obj = OzoneObjInfo.Builder.newBuilder()
-        .setResType(VOLUME).setStoreType(OZONE).setVolumeName(vol)
-        .setBucketName("").setKeyName("").build();
-    tenantVolumeAccessPolicy.addAccessPolicyElem(obj, principal, READ, ALLOW);
-    tenantVolumeAccessPolicy.addAccessPolicyElem(obj, principal, LIST, ALLOW);
-    tenantVolumeAccessPolicy.addAccessPolicyElem(obj, principal,
-        READ_ACL, ALLOW);
-    return tenantVolumeAccessPolicy;
-  }
-
-  private AccessPolicy allowCreateBucketPolicy(String vol, String tenantId)
-      throws IOException {
-    OzoneTenantRolePrincipal principal = new OzoneTenantRolePrincipal(
-        OMMultiTenantManager.getDefaultUserRoleName(tenantId));
-    AccessPolicy tenantVolumeAccessPolicy = new RangerAccessPolicy(
-        OMMultiTenantManager.getDefaultBucketPolicyName(tenantId));
-    OzoneObjInfo obj = OzoneObjInfo.Builder.newBuilder()
-        .setResType(BUCKET).setStoreType(OZONE).setVolumeName(vol)
-        .setBucketName("*").setKeyName("").build();
-    tenantVolumeAccessPolicy.addAccessPolicyElem(obj, principal, CREATE, ALLOW);
-    return tenantVolumeAccessPolicy;
-  }
-
-  private AccessPolicy allowAccessBucketPolicy(String vol, String bucketName,
-      String tenantId) throws IOException {
-    OzoneTenantRolePrincipal principal = new OzoneTenantRolePrincipal(
-        OMMultiTenantManager.getDefaultUserRoleName(tenantId));
-    AccessPolicy tenantVolumeAccessPolicy = new RangerAccessPolicy(
-        principal.getName() + "AllowBucketAccess" + vol + bucketName +
-            "Policy");
-    OzoneObjInfo obj = OzoneObjInfo.Builder.newBuilder()
-        .setResType(BUCKET).setStoreType(OZONE).setVolumeName(vol)
-        .setBucketName(bucketName).setKeyName("*").build();
-    for (ACLType acl :ACLType.values()) {
-      if (acl != NONE) {
-        tenantVolumeAccessPolicy.addAccessPolicyElem(obj, principal, acl,
-            ALLOW);
-      }
-    }
-    return tenantVolumeAccessPolicy;
-  }
-
-  private AccessPolicy allowAccessKeyPolicy(String vol, String bucketName,
-      String tenantId) throws IOException {
-    OzoneTenantRolePrincipal principal = new OzoneTenantRolePrincipal(
-        OMMultiTenantManager.getDefaultUserRoleName(tenantId));
-    AccessPolicy tenantVolumeAccessPolicy = new RangerAccessPolicy(
-        principal.getName() + "AllowBucketKeyAccess" + vol + bucketName +
-            "Policy");
-    OzoneObjInfo obj = OzoneObjInfo.Builder.newBuilder()
-        .setResType(KEY).setStoreType(OZONE).setVolumeName(vol)
-        .setBucketName(bucketName).setKeyName("*").build();
-    for (ACLType acl :ACLType.values()) {
-      if (acl != NONE) {
-        tenantVolumeAccessPolicy.addAccessPolicyElem(obj, principal, acl,
-            ALLOW);
-      }
-    }
-    return tenantVolumeAccessPolicy;
-  }
-}
-
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/multitenant/TestRangerBGSyncService.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/multitenant/TestRangerBGSyncService.java
index 2334829601..33c3170a5c 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/multitenant/TestRangerBGSyncService.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/multitenant/TestRangerBGSyncService.java
@@ -19,18 +19,10 @@ package org.apache.hadoop.ozone.om.multitenant;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RANGER_HTTPS_ADMIN_API_PASSWD;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RANGER_HTTPS_ADMIN_API_USER;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_RANGER_HTTPS_ADDRESS_KEY;
-import static org.apache.hadoop.ozone.om.multitenant.AccessPolicy.AccessGrantType.ALLOW;
-import static org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLType.ALL;
-import static org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLType.CREATE;
-import static org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLType.LIST;
-import static org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLType.READ;
-import static org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLType.READ_ACL;
-import static org.apache.hadoop.ozone.security.acl.OzoneObj.ResourceType.BUCKET;
-import static org.apache.hadoop.ozone.security.acl.OzoneObj.ResourceType.VOLUME;
-import static org.apache.hadoop.ozone.security.acl.OzoneObj.StoreType.OZONE;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_RANGER_SERVICE;
+import static org.apache.hadoop.ozone.om.OMMultiTenantManager.OZONE_TENANT_RANGER_ROLE_DESCRIPTION;
 import static org.apache.hadoop.security.authentication.util.KerberosName.DEFAULT_MECHANISM;
 import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.framework;
@@ -43,8 +35,10 @@ import java.io.IOException;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
@@ -64,12 +58,14 @@ import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
 import org.apache.hadoop.ozone.om.OzoneManager;
 import org.apache.hadoop.ozone.om.helpers.OmDBAccessIdInfo;
 import org.apache.hadoop.ozone.om.helpers.OmDBTenantState;
+import org.apache.hadoop.ozone.om.multitenant.MultiTenantAccessController.Policy;
+import org.apache.hadoop.ozone.om.multitenant.MultiTenantAccessController.Role;
 import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
-import org.apache.hadoop.ozone.security.acl.OzoneObjInfo;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authentication.util.KerberosName;
 import org.apache.ozone.test.GenericTestUtils;
+import org.apache.ranger.RangerServiceException;
 import org.apache.ratis.protocol.RaftGroupId;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.junit.After;
@@ -86,9 +82,6 @@ import org.mockito.Mockito;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.gson.JsonArray;
-import com.google.gson.JsonObject;
-import com.google.gson.JsonParser;
 import org.slf4j.event.Level;
 
 /**
@@ -114,14 +107,14 @@ public class TestRangerBGSyncService {
 
   private TemporaryFolder folder = new TemporaryFolder();
 
-  private MultiTenantAccessAuthorizer auth;
+  private MultiTenantAccessController accessController;
   private OMRangerBGSyncService bgSync;
 
-  // List of policy names created in Ranger
+  // List of policy names created in Ranger for the test
   private final List<String> policiesCreated = new ArrayList<>();
-  // List of role ID created in Ranger
+  // List of role names created in Ranger for the test
   private final List<String> rolesCreated = new ArrayList<>();
-  // List of users created in Ranger
+  // List of user names created in Ranger for the test
   private final List<String> usersCreated = new ArrayList<>();
 
   private static OzoneConfiguration conf;
@@ -139,6 +132,7 @@ public class TestRangerBGSyncService {
   private static final String USER_ALICE_SHORT = "alice";
   private UserGroupInformation ugiAlice;
   private static final String USER_BOB_SHORT = "bob";
+  private RangerUserRequest rangerUserRequest;
 
   private static void simulateOzoneSiteXmlConfig() {
     // The following configs need to be set before the test can be enabled.
@@ -148,11 +142,11 @@ public class TestRangerBGSyncService {
     // -Dozone.om.ranger.https.admin.api.user=admin
     // -Dozone.om.ranger.https.admin.api.passwd=passwd
 
-    conf.setStrings(OZONE_RANGER_HTTPS_ADDRESS_KEY,
+    conf.set(OZONE_RANGER_HTTPS_ADDRESS_KEY,
         System.getProperty(OZONE_RANGER_HTTPS_ADDRESS_KEY));
-    conf.setStrings(OZONE_OM_RANGER_HTTPS_ADMIN_API_USER,
+    conf.set(OZONE_OM_RANGER_HTTPS_ADMIN_API_USER,
         System.getProperty(OZONE_OM_RANGER_HTTPS_ADMIN_API_USER));
-    conf.setStrings(OZONE_OM_RANGER_HTTPS_ADMIN_API_PASSWD,
+    conf.set(OZONE_OM_RANGER_HTTPS_ADMIN_API_PASSWD,
         System.getProperty(OZONE_OM_RANGER_HTTPS_ADMIN_API_PASSWD));
   }
 
@@ -163,7 +157,8 @@ public class TestRangerBGSyncService {
 
     GenericTestUtils.setLogLevel(OMRangerBGSyncService.LOG, Level.DEBUG);
     GenericTestUtils.setLogLevel(
-        MultiTenantAccessAuthorizerRangerPlugin.LOG, Level.INFO);
+        LoggerFactory.getLogger(RangerClientMultiTenantAccessController.class),
+        Level.INFO);
   }
 
   @AfterClass
@@ -222,13 +217,7 @@ public class TestRangerBGSyncService {
         .thenReturn(OMMultiTenantManager.getDefaultUserRoleName(TENANT_ID));
     when(omMultiTenantManager.getTenantAdminRoleName(TENANT_ID))
         .thenReturn(OMMultiTenantManager.getDefaultAdminRoleName(TENANT_ID));
-    when(omMultiTenantManager.newDefaultVolumeAccessPolicy(eq(TENANT_ID),
-        Mockito.any(OzoneTenantRolePrincipal.class),
-        Mockito.any(OzoneTenantRolePrincipal.class)))
-        .thenReturn(newVolumeAccessPolicy(TENANT_ID, TENANT_ID));
-    when(omMultiTenantManager.newDefaultBucketAccessPolicy(eq(TENANT_ID),
-        Mockito.any(OzoneTenantRolePrincipal.class)))
-        .thenReturn(newBucketAccessPolicy(TENANT_ID, TENANT_ID));
+
     when(omMultiTenantManager.getAuthorizerLock())
         .thenReturn(new AuthorizerLockImpl());
 
@@ -257,8 +246,19 @@ public class TestRangerBGSyncService {
 
     when(tenant.getTenantAccessPolicies()).thenReturn(new ArrayList<>());
 
-    auth = new MultiTenantAccessAuthorizerRangerPlugin();
-    auth.init(conf);
+    System.setProperty("javax.net.ssl.trustStore",
+        "/path/to/cm-auto-global_truststore.jks");
+
+    conf.set(OZONE_RANGER_SERVICE, "cm_ozone");
+
+    // Helper to create and delete users from Ranger for the test.
+    // RangerClient hasn't implemented these yet so we had to roll our own.
+    rangerUserRequest = new RangerUserRequest(
+        conf.get(OZONE_RANGER_HTTPS_ADDRESS_KEY),
+        conf.get(OZONE_OM_RANGER_HTTPS_ADMIN_API_USER),
+        conf.get(OZONE_OM_RANGER_HTTPS_ADMIN_API_PASSWD));
+
+    accessController = new RangerClientMultiTenantAccessController(conf);
   }
 
   @After
@@ -269,46 +269,22 @@ public class TestRangerBGSyncService {
     framework().clearInlineMocks();
   }
 
-  private AccessPolicy newVolumeAccessPolicy(String vol, String tenantId)
-      throws IOException {
-    OzoneTenantRolePrincipal principal = new OzoneTenantRolePrincipal(
-        OMMultiTenantManager.getDefaultUserRoleName(tenantId));
-    OzoneTenantRolePrincipal adminRole = new OzoneTenantRolePrincipal(
-        OMMultiTenantManager.getDefaultAdminRoleName(tenantId));
-    AccessPolicy tenantVolumeAccessPolicy = new RangerAccessPolicy(
-        OMMultiTenantManager.getDefaultBucketNamespacePolicyName(tenantId));
-    OzoneObjInfo obj = OzoneObjInfo.Builder.newBuilder()
-        .setResType(VOLUME).setStoreType(OZONE).setVolumeName(vol)
-        .setBucketName("").setKeyName("").build();
-    tenantVolumeAccessPolicy.addAccessPolicyElem(obj, principal, READ, ALLOW);
-    tenantVolumeAccessPolicy.addAccessPolicyElem(obj, principal, LIST, ALLOW);
-    tenantVolumeAccessPolicy.addAccessPolicyElem(obj, principal,
-        READ_ACL, ALLOW);
-    tenantVolumeAccessPolicy.addAccessPolicyElem(obj, adminRole, ALL, ALLOW);
-    return tenantVolumeAccessPolicy;
-  }
-
-  private AccessPolicy newBucketAccessPolicy(String vol, String tenantId)
-      throws IOException {
-    OzoneTenantRolePrincipal principal = new OzoneTenantRolePrincipal(
-        OMMultiTenantManager.getDefaultUserRoleName(tenantId));
-    AccessPolicy tenantVolumeAccessPolicy = new RangerAccessPolicy(
-        OMMultiTenantManager.getDefaultBucketPolicyName(tenantId));
-    OzoneObjInfo obj = OzoneObjInfo.Builder.newBuilder()
-        .setResType(BUCKET).setStoreType(OZONE).setVolumeName(vol)
-        .setBucketName("*").setKeyName("").build();
-    tenantVolumeAccessPolicy.addAccessPolicyElem(obj, principal, CREATE, ALLOW);
-    return tenantVolumeAccessPolicy;
-  }
-
   long initBGSync() throws IOException {
     bgSync = new OMRangerBGSyncService(ozoneManager,
-        ozoneManager.getMultiTenantManager(), auth,
+        ozoneManager.getMultiTenantManager(), accessController,
         TEST_SYNC_INTERVAL_SEC, TimeUnit.SECONDS, TEST_SYNC_TIMEOUT_SEC);
-    return bgSync.getLatestRangerServiceVersion();
+    return bgSync.getRangerOzoneServicePolicyVersion();
   }
 
-  public void createRolesAndPoliciesInRanger(boolean populateDB) {
+  private void createRoleHelper(Role role) throws IOException {
+    Role roleCreated = accessController.createRole(role);
+    // Confirm role creation
+    Assert.assertEquals(role.getName(), roleCreated.getName());
+    // Add to created roles list
+    rolesCreated.add(0, role.getName());
+  }
+
+  private void createRolesAndPoliciesInRanger(boolean populateDB) {
 
     policiesCreated.clear();
     rolesCreated.clear();
@@ -318,10 +294,10 @@ public class TestRangerBGSyncService {
     // volume name = bucket namespace name
     final String volumeName = tenantId;
 
-    final OzoneTenantRolePrincipal adminRole = new OzoneTenantRolePrincipal(
-        OMMultiTenantManager.getDefaultAdminRoleName(tenantId));
-    final OzoneTenantRolePrincipal userRole = new OzoneTenantRolePrincipal(
-        OMMultiTenantManager.getDefaultUserRoleName(tenantId));
+    final String adminRoleName =
+        OMMultiTenantManager.getDefaultAdminRoleName(tenantId);
+    final String userRoleName =
+        OMMultiTenantManager.getDefaultUserRoleName(tenantId);
     final String bucketNamespacePolicyName =
         OMMultiTenantManager.getDefaultBucketNamespacePolicyName(tenantId);
     final String bucketPolicyName =
@@ -334,7 +310,7 @@ public class TestRangerBGSyncService {
         // Tenant State entry
         omMetadataManager.getTenantStateTable().put(tenantId,
             new OmDBTenantState(
-                tenantId, volumeName, userRole.getName(), adminRole.getName(),
+                tenantId, volumeName, userRoleName, adminRoleName,
                 bucketNamespacePolicyName, bucketPolicyName));
         // Access ID entry for alice
         final String aliceAccessId = OMMultiTenantManager.getDefaultAccessId(
@@ -362,70 +338,77 @@ public class TestRangerBGSyncService {
     }
 
     try {
-      LOG.info("Creating admin role in Ranger: {}", adminRole.getName());
-      auth.createRole(adminRole.getName(), null);
-      String role1 = auth.getRole(adminRole);
-      rolesCreated.add(0, role1);
-    } catch (Exception e) {
+      LOG.info("Creating user in Ranger: {}", USER_ALICE_SHORT);
+      rangerUserRequest.createUser(USER_ALICE_SHORT, "Password12");
+      usersCreated.add(USER_ALICE_SHORT);
+    } catch (IOException e) {
       Assert.fail(e.getMessage());
     }
 
     try {
-      LOG.info("Creating user role in Ranger: {}", userRole.getName());
-      auth.createRole(userRole.getName(), adminRole.getName());
-      String role2 = auth.getRole(userRole);
-      // Prepend user role (order matters when deleting due to dependency)
-      rolesCreated.add(0, role2);
-    } catch (Exception e) {
+      LOG.info("Creating user in Ranger: {}", USER_BOB_SHORT);
+      rangerUserRequest.createUser(USER_BOB_SHORT, "Password12");
+      usersCreated.add(USER_BOB_SHORT);
+    } catch (IOException e) {
       Assert.fail(e.getMessage());
     }
 
     try {
-      LOG.info("Creating user in Ranger: {}", USER_ALICE_SHORT);
-      auth.createUser(USER_ALICE_SHORT, "password1");
-      usersCreated.add(USER_ALICE_SHORT);
-      auth.assignUserToRole(USER_ALICE_SHORT, auth.getRole(userRole), false);
-    } catch (Exception e) {
+      LOG.info("Creating admin role in Ranger: {}", adminRoleName);
+      // Create empty admin role first
+      Role adminRole = new Role.Builder()
+          .setName(adminRoleName)
+          .setDescription(OZONE_TENANT_RANGER_ROLE_DESCRIPTION)
+          .build();
+      createRoleHelper(adminRole);
+    } catch (IOException e) {
       Assert.fail(e.getMessage());
     }
 
     try {
-      LOG.info("Creating user in Ranger: {}", USER_BOB_SHORT);
-      auth.createUser(USER_BOB_SHORT, "password2");
-      usersCreated.add(USER_BOB_SHORT);
-      auth.assignUserToRole(USER_BOB_SHORT, auth.getRole(userRole), false);
-    } catch (Exception e) {
+      LOG.info("Creating user role in Ranger: {}", userRoleName);
+      Role userRole = new Role.Builder()
+          .setName(userRoleName)
+          .setDescription(OZONE_TENANT_RANGER_ROLE_DESCRIPTION)
+          .addRole(adminRoleName, true)
+          // Add alice and bob to the user role
+          .addUsers(Arrays.asList(USER_ALICE_SHORT, USER_BOB_SHORT))
+          .build();
+      createRoleHelper(userRole);
+    } catch (IOException e) {
       Assert.fail(e.getMessage());
     }
 
     try {
-      AccessPolicy tenant1VolumeAccessPolicy = newVolumeAccessPolicy(
-          volumeName, tenantId);
+      Policy tenant1VolumeAccessPolicy =
+          OMMultiTenantManager.getDefaultVolumeAccessPolicy(
+              tenantId, volumeName, userRoleName, adminRoleName);
       LOG.info("Creating VolumeAccess policy in Ranger: {}",
-          tenant1VolumeAccessPolicy.getPolicyName());
-      auth.createAccessPolicy(tenant1VolumeAccessPolicy);
-      policiesCreated.add(tenant1VolumeAccessPolicy.getPolicyName());
-    } catch (Exception e) {
+          tenant1VolumeAccessPolicy.getName());
+      accessController.createPolicy(tenant1VolumeAccessPolicy);
+      policiesCreated.add(tenant1VolumeAccessPolicy.getName());
+    } catch (IOException e) {
       Assert.fail(e.getMessage());
     }
 
     try {
-      AccessPolicy tenant1BucketCreatePolicy = newBucketAccessPolicy(
-          volumeName, tenantId);
+      Policy tenant1BucketCreatePolicy =
+          OMMultiTenantManager.getDefaultBucketAccessPolicy(
+              tenantId, volumeName, userRoleName);
       LOG.info("Creating BucketAccess policy in Ranger: {}",
-          tenant1BucketCreatePolicy.getPolicyName());
-      auth.createAccessPolicy(tenant1BucketCreatePolicy);
-      policiesCreated.add(tenant1BucketCreatePolicy.getPolicyName());
-    } catch (Exception e) {
+          tenant1BucketCreatePolicy.getName());
+      accessController.createPolicy(tenant1BucketCreatePolicy);
+      policiesCreated.add(tenant1BucketCreatePolicy.getName());
+    } catch (IOException e) {
       Assert.fail(e.getMessage());
     }
   }
 
   public void cleanupPolicies() {
-    for (String name : policiesCreated) {
+    for (String policyName : policiesCreated) {
       try {
-        LOG.info("Deleting policy: {}", name);
-        auth.deletePolicyByName(name);
+        LOG.info("Deleting policy: {}", policyName);
+        accessController.deletePolicy(policyName);
       } catch (Exception e) {
         LOG.error(e.getMessage());
       }
@@ -433,13 +416,10 @@ public class TestRangerBGSyncService {
   }
 
   public void cleanupRoles() {
-    for (String roleObj : rolesCreated) {
-      final JsonObject jObj = new JsonParser().parse(roleObj).getAsJsonObject();
-      final String roleId = jObj.get("id").getAsString();
-      final String roleName = jObj.get("name").getAsString();
+    for (String roleName : rolesCreated) {
       try {
         LOG.info("Deleting role: {}", roleName);
-        auth.deleteRoleById(roleId);
+        accessController.deleteRole(roleName);
       } catch (Exception e) {
         LOG.error(e.getMessage());
       }
@@ -450,8 +430,8 @@ public class TestRangerBGSyncService {
     for (String user : usersCreated) {
       try {
         LOG.info("Deleting user: {}", user);
-        String userId = auth.getUserId(user);
-        auth.deleteUser(userId);
+        String userId = rangerUserRequest.getUserId(user);
+        rangerUserRequest.deleteUser(userId);
       } catch (Exception e) {
         LOG.error(e.getMessage());
       }
@@ -491,7 +471,8 @@ public class TestRangerBGSyncService {
     // backed up by OzoneManger Multi-Tenant tables
     createRolesAndPoliciesInRanger(false);
 
-    final long rangerSvcVersionBefore = bgSync.getLatestRangerServiceVersion();
+    final long rangerSvcVersionBefore =
+        bgSync.getRangerOzoneServicePolicyVersion();
     Assert.assertTrue(rangerSvcVersionBefore >= startingRangerVersion);
 
     // Note: DB Service Version will be -1 if the test starts with an empty DB
@@ -503,7 +484,8 @@ public class TestRangerBGSyncService {
         CHECK_SYNC_MILLIS, WAIT_SYNC_TIMEOUT_MILLIS);
     bgSync.shutdown();
     final long dbSvcVersionAfter = bgSync.getOMDBRangerServiceVersion();
-    final long rangerSvcVersionAfter = bgSync.getLatestRangerServiceVersion();
+    final long rangerSvcVersionAfter =
+        bgSync.getRangerOzoneServicePolicyVersion();
     Assert.assertEquals(rangerSvcVersionAfter, dbSvcVersionAfter);
     Assert.assertTrue(dbSvcVersionAfter > dbSvcVersionBefore);
     Assert.assertTrue(rangerSvcVersionAfter > rangerSvcVersionBefore);
@@ -512,17 +494,31 @@ public class TestRangerBGSyncService {
     // by OzoneManager Multi-Tenancy tables are cleaned up by sync thread
 
     for (String policy : policiesCreated) {
-      final AccessPolicy policyRead = auth.getAccessPolicyByName(policy);
-      Assert.assertNull("This policy should have been deleted from Ranger: " +
-          policy, policyRead);
+      try {
+        final Policy policyRead = accessController.getPolicy(policy);
+        Assert.fail("The policy should have been deleted: " + policyRead);
+      } catch (IOException ex) {
+        if (!(ex.getCause() instanceof RangerServiceException)) {
+          Assert.fail("Expected RangerServiceException, got " +
+              ex.getCause().getClass().getSimpleName());
+        }
+        RangerServiceException rse = (RangerServiceException) ex.getCause();
+        Assert.assertEquals(404, rse.getStatus().getStatusCode());
+      }
     }
 
-    for (String roleObj : rolesCreated) {
-      final String roleName = new JsonParser().parse(roleObj)
-          .getAsJsonObject().get("name").getAsString();
-      final String roleObjRead = auth.getRole(roleName);
-      Assert.assertNull("This role should have been deleted from Ranger: " +
-          roleName, roleObjRead);
+    for (String roleName : rolesCreated) {
+      try {
+        final Role role = accessController.getRole(roleName);
+        Assert.fail("This role should have been deleted from Ranger: " + role);
+      } catch (IOException ex) {
+        if (!(ex.getCause() instanceof RangerServiceException)) {
+          Assert.fail("Expected RangerServiceException, got " +
+              ex.getCause().getClass().getSimpleName());
+        }
+        RangerServiceException rse = (RangerServiceException) ex.getCause();
+        Assert.assertEquals(400, rse.getStatus().getStatusCode());
+      }
     }
   }
 
@@ -539,7 +535,7 @@ public class TestRangerBGSyncService {
     // backed up by OzoneManger Multi-Tenant tables
     createRolesAndPoliciesInRanger(true);
 
-    long rangerSvcVersionBefore = bgSync.getLatestRangerServiceVersion();
+    long rangerSvcVersionBefore = bgSync.getRangerOzoneServicePolicyVersion();
     Assert.assertTrue(rangerSvcVersionBefore >= startingRangerVersion);
 
     // Note: DB Service Version will be -1 if the test starts with an empty DB
@@ -551,32 +547,28 @@ public class TestRangerBGSyncService {
         CHECK_SYNC_MILLIS, WAIT_SYNC_TIMEOUT_MILLIS);
     bgSync.shutdown();
     final long dbSvcVersionAfter = bgSync.getOMDBRangerServiceVersion();
-    final long rangerSvcVersionAfter = bgSync.getLatestRangerServiceVersion();
+    final long rangerSvcVersionAfter =
+        bgSync.getRangerOzoneServicePolicyVersion();
     Assert.assertEquals(rangerSvcVersionAfter, dbSvcVersionAfter);
     Assert.assertEquals(rangerSvcVersionAfter, rangerSvcVersionBefore);
     if (dbSvcVersionBefore != -1L) {
       Assert.assertEquals(dbSvcVersionBefore, dbSvcVersionAfter);
     }
 
-    for (String policy : policiesCreated) {
+    for (String policyName : policiesCreated) {
       try {
-        final AccessPolicy policyRead = auth.getAccessPolicyByName(policy);
-
-        Assert.assertEquals(policy, policyRead.getPolicyName());
+        final Policy policyRead = accessController.getPolicy(policyName);
+        Assert.assertEquals(policyName, policyRead.getName());
       } catch (Exception e) {
         e.printStackTrace();
         Assert.fail(e.getMessage());
       }
     }
 
-    for (String roleObj : rolesCreated) {
+    for (String roleName : rolesCreated) {
       try {
-        final String roleName = new JsonParser().parse(roleObj)
-            .getAsJsonObject().get("name").getAsString();
-        String roleObjRead = auth.getRole(roleName);
-        final String roleNameReadBack = new JsonParser().parse(roleObjRead)
-            .getAsJsonObject().get("name").getAsString();
-        Assert.assertEquals(roleName, roleNameReadBack);
+        final Role roleResponse = accessController.getRole(roleName);
+        Assert.assertEquals(roleName, roleResponse.getName());
       } catch (Exception e) {
         e.printStackTrace();
         Assert.fail(e.getMessage());
@@ -595,16 +587,21 @@ public class TestRangerBGSyncService {
 
     createRolesAndPoliciesInRanger(true);
 
-    long rangerVersionAfterCreation = bgSync.getLatestRangerServiceVersion();
+    long rangerVersionAfterCreation =
+        bgSync.getRangerOzoneServicePolicyVersion();
     Assert.assertTrue(rangerVersionAfterCreation >= startingRangerVersion);
 
-    // Delete a user from user role, expect Ranger sync thread to update it
-    String userRoleName = new JsonParser().parse(rolesCreated.get(0))
-        .getAsJsonObject().get("name").getAsString();
+    // Delete user bob from user role, expect Ranger sync thread to update it
+    String userRoleName = rolesCreated.get(0);
     Assert.assertEquals(
         OMMultiTenantManager.getDefaultUserRoleName(TENANT_ID), userRoleName);
 
-    auth.revokeUserFromRole(USER_BOB_SHORT, auth.getRole(userRoleName));
+    Role userRole = accessController.getRole(userRoleName);
+    // Remove user from role
+    Role updatedRole = new Role.Builder(userRole)
+        .removeUser(USER_BOB_SHORT)
+        .build();
+    accessController.updateRole(userRole.getId().get(), updatedRole);
 
     HashSet<String> userSet = new HashSet<>();
     userSet.add(USER_ALICE_SHORT);
@@ -612,7 +609,8 @@ public class TestRangerBGSyncService {
 
     // Note: DB Service Version will be -1 if the test starts with an empty DB
     final long dbSvcVersionBefore = bgSync.getOMDBRangerServiceVersion();
-    final long rangerSvcVersionBefore = bgSync.getLatestRangerServiceVersion();
+    final long rangerSvcVersionBefore =
+        bgSync.getRangerOzoneServicePolicyVersion();
     final long currRunCount = bgSync.getRangerSyncRunCount();
     bgSync.start();
     // Wait for sync to finish once.
@@ -622,37 +620,26 @@ public class TestRangerBGSyncService {
         CHECK_SYNC_MILLIS, WAIT_SYNC_TIMEOUT_MILLIS);
     bgSync.shutdown();
     final long dbSvcVersionAfter = bgSync.getOMDBRangerServiceVersion();
-    final long rangerSvcVersionAfter = bgSync.getLatestRangerServiceVersion();
+    final long rangerSvcVersionAfter =
+        bgSync.getRangerOzoneServicePolicyVersion();
     Assert.assertEquals(rangerSvcVersionAfter, dbSvcVersionAfter);
     Assert.assertTrue(dbSvcVersionAfter > dbSvcVersionBefore);
     Assert.assertTrue(rangerSvcVersionAfter > rangerSvcVersionBefore);
 
-    for (String policy : policiesCreated) {
-      final AccessPolicy verifier = auth.getAccessPolicyByName(policy);
-      Assert.assertNotNull("Policy should exist in Ranger: " + policy,
-          verifier);
-      Assert.assertEquals(policy, verifier.getPolicyName());
+    for (String policyName : policiesCreated) {
+      final Policy policy = accessController.getPolicy(policyName);
+      Assert.assertNotNull("Policy should exist in Ranger: " + policyName,
+          policy);
+      Assert.assertEquals(policyName, policy.getName());
     }
 
-    for (String role : rolesCreated) {
-      final String roleName = new JsonParser().parse(role).getAsJsonObject()
-          .get("name").getAsString();
+    for (String roleName : rolesCreated) {
       if (!roleName.equals(userRoleName)) {
         continue;
       }
-      final String roleObjRead = auth.getRole(roleName);
-      final JsonObject jsonObj = new JsonParser().parse(roleObjRead)
-          .getAsJsonObject();
-      final JsonArray verifier = jsonObj.get("users").getAsJsonArray();
-      Assert.assertEquals(2, verifier.size());
-      // Verify that users are in the role
-      for (int i = 0; i < verifier.size();  ++i) {
-        String user = verifier.get(i).getAsJsonObject()
-            .get("name").getAsString();
-        Assert.assertTrue(userSet.contains(user));
-        userSet.remove(user);
-      }
-      Assert.assertTrue(userSet.isEmpty());
+      final Role roleRead = accessController.getRole(roleName);
+      final Set<String> usersGot = roleRead.getUsersMap().keySet();
+      Assert.assertEquals(userSet, usersGot);
       break;
     }
   }
@@ -669,16 +656,18 @@ public class TestRangerBGSyncService {
     // backed up by OzoneManger Multi-Tenant tables
     createRolesAndPoliciesInRanger(true);
 
-    long rangerVersionAfterCreation = bgSync.getLatestRangerServiceVersion();
+    long rangerVersionAfterCreation =
+        bgSync.getRangerOzoneServicePolicyVersion();
     Assert.assertTrue(rangerVersionAfterCreation >= startingRangerVersion);
 
     // Delete both policies, expect Ranger sync thread to recover both
-    auth.deletePolicyByName(
+    accessController.deletePolicy(
         OMMultiTenantManager.getDefaultBucketNamespacePolicyName(TENANT_ID));
-    auth.deletePolicyByName(
+    accessController.deletePolicy(
         OMMultiTenantManager.getDefaultBucketPolicyName(TENANT_ID));
 
-    final long rangerSvcVersionBefore = bgSync.getLatestRangerServiceVersion();
+    final long rangerSvcVersionBefore =
+        bgSync.getRangerOzoneServicePolicyVersion();
     // Note: DB Service Version will be -1 if the test starts with an empty DB
     final long dbSvcVersionBefore = bgSync.getOMDBRangerServiceVersion();
     bgSync.start();
@@ -688,30 +677,26 @@ public class TestRangerBGSyncService {
         CHECK_SYNC_MILLIS, WAIT_SYNC_TIMEOUT_MILLIS);
     bgSync.shutdown();
     long dbSvcVersionAfter = bgSync.getOMDBRangerServiceVersion();
-    final long rangerSvcVersionAfter = bgSync.getLatestRangerServiceVersion();
+    final long rangerSvcVersionAfter =
+        bgSync.getRangerOzoneServicePolicyVersion();
     Assert.assertEquals(rangerSvcVersionAfter, dbSvcVersionAfter);
     Assert.assertTrue(dbSvcVersionAfter > dbSvcVersionBefore);
     Assert.assertTrue(rangerSvcVersionAfter > rangerSvcVersionBefore);
 
-    for (String policy : policiesCreated) {
+    for (String policyName : policiesCreated) {
       try {
-        final AccessPolicy policyRead = auth.getAccessPolicyByName(policy);
-
-        Assert.assertEquals(policy, policyRead.getPolicyName());
+        final Policy policyRead = accessController.getPolicy(policyName);
+        Assert.assertEquals(policyName, policyRead.getName());
       } catch (Exception e) {
         e.printStackTrace();
         Assert.fail(e.getMessage());
       }
     }
 
-    for (String roleObj : rolesCreated) {
+    for (String roleName : rolesCreated) {
       try {
-        final String roleName = new JsonParser().parse(roleObj)
-            .getAsJsonObject().get("name").getAsString();
-        String roleObjRead = auth.getRole(roleName);
-        final String roleNameReadBack = new JsonParser().parse(roleObjRead)
-            .getAsJsonObject().get("name").getAsString();
-        Assert.assertEquals(roleName, roleNameReadBack);
+        final Role roleRead = accessController.getRole(roleName);
+        Assert.assertEquals(roleName, roleRead.getName());
       } catch (Exception e) {
         e.printStackTrace();
         Assert.fail(e.getMessage());
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/shell/TestOzoneTenantShell.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/shell/TestOzoneTenantShell.java
index 6d1ece7f79..5aa019acf7 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/shell/TestOzoneTenantShell.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/shell/TestOzoneTenantShell.java
@@ -26,13 +26,13 @@ import org.apache.hadoop.hdds.cli.GenericCli;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.io.retry.RetryInvocationHandler;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.MiniOzoneHAClusterImpl;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.ha.ConfUtils;
 import org.apache.hadoop.ozone.om.multitenant.AuthorizerLockImpl;
 import org.apache.hadoop.ozone.om.OMConfigKeys;
-import org.apache.hadoop.ozone.om.OMMultiTenantManagerImpl;
 import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
-import org.apache.hadoop.ozone.om.multitenant.MultiTenantAccessAuthorizerRangerPlugin;
+import org.apache.hadoop.ozone.om.multitenant.OMRangerBGSyncService;
 import org.apache.hadoop.ozone.om.request.s3.tenant.OMTenantAssignUserAccessIdRequest;
 import org.apache.hadoop.ozone.om.request.s3.tenant.OMTenantCreateRequest;
 import org.apache.hadoop.ozone.shell.tenant.TenantShell;
@@ -66,6 +66,7 @@ import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_MULTITENANCY_ENAB
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RANGER_HTTPS_ADMIN_API_PASSWD;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RANGER_HTTPS_ADMIN_API_USER;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_RANGER_HTTPS_ADDRESS_KEY;
+import static org.apache.hadoop.ozone.om.OMMultiTenantManagerImpl.OZONE_OM_TENANT_DEV_SKIP_RANGER;
 import static org.junit.Assert.fail;
 
 /**
@@ -97,6 +98,7 @@ public class TestOzoneTenantShell {
 
   private static OzoneConfiguration conf = null;
   private static MiniOzoneCluster cluster = null;
+  private static MiniOzoneHAClusterImpl haCluster = null;
   private static OzoneShell ozoneSh = null;
   private static TenantShell tenantShell = null;
 
@@ -126,8 +128,7 @@ public class TestOzoneTenantShell {
     }
 
     conf = new OzoneConfiguration();
-    conf.setBoolean(
-        OMMultiTenantManagerImpl.OZONE_OM_TENANT_DEV_SKIP_RANGER, true);
+    conf.setBoolean(OZONE_OM_TENANT_DEV_SKIP_RANGER, true);
     conf.setBoolean(OZONE_OM_MULTITENANCY_ENABLED, true);
 
     if (USE_ACTUAL_RANGER) {
@@ -137,8 +138,7 @@ public class TestOzoneTenantShell {
       conf.set(OZONE_OM_RANGER_HTTPS_ADMIN_API_PASSWD,
           System.getenv("RANGER_PASSWD"));
     } else {
-      conf.setBoolean(OMMultiTenantManagerImpl.OZONE_OM_TENANT_DEV_SKIP_RANGER,
-          true);
+      conf.setBoolean(OZONE_OM_TENANT_DEV_SKIP_RANGER, true);
     }
 
     String path = GenericTestUtils.getTempPath(
@@ -165,6 +165,7 @@ public class TestOzoneTenantShell {
         .setNumOfOzoneManagers(numOfOMs)
         .withoutDatanodes()  // Remove this once we are actually writing data
         .build();
+    haCluster = (MiniOzoneHAClusterImpl) cluster;
     cluster.waitForClusterToBeReady();
   }
 
@@ -197,9 +198,9 @@ public class TestOzoneTenantShell {
     GenericTestUtils.setLogLevel(OMTenantCreateRequest.LOG, Level.DEBUG);
     GenericTestUtils.setLogLevel(
         OMTenantAssignUserAccessIdRequest.LOG, Level.DEBUG);
-    GenericTestUtils.setLogLevel(
-        MultiTenantAccessAuthorizerRangerPlugin.LOG, Level.DEBUG);
     GenericTestUtils.setLogLevel(AuthorizerLockImpl.LOG, Level.DEBUG);
+
+    GenericTestUtils.setLogLevel(OMRangerBGSyncService.LOG, Level.DEBUG);
   }
 
   @After
@@ -650,6 +651,16 @@ public class TestOzoneTenantShell {
         + "to this tenant must be revoked before the tenant can be deleted. "
         + "See `ozone tenant user revoke`\n", true);
 
+    // Trigger BG sync on OMs to recover roles and policies deleted
+    // by previous tenant delete attempt.
+    // Note: Potential source of flakiness if triggered only on leader OM
+    // in this case.
+    // Because InMemoryMultiTenantAccessController is used in OMs for this
+    // integration test, we need to trigger BG sync on all OMs just
+    // in case a leader changed right after the last operation.
+    haCluster.getOzoneManagersList().forEach(om -> om.getMultiTenantManager()
+        .getOMRangerBGSyncService().triggerRangerSyncOnce());
+
     // Delete dev volume should fail because the volume reference count > 0L
     exitCode = execute(ozoneSh, new String[] {"volume", "delete", "dev"});
     Assert.assertTrue("Volume delete should fail!", exitCode != 0);
diff --git a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
index 4b6a8dd342..d2dbffb6d7 100644
--- a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
+++ b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
@@ -121,6 +121,7 @@ enum Type {
   SetS3Secret = 106;
 
   SetRangerServiceVersion = 107;
+  RangerBGSync = 109;
 }
 
 message OMRequest {
@@ -224,6 +225,7 @@ message OMRequest {
   optional SetS3SecretRequest               SetS3SecretRequest             = 106;
 
   optional SetRangerServiceVersionRequest   SetRangerServiceVersionRequest = 107;
+  optional RangerBGSyncRequest              RangerBGSyncRequest            = 109;
 }
 
 message OMResponse {
@@ -320,6 +322,7 @@ message OMResponse {
   optional SetS3SecretResponse               SetS3SecretResponse           = 106;
 
   optional SetRangerServiceVersionResponse   SetRangerServiceVersionResponse = 107;
+  optional RangerBGSyncResponse              RangerBGSyncResponse          = 109;
 }
 
 enum Status {
@@ -1254,6 +1257,14 @@ message DBUpdatesResponse {
     optional uint64 latestSequenceNumber = 3;
 }
 
+message RangerBGSyncRequest {
+    optional bool noWait = 1;
+}
+
+message RangerBGSyncResponse {
+    optional bool runSuccess = 1;
+}
+
 message FinalizeUpgradeRequest {
   required string upgradeClientId = 1;
 }
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMultiTenantManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMultiTenantManager.java
index 48640c31f7..de397d031f 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMultiTenantManager.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMultiTenantManager.java
@@ -16,9 +16,11 @@
  */
 package org.apache.hadoop.ozone.om;
 
+import java.io.File;
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
 
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.hdds.annotation.InterfaceAudience;
@@ -27,22 +29,32 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.om.helpers.TenantUserList;
-import org.apache.hadoop.ozone.om.multitenant.AccessPolicy;
 import org.apache.hadoop.ozone.om.multitenant.AuthorizerLock;
+import org.apache.hadoop.ozone.om.multitenant.MultiTenantAccessController.Acl;
+import org.apache.hadoop.ozone.om.multitenant.MultiTenantAccessController.Policy;
 import org.apache.hadoop.ozone.om.multitenant.OMRangerBGSyncService;
-import org.apache.hadoop.ozone.om.multitenant.OzoneTenantRolePrincipal;
+import org.apache.hadoop.ozone.om.multitenant.OzoneOwnerPrincipal;
 import org.apache.hadoop.ozone.om.multitenant.Tenant;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
 import org.slf4j.Logger;
 
+import static org.apache.hadoop.ozone.OzoneConsts.OZONE_TENANT_RANGER_POLICY_LABEL;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_KERBEROS_KEYTAB_FILE_KEY;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_KERBEROS_PRINCIPAL_KEY;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_MULTITENANCY_ENABLED;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_MULTITENANCY_ENABLED_DEFAULT;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RANGER_HTTPS_ADMIN_API_PASSWD;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RANGER_HTTPS_ADMIN_API_USER;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_RANGER_HTTPS_ADDRESS_KEY;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_RANGER_SERVICE;
 import static org.apache.hadoop.ozone.om.OMMultiTenantManagerImpl.OZONE_OM_TENANT_DEV_SKIP_RANGER;
+import static org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLType.ALL;
+import static org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLType.CREATE;
+import static org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLType.LIST;
+import static org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLType.READ;
+import static org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLType.READ_ACL;
 
 /**
  * OM MultiTenant manager interface.
@@ -77,7 +89,6 @@ public interface OMMultiTenantManager {
   /**
    * Returns the instance of OMRangerBGSyncService.
    */
-  @VisibleForTesting
   OMRangerBGSyncService getOMRangerBGSyncService();
 
   /**
@@ -282,18 +293,43 @@ public interface OMMultiTenantManager {
           OZONE_RANGER_HTTPS_ADDRESS_KEY);
     }
 
-    // TODO: Do not check Ranger user/passwd if Ranger Java client is enabled
-    final String rangerUser = conf.get(OZONE_OM_RANGER_HTTPS_ADMIN_API_USER);
-    if (StringUtils.isBlank(rangerUser)) {
+    final String rangerService = conf.get(OZONE_RANGER_SERVICE);
+    if (StringUtils.isBlank(rangerService)) {
       isS3MultiTenancyEnabled = false;
       logger.error("{} is required to enable S3 Multi-Tenancy but not set",
-          OZONE_OM_RANGER_HTTPS_ADMIN_API_USER);
+          OZONE_RANGER_SERVICE);
     }
-    final String rangerPw = conf.get(OZONE_OM_RANGER_HTTPS_ADMIN_API_PASSWD);
-    if (StringUtils.isBlank(rangerPw)) {
-      isS3MultiTenancyEnabled = false;
-      logger.error("{} is required to enable S3 Multi-Tenancy but not set",
-          OZONE_OM_RANGER_HTTPS_ADMIN_API_PASSWD);
+
+    String fallbackUsername = conf.get(OZONE_OM_RANGER_HTTPS_ADMIN_API_USER);
+    String fallbackPassword = conf.get(OZONE_OM_RANGER_HTTPS_ADMIN_API_PASSWD);
+
+    if (fallbackUsername != null && fallbackPassword != null) {
+      logger.warn("Detected clear text username and password override configs. "
+          + "These will be used to authenticate to Ranger Admin Server instead "
+          + "of using the recommended Kerberos principal and keytab "
+          + "authentication method. "
+          + "This is NOT recommended on a production cluster.");
+    } else {
+      // Check Kerberos principal and keytab file path configs if not both
+      // clear text username and password overrides are set.
+      final String omKerbPrinc = conf.get(OZONE_OM_KERBEROS_PRINCIPAL_KEY);
+      // Note: ozone.om.kerberos.keytab.file and ozone.om.kerberos.principal
+      //  are not empty by default. The default values may or may not be valid.
+      if (StringUtils.isBlank(omKerbPrinc)) {
+        isS3MultiTenancyEnabled = false;
+        logger.error("{} is required to enable S3 Multi-Tenancy but not set",
+            OZONE_OM_KERBEROS_PRINCIPAL_KEY);
+      }
+      final String rangerPw = conf.get(OZONE_OM_KERBEROS_KEYTAB_FILE_KEY);
+      if (StringUtils.isBlank(rangerPw)) {
+        isS3MultiTenancyEnabled = false;
+        logger.error("{} is required to enable S3 Multi-Tenancy but not set",
+            OZONE_OM_KERBEROS_KEYTAB_FILE_KEY);
+      }
+      if (!(new File(rangerPw).isFile())) {
+        logger.error("{} = '{}' file path doesn't exist or is not a file",
+            OZONE_OM_KERBEROS_KEYTAB_FILE_KEY, rangerPw);
+      }
     }
 
     if (!isS3MultiTenancyEnabled) {
@@ -304,18 +340,58 @@ public interface OMMultiTenantManager {
     return true;
   }
 
+  String OZONE_TENANT_RANGER_POLICY_DESCRIPTION =
+      "Created by Ozone. WARNING: "
+          + "Changes will be lost when this tenant is deleted.";
+
+  String OZONE_TENANT_RANGER_ROLE_DESCRIPTION =
+      "Managed by Ozone. WARNING: "
+          + "Changes will be overridden. "
+          + "Use Ozone tenant CLI to manage users in this tenant role instead.";
+
   /**
    * Returns default VolumeAccess policy given tenant and role names.
    */
-  AccessPolicy newDefaultVolumeAccessPolicy(String tenantId,
-      OzoneTenantRolePrincipal userRole, OzoneTenantRolePrincipal adminRole)
-      throws IOException;
+  static Policy getDefaultVolumeAccessPolicy(
+      String tenantId, String volumeName,
+      String userRoleName, String adminRoleName)
+      throws IOException {
+
+    final String volumePolicyName = OMMultiTenantManager
+        .getDefaultBucketNamespacePolicyName(tenantId);
+
+    return new Policy.Builder()
+        .setName(volumePolicyName)
+        .addVolume(volumeName)
+        .setDescription(OZONE_TENANT_RANGER_POLICY_DESCRIPTION)
+        .addLabel(OZONE_TENANT_RANGER_POLICY_LABEL)
+        .addRoleAcl(userRoleName, Arrays.asList(
+            Acl.allow(READ), Acl.allow(LIST), Acl.allow(READ_ACL)))
+        .addRoleAcl(adminRoleName, Collections.singletonList(Acl.allow(ALL)))
+        .build();
+  }
 
   /**
    * Returns default BucketAccess policy given tenant and user role name.
    */
-  AccessPolicy newDefaultBucketAccessPolicy(String tenantId,
-      OzoneTenantRolePrincipal userRole) throws IOException;
+  static Policy getDefaultBucketAccessPolicy(
+      String tenantId, String volumeName,
+      String userRoleName) throws IOException {
+    final String bucketPolicyName = OMMultiTenantManager
+        .getDefaultBucketPolicyName(tenantId);
+
+    return new Policy.Builder()
+        .setName(bucketPolicyName)
+        .addVolume(volumeName)
+        .addBucket("*")
+        .setDescription(OZONE_TENANT_RANGER_POLICY_DESCRIPTION)
+        .addLabel(OZONE_TENANT_RANGER_POLICY_LABEL)
+        .addRoleAcl(userRoleName,
+            Collections.singletonList(Acl.allow(CREATE)))
+        .addUserAcl(new OzoneOwnerPrincipal().getName(),
+            Collections.singletonList(Acl.allow(ALL)))
+        .build();
+  }
 
   AuthorizerLock getAuthorizerLock();
 }
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMultiTenantManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMultiTenantManagerImpl.java
index faa998e4c3..e8f35f6786 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMultiTenantManagerImpl.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMultiTenantManagerImpl.java
@@ -27,13 +27,7 @@ import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.TENA
 import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.TENANT_NOT_FOUND;
 import static org.apache.hadoop.ozone.om.multitenant.AccessPolicy.AccessGrantType.ALLOW;
 import static org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLType.ALL;
-import static org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLType.CREATE;
-import static org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLType.LIST;
-import static org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLType.READ;
-import static org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLType.READ_ACL;
-import static org.apache.hadoop.ozone.security.acl.OzoneObj.ResourceType.BUCKET;
 import static org.apache.hadoop.ozone.security.acl.OzoneObj.ResourceType.KEY;
-import static org.apache.hadoop.ozone.security.acl.OzoneObj.ResourceType.VOLUME;
 import static org.apache.hadoop.ozone.security.acl.OzoneObj.StoreType.OZONE;
 
 import java.io.IOException;
@@ -69,14 +63,15 @@ import org.apache.hadoop.ozone.om.multitenant.AuthorizerLockImpl;
 import org.apache.hadoop.ozone.om.multitenant.BucketNameSpace;
 import org.apache.hadoop.ozone.om.multitenant.CachedTenantState;
 import org.apache.hadoop.ozone.om.multitenant.CachedTenantState.CachedAccessIdInfo;
-import org.apache.hadoop.ozone.om.multitenant.MultiTenantAccessAuthorizer;
-import org.apache.hadoop.ozone.om.multitenant.MultiTenantAccessAuthorizerDummyPlugin;
-import org.apache.hadoop.ozone.om.multitenant.MultiTenantAccessAuthorizerRangerPlugin;
+import org.apache.hadoop.ozone.om.multitenant.InMemoryMultiTenantAccessController;
+import org.apache.hadoop.ozone.om.multitenant.MultiTenantAccessController;
+import org.apache.hadoop.ozone.om.multitenant.MultiTenantAccessController.Policy;
+import org.apache.hadoop.ozone.om.multitenant.MultiTenantAccessController.Role;
 import org.apache.hadoop.ozone.om.multitenant.OMRangerBGSyncService;
 import org.apache.hadoop.ozone.om.multitenant.OzoneOwnerPrincipal;
 import org.apache.hadoop.ozone.om.multitenant.OzoneTenant;
-import org.apache.hadoop.ozone.om.multitenant.OzoneTenantRolePrincipal;
 import org.apache.hadoop.ozone.om.multitenant.RangerAccessPolicy;
+import org.apache.hadoop.ozone.om.multitenant.RangerClientMultiTenantAccessController;
 import org.apache.hadoop.ozone.om.multitenant.Tenant;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.UserAccessIdInfo;
 import org.apache.hadoop.ozone.security.acl.OzoneObj;
@@ -107,7 +102,7 @@ public class OMMultiTenantManagerImpl implements OMMultiTenantManager {
   private final Map<String, CachedTenantState> tenantCache;
   private final ReentrantReadWriteLock tenantCacheLock;
   private final OMRangerBGSyncService omRangerBGSyncService;
-  private MultiTenantAccessAuthorizer authorizer;
+  private final MultiTenantAccessController accessController;
   private final AuthorizerLock authorizerLock;
   /**
    * Authorizer operations. Meant to be called in tenant preExecute.
@@ -130,27 +125,18 @@ public class OMMultiTenantManagerImpl implements OMMultiTenantManager {
 
     loadTenantCacheFromDB();
 
-    boolean devSkipRanger =
-        conf.getBoolean(OZONE_OM_TENANT_DEV_SKIP_RANGER, false);
+    boolean devSkipRanger = conf.getBoolean(
+        OZONE_OM_TENANT_DEV_SKIP_RANGER, false);
+
     if (devSkipRanger) {
-      this.authorizer = new MultiTenantAccessAuthorizerDummyPlugin();
+      this.accessController = new InMemoryMultiTenantAccessController();
     } else {
-      this.authorizer = new MultiTenantAccessAuthorizerRangerPlugin();
-    }
-    try {
-      this.authorizer.init(conf);
-    } catch (OMException ex) {
-      if (ex.getResult().equals(INTERNAL_ERROR)) {
-        LOG.error("Failed to initialize {}, falling back to dummy authorizer",
-            authorizer.getClass().getSimpleName());
-        this.authorizer = new MultiTenantAccessAuthorizerDummyPlugin();
-      } else {
-        throw ex;
-      }
+      this.accessController = new RangerClientMultiTenantAccessController(conf);
     }
 
     cacheOp = new CacheOp(tenantCache, tenantCacheLock);
-    authorizerOp = new AuthorizerOp(authorizer, tenantCache, tenantCacheLock);
+    authorizerOp = new AuthorizerOp(accessController,
+        tenantCache, tenantCacheLock);
 
     // Define the internal time unit for the config
     final TimeUnit internalTimeUnit = TimeUnit.SECONDS;
@@ -168,7 +154,8 @@ public class OMMultiTenantManagerImpl implements OMMultiTenantManager {
         internalTimeUnit);
     // Initialize the Ranger Sync Thread
     omRangerBGSyncService = new OMRangerBGSyncService(ozoneManager, this,
-        authorizer, rangerSyncInterval, internalTimeUnit, rangerSyncTimeout);
+        accessController, rangerSyncInterval, internalTimeUnit,
+        rangerSyncTimeout);
 
     // Start the Ranger Sync Thread
     this.start();
@@ -214,14 +201,14 @@ public class OMMultiTenantManagerImpl implements OMMultiTenantManager {
    */
   public class AuthorizerOp implements TenantOp {
 
-    private final MultiTenantAccessAuthorizer authorizer;
+    private final MultiTenantAccessController accessController;
     private final Map<String, CachedTenantState> tenantCache;
     private final ReentrantReadWriteLock tenantCacheLock;
 
-    AuthorizerOp(MultiTenantAccessAuthorizer authorizer,
+    AuthorizerOp(MultiTenantAccessController accessController,
         Map<String, CachedTenantState> tenantCache,
         ReentrantReadWriteLock tenantCacheLock) {
-      this.authorizer = authorizer;
+      this.accessController = accessController;
       this.tenantCache = tenantCache;
       this.tenantCacheLock = tenantCacheLock;
     }
@@ -282,40 +269,56 @@ public class OMMultiTenantManagerImpl implements OMMultiTenantManager {
       Tenant tenant = new OzoneTenant(tenantId);
 
       try {
-        // Create admin role first
-        String adminRoleId = authorizer.createRole(adminRoleName, null);
-        tenant.addTenantAccessRole(adminRoleId);
-
-        // Then create user role, and add admin role as its delegated admin
-        String userRoleId = authorizer.createRole(userRoleName, adminRoleName);
-        tenant.addTenantAccessRole(userRoleId);
+        // Create empty admin role first
+        Role adminRole = new Role.Builder()
+            .setName(adminRoleName)
+            .setDescription(OZONE_TENANT_RANGER_ROLE_DESCRIPTION)
+            .build();
+        adminRole = accessController.createRole(adminRole);
+        // Sanity check. Response role name should be equal to the one we sent.
+        Preconditions.checkState(adminRole.getName().equals(adminRoleName));
+        tenant.addTenantAccessRole(adminRoleName);
+
+        // Then create user role with the admin role as its delegated admin
+        Role userRole = new Role.Builder()
+            .setName(userRoleName)
+            .addRole(adminRoleName, true)
+            .setDescription(OZONE_TENANT_RANGER_ROLE_DESCRIPTION)
+            .build();
+        userRole = accessController.createRole(userRole);
+        // Sanity check. Response role name should be equal to the one we sent.
+        Preconditions.checkState(userRole.getName().equals(userRoleName));
+        tenant.addTenantAccessRole(userRoleName);
 
         BucketNameSpace bucketNameSpace = tenant.getTenantBucketNameSpace();
-        // Bucket namespace is volume
+        // Bucket namespace is volume.
+        // Note at the moment we only support one volume for each tenant.
         for (OzoneObj volume : bucketNameSpace.getBucketNameSpaceObjects()) {
           String volumeName = volume.getVolumeName();
 
-          final OzoneTenantRolePrincipal userRole =
-              new OzoneTenantRolePrincipal(userRoleName);
-          final OzoneTenantRolePrincipal adminRole =
-              new OzoneTenantRolePrincipal(adminRoleName);
-
           // Allow Volume List access
-          AccessPolicy tenantVolumeAccessPolicy = newDefaultVolumeAccessPolicy(
-              volumeName, userRole, adminRole);
-          tenantVolumeAccessPolicy.setPolicyID(
-              authorizer.createAccessPolicy(tenantVolumeAccessPolicy));
-          tenant.addTenantAccessPolicy(tenantVolumeAccessPolicy);
+          Policy volumePolicy =
+              OMMultiTenantManager.getDefaultVolumeAccessPolicy(
+                  tenantId, volumeName, userRoleName, adminRoleName);
+          volumePolicy = accessController.createPolicy(volumePolicy);
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Created volume policy: {}", volumePolicy);
+          }
+          // TODO: Review if this tenant object is useful.
+          tenant.addTenantAccessPolicy(volumePolicy.getName());
 
           // Allow Bucket Create within Volume
-          AccessPolicy tenantBucketCreatePolicy =
-              newDefaultBucketAccessPolicy(volumeName, userRole);
-          tenantBucketCreatePolicy.setPolicyID(
-              authorizer.createAccessPolicy(tenantBucketCreatePolicy));
-          tenant.addTenantAccessPolicy(tenantBucketCreatePolicy);
+          Policy bucketPolicy =
+              OMMultiTenantManager.getDefaultBucketAccessPolicy(
+                  tenantId, volumeName, userRoleName);
+          bucketPolicy = accessController.createPolicy(bucketPolicy);
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Created bucket policy: {}", bucketPolicy);
+          }
+          // TODO: Review if this tenant object is useful.
+          tenant.addTenantAccessPolicy(bucketPolicy.getName());
         }
 
-        // Does NOT update tenant cache here
       } catch (IOException e) {
         // Expect the sync thread to restore the admin role later if op succeeds
         throw new OMException(e, TENANT_AUTHORIZER_ERROR);
@@ -330,12 +333,12 @@ public class OMMultiTenantManagerImpl implements OMMultiTenantManager {
       LOG.info("Deleting tenant policies and roles from Ranger: {}", tenant);
 
       try {
-        for (AccessPolicy policy : tenant.getTenantAccessPolicies()) {
-          authorizer.deletePolicyByName(policy.getPolicyName());
+        for (String policyName : tenant.getTenantAccessPolicies()) {
+          accessController.deletePolicy(policyName);
         }
 
         for (String roleName : tenant.getTenantRoles()) {
-          authorizer.deleteRoleByName(roleName);
+          accessController.deleteRole(roleName);
         }
       } catch (IOException e) {
         // Expect the sync thread to restore the admin role later if op succeeds
@@ -343,6 +346,17 @@ public class OMMultiTenantManagerImpl implements OMMultiTenantManager {
       }
     }
 
+    /**
+     * Helper method to check roleId presence in a Role.
+     */
+    private void checkRoleIdExistence(Role role) throws IOException {
+      if (!role.getId().isPresent()) {
+        final String errMsg = String.format("Received no role ID in: %s", role);
+        LOG.error(errMsg);
+        throw new IOException(errMsg);
+      }
+    }
+
     /**
      *  Algorithm
      *  Authorizer-plugin(Ranger) State :
@@ -374,21 +388,34 @@ public class OMMultiTenantManagerImpl implements OMMultiTenantManager {
         Preconditions.checkNotNull(cachedTenantState,
             "Cache entry for tenant '" + tenantId + "' does not exist");
 
-        // Does NOT update tenant cache here
-
         final String tenantUserRoleName =
             tenantCache.get(tenantId).getTenantUserRoleName();
-        final OzoneTenantRolePrincipal tenantUserRolePrincipal =
-            new OzoneTenantRolePrincipal(tenantUserRoleName);
-        String roleJsonStr = authorizer.getRole(tenantUserRolePrincipal);
-        final String roleId =
-            authorizer.assignUserToRole(userPrincipal, roleJsonStr, false);
+        // Get tenant user role from Ranger
+        Role userRole = accessController.getRole(tenantUserRoleName);
+        checkRoleIdExistence(userRole);
+        final long roleId = userRole.getId().get();
+
+        // Sanity check user existence in tenant, but won't throw
+        if (userRole.getUsersMap().containsKey(userPrincipal)) {
+          LOG.warn("User '{}' is already assigned to tenant '{}'",
+              userPrincipal, tenantId);
+        }
+
+        // Add user (not accessId) to the role
+        userRole = new Role.Builder(userRole)
+            .addUser(userPrincipal, false)
+            .build();
+        // Push updated role to Ranger
+        userRole = accessController.updateRole(roleId, userRole);
 
         if (LOG.isDebugEnabled()) {
-          LOG.debug("roleId that the user is assigned to: {}", roleId);
+          LOG.debug("Updated user role: {}", userRole);
         }
 
       } catch (IOException e) {
+        // If the user name doesn't exist in Ranger, it throws 400 Bad Request
+        // with message: user with name: USERNAME does not exist
+
         // Expect the sync thread to restore the user role later if op succeeds
         throw new OMException(e, TENANT_AUTHORIZER_ERROR);
       } finally {
@@ -414,15 +441,29 @@ public class OMMultiTenantManagerImpl implements OMMultiTenantManager {
 
         final String userPrincipal = omDBAccessIdInfo.getUserPrincipal();
 
-        // Delete user from role in Ranger
         final String tenantUserRoleName =
             tenantCache.get(tenantId).getTenantUserRoleName();
-        final OzoneTenantRolePrincipal tenantUserRolePrincipal =
-            new OzoneTenantRolePrincipal(tenantUserRoleName);
-        String roleJsonStr = authorizer.getRole(tenantUserRolePrincipal);
-        final String roleId =
-            authorizer.revokeUserFromRole(userPrincipal, roleJsonStr);
-        Preconditions.checkNotNull(roleId);
+        // Get tenant user role from Ranger
+        Role userRole = accessController.getRole(tenantUserRoleName);
+        checkRoleIdExistence(userRole);
+        final long roleId = userRole.getId().get();
+
+        // Sanity check user existence in tenant, but won't throw
+        if (!userRole.getUsersMap().containsKey(userPrincipal)) {
+          LOG.warn("User '{}' is not assigned to tenant '{}'",
+              userPrincipal, tenantId);
+        }
+
+        // Remove user from role
+        userRole = new Role.Builder(userRole)
+            .removeUser(userPrincipal)
+            .build();
+        // Push updated role to ranger
+        userRole = accessController.updateRole(roleId, userRole);
+
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Updated user role: {}", userRole);
+        }
 
         // Does NOT update tenant cache here
       } catch (IOException e) {
@@ -443,22 +484,34 @@ public class OMMultiTenantManagerImpl implements OMMultiTenantManager {
       try {
         // tenant name is needed to retrieve role name
         final String tenantId = getTenantForAccessIDThrowIfNotFound(accessId);
-
         final CachedTenantState cachedTenantState = tenantCache.get(tenantId);
-
         final String tenantAdminRoleName =
             cachedTenantState.getTenantAdminRoleName();
-        final OzoneTenantRolePrincipal existingAdminRole =
-            new OzoneTenantRolePrincipal(tenantAdminRoleName);
-
-        final String roleJsonStr = authorizer.getRole(existingAdminRole);
         final String userPrincipal = getUserNameGivenAccessId(accessId);
-        // Update Ranger. Add user principal (not accessId!) to the role
-        final String roleId = authorizer.assignUserToRole(
-            userPrincipal, roleJsonStr, delegated);
-        assert (roleId != null);
 
-        // Does NOT update tenant cache here
+        // Get tenant admin role from Ranger
+        Role adminRole = accessController.getRole(tenantAdminRoleName);
+        checkRoleIdExistence(adminRole);
+
+        // Sanity check user existence in tenant, but won't throw
+        // TODO: Or throw if user is already in admin role?
+        if (adminRole.getUsersMap().containsKey(userPrincipal)) {
+          LOG.warn("User '{}' is already admin in tenant '{}'",
+              userPrincipal, tenantId);
+        }
+
+        final long roleId = adminRole.getId().get();
+        // Add user principal (not accessId!) to admin role
+        adminRole = new Role.Builder(adminRole)
+            .addUser(userPrincipal, delegated)
+            .build();
+        // Push updated role to Ranger
+        adminRole = accessController.updateRole(roleId, adminRole);
+
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Updated admin role: {}", adminRole);
+        }
+
       } catch (IOException e) {
         // Expect the sync thread to restore the admin role later if op succeeds
         throw new OMException(e, TENANT_AUTHORIZER_ERROR);
@@ -477,21 +530,34 @@ public class OMMultiTenantManagerImpl implements OMMultiTenantManager {
       try {
         // tenant name is needed to retrieve role name
         final String tenantId = getTenantForAccessIDThrowIfNotFound(accessId);
-
         final CachedTenantState cachedTenantState = tenantCache.get(tenantId);
         final String tenantAdminRoleName =
             cachedTenantState.getTenantAdminRoleName();
-        final OzoneTenantRolePrincipal existingAdminRole =
-            new OzoneTenantRolePrincipal(tenantAdminRoleName);
-
-        final String roleJsonStr = authorizer.getRole(existingAdminRole);
         final String userPrincipal = getUserNameGivenAccessId(accessId);
-        // Update Ranger. Add user principal (not accessId!) to the role
-        final String roleId = authorizer.revokeUserFromRole(
-            userPrincipal, roleJsonStr);
-        assert (roleId != null);
 
-        // Does NOT update tenant cache here
+        // Get tenant admin role from Ranger
+        Role adminRole = accessController.getRole(tenantAdminRoleName);
+        checkRoleIdExistence(adminRole);
+        final long roleId = adminRole.getId().get();
+
+        // Sanity check user existence in tenant, but won't throw
+        // TODO: Or throw if user is not in admin role?
+        if (!adminRole.getUsersMap().containsKey(userPrincipal)) {
+          LOG.warn("User '{}' is not admin in tenant '{}'",
+              userPrincipal, tenantId);
+        }
+
+        // Add user principal (not accessId!) to admin role
+        adminRole = new Role.Builder(adminRole)
+            .removeUser(userPrincipal)
+            .build();
+        // Push updated role to Ranger
+        adminRole = accessController.updateRole(roleId, adminRole);
+
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Updated admin role: {}", adminRole);
+        }
+
       } catch (IOException e) {
         // Expect the sync thread to restore the admin role later if op succeeds
         throw new OMException(e, TENANT_AUTHORIZER_ERROR);
@@ -538,7 +604,7 @@ public class OMMultiTenantManagerImpl implements OMMultiTenantManager {
     @Override
     public void deleteTenant(Tenant tenant) throws IOException {
 
-      final String tenantId = tenant.getTenantId();
+      final String tenantId = tenant.getTenantName();
 
       tenantCacheLock.writeLock().lock();
       try {
@@ -787,41 +853,6 @@ public class OMMultiTenantManagerImpl implements OMMultiTenantManager {
     return optionalTenant.get();
   }
 
-  public AccessPolicy newDefaultVolumeAccessPolicy(String tenantId,
-      OzoneTenantRolePrincipal userRole, OzoneTenantRolePrincipal adminRole)
-      throws IOException {
-
-    final String volumeAccessPolicyName =
-        OMMultiTenantManager.getDefaultBucketNamespacePolicyName(tenantId);
-    AccessPolicy policy = new RangerAccessPolicy(volumeAccessPolicyName);
-    OzoneObjInfo obj = OzoneObjInfo.Builder.newBuilder()
-        .setResType(VOLUME).setStoreType(OZONE).setVolumeName(tenantId)
-        .setBucketName("").setKeyName("").build();
-    // Tenant users have READ, LIST and READ_ACL access on the volume
-    policy.addAccessPolicyElem(obj, userRole, READ, ALLOW);
-    policy.addAccessPolicyElem(obj, userRole, LIST, ALLOW);
-    policy.addAccessPolicyElem(obj, userRole, READ_ACL, ALLOW);
-    // Tenant admins have ALL access on the volume
-    policy.addAccessPolicyElem(obj, adminRole, ALL, ALLOW);
-    return policy;
-  }
-
-  public AccessPolicy newDefaultBucketAccessPolicy(String tenantId,
-      OzoneTenantRolePrincipal userRole) throws IOException {
-
-    final String bucketAccessPolicyName =
-        OMMultiTenantManager.getDefaultBucketPolicyName(tenantId);
-    AccessPolicy policy = new RangerAccessPolicy(bucketAccessPolicyName);
-    OzoneObjInfo obj = OzoneObjInfo.Builder.newBuilder()
-        .setResType(BUCKET).setStoreType(OZONE).setVolumeName(tenantId)
-        .setBucketName("*").setKeyName("").build();
-    // Tenant users have permission to CREATE buckets
-    policy.addAccessPolicyElem(obj, userRole, CREATE, ALLOW);
-    // Bucket owner have ALL access on their own buckets
-    policy.addAccessPolicyElem(obj, new OzoneOwnerPrincipal(), ALL, ALLOW);
-    return policy;
-  }
-
   // TODO: This policy doesn't seem necessary as the bucket-level policy has
   //  already granted the key-level access.
   //  Not sure if that is the intended behavior in Ranger though.
@@ -830,8 +861,8 @@ public class OMMultiTenantManagerImpl implements OMMultiTenantManager {
       String bucketName) throws IOException {
     AccessPolicy policy = new RangerAccessPolicy(
         // principal already contains volume name
-        volumeName + " - KeyAccess");
-    // TODO: Double check the policy
+        volumeName + "-KeyAccess");
+
     OzoneObjInfo obj = OzoneObjInfo.Builder.newBuilder()
         .setResType(KEY).setStoreType(OZONE).setVolumeName(volumeName)
         .setBucketName("*").setKeyName("*").build();
@@ -1013,10 +1044,8 @@ public class OMMultiTenantManagerImpl implements OMMultiTenantManager {
 
     final Tenant tenantObj = new OzoneTenant(tenantState.getTenantId());
 
-    tenantObj.addTenantAccessPolicy(
-        new RangerAccessPolicy(tenantState.getBucketNamespacePolicyName()));
-    tenantObj.addTenantAccessPolicy(
-        new RangerAccessPolicy(tenantState.getBucketNamespaceName()));
+    tenantObj.addTenantAccessPolicy(tenantState.getBucketNamespacePolicyName());
+    tenantObj.addTenantAccessPolicy(tenantState.getBucketPolicyName());
 
     tenantObj.addTenantAccessRole(tenantState.getUserRoleName());
     tenantObj.addTenantAccessRole(tenantState.getAdminRoleName());
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
index 60ba6bb852..6292e15010 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
@@ -73,6 +73,7 @@ import org.apache.hadoop.hdds.utils.db.Table;
 import org.apache.hadoop.hdds.utils.db.Table.KeyValue;
 import org.apache.hadoop.hdds.utils.db.TableIterator;
 import org.apache.hadoop.ozone.OzoneManagerVersion;
+import org.apache.hadoop.ozone.om.multitenant.OMRangerBGSyncService;
 import org.apache.hadoop.ozone.util.OzoneNetUtils;
 import org.apache.hadoop.ozone.om.helpers.BucketLayout;
 import org.apache.hadoop.hdds.scm.ha.SCMNodeInfo;
@@ -264,6 +265,7 @@ import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SERVER_DEFAULT_REPLI
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SERVER_DEFAULT_REPLICATION_TYPE_DEFAULT;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SERVER_DEFAULT_REPLICATION_TYPE_KEY;
 import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.DETECTED_LOOP_IN_BUCKET_LINKS;
+import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.FEATURE_NOT_ENABLED;
 import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.INVALID_AUTH_METHOD;
 import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.INVALID_REQUEST;
 import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.PERMISSION_DENIED;
@@ -813,7 +815,7 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
 
     throw new OMException("S3 multi-tenancy feature is not enabled. Please "
         + "set ozone.om.multitenancy.enabled to true and restart all OMs.",
-        ResultCodes.FEATURE_NOT_ENABLED);
+        FEATURE_NOT_ENABLED);
   }
 
   /**
@@ -3081,6 +3083,53 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
     return new ServiceInfoEx(getServiceList(), caCertPem, caCertPemList);
   }
 
+  @Override
+  public boolean triggerRangerBGSync(boolean noWait) throws IOException {
+
+    // OM should be leader and ready.
+    // This check should always pass if called from a client since there
+    // is a leader check somewhere before entering this method.
+    if (!isLeaderReady()) {
+      // And even if we could allow followers to trigger sync, checkLeader()
+      // calls inside the sync would quit the sync anyway.
+      throw new OMException("OM is not leader or not ready", INVALID_REQUEST);
+    }
+
+    final UserGroupInformation ugi = getRemoteUser();
+    // Check Ozone admin privilege
+    if (!isAdmin(ugi)) {
+      throw new OMException("Only Ozone admins are allowed to trigger "
+          + "Ranger background sync manually", PERMISSION_DENIED);
+    }
+
+    // Check if MT manager is inited
+    final OMMultiTenantManager mtManager = getMultiTenantManager();
+    if (mtManager == null) {
+      throw new OMException("S3 Multi-Tenancy is not enabled",
+          FEATURE_NOT_ENABLED);
+    }
+
+    // Check if Ranger BG sync task is inited
+    final OMRangerBGSyncService bgSync = mtManager.getOMRangerBGSyncService();
+    if (bgSync == null) {
+      throw new OMException("Ranger background sync service is not initialized",
+          FEATURE_NOT_ENABLED);
+    }
+
+    // Trigger Ranger BG Sync
+    if (noWait) {
+      final Thread t = new Thread(bgSync::triggerRangerSyncOnce);
+      t.start();
+      LOG.info("User '{}' manually triggered Multi-Tenancy Ranger Sync "
+          + "in a new thread, tid={}", ugi, t.getId());
+      return true;
+    } else {
+      LOG.info("User '{}' manually triggered Multi-Tenancy Ranger Sync", ugi);
+      // Block in the handler thread
+      return bgSync.triggerRangerSyncOnce();
+    }
+  }
+
   @Override
   public StatusAndMessages finalizeUpgrade(String upgradeClientID)
       throws IOException {
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/multitenant/InMemoryMultiTenantAccessController.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/multitenant/InMemoryMultiTenantAccessController.java
similarity index 72%
rename from hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/multitenant/InMemoryMultiTenantAccessController.java
rename to hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/multitenant/InMemoryMultiTenantAccessController.java
index cfb2a05e2f..27fd66ec83 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/multitenant/InMemoryMultiTenantAccessController.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/multitenant/InMemoryMultiTenantAccessController.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.ozone.om.multitenant;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Map;
@@ -42,16 +43,16 @@ public class InMemoryMultiTenantAccessController
   }
 
   @Override
-  public void createPolicy(Policy policy) throws Exception {
+  public Policy createPolicy(Policy policy) throws IOException {
     if (policies.containsKey(policy.getName())) {
-      throw new Exception("Policy already exists.");
+      throw new IOException("Policy already exists.");
     }
     // Multiple policies for the same resource should not be allowed.
     for (Policy existingPolicy: policies.values()) {
       if (existingPolicy.getVolumes().equals(policy.getVolumes()) &&
           existingPolicy.getBuckets().equals(policy.getBuckets()) &&
           existingPolicy.getKeys().equals(policy.getKeys())) {
-        throw new Exception("Policy for the same resource already defined.");
+        throw new IOException("Policy for the same resource already defined.");
       }
     }
     policies.put(policy.getName(), policy);
@@ -63,18 +64,20 @@ public class InMemoryMultiTenantAccessController
     }
 
     serviceVersion++;
+
+    return policy;
   }
 
   @Override
-  public Policy getPolicy(String policyName) throws Exception {
+  public Policy getPolicy(String policyName) throws IOException {
     if (!policies.containsKey(policyName)) {
-      throw new Exception("Policy does not exist.");
+      throw new IOException("Policy does not exist.");
     }
     return policies.get(policyName);
   }
 
   @Override
-  public List<Policy> getLabeledPolicies(String label) throws Exception {
+  public List<Policy> getLabeledPolicies(String label) throws IOException {
     List<Policy> result = new ArrayList<>();
     for (Policy policy: policies.values()) {
       if (policy.getLabels().contains(label)) {
@@ -86,27 +89,29 @@ public class InMemoryMultiTenantAccessController
   }
 
   @Override
-  public void updatePolicy(Policy policy) throws Exception {
+  public Policy updatePolicy(Policy policy) throws IOException {
     if (!policies.containsKey(policy.getName())) {
-      throw new Exception("Policy does not exist.");
+      throw new IOException("Policy does not exist.");
     }
     policies.put(policy.getName(), policy);
     serviceVersion++;
+
+    return policy;
   }
 
   @Override
-  public void deletePolicy(String policyName) throws Exception {
+  public void deletePolicy(String policyName) throws IOException {
     if (!policies.containsKey(policyName)) {
-      throw new Exception("Policy does not exist.");
+      throw new IOException("Policy does not exist.");
     }
     policies.remove(policyName);
     serviceVersion++;
   }
 
   @Override
-  public void createRole(Role role) throws Exception {
+  public Role createRole(Role role) throws IOException {
     if (roles.containsKey(role.getName())) {
-      throw new Exception("Role already exists.");
+      throw new IOException("Role already exists.");
     }
     Role newRole = new Role.Builder(role)
         .setID(nextRoleID)
@@ -114,41 +119,45 @@ public class InMemoryMultiTenantAccessController
     nextRoleID++;
     roles.put(newRole.getName(), newRole);
     serviceVersion++;
+
+    return role;
   }
 
   @Override
-  public Role getRole(String roleName) throws Exception {
+  public Role getRole(String roleName) throws IOException {
     if (!roles.containsKey(roleName)) {
-      throw new Exception("Role does not exist.");
+      throw new IOException("Role does not exist.");
     }
     return roles.get(roleName);
   }
 
   @Override
-  public void updateRole(long roleID, Role role) throws Exception {
+  public Role updateRole(long roleId, Role role) throws IOException {
     Optional<Role> originalRole = roles.values().stream()
-        .filter(r -> r.getRoleID().isPresent() && r.getRoleID().get() == roleID)
+        .filter(r -> r.getId().isPresent() && r.getId().get() == roleId)
         .findFirst();
     if (!originalRole.isPresent()) {
-      throw new Exception("Role does not exist.");
+      throw new IOException("Role does not exist.");
     }
     // New role may have same ID but different name.
     roles.remove(originalRole.get().getName());
     roles.put(role.getName(), role);
     serviceVersion++;
+
+    return role;
   }
 
   @Override
-  public void deleteRole(String roleName) throws Exception {
+  public void deleteRole(String roleName) throws IOException {
     if (!roles.containsKey(roleName)) {
-      throw new Exception("Role does not exist.");
+      throw new IOException("Role does not exist.");
     }
     roles.remove(roleName);
     serviceVersion++;
   }
 
   @Override
-  public long getRangerServiceVersion() {
+  public long getRangerServicePolicyVersion() {
     return serviceVersion;
   }
 }
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/multitenant/MultiTenantAccessAuthorizer.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/multitenant/MultiTenantAccessAuthorizer.java
deleted file mode 100644
index b8fe50d60a..0000000000
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/multitenant/MultiTenantAccessAuthorizer.java
+++ /dev/null
@@ -1,282 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.hadoop.ozone.om.multitenant;
-
-import java.io.IOException;
-import java.util.HashSet;
-import java.util.List;
-
-import org.apache.commons.lang3.tuple.Pair;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdds.annotation.InterfaceAudience;
-import org.apache.hadoop.hdds.annotation.InterfaceStability;
-import org.apache.hadoop.ozone.om.exceptions.OMException;
-import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
-import org.apache.hadoop.ozone.security.acl.IOzoneObj;
-import org.apache.hadoop.ozone.security.acl.RequestContext;
-import org.apache.http.auth.BasicUserPrincipal;
-
-/**
- * Public API for Ozone MultiTenant Gatekeeper. Security providers providing
- * support for Ozone MultiTenancy should implement this.
- */
-@InterfaceAudience.LimitedPrivate({"HDFS", "Yarn", "Ranger", "Hive", "HBase"})
-@InterfaceStability.Evolving
-public interface MultiTenantAccessAuthorizer extends IAccessAuthorizer {
-
-  /**
-   * Initialize the MultiTenantGateKeeper. Initialize any external state.
-   *
-   * @param configuration
-   * @throws IOException
-   */
-  void init(Configuration configuration) throws IOException;
-
-  /**
-   * Shutdown for the MultiTenantGateKeeper.
-   * @throws IOException
-   */
-  void shutdown() throws IOException;
-
-  /**
-   * Assign user to an existing role in the Authorizer.
-   *
-   * @param userPrincipal user principal
-   * @param existingRole  A JSON String representation of the existing role
-   *                      returned from the Authorizer (Ranger).
-   * @param isAdmin true if tenant admin
-   * @return unique and opaque userID that can be used to refer to the user in
-   * MultiTenantGateKeeperplugin Implementation. E.g. a Ranger
-   * based Implementation can return some ID thats relevant for it.
-   */
-  String assignUserToRole(String userPrincipal, String existingRole,
-      boolean isAdmin) throws IOException;
-
-  /**
-   * Update the exising role details and push the changes to Ranger.
-   *
-   * @param userPrincipal user name that exists in Ranger (internal / external).
-   * @param existingRole  An existing role's JSON response String from Ranger.
-   * @return roleId (not useful for now)
-   * @throws IOException
-   */
-  String revokeUserFromRole(String userPrincipal, String existingRole)
-      throws IOException;
-
-  /**
-   * Assign all the users to an existing role.
-   * @param users list of user principals
-   * @param existingRole roleName
-   */
-  String assignAllUsers(HashSet<String> users, String existingRole)
-      throws IOException;
-
-  /**
-   * @param userPrincipal
-   * @return Unique userID maintained by the authorizer plugin.
-   * @throws IOException
-   */
-  String getUserId(String userPrincipal) throws IOException;
-
-  /**
-   * @param principal
-   * @return Unique groupID maintained by the authorizer plugin.
-   * @throws IOException
-   */
-  String getRole(OzoneTenantRolePrincipal principal)
-      throws IOException;
-
-  /**
-   * Returs the details of a role, given the rolename.
-   * @param roleName
-   * @return
-   * @throws IOException
-   */
-  String getRole(String roleName)
-      throws IOException;
-
-  /**
-   * Delete the user userID in MultiTenantGateKeeper plugin.
-   * @param opaqueUserID : unique ID that was returned by
-   *                    MultiTenantGatekeeper in
-   *               createUser().
-   */
-  void deleteUser(String opaqueUserID) throws IOException;
-
-  /**
-   * Create Role entity for MultiTenantGatekeeper plugin.
-   * @param role
-   * @param adminRoleName (Optional) admin role name that will be added to
-   *                      manage this role.
-   * @return unique groupID that can be used to refer to the role in
-   * MultiTenantGateKeeper plugin Implementation e.g. corresponding ID on the
-   * Ranger end for a ranger based implementation .
-   */
-  String createRole(String role, String adminRoleName)
-      throws IOException;
-
-  /**
-   * Creates a new user.
-   * @param userName
-   * @param password
-   * @return
-   * @throws IOException
-   */
-  String createUser(String userName, String password)
-      throws IOException;
-
-  /**
-   * Delete the group groupID in MultiTenantGateKeeper plugin.
-   * @param groupID : unique opaque ID that was returned by
-   *                MultiTenantGatekeeper in createGroup().
-   */
-  void deleteRoleById(String groupID) throws IOException;
-
-  /**
-   * Create access policy with the given parameters in the authorizer
-   * (e.g. Ranger).
-   *
-   * @param policy AccessPolicy
-   * @return unique and opaque policy ID that is maintained by the plugin.
-   * @throws IOException
-   */
-  String createAccessPolicy(AccessPolicy policy) throws IOException;
-
-  /**
-   * Get AccessPolicy by policy name.
-   *
-   * @param policyName policy name
-   * @return AccessPolicy
-   * @throws IOException
-   */
-  AccessPolicy getAccessPolicyByName(String policyName) throws IOException;
-
-  /**
-   * Given a policy Id, returns the policy.
-   *
-   * @param policyId
-   * @return
-   * @throws IOException
-   */
-  AccessPolicy getAccessPolicyById(String policyId) throws IOException;
-
-  /**
-   * Delete policy by policy ID.
-   *
-   * @param policyId ID that was returned earlier by createAccessPolicy().
-   * @throws IOException
-   */
-  void deletePolicyById(String policyId) throws IOException;
-
-  /**
-   * Delete policy by policy name.
-   *
-   * @param policyName policy name
-   * @throws IOException
-   */
-  void deletePolicyByName(String policyName) throws IOException;
-
-  /**
-   * Delete role by role name.
-   *
-   * @param roleName role name
-   * @throws IOException
-   */
-  void deleteRoleByName(String roleName) throws IOException;
-
-  /**
-   * Grant user aclType access to bucketNameSpace.
-   * @param bucketNameSpace
-   * @param user
-   * @param aclType
-   */
-  void grantAccess(BucketNameSpace bucketNameSpace,
-                   BasicUserPrincipal user, ACLType aclType);
-
-  /**
-   * Revoke from user aclType access from bucketNameSpace.
-   * @param bucketNameSpace
-   * @param user
-   * @param aclType
-   */
-  void revokeAccess(BucketNameSpace bucketNameSpace,
-                    BasicUserPrincipal user, ACLType aclType);
-
-  /**
-   * Grant user aclType access to accountNameSpace.
-   * @param accountNameSpace
-   * @param user
-   * @param aclType
-   */
-  void grantAccess(AccountNameSpace accountNameSpace,
-                   BasicUserPrincipal user,
-                   ACLType aclType);
-
-  /**
-   * Revoke from user aclType access from bucketNameSpace.
-   * @param accountNameSpace
-   * @param user
-   * @param aclType
-   */
-  void revokeAccess(AccountNameSpace accountNameSpace,
-                    BasicUserPrincipal user, ACLType aclType);
-
-  /**
-   * Return all bucketnamespace accesses granted to user.
-   * @param user
-   * @return list of access
-   */
-  List<Pair<BucketNameSpace, ACLType>> getAllBucketNameSpaceAccesses(
-      BasicUserPrincipal user);
-
-  /**
-   * Checks if the user has access to bucketNameSpace.
-   * @param bucketNameSpace
-   * @param user
-   * @return true if access is granted, false otherwise.
-   */
-  boolean checkAccess(BucketNameSpace bucketNameSpace,
-                      BasicUserPrincipal user);
-
-  /**
-   * Checks if the user has access to accountNameSpace.
-   * @param accountNameSpace
-   * @param user
-   * @return true if access is granted, false otherwise.
-   */
-  boolean checkAccess(AccountNameSpace accountNameSpace,
-                      BasicUserPrincipal user);
-
-  /**
-   * Check access for given ozoneObject. Access for the object would be
-   * checked in the context of a MultiTenant environment.
-   *
-   * @param ozoneObject object for which access needs to be checked.
-   * @param context Context object encapsulating all user related information.
-   * @throws org.apache.hadoop.ozone.om.exceptions.OMException
-   * @return true if user has access else false.
-   */
-  @Override
-  boolean checkAccess(IOzoneObj ozoneObject, RequestContext context)
-      throws OMException;
-
-  long getLatestOzoneServiceVersion() throws IOException;
-
-  String getAllMultiTenantPolicies() throws IOException;
-
-  MultiTenantAccessController getMultiTenantAccessController();
-}
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/multitenant/MultiTenantAccessAuthorizerDummyPlugin.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/multitenant/MultiTenantAccessAuthorizerDummyPlugin.java
deleted file mode 100644
index c88f332a46..0000000000
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/multitenant/MultiTenantAccessAuthorizerDummyPlugin.java
+++ /dev/null
@@ -1,194 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package org.apache.hadoop.ozone.om.multitenant;
-
-import org.apache.commons.lang3.tuple.Pair;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.ozone.om.exceptions.OMException;
-import org.apache.hadoop.ozone.security.acl.IOzoneObj;
-import org.apache.hadoop.ozone.security.acl.RequestContext;
-import org.apache.http.auth.BasicUserPrincipal;
-
-import java.io.IOException;
-import java.util.HashSet;
-import java.util.List;
-
-/**
- * Dummy implementation of MultiTenantAccessAuthorizer when some parts of
- * testing don't need to deal with Ranger.
- */
-public class MultiTenantAccessAuthorizerDummyPlugin implements
-    MultiTenantAccessAuthorizer {
-
-  @Override
-  public void init(Configuration configuration) throws IOException {
-
-  }
-
-  @Override
-  public void shutdown() throws IOException {
-
-  }
-
-  @Override
-  public String assignUserToRole(String userPrincipal,
-      String existingRole, boolean isAdmin) {
-    return "roleId";
-  }
-
-  @Override
-  public String revokeUserFromRole(String userPrincipal, String existingRole) {
-    return "roleId";
-  }
-
-  @Override
-  public String assignAllUsers(HashSet<String> users, String existingRole)
-      throws IOException {
-    return null;
-  }
-
-  @Override
-  public String getUserId(String userPrincipal) throws IOException {
-    return null;
-  }
-
-  @Override
-  public String getRole(OzoneTenantRolePrincipal principal) throws IOException {
-    return null;
-  }
-
-  @Override
-  public String getRole(String roleName) throws IOException {
-    return null;
-  }
-
-  @Override
-  public void deleteUser(String opaqueUserID) throws IOException {
-
-  }
-
-  @Override
-  public String createRole(String role, String adminRoleName)
-      throws IOException {
-    return null;
-  }
-
-  @Override
-  public String createUser(String userName, String password)
-      throws IOException {
-    return null;
-  }
-
-  @Override
-  public void deleteRoleById(String groupID) throws IOException {
-
-  }
-
-  @Override
-  public String createAccessPolicy(AccessPolicy policy) throws IOException {
-    return null;
-  }
-
-  @Override
-  public AccessPolicy getAccessPolicyByName(String policyName) {
-    return null;
-  }
-
-  @Override
-  public AccessPolicy getAccessPolicyById(String policyName) {
-    return null;
-  }
-
-  @Override
-  public void deletePolicyById(String policyId) throws IOException {
-
-  }
-
-  @Override
-  public void deleteRoleByName(String roleName) throws IOException {
-
-  }
-
-  @Override
-  public void deletePolicyByName(String policyName) throws IOException {
-
-  }
-
-  @Override
-  public void grantAccess(BucketNameSpace bucketNameSpace,
-      BasicUserPrincipal user, ACLType aclType) {
-
-  }
-
-  @Override
-  public void revokeAccess(BucketNameSpace bucketNameSpace,
-      BasicUserPrincipal user, ACLType aclType) {
-
-  }
-
-  @Override
-  public void grantAccess(AccountNameSpace accountNameSpace,
-      BasicUserPrincipal user, ACLType aclType) {
-
-  }
-
-  @Override
-  public void revokeAccess(AccountNameSpace accountNameSpace,
-      BasicUserPrincipal user, ACLType aclType) {
-
-  }
-
-  @Override
-  public List<Pair<BucketNameSpace, ACLType>> getAllBucketNameSpaceAccesses(
-      BasicUserPrincipal user) {
-    return null;
-  }
-
-  @Override
-  public boolean checkAccess(BucketNameSpace bucketNameSpace,
-      BasicUserPrincipal user) {
-    return false;
-  }
-
-  @Override
-  public boolean checkAccess(AccountNameSpace accountNameSpace,
-      BasicUserPrincipal user) {
-    return false;
-  }
-
-  @Override
-  public boolean checkAccess(IOzoneObj ozoneObject, RequestContext context)
-      throws OMException {
-    return false;
-  }
-
-  @Override
-  public long getLatestOzoneServiceVersion() {
-    return -1;
-  }
-
-  @Override
-  public String getAllMultiTenantPolicies() {
-    return null;
-  }
-
-  @Override
-  public MultiTenantAccessController getMultiTenantAccessController() {
-    return null;
-  }
-}
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/multitenant/MultiTenantAccessAuthorizerRangerPlugin.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/multitenant/MultiTenantAccessAuthorizerRangerPlugin.java
deleted file mode 100644
index 8a33b87e16..0000000000
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/multitenant/MultiTenantAccessAuthorizerRangerPlugin.java
+++ /dev/null
@@ -1,854 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package org.apache.hadoop.ozone.om.multitenant;
-
-import static java.net.HttpURLConnection.HTTP_OK;
-import static org.apache.hadoop.ozone.OzoneConsts.OZONE_OM_RANGER_ADMIN_CREATE_POLICY_HTTP_ENDPOINT;
-import static org.apache.hadoop.ozone.OzoneConsts.OZONE_OM_RANGER_ADMIN_CREATE_ROLE_HTTP_ENDPOINT;
-import static org.apache.hadoop.ozone.OzoneConsts.OZONE_OM_RANGER_ADMIN_CREATE_USER_HTTP_ENDPOINT;
-import static org.apache.hadoop.ozone.OzoneConsts.OZONE_OM_RANGER_ADMIN_DELETE_POLICY_HTTP_ENDPOINT;
-import static org.apache.hadoop.ozone.OzoneConsts.OZONE_OM_RANGER_ADMIN_DELETE_ROLE_HTTP_ENDPOINT;
-import static org.apache.hadoop.ozone.OzoneConsts.OZONE_OM_RANGER_ADMIN_DELETE_USER_HTTP_ENDPOINT;
-import static org.apache.hadoop.ozone.OzoneConsts.OZONE_OM_RANGER_ADMIN_GET_POLICY_HTTP_ENDPOINT;
-import static org.apache.hadoop.ozone.OzoneConsts.OZONE_OM_RANGER_ADMIN_GET_POLICY_ID_HTTP_ENDPOINT;
-import static org.apache.hadoop.ozone.OzoneConsts.OZONE_OM_RANGER_ADMIN_GET_ROLE_HTTP_ENDPOINT;
-import static org.apache.hadoop.ozone.OzoneConsts.OZONE_OM_RANGER_ADMIN_GET_USER_HTTP_ENDPOINT;
-import static org.apache.hadoop.ozone.OzoneConsts.OZONE_OM_RANGER_ADMIN_ROLE_ADD_USER_HTTP_ENDPOINT;
-import static org.apache.hadoop.ozone.OzoneConsts.OZONE_OM_RANGER_ALL_POLICIES_ENDPOINT;
-import static org.apache.hadoop.ozone.OzoneConsts.OZONE_OM_RANGER_DOWNLOAD_ENDPOINT;
-import static org.apache.hadoop.ozone.OzoneConsts.OZONE_OM_RANGER_OZONE_SERVICE_ENDPOINT;
-import static org.apache.hadoop.ozone.OzoneConsts.OZONE_TENANT_RANGER_POLICY_LABEL;
-import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RANGER_HTTPS_ADMIN_API_PASSWD;
-import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RANGER_HTTPS_ADMIN_API_USER;
-import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_RANGER_HTTPS_ADDRESS_KEY;
-import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_RANGER_OM_CONNECTION_REQUEST_TIMEOUT;
-import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_RANGER_OM_CONNECTION_REQUEST_TIMEOUT_DEFAULT;
-import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_RANGER_OM_CONNECTION_TIMEOUT;
-import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_RANGER_OM_CONNECTION_TIMEOUT_DEFAULT;
-import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_RANGER_OM_IGNORE_SERVER_CERT;
-import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_RANGER_OM_IGNORE_SERVER_CERT_DEFAULT;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.OutputStream;
-import java.net.ConnectException;
-import java.net.HttpURLConnection;
-import java.net.SocketTimeoutException;
-import java.net.URL;
-import java.nio.charset.StandardCharsets;
-import java.util.HashSet;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-import javax.net.ssl.HttpsURLConnection;
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.TrustManager;
-import javax.net.ssl.X509TrustManager;
-
-import com.google.gson.JsonArray;
-import com.google.gson.JsonObject;
-import com.google.gson.JsonParseException;
-import com.google.gson.JsonParser;
-import org.apache.commons.lang3.tuple.Pair;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.kerby.util.Base64;
-import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.ozone.om.exceptions.OMException;
-import org.apache.hadoop.ozone.security.acl.IOzoneObj;
-import org.apache.hadoop.ozone.security.acl.RequestContext;
-import org.apache.http.auth.BasicUserPrincipal;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Implements MultiTenantAccessAuthorizer for Apache Ranger.
- */
-public class MultiTenantAccessAuthorizerRangerPlugin implements
-    MultiTenantAccessAuthorizer {
-  public static final Logger LOG = LoggerFactory
-      .getLogger(MultiTenantAccessAuthorizerRangerPlugin.class);
-
-  private MultiTenantAccessController accessController;
-
-  private OzoneConfiguration conf;
-  private boolean ignoreServerCert = true;
-  private int connectionTimeout;
-  private int connectionRequestTimeout;
-  private String authHeaderValue;
-  private String rangerHttpsAddress;
-  // Stores Ranger cm_ozone service ID. This value should not change (unless
-  // somehow Ranger cm_ozone service is deleted and re-created while OM is
-  // still running and not reloaded / restarted).
-  private int rangerOzoneServiceId = -1;
-
-  @Override
-  public void init(Configuration configuration) throws IOException {
-    conf = new OzoneConfiguration(configuration);
-    accessController = new RangerRestMultiTenantAccessController(conf);
-    rangerHttpsAddress = conf.get(OZONE_RANGER_HTTPS_ADDRESS_KEY);
-    if (rangerHttpsAddress == null) {
-      throw new OMException("Config ozone.om.ranger.https-address is not set! "
-          + "Multi-Tenancy feature requires Apache Ranger to function properly",
-          OMException.ResultCodes.INTERNAL_ERROR);
-    }
-    initializeRangerConnection();
-
-    // Get Ranger Ozone service ID
-    try {
-      rangerOzoneServiceId = retrieveRangerOzoneServiceId();
-    } catch (SocketTimeoutException | ConnectException e) {
-      // Exceptions (e.g. ConnectException: Connection refused)
-      // thrown here can crash OM during startup.
-      // Tolerate potential connection failure to Ranger during initialization
-      // due to cluster services starting up at the same time or not ready yet.
-      // Later when the Ranger Ozone service ID would be used it should try
-      // and retrieve the ID again if it failed earlier.
-      LOG.error("Failed to get Ozone service ID to Ranger. "
-              + "Will retry later", e);
-      rangerOzoneServiceId = -1;
-    }
-  }
-
-  int getRangerOzoneServiceId() {
-    return rangerOzoneServiceId;
-  }
-
-  /**
-   * Helper method that checks if the RangerOzoneServiceId is properly retrieved
-   * during init. If not, try to get it from Ranger.
-   */
-  private void checkRangerOzoneServiceId() throws IOException {
-    if (rangerOzoneServiceId < 0) {
-      rangerOzoneServiceId = retrieveRangerOzoneServiceId();
-    }
-  }
-
-  private void initializeRangerConnection() {
-    setupRangerConnectionConfig();
-    if (ignoreServerCert) {
-      setupRangerIgnoreServerCertificate();
-    }
-    setupRangerConnectionAuthHeader();
-  }
-
-  private void setupRangerConnectionConfig() {
-    connectionTimeout = (int) conf.getTimeDuration(
-        OZONE_RANGER_OM_CONNECTION_TIMEOUT,
-        conf.get(
-            OZONE_RANGER_OM_CONNECTION_TIMEOUT,
-            OZONE_RANGER_OM_CONNECTION_TIMEOUT_DEFAULT),
-        TimeUnit.MILLISECONDS);
-    connectionRequestTimeout = (int)conf.getTimeDuration(
-        OZONE_RANGER_OM_CONNECTION_REQUEST_TIMEOUT,
-        conf.get(
-            OZONE_RANGER_OM_CONNECTION_REQUEST_TIMEOUT,
-            OZONE_RANGER_OM_CONNECTION_REQUEST_TIMEOUT_DEFAULT),
-        TimeUnit.MILLISECONDS
-    );
-    ignoreServerCert = (boolean) conf.getBoolean(
-        OZONE_RANGER_OM_IGNORE_SERVER_CERT,
-            OZONE_RANGER_OM_IGNORE_SERVER_CERT_DEFAULT);
-  }
-
-  private void setupRangerIgnoreServerCertificate() {
-    // Create a trust manager that does not validate certificate chains
-    TrustManager[] trustAllCerts = new TrustManager[]{
-        new X509TrustManager() {
-          public java.security.cert.X509Certificate[] getAcceptedIssuers() {
-            return null;
-          }
-          public void checkClientTrusted(
-              java.security.cert.X509Certificate[] certs, String authType) {
-          }
-          public void checkServerTrusted(
-              java.security.cert.X509Certificate[] certs, String authType) {
-          }
-        }
-    };
-
-    try {
-      SSLContext sc = SSLContext.getInstance("SSL");
-      sc.init(null, trustAllCerts, new java.security.SecureRandom());
-      HttpsURLConnection.setDefaultSSLSocketFactory(sc.getSocketFactory());
-    } catch (Exception e) {
-      LOG.info("Setting DefaultSSLSocketFactory failed.");
-    }
-  }
-
-  private void setupRangerConnectionAuthHeader() {
-    String userName = conf.get(OZONE_OM_RANGER_HTTPS_ADMIN_API_USER);
-    String passwd = conf.get(OZONE_OM_RANGER_HTTPS_ADMIN_API_PASSWD);
-    String auth = userName + ":" + passwd;
-    byte[] encodedAuth =
-        Base64.encodeBase64(auth.getBytes(StandardCharsets.UTF_8));
-    authHeaderValue = "Basic " +
-        new String(encodedAuth, StandardCharsets.UTF_8);
-  }
-
-
-  @Override
-  public void shutdown() throws IOException {
-    // TBD
-  }
-
-  @Override
-  public void grantAccess(BucketNameSpace bucketNameSpace,
-                          BasicUserPrincipal user, ACLType aclType) {
-    // TBD
-  }
-
-  @Override
-  public void revokeAccess(BucketNameSpace bucketNameSpace,
-                           BasicUserPrincipal user, ACLType aclType) {
-    // TBD
-  }
-
-  @Override
-  public void grantAccess(AccountNameSpace accountNameSpace,
-                          BasicUserPrincipal user, ACLType aclType) {
-    // TBD
-  }
-
-  @Override
-  public void revokeAccess(AccountNameSpace accountNameSpace,
-                           BasicUserPrincipal user, ACLType aclType) {
-    // TBD
-  }
-
-  public List<Pair<BucketNameSpace, ACLType>>
-      getAllBucketNameSpaceAccesses(BasicUserPrincipal user) {
-    // TBD
-    return null;
-  }
-
-  @Override
-  public boolean checkAccess(BucketNameSpace bucketNameSpace,
-                             BasicUserPrincipal user) {
-    // TBD
-    return true;
-  }
-
-  @Override
-  public boolean checkAccess(AccountNameSpace accountNameSpace,
-                             BasicUserPrincipal user) {
-    // TBD
-    return true;
-  }
-
-  @Override
-  public boolean checkAccess(IOzoneObj ozoneObject, RequestContext context)
-      throws OMException {
-    // TBD
-    return true;
-  }
-
-  @Override
-  public String getRole(OzoneTenantRolePrincipal principal) throws IOException {
-
-    String endpointUrl =
-        rangerHttpsAddress + OZONE_OM_RANGER_ADMIN_GET_ROLE_HTTP_ENDPOINT +
-            principal.getName();
-
-    HttpURLConnection conn = makeHttpGetCall(endpointUrl, "GET", false);
-    return getResponseData(conn);
-  }
-
-  @Override
-  public String getRole(String roleName) throws IOException {
-
-    String endpointUrl =
-        rangerHttpsAddress + OZONE_OM_RANGER_ADMIN_GET_ROLE_HTTP_ENDPOINT +
-            roleName;
-
-    HttpURLConnection conn = makeHttpGetCall(endpointUrl, "GET", false);
-    return getResponseData(conn);
-  }
-
-  @Override
-  public String getUserId(String userPrincipal) throws IOException {
-    String rangerAdminUrl =
-        rangerHttpsAddress + OZONE_OM_RANGER_ADMIN_GET_USER_HTTP_ENDPOINT +
-        userPrincipal;
-
-    HttpURLConnection conn = makeHttpGetCall(rangerAdminUrl,
-        "GET", false);
-    String response = getResponseData(conn);
-    String userIDCreated = null;
-    try {
-      JsonObject jResonse = new JsonParser().parse(response).getAsJsonObject();
-      JsonArray userinfo = jResonse.get("vXUsers").getAsJsonArray();
-      int numIndex = userinfo.size();
-      for (int i = 0; i < numIndex; ++i) {
-        if (userinfo.get(i).getAsJsonObject().get("name").getAsString()
-            .equals(userPrincipal)) {
-          userIDCreated =
-              userinfo.get(i).getAsJsonObject().get("id").getAsString();
-          break;
-        }
-      }
-      LOG.debug("User ID is: {}", userIDCreated);
-    } catch (JsonParseException e) {
-      e.printStackTrace();
-      throw e;
-    }
-    return userIDCreated;
-  }
-
-  /**
-   * Update the exising role details and push the changes to Ranger.
-   *
-   * @param userPrincipal user name that exists in Ranger.
-   * @param existingRole  An existing role's JSON response String from Ranger.
-   * @return roleId (not useful for now)
-   * @throws IOException
-   */
-  @Override
-  public String revokeUserFromRole(String userPrincipal,
-      String existingRole) throws IOException {
-    JsonObject roleObj = new JsonParser().parse(existingRole).getAsJsonObject();
-    // Parse Json
-    final String roleId = roleObj.get("id").getAsString();
-    LOG.debug("Got roleId: {}", roleId);
-
-    JsonArray oldUsersArray = roleObj.getAsJsonArray("users");
-    JsonArray newUsersArray = new JsonArray();
-
-    for (int i = 0; i < oldUsersArray.size(); ++i) {
-      JsonObject newUserEntry = oldUsersArray.get(i).getAsJsonObject();
-      if (!newUserEntry.get("name").getAsString().equals(userPrincipal)) {
-        newUsersArray.add(newUserEntry);
-      }
-      // Update Json array
-    }
-    roleObj.add("users", newUsersArray);
-
-    LOG.debug("Updated: {}", roleObj);
-
-    final String endpointUrl = rangerHttpsAddress +
-        OZONE_OM_RANGER_ADMIN_ROLE_ADD_USER_HTTP_ENDPOINT + roleId;
-    final String jsonData = roleObj.toString();
-
-    HttpURLConnection conn =
-        makeHttpCall(endpointUrl, jsonData, "PUT", false);
-    if (conn.getResponseCode() != HTTP_OK) {
-      throw new IOException("Ranger REST API failure: " + conn.getResponseCode()
-          + " " + conn.getResponseMessage()
-          + ". Error updating Ranger role.");
-    }
-    String resp = getResponseData(conn);
-    String returnedRoleId;
-    try {
-      JsonObject jObject = new JsonParser().parse(resp).getAsJsonObject();
-      returnedRoleId = jObject.get("id").getAsString();
-      LOG.debug("Ranger returns roleId: {}", roleId);
-    } catch (JsonParseException e) {
-      e.printStackTrace();
-      throw e;
-    }
-    return returnedRoleId;
-  }
-
-  /**
-   * Update the exising role details and push the changes to Ranger.
-   *
-   * @param userPrincipal user name that exists in Ranger.
-   * @param existingRole  An existing role's JSON response String from Ranger.
-   * @param isAdmin       Make it delegated admin of the role.
-   * @return roleId (not useful for now)
-   * @throws IOException
-   */
-  public String assignUserToRole(String userPrincipal,
-      String existingRole, boolean isAdmin) throws IOException {
-
-    JsonObject roleObj = new JsonParser().parse(existingRole).getAsJsonObject();
-    // Parse Json
-    final String roleId = roleObj.get("id").getAsString();
-    LOG.debug("Got roleId: {}", roleId);
-
-    JsonArray usersArray = roleObj.getAsJsonArray("users");
-    JsonObject newUserEntry = new JsonObject();
-    newUserEntry.addProperty("name", userPrincipal);
-    newUserEntry.addProperty("isAdmin", isAdmin);
-    usersArray.add(newUserEntry);
-    // Update Json array
-    roleObj.add("users", usersArray);
-
-    LOG.debug("Updated: {}", roleObj);
-
-    final String endpointUrl = rangerHttpsAddress +
-        OZONE_OM_RANGER_ADMIN_ROLE_ADD_USER_HTTP_ENDPOINT + roleId;
-    final String jsonData = roleObj.toString();
-
-    HttpURLConnection conn =
-        makeHttpCall(endpointUrl, jsonData, "PUT", false);
-    if (conn.getResponseCode() != HTTP_OK) {
-      throw new IOException("Ranger REST API failure: " + conn.getResponseCode()
-          + " " + conn.getResponseMessage()
-          + ". Error updating Ranger role.");
-    }
-    String resp = getResponseData(conn);
-    String returnedRoleId;
-    try {
-      JsonObject jObject = new JsonParser().parse(resp).getAsJsonObject();
-      returnedRoleId = jObject.get("id").getAsString();
-      LOG.debug("Ranger returns roleId: {}", roleId);
-    } catch (JsonParseException e) {
-      e.printStackTrace();
-      throw e;
-    }
-    return returnedRoleId;
-  }
-
-  /**
-   * Update the exising role details and push the changes to Ranger.
-   *
-   * @param users must be existing users in Ranger.
-   * @param existingRole An existing role's JSON response String from Ranger.
-   * @return roleId (not useful for now)
-   * @throws IOException
-   */
-  @Override
-  public String assignAllUsers(HashSet<String> users,
-                               String existingRole) throws IOException {
-
-    JsonObject roleObj = new JsonParser().parse(existingRole).getAsJsonObject();
-    // Parse Json
-    final String roleId = roleObj.get("id").getAsString();
-    LOG.debug("Got roleId: {}", roleId);
-
-    JsonArray usersArray = new JsonArray();
-    for (String user: users) {
-      JsonObject newUserEntry = new JsonObject();
-      newUserEntry.addProperty("name", user);
-      newUserEntry.addProperty("isAdmin", false);
-      usersArray.add(newUserEntry);
-    }
-    // Update Json array
-    roleObj.remove("users"); // remove the old users
-    roleObj.add("users", usersArray);
-
-    LOG.debug("Updated: {}", roleObj);
-
-    final String endpointUrl = rangerHttpsAddress +
-        OZONE_OM_RANGER_ADMIN_ROLE_ADD_USER_HTTP_ENDPOINT + roleId;
-    final String jsonData = roleObj.toString();
-
-    HttpURLConnection conn =
-        makeHttpCall(endpointUrl, jsonData, "PUT", false);
-    if (conn.getResponseCode() != HTTP_OK) {
-      throw new IOException("Ranger REST API failure: " + conn.getResponseCode()
-          + " " + conn.getResponseMessage()
-          + ". Error updating Ranger role.");
-    }
-    String resp = getResponseData(conn);
-    String returnedRoleId;
-    try {
-      JsonObject jObject = new JsonParser().parse(resp).getAsJsonObject();
-      returnedRoleId = jObject.get("id").getAsString();
-      LOG.debug("Ranger returns roleId: {}", roleId);
-    } catch (JsonParseException e) {
-      e.printStackTrace();
-      throw e;
-    }
-    return returnedRoleId;
-  }
-
-  private String getCreateRoleJsonStr(String roleName, String adminRoleName) {
-    return "{"
-        + "  \"name\":\"" + roleName + "\","
-        + "  \"description\":\"Role created by Ozone for Multi-Tenancy\""
-        + (adminRoleName == null ? "" : ", \"roles\":"
-        + "[{\"name\":\"" + adminRoleName + "\",\"isAdmin\": true}]")
-        + "}";
-  }
-
-  public String createRole(String role, String adminRoleName)
-      throws IOException {
-
-    String endpointUrl =
-        rangerHttpsAddress + OZONE_OM_RANGER_ADMIN_CREATE_ROLE_HTTP_ENDPOINT;
-
-    String jsonData = getCreateRoleJsonStr(role, adminRoleName);
-
-    final HttpURLConnection conn = makeHttpCall(endpointUrl,
-        jsonData, "POST", false);
-    if (conn.getResponseCode() != HTTP_OK) {
-      // TODO: Do not throw on 400 ?
-      throw new IOException("Ranger REST API failure: " + conn.getResponseCode()
-          + " " + conn.getResponseMessage()
-          + ". Role name '" + role + "' likely already exists in Ranger");
-    }
-    String roleInfo = getResponseData(conn);
-    String roleId;
-    try {
-      JsonObject jObject = new JsonParser().parse(roleInfo).getAsJsonObject();
-      roleId = jObject.get("id").getAsString();
-      LOG.debug("Ranger returned roleId: {}", roleId);
-    } catch (JsonParseException e) {
-      e.printStackTrace();
-      throw e;
-    }
-    return roleId;
-  }
-
-  private String getCreateUserJsonStr(String userName, String password) {
-    return "{"
-        + "  \"name\":\"" +  userName + "\","
-        + "  \"password\":\"" +  password + "\","
-        + "  \"firstName\":\"" +  userName + "\","
-        + "  \"userRoleList\":[\"ROLE_USER\"]"
-        + "}";
-  }
-
-  public String createUser(String userName, String password)
-      throws IOException {
-
-    String endpointUrl =
-        rangerHttpsAddress + OZONE_OM_RANGER_ADMIN_CREATE_USER_HTTP_ENDPOINT;
-
-    String jsonData = getCreateUserJsonStr(userName, password);
-
-    final HttpURLConnection conn = makeHttpCall(endpointUrl,
-        jsonData, "POST", false);
-    if (conn.getResponseCode() != HTTP_OK) {
-      throw new IOException("Ranger REST API failure: " + conn.getResponseCode()
-          + " " + conn.getResponseMessage()
-          + ". User name '" + userName + "' likely already exists in Ranger");
-    }
-    String userInfo = getResponseData(conn);
-    String userId;
-    try {
-      JsonObject jObject = new JsonParser().parse(userInfo).getAsJsonObject();
-      userId = jObject.get("id").getAsString();
-      LOG.debug("Ranger returned userId: {}", userId);
-    } catch (JsonParseException e) {
-      e.printStackTrace();
-      throw e;
-    }
-    return userId;
-  }
-
-
-  public String createAccessPolicy(AccessPolicy policy) throws IOException {
-    String rangerAdminUrl =
-        rangerHttpsAddress + OZONE_OM_RANGER_ADMIN_CREATE_POLICY_HTTP_ENDPOINT;
-
-    HttpURLConnection conn = makeHttpCall(rangerAdminUrl,
-        policy.serializePolicyToJsonString(),
-        "POST", false);
-    String policyInfo = getResponseData(conn);
-    String policyID;
-    try {
-      JsonObject jObject = new JsonParser().parse(policyInfo).getAsJsonObject();
-      // TODO: Use policy name instead of id
-      policyID = jObject.get("id").getAsString();
-      LOG.debug("policyID is: {}", policyID);
-    } catch (JsonParseException e) {
-      e.printStackTrace();
-      throw e;
-    }
-    return policyID;
-  }
-
-  public AccessPolicy getAccessPolicyByName(String policyName)
-      throws IOException {
-    String rangerAdminUrl =
-        rangerHttpsAddress + OZONE_OM_RANGER_ADMIN_GET_POLICY_HTTP_ENDPOINT +
-        policyName;
-
-    HttpURLConnection conn = makeHttpGetCall(rangerAdminUrl,
-        "GET", false);
-    String policyInfo = getResponseData(conn);
-    JsonArray jArry = new JsonParser().parse(policyInfo).getAsJsonArray();
-    if (jArry.size() > 0) {
-      JsonObject jsonObject = jArry.get(0).getAsJsonObject();
-      AccessPolicy policy = new RangerAccessPolicy(policyName);
-      policy.deserializePolicyFromJsonString(jsonObject);
-      return policy;
-    } else {
-      // Returns null when policyInfo is an empty array
-      return null;
-    }
-  }
-
-  @Override
-  public AccessPolicy getAccessPolicyById(String policyId)
-      throws IOException {
-    String rangerAdminUrl =
-        rangerHttpsAddress + OZONE_OM_RANGER_ADMIN_GET_POLICY_ID_HTTP_ENDPOINT +
-            policyId;
-
-    HttpURLConnection conn = makeHttpGetCall(rangerAdminUrl,
-        "GET", false);
-    String policyInfo = getResponseData(conn);
-    JsonArray jArry = new JsonParser().parse(policyInfo).getAsJsonArray();
-    JsonObject jsonObject = jArry.get(0).getAsJsonObject();
-    AccessPolicy policy =
-        new RangerAccessPolicy(jsonObject.get("name").getAsString());
-    policy.deserializePolicyFromJsonString(jsonObject);
-    return policy;
-  }
-
-  /**
-   * Returns the service ID for Ozone service in Ranger.
-   * TODO: Error handling when Ozone service doesn't exist in Ranger.
-   */
-  public int retrieveRangerOzoneServiceId() throws IOException {
-
-    String rangerAdminUrl =
-        rangerHttpsAddress + OZONE_OM_RANGER_OZONE_SERVICE_ENDPOINT;
-    int id = 0;
-
-    HttpURLConnection conn = makeHttpGetCall(rangerAdminUrl,
-        "GET", false);
-    String sInfo = getResponseData(conn);
-    JsonObject jObject = new JsonParser().parse(sInfo).getAsJsonObject();
-    JsonArray jArry = jObject.getAsJsonArray("services");
-    for (int i = 0; i < jArry.size(); ++i) {
-      JsonObject serviceObj = jArry.get(i).getAsJsonObject();
-      String serviceName = serviceObj.get("type").getAsString();
-      if (!serviceName.equals("ozone")) {
-        continue;
-      }
-      id = serviceObj.get("id").getAsInt();
-    }
-    return id;
-  }
-
-  public long getLatestOzoneServiceVersion() throws IOException {
-
-    checkRangerOzoneServiceId();
-
-    String rangerAdminUrl = rangerHttpsAddress
-        + OZONE_OM_RANGER_OZONE_SERVICE_ENDPOINT + getRangerOzoneServiceId();
-
-    HttpURLConnection conn = makeHttpGetCall(rangerAdminUrl, "GET", false);
-    String sInfo = getResponseData(conn);
-    JsonObject jObject = new JsonParser().parse(sInfo).getAsJsonObject();
-    return jObject.get("policyVersion").getAsLong();
-  }
-
-  public String getIncrementalRangerChanges(long baseVersion)
-      throws IOException {
-    String rangerAdminUrl =
-        rangerHttpsAddress + OZONE_OM_RANGER_DOWNLOAD_ENDPOINT + baseVersion;
-
-    HttpURLConnection conn = makeHttpGetCall(rangerAdminUrl, "GET", false);
-    String sInfo = getResponseData(conn);
-    return sInfo;
-  }
-
-  public String getAllMultiTenantPolicies() throws IOException {
-
-    checkRangerOzoneServiceId();
-
-    // Note: Ranger incremental policies API is broken. So we use policy label
-    // filter to get all Multi-Tenant policies.
-
-    String rangerAdminUrl = rangerHttpsAddress
-        + OZONE_OM_RANGER_ALL_POLICIES_ENDPOINT + getRangerOzoneServiceId()
-        + "?policyLabelsPartial=" + OZONE_TENANT_RANGER_POLICY_LABEL;
-
-    // Also note: policyLabels (not partial) arg doesn't seem to work for Ranger
-    // at this point. When Ranger fixed this we could use exact match instead,
-    // then we can remove the verification logic in
-    // loadAllPoliciesRolesFromRanger().
-
-    HttpURLConnection conn = makeHttpGetCall(rangerAdminUrl, "GET", false);
-    final String jsonStr = getResponseData(conn);
-
-    if (jsonStr == null) {
-      throw new IOException("Invalid response from " + rangerAdminUrl);
-    }
-
-    return jsonStr;
-  }
-
-  @Override
-  public MultiTenantAccessController getMultiTenantAccessController() {
-    return this.accessController;
-  }
-
-  public void deleteUser(String userId) throws IOException {
-
-    String rangerAdminUrl =
-        rangerHttpsAddress + OZONE_OM_RANGER_ADMIN_DELETE_USER_HTTP_ENDPOINT
-            + userId + "?forceDelete=true";
-
-    HttpURLConnection conn = makeHttpCall(rangerAdminUrl, null,
-        "DELETE", false);
-    int respnseCode = conn.getResponseCode();
-    if (respnseCode != 200 && respnseCode != 204) {
-      throw new IOException("Couldn't delete user " + userId);
-    }
-  }
-
-  @Override
-  public void deleteRoleById(String roleId) throws IOException {
-
-    String rangerAdminUrl =
-        rangerHttpsAddress + OZONE_OM_RANGER_ADMIN_DELETE_ROLE_HTTP_ENDPOINT
-            + roleId + "?forceDelete=true";
-
-    HttpURLConnection conn = makeHttpCall(rangerAdminUrl, null,
-        "DELETE", false);
-    int respnseCode = conn.getResponseCode();
-    if (respnseCode != 200 && respnseCode != 204) {
-      throw new IOException("Couldn't delete role " + roleId);
-    }
-  }
-
-  @Override
-  public void deleteRoleByName(String roleName) throws IOException {
-
-    String rangerAdminUrl =
-        rangerHttpsAddress + OZONE_OM_RANGER_ADMIN_GET_ROLE_HTTP_ENDPOINT
-            + roleName + "?forceDelete=true";
-
-    HttpURLConnection conn = makeHttpCall(rangerAdminUrl, null,
-        "DELETE", false);
-    int respnseCode = conn.getResponseCode();
-    if (respnseCode != 200 && respnseCode != 204) {
-      throw new IOException("Couldn't delete role " + roleName);
-    }
-
-  }
-
-  @Override
-  public void deletePolicyByName(String policyName) throws IOException {
-    AccessPolicy policy = getAccessPolicyByName(policyName);
-    if (policy != null) {
-      String policyID = policy.getPolicyID();
-      LOG.debug("policyID is: {}", policyID);
-      deletePolicyById(policyID);
-    } else {
-      LOG.error("No such policy: {} was found!", policyName);
-    }
-  }
-
-  public void deletePolicyById(String policyId) throws IOException {
-
-    String rangerAdminUrl =
-        rangerHttpsAddress + OZONE_OM_RANGER_ADMIN_DELETE_POLICY_HTTP_ENDPOINT
-            + policyId + "?forceDelete=true";
-    try {
-      HttpURLConnection conn = makeHttpCall(rangerAdminUrl, null,
-          "DELETE", false);
-      int respnseCode = conn.getResponseCode();
-      if (respnseCode != 200 && respnseCode != 204) {
-        throw new IOException("Couldn't delete policy " + policyId);
-      }
-    } catch (Exception e) {
-      throw new IOException("Couldn't delete policy " + policyId, e);
-    }
-  }
-
-  private String getResponseData(HttpURLConnection urlConnection)
-      throws IOException {
-    StringBuilder response = new StringBuilder();
-    try (BufferedReader br = new BufferedReader(
-        new InputStreamReader(urlConnection.getInputStream(),
-            StandardCharsets.UTF_8))) {
-      String responseLine;
-      while ((responseLine = br.readLine()) != null) {
-        response.append(responseLine.trim());
-      }
-      LOG.debug("Got response: {}", response);
-      // TODO: throw if urlConnection code is 400?
-    } catch (IOException e) {
-      // Common exceptions:
-      // 1. Server returned HTTP response code: 401
-      //   - Possibly incorrect Ranger credentials
-      // 2. Server returned HTTP response code: 400
-      //   - Policy or role does not exist
-      switch (urlConnection.getResponseCode()) {
-      case 400:
-        LOG.error("The policy or role likely does not exist in Ranger");
-        return null;
-      case 401:
-        LOG.error("Check Ranger credentials");
-//        break;
-      default:
-        e.printStackTrace();
-        throw e;
-      }
-    }
-    return response.toString();
-  }
-
-  private HttpURLConnection openURLConnection(URL url) throws IOException {
-    final HttpURLConnection urlConnection;
-    if (url.getProtocol().equals("https")) {
-      urlConnection = (HttpsURLConnection) url.openConnection();
-    } else if (url.getProtocol().equals("http")) {
-      urlConnection = (HttpURLConnection) url.openConnection();
-    } else {
-      throw new IOException("Unsupported protocol: " + url.getProtocol() +
-          "URL: " + url);
-    }
-    return urlConnection;
-  }
-
-  /**
-   * Can make either http or https request.
-   */
-  private HttpURLConnection makeHttpCall(String urlString,
-      String jsonInputString, String method, boolean isSpnego)
-      throws IOException {
-
-    URL url = new URL(urlString);
-    final HttpURLConnection urlConnection = openURLConnection(url);
-
-    urlConnection.setRequestMethod(method);
-    urlConnection.setConnectTimeout(connectionTimeout);
-    urlConnection.setReadTimeout(connectionRequestTimeout);
-    urlConnection.setRequestProperty("Accept", "application/json");
-    urlConnection.setRequestProperty("Authorization", authHeaderValue);
-
-    if ((jsonInputString != null) && !jsonInputString.isEmpty()) {
-      urlConnection.setDoOutput(true);
-      urlConnection.setRequestProperty("Content-Type", "application/json;");
-      try (OutputStream os = urlConnection.getOutputStream()) {
-        byte[] input = jsonInputString.getBytes(StandardCharsets.UTF_8);
-        os.write(input, 0, input.length);
-        os.flush();
-      }
-    }
-
-    return urlConnection;
-  }
-
-  /**
-   * Can make either http or https request.
-   */
-  private HttpURLConnection makeHttpGetCall(String urlString,
-      String method, boolean isSpnego) throws IOException {
-
-    URL url = new URL(urlString);
-    final HttpURLConnection urlConnection = openURLConnection(url);
-
-    urlConnection.setRequestMethod(method);
-    urlConnection.setConnectTimeout(connectionTimeout);
-    urlConnection.setReadTimeout(connectionRequestTimeout);
-    urlConnection.setRequestProperty("Accept", "application/json");
-    urlConnection.setRequestProperty("Authorization", authHeaderValue);
-
-    return urlConnection;
-  }
-}
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/multitenant/MultiTenantAccessController.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/multitenant/MultiTenantAccessController.java
index 444907c598..9beb074c0a 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/multitenant/MultiTenantAccessController.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/multitenant/MultiTenantAccessController.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.ozone.om.multitenant;
 
 import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.EnumMap;
@@ -40,36 +41,38 @@ public interface MultiTenantAccessController {
    *
    * Roles defined in this policy that do not already exist will be created.
    */
-  void createPolicy(Policy policy) throws Exception;
+  Policy createPolicy(Policy policy) throws IOException;
 
-  Policy getPolicy(String policyName) throws Exception;
+  Policy getPolicy(String policyName) throws IOException;
 
-  List<Policy> getLabeledPolicies(String label) throws Exception;
+  List<Policy> getLabeledPolicies(String label) throws IOException;
 
-  void updatePolicy(Policy policy) throws Exception;
+  Policy updatePolicy(Policy policy) throws IOException;
 
-  void deletePolicy(String policyName) throws Exception;
+  void deletePolicy(String policyName) throws IOException;
 
   /**
    * This operation will fail if a role with the same name already exists.
+   *
+   * @return Role ID returned from remote server.
    */
-  void createRole(Role role) throws Exception;
+  Role createRole(Role role) throws IOException;
 
-  Role getRole(String roleName) throws Exception;
+  Role getRole(String roleName) throws IOException;
 
   /**
-   * Replaces the role given by {@code roleID} with the contents of {@code
-   * role}. If {@code roleID} does not correspond to a role, an exception is
+   * Replaces the role given by {@code roleId} with the contents of {@code
+   * role}. If {@code roleId} does not correspond to a role, an exception is
    * thrown.
    *
-   * The roleID of a given role can be retrieved from the {@code getRole}
+   * The roleId of a given role can be retrieved from the {@code getRole}
    * method.
    */
-  void updateRole(long roleID, Role role) throws Exception;
+  Role updateRole(long roleId, Role role) throws IOException;
 
-  void deleteRole(String roleName) throws Exception;
+  void deleteRole(String roleName) throws IOException;
 
-  long getRangerServiceVersion() throws Exception;
+  long getRangerServicePolicyVersion() throws IOException;
 
   static Map<IAccessAuthorizer.ACLType, String> getRangerAclStrings() {
     Map<IAccessAuthorizer.ACLType, String> rangerAclStrings =
@@ -137,31 +140,39 @@ public interface MultiTenantAccessController {
    */
   class Role {
     private final String name;
-    private final Set<String> users;
+    private final Map<String, Boolean> usersMap;
+    private final Map<String, Boolean> rolesMap;
     private final String description;
-    private final Long roleID;
+    private final Long id;
+    private final String createdByUser;
 
     private Role(Builder builder) {
-      name = builder.name;
-      users = builder.users;
-      description = builder.description;
-      roleID = builder.roleID;
+      this.name = builder.name;
+      this.usersMap = builder.usersMap;
+      this.rolesMap = builder.rolesMap;
+      this.description = builder.description;
+      this.id = builder.id;
+      this.createdByUser = builder.createdByUser;
     }
 
     public String getName() {
       return name;
     }
 
-    public Set<String> getUsers() {
-      return users;
+    public Map<String, Boolean> getUsersMap() {
+      return usersMap;
+    }
+
+    public Map<String, Boolean> getRolesMap() {
+      return rolesMap;
     }
 
     public Optional<String> getDescription() {
       return Optional.ofNullable(description);
     }
 
-    public Optional<Long> getRoleID() {
-      return Optional.ofNullable(roleID);
+    public Optional<Long> getId() {
+      return Optional.ofNullable(id);
     }
 
     @Override
@@ -181,14 +192,18 @@ public interface MultiTenantAccessController {
       // If one role does not have the ID set, still consider them equal.
       // Role ID may not be set if the policy is being sent to Ranger for
       // creation, but will be set if the same policy is retrieved from Ranger.
-      boolean roleIDsMatch = true;
-      if (getRoleID().isPresent() && role.getRoleID().isPresent()) {
-        roleIDsMatch = getRoleID().equals(role.getRoleID());
+      boolean roleIdsMatch = true;
+      if (getId().isPresent() && role.getId().isPresent()) {
+        roleIdsMatch = getId().equals(role.getId());
       }
       return Objects.equals(getName(), role.getName()) &&
-          Objects.equals(getUsers(), role.getUsers()) &&
+          Objects.equals(getUsersMap(), role.getUsersMap()) &&
           Objects.equals(getDescription(), role.getDescription()) &&
-          roleIDsMatch;
+          roleIdsMatch;
+    }
+
+    public String getCreatedByUser() {
+      return createdByUser;
     }
 
     /**
@@ -196,19 +211,26 @@ public interface MultiTenantAccessController {
      */
     public static final class Builder {
       private String name;
-      private final Set<String> users;
+      // userName -> isRoleAdmin
+      private final Map<String, Boolean> usersMap;
+      // roleName -> isRoleAdmin
+      private final Map<String, Boolean> rolesMap;
       private String description;
-      private Long roleID;
+      private Long id;
+      private String createdByUser;
 
       public Builder() {
-        this.users = new HashSet<>();
+        this.usersMap = new HashMap<>();
+        this.rolesMap = new HashMap<>();
       }
 
       public Builder(Role other) {
         this.name = other.getName();
-        this.users = new HashSet<>(other.getUsers());
+        this.usersMap = new HashMap<>(other.getUsersMap());
+        this.rolesMap = new HashMap<>(other.getRolesMap());
         other.getDescription().ifPresent(desc -> this.description = desc);
-        other.getRoleID().ifPresent(id -> this.roleID = id);
+        other.getId().ifPresent(roleId -> this.id = roleId);
+        this.createdByUser = other.getCreatedByUser();
       }
 
       public Builder setName(String roleName) {
@@ -216,13 +238,56 @@ public interface MultiTenantAccessController {
         return this;
       }
 
-      public Builder addUser(String user) {
-        this.users.add(user);
+      /**
+       * Add one user to this role.
+       */
+      public Builder addUser(String userName, boolean isRoleAdmin) {
+        this.usersMap.put(userName, isRoleAdmin);
+        return this;
+      }
+
+      /**
+       * Add a list of users as role non-admins.
+       */
+      public Builder addUsers(Collection<String> userNamesList) {
+        userNamesList.forEach(userName -> this.usersMap.put(userName, false));
+        return this;
+      }
+
+      /**
+       * Merge with another users map.
+       */
+      public Builder addUsersMap(Map<String, Boolean> userNamesList) {
+        this.usersMap.putAll(userNamesList);
+        return this;
+      }
+
+      public Builder removeUser(String userName) {
+        this.usersMap.remove(userName);
+        return this;
+      }
+
+      /**
+       * Clear users map.
+       */
+      public Builder clearUsers() {
+        this.usersMap.clear();
         return this;
       }
 
-      public Builder addUsers(Collection<String> roleUsers) {
-        this.users.addAll(roleUsers);
+      /**
+       * Add one other role to this role.
+       */
+      public Builder addRole(String roleName, boolean isRoleAdmin) {
+        this.rolesMap.put(roleName, isRoleAdmin);
+        return this;
+      }
+
+      /**
+       * Add a list of other roles as role non-admins.
+       */
+      public Builder addRoles(Collection<String> roleNamesList) {
+        roleNamesList.forEach(userName -> this.rolesMap.put(userName, false));
         return this;
       }
 
@@ -232,7 +297,12 @@ public interface MultiTenantAccessController {
       }
 
       public Builder setID(long roleId) {
-        this.roleID = roleId;
+        this.id = roleId;
+        return this;
+      }
+
+      public Builder setCreatedByUser(String createdByUser) {
+        this.createdByUser = createdByUser;
         return this;
       }
 
@@ -246,24 +316,27 @@ public interface MultiTenantAccessController {
    * Define a policy to be created.
    */
   class Policy {
+    private final long id;
     private final String name;
     private final Set<String> volumes;
     private final Set<String> buckets;
     private final Set<String> keys;
     private final String description;
-    private final Map<String, Collection<Acl>> roleAcls;
+    private final Map<String, Collection<Acl>> userAcls, roleAcls;
     private final Set<String> labels;
     private final boolean isEnabled;
 
     private Policy(Builder builder) {
-      name = builder.name;
-      volumes = builder.volumes;
-      buckets = builder.buckets;
-      keys = builder.keys;
-      description = builder.description;
-      roleAcls = builder.roleAcls;
-      labels = builder.labels;
-      isEnabled = builder.isEnabled;
+      this.id = builder.id;
+      this.name = builder.name;
+      this.volumes = builder.volumes;
+      this.buckets = builder.buckets;
+      this.keys = builder.keys;
+      this.description = builder.description;
+      this.userAcls = builder.userAcls;
+      this.roleAcls = builder.roleAcls;
+      this.labels = builder.labels;
+      this.isEnabled = builder.isEnabled;
     }
 
     public Set<String> getVolumes() {
@@ -278,6 +351,10 @@ public interface MultiTenantAccessController {
       return keys;
     }
 
+    public long getId() {
+      return id;
+    }
+
     public String getName() {
       return name;
     }
@@ -290,6 +367,10 @@ public interface MultiTenantAccessController {
       return (labels);
     }
 
+    public Map<String, Collection<Acl>> getUserAcls() {
+      return userAcls;
+    }
+
     public Map<String, Collection<Acl>> getRoleAcls() {
       return roleAcls;
     }
@@ -313,6 +394,7 @@ public interface MultiTenantAccessController {
           Objects.equals(getBuckets(), policy.getBuckets()) &&
           Objects.equals(getKeys(), policy.getKeys()) &&
           Objects.equals(getDescription(), policy.getDescription()) &&
+          Objects.equals(getUserAcls(), policy.getUserAcls()) &&
           Objects.equals(getRoleAcls(), policy.getRoleAcls()) &&
           Objects.equals(getLabels(), policy.getLabels());
     }
@@ -325,12 +407,13 @@ public interface MultiTenantAccessController {
      * Builder class for a policy.
      */
     public static final class Builder {
+      private long id;
       private String name;
       private final Set<String> volumes;
       private final Set<String> buckets;
       private final Set<String> keys;
       private String description;
-      private final Map<String, Collection<Acl>> roleAcls;
+      private final Map<String, Collection<Acl>> userAcls, roleAcls;
       private final Set<String> labels;
       private boolean isEnabled;
 
@@ -338,10 +421,16 @@ public interface MultiTenantAccessController {
         this.volumes = new HashSet<>();
         this.buckets = new HashSet<>();
         this.keys = new HashSet<>();
+        this.userAcls = new HashMap<>();
         this.roleAcls = new HashMap<>();
         this.labels = new HashSet<>();
       }
 
+      public Builder setId(Long policyId) {
+        this.id = policyId;
+        return this;
+      }
+
       public Builder setName(String policyName) {
         this.name = policyName;
         return this;
@@ -387,6 +476,11 @@ public interface MultiTenantAccessController {
         return this;
       }
 
+      public Builder addUserAcl(String userName, Collection<Acl> acls) {
+        this.userAcls.put(userName, new ArrayList<>(acls));
+        return this;
+      }
+
       public Builder addRoleAcl(String roleName, Collection<Acl> acls) {
         this.roleAcls.put(roleName, new ArrayList<>(acls));
         return this;
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/multitenant/OMRangerBGSyncService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/multitenant/OMRangerBGSyncService.java
index 047cf6d27f..1ba312fffe 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/multitenant/OMRangerBGSyncService.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/multitenant/OMRangerBGSyncService.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
@@ -49,6 +50,8 @@ import org.apache.hadoop.ozone.om.exceptions.OMNotLeaderException;
 import org.apache.hadoop.ozone.om.helpers.OMRatisHelper;
 import org.apache.hadoop.ozone.om.helpers.OmDBAccessIdInfo;
 import org.apache.hadoop.ozone.om.helpers.OmDBTenantState;
+import org.apache.hadoop.ozone.om.multitenant.MultiTenantAccessController.Policy;
+import org.apache.hadoop.ozone.om.multitenant.MultiTenantAccessController.Role;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SetRangerServiceVersionRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
@@ -58,11 +61,9 @@ import org.apache.ratis.protocol.RaftClientRequest;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.gson.JsonArray;
-import com.google.gson.JsonObject;
-import com.google.gson.JsonParser;
-
 import static org.apache.hadoop.ozone.OzoneConsts.OZONE_TENANT_RANGER_POLICY_LABEL;
+import static org.apache.hadoop.ozone.om.OMMultiTenantManager.OZONE_TENANT_RANGER_ROLE_DESCRIPTION;
+import static org.apache.hadoop.ozone.om.OMMultiTenantManagerImpl.OZONE_OM_TENANT_DEV_SKIP_RANGER;
 
 /**
  * Background Sync thread that reads Multi-Tenancy state from OM DB
@@ -89,7 +90,7 @@ public class OMRangerBGSyncService extends BackgroundService {
   private final OzoneManager ozoneManager;
   private final OMMetadataManager metadataManager;
   private final OMMultiTenantManager multiTenantManager;
-  private final MultiTenantAccessAuthorizer authorizer;
+  private final MultiTenantAccessController accessController;
   private final AuthorizerLock authorizerLock;
 
   // Maximum number of attempts for each sync run
@@ -200,7 +201,7 @@ public class OMRangerBGSyncService extends BackgroundService {
 
   public OMRangerBGSyncService(OzoneManager ozoneManager,
       OMMultiTenantManager omMultiTenantManager,
-      MultiTenantAccessAuthorizer authorizer,
+      MultiTenantAccessController accessController,
       long interval, TimeUnit unit, long serviceTimeout) {
 
     super("OMRangerBGSyncService", interval, unit, 1, serviceTimeout);
@@ -212,12 +213,13 @@ public class OMRangerBGSyncService extends BackgroundService {
     this.multiTenantManager = omMultiTenantManager;
     this.authorizerLock = omMultiTenantManager.getAuthorizerLock();
 
-    if (authorizer != null) {
-      this.authorizer = authorizer;
+    if (accessController != null) {
+      this.accessController = accessController;
     } else {
       // authorizer can be null for unit tests
-      LOG.warn("MultiTenantAccessAuthorizer not set. Using dummy authorizer");
-      this.authorizer = new MultiTenantAccessAuthorizerDummyPlugin();
+      LOG.warn("MultiTenantAccessController not set. "
+          + "Using in-memory controller.");
+      this.accessController = new InMemoryMultiTenantAccessController();
     }
   }
 
@@ -230,7 +232,7 @@ public class OMRangerBGSyncService extends BackgroundService {
 
   @Override
   public void start() {
-    if (authorizer == null) {
+    if (accessController == null) {
       LOG.error("Failed to start the background sync service: "
           + "null authorizer. Please check OM configuration. Aborting");
       return;
@@ -276,8 +278,7 @@ public class OMRangerBGSyncService extends BackgroundService {
       if (shouldRun()) {
         final long count = runCount.incrementAndGet();
         if (LOG.isDebugEnabled()) {
-          LOG.debug("Initiating Ranger Multi-Tenancy Ranger Sync: run # {}",
-              count);
+          LOG.debug("Initiating Multi-Tenancy Ranger Sync: run # {}", count);
         }
         triggerRangerSyncOnce();
       }
@@ -286,10 +287,14 @@ public class OMRangerBGSyncService extends BackgroundService {
     }
   }
 
-  private void triggerRangerSyncOnce() {
+  /**
+   * Trigger the sync once.
+   * @return true if completed successfully, false if any exception is thrown.
+   */
+  public synchronized boolean triggerRangerSyncOnce() {
     try {
       long dbOzoneServiceVersion = getOMDBRangerServiceVersion();
-      long rangerOzoneServiceVersion = getLatestRangerServiceVersion();
+      long rangerOzoneServiceVersion = getRangerOzoneServicePolicyVersion();
 
       // Sync thread enters the while-loop when Ranger service (e.g. cm_ozone)
       // version doesn't match the current service version persisted in the DB.
@@ -337,21 +342,28 @@ public class OMRangerBGSyncService extends BackgroundService {
 
         // Check Ranger Ozone service version again
         dbOzoneServiceVersion = rangerOzoneServiceVersion;
-        rangerOzoneServiceVersion = getLatestRangerServiceVersion();
+        rangerOzoneServiceVersion = getRangerOzoneServicePolicyVersion();
       }
     } catch (IOException | ServiceException e) {
       LOG.warn("Exception during Ranger Sync", e);
       // TODO: Check for specific exception once switched to
       //  RangerRestMultiTenantAccessController
+      return false;
     }
 
+    return true;
   }
 
   /**
    * Query Ranger endpoint to get the latest Ozone service version.
    */
-  long getLatestRangerServiceVersion() throws IOException {
-    return authorizer.getLatestOzoneServiceVersion();
+  long getRangerOzoneServicePolicyVersion() throws IOException {
+    long policyVersion = accessController.getRangerServicePolicyVersion();
+    if (policyVersion < 0L) {
+      LOG.warn("Unable to get valid policyVersion for Ranger background sync "
+          + "to function properly. Please check if ");
+    }
+    return policyVersion;
   }
 
   private RaftClientRequest newRaftClientRequest(OMRequest omRequest) {
@@ -432,9 +444,12 @@ public class OMRangerBGSyncService extends BackgroundService {
     mtOMDBRoles.clear();
   }
 
-  /**
-   * TODO: Test and make sure invalid JSON response from Ranger won't crash OM.
-   */
+  private List<Policy> getAllMultiTenantPolicies() throws IOException {
+
+    return accessController.getLabeledPolicies(
+        OZONE_TENANT_RANGER_POLICY_LABEL);
+  }
+
   private void loadAllPoliciesAndRoleNamesFromRanger(long baseVersion)
       throws IOException {
 
@@ -442,56 +457,47 @@ public class OMRangerBGSyncService extends BackgroundService {
       LOG.debug("baseVersion is {}", baseVersion);
     }
 
-    String allPolicies = authorizer.getAllMultiTenantPolicies();
-    JsonObject jObject = new JsonParser().parse(allPolicies).getAsJsonObject();
-    JsonArray policies = jObject.getAsJsonArray("policies");
-    if (policies == null) {
-      LOG.warn("No Ranger policy received!");
+    final List<Policy> allPolicies = getAllMultiTenantPolicies();
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Received policies with {} label: {}",
+          OZONE_TENANT_RANGER_POLICY_LABEL, allPolicies);
+    }
+
+    if (allPolicies.isEmpty()) {
+      // This is normal only if no tenant is created yet.
+      LOG.info("No Ranger policy with label {} received.",
+          OZONE_TENANT_RANGER_POLICY_LABEL);
       return;
     }
-    for (int i = 0; i < policies.size(); ++i) {
-      JsonObject policy = policies.get(i).getAsJsonObject();
-      JsonArray policyLabels = policy.getAsJsonArray("policyLabels");
 
-      // Verify that the policy has the OzoneTenant label
-      boolean hasOzoneTenantLabel = false;
-      // Loop just in case multiple labels are attached to the tenant policy
-      for (int j = 0; j < policyLabels.size(); j++) {
-        final String currentLabel = policyLabels.get(j).getAsString();
-        // Look for exact match
-        if (currentLabel.equals(OZONE_TENANT_RANGER_POLICY_LABEL)) {
-          hasOzoneTenantLabel = true;
-          break;
-        }
-      }
+    for (Policy policy : allPolicies) {
 
-      if (!hasOzoneTenantLabel) {
-        // Shouldn't get policies without the label often as it is
-        // specified in the query param, unless a user removed the tag during
-        // the sync
+      // Verify that the policy has the OzoneTenant label
+      if (!policy.getLabels().contains(OZONE_TENANT_RANGER_POLICY_LABEL)) {
+        // Shouldn't get policies without the label very often as it is
+        // specified in the query param, unless someone removed the tag during
+        // the get all policies request.
         LOG.warn("Ignoring Ranger policy without the {} label: {}",
-            OZONE_TENANT_RANGER_POLICY_LABEL, policy.get("name").getAsString());
+            OZONE_TENANT_RANGER_POLICY_LABEL, policy);
         continue;
       }
 
       // Temporarily put the policy in the to-delete list,
       // valid entries will be removed later
-      mtRangerPoliciesToBeDeleted.put(
-          policy.get("name").getAsString(),
-          policy.get("id").getAsString());
-
-      final JsonArray policyItems = policy.getAsJsonArray("policyItems");
-      for (int j = 0; j < policyItems.size(); ++j) {
-        JsonObject policyItem = policyItems.get(j).getAsJsonObject();
-        JsonArray roles = policyItem.getAsJsonArray("roles");
-        for (int k = 0; k < roles.size(); ++k) {
-          if (!mtRangerRoles.containsKey(roles.get(k).getAsString())) {
-            // We only get the role name here. We need to query and populate it.
-            mtRangerRoles.put(roles.get(k).getAsString(),
-                new BGRole(roles.get(k).getAsString()));
-          }
+      mtRangerPoliciesToBeDeleted.put(policy.getName(),
+          String.valueOf(policy.getId()));
+
+      // We don't need to care about policy.getUserAcls() in the sync as
+      // we only uses special {OWNER} user, at least for now.
+
+      // Iterate through all the roles in this policy
+      policy.getRoleAcls().keySet().forEach(roleName -> {
+        if (!mtRangerRoles.containsKey(roleName)) {
+          // We only got role name here. Will check users in the roles later.
+          mtRangerRoles.put(roleName, new BGRole(roleName));
         }
-      }
+      });
     }
 
   }
@@ -504,6 +510,13 @@ public class OMRangerBGSyncService extends BackgroundService {
    * Note: EACH Ranger request can take 3-7 seconds as tested in UT.
    */
   private void checkLeader() throws IOException {
+    if (ozoneManager.getConfiguration().getBoolean(
+        OZONE_OM_TENANT_DEV_SKIP_RANGER, false)) {
+      // Skip leader check if the test flag is set, used in TestOzoneTenantShell
+      // TODO: Find a proper fix in MiniOzoneCluster to pass
+      //  ozoneManager.isLeaderReady() check?
+      return;
+    }
     if (!ozoneManager.isLeaderReady()) {
       throw new OMNotLeaderException("This OM is no longer the leader. Abort");
     }
@@ -513,15 +526,11 @@ public class OMRangerBGSyncService extends BackgroundService {
     for (Map.Entry<String, BGRole> entry: mtRangerRoles.entrySet()) {
       final String roleName = entry.getKey();
       checkLeader();
-      final String roleDataString = authorizer.getRole(roleName);
-      final JsonObject roleObject =
-          new JsonParser().parse(roleDataString).getAsJsonObject();
-      final BGRole role = entry.getValue();
-      role.setId(roleObject.get("id").getAsString());
-      final JsonArray userArray = roleObject.getAsJsonArray("users");
-      for (int i = 0; i < userArray.size(); ++i) {
-        role.addUserPrincipal(userArray.get(i).getAsJsonObject().get("name")
-            .getAsString());
+      final Role role = accessController.getRole(roleName);
+      final BGRole bgRole = entry.getValue();
+      bgRole.setId(role.getName());
+      for (String username : role.getUsersMap().keySet()) {
+        bgRole.addUserPrincipal(username);
       }
     }
   }
@@ -609,7 +618,7 @@ public class OMRangerBGSyncService extends BackgroundService {
       checkLeader();
       withWriteLock(() -> {
         try {
-          authorizer.deletePolicyByName(policyName);
+          accessController.deletePolicy(policyName);
         } catch (IOException e) {
           LOG.error("Failed to delete policy: {}", policyName, e);
           // Proceed to delete other policies
@@ -632,7 +641,7 @@ public class OMRangerBGSyncService extends BackgroundService {
     final String userRoleName =
         multiTenantManager.getTenantUserRoleName(tenantId);
 
-    final AccessPolicy accessPolicy;
+    final Policy accessPolicy;
 
     switch (policyInfo.getPolicyType()) {
     case BUCKET_NAMESPACE_POLICY:
@@ -641,16 +650,15 @@ public class OMRangerBGSyncService extends BackgroundService {
       final String adminRoleName =
           multiTenantManager.getTenantAdminRoleName(tenantId);
 
-      accessPolicy = multiTenantManager.newDefaultVolumeAccessPolicy(volumeName,
-          new OzoneTenantRolePrincipal(userRoleName),
-          new OzoneTenantRolePrincipal(adminRoleName));
+      accessPolicy = OMMultiTenantManager.getDefaultVolumeAccessPolicy(
+          tenantId, volumeName, userRoleName, adminRoleName);
       break;
 
     case BUCKET_POLICY:
       LOG.info("Recovering BucketAccess policy for tenant: {}", tenantId);
 
-      accessPolicy = multiTenantManager.newDefaultBucketAccessPolicy(volumeName,
-              new OzoneTenantRolePrincipal(userRoleName));
+      accessPolicy = OMMultiTenantManager.getDefaultBucketAccessPolicy(
+          tenantId, volumeName, userRoleName);
       break;
 
     default:
@@ -660,8 +668,8 @@ public class OMRangerBGSyncService extends BackgroundService {
 
     withWriteLock(() -> {
       try {
-        final String id = authorizer.createAccessPolicy(accessPolicy);
-        LOG.info("Created policy. Policy ID: {}", id);
+        final Policy policy = accessController.createPolicy(accessPolicy);
+        LOG.info("Created policy: {}", policy);
       } catch (IOException e) {
         LOG.error("Failed to create policy: {}", accessPolicy, e);
       }
@@ -778,7 +786,11 @@ public class OMRangerBGSyncService extends BackgroundService {
         checkLeader();
         withWriteLock(() -> {
           try {
-            authorizer.createRole(roleName, null);
+            Role role = new Role.Builder()
+                .setName(roleName)
+                .setDescription(OZONE_TENANT_RANGER_ROLE_DESCRIPTION)
+                .build();
+            accessController.createRole(role);
           } catch (IOException e) {
             // Tolerate create role failure, possibly due to role already exists
             LOG.error("Failed to create role: {}", roleName, e);
@@ -806,9 +818,7 @@ public class OMRangerBGSyncService extends BackgroundService {
       checkLeader();
       withWriteLock(() -> {
         try {
-          final String roleObj = authorizer.getRole(roleName);
-          authorizer.deleteRoleById(new JsonParser().parse(roleObj)
-              .getAsJsonObject().get("id").getAsString());
+          accessController.deleteRole(roleName);
         } catch (IOException e) {
           // The role might have been deleted already.
           // Or the role could be referenced in other roles or policies.
@@ -824,8 +834,20 @@ public class OMRangerBGSyncService extends BackgroundService {
     final HashSet<String> omDBUserList = mtOMDBRoles.get(roleName);
     withWriteLock(() -> {
       try {
-        String roleJsonStr = authorizer.getRole(roleName);
-        authorizer.assignAllUsers(omDBUserList, roleJsonStr);
+        Role existingRole = accessController.getRole(roleName);
+        if (!existingRole.getId().isPresent()) {
+          // Should not happen. getRole() would have thrown exception if
+          // role doesn't exist in Ranger.
+          LOG.error("Role doesn't have ID: {}", existingRole);
+          return;
+        }
+        long roleId = existingRole.getId().get();
+        Role newRole = new Role.Builder(existingRole)
+            .clearUsers()
+            .addUsers(omDBUserList)
+            .build();
+        // TODO: Double check result
+        accessController.updateRole(roleId, newRole);
       } catch (IOException e) {
         LOG.error("Failed to update role: {}, target user list: {}",
             roleName, omDBUserList, e);
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/multitenant/RangerClientMultiTenantAccessController.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/multitenant/RangerClientMultiTenantAccessController.java
index 8dab66e695..cb0af7831d 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/multitenant/RangerClientMultiTenantAccessController.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/multitenant/RangerClientMultiTenantAccessController.java
@@ -19,9 +19,11 @@ package org.apache.hadoop.ozone.om.multitenant;
 
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_KERBEROS_KEYTAB_FILE_KEY;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_KERBEROS_PRINCIPAL_KEY;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RANGER_HTTPS_ADMIN_API_PASSWD;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RANGER_HTTPS_ADMIN_API_USER;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_RANGER_HTTPS_ADDRESS_KEY;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_RANGER_SERVICE;
-import static org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod.KERBEROS;
+import static org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -38,9 +40,11 @@ import org.apache.hadoop.ozone.OmUtils;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
 import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.ranger.RangerServiceException;
 import org.apache.ranger.plugin.model.RangerPolicy;
 import org.apache.ranger.plugin.model.RangerRole;
+import org.apache.ranger.plugin.model.RangerService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -56,59 +60,142 @@ public class RangerClientMultiTenantAccessController implements
   private static final Logger LOG = LoggerFactory
       .getLogger(RangerClientMultiTenantAccessController.class);
 
+  private static final int HTTP_STATUS_CODE_UNAUTHORIZED = 401;
+  private static final int HTTP_STATUS_CODE_BAD_REQUEST = 400;
+
   private final RangerClient client;
   private final String rangerServiceName;
   private final Map<IAccessAuthorizer.ACLType, String> aclToString;
   private final Map<String, IAccessAuthorizer.ACLType> stringToAcl;
   private final String omPrincipal;
+  // execUser for Ranger
+  private final String shortName;
 
   public RangerClientMultiTenantAccessController(OzoneConfiguration conf)
       throws IOException {
+
     aclToString = MultiTenantAccessController.getRangerAclStrings();
     stringToAcl = new HashMap<>();
     aclToString.forEach((type, string) -> stringToAcl.put(string, type));
 
-    // Should have passed the check in OMMultiTenantManager
+    // Should have passed the config checks in
+    // OMMultiTenantManager#checkAndEnableMultiTenancy at this point.
+
     String rangerHttpsAddress = conf.get(OZONE_RANGER_HTTPS_ADDRESS_KEY);
     Preconditions.checkNotNull(rangerHttpsAddress);
     rangerServiceName = conf.get(OZONE_RANGER_SERVICE);
     Preconditions.checkNotNull(rangerServiceName);
 
-    String configuredOmPrincipal = conf.get(OZONE_OM_KERBEROS_PRINCIPAL_KEY);
-    Preconditions.checkNotNull(configuredOmPrincipal);
-    // Replace _HOST pattern with host name in the Kerberos principal. Ranger
-    // client currently does not do this automatically.
-    omPrincipal = SecurityUtil.getServerPrincipal(
-        configuredOmPrincipal, OmUtils.getOmAddress(conf).getHostName());
-    String keytabPath = conf.get(OZONE_OM_KERBEROS_KEYTAB_FILE_KEY);
-    Preconditions.checkNotNull(keytabPath);
+    // Determine auth type (KERBEROS or SIMPLE)
+    final String authType;
+    final String usernameOrPrincipal;
+    final String passwordOrKeytab;
+
+    // If both OZONE_OM_RANGER_HTTPS_ADMIN_API_USER and
+    //  OZONE_OM_RANGER_HTTPS_ADMIN_API_PASSWD are set, SIMPLE auth will be used
+    String fallbackUsername = conf.get(OZONE_OM_RANGER_HTTPS_ADMIN_API_USER);
+    String fallbackPassword = conf.get(OZONE_OM_RANGER_HTTPS_ADMIN_API_PASSWD);
+
+    if (fallbackUsername != null && fallbackPassword != null) {
+      // Both clear text username and password are set, use SIMPLE auth.
+      authType = AuthenticationMethod.SIMPLE.name();
+
+      usernameOrPrincipal = fallbackUsername;
+      passwordOrKeytab = fallbackPassword;
+
+      omPrincipal = fallbackUsername;
+      shortName = fallbackUsername;
+    } else {
+      // Use KERBEROS auth.
+      authType = AuthenticationMethod.KERBEROS.name();
+
+      String configuredOmPrincipal = conf.get(OZONE_OM_KERBEROS_PRINCIPAL_KEY);
+      Preconditions.checkNotNull(configuredOmPrincipal);
+
+      // Replace _HOST pattern with host name in the Kerberos principal.
+      // Ranger client currently does not do this automatically.
+      omPrincipal = SecurityUtil.getServerPrincipal(
+          configuredOmPrincipal, OmUtils.getOmAddress(conf).getHostName());
+      final String keytabPath = conf.get(OZONE_OM_KERBEROS_KEYTAB_FILE_KEY);
+      Preconditions.checkNotNull(keytabPath);
+
+      // Convert to short name to be used in some Ranger requests
+      shortName = UserGroupInformation.createRemoteUser(omPrincipal)
+          .getShortUserName();
+
+      usernameOrPrincipal = omPrincipal;
+      passwordOrKeytab = keytabPath;
+    }
+
+    LOG.info("authType = {}, login user = {}", authType, usernameOrPrincipal);
 
     client = new RangerClient(rangerHttpsAddress,
-        KERBEROS.name().toLowerCase(), omPrincipal, keytabPath,
+        authType, usernameOrPrincipal, passwordOrKeytab,
         rangerServiceName, OzoneConsts.OZONE);
+
+    // Whether or not the Ranger credentials are valid is unknown right after
+    // RangerClient initialization here. Because RangerClient does not perform
+    // any authentication at this point just yet.
+    //
+    // If the credentials are invalid, RangerClient later throws 401 in every
+    // single request to Ranger.
+  }
+
+  /**
+   * Check StatusCode from RangerServiceException and try to log helpful,
+   * actionable messages.
+   *
+   * @param rse RangerServiceException
+   */
+  private void decodeRSEStatusCodes(RangerServiceException rse) {
+
+    switch (rse.getStatus().getStatusCode()) {
+    case HTTP_STATUS_CODE_UNAUTHORIZED:
+      LOG.error("Auth failure. Please double check Ranger-related configs");
+      break;
+    case HTTP_STATUS_CODE_BAD_REQUEST:
+      LOG.error("Request failure. If this is an assign-user operation, "
+          + "check if the user name exists in Ranger.");
+      break;
+    default:
+      LOG.error("Other request failure: {}", rse.getStatus());
+    }
   }
 
   @Override
-  public void createPolicy(Policy policy) throws RangerServiceException {
+  public Policy createPolicy(Policy policy) throws IOException {
     if (LOG.isDebugEnabled()) {
       LOG.debug("Sending create request for policy {} to Ranger.",
           policy.getName());
     }
-    client.createPolicy(toRangerPolicy(policy));
+    RangerPolicy rangerPolicy;
+    try {
+      rangerPolicy = client.createPolicy(toRangerPolicy(policy));
+    } catch (RangerServiceException e) {
+      decodeRSEStatusCodes(e);
+      throw new IOException(e);
+    }
+    return fromRangerPolicy(rangerPolicy);
   }
 
   @Override
-  public Policy getPolicy(String policyName) throws RangerServiceException {
+  public Policy getPolicy(String policyName) throws IOException {
     if (LOG.isDebugEnabled()) {
       LOG.debug("Sending get request for policy {} to Ranger.",
           policyName);
     }
-    return fromRangerPolicy(client.getPolicy(rangerServiceName, policyName));
+    final RangerPolicy rangerPolicy;
+    try {
+      rangerPolicy = client.getPolicy(rangerServiceName, policyName);
+    } catch (RangerServiceException e) {
+      decodeRSEStatusCodes(e);
+      throw new IOException(e);
+    }
+    return fromRangerPolicy(rangerPolicy);
   }
 
   @Override
-  public List<Policy> getLabeledPolicies(String label)
-      throws RangerServiceException {
+  public List<Policy> getLabeledPolicies(String label) throws IOException {
     if (LOG.isDebugEnabled()) {
       LOG.debug("Sending get request for policies with label {} to Ranger.",
           label);
@@ -116,78 +203,135 @@ public class RangerClientMultiTenantAccessController implements
     Map<String, String> filterMap = new HashMap<>();
     filterMap.put("serviceName", rangerServiceName);
     filterMap.put("policyLabelsPartial", label);
-    return client.findPolicies(filterMap).stream()
-        .map(this::fromRangerPolicy)
-        .collect(Collectors.toList());
+    try {
+      return client.findPolicies(filterMap).stream()
+          .map(this::fromRangerPolicy)
+          .collect(Collectors.toList());
+    } catch (RangerServiceException e) {
+      decodeRSEStatusCodes(e);
+      throw new IOException(e);
+    }
   }
 
   @Override
-  public void updatePolicy(Policy policy) throws RangerServiceException {
+  public Policy updatePolicy(Policy policy) throws IOException {
     if (LOG.isDebugEnabled()) {
       LOG.debug("Sending update request for policy {} to Ranger.",
           policy.getName());
     }
-    client.updatePolicy(rangerServiceName, policy.getName(),
-        toRangerPolicy(policy));
+    final RangerPolicy rangerPolicy;
+    try {
+      rangerPolicy = client.updatePolicy(rangerServiceName,
+          policy.getName(), toRangerPolicy(policy));
+    } catch (RangerServiceException e) {
+      decodeRSEStatusCodes(e);
+      throw new IOException(e);
+    }
+    return fromRangerPolicy(rangerPolicy);
   }
 
   @Override
-  public void deletePolicy(String policyName) throws RangerServiceException {
+  public void deletePolicy(String policyName) throws IOException {
     if (LOG.isDebugEnabled()) {
       LOG.debug("Sending delete request for policy {} to Ranger.",
           policyName);
     }
-    client.deletePolicy(rangerServiceName, policyName);
+    try {
+      client.deletePolicy(rangerServiceName, policyName);
+    } catch (RangerServiceException e) {
+      decodeRSEStatusCodes(e);
+      throw new IOException(e);
+    }
   }
 
   @Override
-  public void createRole(Role role) throws RangerServiceException {
+  public Role createRole(Role role) throws IOException {
     if (LOG.isDebugEnabled()) {
       LOG.debug("Sending create request for role {} to Ranger.",
           role.getName());
     }
-    client.createRole(rangerServiceName, toRangerRole(role));
+    final RangerRole rangerRole;
+    try {
+      rangerRole = client.createRole(rangerServiceName,
+          toRangerRole(role, shortName));
+    } catch (RangerServiceException e) {
+      decodeRSEStatusCodes(e);
+      throw new IOException(e);
+    }
+    return fromRangerRole(rangerRole);
   }
 
   @Override
-  public Role getRole(String roleName) throws RangerServiceException {
+  public Role getRole(String roleName) throws IOException {
     if (LOG.isDebugEnabled()) {
       LOG.debug("Sending get request for role {} to Ranger.",
           roleName);
     }
-    return fromRangerRole(client.getRole(roleName, omPrincipal,
-        rangerServiceName));
+    final RangerRole rangerRole;
+    try {
+      rangerRole = client.getRole(roleName, shortName, rangerServiceName);
+    } catch (RangerServiceException e) {
+      decodeRSEStatusCodes(e);
+      throw new IOException(e);
+    }
+    return fromRangerRole(rangerRole);
   }
 
   @Override
-  public void updateRole(long roleID, Role role) throws RangerServiceException {
+  public Role updateRole(long roleId, Role role) throws IOException {
     if (LOG.isDebugEnabled()) {
       LOG.debug("Sending update request for role ID {} to Ranger.",
-          roleID);
+          roleId);
+    }
+    // TODO: Check if createdByUser is even needed for updateRole request.
+    //  If not, remove the createdByUser param and set it after.
+    final RangerRole rangerRole;
+    try {
+      rangerRole = client.updateRole(roleId, toRangerRole(role, shortName));
+    } catch (RangerServiceException e) {
+      decodeRSEStatusCodes(e);
+      throw new IOException(e);
     }
-    client.updateRole(roleID, toRangerRole(role));
+    return fromRangerRole(rangerRole);
   }
 
   @Override
-  public void deleteRole(String roleName) throws RangerServiceException {
+  public void deleteRole(String roleName) throws IOException {
     if (LOG.isDebugEnabled()) {
       LOG.debug("Sending delete request for role {} to Ranger.",
           roleName);
     }
-    client.deleteRole(roleName, omPrincipal, rangerServiceName);
+    try {
+      client.deleteRole(roleName, shortName, rangerServiceName);
+    } catch (RangerServiceException e) {
+      decodeRSEStatusCodes(e);
+      throw new IOException(e);
+    }
   }
 
   @Override
-  public long getRangerServiceVersion() {
-    throw new UnsupportedOperationException("Ranger client implementation " +
-        "does not currently support this method. A workaround will be " +
-        "implemented in HDDS-6755.");
+  public long getRangerServicePolicyVersion() throws IOException {
+    RangerService rangerOzoneService;
+    try {
+      rangerOzoneService = client.getService(rangerServiceName);
+    } catch (RangerServiceException e) {
+      decodeRSEStatusCodes(e);
+      throw new IOException(e);
+    }
+    // If the login user doesn't have sufficient privilege, policyVersion
+    // field could be null in RangerService.
+    final Long policyVersion = rangerOzoneService.getPolicyVersion();
+    return policyVersion == null ? -1L : policyVersion;
   }
 
   private static List<RangerRole.RoleMember> toRangerRoleMembers(
-      Collection<String> members) {
-    return members.stream()
-            .map(princ -> new RangerRole.RoleMember(princ, false))
+      Map<String, Boolean> members) {
+    return members.entrySet().stream()
+            .map(entry -> {
+              final String princ = entry.getKey();
+              final boolean isRoleAdmin = entry.getValue();
+              return new RangerRole.RoleMember(princ, isRoleAdmin);
+            })
             .collect(Collectors.toList());
   }
 
@@ -204,13 +348,20 @@ public class RangerClientMultiTenantAccessController implements
       .setName(rangerRole.getName())
       .setDescription(rangerRole.getDescription())
       .addUsers(fromRangerRoleMembers(rangerRole.getUsers()))
+      .setCreatedByUser(rangerRole.getCreatedByUser())
       .build();
   }
 
-  private static RangerRole toRangerRole(Role role) {
+  private static RangerRole toRangerRole(Role role, String createdByUser) {
     RangerRole rangerRole = new RangerRole();
     rangerRole.setName(role.getName());
-    rangerRole.setUsers(toRangerRoleMembers(role.getUsers()));
+    rangerRole.setCreatedByUser(createdByUser);
+    if (!role.getUsersMap().isEmpty()) {
+      rangerRole.setUsers(toRangerRoleMembers(role.getUsersMap()));
+    }
+    if (!role.getRolesMap().isEmpty()) {
+      rangerRole.setRoles(toRangerRoleMembers(role.getRolesMap()));
+    }
     if (role.getDescription().isPresent()) {
       rangerRole.setDescription(role.getDescription().get());
     }
@@ -220,7 +371,7 @@ public class RangerClientMultiTenantAccessController implements
   private Policy fromRangerPolicy(RangerPolicy rangerPolicy) {
     Policy.Builder policyBuilder = new Policy.Builder();
 
-    // Get roles and their acls from the policy.
+    // Get roles and their ACLs from the policy.
     for (RangerPolicy.RangerPolicyItem policyItem:
         rangerPolicy.getPolicyItems()) {
       Collection<Acl> acls = new ArrayList<>();
@@ -260,7 +411,8 @@ public class RangerClientMultiTenantAccessController implements
     }
 
     policyBuilder.setName(rangerPolicy.getName())
-       .setDescription(rangerPolicy.getDescription())
+        .setId(rangerPolicy.getId())
+        .setDescription(rangerPolicy.getDescription())
         .addLabels(rangerPolicy.getPolicyLabels());
 
     return policyBuilder.build();
@@ -301,6 +453,23 @@ public class RangerClientMultiTenantAccessController implements
       rangerPolicy.setDescription(policy.getDescription().get());
     }
 
+    // Add users to the policy.
+    for (Map.Entry<String, Collection<Acl>> userAcls:
+        policy.getUserAcls().entrySet()) {
+      RangerPolicy.RangerPolicyItem item = new RangerPolicy.RangerPolicyItem();
+      item.setUsers(Collections.singletonList(userAcls.getKey()));
+
+      for (Acl acl: userAcls.getValue()) {
+        RangerPolicy.RangerPolicyItemAccess access =
+            new RangerPolicy.RangerPolicyItemAccess();
+        access.setIsAllowed(acl.isAllowed());
+        access.setType(aclToString.get(acl.getAclType()));
+        item.getAccesses().add(access);
+      }
+
+      rangerPolicy.getPolicyItems().add(item);
+    }
+
     // Add roles to the policy.
     for (Map.Entry<String, Collection<Acl>> roleAcls:
         policy.getRoleAcls().entrySet()) {
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/multitenant/RangerRestMultiTenantAccessController.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/multitenant/RangerRestMultiTenantAccessController.java
index e9ca102461..de29870909 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/multitenant/RangerRestMultiTenantAccessController.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/multitenant/RangerRestMultiTenantAccessController.java
@@ -72,6 +72,8 @@ import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_RANGER_SERVICE;
 /**
  * Access controller for multi-tenancy implemented using Ranger's REST API.
  * This class is for testing and is not intended for production use.
+ *
+ * TODO: REMOVE.
  */
 public class RangerRestMultiTenantAccessController
     implements MultiTenantAccessController {
@@ -214,7 +216,7 @@ public class RangerRestMultiTenantAccessController
 
 
   @Override
-  public void createPolicy(Policy policy) throws IOException {
+  public Policy createPolicy(Policy policy) throws IOException {
     String rangerAdminUrl =
         rangerHttpsAddress + OZONE_RANGER_POLICY_HTTP_ENDPOINT;
     HttpsURLConnection conn = makeHttpsPostCall(rangerAdminUrl,
@@ -224,6 +226,9 @@ public class RangerRestMultiTenantAccessController
           "Http response code: %d", policy.getName(), conn.getResponseCode()));
     }
     getResponseData(conn);
+
+    // TODO: Should reconstruct from response data.
+    return policy;
   }
 
   @Override
@@ -281,12 +286,12 @@ public class RangerRestMultiTenantAccessController
   }
 
   @Override
-  public List<Policy> getLabeledPolicies(String label) throws Exception {
+  public List<Policy> getLabeledPolicies(String label) throws IOException {
     throw new NotImplementedException("Not Implemented");
   }
 
   @Override
-  public void updatePolicy(Policy policy) throws Exception {
+  public Policy updatePolicy(Policy policy) throws IOException {
     throw new NotImplementedException("Not Implemented");
   }
 
@@ -303,7 +308,7 @@ public class RangerRestMultiTenantAccessController
   }
 
   @Override
-  public void createRole(Role role) throws IOException {
+  public Role createRole(Role role) throws IOException {
     String rangerAdminUrl =
         rangerHttpsAddress + OZONE_RANGER_ROLE_HTTP_ENDPOINT;
 
@@ -317,6 +322,9 @@ public class RangerRestMultiTenantAccessController
     JsonObject jObject = new JsonParser().parse(responseString)
         .getAsJsonObject();
 //    return jObject.get("id").getAsLong();
+
+    // TODO: Should reconstruct from response data.
+    return role;
   }
 
   @Override
@@ -330,7 +338,7 @@ public class RangerRestMultiTenantAccessController
   }
 
   @Override
-  public long getRangerServiceVersion() throws Exception {
+  public long getRangerServicePolicyVersion() throws IOException {
     throw new NotImplementedException("Not Implemented");
   }
 
@@ -369,16 +377,19 @@ public class RangerRestMultiTenantAccessController
   }
 
   @Override
-  public void updateRole(long roleID, Role role) throws IOException {
+  public Role updateRole(long roleId, Role role) throws IOException {
     String rangerAdminUrl =
-        rangerHttpsAddress + OZONE_RANGER_ROLE_HTTP_ENDPOINT + roleID;
+        rangerHttpsAddress + OZONE_RANGER_ROLE_HTTP_ENDPOINT + roleId;
 
     HttpsURLConnection conn = makeHttpsPutCall(rangerAdminUrl,
         jsonConverter.toJsonTree(role));
     if (!successfulResponseCode(conn.getResponseCode())) {
       throw new IOException(String.format("Failed to update role %d. " +
-          "Http response code: %d", roleID, conn.getResponseCode()));
+          "Http response code: %d", roleId, conn.getResponseCode()));
     }
+
+    // TODO: Should reconstruct from response data.
+    return role;
   }
 
   private HttpsURLConnection makeHttpsPutCall(String url, JsonElement content)
@@ -535,7 +546,7 @@ public class RangerRestMultiTenantAccessController
           for (JsonElement jsonUser : roleJson.get("users").getAsJsonArray()) {
             String userName =
                 jsonUser.getAsJsonObject().get("name").getAsString();
-            role.addUser(userName);
+            role.addUser(userName, false);
           }
 
           return role.build();
@@ -636,7 +647,7 @@ public class RangerRestMultiTenantAccessController
           jsonRole.addProperty("name", javaRole.getName());
 
           JsonArray jsonUserArray = new JsonArray();
-          for (String javaUser : javaRole.getUsers()) {
+          for (String javaUser : javaRole.getUsersMap().keySet()) {
             jsonUserArray.add(jsonConverter.toJsonTree(javaUser));
           }
 
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/tenant/OMTenantAssignUserAccessIdRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/tenant/OMTenantAssignUserAccessIdRequest.java
index bf8161105a..8eb5919faa 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/tenant/OMTenantAssignUserAccessIdRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/tenant/OMTenantAssignUserAccessIdRequest.java
@@ -161,7 +161,7 @@ public class OMTenantAssignUserAccessIdRequest extends OMClientRequest {
     multiTenantManager.checkTenantExistence(tenantId);
 
     // Below call implies user existence check in authorizer.
-    // If the user doesn't exist, Ranger return 400 and the call should throw.
+    // If the user doesn't exist, Ranger returns 400 and the call should throw.
 
     // Acquire write lock to authorizer (Ranger)
     multiTenantManager.getAuthorizerLock().tryWriteLockInOMRequest();
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java
index b560e9860d..781aa94d92 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java
@@ -92,6 +92,8 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMReque
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OzoneFileStatusProto;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PrepareStatusResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.RangerBGSyncRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.RangerBGSyncResponse;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.RepeatedKeyInfo;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ServiceListRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ServiceListResponse;
@@ -208,6 +210,11 @@ public class OzoneManagerRequestHandler implements RequestHandler {
             request.getServiceListRequest());
         responseBuilder.setServiceListResponse(serviceListResponse);
         break;
+      case RangerBGSync:
+        RangerBGSyncResponse rangerBGSyncResponse = triggerRangerBGSync(
+            request.getRangerBGSyncRequest());
+        responseBuilder.setRangerBGSyncResponse(rangerBGSyncResponse);
+        break;
       case DBUpdates:
         DBUpdatesResponse dbUpdatesResponse = getOMDBUpdates(
             request.getDbUpdatesRequest());
@@ -873,6 +880,14 @@ public class OzoneManagerRequestHandler implements RequestHandler {
     return rb.build();
   }
 
+  private RangerBGSyncResponse triggerRangerBGSync(
+      RangerBGSyncRequest rangerBGSyncRequest) throws IOException {
+
+    boolean res = impl.triggerRangerBGSync(rangerBGSyncRequest.getNoWait());
+
+    return RangerBGSyncResponse.newBuilder().setRunSuccess(res).build();
+  }
+
   @RequestFeatureValidator(
       conditions = ValidationCondition.OLDER_CLIENT_REQUESTS,
       processingPhase = RequestProcessingPhase.POST_PROCESS,
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOMMultiTenantManager.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOMMultiTenantManager.java
index 8fc0861838..39062472c8 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOMMultiTenantManager.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOMMultiTenantManager.java
@@ -34,10 +34,13 @@ import org.mockito.Mockito;
 import java.io.IOException;
 
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_KERBEROS_KEYTAB_FILE_KEY;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_KERBEROS_PRINCIPAL_KEY;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_MULTITENANCY_ENABLED;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RANGER_HTTPS_ADMIN_API_PASSWD;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RANGER_HTTPS_ADMIN_API_USER;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_RANGER_HTTPS_ADDRESS_KEY;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_RANGER_SERVICE;
 import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.FEATURE_NOT_ENABLED;
 
 /**
@@ -74,15 +77,34 @@ public class TestOMMultiTenantManager {
         StringUtils.toLowerCase(AuthenticationMethod.KERBEROS.toString()));
     expectConfigCheckToFail(ozoneManager, conf);
 
+    // Deliberately set ozone.om.kerberos.principal and
+    // ozone.om.kerberos.keytab.file to empty values in order to
+    // test the config checker, since the default values aren't empty.
+    conf.set(OZONE_OM_KERBEROS_PRINCIPAL_KEY, "");
+    conf.set(OZONE_OM_KERBEROS_KEYTAB_FILE_KEY, "");
+
     // Set essential Ranger conf one by one
-    conf.set(OZONE_RANGER_HTTPS_ADDRESS_KEY, "http://ranger:6080");
+    conf.set(OZONE_RANGER_HTTPS_ADDRESS_KEY, "https://ranger:6182");
     expectConfigCheckToFail(ozoneManager, conf);
-    conf.set(OZONE_OM_RANGER_HTTPS_ADMIN_API_USER, "admin");
+    conf.set(OZONE_RANGER_SERVICE, "cm_ozone");
     expectConfigCheckToFail(ozoneManager, conf);
-    conf.set(OZONE_OM_RANGER_HTTPS_ADMIN_API_PASSWD, "passwd");
+
+    // Try Kerberos auth
+    final OzoneConfiguration confKerbAuth = new OzoneConfiguration(conf);
+    confKerbAuth.set(OZONE_OM_KERBEROS_PRINCIPAL_KEY, "om/_HOST@REALM");
+    expectConfigCheckToFail(ozoneManager, confKerbAuth);
+    confKerbAuth.set(OZONE_OM_KERBEROS_KEYTAB_FILE_KEY, "/path/to/om.keytab");
+    Assert.assertTrue(OMMultiTenantManager.checkAndEnableMultiTenancy(
+        ozoneManager, confKerbAuth));
+
+    // Try basic auth
+    final OzoneConfiguration confBasicAuth = new OzoneConfiguration(conf);
+    confBasicAuth.set(OZONE_OM_RANGER_HTTPS_ADMIN_API_USER, "admin");
+    expectConfigCheckToFail(ozoneManager, confBasicAuth);
+    confBasicAuth.set(OZONE_OM_RANGER_HTTPS_ADMIN_API_PASSWD, "Password1");
     // At this point the config check should pass. Method returns true
-    Assert.assertTrue(
-        OMMultiTenantManager.checkAndEnableMultiTenancy(ozoneManager, conf));
+    Assert.assertTrue(OMMultiTenantManager.checkAndEnableMultiTenancy(
+        ozoneManager, confBasicAuth));
   }
 
   /**
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/multitenant/TestMultiTenantAccessController.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/multitenant/TestMultiTenantAccessController.java
index d4be749d51..f6b4ed99fb 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/multitenant/TestMultiTenantAccessController.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/multitenant/TestMultiTenantAccessController.java
@@ -22,9 +22,14 @@ import org.apache.hadoop.ozone.om.multitenant.MultiTenantAccessController.Acl;
 import org.apache.hadoop.ozone.om.multitenant.MultiTenantAccessController.Policy;
 import org.apache.hadoop.ozone.om.multitenant.MultiTenantAccessController.Role;
 import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLType;
+import org.apache.hadoop.security.authentication.util.KerberosName;
+import org.apache.ozone.test.GenericTestUtils;
+import org.apache.ranger.RangerClient;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.slf4j.LoggerFactory;
+import org.slf4j.event.Level;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -35,6 +40,12 @@ import java.util.Map;
 import java.util.UUID;
 import java.util.stream.Collectors;
 
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_KERBEROS_KEYTAB_FILE_KEY;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_KERBEROS_PRINCIPAL_KEY;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_RANGER_HTTPS_ADDRESS_KEY;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_RANGER_SERVICE;
+import static org.apache.hadoop.ozone.om.OMMultiTenantManager.OZONE_TENANT_RANGER_ROLE_DESCRIPTION;
+
 /**
  * To test MultiTenantAccessController with Ranger Client.
  */
@@ -61,14 +72,52 @@ public class TestMultiTenantAccessController {
   /**
    * Use this setup to test against a live Ranger instance.
    */
-  //  @Before
+//  @Before
   public void setupClusterTest() throws Exception {
-    // These config keys must be set when the test is run:
+
+    // Set up truststore
+    System.setProperty("javax.net.ssl.trustStore",
+        "/path/to/cm-auto-global_truststore.jks");
+
+    // Specify Kerberos client config (krb5.conf) path
+    System.setProperty("java.security.krb5.conf", "/etc/krb5.conf");
+
+    // Enable Kerberos debugging
+    System.setProperty("sun.security.krb5.debug", "true");
+
+    // DEFAULT rule uses the default realm configured in krb5.conf
+    KerberosName.setRules("DEFAULT");
+
+    final OzoneConfiguration conf = new OzoneConfiguration();
+
+    // These config keys must be properly set when the test is run:
+    //
     // OZONE_RANGER_HTTPS_ADDRESS_KEY
     // OZONE_RANGER_SERVICE
     // OZONE_OM_KERBEROS_PRINCIPAL_KEY
     // OZONE_OM_KERBEROS_KEYTAB_FILE_KEY
-    OzoneConfiguration conf = new OzoneConfiguration();
+
+    // Same as OM ranger-ozone-security.xml ranger.plugin.ozone.policy.rest.url
+    conf.set(OZONE_RANGER_HTTPS_ADDRESS_KEY,
+        "https://RANGER_HOST:6182/");
+
+    // Same as OM ranger-ozone-security.xml ranger.plugin.ozone.service.name
+    conf.set(OZONE_RANGER_SERVICE, "cm_ozone");
+
+    conf.set(OZONE_OM_KERBEROS_PRINCIPAL_KEY,
+        "om/instance@REALM");
+
+    conf.set(OZONE_OM_KERBEROS_KEYTAB_FILE_KEY,
+        "/path/to/ozone.keytab");
+
+    // TODO: Test with clear text username and password as well.
+//    conf.set(OZONE_OM_RANGER_HTTPS_ADMIN_API_USER, "rangeruser");
+//    conf.set(OZONE_OM_RANGER_HTTPS_ADMIN_API_PASSWD, "passwd");
+
+    // (Optional) Enable RangerClient debug log
+    GenericTestUtils.setLogLevel(
+        LoggerFactory.getLogger(RangerClient.class), Level.DEBUG);
+
     controller = new RangerClientMultiTenantAccessController(conf);
   }
 
@@ -91,6 +140,9 @@ public class TestMultiTenantAccessController {
             .addLabel("label2")
             .build();
 
+    // Get the starting service version
+    long prevPolicyVersion = controller.getRangerServicePolicyVersion();
+
     // create in ranger.
     controller.createPolicy(originalPolicy);
     // get to check it's there with all attributes.
@@ -98,8 +150,17 @@ public class TestMultiTenantAccessController {
         controller.getPolicy(policyName);
     Assert.assertEquals(originalPolicy, retrievedPolicy);
 
+    // Service policy version should have been bumped by 1 at this point
+    long currPolicyVersion = controller.getRangerServicePolicyVersion();
+    Assert.assertEquals(prevPolicyVersion + 1L, currPolicyVersion);
+
     // delete policy.
     controller.deletePolicy(policyName);
+
+    // Service policy version should have been bumped again
+    currPolicyVersion = controller.getRangerServicePolicyVersion();
+    Assert.assertEquals(prevPolicyVersion + 2L, currPolicyVersion);
+
     // get to check it is deleted.
     try {
       controller.getPolicy(policyName);
@@ -261,12 +322,12 @@ public class TestMultiTenantAccessController {
     // get one of the roles to check it is there but empty.
     Role retrievedRole = controller.getRole(roleName);
     Assert.assertFalse(retrievedRole.getDescription().isPresent());
-    Assert.assertTrue(retrievedRole.getUsers().isEmpty());
-    Assert.assertTrue(retrievedRole.getRoleID().isPresent());
+    Assert.assertTrue(retrievedRole.getUsersMap().isEmpty());
+    Assert.assertTrue(retrievedRole.getId().isPresent());
 
     // Add a user to the role.
-    retrievedRole.getUsers().add(users.get(0));
-    controller.updateRole(retrievedRole.getRoleID().get(), retrievedRole);
+    retrievedRole.getUsersMap().put(users.get(0), false);
+    controller.updateRole(retrievedRole.getId().get(), retrievedRole);
 
     // Create a new policy containing the role. This should not overwrite the
     // role.
@@ -293,6 +354,7 @@ public class TestMultiTenantAccessController {
         new Role.Builder()
             .setName(roleName)
             .addUsers(users)
+            .setDescription(OZONE_TENANT_RANGER_ROLE_DESCRIPTION)
             .build();
 
     // create in ranger.
@@ -300,7 +362,7 @@ public class TestMultiTenantAccessController {
     // get to check it's there with all attributes.
     Role retrievedRole = controller.getRole(roleName);
     // Role ID should have been added by Ranger.
-    Assert.assertTrue(retrievedRole.getRoleID().isPresent());
+    Assert.assertTrue(retrievedRole.getId().isPresent());
     Assert.assertEquals(originalRole, retrievedRole);
 
     // delete role.
@@ -319,6 +381,7 @@ public class TestMultiTenantAccessController {
     final String roleName = "test-role";
     Role originalRole = new Role.Builder()
             .setName(roleName)
+            .setDescription(OZONE_TENANT_RANGER_ROLE_DESCRIPTION)
             .build();
     // create in Ranger.
     controller.createRole(originalRole);
@@ -327,6 +390,7 @@ public class TestMultiTenantAccessController {
     // Create a role with the same name and check for error.
     Role sameNameRole = new Role.Builder()
             .setName(roleName)
+            .setDescription(OZONE_TENANT_RANGER_ROLE_DESCRIPTION)
             .build();
     try {
       controller.createRole(sameNameRole);
@@ -345,24 +409,25 @@ public class TestMultiTenantAccessController {
     Role originalRole = new Role.Builder()
         .setName(roleName)
         .addUsers(users)
+        .setDescription(OZONE_TENANT_RANGER_ROLE_DESCRIPTION)
         .build();
     // create in Ranger.
     controller.createRole(originalRole);
 
     Role retrievedRole = controller.getRole(roleName);
     Assert.assertEquals(originalRole, retrievedRole);
-    Assert.assertTrue(retrievedRole.getRoleID().isPresent());
-    long roleID = retrievedRole.getRoleID().get();
+    Assert.assertTrue(retrievedRole.getId().isPresent());
+    long roleId = retrievedRole.getId().get();
 
     // Remove a user from the role and update it.
-    retrievedRole.getUsers().remove(users.get(0));
-    Assert.assertEquals(originalRole.getUsers().size() - 1,
-        retrievedRole.getUsers().size());
-    controller.updateRole(roleID, retrievedRole);
+    retrievedRole.getUsersMap().remove(users.get(0));
+    Assert.assertEquals(originalRole.getUsersMap().size() - 1,
+        retrievedRole.getUsersMap().size());
+    controller.updateRole(roleId, retrievedRole);
     Role retrievedUpdatedRole = controller.getRole(roleName);
     Assert.assertEquals(retrievedRole, retrievedUpdatedRole);
-    Assert.assertEquals(originalRole.getUsers().size() - 1,
-        retrievedUpdatedRole.getUsers().size());
+    Assert.assertEquals(originalRole.getUsersMap().size() - 1,
+        retrievedUpdatedRole.getUsersMap().size());
 
     // Cleanup.
     controller.deleteRole(roleName);
diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/admin/om/OMAdmin.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/admin/om/OMAdmin.java
index ac03b4661f..b61438077f 100644
--- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/admin/om/OMAdmin.java
+++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/admin/om/OMAdmin.java
@@ -57,7 +57,8 @@ import java.util.Collection;
         PrepareSubCommand.class,
         CancelPrepareSubCommand.class,
         FinalizationStatusSubCommand.class,
-        DecommissionOMSubcommand.class
+        DecommissionOMSubcommand.class,
+        UpdateRangerSubcommand.class
     })
 @MetaInfServices(SubcommandWithParent.class)
 public class OMAdmin extends GenericCli implements SubcommandWithParent {
@@ -129,7 +130,7 @@ public class OMAdmin extends GenericCli implements SubcommandWithParent {
 
   private String getTheOnlyConfiguredOmServiceIdOrThrow() {
     if (getConfiguredServiceIds().size() != 1) {
-      throw new IllegalArgumentException("There is no Ozone Manager service ID"
+      throw new IllegalArgumentException("There is no Ozone Manager service ID "
           + "specified, but there are either zero, or more than one service "
           + "configured. Please specify the service ID to be finalized.");
     }
diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/admin/om/UpdateRangerSubcommand.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/admin/om/UpdateRangerSubcommand.java
new file mode 100644
index 0000000000..4234ee29d1
--- /dev/null
+++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/admin/om/UpdateRangerSubcommand.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.admin.om;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.hdds.cli.HddsVersionProvider;
+import org.apache.hadoop.ozone.client.OzoneClientException;
+import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
+import picocli.CommandLine;
+
+import java.util.concurrent.Callable;
+
+/**
+ * Handler of om updateranger command.
+ *
+ * Usage:
+ * ozone admin om updateranger -host=om
+ * ozone admin om updateranger -id=ozone1
+ */
+@CommandLine.Command(
+    name = "updateranger",
+    description = "Trigger Ranger sync background service task on a leader OM "
+        + "that pushes policies and roles updates to Ranger. "
+        + "This operation requires Ozone administrator privilege.",
+    mixinStandardHelpOptions = true,
+    versionProvider = HddsVersionProvider.class)
+public class UpdateRangerSubcommand implements Callable<Void> {
+
+  @CommandLine.ParentCommand
+  private OMAdmin parent;
+
+  @CommandLine.Option(
+      names = {"-id", "--service-id"},
+      description = "Ozone Manager Service ID"
+  )
+  private String omServiceId;
+
+  @CommandLine.Option(
+      names = {"-host", "--service-host"},
+      description = "Ozone Manager Host. If OM HA is enabled, use -id instead. "
+          + "If insists on using -host with OM HA, this must point directly "
+          + "to the leader OM. "
+          + "This option is required when -id is not provided or "
+          + "when HA is not enabled."
+  )
+  private String omHost;
+
+  @CommandLine.Option(names = {"--no-wait"},
+      description = "Do not wait for task completion. Exit immediately "
+          + "after the Ranger BG sync task is triggered on the OM.")
+  private boolean noWait;
+
+  @Override
+  public Void call() throws Exception {
+
+    if (StringUtils.isEmpty(omServiceId) && StringUtils.isEmpty(omHost)) {
+      System.err.println("Error: Please specify -id or -host");
+      return null;
+    }
+
+    boolean forceHA = false;
+    try (OzoneManagerProtocol client = parent.createOmClient(
+        omServiceId, omHost, forceHA)) {
+
+      boolean res = client.triggerRangerBGSync(noWait);
+
+      if (res) {
+        System.out.println("Operation completed successfully");
+      } else {
+        System.err.println("Operation completed with errors. "
+            + "Check OM log for details");
+      }
+
+    } catch (OzoneClientException ex) {
+      System.err.printf("Error: %s", ex.getMessage());
+    }
+    return null;
+  }
+
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org