You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by si...@apache.org on 2022/05/21 05:53:31 UTC

[ozone] branch HDDS-4944 updated: HDDS-6371. [Multi-Tenant] Provide OM DB to Apache Ranger Sync mechanism (#3131)

This is an automated email from the ASF dual-hosted git repository.

siyao pushed a commit to branch HDDS-4944
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/HDDS-4944 by this push:
     new 35043683a6 HDDS-6371. [Multi-Tenant] Provide OM DB to Apache Ranger Sync mechanism (#3131)
35043683a6 is described below

commit 35043683a6f5b843fe31e7041e999939b22a102f
Author: prashantpogde <pr...@gmail.com>
AuthorDate: Fri May 20 22:53:25 2022 -0700

    HDDS-6371. [Multi-Tenant] Provide OM DB to Apache Ranger Sync mechanism (#3131)
---
 .../java/org/apache/hadoop/ozone/OzoneConsts.java  |  22 +-
 .../common/src/main/resources/ozone-default.xml    |  23 +
 .../org/apache/hadoop/ozone/TestOzoneConsts.java   |  35 +
 .../docs/content/feature/S3-Multi-Tenancy-Setup.md |  21 +-
 .../main/java/org/apache/hadoop/ozone/OmUtils.java |   1 +
 .../org/apache/hadoop/ozone/om/OMConfigKeys.java   |  11 +
 .../hadoop/ozone/om/helpers/OmRangerSyncArgs.java  |  67 ++
 .../hadoop/ozone/om/multitenant/AccessPolicy.java  |  23 +-
 .../om/multitenant/OzoneTenantRolePrincipal.java   |  27 +-
 .../ozone/om/multitenant/RangerAccessPolicy.java   |  57 +-
 .../ozone/om/protocol/OzoneManagerProtocol.java    |   1 -
 .../main/compose/ozonesecure/docker-compose.yaml   |   8 +-
 .../src/main/compose/ozonesecure/docker-config     |   2 +-
 .../ozonesecure/mockserverInitialization.json      |   8 +
 .../smoketest/security/ozone-secure-tenant.robot   |  10 +-
 ...estMultiTenantAccessAuthorizerRangerPlugin.java |  59 +-
 .../om/multitenant/TestMultiTenantVolume.java      |  18 +
 .../om/multitenant/TestRangerBGSyncService.java    | 723 +++++++++++++++++++
 .../src/main/proto/OmClientProtocol.proto          |  19 +-
 .../hadoop/ozone/om/OMMultiTenantManager.java      |  69 +-
 .../hadoop/ozone/om/OMMultiTenantManagerImpl.java  | 519 ++++++++++----
 .../hadoop/ozone/om/OmMetadataManagerImpl.java     |   4 +-
 .../org/apache/hadoop/ozone/om/OzoneManager.java   |   3 +
 .../ozone/om/multitenant/CachedTenantState.java    |  63 +-
 .../multitenant/MultiTenantAccessAuthorizer.java   |  80 ++-
 .../MultiTenantAccessAuthorizerDummyPlugin.java    |  63 +-
 .../MultiTenantAccessAuthorizerRangerPlugin.java   | 395 +++++++++--
 .../multitenant/MultiTenantAccessController.java   |  12 +
 .../om/multitenant/OMRangerBGSyncService.java      | 773 +++++++++++++++++++++
 .../RangerRestMultiTenantAccessController.java     | 670 ++++++++++++++++++
 .../om/ratis/utils/OzoneManagerRatisUtils.java     |   3 +
 .../hadoop/ozone/om/request/OMClientRequest.java   |   1 +
 .../tenant/OMSetRangerServiceVersionRequest.java   |  84 +++
 .../s3/tenant/OMTenantAssignAdminRequest.java      |   9 +-
 .../tenant/OMTenantAssignUserAccessIdRequest.java  |  10 +-
 .../request/s3/tenant/OMTenantCreateRequest.java   |  31 +-
 .../request/s3/tenant/OMTenantDeleteRequest.java   |   5 +-
 .../s3/tenant/OMTenantRevokeAdminRequest.java      |   8 +-
 .../tenant/OMTenantRevokeUserAccessIdRequest.java  |   9 +-
 .../tenant/OMSetRangerServiceVersionResponse.java  |  72 ++
 .../hadoop/ozone/om/upgrade/OMLayoutFeature.java   |   2 +
 .../ozone/om/TestOMMultiTenantManagerImpl.java     |  56 +-
 .../s3/security/TestS3GetSecretRequest.java        |   7 +-
 .../tenant/TestSetRangerServiceVersionRequest.java | 110 +++
 44 files changed, 3814 insertions(+), 379 deletions(-)

diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
index 19d508945b..f0a26df46f 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
@@ -140,6 +140,9 @@ public final class OzoneConsts {
   public static final String OZONE_DB_CHECKPOINT_REQUEST_FLUSH =
       "flushBeforeCheckpoint";
 
+  public static final String RANGER_OZONE_SERVICE_VERSION_KEY =
+      "#RANGEROZONESERVICEVERSION";
+
   /**
    * Supports Bucket Versioning.
    */
@@ -492,16 +495,33 @@ public final class OzoneConsts {
   public static final String OZONE_OM_RANGER_ADMIN_GET_ROLE_HTTP_ENDPOINT =
       "/service/roles/roles/name/";
 
-  // TODO: Use delete role endpoint
   public static final String OZONE_OM_RANGER_ADMIN_DELETE_GROUP_HTTP_ENDPOINT =
       "/service/xusers/secure/groups/id/";
 
+  public static final String OZONE_OM_RANGER_ADMIN_DELETE_ROLE_HTTP_ENDPOINT =
+      "/service/roles/roles/";
+
   public static final String OZONE_OM_RANGER_ADMIN_CREATE_POLICY_HTTP_ENDPOINT =
       "/service/public/v2/api/policy";
 
   public static final String OZONE_OM_RANGER_ADMIN_GET_POLICY_HTTP_ENDPOINT =
       "/service/public/v2/api/policy/?policyName=";
 
+  public static final String OZONE_OM_RANGER_ADMIN_GET_POLICY_ID_HTTP_ENDPOINT =
+      "/service/public/v2/api/policy/?policyId=";
+
   public static final String OZONE_OM_RANGER_ADMIN_DELETE_POLICY_HTTP_ENDPOINT =
       "/service/plugins/policies/";
+
+  public static final String OZONE_OM_RANGER_OZONE_SERVICE_ENDPOINT =
+      "/service/plugins/services/";
+
+  public static final String OZONE_OM_RANGER_DOWNLOAD_ENDPOINT =
+      "/service/plugins/secure/policies/download/cm_ozone" +
+          "?supportsPolicyDeltas=true&lastKnownVersion=";
+
+  public static final String OZONE_OM_RANGER_ALL_POLICIES_ENDPOINT =
+      "/service/plugins/policies/service/";
+
+  public static final String OZONE_TENANT_RANGER_POLICY_LABEL = "OzoneTenant";
 }
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index b95f858834..6eed863992 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -3126,4 +3126,27 @@
     <description>Enable S3 Multi-Tenancy. If disabled, all S3 multi-tenancy requests are rejected.
     </description>
   </property>
+
+  <property>
+    <name>ozone.om.multitenancy.ranger.sync.interval</name>
+    <value>10m</value>
+    <tag>OZONE, OM</tag>
+    <description>
+      Determines how often the Multi-Tenancy Ranger background sync thread
+      service should run. Background thread periodically checks
+      Ranger policies and roles created by Multi-Tenancy feature.
+      And overwrites them if obvious discrepancies are detected.
+      Value should be set with a unit suffix (ns,ms,s,m,h,d)
+    </description>
+  </property>
+
+  <property>
+    <name>ozone.om.multitenancy.ranger.sync.timeout</name>
+    <value>10s</value>
+    <tag>OZONE, OM</tag>
+    <description>
+      The timeout for each Multi-Tenancy Ranger background sync thread run.
+      If the timeout has been reached, a warning message will be logged.
+    </description>
+  </property>
 </configuration>
diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/TestOzoneConsts.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/TestOzoneConsts.java
new file mode 100644
index 0000000000..3ba76d651d
--- /dev/null
+++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/TestOzoneConsts.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.ozone;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test Ozone Constants (e.g. for compatibility).
+ */
+public class TestOzoneConsts {
+
+  @Test
+  public void testOzoneTenantPolicyLabelCompatibility() {
+    // Value of the policy label should not be changed
+    Assert.assertEquals(
+        "OzoneTenant", OzoneConsts.OZONE_TENANT_RANGER_POLICY_LABEL);
+  }
+
+}
diff --git a/hadoop-hdds/docs/content/feature/S3-Multi-Tenancy-Setup.md b/hadoop-hdds/docs/content/feature/S3-Multi-Tenancy-Setup.md
index 79e1320045..0965b23fd5 100644
--- a/hadoop-hdds/docs/content/feature/S3-Multi-Tenancy-Setup.md
+++ b/hadoop-hdds/docs/content/feature/S3-Multi-Tenancy-Setup.md
@@ -86,26 +86,7 @@ docker-compose up -d --scale datanode=3
 docker-compose exec scm bash
 ```
 
-It might be necessary to run the following command first before testing the tenant commands in the `compose/ozonesecure` Docker environment
-in order to workaround a Docker-specific DNS issue when first contacting Ranger.
-
-```shell
-bash-4.2$ curl -k https://ranger:6182/
-{}
-```
-
-Then all subsequent requests to Ranger (mock server) should work as expected.
-
-Otherwise you might see such DNS error:
-
-```shell
-bash-4.2$ ozone tenant create tenantone
-2022-02-16 00:00:00,000 [main] INFO rpc.RpcClient: Creating Tenant: 'tenantone', with new volume: 'tenantone'
-INTERNAL_ERROR No subject alternative DNS name matching ranger found.
-```
-
-
-Operations requiring Ozone cluster administrator privilege are run as `om/om` user:
+Operations requiring Ozone cluster administrator privilege should be run as `om` user:
 
 ```shell
 kinit -kt /etc/security/keytabs/om.keytab om/om@EXAMPLE.COM
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java
index 4aaee5ed01..c1c522ede5 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java
@@ -309,6 +309,7 @@ public final class OmUtils {
     case TenantRevokeUserAccessId:
     case TenantAssignAdmin:
     case TenantRevokeAdmin:
+    case SetRangerServiceVersion:
       return false;
     default:
       LOG.error("CmdType {} is not categorized as readOnly or not.", cmdType);
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
index f14b7f6239..291eb6f28d 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
@@ -341,4 +341,15 @@ public final class OMConfigKeys {
       "ozone.om.ranger.https-address";
   public static final String OZONE_RANGER_SERVICE =
       "ozone.om.ranger.service";
+
+  public static final String OZONE_OM_MULTITENANCY_RANGER_SYNC_INTERVAL
+      = "ozone.om.multitenancy.ranger.sync.interval";
+  public static final TimeDuration
+      OZONE_OM_MULTITENANCY_RANGER_SYNC_INTERVAL_DEFAULT
+      = TimeDuration.valueOf(600, TimeUnit.SECONDS);
+  public static final String OZONE_OM_MULTITENANCY_RANGER_SYNC_TIMEOUT
+      = "ozone.om.multitenancy.ranger.sync.timeout";
+  public static final TimeDuration
+      OZONE_OM_MULTITENANCY_RANGER_SYNC_TIMEOUT_DEFAULT
+      = TimeDuration.valueOf(10, TimeUnit.SECONDS);
 }
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmRangerSyncArgs.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmRangerSyncArgs.java
new file mode 100644
index 0000000000..fff6f38a37
--- /dev/null
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmRangerSyncArgs.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.hadoop.ozone.om.helpers;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * This class is used for storing Ranger Sync request args.
+ */
+public class OmRangerSyncArgs {
+
+  /**
+   * Ozone RangerServiceVersion to sync to.
+   */
+  private final long newServiceVersion;
+
+  public OmRangerSyncArgs(long version) {
+    this.newServiceVersion = version;
+  }
+
+  public long getNewSyncServiceVersion() {
+    return newServiceVersion;
+  }
+
+  public static OmRangerSyncArgs.Builder newBuilder() {
+    return new OmRangerSyncArgs.Builder();
+  }
+
+  /**
+   * Builder for OmRangerSyncArgs.
+   */
+  @SuppressWarnings("checkstyle:hiddenfield")
+  public static class Builder {
+    private long newServiceVersion;
+    /**
+     * Constructs a builder.
+     */
+    public Builder() {
+    }
+
+    public Builder setNewSyncServiceVersion(long version) {
+      this.newServiceVersion = version;
+      return this;
+    }
+
+    public OmRangerSyncArgs build() {
+      Preconditions.checkNotNull(newServiceVersion);
+      return new OmRangerSyncArgs(newServiceVersion);
+    }
+  }
+}
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/multitenant/AccessPolicy.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/multitenant/AccessPolicy.java
index 3476d6756f..c1f3f945c8 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/multitenant/AccessPolicy.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/multitenant/AccessPolicy.java
@@ -18,6 +18,7 @@ package org.apache.hadoop.ozone.om.multitenant;
 
 import java.io.IOException;
 import java.security.Principal;
+import java.util.HashSet;
 import java.util.List;
 
 import org.apache.hadoop.hdds.annotation.InterfaceAudience;
@@ -106,16 +107,16 @@ public interface AccessPolicy {
    * All Authorizer policy engines are supposed to provide an implementation
    * of AccessPolicy interface.
    */
-  String serializePolicyToJsonString() throws Exception;
+  String serializePolicyToJsonString() throws IOException;
 
   /**
    * Given a serialized accessPolicy in a Json format, deserializes and
    * constructs a valid access Policy.
    * @return
-   * @throws Exception
+   * @throws IOException
    */
   String deserializePolicyFromJsonString(JsonObject jsonObject)
-      throws Exception;
+      throws IOException;
 
   /**
    * @return AccessPolicyType (Native or otherwise).
@@ -132,4 +133,20 @@ public interface AccessPolicy {
       throws IOException;
 
   List<AccessPolicyElem> getAccessPolicyElem();
+
+  /**
+   * Sets the last update time to mtime.
+   * @param mtime Time in epoch milliseconds
+   */
+  void setPolicyLastUpdateTime(long mtime);
+
+  /**
+   * Returns the last update time of Ranger policies.
+   */
+  long getPolicyLastUpdateTime();
+
+  /**
+   * @return list of roles associated with this policy
+   */
+  HashSet<String> getRoleList();
 }
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/multitenant/OzoneTenantRolePrincipal.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/multitenant/OzoneTenantRolePrincipal.java
index 81f6f6fa3f..baf500c03f 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/multitenant/OzoneTenantRolePrincipal.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/multitenant/OzoneTenantRolePrincipal.java
@@ -17,33 +17,16 @@
  */
 package org.apache.hadoop.ozone.om.multitenant;
 
-import org.apache.hadoop.ozone.OzoneConsts;
 import java.security.Principal;
 
 /**
- * Used to identify a tenant's role in Ranger.
+ * Used to identify a tenant's Ranger Role.
  */
 public final class OzoneTenantRolePrincipal implements Principal {
-  private final String tenantId;
-  private final String roleNameSuffix;
+  private final String tenantRoleName;
 
-  public static OzoneTenantRolePrincipal getUserRole(String tenantId) {
-    return new OzoneTenantRolePrincipal(
-        tenantId, OzoneConsts.DEFAULT_TENANT_ROLE_USER_SUFFIX);
-  }
-
-  public static OzoneTenantRolePrincipal getAdminRole(String tenantId) {
-    return new OzoneTenantRolePrincipal(
-        tenantId, OzoneConsts.DEFAULT_TENANT_ROLE_ADMIN_SUFFIX);
-  }
-
-  private OzoneTenantRolePrincipal(String tenantId, String roleNameSuffix) {
-    this.tenantId = tenantId;
-    this.roleNameSuffix = roleNameSuffix;
-  }
-
-  public String getTenantId() {
-    return tenantId;
+  public OzoneTenantRolePrincipal(String tenantRoleName) {
+    this.tenantRoleName = tenantRoleName;
   }
 
   @Override
@@ -53,6 +36,6 @@ public final class OzoneTenantRolePrincipal implements Principal {
 
   @Override
   public String getName() {
-    return tenantId + roleNameSuffix;
+    return tenantRoleName;
   }
 }
\ No newline at end of file
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/multitenant/RangerAccessPolicy.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/multitenant/RangerAccessPolicy.java
index f05f2371b4..169d1aa743 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/multitenant/RangerAccessPolicy.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/multitenant/RangerAccessPolicy.java
@@ -19,16 +19,20 @@ package org.apache.hadoop.ozone.om.multitenant;
 
 import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLType;
 import org.apache.hadoop.ozone.security.acl.OzoneObj;
+
+import com.google.gson.JsonArray;
 import com.google.gson.JsonObject;
 
 import java.io.IOException;
 import java.security.Principal;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
+import static org.apache.hadoop.ozone.OzoneConsts.OZONE_TENANT_RANGER_POLICY_LABEL;
 import static org.apache.hadoop.ozone.om.multitenant.AccessPolicy.AccessPolicyType.RANGER_POLICY;
 
 /**
@@ -38,14 +42,17 @@ public class RangerAccessPolicy implements AccessPolicy {
 
   // For now RangerAccessPolicy supports only one object per policy
   private OzoneObj accessObject;
-  private Map<String, List<AccessPolicyElem>> policyMap;
+  private final Map<String, List<AccessPolicyElem>> policyMap;
+  private final HashSet<String> roleList;
   private String policyID;
   private String policyJsonString;
   private String policyName;
+  private long lastPolicyUpdateTimeEpochMillis;
 
   public RangerAccessPolicy(String name) {
     policyMap = new ConcurrentHashMap<>();
     policyName = name;
+    roleList = new HashSet<>();
   }
 
   public void setPolicyID(String id) {
@@ -60,17 +67,51 @@ public class RangerAccessPolicy implements AccessPolicy {
     return policyName;
   }
 
+  public HashSet<String> getRoleList() {
+    return roleList;
+  }
+
   @Override
-  public String serializePolicyToJsonString() throws Exception {
+  public void setPolicyLastUpdateTime(long mtime) {
+    lastPolicyUpdateTimeEpochMillis = mtime;
+  }
+
+  @Override
+  public long getPolicyLastUpdateTime() {
+    return lastPolicyUpdateTimeEpochMillis;
+  }
+
+  @Override
+  public String serializePolicyToJsonString() throws IOException {
     updatePolicyJsonString();
     return policyJsonString;
   }
 
   @Override
-  public String deserializePolicyFromJsonString(JsonObject jsonObject)
-      throws Exception {
+  public String deserializePolicyFromJsonString(JsonObject jsonObject) {
     setPolicyID(jsonObject.get("id").getAsString());
+    try {
+      JsonArray policyItems = jsonObject
+          .getAsJsonArray("policyItems");
+      for (int j = 0; j < policyItems.size(); ++j) {
+        JsonObject policy = policyItems.get(j).getAsJsonObject();
+        JsonArray roles = policy.getAsJsonArray("roles");
+        for (int k = 0; k < roles.size(); ++k) {
+          if (!roleList.contains(roles.get(k).getAsString())) {
+            // We only get the role name here. We need to query and populate it.
+            roleList.add(roles.get(k).getAsString());
+          }
+        }
+      }
+    } catch (Exception e) {
+      // Ignore Exception here.
+    }
     // TODO : retrieve other policy fields as well.
+    try {
+      setPolicyLastUpdateTime(jsonObject.get("updateTime").getAsLong());
+    } catch (Exception e) {
+      // lets ignore the exception in case the field is not set.
+    }
     return null;
   }
 
@@ -146,7 +187,7 @@ public class RangerAccessPolicy implements AccessPolicy {
         "removeAccessPolicyElem:  aclType not found." + object.toString());
   }
 
-  private String createRangerResourceItems() throws IOException {
+  private String createRangerResourceItems() {
     StringBuilder resourceItems = new StringBuilder();
     resourceItems.append("\"resources\":{" +
         "\"volume\":{" +
@@ -250,11 +291,13 @@ public class RangerAccessPolicy implements AccessPolicy {
     }
   }
 
-  private void updatePolicyJsonString() throws Exception {
+  private void updatePolicyJsonString() throws IOException {
     policyJsonString =
         "{\"policyType\":\"0\"," + "\"name\":\"" + policyName + "\","
             + "\"isEnabled\":true," + "\"policyPriority\":0,"
-            + "\"policyLabels\":[]," + "\"description\":\"\","
+            + "\"description\":\"Policy created by Ozone for Multi-Tenancy\","
+            + "\"policyLabels\":[\"" + OZONE_TENANT_RANGER_POLICY_LABEL + "\"],"
+            + "\"description\":\"\","
             + "\"isAuditEnabled\":true," + createRangerResourceItems()
             + "\"isDenyAllElse\":false," + createRangerPolicyItems()
             + "\"allowExceptions\":[]," + "\"denyPolicyItems\":[],"
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java
index 5d548d3b23..aa2b24f3ec 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java
@@ -579,7 +579,6 @@ public interface OzoneManagerProtocol
         "this to be implemented, as write requests use a new approach.");
   }
 
-
   /**
    * Create a tenant.
    * @param omTenantArgs OmTenantArgs
diff --git a/hadoop-ozone/dist/src/main/compose/ozonesecure/docker-compose.yaml b/hadoop-ozone/dist/src/main/compose/ozonesecure/docker-compose.yaml
index 6053a69fa9..edfa6e6ccf 100644
--- a/hadoop-ozone/dist/src/main/compose/ozonesecure/docker-compose.yaml
+++ b/hadoop-ozone/dist/src/main/compose/ozonesecure/docker-compose.yaml
@@ -110,16 +110,16 @@ services:
       OZONE_OPTS:
     command: ["/opt/hadoop/bin/ozone","scm"]
   ranger:
-    image: mockserver/mockserver:mockserver-5.11.2
+    image: mockserver/mockserver:mockserver-5.13.2
     hostname: ranger
     volumes:
       - ./mockserverInitialization.json:/config/mockserverInitialization.json
     ports:
-      - 6182:6182
+      - "6080:6080"
+      - "6182:6182"
     environment:
       MOCKSERVER_MAX_EXPECTATIONS: 100
       MOCKSERVER_MAX_HEADER_SIZE: 8192
       MOCKSERVER_WATCH_INITIALIZATION_JSON: "true"
       MOCKSERVER_INITIALIZATION_JSON_PATH: /config/mockserverInitialization.json
-    command: -logLevel DEBUG -serverPort 6182
-
+    command: -logLevel DEBUG -serverPort 6080
diff --git a/hadoop-ozone/dist/src/main/compose/ozonesecure/docker-config b/hadoop-ozone/dist/src/main/compose/ozonesecure/docker-config
index e58b7512e4..0ddbf1551d 100644
--- a/hadoop-ozone/dist/src/main/compose/ozonesecure/docker-config
+++ b/hadoop-ozone/dist/src/main/compose/ozonesecure/docker-config
@@ -132,7 +132,7 @@ OZONE_LOG_DIR=/var/log/hadoop
 
 no_proxy=om,scm,recon,s3g,kdc,localhost,127.0.0.1
 
-OZONE-SITE.XML_ozone.om.ranger.https-address=https://ranger:6182
+OZONE-SITE.XML_ozone.om.ranger.https-address=http://ranger:6080
 OZONE-SITE.XML_ozone.om.multitenancy.enabled=true
 OZONE-SITE.XML_ozone.om.ranger.https.admin.api.user=admin
 OZONE-SITE.XML_ozone.om.ranger.https.admin.api.passwd=passwd
diff --git a/hadoop-ozone/dist/src/main/compose/ozonesecure/mockserverInitialization.json b/hadoop-ozone/dist/src/main/compose/ozonesecure/mockserverInitialization.json
index fbd7735366..724798270f 100644
--- a/hadoop-ozone/dist/src/main/compose/ozonesecure/mockserverInitialization.json
+++ b/hadoop-ozone/dist/src/main/compose/ozonesecure/mockserverInitialization.json
@@ -62,5 +62,13 @@
     "httpResponse": {
       "body": "[{id: 444}]"
     }
+  },
+  {
+    "httpRequest": {
+      "path": "/service/plugins/services/"
+    },
+    "httpResponse": {
+      "body": "{\"startIndex\":0,\"pageSize\":200,\"totalCount\":13,\"resultSize\":13,\"sortType\":\"asc\",\"sortBy\":\"serviceId\",\"queryTimeMS\":1651104831041,\"services\":[{\"id\":7,\"guid\":\"b6cbaf6c-3911-4fa6-aeed-60dece4b111b\",\"isEnabled\":true,\"createdBy\":\"Admin\",\"updatedBy\":\"Admin\",\"createTime\":1651040438000,\"updateTime\":1651040438000,\"version\":1,\"type\":\"ozone\",\"name\":\"cm_ozone\",\"displayName\":\"cm_ozone\",\"description\":\"Ozone repo\",\"tagService\":\ [...]
+    }
   }
 ]
diff --git a/hadoop-ozone/dist/src/main/smoketest/security/ozone-secure-tenant.robot b/hadoop-ozone/dist/src/main/smoketest/security/ozone-secure-tenant.robot
index fa9c2b1e95..17da56105e 100644
--- a/hadoop-ozone/dist/src/main/smoketest/security/ozone-secure-tenant.robot
+++ b/hadoop-ozone/dist/src/main/smoketest/security/ozone-secure-tenant.robot
@@ -14,7 +14,7 @@
 # limitations under the License.
 
 *** Settings ***
-Documentation       Smoke test for ozone secure tenant commands.
+Documentation       Smoke test for ozone secure tenant commands
 Library             OperatingSystem
 Library             String
 Library             BuiltIn
@@ -23,17 +23,11 @@ Resource            ../s3/commonawslib.robot
 Test Timeout        5 minutes
 
 *** Variables ***
-${RANGER_ENDPOINT_URL}  https://ranger:6182
+${RANGER_ENDPOINT_URL}  http://ranger:6080
 ${S3G_ENDPOINT_URL}     http://s3g:9878
 
-*** Keywords ***
-Init Ranger MockServer
-    ${output} =         Execute          curl -k ${RANGER_ENDPOINT_URL}
-                        Should contain   ${output}         {}
-
 *** Test Cases ***
 Create Tenant Success with Cluster Admin
-    Run Keyword         Init Ranger MockServer
     Run Keyword         Kinit test user     testuser     testuser.keytab
     ${output} =         Execute          ozone tenant --verbose create tenantone
                         Should contain   ${output}         "tenantId": "tenantone"
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/multitenant/TestMultiTenantAccessAuthorizerRangerPlugin.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/multitenant/TestMultiTenantAccessAuthorizerRangerPlugin.java
index 0eb52f96f0..dae673fde5 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/multitenant/TestMultiTenantAccessAuthorizerRangerPlugin.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/multitenant/TestMultiTenantAccessAuthorizerRangerPlugin.java
@@ -35,6 +35,7 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.om.OMMultiTenantManager;
 import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLType;
 import org.apache.hadoop.ozone.security.acl.OzoneObjInfo;
 import org.apache.http.auth.BasicUserPrincipal;
@@ -100,15 +101,15 @@ public class TestMultiTenantAccessAuthorizerRangerPlugin {
 
     try {
       OzoneTenantRolePrincipal adminRole =
-          OzoneTenantRolePrincipal.getAdminRole("tenant1-AdminRole");
+          new OzoneTenantRolePrincipal("tenant1-AdminRole");
       OzoneTenantRolePrincipal userRole =
-          OzoneTenantRolePrincipal.getUserRole("tenant1-UserRole");
+          new OzoneTenantRolePrincipal("tenant1-UserRole");
 
       BasicUserPrincipal userPrincipal = new BasicUserPrincipal("user1Test");
       usersIdsCreated.add(
-          omm.assignUser(userPrincipal, userRole.getName(), false));
+          omm.assignUserToRole(userPrincipal, userRole.getName(), false));
       usersIdsCreated.add(
-          omm.assignUser(userPrincipal, adminRole.getName(), true));
+          omm.assignUserToRole(userPrincipal, adminRole.getName(), true));
 
       AccessPolicy tenant1VolumeAccessPolicy = createVolumeAccessPolicy(
           "vol1", "tenant1");
@@ -130,7 +131,7 @@ public class TestMultiTenantAccessAuthorizerRangerPlugin {
       Assert.fail(e.getMessage());
     } finally {
       for (String id : policyIdsCreated) {
-        omm.deletePolicybyId(id);
+        omm.deletePolicyById(id);
       }
       for (String id : usersIdsCreated) {
         omm.deleteUser(id);
@@ -153,17 +154,17 @@ public class TestMultiTenantAccessAuthorizerRangerPlugin {
 
     try {
       Assert.assertTrue(policyIdsCreated.size() == 0);
-      OzoneTenantRolePrincipal group1Principal =
-          OzoneTenantRolePrincipal.getAdminRole("tenant1");
-      OzoneTenantRolePrincipal group2Principal =
-          OzoneTenantRolePrincipal.getUserRole("tenant1");
-      omm.createRole(group1Principal, null);
+      OzoneTenantRolePrincipal group1Principal = new OzoneTenantRolePrincipal(
+          OMMultiTenantManager.getDefaultAdminRoleName("tenant1"));
+      OzoneTenantRolePrincipal group2Principal = new OzoneTenantRolePrincipal(
+          OMMultiTenantManager.getDefaultUserRoleName("tenant1"));
+      omm.createRole(group1Principal.getName(), null);
       groupIdsCreated.add(omm.getRole(group1Principal));
-      omm.createRole(group2Principal, group1Principal.getName());
+      omm.createRole(group2Principal.getName(), group1Principal.getName());
       groupIdsCreated.add(omm.getRole(group2Principal));
 
       userPrincipal = new BasicUserPrincipal("user1Test");
-      omm.assignUser(userPrincipal, group2Principal.getName(), false);
+      omm.assignUserToRole(userPrincipal, group2Principal.getName(), false);
 
       AccessPolicy tenant1VolumeAccessPolicy = createVolumeAccessPolicy(
           "vol1", "tenant1");
@@ -189,7 +190,7 @@ public class TestMultiTenantAccessAuthorizerRangerPlugin {
       Assert.fail(e.getMessage());
     } finally {
       for (String name : policyIdsCreated) {
-        omm.deletePolicybyName(name);
+        omm.deletePolicyByName(name);
       }
       String userId = omm.getUserId(userPrincipal);
       omm.deleteUser(userId);
@@ -199,13 +200,12 @@ public class TestMultiTenantAccessAuthorizerRangerPlugin {
     }
   }
 
-  private AccessPolicy createVolumeAccessPolicy(String vol, String tenant)
+  private AccessPolicy createVolumeAccessPolicy(String vol, String tenantId)
       throws IOException {
-    OzoneTenantRolePrincipal principal =
-        OzoneTenantRolePrincipal.getUserRole(tenant);
+    OzoneTenantRolePrincipal principal = new OzoneTenantRolePrincipal(
+        OMMultiTenantManager.getDefaultUserRoleName(tenantId));
     AccessPolicy tenantVolumeAccessPolicy = new RangerAccessPolicy(
-        // principal already contains volume name
-        principal.getName() + "VolumeAccess");
+        OMMultiTenantManager.getDefaultBucketNamespacePolicyName(tenantId));
     OzoneObjInfo obj = OzoneObjInfo.Builder.newBuilder()
         .setResType(VOLUME).setStoreType(OZONE).setVolumeName(vol)
         .setBucketName("").setKeyName("").build();
@@ -216,13 +216,12 @@ public class TestMultiTenantAccessAuthorizerRangerPlugin {
     return tenantVolumeAccessPolicy;
   }
 
-  private AccessPolicy allowCreateBucketPolicy(String vol, String tenant)
+  private AccessPolicy allowCreateBucketPolicy(String vol, String tenantId)
       throws IOException {
-    OzoneTenantRolePrincipal principal =
-        OzoneTenantRolePrincipal.getUserRole(tenant);
+    OzoneTenantRolePrincipal principal = new OzoneTenantRolePrincipal(
+        OMMultiTenantManager.getDefaultUserRoleName(tenantId));
     AccessPolicy tenantVolumeAccessPolicy = new RangerAccessPolicy(
-        // principal already contains volume name
-        principal.getName() + "BucketAccess");
+        OMMultiTenantManager.getDefaultBucketPolicyName(tenantId));
     OzoneObjInfo obj = OzoneObjInfo.Builder.newBuilder()
         .setResType(BUCKET).setStoreType(OZONE).setVolumeName(vol)
         .setBucketName("*").setKeyName("").build();
@@ -230,12 +229,10 @@ public class TestMultiTenantAccessAuthorizerRangerPlugin {
     return tenantVolumeAccessPolicy;
   }
 
-
-  // TODO: REMOVE THIS?
   private AccessPolicy allowAccessBucketPolicy(String vol, String bucketName,
-      String tenant) throws IOException {
-    OzoneTenantRolePrincipal principal =
-        OzoneTenantRolePrincipal.getUserRole(tenant);
+      String tenantId) throws IOException {
+    OzoneTenantRolePrincipal principal = new OzoneTenantRolePrincipal(
+        OMMultiTenantManager.getDefaultUserRoleName(tenantId));
     AccessPolicy tenantVolumeAccessPolicy = new RangerAccessPolicy(
         principal.getName() + "AllowBucketAccess" + vol + bucketName +
             "Policy");
@@ -252,9 +249,9 @@ public class TestMultiTenantAccessAuthorizerRangerPlugin {
   }
 
   private AccessPolicy allowAccessKeyPolicy(String vol, String bucketName,
-      String tenant) throws IOException {
-    OzoneTenantRolePrincipal principal =
-        OzoneTenantRolePrincipal.getUserRole(tenant);
+      String tenantId) throws IOException {
+    OzoneTenantRolePrincipal principal = new OzoneTenantRolePrincipal(
+        OMMultiTenantManager.getDefaultUserRoleName(tenantId));
     AccessPolicy tenantVolumeAccessPolicy = new RangerAccessPolicy(
         principal.getName() + "AllowBucketKeyAccess" + vol + bucketName +
             "Policy");
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/multitenant/TestMultiTenantVolume.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/multitenant/TestMultiTenantVolume.java
index f2c8567296..92be4285fd 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/multitenant/TestMultiTenantVolume.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/multitenant/TestMultiTenantVolume.java
@@ -17,9 +17,11 @@
  */
 package org.apache.hadoop.ozone.om.multitenant;
 
+import com.google.protobuf.ServiceException;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.client.ObjectStore;
 import org.apache.hadoop.ozone.client.OzoneBucket;
 import org.apache.hadoop.ozone.client.OzoneVolume;
@@ -243,4 +245,20 @@ public class TestMultiTenantVolume {
         new S3Auth("unused1", "unused2", accessID, accessID));
     return new ObjectStore(conf, client);
   }
+
+  @Test
+  public void testOMRangerBGSyncRatisSetVersion()
+      throws IOException, ServiceException {
+    final long writtenVersion = 10L;
+
+    cluster.getOzoneManager().getMultiTenantManager()
+        .getOMRangerBGSyncService().setOMDBRangerServiceVersion(writtenVersion);
+
+    String readBackVersionStr = cluster.getOzoneManager().getMetadataManager()
+        .getMetaTable()
+        .get(OzoneConsts.RANGER_OZONE_SERVICE_VERSION_KEY);
+    long readBackVersion = Long.parseLong(readBackVersionStr);
+
+    Assert.assertEquals(writtenVersion, readBackVersion);
+  }
 }
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/multitenant/TestRangerBGSyncService.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/multitenant/TestRangerBGSyncService.java
new file mode 100644
index 0000000000..04b633890d
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/multitenant/TestRangerBGSyncService.java
@@ -0,0 +1,723 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.om.multitenant;
+
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RANGER_HTTPS_ADMIN_API_PASSWD;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RANGER_HTTPS_ADMIN_API_USER;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_RANGER_HTTPS_ADDRESS_KEY;
+import static org.apache.hadoop.ozone.om.multitenant.AccessPolicy.AccessGrantType.ALLOW;
+import static org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLType.ALL;
+import static org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLType.CREATE;
+import static org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLType.LIST;
+import static org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLType.READ;
+import static org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLType.READ_ACL;
+import static org.apache.hadoop.ozone.security.acl.OzoneObj.ResourceType.BUCKET;
+import static org.apache.hadoop.ozone.security.acl.OzoneObj.ResourceType.VOLUME;
+import static org.apache.hadoop.ozone.security.acl.OzoneObj.StoreType.OZONE;
+import static org.apache.hadoop.security.authentication.util.KerberosName.DEFAULT_MECHANISM;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.framework;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import com.google.protobuf.ServiceException;
+import org.apache.hadoop.hdds.HddsConfigKeys;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.audit.AuditLogger;
+import org.apache.hadoop.ozone.audit.AuditMessage;
+import org.apache.hadoop.ozone.om.OMConfigKeys;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OMMetrics;
+import org.apache.hadoop.ozone.om.OMMultiTenantManager;
+import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.helpers.OmDBAccessIdInfo;
+import org.apache.hadoop.ozone.om.helpers.OmDBTenantState;
+import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
+import org.apache.hadoop.ozone.security.acl.OzoneObjInfo;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authentication.util.KerberosName;
+import org.apache.http.auth.BasicUserPrincipal;
+import org.apache.ozone.test.GenericTestUtils;
+import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.Timeout;
+import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.gson.JsonArray;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+import org.slf4j.event.Level;
+
+/**
+ * Tests Ozone Manager Multi-Tenancy feature Background Sync with Apache Ranger.
+ * Marking it as Ignore because it needs Ranger access point.
+ */
+@Ignore("TODO: Requires a Ranger endpoint")
+public class TestRangerBGSyncService {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestRangerBGSyncService.class);
+
+  /**
+   * Timeout for each test.
+   */
+  @Rule
+  public Timeout timeout = new Timeout(180, TimeUnit.SECONDS);
+
+  private static final long TEST_SYNC_INTERVAL_SEC = 1L;
+  private static final long TEST_SYNC_TIMEOUT_SEC = 3L;
+
+  private static final int CHECK_SYNC_MILLIS = 1000;
+  private static final int WAIT_SYNC_TIMEOUT_MILLIS = 60000;
+
+  private TemporaryFolder folder = new TemporaryFolder();
+
+  private MultiTenantAccessAuthorizer auth;
+  private OMRangerBGSyncService bgSync;
+
+  // List of policy names created in Ranger
+  private final List<String> policiesCreated = new ArrayList<>();
+  // List of role ID created in Ranger
+  private final List<String> rolesCreated = new ArrayList<>();
+  // List of users created in Ranger
+  private final List<BasicUserPrincipal> usersCreated = new ArrayList<>();
+
+  private static OzoneConfiguration conf;
+  private OzoneManager ozoneManager;
+  private OMMetrics omMetrics;
+  private OMMetadataManager omMetadataManager;
+  private OMMultiTenantManager omMultiTenantManager;
+  private AuditLogger auditLogger;
+
+  private Tenant tenant;
+  private static final String TENANT_ID = "tenant1";
+
+  // UGI-related vars
+  private static final String USER_ALICE = "alice@EXAMPLE.COM";
+  private static final String USER_ALICE_SHORT = "alice";
+  private UserGroupInformation ugiAlice;
+  private static final String USER_BOB_SHORT = "bob";
+
+  private static void simulateOzoneSiteXmlConfig() {
+    // The following configs need to be set before the test can be enabled.
+    // Pass them in as JVM properties. e.g.:
+    //
+    // -Dozone.om.ranger.https-address=http://ranger:6080
+    // -Dozone.om.ranger.https.admin.api.user=admin
+    // -Dozone.om.ranger.https.admin.api.passwd=passwd
+
+    conf.setStrings(OZONE_RANGER_HTTPS_ADDRESS_KEY,
+        System.getProperty(OZONE_RANGER_HTTPS_ADDRESS_KEY));
+    conf.setStrings(OZONE_OM_RANGER_HTTPS_ADMIN_API_USER,
+        System.getProperty(OZONE_OM_RANGER_HTTPS_ADMIN_API_USER));
+    conf.setStrings(OZONE_OM_RANGER_HTTPS_ADMIN_API_PASSWD,
+        System.getProperty(OZONE_OM_RANGER_HTTPS_ADMIN_API_PASSWD));
+  }
+
+  @BeforeClass
+  public static void init() {
+    conf = new OzoneConfiguration();
+    simulateOzoneSiteXmlConfig();
+
+    GenericTestUtils.setLogLevel(OMRangerBGSyncService.LOG, Level.DEBUG);
+    GenericTestUtils.setLogLevel(
+        MultiTenantAccessAuthorizerRangerPlugin.LOG, Level.INFO);
+  }
+
+  @AfterClass
+  public static void shutdown() {
+  }
+
+  @Before
+  public void setUp() throws IOException {
+
+    KerberosName.setRuleMechanism(DEFAULT_MECHANISM);
+    KerberosName.setRules(
+        "RULE:[2:$1@$0](.*@EXAMPLE.COM)s/@.*//\n" +
+            "RULE:[1:$1@$0](.*@EXAMPLE.COM)s/@.*//\n" +
+            "DEFAULT");
+    ugiAlice = UserGroupInformation.createRemoteUser(USER_ALICE);
+    Assert.assertEquals(USER_ALICE_SHORT, ugiAlice.getShortUserName());
+
+    ozoneManager = mock(OzoneManager.class);
+
+    Server.Call call = spy(new Server.Call(1, 1, null, null,
+        RPC.RpcKind.RPC_BUILTIN, new byte[] {1, 2, 3}));
+    // Run as alice, so that Server.getRemoteUser() won't return null.
+    when(call.getRemoteUser()).thenReturn(ugiAlice);
+    Server.getCurCall().set(call);
+
+    String omID = UUID.randomUUID().toString();
+    final String path = GenericTestUtils.getTempPath(omID);
+    Path metaDirPath = Paths.get(path, "om-meta");
+    conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, metaDirPath.toString());
+
+    omMetrics = OMMetrics.create();
+    folder = new TemporaryFolder(new File("/tmp"));
+    folder.create();
+    conf.set(OMConfigKeys.OZONE_OM_DB_DIRS,
+        folder.newFolder().getAbsolutePath());
+    // No need to conf.set(OzoneConfigKeys.OZONE_ADMINISTRATORS, ...) here
+    //  as we did the trick earlier with mockito.
+    omMetadataManager = new OmMetadataManagerImpl(conf);
+    when(ozoneManager.getMetrics()).thenReturn(omMetrics);
+    when(ozoneManager.getMetadataManager()).thenReturn(omMetadataManager);
+    when(ozoneManager.isRatisEnabled()).thenReturn(true);
+    auditLogger = mock(AuditLogger.class);
+    when(ozoneManager.getAuditLogger()).thenReturn(auditLogger);
+    doNothing().when(auditLogger).logWrite(any(AuditMessage.class));
+
+    // Multi-tenant related initializations
+    omMultiTenantManager = mock(OMMultiTenantManager.class);
+    tenant = mock(Tenant.class);
+    when(ozoneManager.getMultiTenantManager()).thenReturn(omMultiTenantManager);
+    when(ozoneManager.getConfiguration()).thenReturn(conf);
+    when(ozoneManager.isLeaderReady()).thenReturn(true);
+
+    when(omMultiTenantManager.getTenantVolumeName(TENANT_ID))
+        .thenReturn(TENANT_ID);
+    when(omMultiTenantManager.getTenantUserRoleName(TENANT_ID))
+        .thenReturn(OMMultiTenantManager.getDefaultUserRoleName(TENANT_ID));
+    when(omMultiTenantManager.getTenantAdminRoleName(TENANT_ID))
+        .thenReturn(OMMultiTenantManager.getDefaultAdminRoleName(TENANT_ID));
+    when(omMultiTenantManager.newDefaultVolumeAccessPolicy(eq(TENANT_ID),
+        Mockito.any(OzoneTenantRolePrincipal.class),
+        Mockito.any(OzoneTenantRolePrincipal.class)))
+        .thenReturn(newVolumeAccessPolicy(TENANT_ID, TENANT_ID));
+    when(omMultiTenantManager.newDefaultBucketAccessPolicy(eq(TENANT_ID),
+        Mockito.any(OzoneTenantRolePrincipal.class)))
+        .thenReturn(newBucketAccessPolicy(TENANT_ID, TENANT_ID));
+
+    // Raft client request handling
+    OzoneManagerRatisServer omRatisServer = mock(OzoneManagerRatisServer.class);
+    when(omRatisServer.getRaftPeerId())
+        .thenReturn(RaftPeerId.valueOf("peerId"));
+    when(omRatisServer.getRaftGroupId())
+        .thenReturn(RaftGroupId.randomId());
+
+    when(ozoneManager.getOmRatisServer()).thenReturn(omRatisServer);
+
+    try {
+      doAnswer(invocation -> {
+        OMRequest request = invocation.getArgument(0);
+        long v = request.getSetRangerServiceVersionRequest()
+            .getRangerServiceVersion();
+        LOG.info("Writing Ranger Ozone Service Version to DB: {}", v);
+        ozoneManager.getMetadataManager().getMetaTable().put(
+            OzoneConsts.RANGER_OZONE_SERVICE_VERSION_KEY, String.valueOf(v));
+        return null;
+      }).when(omRatisServer).submitRequest(Mockito.any(), Mockito.any());
+    } catch (ServiceException e) {
+      throw new RuntimeException(e);
+    }
+
+    when(tenant.getTenantAccessPolicies()).thenReturn(new ArrayList<>());
+
+    auth = new MultiTenantAccessAuthorizerRangerPlugin();
+    auth.init(conf);
+  }
+
+  @After
+  public void tearDown() {
+    bgSync.shutdown();
+    cleanupPoliciesRolesUsers();
+    omMetrics.unRegister();
+    framework().clearInlineMocks();
+  }
+
+  private AccessPolicy newVolumeAccessPolicy(String vol, String tenantId)
+      throws IOException {
+    OzoneTenantRolePrincipal principal = new OzoneTenantRolePrincipal(
+        OMMultiTenantManager.getDefaultUserRoleName(tenantId));
+    OzoneTenantRolePrincipal adminRole = new OzoneTenantRolePrincipal(
+        OMMultiTenantManager.getDefaultAdminRoleName(tenantId));
+    AccessPolicy tenantVolumeAccessPolicy = new RangerAccessPolicy(
+        OMMultiTenantManager.getDefaultBucketNamespacePolicyName(tenantId));
+    OzoneObjInfo obj = OzoneObjInfo.Builder.newBuilder()
+        .setResType(VOLUME).setStoreType(OZONE).setVolumeName(vol)
+        .setBucketName("").setKeyName("").build();
+    tenantVolumeAccessPolicy.addAccessPolicyElem(obj, principal, READ, ALLOW);
+    tenantVolumeAccessPolicy.addAccessPolicyElem(obj, principal, LIST, ALLOW);
+    tenantVolumeAccessPolicy.addAccessPolicyElem(obj, principal,
+        READ_ACL, ALLOW);
+    tenantVolumeAccessPolicy.addAccessPolicyElem(obj, adminRole, ALL, ALLOW);
+    return tenantVolumeAccessPolicy;
+  }
+
+  private AccessPolicy newBucketAccessPolicy(String vol, String tenantId)
+      throws IOException {
+    OzoneTenantRolePrincipal principal = new OzoneTenantRolePrincipal(
+        OMMultiTenantManager.getDefaultUserRoleName(tenantId));
+    AccessPolicy tenantVolumeAccessPolicy = new RangerAccessPolicy(
+        OMMultiTenantManager.getDefaultBucketPolicyName(tenantId));
+    OzoneObjInfo obj = OzoneObjInfo.Builder.newBuilder()
+        .setResType(BUCKET).setStoreType(OZONE).setVolumeName(vol)
+        .setBucketName("*").setKeyName("").build();
+    tenantVolumeAccessPolicy.addAccessPolicyElem(obj, principal, CREATE, ALLOW);
+    return tenantVolumeAccessPolicy;
+  }
+
+  long initBGSync() throws IOException {
+    bgSync = new OMRangerBGSyncService(ozoneManager, auth,
+        TEST_SYNC_INTERVAL_SEC, TimeUnit.SECONDS, TEST_SYNC_TIMEOUT_SEC);
+    return bgSync.getLatestRangerServiceVersion();
+  }
+
+  public void createRolesAndPoliciesInRanger(boolean populateDB) {
+
+    policiesCreated.clear();
+    rolesCreated.clear();
+
+    BasicUserPrincipal userAlice = new BasicUserPrincipal(USER_ALICE_SHORT);
+    BasicUserPrincipal userBob = new BasicUserPrincipal(USER_BOB_SHORT);
+    // Tenant name to be used for this test
+    final String tenantId = TENANT_ID;
+    // volume name = bucket namespace name
+    final String volumeName = tenantId;
+
+    final OzoneTenantRolePrincipal adminRole = new OzoneTenantRolePrincipal(
+        OMMultiTenantManager.getDefaultAdminRoleName(tenantId));
+    final OzoneTenantRolePrincipal userRole = new OzoneTenantRolePrincipal(
+        OMMultiTenantManager.getDefaultUserRoleName(tenantId));
+    final String bucketNamespacePolicyName =
+        OMMultiTenantManager.getDefaultBucketNamespacePolicyName(tenantId);
+    final String bucketPolicyName =
+        OMMultiTenantManager.getDefaultBucketPolicyName(tenantId);
+
+    // Add tenant entry in OM DB
+    if (populateDB) {
+      LOG.info("Creating OM DB tenant entries");
+      try {
+        // Tenant State entry
+        omMetadataManager.getTenantStateTable().put(tenantId,
+            new OmDBTenantState(
+                tenantId, volumeName, userRole.getName(), adminRole.getName(),
+                bucketNamespacePolicyName, bucketPolicyName));
+        // Access ID entry for alice
+        final String aliceAccessId = OMMultiTenantManager.getDefaultAccessId(
+            tenantId, userAlice.getName());
+        omMetadataManager.getTenantAccessIdTable().put(aliceAccessId,
+            new OmDBAccessIdInfo.Builder()
+                .setTenantId(tenantId)
+                .setUserPrincipal(userAlice.getName())
+                .setIsAdmin(false)
+                .setIsDelegatedAdmin(false)
+                .build());
+        // Access ID entry for bob
+        final String bobAccessId = OMMultiTenantManager.getDefaultAccessId(
+            tenantId, userBob.getName());
+        omMetadataManager.getTenantAccessIdTable().put(bobAccessId,
+            new OmDBAccessIdInfo.Builder()
+                .setTenantId(tenantId)
+                .setUserPrincipal(userBob.getName())
+                .setIsAdmin(false)
+                .setIsDelegatedAdmin(false)
+                .build());
+      } catch (IOException e) {
+        Assert.fail(e.getMessage());
+      }
+    }
+
+    try {
+      LOG.info("Creating admin role in Ranger: {}", adminRole.getName());
+      auth.createRole(adminRole.getName(), null);
+      String role1 = auth.getRole(adminRole);
+      rolesCreated.add(0, role1);
+    } catch (Exception e) {
+      Assert.fail(e.getMessage());
+    }
+
+    try {
+      LOG.info("Creating user role in Ranger: {}", userRole.getName());
+      auth.createRole(userRole.getName(), adminRole.getName());
+      String role2 = auth.getRole(userRole);
+      // Prepend user role (order matters when deleting due to dependency)
+      rolesCreated.add(0, role2);
+    } catch (Exception e) {
+      Assert.fail(e.getMessage());
+    }
+
+    try {
+      LOG.info("Creating user in Ranger: {}", USER_ALICE_SHORT);
+      auth.createUser(USER_ALICE_SHORT, "password1");
+      usersCreated.add(userAlice);
+      auth.assignUserToRole(userAlice, auth.getRole(userRole), false);
+    } catch (Exception e) {
+      Assert.fail(e.getMessage());
+    }
+
+    try {
+      LOG.info("Creating user in Ranger: {}", USER_BOB_SHORT);
+      auth.createUser(USER_BOB_SHORT, "password2");
+      usersCreated.add(userBob);
+      auth.assignUserToRole(userBob, auth.getRole(userRole), false);
+    } catch (Exception e) {
+      Assert.fail(e.getMessage());
+    }
+
+    try {
+      AccessPolicy tenant1VolumeAccessPolicy = newVolumeAccessPolicy(
+          volumeName, tenantId);
+      LOG.info("Creating VolumeAccess policy in Ranger: {}",
+          tenant1VolumeAccessPolicy.getPolicyName());
+      auth.createAccessPolicy(tenant1VolumeAccessPolicy);
+      policiesCreated.add(tenant1VolumeAccessPolicy.getPolicyName());
+    } catch (Exception e) {
+      Assert.fail(e.getMessage());
+    }
+
+    try {
+      AccessPolicy tenant1BucketCreatePolicy = newBucketAccessPolicy(
+          volumeName, tenantId);
+      LOG.info("Creating BucketAccess policy in Ranger: {}",
+          tenant1BucketCreatePolicy.getPolicyName());
+      auth.createAccessPolicy(tenant1BucketCreatePolicy);
+      policiesCreated.add(tenant1BucketCreatePolicy.getPolicyName());
+    } catch (Exception e) {
+      Assert.fail(e.getMessage());
+    }
+  }
+
+  public void cleanupPolicies() {
+    for (String name : policiesCreated) {
+      try {
+        LOG.info("Deleting policy: {}", name);
+        auth.deletePolicyByName(name);
+      } catch (Exception e) {
+        LOG.error(e.getMessage());
+      }
+    }
+  }
+
+  public void cleanupRoles() {
+    for (String roleObj : rolesCreated) {
+      final JsonObject jObj = new JsonParser().parse(roleObj).getAsJsonObject();
+      final String roleId = jObj.get("id").getAsString();
+      final String roleName = jObj.get("name").getAsString();
+      try {
+        LOG.info("Deleting role: {}", roleName);
+        auth.deleteRole(roleId);
+      } catch (Exception e) {
+        LOG.error(e.getMessage());
+      }
+    }
+  }
+
+  public void cleanupUsers() {
+    for (BasicUserPrincipal user : usersCreated) {
+      try {
+        LOG.info("Deleting user: {}", user);
+        String userId = auth.getUserId(user);
+        auth.deleteUser(userId);
+      } catch (Exception e) {
+        LOG.error(e.getMessage());
+      }
+    }
+  }
+
+  public void cleanupOMDB() {
+    try {
+      omMetadataManager.getTenantStateTable().delete(TENANT_ID);
+      omMetadataManager.getTenantAccessIdTable().delete(
+          OMMultiTenantManager.getDefaultAccessId(TENANT_ID, USER_ALICE_SHORT));
+      omMetadataManager.getTenantAccessIdTable().delete(
+          OMMultiTenantManager.getDefaultAccessId(TENANT_ID, USER_BOB_SHORT));
+    } catch (IOException e) {
+      LOG.error(e.getMessage());
+    }
+  }
+
+  public void cleanupPoliciesRolesUsers() {
+    cleanupPolicies();
+    cleanupRoles();
+    cleanupUsers();
+
+    cleanupOMDB();
+  }
+
+  /**
+   * OM DB does not have the tenant state.
+   * Expect sync service to clean up all the leftover multi-tenancy
+   * policies and roles in Ranger.
+   */
+  @Test
+  public void testRemovePolicyAndRole() throws Exception {
+    long startingRangerVersion = initBGSync();
+
+    // Create roles and policies in ranger that are NOT
+    // backed up by OzoneManger Multi-Tenant tables
+    createRolesAndPoliciesInRanger(false);
+
+    final long rangerSvcVersionBefore = bgSync.getLatestRangerServiceVersion();
+    Assert.assertTrue(rangerSvcVersionBefore >= startingRangerVersion);
+
+    // Note: DB Service Version will be -1 if the test starts with an empty DB
+    final long dbSvcVersionBefore = bgSync.getOMDBRangerServiceVersion();
+    bgSync.start();
+    // Wait for sync to finish once.
+    // The counter is incremented at the beginning of the run, hence the ">"
+    GenericTestUtils.waitFor(() -> bgSync.getRangerSyncRunCount() > 1L,
+        CHECK_SYNC_MILLIS, WAIT_SYNC_TIMEOUT_MILLIS);
+    bgSync.shutdown();
+    final long dbSvcVersionAfter = bgSync.getOMDBRangerServiceVersion();
+    final long rangerSvcVersionAfter = bgSync.getLatestRangerServiceVersion();
+    Assert.assertEquals(rangerSvcVersionAfter, dbSvcVersionAfter);
+    Assert.assertTrue(dbSvcVersionAfter > dbSvcVersionBefore);
+    Assert.assertTrue(rangerSvcVersionAfter > rangerSvcVersionBefore);
+
+    // Verify that the Ranger policies and roles not backed up
+    // by OzoneManager Multi-Tenancy tables are cleaned up by sync thread
+
+    for (String policy : policiesCreated) {
+      final AccessPolicy policyRead = auth.getAccessPolicyByName(policy);
+      Assert.assertNull("This policy should have been deleted from Ranger: " +
+          policy, policyRead);
+    }
+
+    for (String roleObj : rolesCreated) {
+      final String roleName = new JsonParser().parse(roleObj)
+          .getAsJsonObject().get("name").getAsString();
+      final String roleObjRead = auth.getRole(roleName);
+      Assert.assertNull("This role should have been deleted from Ranger: " +
+          roleName, roleObjRead);
+    }
+  }
+
+  /**
+   * OM DB has the tenant state.
+   * Ranger has the consistent role status, and the policies are in-place.
+   * Expect sync service to check Ranger state but write nothing to Ranger.
+   */
+  @Test
+  public void testConsistentState() throws Exception {
+    long startingRangerVersion = initBGSync();
+
+    // Create roles and policies in ranger that are
+    // backed up by OzoneManger Multi-Tenant tables
+    createRolesAndPoliciesInRanger(true);
+
+    long rangerSvcVersionBefore = bgSync.getLatestRangerServiceVersion();
+    Assert.assertTrue(rangerSvcVersionBefore >= startingRangerVersion);
+
+    // Note: DB Service Version will be -1 if the test starts with an empty DB
+    final long dbSvcVersionBefore = bgSync.getOMDBRangerServiceVersion();
+    bgSync.start();
+    // Wait for sync to finish once.
+    // The counter is incremented at the beginning of the run, hence the ">"
+    GenericTestUtils.waitFor(() -> bgSync.getRangerSyncRunCount() > 1L,
+        CHECK_SYNC_MILLIS, WAIT_SYNC_TIMEOUT_MILLIS);
+    bgSync.shutdown();
+    final long dbSvcVersionAfter = bgSync.getOMDBRangerServiceVersion();
+    final long rangerSvcVersionAfter = bgSync.getLatestRangerServiceVersion();
+    Assert.assertEquals(rangerSvcVersionAfter, dbSvcVersionAfter);
+    Assert.assertEquals(rangerSvcVersionAfter, rangerSvcVersionBefore);
+    if (dbSvcVersionBefore != -1L) {
+      Assert.assertEquals(dbSvcVersionBefore, dbSvcVersionAfter);
+    }
+
+    for (String policy : policiesCreated) {
+      try {
+        final AccessPolicy policyRead = auth.getAccessPolicyByName(policy);
+
+        Assert.assertEquals(policy, policyRead.getPolicyName());
+      } catch (Exception e) {
+        e.printStackTrace();
+        Assert.fail(e.getMessage());
+      }
+    }
+
+    for (String roleObj : rolesCreated) {
+      try {
+        final String roleName = new JsonParser().parse(roleObj)
+            .getAsJsonObject().get("name").getAsString();
+        String roleObjRead = auth.getRole(roleName);
+        final String roleNameReadBack = new JsonParser().parse(roleObjRead)
+            .getAsJsonObject().get("name").getAsString();
+        Assert.assertEquals(roleName, roleNameReadBack);
+      } catch (Exception e) {
+        e.printStackTrace();
+        Assert.fail(e.getMessage());
+      }
+    }
+  }
+
+  /**
+   * OM DB has the tenant state.
+   * But the user list in a Ranger role is tampered with.
+   * Expect sync service to restore that Ranger role to the desired state.
+   */
+  @Test
+  public void testRecoverRangerRole() throws Exception {
+    long startingRangerVersion = initBGSync();
+
+    createRolesAndPoliciesInRanger(true);
+
+    long rangerVersionAfterCreation = bgSync.getLatestRangerServiceVersion();
+    Assert.assertTrue(rangerVersionAfterCreation >= startingRangerVersion);
+
+    // Delete a user from user role, expect Ranger sync thread to update it
+    String userRoleName = new JsonParser().parse(rolesCreated.get(0))
+        .getAsJsonObject().get("name").getAsString();
+    Assert.assertEquals(
+        OMMultiTenantManager.getDefaultUserRoleName(TENANT_ID), userRoleName);
+
+    auth.revokeUserFromRole(
+        new BasicUserPrincipal(USER_BOB_SHORT), auth.getRole(userRoleName));
+
+    HashSet<String> userSet = new HashSet<>();
+    userSet.add(USER_ALICE_SHORT);
+    userSet.add(USER_BOB_SHORT);
+
+    // Note: DB Service Version will be -1 if the test starts with an empty DB
+    final long dbSvcVersionBefore = bgSync.getOMDBRangerServiceVersion();
+    final long rangerSvcVersionBefore = bgSync.getLatestRangerServiceVersion();
+    final long currRunCount = bgSync.getRangerSyncRunCount();
+    bgSync.start();
+    // Wait for sync to finish once.
+    // The counter is incremented at the beginning of the run, hence the ">"
+    GenericTestUtils.waitFor(
+        () -> bgSync.getRangerSyncRunCount() > currRunCount + 1L,
+        CHECK_SYNC_MILLIS, WAIT_SYNC_TIMEOUT_MILLIS);
+    bgSync.shutdown();
+    final long dbSvcVersionAfter = bgSync.getOMDBRangerServiceVersion();
+    final long rangerSvcVersionAfter = bgSync.getLatestRangerServiceVersion();
+    Assert.assertEquals(rangerSvcVersionAfter, dbSvcVersionAfter);
+    Assert.assertTrue(dbSvcVersionAfter > dbSvcVersionBefore);
+    Assert.assertTrue(rangerSvcVersionAfter > rangerSvcVersionBefore);
+
+    for (String policy : policiesCreated) {
+      final AccessPolicy verifier = auth.getAccessPolicyByName(policy);
+      Assert.assertNotNull("Policy should exist in Ranger: " + policy,
+          verifier);
+      Assert.assertEquals(policy, verifier.getPolicyName());
+    }
+
+    for (String role : rolesCreated) {
+      final String roleName = new JsonParser().parse(role).getAsJsonObject()
+          .get("name").getAsString();
+      if (!roleName.equals(userRoleName)) {
+        continue;
+      }
+      final String roleObjRead = auth.getRole(roleName);
+      final JsonObject jsonObj = new JsonParser().parse(roleObjRead)
+          .getAsJsonObject();
+      final JsonArray verifier = jsonObj.get("users").getAsJsonArray();
+      Assert.assertEquals(2, verifier.size());
+      // Verify that users are in the role
+      for (int i = 0; i < verifier.size();  ++i) {
+        String user = verifier.get(i).getAsJsonObject()
+            .get("name").getAsString();
+        Assert.assertTrue(userSet.contains(user));
+        userSet.remove(user);
+      }
+      Assert.assertTrue(userSet.isEmpty());
+      break;
+    }
+  }
+
+  /**
+   * OM DB has the tenant state. But tenant policies are deleted from Ranger.
+   * Expect sync service to recover both policies to their default states.
+   */
+  @Test
+  public void testRecreateDeletedRangerPolicy() throws Exception {
+    long startingRangerVersion = initBGSync();
+
+    // Create roles and policies in ranger that are
+    // backed up by OzoneManger Multi-Tenant tables
+    createRolesAndPoliciesInRanger(true);
+
+    long rangerVersionAfterCreation = bgSync.getLatestRangerServiceVersion();
+    Assert.assertTrue(rangerVersionAfterCreation >= startingRangerVersion);
+
+    // Delete both policies, expect Ranger sync thread to recover both
+    auth.deletePolicyByName(
+        OMMultiTenantManager.getDefaultBucketNamespacePolicyName(TENANT_ID));
+    auth.deletePolicyByName(
+        OMMultiTenantManager.getDefaultBucketPolicyName(TENANT_ID));
+
+    final long rangerSvcVersionBefore = bgSync.getLatestRangerServiceVersion();
+    // Note: DB Service Version will be -1 if the test starts with an empty DB
+    final long dbSvcVersionBefore = bgSync.getOMDBRangerServiceVersion();
+    bgSync.start();
+    // Wait for sync to finish once.
+    // The counter is incremented at the beginning of the run, hence the ">"
+    GenericTestUtils.waitFor(() -> bgSync.getRangerSyncRunCount() > 1L,
+        CHECK_SYNC_MILLIS, WAIT_SYNC_TIMEOUT_MILLIS);
+    bgSync.shutdown();
+    long dbSvcVersionAfter = bgSync.getOMDBRangerServiceVersion();
+    final long rangerSvcVersionAfter = bgSync.getLatestRangerServiceVersion();
+    Assert.assertEquals(rangerSvcVersionAfter, dbSvcVersionAfter);
+    Assert.assertTrue(dbSvcVersionAfter > dbSvcVersionBefore);
+    Assert.assertTrue(rangerSvcVersionAfter > rangerSvcVersionBefore);
+
+    for (String policy : policiesCreated) {
+      try {
+        final AccessPolicy policyRead = auth.getAccessPolicyByName(policy);
+
+        Assert.assertEquals(policy, policyRead.getPolicyName());
+      } catch (Exception e) {
+        e.printStackTrace();
+        Assert.fail(e.getMessage());
+      }
+    }
+
+    for (String roleObj : rolesCreated) {
+      try {
+        final String roleName = new JsonParser().parse(roleObj)
+            .getAsJsonObject().get("name").getAsString();
+        String roleObjRead = auth.getRole(roleName);
+        final String roleNameReadBack = new JsonParser().parse(roleObjRead)
+            .getAsJsonObject().get("name").getAsString();
+        Assert.assertEquals(roleName, roleNameReadBack);
+      } catch (Exception e) {
+        e.printStackTrace();
+        Assert.fail(e.getMessage());
+      }
+    }
+  }
+
+}
diff --git a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
index 750ece9573..deb77db639 100644
--- a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
+++ b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
@@ -118,6 +118,8 @@ enum Type {
   TenantListUser = 105;
 
   SetS3Secret = 106;
+
+  SetRangerServiceVersion = 107;
 }
 
 message OMRequest {
@@ -218,6 +220,8 @@ message OMRequest {
   optional TenantListUserRequest            tenantListUserRequest          = 105;
 
   optional SetS3SecretRequest               SetS3SecretRequest             = 106;
+
+  optional SetRangerServiceVersionRequest   SetRangerServiceVersionRequest = 107;
 }
 
 message OMResponse {
@@ -301,8 +305,8 @@ message OMResponse {
   optional ListTenantResponse                ListTenantResponse            = 98;
 
   optional TenantGetUserInfoResponse         TenantGetUserInfoResponse     = 99;
-  optional TenantAssignUserAccessIdResponse  TenantAssignUserAccessIdResponse= 100;
-  optional TenantRevokeUserAccessIdResponse  TenantRevokeUserAccessIdResponse= 101;
+  optional TenantAssignUserAccessIdResponse  TenantAssignUserAccessIdResponse = 100;
+  optional TenantRevokeUserAccessIdResponse  TenantRevokeUserAccessIdResponse = 101;
 
   optional TenantAssignAdminResponse         TenantAssignAdminResponse     = 102;
   optional TenantRevokeAdminResponse         TenantRevokeAdminResponse     = 103;
@@ -311,6 +315,8 @@ message OMResponse {
   optional TenantListUserResponse            tenantListUserResponse        = 105;
 
   optional SetS3SecretResponse               SetS3SecretResponse           = 106;
+
+  optional SetRangerServiceVersionResponse   SetRangerServiceVersionResponse = 107;
 }
 
 enum Status {
@@ -1486,6 +1492,12 @@ message RevokeS3SecretRequest {
 message CreateTenantRequest {
     optional string tenantId = 1;  // Tenant name
     optional string volumeName = 2;
+    optional string userRoleName = 3;
+    optional string adminRoleName = 4;
+}
+
+message SetRangerServiceVersionRequest {
+    required uint64 rangerServiceVersion = 1;
 }
 
 message DeleteTenantRequest {
@@ -1522,6 +1534,9 @@ message CreateTenantResponse {
 
 }
 
+message SetRangerServiceVersionResponse {
+}
+
 message DeleteTenantResponse {
     optional string volumeName = 1;
     optional int64 volRefCount = 2;
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMultiTenantManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMultiTenantManager.java
index 6d2d2ab0d4..96ac1e45aa 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMultiTenantManager.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMultiTenantManager.java
@@ -18,12 +18,16 @@ package org.apache.hadoop.ozone.om;
 
 import java.io.IOException;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.om.helpers.TenantUserList;
+import org.apache.hadoop.ozone.om.multitenant.AccessPolicy;
+import org.apache.hadoop.ozone.om.multitenant.OMRangerBGSyncService;
+import org.apache.hadoop.ozone.om.multitenant.OzoneTenantRolePrincipal;
 import org.apache.hadoop.ozone.om.multitenant.Tenant;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -31,7 +35,6 @@ import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
 import org.apache.http.auth.BasicUserPrincipal;
 import org.slf4j.Logger;
 
-import static org.apache.hadoop.ozone.OzoneConsts.TENANT_ID_USERNAME_DELIMITER;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_MULTITENANCY_ENABLED;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_MULTITENANCY_ENABLED_DEFAULT;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RANGER_HTTPS_ADMIN_API_PASSWD;
@@ -43,7 +46,7 @@ import static org.apache.hadoop.ozone.om.OMMultiTenantManagerImpl.OZONE_OM_TENAN
  * OM MultiTenant manager interface.
  */
 public interface OMMultiTenantManager {
-  /*
+  /* TODO: Outdated
    * Init multi-tenant manager. Performs initialization e.g.
    *  - Initialize Multi-Tenant-Gatekeeper-Plugin
    *  - Validate Multi-Tenant Bucket-NameSpaces
@@ -55,15 +58,23 @@ public interface OMMultiTenantManager {
    *       . superusers  <-in-sync-> OzoneConf,
    *       . OM-DB state <-in-sync-> IMultiTenantGateKeeperPluginState
    *       . OM DB state is always the source of truth.
-   *
-   * @throws IOException
    */
-//  void start() throws IOException;
-//
-//  /**
-//   * Stop multi-tenant manager.
-//   */
-//  void stop() throws Exception;
+
+  /**
+   * Start background thread(s) in the multi-tenant manager.
+   */
+  void start() throws IOException;
+
+  /**
+   * Stop background thread(s) in the multi-tenant manager.
+   */
+  void stop() throws IOException;
+
+  /**
+   * Returns the instance of OMRangerBGSyncService.
+   */
+  @VisibleForTesting
+  OMRangerBGSyncService getOMRangerBGSyncService();
 
   /**
    * Returns the corresponding OzoneManager instance.
@@ -76,9 +87,12 @@ public interface OMMultiTenantManager {
    * Given a TenantID String, Create and return Tenant Interface.
    *
    * @param tenantID
+   * @param userRoleName
+   * @param adminRoleName
    * @return Tenant interface.
    */
-  Tenant createTenantAccessInAuthorizer(String tenantID) throws IOException;
+  Tenant createTenantAccessInAuthorizer(String tenantID, String userRoleName,
+      String adminRoleName) throws IOException;
 
   /**
    * Given a TenantID, destroys all state associated with that tenant.
@@ -103,10 +117,10 @@ public interface OMMultiTenantManager {
 
   /**
    * Revoke user accessId.
-   * @param accessID
+   * @param accessId
    * @throws IOException
    */
-  void revokeUserAccessId(String accessID) throws IOException;
+  void revokeUserAccessId(String accessId) throws IOException;
 
   /**
    * A placeholder method to remove a failed-to-assign accessId from
@@ -131,7 +145,7 @@ public interface OMMultiTenantManager {
    * @return access ID in the form of tenantName$username
    */
   static String getDefaultAccessId(String tenantId, String userPrincipal) {
-    return tenantId + TENANT_ID_USERNAME_DELIMITER + userPrincipal;
+    return tenantId + OzoneConsts.TENANT_ID_USERNAME_DELIMITER + userPrincipal;
   }
 
   /**
@@ -239,6 +253,20 @@ public interface OMMultiTenantManager {
    */
   String getTenantVolumeName(String tenantId) throws IOException;
 
+  /**
+   * Retrieve user role name of the given tenant.
+   * @param tenantId tenant name
+   * @return tenant user role name
+   */
+  String getTenantUserRoleName(String tenantId) throws IOException;
+
+  /**
+   * Retrieve admin role name of the given tenant.
+   * @param tenantId tenant name
+   * @return tenant user role name
+   */
+  String getTenantAdminRoleName(String tenantId) throws IOException;
+
   boolean isUserAccessIdPrincipalOrTenantAdmin(String accessId,
       UserGroupInformation ugi) throws IOException;
 
@@ -321,4 +349,17 @@ public interface OMMultiTenantManager {
 
     return true;
   }
+
+  /**
+   * Returns default VolumeAccess policy given tenant and role names.
+   */
+  AccessPolicy newDefaultVolumeAccessPolicy(String tenantId,
+      OzoneTenantRolePrincipal userRole, OzoneTenantRolePrincipal adminRole)
+      throws IOException;
+
+  /**
+   * Returns default BucketAccess policy given tenant and user role name.
+   */
+  AccessPolicy newDefaultBucketAccessPolicy(String tenantId,
+      OzoneTenantRolePrincipal userRole) throws IOException;
 }
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMultiTenantManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMultiTenantManagerImpl.java
index 1c359be888..36f902587b 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMultiTenantManagerImpl.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMultiTenantManagerImpl.java
@@ -17,6 +17,11 @@
  */
 package org.apache.hadoop.ozone.om;
 
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_MULTITENANCY_RANGER_SYNC_INTERVAL;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_MULTITENANCY_RANGER_SYNC_INTERVAL_DEFAULT;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_MULTITENANCY_RANGER_SYNC_TIMEOUT;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_MULTITENANCY_RANGER_SYNC_TIMEOUT_DEFAULT;
+import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.INTERNAL_ERROR;
 import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.INVALID_ACCESS_ID;
 import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.TENANT_AUTHORIZER_ERROR;
 import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.TENANT_NOT_FOUND;
@@ -33,15 +38,19 @@ import static org.apache.hadoop.ozone.security.acl.OzoneObj.StoreType.OZONE;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 
 import com.google.common.base.Optional;
+
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.lang3.tuple.ImmutablePair;
 
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.utils.db.Table;
@@ -56,6 +65,8 @@ import org.apache.hadoop.ozone.om.helpers.TenantUserList;
 import org.apache.hadoop.ozone.om.multitenant.AccessPolicy;
 import org.apache.hadoop.ozone.om.multitenant.BucketNameSpace;
 import org.apache.hadoop.ozone.om.multitenant.CachedTenantState;
+import org.apache.hadoop.ozone.om.multitenant.CachedTenantState.CachedAccessIdInfo;
+import org.apache.hadoop.ozone.om.multitenant.OMRangerBGSyncService;
 import org.apache.hadoop.ozone.om.multitenant.OzoneTenant;
 import org.apache.hadoop.ozone.om.multitenant.MultiTenantAccessAuthorizer;
 import org.apache.hadoop.ozone.om.multitenant.MultiTenantAccessAuthorizerDummyPlugin;
@@ -72,7 +83,6 @@ import org.apache.http.auth.BasicUserPrincipal;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 
 /**
@@ -92,38 +102,81 @@ public class OMMultiTenantManagerImpl implements OMMultiTenantManager {
   private final OzoneManager ozoneManager;
   private final OMMetadataManager omMetadataManager;
   private final OzoneConfiguration conf;
-  private final ReentrantReadWriteLock controlPathLock;
+  // tenantCache: tenantId -> CachedTenantState
   private final Map<String, CachedTenantState> tenantCache;
+  private final ReentrantReadWriteLock tenantCacheLock;
+  private final OMRangerBGSyncService omRangerBGSyncService;
 
-  OMMultiTenantManagerImpl(OzoneManager ozoneManager, OzoneConfiguration conf)
+  public OMMultiTenantManagerImpl(OzoneManager ozoneManager,
+                                  OzoneConfiguration conf)
       throws IOException {
     this.conf = conf;
-    this.controlPathLock = new ReentrantReadWriteLock();
     this.ozoneManager = ozoneManager;
     this.omMetadataManager = ozoneManager.getMetadataManager();
     this.tenantCache = new ConcurrentHashMap<>();
-    boolean devSkipRanger = conf.getBoolean(OZONE_OM_TENANT_DEV_SKIP_RANGER,
-        false);
+    this.tenantCacheLock = new ReentrantReadWriteLock();
+
+    loadTenantCacheFromDB();
+
+    boolean devSkipRanger =
+        conf.getBoolean(OZONE_OM_TENANT_DEV_SKIP_RANGER, false);
     if (devSkipRanger) {
       this.authorizer = new MultiTenantAccessAuthorizerDummyPlugin();
     } else {
       this.authorizer = new MultiTenantAccessAuthorizerRangerPlugin();
     }
-    this.authorizer.init(conf);
-    loadUsersFromDB();
+    try {
+      this.authorizer.init(conf);
+    } catch (OMException ex) {
+      if (ex.getResult().equals(INTERNAL_ERROR)) {
+        LOG.error("Failed to initialize {}, falling back to dummy authorizer",
+            authorizer.getClass().getSimpleName());
+        this.authorizer = new MultiTenantAccessAuthorizerDummyPlugin();
+      } else {
+        throw ex;
+      }
+    }
+
+    // Define the internal time unit for the config
+    final TimeUnit internalTimeUnit = TimeUnit.SECONDS;
+    // Get the interval in internal time unit
+    long rangerSyncInterval = ozoneManager.getConfiguration().getTimeDuration(
+        OZONE_OM_MULTITENANCY_RANGER_SYNC_INTERVAL,
+        OZONE_OM_MULTITENANCY_RANGER_SYNC_INTERVAL_DEFAULT.getDuration(),
+        OZONE_OM_MULTITENANCY_RANGER_SYNC_INTERVAL_DEFAULT.getUnit(),
+        internalTimeUnit);
+    // Get the timeout in internal time unit
+    long rangerSyncTimeout = ozoneManager.getConfiguration().getTimeDuration(
+        OZONE_OM_MULTITENANCY_RANGER_SYNC_TIMEOUT,
+        OZONE_OM_MULTITENANCY_RANGER_SYNC_TIMEOUT_DEFAULT.getDuration(),
+        OZONE_OM_MULTITENANCY_RANGER_SYNC_TIMEOUT_DEFAULT.getUnit(),
+        internalTimeUnit);
+    // Initialize the Ranger Sync Thread
+    omRangerBGSyncService = new OMRangerBGSyncService(ozoneManager, authorizer,
+        rangerSyncInterval, internalTimeUnit, rangerSyncTimeout);
+    // Start the Ranger Sync Thread
+    this.start();
   }
 
+  public OMRangerBGSyncService getOMRangerBGSyncService() {
+    return omRangerBGSyncService;
+  }
 
-// start() and stop() lifeycle methods can be added when there is a background
-// work going on.
-//  @Override
-//  public void start() throws IOException {
-//  }
-//
-//  @Override
-//  public void stop() throws Exception {
-//
-//  }
+  /**
+   * Start the Ranger policy and role sync thread.
+   */
+  @Override
+  public void start() throws IOException {
+    omRangerBGSyncService.start();
+  }
+
+  /**
+   * Stop the Ranger policy and role sync thread.
+   */
+  @Override
+  public void stop() throws IOException {
+    omRangerBGSyncService.shutdown();
+  }
 
   @Override
   public OMMetadataManager getOmMetadataManager() {
@@ -159,35 +212,39 @@ public class OMMultiTenantManagerImpl implements OMMultiTenantManager {
    *      We can do all of this as part of holding a coarse lock and synchronize
    *      these control path operations.
    *
-   * @param tenantID
+   * @param tenantId
+   * @param userRoleName
+   * @param adminRoleName
    * @return Tenant
    * @throws IOException
    */
   @Override
-  public Tenant createTenantAccessInAuthorizer(String tenantID)
+  public Tenant createTenantAccessInAuthorizer(String tenantId,
+      String userRoleName, String adminRoleName)
       throws IOException {
 
-    Tenant tenant = new OzoneTenant(tenantID);
+    Tenant tenant = new OzoneTenant(tenantId);
     try {
-      controlPathLock.writeLock().lock();
+      tenantCacheLock.writeLock().lock();
 
       // Create admin role first
-      final OzoneTenantRolePrincipal adminRole =
-          OzoneTenantRolePrincipal.getAdminRole(tenantID);
-      String adminRoleId = authorizer.createRole(adminRole, null);
+      String adminRoleId = authorizer.createRole(adminRoleName, null);
       tenant.addTenantAccessRole(adminRoleId);
 
       // Then create user role, and add admin role as its delegated admin
-      final OzoneTenantRolePrincipal userRole =
-          OzoneTenantRolePrincipal.getUserRole(tenantID);
-      String userRoleId = authorizer.createRole(userRole, adminRole.getName());
+      String userRoleId = authorizer.createRole(userRoleName, adminRoleName);
       tenant.addTenantAccessRole(userRoleId);
 
       BucketNameSpace bucketNameSpace = tenant.getTenantBucketNameSpace();
-      // bucket namespace is volume name ??
+      // Bucket namespace is volume
       for (OzoneObj volume : bucketNameSpace.getBucketNameSpaceObjects()) {
         String volumeName = volume.getVolumeName();
 
+        final OzoneTenantRolePrincipal userRole =
+            new OzoneTenantRolePrincipal(userRoleName);
+        final OzoneTenantRolePrincipal adminRole =
+            new OzoneTenantRolePrincipal(adminRoleName);
+
         // Allow Volume List access
         AccessPolicy tenantVolumeAccessPolicy = newDefaultVolumeAccessPolicy(
             volumeName, userRole, adminRole);
@@ -203,26 +260,39 @@ public class OMMultiTenantManagerImpl implements OMMultiTenantManager {
         tenant.addTenantAccessPolicy(tenantBucketCreatePolicy);
       }
 
-      tenantCache.put(tenantID, new CachedTenantState(tenantID));
-    } catch (Exception e) {
+      if (tenantCache.containsKey(tenantId)) {
+        LOG.warn("Cache entry for tenant '{}' somehow already exists, "
+            + "will be overwritten", tenantId);  // TODO: throw exception?
+      }
+
+      // TODO: Move tenantCache update to a separate call createTenantAccessInDB
+      //  createTenantAccessInAuthorizer is called preExecute to update Ranger
+      //  createTenantAccessInDB will be called in validateAndUpdateCache
+      //  Do the same to all other InAuthorizer calls as well.
+      // New entry in tenant cache
+      tenantCache.put(tenantId, new CachedTenantState(
+          tenantId, userRoleName, adminRoleName));
+
+    } catch (IOException e) {
       try {
         removeTenantAccessFromAuthorizer(tenant);
-      } catch (Exception exception) {
+      } catch (IOException ignored) {
         // Best effort cleanup.
       }
-      throw new IOException(e.getMessage());
+      throw e;
     } finally {
-      controlPathLock.writeLock().unlock();
+      tenantCacheLock.writeLock().unlock();
     }
     return tenant;
   }
 
   @Override
-  public void removeTenantAccessFromAuthorizer(Tenant tenant) throws Exception {
+  public void removeTenantAccessFromAuthorizer(Tenant tenant)
+      throws IOException {
     try {
-      controlPathLock.writeLock().lock();
+      tenantCacheLock.writeLock().lock();
       for (AccessPolicy policy : tenant.getTenantAccessPolicies()) {
-        authorizer.deletePolicybyId(policy.getPolicyID());
+        authorizer.deletePolicyById(policy.getPolicyID());
       }
       for (String roleId : tenant.getTenantRoles()) {
         authorizer.deleteRole(roleId);
@@ -232,8 +302,8 @@ public class OMMultiTenantManagerImpl implements OMMultiTenantManager {
             tenant.getTenantId());
         tenantCache.remove(tenant.getTenantId());
       }
-    }  finally {
-      controlPathLock.writeLock().unlock();
+    } finally {
+      tenantCacheLock.writeLock().unlock();
     }
   }
 
@@ -259,57 +329,75 @@ public class OMMultiTenantManagerImpl implements OMMultiTenantManager {
    */
   @Override
   public String assignUserToTenant(BasicUserPrincipal principal,
-                                 String tenantId,
-                                 String accessId) throws IOException {
-    ImmutablePair<String, String> userAccessIdPair =
-        new ImmutablePair<>(principal.getName(), accessId);
+      String tenantId, String accessId) throws IOException {
+
+    final CachedAccessIdInfo cacheEntry =
+        new CachedAccessIdInfo(principal.getName(), false);
+
     try {
-      controlPathLock.writeLock().lock();
-
-      LOG.info("Adding user '{}' to tenant '{}' in-memory state.",
-          principal.getName(), tenantId);
-      CachedTenantState cachedTenantState =
-          tenantCache.getOrDefault(tenantId,
-              new CachedTenantState(tenantId));
-      cachedTenantState.getTenantUsers().add(userAccessIdPair);
-
-      final OzoneTenantRolePrincipal roleTenantAllUsers =
-          OzoneTenantRolePrincipal.getUserRole(tenantId);
-      String roleJsonStr = authorizer.getRole(roleTenantAllUsers);
-      String roleId = authorizer.assignUser(principal, roleJsonStr, false);
+      tenantCacheLock.writeLock().lock();
+
+      CachedTenantState cachedTenantState = tenantCache.get(tenantId);
+      Preconditions.checkNotNull(cachedTenantState,
+          "Cache entry for tenant '" + tenantId + "' does not exist");
+
+      LOG.info("Adding user '{}' access ID '{}' to tenant '{}' in-memory cache",
+          principal.getName(), accessId, tenantId);
+      cachedTenantState.getAccessIdInfoMap().put(accessId, cacheEntry);
+
+      final String tenantUserRoleName =
+          tenantCache.get(tenantId).getTenantUserRoleName();
+      final OzoneTenantRolePrincipal tenantUserRolePrincipal =
+          new OzoneTenantRolePrincipal(tenantUserRoleName);
+      String roleJsonStr = authorizer.getRole(tenantUserRolePrincipal);
+      final String roleId =
+          authorizer.assignUserToRole(principal, roleJsonStr, false);
       return roleId;
-    } catch (Exception e) {
+    } catch (IOException e) {
+      // Clean up
       revokeUserAccessId(accessId);
-      tenantCache.get(tenantId).getTenantUsers().remove(userAccessIdPair);
+      tenantCache.get(tenantId).getAccessIdInfoMap().remove(accessId);
+
       throw new OMException(e.getMessage(), TENANT_AUTHORIZER_ERROR);
     } finally {
-      controlPathLock.writeLock().unlock();
+      tenantCacheLock.writeLock().unlock();
     }
   }
 
   @Override
-  public void revokeUserAccessId(String accessID) throws IOException {
+  public void revokeUserAccessId(String accessId) throws IOException {
     try {
-      controlPathLock.writeLock().lock();
-      OmDBAccessIdInfo omDBAccessIdInfo =
-          omMetadataManager.getTenantAccessIdTable().get(accessID);
+      tenantCacheLock.writeLock().lock();
+      final OmDBAccessIdInfo omDBAccessIdInfo =
+          omMetadataManager.getTenantAccessIdTable().get(accessId);
       if (omDBAccessIdInfo == null) {
         throw new OMException(INVALID_ACCESS_ID);
       }
-      String tenantId = omDBAccessIdInfo.getTenantId();
+      final String tenantId = omDBAccessIdInfo.getTenantId();
       if (tenantId == null) {
         LOG.error("Tenant doesn't exist");
         return;
       }
-      tenantCache.get(tenantId).getTenantUsers()
-          .remove(new ImmutablePair<>(omDBAccessIdInfo.getUserPrincipal(),
-              accessID));
-      // TODO: Determine how to replace this code.
-//      final String userID = authorizer.getUserId(userPrincipal);
-//      authorizer.deleteUser(userID);
 
+      final BasicUserPrincipal principal =
+          new BasicUserPrincipal(omDBAccessIdInfo.getUserPrincipal());
+
+      LOG.info("Removing user '{}' access ID '{}' from tenant '{}' in-memory "
+              + "cache",
+          principal.getName(), accessId, tenantId);
+      tenantCache.get(tenantId).getAccessIdInfoMap().remove(accessId);
+
+      // Delete user from role in Ranger
+      final String tenantUserRoleName =
+          tenantCache.get(tenantId).getTenantUserRoleName();
+      final OzoneTenantRolePrincipal tenantUserRolePrincipal =
+          new OzoneTenantRolePrincipal(tenantUserRoleName);
+      String roleJsonStr = authorizer.getRole(tenantUserRolePrincipal);
+      final String roleId =
+          authorizer.revokeUserFromRole(principal, roleJsonStr);
+      Preconditions.checkNotNull(roleId);
     } finally {
-      controlPathLock.writeLock().unlock();
+      tenantCacheLock.writeLock().unlock();
     }
   }
 
@@ -318,12 +406,15 @@ public class OMMultiTenantManagerImpl implements OMMultiTenantManager {
    */
   public void removeUserAccessIdFromCache(String accessId, String userPrincipal,
                                           String tenantId) {
+
+    tenantCacheLock.writeLock().lock();
     try {
-      tenantCache.get(tenantId).getTenantUsers().remove(
-          new ImmutablePair<>(userPrincipal, accessId));
+      tenantCache.get(tenantId).getAccessIdInfoMap().remove(accessId);
     } catch (NullPointerException e) {
       // tenantCache is somehow empty. Ignore for now.
-      // But how?
+      LOG.warn("Exception when removing accessId from cache", e);
+    } finally {
+      tenantCacheLock.writeLock().unlock();
     }
   }
 
@@ -331,7 +422,7 @@ public class OMMultiTenantManagerImpl implements OMMultiTenantManager {
   public String getUserNameGivenAccessId(String accessId) {
     Preconditions.checkNotNull(accessId);
     try {
-      controlPathLock.readLock().lock();
+      tenantCacheLock.readLock().lock();
       OmDBAccessIdInfo omDBAccessIdInfo =
           omMetadataManager.getTenantAccessIdTable().get(accessId);
       if (omDBAccessIdInfo != null) {
@@ -343,7 +434,7 @@ public class OMMultiTenantManagerImpl implements OMMultiTenantManager {
       LOG.error("Unexpected error while obtaining DB Access Info for {}",
           accessId, ioEx);
     } finally {
-      controlPathLock.readLock().unlock();
+      tenantCacheLock.readLock().unlock();
     }
     return null;
   }
@@ -356,10 +447,8 @@ public class OMMultiTenantManagerImpl implements OMMultiTenantManager {
     if (callerUgi == null) {
       return false;
     } else {
-      return isTenantAdmin(
-              callerUgi.getShortUserName(), tenantId, delegated)
-          || isTenantAdmin(
-              callerUgi.getUserName(), tenantId, delegated)
+      return isTenantAdmin(callerUgi.getShortUserName(), tenantId, delegated)
+          || isTenantAdmin(callerUgi.getUserName(), tenantId, delegated)
           || ozoneManager.isAdmin(callerUgi.getShortUserName())
           || ozoneManager.isAdmin(callerUgi.getUserName());
     }
@@ -412,26 +501,40 @@ public class OMMultiTenantManagerImpl implements OMMultiTenantManager {
   public TenantUserList listUsersInTenant(String tenantID, String prefix)
       throws IOException {
 
-    if (!omMetadataManager.getTenantStateTable().isExist(tenantID)) {
-      throw new IOException("Tenant '" + tenantID + "' not found!");
-    }
-
     List<UserAccessIdInfo> userAccessIds = new ArrayList<>();
-    CachedTenantState cachedTenantState = tenantCache.get(tenantID);
-    if (cachedTenantState == null) {
-      throw new IOException("Inconsistent in memory Tenant cache '" + tenantID
-          + "' not found in cache, but present in OM DB!");
-    }
 
-    cachedTenantState.getTenantUsers().stream()
-        .filter(
-            k -> StringUtils.isEmpty(prefix) || k.getKey().startsWith(prefix))
-        .forEach(
-            k -> userAccessIds.add(
+    tenantCacheLock.readLock().lock();
+
+    try {
+      if (!omMetadataManager.getTenantStateTable().isExist(tenantID)) {
+        throw new IOException("Tenant '" + tenantID + "' not found!");
+      }
+
+      CachedTenantState cachedTenantState = tenantCache.get(tenantID);
+
+      if (cachedTenantState == null) {
+        throw new IOException("Inconsistent in memory Tenant cache '" + tenantID
+            + "' not found in cache, but present in OM DB!");
+      }
+
+      cachedTenantState.getAccessIdInfoMap().entrySet().stream()
+          .filter(
+              // Include if user principal matches the prefix
+              k -> StringUtils.isEmpty(prefix) ||
+                  k.getValue().getUserPrincipal().startsWith(prefix))
+          .forEach(k -> {
+            final String accessId = k.getKey();
+            final CachedAccessIdInfo cacheEntry = k.getValue();
+            userAccessIds.add(
                 UserAccessIdInfo.newBuilder()
-                    .setUserPrincipal(k.getKey())
-                    .setAccessId(k.getValue())
-                    .build()));
+                    .setUserPrincipal(cacheEntry.getUserPrincipal())
+                    .setAccessId(accessId)
+                    .build());
+          });
+
+    } finally {
+      tenantCacheLock.readLock().unlock();
+    }
 
     return new TenantUserList(userAccessIds);
   }
@@ -448,43 +551,78 @@ public class OMMultiTenantManagerImpl implements OMMultiTenantManager {
   }
 
   @Override
-  public void assignTenantAdmin(String accessID, boolean delegated)
+  public void assignTenantAdmin(String accessId, boolean delegated)
       throws IOException {
     try {
-      controlPathLock.writeLock().lock();
+      tenantCacheLock.writeLock().lock();
+
       // tenantId (tenant name) is necessary to retrieve role name
-      Optional<String> optionalTenant = getTenantForAccessID(accessID);
+      Optional<String> optionalTenant = getTenantForAccessID(accessId);
       if (!optionalTenant.isPresent()) {
-        throw new OMException("No tenant found for access ID " + accessID,
+        throw new OMException("No tenant found for access ID " + accessId,
             INVALID_ACCESS_ID);
       }
       final String tenantId = optionalTenant.get();
 
+      final CachedTenantState cachedTenantState = tenantCache.get(tenantId);
+      final String tenantAdminRoleName =
+          cachedTenantState.getTenantAdminRoleName();
       final OzoneTenantRolePrincipal existingAdminRole =
-          OzoneTenantRolePrincipal.getAdminRole(tenantId);
+          new OzoneTenantRolePrincipal(tenantAdminRoleName);
+
       final String roleJsonStr = authorizer.getRole(existingAdminRole);
-      final String userPrincipal = getUserNameGivenAccessId(accessID);
+      final String userPrincipal = getUserNameGivenAccessId(accessId);
       // Add user principal (not accessId!) to the role
-      final String roleId = authorizer.assignUser(
+      final String roleId = authorizer.assignUserToRole(
           new BasicUserPrincipal(userPrincipal), roleJsonStr, delegated);
       assert (roleId != null);
 
-      // TODO: update some in-memory mappings?
+      // Update cache
+      cachedTenantState.getAccessIdInfoMap().get(accessId).setIsAdmin(true);
 
     } catch (IOException e) {
-      revokeTenantAdmin(accessID);
+      revokeTenantAdmin(accessId);
       throw e;
     } finally {
-      controlPathLock.writeLock().unlock();
+      tenantCacheLock.writeLock().unlock();
     }
   }
 
   @Override
-  public void revokeTenantAdmin(String accessID) throws IOException {
+  public void revokeTenantAdmin(String accessId) throws IOException {
+    try {
+      tenantCacheLock.writeLock().lock();
+
+      // tenantId (tenant name) is necessary to retrieve role name
+      Optional<String> optionalTenant = getTenantForAccessID(accessId);
+      if (!optionalTenant.isPresent()) {
+        throw new OMException("No tenant found for access ID " + accessId,
+            INVALID_ACCESS_ID);
+      }
+      final String tenantId = optionalTenant.get();
+
+      final CachedTenantState cachedTenantState = tenantCache.get(tenantId);
+      final String tenantAdminRoleName =
+          cachedTenantState.getTenantAdminRoleName();
+      final OzoneTenantRolePrincipal existingAdminRole =
+          new OzoneTenantRolePrincipal(tenantAdminRoleName);
+
+      final String roleJsonStr = authorizer.getRole(existingAdminRole);
+      final String userPrincipal = getUserNameGivenAccessId(accessId);
+      // Add user principal (not accessId!) to the role
+      final String roleId = authorizer.revokeUserFromRole(
+          new BasicUserPrincipal(userPrincipal), roleJsonStr);
+      assert (roleId != null);
 
+      // Update cache
+      cachedTenantState.getAccessIdInfoMap().get(accessId).setIsAdmin(false);
+
+    } finally {
+      tenantCacheLock.writeLock().unlock();
+    }
   }
 
-  private AccessPolicy newDefaultVolumeAccessPolicy(String tenantId,
+  public AccessPolicy newDefaultVolumeAccessPolicy(String tenantId,
       OzoneTenantRolePrincipal userRole, OzoneTenantRolePrincipal adminRole)
       throws IOException {
 
@@ -503,7 +641,7 @@ public class OMMultiTenantManagerImpl implements OMMultiTenantManager {
     return policy;
   }
 
-  private AccessPolicy newDefaultBucketAccessPolicy(String tenantId,
+  public AccessPolicy newDefaultBucketAccessPolicy(String tenantId,
       OzoneTenantRolePrincipal userRole) throws IOException {
 
     final String bucketAccessPolicyName =
@@ -541,35 +679,53 @@ public class OMMultiTenantManagerImpl implements OMMultiTenantManager {
     return conf;
   }
 
-  public void loadUsersFromDB() {
-    Table<String, OmDBAccessIdInfo> tenantAccessIdTable =
+  public void loadTenantCacheFromDB() {
+    final Table<String, OmDBAccessIdInfo> tenantAccessIdTable =
         omMetadataManager.getTenantAccessIdTable();
-    TableIterator<String, ? extends KeyValue<String, OmDBAccessIdInfo>>
-        iterator = tenantAccessIdTable.iterator();
+    final TableIterator<String, ? extends KeyValue<String, OmDBAccessIdInfo>>
+        accessIdTableIter = tenantAccessIdTable.iterator();
     int userCount = 0;
 
+    final Table<String, OmDBTenantState> tenantStateTable =
+        omMetadataManager.getTenantStateTable();
+
     try {
-      while (iterator.hasNext()) {
-        KeyValue<String, OmDBAccessIdInfo> next = iterator.next();
-        String accessId = next.getKey();
-        OmDBAccessIdInfo value = next.getValue();
-        String tenantId = value.getTenantId();
-        String user = value.getUserPrincipal();
-
-        CachedTenantState cachedTenantState = tenantCache
-            .computeIfAbsent(tenantId, k -> new CachedTenantState(tenantId));
-        cachedTenantState.getTenantUsers().add(
-            new ImmutablePair<>(user, accessId));
+      while (accessIdTableIter.hasNext()) {
+        final KeyValue<String, OmDBAccessIdInfo> next =
+            accessIdTableIter.next();
+
+        final String accessId = next.getKey();
+        final OmDBAccessIdInfo value = next.getValue();
+
+        final String tenantId = value.getTenantId();
+        final String userPrincipal = value.getUserPrincipal();
+        final boolean isAdmin = value.getIsAdmin();
+
+        final OmDBTenantState tenantState = tenantStateTable.get(tenantId);
+        // If the TenantState doesn't exist, it means the accessId entry is
+        //  orphaned or incorrect, likely metadata inconsistency
+        Preconditions.checkNotNull(tenantState,
+            "OmDBTenantState should have existed for " + tenantId);
+
+        final String tenantUserRoleName = tenantState.getUserRoleName();
+        final String tenantAdminRoleName = tenantState.getAdminRoleName();
+
+        // Enter tenant cache entry when it is the first hit for this tenant
+        final CachedTenantState cachedTenantState = tenantCache.computeIfAbsent(
+            tenantId, k -> new CachedTenantState(
+                tenantId, tenantUserRoleName, tenantAdminRoleName));
+
+        cachedTenantState.getAccessIdInfoMap().put(accessId,
+            new CachedAccessIdInfo(userPrincipal, isAdmin));
         userCount++;
       }
-      LOG.info("Loaded {} tenants and {} tenant-users from the database.",
+      LOG.info("Loaded {} tenants and {} tenant users from the database",
           tenantCache.size(), userCount);
-    } catch (Exception ex) {
-      LOG.error("Error while loading user list. ", ex);
+    } catch (IOException ex) {
+      LOG.error("Error while loading user list", ex);
     }
   }
 
-
   @Override
   public void checkAdmin() throws OMException {
 
@@ -617,6 +773,9 @@ public class OMMultiTenantManagerImpl implements OMMultiTenantManager {
   @Override
   public String getTenantVolumeName(String tenantId) throws IOException {
 
+    // TODO: lock here?
+    // tenantCacheLock.readLock().lock();
+
     final OmDBTenantState tenantState =
         omMetadataManager.getTenantStateTable().get(tenantId);
 
@@ -637,6 +796,44 @@ public class OMMultiTenantManagerImpl implements OMMultiTenantManager {
     return volumeName;
   }
 
+  @Override
+  public String getTenantUserRoleName(String tenantId) throws IOException {
+
+    tenantCacheLock.readLock().lock();
+
+    try {
+      final CachedTenantState cachedTenantState = tenantCache.get(tenantId);
+
+      if (cachedTenantState == null) {
+        throw new OMException("Tenant not found in cache: " + tenantId,
+            TENANT_NOT_FOUND);
+      }
+
+      return cachedTenantState.getTenantUserRoleName();
+    } finally {
+      tenantCacheLock.readLock().unlock();
+    }
+  }
+
+  @Override
+  public String getTenantAdminRoleName(String tenantId) throws IOException {
+
+    tenantCacheLock.readLock().lock();
+
+    try {
+      final CachedTenantState cachedTenantState = tenantCache.get(tenantId);
+
+      if (cachedTenantState == null) {
+        throw new OMException("Tenant not found in cache: " + tenantId,
+            TENANT_NOT_FOUND);
+      }
+
+      return cachedTenantState.getTenantAdminRoleName();
+    } finally {
+      tenantCacheLock.readLock().unlock();
+    }
+  }
+
   @Override
   public boolean isUserAccessIdPrincipalOrTenantAdmin(String accessId,
       UserGroupInformation ugi) throws IOException {
@@ -683,7 +880,6 @@ public class OMMultiTenantManagerImpl implements OMMultiTenantManager {
   @Override
   public boolean isTenantEmpty(String tenantId) throws IOException {
 
-
     if (!tenantCache.containsKey(tenantId)) {
       throw new OMException("Tenant does not exist for tenantId: " + tenantId,
           TENANT_NOT_FOUND);
@@ -692,8 +888,65 @@ public class OMMultiTenantManagerImpl implements OMMultiTenantManager {
     return tenantCache.get(tenantId).isTenantEmpty();
   }
 
-  @VisibleForTesting
-  Map<String, CachedTenantState> getTenantCache() {
+  public Map<String, CachedTenantState> getTenantCache() {
     return tenantCache;
   }
+
+  /**
+   * Generate and return a mapping from roles to a set of user principals from
+   * tenantCache.
+   */
+  public HashMap<String, HashSet<String>> getAllRolesFromCache() {
+    final HashMap<String, HashSet<String>> mtRoles = new HashMap<>();
+
+    tenantCacheLock.readLock().lock();
+
+    try {
+      // tenantCache: tenantId -> CachedTenantState
+      for (Map.Entry<String, CachedTenantState> e1 : tenantCache.entrySet()) {
+        final CachedTenantState cachedTenantState = e1.getValue();
+
+        final String userRoleName = cachedTenantState.getTenantUserRoleName();
+        mtRoles.computeIfAbsent(userRoleName, any -> new HashSet<>());
+        final String adminRoleName = cachedTenantState.getTenantAdminRoleName();
+        mtRoles.computeIfAbsent(adminRoleName, any -> new HashSet<>());
+
+        final Map<String, CachedAccessIdInfo> accessIdInfoMap =
+            cachedTenantState.getAccessIdInfoMap();
+
+        // accessIdInfoMap: accessId -> CachedAccessIdInfo
+        for (Map.Entry<String, CachedAccessIdInfo> e2 :
+            accessIdInfoMap.entrySet()) {
+          final CachedAccessIdInfo cachedAccessIdInfo = e2.getValue();
+
+          final String userPrincipal = cachedAccessIdInfo.getUserPrincipal();
+          final boolean isAdmin = cachedAccessIdInfo.getIsAdmin();
+
+          addUserToMtRoles(mtRoles, userRoleName, userPrincipal);
+
+          if (isAdmin) {
+            addUserToMtRoles(mtRoles, adminRoleName, userPrincipal);
+          }
+        }
+      }
+    } finally {
+      tenantCacheLock.readLock().unlock();
+    }
+
+    return mtRoles;
+  }
+
+  /**
+   * Helper function to add user principal to a role in mtRoles.
+   */
+  private void addUserToMtRoles(HashMap<String, HashSet<String>> mtRoles,
+      String roleName, String userPrincipal) {
+    if (!mtRoles.containsKey(roleName)) {
+      mtRoles.put(roleName, new HashSet<>(
+          Collections.singletonList(userPrincipal)));
+    } else {
+      final HashSet<String> usersInTheRole = mtRoles.get(roleName);
+      usersInTheRole.add(userPrincipal);
+    }
+  }
 }
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
index dde0fcc59d..fcaa3a1932 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
@@ -141,6 +141,7 @@ public class OmMetadataManagerImpl implements OMMetadataManager {
    * | principalToAccessIdsTable | userPrincipal -> OmDBUserPrincipalInfo   |
    * |----------------------------------------------------------------------|
    *
+   *
    * Simple Tables:
    * |----------------------------------------------------------------------|
    * |  Column Family     |        VALUE                                    |
@@ -1366,8 +1367,7 @@ public class OmMetadataManagerImpl implements OMMetadataManager {
   }
 
   @Override
-  public Table<String, OmDBUserPrincipalInfo>
-      getPrincipalToAccessIdsTable() {
+  public Table<String, OmDBUserPrincipalInfo> getPrincipalToAccessIdsTable() {
     return principalToAccessIdsTable;
   }
 
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
index c8e1ed5a29..d2848748a1 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
@@ -3594,6 +3594,9 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
   void stopServices() throws Exception {
     keyManager.stop();
     stopSecretManager();
+    if (multiTenantManager != null) {
+      multiTenantManager.stop();
+    }
     metadataManager.stop();
     stopTrashEmptier();
   }
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/multitenant/CachedTenantState.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/multitenant/CachedTenantState.java
index 66f1c6ba73..ee19930d69 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/multitenant/CachedTenantState.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/multitenant/CachedTenantState.java
@@ -17,33 +17,72 @@
 
 package org.apache.hadoop.ozone.om.multitenant;
 
-import java.util.HashSet;
-import java.util.Set;
-
-import org.apache.commons.lang3.tuple.Pair;
+import java.util.HashMap;
 
 /**
  * A collection of things that we want to maintain about a tenant in memory.
  */
 public class CachedTenantState {
 
-  private String tenantId;
-  private Set<Pair<String, String>> tenantUserAccessIds;
+  private final String tenantId;
+  private final String tenantUserRoleName;
+  private final String tenantAdminRoleName;
+  // accessId -> userPrincipal and isAdmin flag
+  private final HashMap<String, CachedAccessIdInfo> accessIdInfoMap;
 
-  public CachedTenantState(String tenantId) {
-    this.tenantId = tenantId;
-    tenantUserAccessIds = new HashSet<>();
+  public String getTenantUserRoleName() {
+    return tenantUserRoleName;
   }
 
-  public Set<Pair<String, String>> getTenantUsers() {
-    return tenantUserAccessIds;
+  public String getTenantAdminRoleName() {
+    return tenantAdminRoleName;
+  }
+
+  /**
+   * Stores cached Access ID info.
+   */
+  public static class CachedAccessIdInfo {
+    private final String userPrincipal;
+    /**
+     * Stores if the accessId is a tenant admin (either delegated or not).
+     */
+    private boolean isAdmin;
+
+    public CachedAccessIdInfo(String userPrincipal, boolean isAdmin) {
+      this.userPrincipal = userPrincipal;
+      this.isAdmin = isAdmin;
+    }
+
+    public String getUserPrincipal() {
+      return userPrincipal;
+    }
+
+    public void setIsAdmin(boolean isAdmin) {
+      this.isAdmin = isAdmin;
+    }
+
+    public boolean getIsAdmin() {
+      return isAdmin;
+    }
+  }
+
+  public CachedTenantState(String tenantId,
+      String tenantUserRoleName, String tenantAdminRoleName) {
+    this.tenantId = tenantId;
+    this.tenantUserRoleName = tenantUserRoleName;
+    this.tenantAdminRoleName = tenantAdminRoleName;
+    this.accessIdInfoMap = new HashMap<>();
   }
 
   public String getTenantId() {
     return tenantId;
   }
 
+  public HashMap<String, CachedAccessIdInfo> getAccessIdInfoMap() {
+    return accessIdInfoMap;
+  }
+
   public boolean isTenantEmpty() {
-    return tenantUserAccessIds.size() == 0;
+    return accessIdInfoMap.isEmpty();
   }
 }
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/multitenant/MultiTenantAccessAuthorizer.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/multitenant/MultiTenantAccessAuthorizer.java
similarity index 75%
rename from hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/multitenant/MultiTenantAccessAuthorizer.java
rename to hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/multitenant/MultiTenantAccessAuthorizer.java
index 72c41edcd6..fc2d652711 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/multitenant/MultiTenantAccessAuthorizer.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/multitenant/MultiTenantAccessAuthorizer.java
@@ -17,6 +17,7 @@
 package org.apache.hadoop.ozone.om.multitenant;
 
 import java.io.IOException;
+import java.util.HashSet;
 import java.util.List;
 
 import org.apache.commons.lang3.tuple.Pair;
@@ -29,7 +30,6 @@ import org.apache.hadoop.ozone.security.acl.IOzoneObj;
 import org.apache.hadoop.ozone.security.acl.RequestContext;
 import org.apache.http.auth.BasicUserPrincipal;
 
-
 /**
  * Public API for Ozone MultiTenant Gatekeeper. Security providers providing
  * support for Ozone MultiTenancy should implement this.
@@ -44,13 +44,13 @@ public interface MultiTenantAccessAuthorizer extends IAccessAuthorizer {
    * @param configuration
    * @throws IOException
    */
-  void init(Configuration configuration) throws IOException;;
+  void init(Configuration configuration) throws IOException;
 
   /**
    * Shutdown for the MultiTenantGateKeeper.
-   * @throws Exception
+   * @throws IOException
    */
-  void shutdown() throws Exception;
+  void shutdown() throws IOException;
 
   /**
    * Assign user to an existing role in the Authorizer.
@@ -62,9 +62,28 @@ public interface MultiTenantAccessAuthorizer extends IAccessAuthorizer {
    * MultiTenantGateKeeperplugin Implementation. E.g. a Ranger
    * based Implementation can return some ID thats relevant for it.
    */
-  String assignUser(BasicUserPrincipal principal, String existingRole,
+  String assignUserToRole(BasicUserPrincipal principal, String existingRole,
       boolean isAdmin) throws IOException;
 
+  /**
+   * Update the exising role details and push the changes to Ranger.
+   *
+   * @param principal contains user name, must be an existing user in Ranger.
+   * @param existingRole An existing role's JSON response String from Ranger.
+   * @return roleId (not useful for now)
+   * @throws IOException
+   */
+  String revokeUserFromRole(BasicUserPrincipal principal,
+                                   String existingRole) throws IOException;
+
+  /**
+   * Assign all the users to an existing role.
+   * @param users list of user principals
+   * @param existingRole roleName
+   */
+  String assignAllUsers(HashSet<String> users,
+                               String existingRole) throws IOException;
+
   /**
    * @param principal
    * @return Unique userID maintained by the authorizer plugin.
@@ -80,6 +99,15 @@ public interface MultiTenantAccessAuthorizer extends IAccessAuthorizer {
   String getRole(OzoneTenantRolePrincipal principal)
       throws IOException;
 
+  /**
+   * Returs the details of a role, given the rolename.
+   * @param roleName
+   * @return
+   * @throws IOException
+   */
+  String getRole(String roleName)
+      throws IOException;
+
   /**
    * Delete the user userID in MultiTenantGateKeeper plugin.
    * @param opaqueUserID : unique ID that was returned by
@@ -97,7 +125,17 @@ public interface MultiTenantAccessAuthorizer extends IAccessAuthorizer {
    * MultiTenantGateKeeper plugin Implementation e.g. corresponding ID on the
    * Ranger end for a ranger based implementation .
    */
-  String createRole(OzoneTenantRolePrincipal role, String adminRoleName)
+  String createRole(String role, String adminRoleName)
+      throws IOException;
+
+  /**
+   * Creates a new user.
+   * @param userName
+   * @param password
+   * @return
+   * @throws IOException
+   */
+  String createUser(String userName, String password)
       throws IOException;
 
   /**
@@ -111,31 +149,39 @@ public interface MultiTenantAccessAuthorizer extends IAccessAuthorizer {
    *
    * @param policy
    * @return unique and opaque policy ID that is maintained by the plugin.
-   * @throws Exception
+   * @throws IOException
    */
-  String createAccessPolicy(AccessPolicy policy) throws Exception;
+  String createAccessPolicy(AccessPolicy policy) throws IOException;
 
   /**
    *
    * @param policyName
    * @return unique and opaque policy ID that is maintained by the plugin.
-   * @throws Exception
+   * @throws IOException
    */
-  AccessPolicy getAccessPolicyByName(String policyName) throws Exception;
+  AccessPolicy getAccessPolicyByName(String policyName) throws IOException;
+
+  /**
+   * given a policy Id, returs the policy.
+   * @param policyId
+   * @return
+   * @throws IOException
+   */
+  AccessPolicy getAccessPolicyById(String policyId) throws IOException;
 
   /**
    *
    * @param policyId that was returned earlier by the createAccessPolicy().
-   * @throws Exception
+   * @throws IOException
    */
-  void deletePolicybyId(String policyId) throws IOException;
+  void deletePolicyById(String policyId) throws IOException;
 
   /**
    *
    * @param policyName unique policyName.
-   * @throws Exception
+   * @throws IOException
    */
-  void deletePolicybyName(String policyName) throws Exception;
+  void deletePolicyByName(String policyName) throws IOException;
   /**
    * Grant user aclType access to bucketNameSpace.
    * @param bucketNameSpace
@@ -211,4 +257,10 @@ public interface MultiTenantAccessAuthorizer extends IAccessAuthorizer {
   @Override
   boolean checkAccess(IOzoneObj ozoneObject, RequestContext context)
       throws OMException;
+
+  long getLatestOzoneServiceVersion() throws IOException;
+
+  String getAllMultiTenantPolicies() throws IOException;
+
+  MultiTenantAccessController getMultiTenantAccessController();
 }
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/multitenant/MultiTenantAccessAuthorizerDummyPlugin.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/multitenant/MultiTenantAccessAuthorizerDummyPlugin.java
similarity index 69%
rename from hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/multitenant/MultiTenantAccessAuthorizerDummyPlugin.java
rename to hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/multitenant/MultiTenantAccessAuthorizerDummyPlugin.java
index f73f05003c..cb6e198993 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/multitenant/MultiTenantAccessAuthorizerDummyPlugin.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/multitenant/MultiTenantAccessAuthorizerDummyPlugin.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.ozone.security.acl.RequestContext;
 import org.apache.http.auth.BasicUserPrincipal;
 
 import java.io.IOException;
+import java.util.HashSet;
 import java.util.List;
 
 /**
@@ -40,14 +41,26 @@ public class MultiTenantAccessAuthorizerDummyPlugin implements
   }
 
   @Override
-  public void shutdown() throws Exception {
+  public void shutdown() throws IOException {
 
   }
 
   @Override
-  public String assignUser(BasicUserPrincipal principal, String existingRole,
-      boolean isAdmin) throws IOException {
-    return "assignUser-roleName-returned";
+  public String assignUserToRole(BasicUserPrincipal principal,
+      String existingRole, boolean isAdmin) {
+    return "roleId";
+  }
+
+  @Override
+  public String revokeUserFromRole(BasicUserPrincipal principal,
+      String existingRole) {
+    return "roleId";
+  }
+
+  @Override
+  public String assignAllUsers(HashSet<String> users, String existingRole)
+      throws IOException {
+    return null;
   }
 
   @Override
@@ -60,13 +73,24 @@ public class MultiTenantAccessAuthorizerDummyPlugin implements
     return null;
   }
 
+  @Override
+  public String getRole(String roleName) throws IOException {
+    return null;
+  }
+
   @Override
   public void deleteUser(String opaqueUserID) throws IOException {
 
   }
 
   @Override
-  public String createRole(OzoneTenantRolePrincipal role, String adminRoleName)
+  public String createRole(String role, String adminRoleName)
+      throws IOException {
+    return null;
+  }
+
+  @Override
+  public String createUser(String userName, String password)
       throws IOException {
     return null;
   }
@@ -77,23 +101,27 @@ public class MultiTenantAccessAuthorizerDummyPlugin implements
   }
 
   @Override
-  public String createAccessPolicy(AccessPolicy policy) throws Exception {
+  public String createAccessPolicy(AccessPolicy policy) throws IOException {
+    return null;
+  }
+
+  @Override
+  public AccessPolicy getAccessPolicyByName(String policyName) {
     return null;
   }
 
   @Override
-  public AccessPolicy getAccessPolicyByName(String policyName)
-      throws Exception {
+  public AccessPolicy getAccessPolicyById(String policyName) {
     return null;
   }
 
   @Override
-  public void deletePolicybyId(String policyId) throws IOException {
+  public void deletePolicyById(String policyId) throws IOException {
 
   }
 
   @Override
-  public void deletePolicybyName(String policyName) throws Exception {
+  public void deletePolicyByName(String policyName) throws IOException {
 
   }
 
@@ -144,4 +172,19 @@ public class MultiTenantAccessAuthorizerDummyPlugin implements
       throws OMException {
     return false;
   }
+
+  @Override
+  public long getLatestOzoneServiceVersion() {
+    return -1;
+  }
+
+  @Override
+  public String getAllMultiTenantPolicies() {
+    return null;
+  }
+
+  @Override
+  public MultiTenantAccessController getMultiTenantAccessController() {
+    return null;
+  }
 }
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/multitenant/MultiTenantAccessAuthorizerRangerPlugin.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/multitenant/MultiTenantAccessAuthorizerRangerPlugin.java
similarity index 53%
rename from hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/multitenant/MultiTenantAccessAuthorizerRangerPlugin.java
rename to hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/multitenant/MultiTenantAccessAuthorizerRangerPlugin.java
index f4d5952cc0..0e4d37b8a1 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/multitenant/MultiTenantAccessAuthorizerRangerPlugin.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/multitenant/MultiTenantAccessAuthorizerRangerPlugin.java
@@ -20,13 +20,19 @@ package org.apache.hadoop.ozone.om.multitenant;
 import static java.net.HttpURLConnection.HTTP_OK;
 import static org.apache.hadoop.ozone.OzoneConsts.OZONE_OM_RANGER_ADMIN_CREATE_POLICY_HTTP_ENDPOINT;
 import static org.apache.hadoop.ozone.OzoneConsts.OZONE_OM_RANGER_ADMIN_CREATE_ROLE_HTTP_ENDPOINT;
-import static org.apache.hadoop.ozone.OzoneConsts.OZONE_OM_RANGER_ADMIN_DELETE_GROUP_HTTP_ENDPOINT;
+import static org.apache.hadoop.ozone.OzoneConsts.OZONE_OM_RANGER_ADMIN_CREATE_USER_HTTP_ENDPOINT;
 import static org.apache.hadoop.ozone.OzoneConsts.OZONE_OM_RANGER_ADMIN_DELETE_POLICY_HTTP_ENDPOINT;
+import static org.apache.hadoop.ozone.OzoneConsts.OZONE_OM_RANGER_ADMIN_DELETE_ROLE_HTTP_ENDPOINT;
 import static org.apache.hadoop.ozone.OzoneConsts.OZONE_OM_RANGER_ADMIN_DELETE_USER_HTTP_ENDPOINT;
 import static org.apache.hadoop.ozone.OzoneConsts.OZONE_OM_RANGER_ADMIN_GET_POLICY_HTTP_ENDPOINT;
+import static org.apache.hadoop.ozone.OzoneConsts.OZONE_OM_RANGER_ADMIN_GET_POLICY_ID_HTTP_ENDPOINT;
 import static org.apache.hadoop.ozone.OzoneConsts.OZONE_OM_RANGER_ADMIN_GET_ROLE_HTTP_ENDPOINT;
 import static org.apache.hadoop.ozone.OzoneConsts.OZONE_OM_RANGER_ADMIN_GET_USER_HTTP_ENDPOINT;
 import static org.apache.hadoop.ozone.OzoneConsts.OZONE_OM_RANGER_ADMIN_ROLE_ADD_USER_HTTP_ENDPOINT;
+import static org.apache.hadoop.ozone.OzoneConsts.OZONE_OM_RANGER_ALL_POLICIES_ENDPOINT;
+import static org.apache.hadoop.ozone.OzoneConsts.OZONE_OM_RANGER_DOWNLOAD_ENDPOINT;
+import static org.apache.hadoop.ozone.OzoneConsts.OZONE_OM_RANGER_OZONE_SERVICE_ENDPOINT;
+import static org.apache.hadoop.ozone.OzoneConsts.OZONE_TENANT_RANGER_POLICY_LABEL;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RANGER_HTTPS_ADMIN_API_PASSWD;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RANGER_HTTPS_ADMIN_API_USER;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_RANGER_HTTPS_ADDRESS_KEY;
@@ -41,8 +47,10 @@ import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStreamReader;
 import java.io.OutputStream;
+import java.net.HttpURLConnection;
 import java.net.URL;
 import java.nio.charset.StandardCharsets;
+import java.util.HashSet;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
@@ -74,18 +82,37 @@ public class MultiTenantAccessAuthorizerRangerPlugin implements
   public static final Logger LOG = LoggerFactory
       .getLogger(MultiTenantAccessAuthorizerRangerPlugin.class);
 
+  private MultiTenantAccessController accessController;
+
   private OzoneConfiguration conf;
-  private boolean ignoreServerCert = false;
+  private boolean ignoreServerCert = true;
   private int connectionTimeout;
   private int connectionRequestTimeout;
   private String authHeaderValue;
   private String rangerHttpsAddress;
+  // Stores Ranger cm_ozone service ID. This value should not change (unless
+  // somehow Ranger cm_ozone service is deleted and re-created while OM is
+  // still running and not reloaded / restarted).
+  private int rangerOzoneServiceId;
 
   @Override
   public void init(Configuration configuration) throws IOException {
     conf = new OzoneConfiguration(configuration);
+    accessController = new RangerRestMultiTenantAccessController(conf);
     rangerHttpsAddress = conf.get(OZONE_RANGER_HTTPS_ADDRESS_KEY);
+    if (rangerHttpsAddress == null) {
+      throw new OMException("Config ozone.om.ranger.https-address is not set! "
+          + "Multi-Tenancy feature requires Apache Ranger to function properly",
+          OMException.ResultCodes.INTERNAL_ERROR);
+    }
     initializeRangerConnection();
+
+    // Get Ranger Ozone service ID
+    rangerOzoneServiceId = retrieveRangerOzoneServiceId();
+  }
+
+  int getRangerOzoneServiceId() {
+    return rangerOzoneServiceId;
   }
 
   private void initializeRangerConnection() {
@@ -152,7 +179,7 @@ public class MultiTenantAccessAuthorizerRangerPlugin implements
 
 
   @Override
-  public void shutdown() throws Exception {
+  public void shutdown() throws IOException {
     // TBD
   }
 
@@ -214,7 +241,18 @@ public class MultiTenantAccessAuthorizerRangerPlugin implements
         rangerHttpsAddress + OZONE_OM_RANGER_ADMIN_GET_ROLE_HTTP_ENDPOINT +
             principal.getName();
 
-    HttpsURLConnection conn = makeHttpsGetCall(endpointUrl, "GET", false);
+    HttpURLConnection conn = makeHttpGetCall(endpointUrl, "GET", false);
+    return getResponseData(conn);
+  }
+
+  @Override
+  public String getRole(String roleName) throws IOException {
+
+    String endpointUrl =
+        rangerHttpsAddress + OZONE_OM_RANGER_ADMIN_GET_ROLE_HTTP_ENDPOINT +
+            roleName;
+
+    HttpURLConnection conn = makeHttpGetCall(endpointUrl, "GET", false);
     return getResponseData(conn);
   }
 
@@ -224,7 +262,7 @@ public class MultiTenantAccessAuthorizerRangerPlugin implements
         rangerHttpsAddress + OZONE_OM_RANGER_ADMIN_GET_USER_HTTP_ENDPOINT +
         principal.getName();
 
-    HttpsURLConnection conn = makeHttpsGetCall(rangerAdminUrl,
+    HttpURLConnection conn = makeHttpGetCall(rangerAdminUrl,
         "GET", false);
     String response = getResponseData(conn);
     String userIDCreated = null;
@@ -248,6 +286,60 @@ public class MultiTenantAccessAuthorizerRangerPlugin implements
     return userIDCreated;
   }
 
+  /**
+   * Update the exising role details and push the changes to Ranger.
+   *
+   * @param principal contains user name, must be an existing user in Ranger.
+   * @param existingRole An existing role's JSON response String from Ranger.
+   * @return roleId (not useful for now)
+   * @throws IOException
+   */
+  @Override
+  public String revokeUserFromRole(BasicUserPrincipal principal,
+      String existingRole) throws IOException {
+    JsonObject roleObj = new JsonParser().parse(existingRole).getAsJsonObject();
+    // Parse Json
+    final String roleId = roleObj.get("id").getAsString();
+    LOG.debug("Got roleId: {}", roleId);
+
+    JsonArray oldUsersArray = roleObj.getAsJsonArray("users");
+    JsonArray newUsersArray = new JsonArray();
+
+    for (int i = 0; i < oldUsersArray.size(); ++i) {
+      JsonObject newUserEntry = oldUsersArray.get(i).getAsJsonObject();
+      if (!newUserEntry.get("name").getAsString().equals(principal.getName())) {
+        newUsersArray.add(newUserEntry);
+      }
+      // Update Json array
+    }
+    roleObj.add("users", newUsersArray);
+
+    LOG.debug("Updated: {}", roleObj);
+
+    final String endpointUrl = rangerHttpsAddress +
+        OZONE_OM_RANGER_ADMIN_ROLE_ADD_USER_HTTP_ENDPOINT + roleId;
+    final String jsonData = roleObj.toString();
+
+    HttpURLConnection conn =
+        makeHttpCall(endpointUrl, jsonData, "PUT", false);
+    if (conn.getResponseCode() != HTTP_OK) {
+      throw new IOException("Ranger REST API failure: " + conn.getResponseCode()
+          + " " + conn.getResponseMessage()
+          + ". Error updating Ranger role.");
+    }
+    String resp = getResponseData(conn);
+    String returnedRoleId;
+    try {
+      JsonObject jObject = new JsonParser().parse(resp).getAsJsonObject();
+      returnedRoleId = jObject.get("id").getAsString();
+      LOG.debug("Ranger returns roleId: {}", roleId);
+    } catch (JsonParseException e) {
+      e.printStackTrace();
+      throw e;
+    }
+    return returnedRoleId;
+  }
+
   /**
    * Update the exising role details and push the changes to Ranger.
    *
@@ -257,8 +349,8 @@ public class MultiTenantAccessAuthorizerRangerPlugin implements
    * @return roleId (not useful for now)
    * @throws IOException
    */
-  public String assignUser(BasicUserPrincipal principal, String existingRole,
-      boolean isAdmin) throws IOException {
+  public String assignUserToRole(BasicUserPrincipal principal,
+      String existingRole, boolean isAdmin) throws IOException {
 
     JsonObject roleObj = new JsonParser().parse(existingRole).getAsJsonObject();
     // Parse Json
@@ -279,7 +371,61 @@ public class MultiTenantAccessAuthorizerRangerPlugin implements
         OZONE_OM_RANGER_ADMIN_ROLE_ADD_USER_HTTP_ENDPOINT + roleId;
     final String jsonData = roleObj.toString();
 
-    HttpsURLConnection conn =
+    HttpURLConnection conn =
+        makeHttpCall(endpointUrl, jsonData, "PUT", false);
+    if (conn.getResponseCode() != HTTP_OK) {
+      throw new IOException("Ranger REST API failure: " + conn.getResponseCode()
+          + " " + conn.getResponseMessage()
+          + ". Error updating Ranger role.");
+    }
+    String resp = getResponseData(conn);
+    String returnedRoleId;
+    try {
+      JsonObject jObject = new JsonParser().parse(resp).getAsJsonObject();
+      returnedRoleId = jObject.get("id").getAsString();
+      LOG.debug("Ranger returns roleId: {}", roleId);
+    } catch (JsonParseException e) {
+      e.printStackTrace();
+      throw e;
+    }
+    return returnedRoleId;
+  }
+
+  /**
+   * Update the exising role details and push the changes to Ranger.
+   *
+   * @param users must be existing users in Ranger.
+   * @param existingRole An existing role's JSON response String from Ranger.
+   * @return roleId (not useful for now)
+   * @throws IOException
+   */
+  @Override
+  public String assignAllUsers(HashSet<String> users,
+                               String existingRole) throws IOException {
+
+    JsonObject roleObj = new JsonParser().parse(existingRole).getAsJsonObject();
+    // Parse Json
+    final String roleId = roleObj.get("id").getAsString();
+    LOG.debug("Got roleId: {}", roleId);
+
+    JsonArray usersArray = new JsonArray();
+    for (String user: users) {
+      JsonObject newUserEntry = new JsonObject();
+      newUserEntry.addProperty("name", user);
+      newUserEntry.addProperty("isAdmin", false);
+      usersArray.add(newUserEntry);
+    }
+    // Update Json array
+    roleObj.remove("users"); // remove the old users
+    roleObj.add("users", usersArray);
+
+    LOG.debug("Updated: {}", roleObj);
+
+    final String endpointUrl = rangerHttpsAddress +
+        OZONE_OM_RANGER_ADMIN_ROLE_ADD_USER_HTTP_ENDPOINT + roleId;
+    final String jsonData = roleObj.toString();
+
+    HttpURLConnection conn =
         makeHttpCall(endpointUrl, jsonData, "PUT", false);
     if (conn.getResponseCode() != HTTP_OK) {
       throw new IOException("Ranger REST API failure: " + conn.getResponseCode()
@@ -308,17 +454,18 @@ public class MultiTenantAccessAuthorizerRangerPlugin implements
         + "}";
   }
 
-  public String createRole(OzoneTenantRolePrincipal role, String adminRoleName)
+  public String createRole(String role, String adminRoleName)
       throws IOException {
 
     String endpointUrl =
         rangerHttpsAddress + OZONE_OM_RANGER_ADMIN_CREATE_ROLE_HTTP_ENDPOINT;
 
-    String jsonData = getCreateRoleJsonStr(role.toString(), adminRoleName);
+    String jsonData = getCreateRoleJsonStr(role, adminRoleName);
 
-    final HttpsURLConnection conn = makeHttpCall(endpointUrl,
+    final HttpURLConnection conn = makeHttpCall(endpointUrl,
         jsonData, "POST", false);
     if (conn.getResponseCode() != HTTP_OK) {
+      // TODO: Do not throw on 400 ?
       throw new IOException("Ranger REST API failure: " + conn.getResponseCode()
           + " " + conn.getResponseMessage()
           + ". Role name '" + role + "' likely already exists in Ranger");
@@ -336,11 +483,49 @@ public class MultiTenantAccessAuthorizerRangerPlugin implements
     return roleId;
   }
 
-  public String createAccessPolicy(AccessPolicy policy) throws Exception {
+  private String getCreateUserJsonStr(String userName, String password) {
+    return "{"
+        + "  \"name\":\"" +  userName + "\","
+        + "  \"password\":\"" +  password + "\","
+        + "  \"firstName\":\"" +  userName + "\","
+        + "  \"userRoleList\":[\"ROLE_USER\"]"
+        + "}";
+  }
+
+  public String createUser(String userName, String password)
+      throws IOException {
+
+    String endpointUrl =
+        rangerHttpsAddress + OZONE_OM_RANGER_ADMIN_CREATE_USER_HTTP_ENDPOINT;
+
+    String jsonData = getCreateUserJsonStr(userName, password);
+
+    final HttpURLConnection conn = makeHttpCall(endpointUrl,
+        jsonData, "POST", false);
+    if (conn.getResponseCode() != HTTP_OK) {
+      throw new IOException("Ranger REST API failure: " + conn.getResponseCode()
+          + " " + conn.getResponseMessage()
+          + ". User name '" + userName + "' likely already exists in Ranger");
+    }
+    String userInfo = getResponseData(conn);
+    String userId;
+    try {
+      JsonObject jObject = new JsonParser().parse(userInfo).getAsJsonObject();
+      userId = jObject.get("id").getAsString();
+      LOG.debug("Ranger returned userId: {}", userId);
+    } catch (JsonParseException e) {
+      e.printStackTrace();
+      throw e;
+    }
+    return userId;
+  }
+
+
+  public String createAccessPolicy(AccessPolicy policy) throws IOException {
     String rangerAdminUrl =
         rangerHttpsAddress + OZONE_OM_RANGER_ADMIN_CREATE_POLICY_HTTP_ENDPOINT;
 
-    HttpsURLConnection conn = makeHttpCall(rangerAdminUrl,
+    HttpURLConnection conn = makeHttpCall(rangerAdminUrl,
         policy.serializePolicyToJsonString(),
         "POST", false);
     String policyInfo = getResponseData(conn);
@@ -358,75 +543,173 @@ public class MultiTenantAccessAuthorizerRangerPlugin implements
   }
 
   public AccessPolicy getAccessPolicyByName(String policyName)
-      throws Exception {
+      throws IOException {
     String rangerAdminUrl =
         rangerHttpsAddress + OZONE_OM_RANGER_ADMIN_GET_POLICY_HTTP_ENDPOINT +
         policyName;
 
-    HttpsURLConnection conn = makeHttpsGetCall(rangerAdminUrl,
+    HttpURLConnection conn = makeHttpGetCall(rangerAdminUrl,
+        "GET", false);
+    String policyInfo = getResponseData(conn);
+    JsonArray jArry = new JsonParser().parse(policyInfo).getAsJsonArray();
+    if (jArry.size() > 0) {
+      JsonObject jsonObject = jArry.get(0).getAsJsonObject();
+      AccessPolicy policy = new RangerAccessPolicy(policyName);
+      policy.deserializePolicyFromJsonString(jsonObject);
+      return policy;
+    } else {
+      // Returns null when policyInfo is an empty array
+      return null;
+    }
+  }
+
+  @Override
+  public AccessPolicy getAccessPolicyById(String policyId)
+      throws IOException {
+    String rangerAdminUrl =
+        rangerHttpsAddress + OZONE_OM_RANGER_ADMIN_GET_POLICY_ID_HTTP_ENDPOINT +
+            policyId;
+
+    HttpURLConnection conn = makeHttpGetCall(rangerAdminUrl,
         "GET", false);
     String policyInfo = getResponseData(conn);
     JsonArray jArry = new JsonParser().parse(policyInfo).getAsJsonArray();
     JsonObject jsonObject = jArry.get(0).getAsJsonObject();
-    AccessPolicy policy = new RangerAccessPolicy(policyName);
+    AccessPolicy policy =
+        new RangerAccessPolicy(jsonObject.get("name").getAsString());
     policy.deserializePolicyFromJsonString(jsonObject);
     return policy;
   }
 
+  /**
+   * Returns the service ID for Ozone service in Ranger.
+   * TODO: Error handling when Ozone service doesn't exist in Ranger.
+   */
+  public int retrieveRangerOzoneServiceId() throws IOException {
+
+    String rangerAdminUrl =
+        rangerHttpsAddress + OZONE_OM_RANGER_OZONE_SERVICE_ENDPOINT;
+    int id = 0;
+
+    HttpURLConnection conn = makeHttpGetCall(rangerAdminUrl,
+        "GET", false);
+    String sInfo = getResponseData(conn);
+    JsonObject jObject = new JsonParser().parse(sInfo).getAsJsonObject();
+    JsonArray jArry = jObject.getAsJsonArray("services");
+    for (int i = 0; i < jArry.size(); ++i) {
+      JsonObject serviceObj = jArry.get(i).getAsJsonObject();
+      String serviceName = serviceObj.get("type").getAsString();
+      if (!serviceName.equals("ozone")) {
+        continue;
+      }
+      id = serviceObj.get("id").getAsInt();
+    }
+    return id;
+  }
+
+  public long getLatestOzoneServiceVersion() throws IOException {
+    String rangerAdminUrl = rangerHttpsAddress
+        + OZONE_OM_RANGER_OZONE_SERVICE_ENDPOINT + getRangerOzoneServiceId();
+
+    HttpURLConnection conn = makeHttpGetCall(rangerAdminUrl, "GET", false);
+    String sInfo = getResponseData(conn);
+    JsonObject jObject = new JsonParser().parse(sInfo).getAsJsonObject();
+    return jObject.get("policyVersion").getAsLong();
+  }
+
+  public String getIncrementalRangerChanges(long baseVersion)
+      throws IOException {
+    String rangerAdminUrl =
+        rangerHttpsAddress + OZONE_OM_RANGER_DOWNLOAD_ENDPOINT + baseVersion;
+
+    HttpURLConnection conn = makeHttpGetCall(rangerAdminUrl, "GET", false);
+    String sInfo = getResponseData(conn);
+    return sInfo;
+  }
+
+  public String getAllMultiTenantPolicies() throws IOException {
+
+    // Note: Ranger incremental policies API is broken. So we use policy label
+    // filter to get all Multi-Tenant policies.
+
+    String rangerAdminUrl = rangerHttpsAddress
+        + OZONE_OM_RANGER_ALL_POLICIES_ENDPOINT + getRangerOzoneServiceId()
+        + "?policyLabelsPartial=" + OZONE_TENANT_RANGER_POLICY_LABEL;
+
+    // Also note: policyLabels (not partial) arg doesn't seem to work for Ranger
+    // at this point. When Ranger fixed this we could use exact match instead,
+    // then we can remove the verification logic in
+    // loadAllPoliciesRolesFromRanger().
+
+    HttpURLConnection conn = makeHttpGetCall(rangerAdminUrl, "GET", false);
+    final String jsonStr = getResponseData(conn);
+
+    if (jsonStr == null) {
+      throw new IOException("Invalid response from " + rangerAdminUrl);
+    }
+
+    return jsonStr;
+  }
+
+  @Override
+  public MultiTenantAccessController getMultiTenantAccessController() {
+    return this.accessController;
+  }
+
   public void deleteUser(String userId) throws IOException {
 
     String rangerAdminUrl =
         rangerHttpsAddress + OZONE_OM_RANGER_ADMIN_DELETE_USER_HTTP_ENDPOINT
             + userId + "?forceDelete=true";
 
-    HttpsURLConnection conn = makeHttpCall(rangerAdminUrl, null,
+    HttpURLConnection conn = makeHttpCall(rangerAdminUrl, null,
         "DELETE", false);
     int respnseCode = conn.getResponseCode();
     if (respnseCode != 200 && respnseCode != 204) {
-      throw new IOException("Couldnt delete user " + userId);
+      throw new IOException("Couldn't delete user " + userId);
     }
   }
 
-  public void deleteRole(String groupId) throws IOException {
+  public void deleteRole(String roleName) throws IOException {
 
     String rangerAdminUrl =
-        rangerHttpsAddress + OZONE_OM_RANGER_ADMIN_DELETE_GROUP_HTTP_ENDPOINT
-            + groupId + "?forceDelete=true";
+        rangerHttpsAddress + OZONE_OM_RANGER_ADMIN_DELETE_ROLE_HTTP_ENDPOINT
+            + roleName + "?forceDelete=true";
 
-    HttpsURLConnection conn = makeHttpCall(rangerAdminUrl, null,
+    HttpURLConnection conn = makeHttpCall(rangerAdminUrl, null,
         "DELETE", false);
     int respnseCode = conn.getResponseCode();
     if (respnseCode != 200 && respnseCode != 204) {
-      throw new IOException("Couldnt delete group " + groupId);
+      throw new IOException("Couldn't delete role " + roleName);
     }
   }
 
   @Override
-  public void deletePolicybyName(String policyName) throws Exception {
+  public void deletePolicyByName(String policyName) throws IOException {
     AccessPolicy policy = getAccessPolicyByName(policyName);
     String  policyID = policy.getPolicyID();
     LOG.debug("policyID is: {}", policyID);
-    deletePolicybyId(policyID);
+    deletePolicyById(policyID);
   }
 
-  public void deletePolicybyId(String policyId) throws IOException {
+  public void deletePolicyById(String policyId) throws IOException {
 
     String rangerAdminUrl =
         rangerHttpsAddress + OZONE_OM_RANGER_ADMIN_DELETE_POLICY_HTTP_ENDPOINT
             + policyId + "?forceDelete=true";
     try {
-      HttpsURLConnection conn = makeHttpCall(rangerAdminUrl, null,
+      HttpURLConnection conn = makeHttpCall(rangerAdminUrl, null,
           "DELETE", false);
       int respnseCode = conn.getResponseCode();
       if (respnseCode != 200 && respnseCode != 204) {
-        throw new IOException("Couldnt delete policy " + policyId);
+        throw new IOException("Couldn't delete policy " + policyId);
       }
     } catch (Exception e) {
-      throw new IOException("Couldnt delete policy " + policyId, e);
+      throw new IOException("Couldn't delete policy " + policyId, e);
     }
   }
 
-  private String getResponseData(HttpsURLConnection urlConnection)
+  private String getResponseData(HttpURLConnection urlConnection)
       throws IOException {
     StringBuilder response = new StringBuilder();
     try (BufferedReader br = new BufferedReader(
@@ -437,20 +720,50 @@ public class MultiTenantAccessAuthorizerRangerPlugin implements
         response.append(responseLine.trim());
       }
       LOG.debug("Got response: {}", response);
-    } catch (Exception e) {
-      e.printStackTrace();
-      throw e;
+    } catch (IOException e) {
+      // Common exceptions:
+      // 1. Server returned HTTP response code: 401
+      //   - Possibly incorrect Ranger credentials
+      // 2. Server returned HTTP response code: 400
+      //   - Policy or role does not exist
+      switch (urlConnection.getResponseCode()) {
+      case 400:
+        LOG.error("The policy or role likely does not exist in Ranger");
+        return null;
+      case 401:
+        LOG.error("Check Ranger credentials");
+//        break;
+      default:
+        e.printStackTrace();
+        throw e;
+      }
     }
     return response.toString();
   }
 
-  private HttpsURLConnection makeHttpCall(String urlString,
-                                              String jsonInputString,
-                                              String method, boolean isSpnego)
+  private HttpURLConnection openURLConnection(URL url) throws IOException {
+    final HttpURLConnection urlConnection;
+    if (url.getProtocol().equals("https")) {
+      urlConnection = (HttpsURLConnection) url.openConnection();
+    } else if (url.getProtocol().equals("http")) {
+      urlConnection = (HttpURLConnection) url.openConnection();
+    } else {
+      throw new IOException("Unsupported protocol: " + url.getProtocol() +
+          "URL: " + url);
+    }
+    return urlConnection;
+  }
+
+  /**
+   * Can make either http or https request.
+   */
+  private HttpURLConnection makeHttpCall(String urlString,
+      String jsonInputString, String method, boolean isSpnego)
       throws IOException {
 
     URL url = new URL(urlString);
-    HttpsURLConnection urlConnection = (HttpsURLConnection)url.openConnection();
+    final HttpURLConnection urlConnection = openURLConnection(url);
+
     urlConnection.setRequestMethod(method);
     urlConnection.setConnectTimeout(connectionTimeout);
     urlConnection.setReadTimeout(connectionRequestTimeout);
@@ -470,11 +783,15 @@ public class MultiTenantAccessAuthorizerRangerPlugin implements
     return urlConnection;
   }
 
-  private HttpsURLConnection makeHttpsGetCall(String urlString,
+  /**
+   * Can make either http or https request.
+   */
+  private HttpURLConnection makeHttpGetCall(String urlString,
       String method, boolean isSpnego) throws IOException {
 
     URL url = new URL(urlString);
-    HttpsURLConnection urlConnection = (HttpsURLConnection)url.openConnection();
+    final HttpURLConnection urlConnection = openURLConnection(url);
+
     urlConnection.setRequestMethod(method);
     urlConnection.setConnectTimeout(connectionTimeout);
     urlConnection.setReadTimeout(connectionRequestTimeout);
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/multitenant/MultiTenantAccessController.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/multitenant/MultiTenantAccessController.java
index 3d50108f78..1b32088aa4 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/multitenant/MultiTenantAccessController.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/multitenant/MultiTenantAccessController.java
@@ -254,6 +254,7 @@ public interface MultiTenantAccessController {
     private final String description;
     private final Map<String, Collection<Acl>> roleAcls;
     private final Set<String> labels;
+    private final boolean isEnabled;
 
     private Policy(Builder builder) {
       name = builder.name;
@@ -263,6 +264,7 @@ public interface MultiTenantAccessController {
       description = builder.description;
       roleAcls = builder.roleAcls;
       labels = builder.labels;
+      isEnabled = builder.isEnabled;
     }
 
     public Set<String> getVolumes() {
@@ -316,6 +318,10 @@ public interface MultiTenantAccessController {
           Objects.equals(getLabels(), policy.getLabels());
     }
 
+    public boolean isEnabled() {
+      return isEnabled;
+    }
+
     /**
      * Builder class for a policy.
      */
@@ -327,6 +333,7 @@ public interface MultiTenantAccessController {
       private String description;
       private final Map<String, Collection<Acl>> roleAcls;
       private final Set<String> labels;
+      private boolean isEnabled;
 
       public Builder() {
         this.volumes = new HashSet<>();
@@ -341,6 +348,11 @@ public interface MultiTenantAccessController {
         return this;
       }
 
+      public Builder setEnabled(boolean enabled) {
+        this.isEnabled = enabled;
+        return this;
+      }
+
       public Builder addVolume(String volume) {
         this.volumes.add(volume);
         return this;
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/multitenant/OMRangerBGSyncService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/multitenant/OMRangerBGSyncService.java
new file mode 100644
index 0000000000..323a494b97
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/multitenant/OMRangerBGSyncService.java
@@ -0,0 +1,773 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.om.multitenant;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.google.common.base.Preconditions;
+import com.google.protobuf.ServiceException;
+import org.apache.hadoop.hdds.utils.BackgroundService;
+import org.apache.hadoop.hdds.utils.BackgroundTask;
+import org.apache.hadoop.hdds.utils.BackgroundTaskQueue;
+import org.apache.hadoop.hdds.utils.BackgroundTaskResult;
+import org.apache.hadoop.hdds.utils.BackgroundTaskResult.EmptyTaskResult;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.Table.KeyValue;
+import org.apache.hadoop.hdds.utils.db.TableIterator;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OMMultiTenantManager;
+import org.apache.hadoop.ozone.om.OMMultiTenantManagerImpl;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes;
+import org.apache.hadoop.ozone.om.exceptions.OMNotLeaderException;
+import org.apache.hadoop.ozone.om.helpers.OMRatisHelper;
+import org.apache.hadoop.ozone.om.helpers.OmDBAccessIdInfo;
+import org.apache.hadoop.ozone.om.helpers.OmDBTenantState;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SetRangerServiceVersionRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
+import org.apache.ratis.protocol.ClientId;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftClientRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.gson.JsonArray;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+
+import static org.apache.hadoop.ozone.OzoneConsts.OZONE_TENANT_RANGER_POLICY_LABEL;
+
+/**
+ * Background Sync thread that reads Multi-Tenancy state from OM DB
+ * and applies it to Ranger. This recovers or cleans up (Multi-Tenant related)
+ * Ranger policies and roles in case of OM crashes or Ranger failure.
+ *
+ * Multi-Tenant related Ranger policies and roles are *eventually* consistent
+ * with OM DB tenant state. OM DB is the source of truth.
+ *
+ * While the sync thread is updating Ranger, user or other applications
+ * editing Ranger Ozone policies or roles could interfere with the update.
+ * In this case, a sync run might leave Ranger in a de-synced state, due to
+ * limited maximum number of update attempts for each run.
+ * But this should eventually be corrected in future sync runs.
+ *
+ * See the comment block in triggerRangerSyncOnce() for more on the core logic.
+ */
+public class OMRangerBGSyncService extends BackgroundService {
+
+  public static final Logger LOG =
+      LoggerFactory.getLogger(OMRangerBGSyncService.class);
+  private static final ClientId CLIENT_ID = ClientId.randomId();
+
+  private final OzoneManager ozoneManager;
+  private final OMMetadataManager metadataManager;
+  private final OMMultiTenantManager multiTenantManager;
+  private final MultiTenantAccessAuthorizer authorizer;
+
+  // Maximum number of attempts for each sync run
+  private static final int MAX_ATTEMPT = 2;
+  private final AtomicLong runCount = new AtomicLong(0);
+
+  private volatile boolean isServiceStarted = false;
+
+  static class BGRole {
+    private final String name;
+    private String id;
+    private final HashSet<String> userSet;
+
+    BGRole(String n) {
+      this.name = n;
+      userSet = new HashSet<>();
+    }
+
+    public void setId(String id) {
+      this.id = id;
+    }
+
+    public String getId() {
+      return id;
+    }
+
+    public void addUserPrincipal(String userPrincipal) {
+      userSet.add(userPrincipal);
+    }
+
+    public HashSet<String> getUserSet() {
+      return userSet;
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(name, id, userSet);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      BGRole bgRole = (BGRole) o;
+      return name.equals(bgRole.name)
+          && id.equals(bgRole.id)
+          && userSet.equals(bgRole.userSet);
+    }
+  }
+
+  enum PolicyType {
+    BUCKET_NAMESPACE_POLICY,
+    BUCKET_POLICY
+  }
+
+  /**
+   * Helper class that stores the tenant name and policy type.
+   */
+  static class PolicyInfo {
+
+    private final String tenantId;
+    private final PolicyType policyType;
+
+    PolicyInfo(String tenantId, PolicyType policyType) {
+      this.tenantId = tenantId;
+      this.policyType = policyType;
+    }
+
+    public String getTenantId() {
+      return tenantId;
+    }
+
+    public PolicyType getPolicyType() {
+      return policyType;
+    }
+
+    @Override
+    public String toString() {
+      return "PolicyInfo{" +
+          "tenantId='" + tenantId + '\'' + ", policyType=" + policyType + '}';
+    }
+  }
+
+  // This map keeps all the policies found in OM DB. These policies should be
+  // in Ranger. If not, the default policy will be (re)created.
+  //
+  // Maps from policy name to PolicyInfo (tenant name and policy type) in Ranger
+  private final HashMap<String, PolicyInfo> mtRangerPoliciesToBeCreated =
+      new HashMap<>();
+
+  // We will track all the policies in Ranger here. After we have
+  // processed all the policies from OM DB, this map will
+  // be left with policies that we need to delete.
+  //
+  // Maps from policy name to policy ID in Ranger
+  private final HashMap<String, String> mtRangerPoliciesToBeDeleted =
+      new HashMap<>();
+
+  // This map will keep all the Multi-Tenancy related roles from Ranger.
+  private final HashMap<String, BGRole> mtRangerRoles = new HashMap<>();
+
+  // Keep OM DB mapping of Roles -> list of user principals.
+  private final HashMap<String, HashSet<String>> mtOMDBRoles = new HashMap<>();
+
+  public OMRangerBGSyncService(OzoneManager ozoneManager,
+      MultiTenantAccessAuthorizer authorizer, long interval,
+      TimeUnit unit, long serviceTimeout) {
+
+    super("OMRangerBGSyncService", interval, unit, 1, serviceTimeout);
+
+    this.ozoneManager = ozoneManager;
+    this.metadataManager = ozoneManager.getMetadataManager();
+    this.multiTenantManager = ozoneManager.getMultiTenantManager();
+
+    if (authorizer != null) {
+      this.authorizer = authorizer;
+    } else {
+      // authorizer can be null for unit tests
+      LOG.warn("MultiTenantAccessAuthorizer not set. Using dummy authorizer");
+      this.authorizer = new MultiTenantAccessAuthorizerDummyPlugin();
+    }
+  }
+
+  @Override
+  public BackgroundTaskQueue getTasks() {
+    BackgroundTaskQueue queue = new BackgroundTaskQueue();
+    queue.add(new RangerBGSyncTask());
+    return queue;
+  }
+
+  @Override
+  public void start() {
+    if (authorizer == null) {
+      LOG.error("Failed to start the background sync service: "
+          + "null authorizer. Please check OM configuration. Aborting");
+      return;
+    }
+    isServiceStarted = true;
+    super.start();
+  }
+
+  @Override
+  public void shutdown() {
+    isServiceStarted = false;
+    super.shutdown();
+  }
+
+  /**
+   * Returns true if the service run conditions are satisfied, false otherwise.
+   */
+  private boolean shouldRun() {
+    if (ozoneManager == null) {
+      // OzoneManager can be null for testing
+      return true;
+    }
+    // The service only runs if current OM node is leader and is ready
+    //  and the service is marked as started
+    return isServiceStarted && ozoneManager.isLeaderReady();
+  }
+
+  private class RangerBGSyncTask implements BackgroundTask {
+
+    @Override
+    public int getPriority() {
+      return 0;
+    }
+
+    @Override
+    public BackgroundTaskResult call() {
+      // Check OM leader and readiness
+      if (shouldRun()) {
+        runCount.incrementAndGet();
+        triggerRangerSyncOnce();
+      }
+
+      return EmptyTaskResult.newResult();
+    }
+  }
+
+  private void triggerRangerSyncOnce() {
+    int attempt = 0;
+    try {
+      // TODO: Acquire lock
+      long dbOzoneServiceVersion = getOMDBRangerServiceVersion();
+      long rangerOzoneServiceVersion = getLatestRangerServiceVersion();
+
+      // Sync thread enters the while-loop when Ranger service (e.g. cm_ozone)
+      // version doesn't match the current service version persisted in the DB.
+      //
+      // When Ranger state related to the Ozone service (e.g. policies or roles)
+      // is updated, it bumps up the service version.
+      // At the end of the loop, Ranger service version is retrieved again.
+      // So in this case the loop most likely be entered a second time. But
+      // this time it is very likely that executeOMDBToRangerSync() won't push
+      // policy or role updates to Ranger as it should already be in-sync from
+      // the previous loop. If this is the case, the DB service version will be
+      // written again, and the loop should be exited.
+      //
+      // If Ranger service version is bumped again, it indicates that Ranger
+      // roles or policies were updated by ongoing OM multi-tenancy requests,
+      // or manually by a user.
+      //
+      // A maximum of MAX_ATTEMPT times will be attempted each time the sync
+      // service is run. MAX_ATTEMPT should at least be 2 to make sure OM DB
+      // has the up-to-date Ranger service version most of the times.
+      while (dbOzoneServiceVersion != rangerOzoneServiceVersion) {
+        // TODO: Release lock
+        if (++attempt > MAX_ATTEMPT) {
+          if (LOG.isDebugEnabled()) {
+            LOG.warn("Reached maximum number of attempts ({}). Abort",
+                MAX_ATTEMPT);
+          }
+          break;
+        }
+
+        LOG.info("Executing Multi-Tenancy Ranger Sync: run #{}, attempt #{}. "
+                + "Ranger service version: {}, DB version :{}",
+            runCount.get(), attempt,
+            rangerOzoneServiceVersion, dbOzoneServiceVersion);
+
+        executeOMDBToRangerSync(dbOzoneServiceVersion);
+
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Setting OM DB Ranger Service Version to {} (was {})",
+              rangerOzoneServiceVersion, dbOzoneServiceVersion);
+        }
+        // Submit Ratis Request to sync the new service version in OM DB
+        setOMDBRangerServiceVersion(rangerOzoneServiceVersion);
+
+        // TODO: Acquire lock
+
+        // Check Ranger ozone service version again
+        dbOzoneServiceVersion = rangerOzoneServiceVersion;
+        rangerOzoneServiceVersion = getLatestRangerServiceVersion();
+      }
+    } catch (IOException | ServiceException e) {
+      LOG.warn("Exception during Ranger Sync", e);
+      // TODO: Check specific exception once switched to
+      //  RangerRestMultiTenantAccessController
+//    } finally {
+//      // TODO: Release lock
+    }
+
+  }
+
+  /**
+   * Query Ranger endpoint to get the latest Ozone service version.
+   */
+  long getLatestRangerServiceVersion() throws IOException {
+    return authorizer.getLatestOzoneServiceVersion();
+  }
+
+  private RaftClientRequest newRaftClientRequest(OMRequest omRequest) {
+    return RaftClientRequest.newBuilder()
+        .setClientId(CLIENT_ID)
+        .setServerId(ozoneManager.getOmRatisServer().getRaftPeerId())
+        .setGroupId(ozoneManager.getOmRatisServer().getRaftGroupId())
+        .setCallId(runCount.get())
+        .setMessage(
+            Message.valueOf(
+                OMRatisHelper.convertRequestToByteString(omRequest)))
+        .setType(RaftClientRequest.writeRequestType())
+        .build();
+  }
+
+  void setOMDBRangerServiceVersion(long version) throws ServiceException {
+    // OM DB update goes through Ratis
+    SetRangerServiceVersionRequest.Builder versionSyncRequest =
+        SetRangerServiceVersionRequest.newBuilder()
+            .setRangerServiceVersion(version);
+
+    OMRequest omRequest = OMRequest.newBuilder()
+        .setCmdType(Type.SetRangerServiceVersion)
+        .setSetRangerServiceVersionRequest(versionSyncRequest)
+        .setClientId(CLIENT_ID.toString())
+        .build();
+
+    try {
+      RaftClientRequest raftClientRequest = newRaftClientRequest(omRequest);
+      ozoneManager.getOmRatisServer().submitRequest(omRequest,
+          raftClientRequest);
+    } catch (ServiceException e) {
+      LOG.error("SetRangerServiceVersion request failed. "
+          + "Will retry at next run.");
+      throw e;
+    }
+  }
+
+  long getOMDBRangerServiceVersion() throws IOException {
+    final String dbValue = ozoneManager.getMetadataManager().getMetaTable()
+        .get(OzoneConsts.RANGER_OZONE_SERVICE_VERSION_KEY);
+    if (dbValue == null) {
+      return -1L;
+    } else {
+      return Long.parseLong(dbValue);
+    }
+  }
+
+  private void executeOMDBToRangerSync(long baseVersion) throws IOException {
+    clearPolicyAndRoleMaps();
+
+    // TODO: Acquire global lock
+    loadAllPoliciesAndRoleNamesFromRanger(baseVersion);
+    loadAllRolesFromRanger();
+    loadAllRolesFromOM();
+    // TODO: Release global lock
+
+    // This should isolate policies into two groups
+    // 1. mtRangerPoliciesTobeDeleted and
+    // 2. mtRangerPoliciesTobeCreated
+    processAllPoliciesFromOMDB();
+
+    // This should isolate roles that need fixing into a list of
+    // roles that need to be replayed back into ranger to get in sync with OMDB.
+    processAllRolesFromOMDB();
+  }
+
+  private void clearPolicyAndRoleMaps() {
+    mtRangerPoliciesToBeCreated.clear();
+    mtRangerPoliciesToBeDeleted.clear();
+    mtRangerRoles.clear();
+    mtOMDBRoles.clear();
+  }
+
+  /**
+   * TODO: Test and make sure invalid JSON response from Ranger won't crash OM.
+   */
+  private void loadAllPoliciesAndRoleNamesFromRanger(long baseVersion)
+      throws IOException {
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("baseVersion is {}", baseVersion);
+    }
+
+    String allPolicies = authorizer.getAllMultiTenantPolicies();
+    JsonObject jObject = new JsonParser().parse(allPolicies).getAsJsonObject();
+    JsonArray policies = jObject.getAsJsonArray("policies");
+    for (int i = 0; i < policies.size(); ++i) {
+      JsonObject policy = policies.get(i).getAsJsonObject();
+      JsonArray policyLabels = policy.getAsJsonArray("policyLabels");
+
+      // Verify that the policy has the OzoneTenant label
+      boolean hasOzoneTenantLabel = false;
+      // Loop just in case multiple labels are attached to the tenant policy
+      for (int j = 0; j < policyLabels.size(); j++) {
+        final String currentLabel = policyLabels.get(j).getAsString();
+        // Look for exact match
+        if (currentLabel.equals(OZONE_TENANT_RANGER_POLICY_LABEL)) {
+          hasOzoneTenantLabel = true;
+          break;
+        }
+      }
+
+      if (!hasOzoneTenantLabel) {
+        // Shouldn't get policies without the label often as it is
+        // specified in the query param, unless a user removed the tag during
+        // the sync
+        LOG.warn("Ignoring Ranger policy without the {} label: {}",
+            OZONE_TENANT_RANGER_POLICY_LABEL, policy.get("name").getAsString());
+        continue;
+      }
+
+      // Temporarily put the policy in the to-delete list,
+      // valid entries will be removed later
+      mtRangerPoliciesToBeDeleted.put(
+          policy.get("name").getAsString(),
+          policy.get("id").getAsString());
+
+      final JsonArray policyItems = policy.getAsJsonArray("policyItems");
+      for (int j = 0; j < policyItems.size(); ++j) {
+        JsonObject policyItem = policyItems.get(j).getAsJsonObject();
+        JsonArray roles = policyItem.getAsJsonArray("roles");
+        for (int k = 0; k < roles.size(); ++k) {
+          if (!mtRangerRoles.containsKey(roles.get(k).getAsString())) {
+            // We only get the role name here. We need to query and populate it.
+            mtRangerRoles.put(roles.get(k).getAsString(),
+                new BGRole(roles.get(k).getAsString()));
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Helper function to throw benign exception if the current OM is no longer
+   * the leader in case a leader transition happened during the sync. So the
+   * sync run can abort earlier.
+   *
+   * Note: EACH Ranger request can take 3-7 seconds as tested in UT.
+   */
+  private void checkLeader() throws IOException {
+    if (!ozoneManager.isLeaderReady()) {
+      throw new OMNotLeaderException("This OM is no longer the leader. Abort");
+    }
+  }
+
+  private void loadAllRolesFromRanger() throws IOException {
+    for (Map.Entry<String, BGRole> entry: mtRangerRoles.entrySet()) {
+      final String roleName = entry.getKey();
+      checkLeader();
+      final String roleDataString = authorizer.getRole(roleName);
+      final JsonObject roleObject =
+          new JsonParser().parse(roleDataString).getAsJsonObject();
+      final BGRole role = entry.getValue();
+      role.setId(roleObject.get("id").getAsString());
+      final JsonArray userArray = roleObject.getAsJsonArray("users");
+      for (int i = 0; i < userArray.size(); ++i) {
+        role.addUserPrincipal(userArray.get(i).getAsJsonObject().get("name")
+            .getAsString());
+      }
+    }
+  }
+
+  /**
+   * Helper function to add/remove a policy name to/from mtRangerPolicies lists.
+   */
+  private void mtRangerPoliciesOpHelper(
+      String policyName, PolicyInfo policyInfo) {
+
+    if (mtRangerPoliciesToBeDeleted.containsKey(policyName)) {
+      // This entry is in sync with Ranger, remove it from the set
+      // Eventually mtRangerPolicies will only contain entries that
+      // are not in OMDB and should be removed from Ranger.
+      mtRangerPoliciesToBeDeleted.remove(policyName);
+    } else {
+      // We could not find a policy in ranger that should have been there.
+      mtRangerPoliciesToBeCreated.put(policyName, policyInfo);
+    }
+  }
+
+  private void processAllPoliciesFromOMDB() throws IOException {
+
+    // Iterate all DB tenant states. For each tenant,
+    // queue or dequeue bucketNamespacePolicyName and bucketPolicyName
+    try (TableIterator<String, ? extends KeyValue<String, OmDBTenantState>>
+        tenantStateTableIt = metadataManager.getTenantStateTable().iterator()) {
+
+      while (tenantStateTableIt.hasNext()) {
+        final KeyValue<String, OmDBTenantState> tableKeyValue =
+            tenantStateTableIt.next();
+        final OmDBTenantState dbTenantState = tableKeyValue.getValue();
+        final String tenantId = dbTenantState.getTenantId();
+        final String volumeName = dbTenantState.getBucketNamespaceName();
+        Preconditions.checkNotNull(volumeName);
+
+        mtRangerPoliciesOpHelper(dbTenantState.getBucketNamespacePolicyName(),
+            new PolicyInfo(tenantId, PolicyType.BUCKET_NAMESPACE_POLICY));
+        mtRangerPoliciesOpHelper(dbTenantState.getBucketPolicyName(),
+            new PolicyInfo(tenantId, PolicyType.BUCKET_POLICY));
+      }
+    }
+
+    for (Map.Entry<String, PolicyInfo> entry :
+        mtRangerPoliciesToBeCreated.entrySet()) {
+      final String policyName = entry.getKey();
+      LOG.warn("Expected policy not found in Ranger: {}", policyName);
+      checkLeader();
+      // Attempt to recreate the default volume/bucket policy if it's missing
+      attemptToCreateDefaultPolicy(entry.getValue());
+    }
+
+    for (Map.Entry<String, String> entry :
+        mtRangerPoliciesToBeDeleted.entrySet()) {
+      final String policyName = entry.getKey();
+      LOG.info("Deleting policy from Ranger: {}", policyName);
+      checkLeader();
+      authorizer.deletePolicyByName(policyName);
+    }
+
+  }
+
+  /**
+   * Attempts to (re)create a tenant default volume or bucket policy in Ranger.
+   */
+  private void attemptToCreateDefaultPolicy(PolicyInfo policyInfo)
+      throws IOException {
+
+    final String tenantId = policyInfo.getTenantId();
+
+    final String volumeName =
+        multiTenantManager.getTenantVolumeName(tenantId);
+    final String userRoleName =
+        multiTenantManager.getTenantUserRoleName(tenantId);
+
+    final AccessPolicy accessPolicy;
+
+    switch (policyInfo.getPolicyType()) {
+    case BUCKET_NAMESPACE_POLICY:
+      LOG.info("Recovering VolumeAccess policy for tenant: {}", tenantId);
+
+      final String adminRoleName =
+          multiTenantManager.getTenantAdminRoleName(tenantId);
+
+      accessPolicy = multiTenantManager.newDefaultVolumeAccessPolicy(volumeName,
+          new OzoneTenantRolePrincipal(userRoleName),
+          new OzoneTenantRolePrincipal(adminRoleName));
+      break;
+
+    case BUCKET_POLICY:
+      LOG.info("Recovering BucketAccess policy for tenant: {}", tenantId);
+
+      accessPolicy = multiTenantManager.newDefaultBucketAccessPolicy(volumeName,
+              new OzoneTenantRolePrincipal(userRoleName));
+      break;
+
+    default:
+      throw new OMException("Unknown policy type in " + policyInfo,
+          ResultCodes.INTERNAL_ERROR);
+    }
+
+    String id = authorizer.createAccessPolicy(accessPolicy);
+    LOG.info("Created policy, ID: {}", id);
+  }
+
+  private void loadAllRolesFromOM() throws IOException {
+    if (multiTenantManager instanceof OMMultiTenantManagerImpl) {
+      loadAllRolesFromCache();
+    } else {
+      LOG.warn("Cache is not supported for {}. Loading roles directly from DB",
+          multiTenantManager.getClass().getSimpleName());
+      loadAllRolesFromDB();
+    }
+  }
+
+  private void loadAllRolesFromCache() {
+
+    final OMMultiTenantManagerImpl impl =
+        (OMMultiTenantManagerImpl) multiTenantManager;
+
+    mtOMDBRoles.putAll(impl.getAllRolesFromCache());
+  }
+
+  private void loadAllRolesFromDB() throws IOException {
+    // We have the following in OM DB
+    //  tenantStateTable: tenantId -> TenantState (has user and admin role name)
+    //  tenantAccessIdTable : accessId -> OmDBAccessIdInfo
+
+    final Table<String, OmDBTenantState> tenantStateTable =
+        metadataManager.getTenantStateTable();
+
+    // Iterate all DB ExtendedUserAccessIdInfo. For each accessId,
+    // add to userRole. And add to adminRole if isAdmin is set.
+    try (TableIterator<String, ? extends KeyValue<String, OmDBAccessIdInfo>>
+        tenantAccessIdTableIter =
+        metadataManager.getTenantAccessIdTable().iterator()) {
+
+      while (tenantAccessIdTableIter.hasNext()) {
+        final KeyValue<String, OmDBAccessIdInfo> tableKeyValue =
+            tenantAccessIdTableIter.next();
+        final OmDBAccessIdInfo dbAccessIdInfo = tableKeyValue.getValue();
+
+        final String tenantId = dbAccessIdInfo.getTenantId();
+        final String userPrincipal = dbAccessIdInfo.getUserPrincipal();
+
+        final OmDBTenantState dbTenantState = tenantStateTable.get(tenantId);
+
+        if (dbTenantState == null) {
+          // tenant state might have been deleted by some other ongoing requests
+          LOG.warn("OmDBTenantState for tenant '{}' doesn't exist!", tenantId);
+          continue;
+        }
+
+        final String userRoleName = dbTenantState.getUserRoleName();
+        mtOMDBRoles.computeIfAbsent(userRoleName, any -> new HashSet<>());
+        final String adminRoleName = dbTenantState.getAdminRoleName();
+        mtOMDBRoles.computeIfAbsent(adminRoleName, any -> new HashSet<>());
+
+        // Every tenant user should be in the tenant's userRole
+        addUserToMtOMDBRoles(userRoleName, userPrincipal);
+
+        // If the accessId has admin flag set, add to adminRole as well
+        if (dbAccessIdInfo.getIsAdmin()) {
+          addUserToMtOMDBRoles(adminRoleName, userPrincipal);
+        }
+      }
+    }
+  }
+
+  /**
+   * Helper function to add user principal to a role in mtOMDBRoles.
+   */
+  private void addUserToMtOMDBRoles(String roleName, String userPrincipal) {
+    if (!mtOMDBRoles.containsKey(roleName)) {
+      mtOMDBRoles.put(roleName, new HashSet<>(
+          Collections.singletonList(userPrincipal)));
+    } else {
+      final HashSet<String> usersInTheRole = mtOMDBRoles.get(roleName);
+      usersInTheRole.add(userPrincipal);
+    }
+  }
+
+  private void processAllRolesFromOMDB() throws IOException {
+    // Lets First make sure that all the Roles in OM DB are present in Ranger
+    // as well as the corresponding userlist matches matches.
+    for (Map.Entry<String, HashSet<String>> entry : mtOMDBRoles.entrySet()) {
+      final String roleName = entry.getKey();
+      boolean pushRoleToRanger = false;
+      if (mtRangerRoles.containsKey(roleName)) {
+        final HashSet<String> rangerUserList =
+            mtRangerRoles.get(roleName).getUserSet();
+        final HashSet<String> userSet = entry.getValue();
+        for (String userPrincipal : userSet) {
+          if (rangerUserList.contains(userPrincipal)) {
+            rangerUserList.remove(userPrincipal);
+          } else {
+            // We found a user in OM DB Role that doesn't exist in Ranger Role.
+            // Lets just push the role from OM DB to Ranger
+            pushRoleToRanger = true;
+            break;
+          }
+        }
+        // We have processed all the Userlist entries in the OMDB. If
+        // ranger Userlist is not empty, Ranger has users that OM DB does not.
+        if (!rangerUserList.isEmpty()) {
+          pushRoleToRanger = true;
+        }
+      } else {
+        // 1. The role is missing from Ranger, or;
+        // 2. A policy (that uses this role) is missing from Ranger, causing
+        // mtRangerRoles to be populated incorrectly. In this case the roles
+        // are there just fine. If not, will be corrected in the next run anyway
+        checkLeader();
+        try {
+          authorizer.createRole(roleName, null);
+        } catch (IOException e) {
+          // Tolerate create role failure, possibly due to role already exists
+          LOG.error(e.getMessage());
+        }
+        pushRoleToRanger = true;
+      }
+      if (pushRoleToRanger) {
+        LOG.info("Updating role in Ranger: {}", roleName);
+        checkLeader();
+        pushOMDBRoleToRanger(roleName);
+      }
+      // We have processed this role in OM DB now. Lets remove it from
+      // mtRangerRoles. Eventually whatever is left in mtRangerRoles
+      // are extra entries, that should not have been in Ranger.
+      mtRangerRoles.remove(roleName);
+    }
+
+    // A hack (for now) to delete UserRole before AdminRole
+    final Set<String> rolesToDelete = new TreeSet<>(Collections.reverseOrder());
+    rolesToDelete.addAll(mtRangerRoles.keySet());
+
+    for (String roleName : rolesToDelete) {
+      LOG.warn("Deleting role from Ranger: {}", roleName);
+      checkLeader();
+      try {
+        final String roleObj = authorizer.getRole(roleName);
+        authorizer.deleteRole(new JsonParser().parse(roleObj)
+            .getAsJsonObject().get("id").getAsString());
+      } catch (IOException e) {
+        // The role might have been deleted already.
+        // Or the role could be referenced in other roles or policies.
+        LOG.error("Failed to delete role: {}", roleName);
+        throw e;
+      }
+      // TODO: Server returned HTTP response code: 400
+      //  if already deleted or is depended on
+    }
+  }
+
+  private void pushOMDBRoleToRanger(String roleName) throws IOException {
+    HashSet<String> omdbUserList = mtOMDBRoles.get(roleName);
+    String roleJsonStr = authorizer.getRole(roleName);
+    authorizer.assignAllUsers(omdbUserList, roleJsonStr);
+  }
+
+  /**
+   * Return the number of runs the sync is triggered.
+   *
+   * This doesn't count attempts inside each sync run.
+   */
+  public long getRangerSyncRunCount() {
+    return runCount.get();
+  }
+}
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/multitenant/RangerRestMultiTenantAccessController.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/multitenant/RangerRestMultiTenantAccessController.java
new file mode 100644
index 0000000000..0c6db53e1a
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/multitenant/RangerRestMultiTenantAccessController.java
@@ -0,0 +1,670 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.ozone.om.multitenant;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonArray;
+import com.google.gson.JsonDeserializationContext;
+import com.google.gson.JsonDeserializer;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParseException;
+import com.google.gson.JsonParser;
+import com.google.gson.JsonPrimitive;
+import com.google.gson.JsonSerializationContext;
+import com.google.gson.JsonSerializer;
+import org.apache.commons.lang3.NotImplementedException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
+import org.apache.http.auth.BasicUserPrincipal;
+import org.apache.kerby.util.Base64;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.net.ssl.HttpsURLConnection;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.TrustManager;
+import javax.net.ssl.X509TrustManager;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.lang.reflect.Type;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.EnumMap;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RANGER_HTTPS_ADMIN_API_PASSWD;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RANGER_HTTPS_ADMIN_API_USER;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_RANGER_HTTPS_ADDRESS_KEY;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_RANGER_OM_CONNECTION_REQUEST_TIMEOUT;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_RANGER_OM_CONNECTION_REQUEST_TIMEOUT_DEFAULT;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_RANGER_OM_CONNECTION_TIMEOUT;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_RANGER_OM_CONNECTION_TIMEOUT_DEFAULT;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_RANGER_OM_IGNORE_SERVER_CERT;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_RANGER_OM_IGNORE_SERVER_CERT_DEFAULT;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_RANGER_SERVICE;
+
+/**
+ * Access controller for multi-tenancy implemented using Ranger's REST API.
+ * This class is for testing and is not intended for production use.
+ */
+public class RangerRestMultiTenantAccessController
+    implements MultiTenantAccessController {
+
+  public static final String OZONE_RANGER_POLICY_HTTP_ENDPOINT =
+      "/service/public/v2/api/policy/";
+
+  public static final String OZONE_RANGER_ROLE_HTTP_ENDPOINT =
+      "/service/public/v2/api/roles/";
+
+  private String getPolicyByNameEndpoint(String policyName) {
+    // /service/public/v2/api/service/{servicename}/policy/{policyname}
+    return rangerHttpsAddress + "/service/public/v2/api/service/" +
+        rangerService + "/policy/" + policyName;
+  }
+
+  private String getRoleByNameEndpoint(String roleName) {
+    // /service/public/v2/api/roles/name/
+    return rangerHttpsAddress + "/service/public/v2/api/roles/name/" + roleName;
+  }
+
+  private static final Logger LOG = LoggerFactory
+      .getLogger(RangerRestMultiTenantAccessController.class);
+
+  private final OzoneConfiguration conf;
+  private boolean ignoreServerCert = false;
+  private int connectionTimeout;
+  private int connectionRequestTimeout;
+  private String authHeaderValue;
+  private final String rangerHttpsAddress;
+  private final Gson jsonConverter;
+  private final String rangerService;
+  private final Map<IAccessAuthorizer.ACLType, String> aclToString;
+  private final Map<String, IAccessAuthorizer.ACLType> stringToAcl;
+  private long lastPolicyUpdateTimeEpochMillis = -1;
+
+  public RangerRestMultiTenantAccessController(Configuration configuration)
+      throws IOException {
+    conf = new OzoneConfiguration(configuration);
+    rangerHttpsAddress = conf.get(OZONE_RANGER_HTTPS_ADDRESS_KEY);
+    rangerService = conf.get(OZONE_RANGER_SERVICE);
+
+    GsonBuilder gsonBuilder = new GsonBuilder();
+    gsonBuilder.registerTypeAdapter(Policy.class, policySerializer);
+    gsonBuilder.registerTypeAdapter(Policy.class, policyDeserializer);
+    gsonBuilder.registerTypeAdapter(Role.class, roleSerializer);
+    gsonBuilder.registerTypeAdapter(Role.class, roleDeserializer);
+    gsonBuilder.registerTypeAdapter(BasicUserPrincipal.class, userSerializer);
+    jsonConverter = gsonBuilder.create();
+
+    aclToString = new EnumMap<>(IAccessAuthorizer.ACLType.class);
+    stringToAcl = new HashMap<>();
+    fillRangerAclStrings();
+    initializeRangerConnection();
+  }
+
+  private void fillRangerAclStrings() {
+    aclToString.put(IAccessAuthorizer.ACLType.ALL, "all");
+    aclToString.put(IAccessAuthorizer.ACLType.LIST, "list");
+    aclToString.put(IAccessAuthorizer.ACLType.READ, "read");
+    aclToString.put(IAccessAuthorizer.ACLType.WRITE, "write");
+    aclToString.put(IAccessAuthorizer.ACLType.CREATE, "create");
+    aclToString.put(IAccessAuthorizer.ACLType.DELETE, "delete");
+    aclToString.put(IAccessAuthorizer.ACLType.READ_ACL, "read_acl");
+    aclToString.put(IAccessAuthorizer.ACLType.WRITE_ACL, "write_acl");
+    aclToString.put(IAccessAuthorizer.ACLType.NONE, "");
+
+    stringToAcl.put("all", IAccessAuthorizer.ACLType.ALL);
+    stringToAcl.put("list", IAccessAuthorizer.ACLType.LIST);
+    stringToAcl.put("read", IAccessAuthorizer.ACLType.READ);
+    stringToAcl.put("write", IAccessAuthorizer.ACLType.WRITE);
+    stringToAcl.put("create", IAccessAuthorizer.ACLType.CREATE);
+    stringToAcl.put("delete", IAccessAuthorizer.ACLType.DELETE);
+    stringToAcl.put("read_acl", IAccessAuthorizer.ACLType.READ_ACL);
+    stringToAcl.put("write_acl", IAccessAuthorizer.ACLType.WRITE_ACL);
+    stringToAcl.put("", IAccessAuthorizer.ACLType.NONE);
+  }
+
+  private void initializeRangerConnection() {
+    setupRangerConnectionConfig();
+    if (ignoreServerCert) {
+      setupRangerIgnoreServerCertificate();
+    }
+    setupRangerConnectionAuthHeader();
+  }
+
+  private void setupRangerConnectionConfig() {
+    connectionTimeout = (int) conf.getTimeDuration(
+        OZONE_RANGER_OM_CONNECTION_TIMEOUT,
+        conf.get(
+            OZONE_RANGER_OM_CONNECTION_TIMEOUT,
+            OZONE_RANGER_OM_CONNECTION_TIMEOUT_DEFAULT),
+        TimeUnit.MILLISECONDS);
+    connectionRequestTimeout = (int)conf.getTimeDuration(
+        OZONE_RANGER_OM_CONNECTION_REQUEST_TIMEOUT,
+        conf.get(
+            OZONE_RANGER_OM_CONNECTION_REQUEST_TIMEOUT,
+            OZONE_RANGER_OM_CONNECTION_REQUEST_TIMEOUT_DEFAULT),
+        TimeUnit.MILLISECONDS
+    );
+    ignoreServerCert = conf.getBoolean(
+        OZONE_RANGER_OM_IGNORE_SERVER_CERT,
+        OZONE_RANGER_OM_IGNORE_SERVER_CERT_DEFAULT);
+  }
+
+  private void setupRangerIgnoreServerCertificate() {
+    // Create a trust manager that does not validate certificate chains
+    TrustManager[] trustAllCerts = new TrustManager[]{
+        new X509TrustManager() {
+          public java.security.cert.X509Certificate[] getAcceptedIssuers() {
+            return null;
+          }
+          public void checkClientTrusted(
+              java.security.cert.X509Certificate[] certs, String authType) {
+          }
+          public void checkServerTrusted(
+              java.security.cert.X509Certificate[] certs, String authType) {
+          }
+        }
+    };
+
+    try {
+      SSLContext sc = SSLContext.getInstance("SSL");
+      sc.init(null, trustAllCerts, new java.security.SecureRandom());
+      HttpsURLConnection.setDefaultSSLSocketFactory(sc.getSocketFactory());
+    } catch (Exception e) {
+      LOG.info("Setting DefaultSSLSocketFactory failed.");
+    }
+  }
+
+  private void setupRangerConnectionAuthHeader() {
+    String userName = conf.get(OZONE_OM_RANGER_HTTPS_ADMIN_API_USER);
+    String passwd = conf.get(OZONE_OM_RANGER_HTTPS_ADMIN_API_PASSWD);
+    String auth = userName + ":" + passwd;
+    byte[] encodedAuth =
+        Base64.encodeBase64(auth.getBytes(StandardCharsets.UTF_8));
+    authHeaderValue = "Basic " +
+        new String(encodedAuth, StandardCharsets.UTF_8);
+  }
+
+
+  @Override
+  public void createPolicy(Policy policy) throws IOException {
+    String rangerAdminUrl =
+        rangerHttpsAddress + OZONE_RANGER_POLICY_HTTP_ENDPOINT;
+    HttpsURLConnection conn = makeHttpsPostCall(rangerAdminUrl,
+        jsonConverter.toJsonTree(policy).getAsJsonObject());
+    if (!successfulResponseCode(conn.getResponseCode())) {
+      throw new IOException(String.format("Failed to create policy %s. " +
+          "Http response code: %d", policy.getName(), conn.getResponseCode()));
+    }
+    getResponseData(conn);
+  }
+
+  @Override
+  public void deletePolicy(String policyName) throws IOException {
+    String rangerAdminUrl = getPolicyByNameEndpoint(policyName);
+    HttpsURLConnection conn = makeHttpsDeleteCall(rangerAdminUrl);
+    if (!successfulResponseCode(conn.getResponseCode())) {
+      throw new IOException(String.format("Failed to delete policy '%s'. " +
+          "Http response code: %d", policyName, conn.getResponseCode()));
+    }
+  }
+
+  public Map<Long, Policy> getPolicies() throws Exception {
+    // This API gets all policies for all services. The
+    // /public/v2/api/policies/{serviceDefName}/for-resource endpoint is
+    // supposed to get policies for only a specified service, but it does not
+    // seem to work. This implementation should be ok for testing purposes as
+    // this class is intended.
+    String rangerAdminUrl =
+        rangerHttpsAddress + OZONE_RANGER_POLICY_HTTP_ENDPOINT;
+    HttpsURLConnection conn = makeHttpsGetCall(rangerAdminUrl);
+    if (!successfulResponseCode(conn.getResponseCode())) {
+      throw new IOException(String.format("Failed to get all policies. " +
+          "Http response code: %d", conn.getResponseCode()));
+    }
+    String allPoliciesString = getResponseData(conn);
+    // Filter out policies not for Ozone service.
+    JsonArray jsonPoliciesArray = new JsonParser().parse(allPoliciesString)
+        .getAsJsonArray();
+    Map<Long, Policy> policies = new HashMap<>();
+    for (JsonElement jsonPolicy: jsonPoliciesArray) {
+      JsonObject jsonPolicyObject = jsonPolicy.getAsJsonObject();
+      String service = jsonPolicyObject.get("service").getAsString();
+      if (service.equals(rangerService)) {
+        long id = jsonPolicyObject.get("id").getAsLong();
+        policies.put(id, jsonConverter.fromJson(jsonPolicyObject,
+            Policy.class));
+      }
+    }
+
+    return policies;
+  }
+
+  @Override
+  public Policy getPolicy(String policyName) throws IOException {
+    String rangerAdminUrl = getPolicyByNameEndpoint(policyName);
+
+    HttpsURLConnection conn = makeHttpsGetCall(rangerAdminUrl);
+    if (!successfulResponseCode(conn.getResponseCode())) {
+      throw new IOException(String.format("Failed to get policy '%s'. " +
+          "Http response code: %d", policyName, conn.getResponseCode()));
+    }
+    String policyInfo = getResponseData(conn);
+    return jsonConverter.fromJson(policyInfo, Policy.class);
+  }
+
+  @Override
+  public List<Policy> getLabeledPolicies(String label) throws Exception {
+    throw new NotImplementedException("Not Implemented");
+  }
+
+  @Override
+  public void updatePolicy(Policy policy) throws Exception {
+    throw new NotImplementedException("Not Implemented");
+  }
+
+  public void updatePolicy(long policyID, Policy policy) throws IOException {
+    String rangerAdminUrl =
+        rangerHttpsAddress + OZONE_RANGER_POLICY_HTTP_ENDPOINT + policyID;
+
+    HttpsURLConnection conn = makeHttpsPutCall(rangerAdminUrl,
+        jsonConverter.toJsonTree(policy));
+    if (!successfulResponseCode(conn.getResponseCode())) {
+      throw new IOException(String.format("Failed to update policy %d. " +
+          "Http response code: %d", policyID, conn.getResponseCode()));
+    }
+  }
+
+  @Override
+  public void createRole(Role role) throws IOException {
+    String rangerAdminUrl =
+        rangerHttpsAddress + OZONE_RANGER_ROLE_HTTP_ENDPOINT;
+
+    HttpsURLConnection conn = makeHttpsPostCall(rangerAdminUrl,
+        jsonConverter.toJsonTree(role).getAsJsonObject());
+    if (!successfulResponseCode(conn.getResponseCode())) {
+      throw new IOException(String.format("Failed to create role %s. " +
+          "Http response code: %d", role.getName(), conn.getResponseCode()));
+    }
+    String responseString = getResponseData(conn);
+    JsonObject jObject = new JsonParser().parse(responseString)
+        .getAsJsonObject();
+//    return jObject.get("id").getAsLong();
+  }
+
+  @Override
+  public void deleteRole(String roleName) throws IOException {
+    String rangerAdminUrl = getRoleByNameEndpoint(roleName);
+    HttpsURLConnection conn = makeHttpsDeleteCall(rangerAdminUrl);
+    if (!successfulResponseCode(conn.getResponseCode())) {
+      throw new IOException(String.format("Failed to delete role '%s'. " +
+          "Http response code: %d", roleName, conn.getResponseCode()));
+    }
+  }
+
+  @Override
+  public long getRangerServiceVersion() throws Exception {
+    throw new NotImplementedException("Not Implemented");
+  }
+
+  public Map<Long, Role> getRoles() throws Exception {
+    String rangerAdminUrl =
+        rangerHttpsAddress + OZONE_RANGER_ROLE_HTTP_ENDPOINT;
+    HttpsURLConnection conn = makeHttpsGetCall(rangerAdminUrl);
+    if (!successfulResponseCode(conn.getResponseCode())) {
+      throw new IOException(String.format("Failed to get all roles. " +
+          "Http response code: %d", conn.getResponseCode()));
+    }
+
+    String allRolesString = getResponseData(conn);
+    JsonArray rolesArrayJson =
+        new JsonParser().parse(allRolesString).getAsJsonArray();
+    Map<Long, Role> roles = new HashMap<>();
+    for (JsonElement roleJson: rolesArrayJson) {
+      long id = roleJson.getAsJsonObject().get("id").getAsLong();
+      roles.put(id, jsonConverter.fromJson(roleJson, Role.class));
+    }
+
+    return roles;
+  }
+
+  @Override
+  public Role getRole(String roleName) throws IOException {
+    String rangerAdminUrl = getRoleByNameEndpoint(roleName);
+
+    HttpsURLConnection conn = makeHttpsGetCall(rangerAdminUrl);
+    if (!successfulResponseCode(conn.getResponseCode())) {
+      throw new IOException(String.format("Failed to get role '%s'. " +
+          "Http response code: %d", roleName, conn.getResponseCode()));
+    }
+    String roleInfo = getResponseData(conn);
+    return jsonConverter.fromJson(roleInfo, Role.class);
+  }
+
+  @Override
+  public void updateRole(long roleID, Role role) throws IOException {
+    String rangerAdminUrl =
+        rangerHttpsAddress + OZONE_RANGER_ROLE_HTTP_ENDPOINT + roleID;
+
+    HttpsURLConnection conn = makeHttpsPutCall(rangerAdminUrl,
+        jsonConverter.toJsonTree(role));
+    if (!successfulResponseCode(conn.getResponseCode())) {
+      throw new IOException(String.format("Failed to update role %d. " +
+          "Http response code: %d", roleID, conn.getResponseCode()));
+    }
+  }
+
+  private HttpsURLConnection makeHttpsPutCall(String url, JsonElement content)
+      throws IOException {
+    HttpsURLConnection connection = makeBaseHttpsURLConnection(url);
+    connection.setRequestMethod("PUT");
+    return addJsonContentToConnection(connection, content);
+  }
+
+  private HttpsURLConnection makeHttpsPostCall(String url, JsonElement content)
+      throws IOException {
+    HttpsURLConnection connection = makeBaseHttpsURLConnection(url);
+    connection.setRequestMethod("POST");
+    return addJsonContentToConnection(connection, content);
+  }
+
+  private HttpsURLConnection addJsonContentToConnection(
+      HttpsURLConnection connection, JsonElement content) throws IOException {
+    connection.setDoOutput(true);
+    connection.setRequestProperty("Content-Type", "application/json;");
+    try (OutputStream os = connection.getOutputStream()) {
+      byte[] input = content.toString().getBytes(StandardCharsets.UTF_8);
+      os.write(input, 0, input.length);
+      os.flush();
+    }
+
+    return connection;
+  }
+
+  private HttpsURLConnection makeHttpsGetCall(String urlString)
+      throws IOException {
+    HttpsURLConnection connection = makeBaseHttpsURLConnection(urlString);
+    connection.setRequestMethod("GET");
+    return connection;
+  }
+
+  private HttpsURLConnection makeHttpsDeleteCall(String urlString)
+      throws IOException {
+    HttpsURLConnection connection = makeBaseHttpsURLConnection(urlString);
+    connection.setRequestMethod("DELETE");
+    return connection;
+  }
+
+  private HttpsURLConnection makeBaseHttpsURLConnection(String urlString)
+      throws IOException {
+    URL url = new URL(urlString);
+    HttpsURLConnection urlConnection = (HttpsURLConnection)url.openConnection();
+    urlConnection.setConnectTimeout(connectionTimeout);
+    urlConnection.setReadTimeout(connectionRequestTimeout);
+    urlConnection.setRequestProperty("Accept", "application/json");
+    urlConnection.setRequestProperty("Authorization", authHeaderValue);
+
+    return urlConnection;
+  }
+
+  private String getResponseData(HttpsURLConnection urlConnection)
+      throws IOException {
+    StringBuilder response = new StringBuilder();
+    try (BufferedReader br = new BufferedReader(
+        new InputStreamReader(
+            urlConnection.getInputStream(), StandardCharsets.UTF_8))) {
+      String responseLine;
+      while ((responseLine = br.readLine()) != null) {
+        response.append(responseLine.trim());
+      }
+    }
+    return response.toString();
+  }
+
+  private boolean successfulResponseCode(long responseCode) {
+    return responseCode >= 200 && responseCode < 300;
+  }
+
+  /// SERIALIZATION ///
+
+  private final JsonDeserializer<Policy> policyDeserializer =
+      new JsonDeserializer<Policy>() {
+        @Override public Policy deserialize(JsonElement jsonElement, Type type,
+            JsonDeserializationContext jsonDeserializationContext)
+            throws JsonParseException {
+          JsonObject policyJson = jsonElement.getAsJsonObject();
+          String name = policyJson.get("name").getAsString();
+          Policy.Builder policyB = new Policy.Builder();
+          policyB.setName(name);
+          if (policyJson.has("description")) {
+            policyB.setDescription(policyJson.get("description").getAsString());
+          }
+          policyB.setEnabled(policyJson.get("isEnabled").getAsBoolean());
+
+          // Read volume, bucket, keys from json.
+          JsonObject resourcesJson =
+              policyJson.get("resources").getAsJsonObject();
+          // All Ozone Ranger policies specify at least a volume.
+          JsonObject jsonVolumeResource =
+              resourcesJson.get("volume").getAsJsonObject();
+          JsonArray volumes = jsonVolumeResource.get("values").getAsJsonArray();
+          volumes.forEach(vol -> policyB.addVolume(vol.getAsString()));
+
+          if (resourcesJson.has("bucket")) {
+            JsonObject jsonBucketResource =
+                resourcesJson.get("bucket").getAsJsonObject();
+            JsonArray buckets =
+                jsonBucketResource.get("values").getAsJsonArray();
+            buckets.forEach(bucket -> policyB.addBucket(bucket.getAsString()));
+          }
+
+          if (resourcesJson.has("key")) {
+            JsonObject jsonKeysResource =
+                resourcesJson.get("key").getAsJsonObject();
+            JsonArray keys = jsonKeysResource.get("values").getAsJsonArray();
+            keys.forEach(key -> policyB.addKey(key.getAsString()));
+          }
+
+          // Read Roles and their ACLs.
+          JsonArray policyItemsJson = policyJson.getAsJsonArray("policyItems");
+          for (JsonElement policyItemElement : policyItemsJson) {
+            JsonObject policyItemJson = policyItemElement.getAsJsonObject();
+            JsonArray jsonRoles = policyItemJson.getAsJsonArray("roles");
+            JsonArray jsonAclArray = policyItemJson.getAsJsonArray("accesses");
+
+            for (JsonElement jsonAclElem : jsonAclArray) {
+              JsonObject jsonAcl = jsonAclElem.getAsJsonObject();
+              String aclType = jsonAcl.get("type").getAsString();
+              Acl acl;
+              if (jsonAcl.get("isAllowed").getAsBoolean()) {
+                acl = Acl.allow(stringToAcl.get(aclType));
+              } else {
+                acl = Acl.deny(stringToAcl.get(aclType));
+              }
+
+              for (JsonElement roleNameJson : jsonRoles) {
+                policyB.addRoleAcl(roleNameJson.getAsString(),
+                    Collections.singleton(acl));
+              }
+            }
+          }
+
+          return policyB.build();
+        }
+      };
+
+  private final JsonDeserializer<Role> roleDeserializer =
+      new JsonDeserializer<Role>() {
+        @Override public Role deserialize(JsonElement jsonElement, Type type,
+            JsonDeserializationContext jsonDeserializationContext)
+            throws JsonParseException {
+          JsonObject roleJson = jsonElement.getAsJsonObject();
+          String name = roleJson.get("name").getAsString();
+          Role.Builder role = new Role.Builder();
+          role.setName(name);
+          if (roleJson.has("description")) {
+            role.setDescription(roleJson.get("description").getAsString());
+          }
+          for (JsonElement jsonUser : roleJson.get("users").getAsJsonArray()) {
+            String userName =
+                jsonUser.getAsJsonObject().get("name").getAsString();
+            role.addUser(new BasicUserPrincipal(userName));
+          }
+
+          return role.build();
+        }
+      };
+
+  private final JsonSerializer<Policy> policySerializer =
+      new JsonSerializer<Policy>() {
+        @Override public JsonElement serialize(Policy javaPolicy,
+            Type typeOfSrc, JsonSerializationContext context) {
+          JsonObject jsonPolicy = new JsonObject();
+          jsonPolicy.addProperty("name", javaPolicy.getName());
+          jsonPolicy.addProperty("service", rangerService);
+          jsonPolicy.addProperty("isEnabled", javaPolicy.isEnabled());
+          if (javaPolicy.getDescription().isPresent()) {
+            jsonPolicy.addProperty("description",
+                javaPolicy.getDescription().get());
+          }
+
+          // All resources under this policy are added to this object.
+          JsonObject jsonResources = new JsonObject();
+
+          // Add volumes. Ranger requires at least one volume to be specified.
+          JsonArray jsonVolumeNameArray = new JsonArray();
+          for (String volumeName : javaPolicy.getVolumes()) {
+            jsonVolumeNameArray.add(new JsonPrimitive(volumeName));
+          }
+          JsonObject jsonVolumeResource = new JsonObject();
+          jsonVolumeResource.add("values", jsonVolumeNameArray);
+          jsonVolumeResource.addProperty("isRecursive", false);
+          jsonVolumeResource.addProperty("isExcludes", false);
+          jsonResources.add("volume", jsonVolumeResource);
+
+          // Add buckets.
+          JsonArray jsonBucketNameArray = new JsonArray();
+          for (String bucketName : javaPolicy.getBuckets()) {
+            jsonBucketNameArray.add(new JsonPrimitive(bucketName));
+          }
+
+          if (jsonBucketNameArray.size() > 0) {
+            JsonObject jsonBucketResource = new JsonObject();
+            jsonBucketResource.add("values", jsonBucketNameArray);
+            jsonBucketResource.addProperty("isRecursive", false);
+            jsonBucketResource.addProperty("isExcludes", false);
+            jsonResources.add("bucket", jsonBucketResource);
+          }
+
+          // Add keys.
+          JsonArray jsonKeyNameArray = new JsonArray();
+          for (String keyName : javaPolicy.getKeys()) {
+            jsonKeyNameArray.add(new JsonPrimitive(keyName));
+          }
+          if (jsonKeyNameArray.size() > 0) {
+            JsonObject jsonKeyResource = new JsonObject();
+            jsonKeyResource.add("values", jsonKeyNameArray);
+            jsonKeyResource.addProperty("isRecursive", false);
+            jsonKeyResource.addProperty("isExcludes", false);
+            jsonResources.add("key", jsonKeyResource);
+          }
+
+          jsonPolicy.add("resources", jsonResources);
+
+          // Add roles and their acls to the policy.
+          JsonArray jsonPolicyItemArray = new JsonArray();
+
+          // Make a new policy item for each role in the map.
+          Map<String, Collection<Acl>> roleAcls = javaPolicy.getRoleAcls();
+          for (Map.Entry<String, Collection<Acl>> entry : roleAcls.entrySet()) {
+            // Add role to the policy item.
+            String roleName = entry.getKey();
+            JsonObject jsonPolicyItem = new JsonObject();
+            JsonArray jsonRoles = new JsonArray();
+            jsonRoles.add(new JsonPrimitive(roleName));
+            jsonPolicyItem.add("roles", jsonRoles);
+
+            // Add acls to the policy item.
+            JsonArray jsonAclArray = new JsonArray();
+            for (Acl acl : entry.getValue()) {
+              JsonObject jsonAcl = new JsonObject();
+              jsonAcl.addProperty("type", aclToString.get(acl.getAclType()));
+              jsonAcl.addProperty("isAllowed", acl.isAllowed());
+              jsonAclArray.add(jsonAcl);
+              jsonPolicyItem.add("accesses", jsonAclArray);
+            }
+            jsonPolicyItemArray.add(jsonPolicyItem);
+          }
+          jsonPolicy.add("policyItems", jsonPolicyItemArray);
+
+          return jsonPolicy;
+        }
+      };
+
+  private final JsonSerializer<Role> roleSerializer =
+      new JsonSerializer<Role>() {
+        @Override public JsonElement serialize(Role javaRole, Type typeOfSrc,
+            JsonSerializationContext context) {
+          JsonObject jsonRole = new JsonObject();
+          jsonRole.addProperty("name", javaRole.getName());
+
+          JsonArray jsonUserArray = new JsonArray();
+          for (BasicUserPrincipal javaUser : javaRole.getUsers()) {
+            jsonUserArray.add(jsonConverter.toJsonTree(javaUser));
+          }
+
+          jsonRole.add("users", jsonUserArray);
+          return jsonRole;
+        }
+      };
+
+  private final JsonSerializer<BasicUserPrincipal> userSerializer =
+      new JsonSerializer<BasicUserPrincipal>() {
+        @Override public JsonElement serialize(BasicUserPrincipal user,
+            Type typeOfSrc, JsonSerializationContext context) {
+          JsonObject jsonMember = new JsonObject();
+          jsonMember.addProperty("name", user.getName());
+          jsonMember.addProperty("isAdmin", false);
+          return jsonMember;
+        }
+      };
+
+  public void setPolicyLastUpdateTime(long mtime) {
+    lastPolicyUpdateTimeEpochMillis = mtime;
+  }
+
+  public long getPolicyLastUpdateTime() {
+    return lastPolicyUpdateTimeEpochMillis;
+  }
+
+  public HashSet<String> getRoleList() {
+    return null;
+  }
+}
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java
index 09b92a22fe..5915992943 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java
@@ -61,6 +61,7 @@ import org.apache.hadoop.ozone.om.request.s3.security.OMSetSecretRequest;
 import org.apache.hadoop.ozone.om.request.s3.security.S3GetSecretRequest;
 import org.apache.hadoop.ozone.om.request.s3.security.S3RevokeSecretRequest;
 import org.apache.hadoop.ozone.om.request.s3.tenant.OMTenantAssignUserAccessIdRequest;
+import org.apache.hadoop.ozone.om.request.s3.tenant.OMSetRangerServiceVersionRequest;
 import org.apache.hadoop.ozone.om.request.s3.tenant.OMTenantAssignAdminRequest;
 import org.apache.hadoop.ozone.om.request.s3.tenant.OMTenantCreateRequest;
 import org.apache.hadoop.ozone.om.request.s3.tenant.OMTenantDeleteRequest;
@@ -207,6 +208,8 @@ public final class OzoneManagerRatisUtils {
     case TenantRevokeAdmin:
       ozoneManager.checkS3MultiTenancyEnabled();
       return new OMTenantRevokeAdminRequest(omRequest);
+    case SetRangerServiceVersion:
+      return new OMSetRangerServiceVersionRequest(omRequest);
 
     /*
      * Key requests that can have multiple variants based on the bucket layout
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/OMClientRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/OMClientRequest.java
index a0feff7d51..ec7fa75d5b 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/OMClientRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/OMClientRequest.java
@@ -64,6 +64,7 @@ public abstract class OMClientRequest implements RequestAuditor {
 
   private static final Logger LOG =
       LoggerFactory.getLogger(OMClientRequest.class);
+
   private OMRequest omRequest;
 
   private UserGroupInformation userGroupInformation;
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/tenant/OMSetRangerServiceVersionRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/tenant/OMSetRangerServiceVersionRequest.java
new file mode 100644
index 0000000000..82bd101265
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/tenant/OMSetRangerServiceVersionRequest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.hadoop.ozone.om.request.s3.tenant;
+
+import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
+import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerDoubleBufferHelper;
+import org.apache.hadoop.ozone.om.request.OMClientRequest;
+import org.apache.hadoop.ozone.om.request.util.OmResponseUtil;
+import org.apache.hadoop.ozone.om.response.OMClientResponse;
+import org.apache.hadoop.ozone.om.response.s3.tenant.OMSetRangerServiceVersionResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SetRangerServiceVersionRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SetRangerServiceVersionResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Optional;
+
+/**
+ * Handles OMSetRangerServiceVersionRequest.
+ *
+ * This is an Ozone Manager internal request issued only by the Ranger
+ * Background Sync service (OMRangerBGSyncService). This request writes
+ * OzoneServiceVersion (retrieved from Ranger) to OM DB during the sync.
+ */
+public class OMSetRangerServiceVersionRequest extends OMClientRequest {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(OMSetRangerServiceVersionRequest.class);
+
+  public OMSetRangerServiceVersionRequest(OMRequest omRequest) {
+    super(omRequest);
+  }
+
+  @Override
+  public OMClientResponse validateAndUpdateCache(
+      OzoneManager ozoneManager, long transactionLogIndex,
+      OzoneManagerDoubleBufferHelper ozoneManagerDoubleBufferHelper) {
+
+    OMClientResponse omClientResponse;
+    final OMResponse.Builder omResponse =
+        OmResponseUtil.getOMResponseBuilder(getOmRequest());
+    OMMetadataManager omMetadataManager = ozoneManager.getMetadataManager();
+    final SetRangerServiceVersionRequest request =
+        getOmRequest().getSetRangerServiceVersionRequest();
+    final long proposedVersion = request.getRangerServiceVersion();
+    final String proposedVersionStr = String.valueOf(proposedVersion);
+
+    omMetadataManager.getMetaTable().addCacheEntry(
+        new CacheKey<>(OzoneConsts.RANGER_OZONE_SERVICE_VERSION_KEY),
+        new CacheValue<>(Optional.of(proposedVersionStr), transactionLogIndex));
+    omResponse.setSetRangerServiceVersionResponse(
+        SetRangerServiceVersionResponse.newBuilder().build());
+
+    omClientResponse = new OMSetRangerServiceVersionResponse(
+        omResponse.build(),
+        OzoneConsts.RANGER_OZONE_SERVICE_VERSION_KEY,
+        proposedVersionStr);
+    addResponseToDoubleBuffer(transactionLogIndex, omClientResponse,
+        ozoneManagerDoubleBufferHelper);
+
+    return omClientResponse;
+  }
+}
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/tenant/OMTenantAssignAdminRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/tenant/OMTenantAssignAdminRequest.java
index d301ca963c..98553664a6 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/tenant/OMTenantAssignAdminRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/tenant/OMTenantAssignAdminRequest.java
@@ -123,6 +123,8 @@ public class OMTenantAssignAdminRequest extends OMClientRequest {
     } else {
       delegated = true;
     }
+
+    // TODO: Acquire some lock
     // Call OMMTM to add user to tenant admin role
     ozoneManager.getMultiTenantManager().assignTenantAdmin(
         request.getAccessId(), delegated);
@@ -223,14 +225,13 @@ public class OMTenantAssignAdminRequest extends OMClientRequest {
       omClientResponse = new OMTenantAssignAdminResponse(
           createErrorOMResponse(omResponse, ex));
     } finally {
-      if (omClientResponse != null) {
-        omClientResponse.setFlushFuture(ozoneManagerDoubleBufferHelper
-            .add(omClientResponse, transactionLogIndex));
-      }
+      addResponseToDoubleBuffer(transactionLogIndex, omClientResponse,
+          ozoneManagerDoubleBufferHelper);
       if (acquiredVolumeLock) {
         Preconditions.checkNotNull(volumeName);
         omMetadataManager.getLock().releaseWriteLock(VOLUME_LOCK, volumeName);
       }
+      // TODO: Release some lock
     }
 
     // Audit
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/tenant/OMTenantAssignUserAccessIdRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/tenant/OMTenantAssignUserAccessIdRequest.java
index 053b766168..940dfdfc4f 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/tenant/OMTenantAssignUserAccessIdRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/tenant/OMTenantAssignUserAccessIdRequest.java
@@ -150,6 +150,7 @@ public class OMTenantAssignUserAccessIdRequest extends OMClientRequest {
     // Below call implies user existence check in authorizer.
     // If the user doesn't exist, Ranger return 400 and the call should throw.
 
+    // TODO: Acquire some lock
     // Call OMMTM
     // Inform MultiTenantManager of user assignment so it could
     //  initialize some policies in Ranger.
@@ -226,7 +227,7 @@ public class OMTenantAssignUserAccessIdRequest extends OMClientRequest {
     final String tenantId = request.getTenantId();
     final String userPrincipal = request.getUserPrincipal();
 
-    assert (accessId.equals(request.getAccessId()));
+    Preconditions.checkState(accessId.equals(request.getAccessId()));
     IOException exception = null;
 
     String volumeName = null;
@@ -339,10 +340,8 @@ public class OMTenantAssignUserAccessIdRequest extends OMClientRequest {
       omClientResponse = new OMTenantAssignUserAccessIdResponse(
           createErrorOMResponse(omResponse, ex));
     } finally {
-      if (omClientResponse != null) {
-        omClientResponse.setFlushFuture(ozoneManagerDoubleBufferHelper
-            .add(omClientResponse, transactionLogIndex));
-      }
+      addResponseToDoubleBuffer(transactionLogIndex, omClientResponse,
+          ozoneManagerDoubleBufferHelper);
       if (acquiredS3SecretLock) {
         omMetadataManager.getLock().releaseWriteLock(S3_SECRET_LOCK, accessId);
       }
@@ -350,6 +349,7 @@ public class OMTenantAssignUserAccessIdRequest extends OMClientRequest {
         Preconditions.checkNotNull(volumeName);
         omMetadataManager.getLock().releaseWriteLock(VOLUME_LOCK, volumeName);
       }
+      // TODO: Release some lock
     }
 
     // Audit
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/tenant/OMTenantCreateRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/tenant/OMTenantCreateRequest.java
index b87e76a6e5..a33e74bd1a 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/tenant/OMTenantCreateRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/tenant/OMTenantCreateRequest.java
@@ -166,16 +166,25 @@ public class OMTenantCreateRequest extends OMVolumeRequest {
             .setModificationTime(initialTime)
             .build();
 
+    final String userRoleName =
+        OMMultiTenantManager.getDefaultUserRoleName(tenantId);
+    final String adminRoleName =
+        OMMultiTenantManager.getDefaultAdminRoleName(tenantId);
+
+    // TODO: Acquire some lock
+
     // If we fail after pre-execute. handleRequestFailure() callback
     // would clean up any state maintained by the getMultiTenantManager.
     tenantInContext = ozoneManager.getMultiTenantManager()
-        .createTenantAccessInAuthorizer(tenantId);
+        .createTenantAccessInAuthorizer(tenantId, userRoleName, adminRoleName);
 
     final OMRequest.Builder omRequestBuilder = omRequest.toBuilder()
         .setCreateTenantRequest(
             CreateTenantRequest.newBuilder()
                 .setTenantId(tenantId)
-                .setVolumeName(volumeName))
+                .setVolumeName(volumeName)
+                .setUserRoleName(userRoleName)
+                .setAdminRoleName(adminRoleName))
         .setCreateVolumeRequest(
             CreateVolumeRequest.newBuilder()
                 .setVolumeInfo(updatedVolumeInfo));
@@ -217,14 +226,20 @@ public class OMTenantCreateRequest extends OMVolumeRequest {
     final String owner = getOmRequest().getUserInfo().getUserName();
     Map<String, String> auditMap = new HashMap<>();
     OMMetadataManager omMetadataManager = ozoneManager.getMetadataManager();
+
     final CreateTenantRequest request = getOmRequest().getCreateTenantRequest();
     final String tenantId = request.getTenantId();
+    final String userRoleName = request.getUserRoleName();
+    final String adminRoleName = request.getAdminRoleName();
+
     final VolumeInfo volumeInfo =
         getOmRequest().getCreateVolumeRequest().getVolumeInfo();
     final String volumeName = volumeInfo.getVolume();
+    Preconditions.checkNotNull(volumeName);
     Preconditions.checkState(request.getVolumeName().equals(volumeName),
         "CreateTenantRequest's volumeName value should match VolumeInfo's");
     final String dbVolumeKey = omMetadataManager.getVolumeKey(volumeName);
+
     IOException exception = null;
 
     try {
@@ -285,10 +300,6 @@ public class OMTenantCreateRequest extends OMVolumeRequest {
           OMMultiTenantManager.getDefaultBucketNamespacePolicyName(tenantId);
       final String bucketPolicyName =
           OMMultiTenantManager.getDefaultBucketPolicyName(tenantId);
-      final String userRoleName =
-          OMMultiTenantManager.getDefaultUserRoleName(tenantId);
-      final String adminRoleName =
-          OMMultiTenantManager.getDefaultAdminRoleName(tenantId);
       final OmDBTenantState omDBTenantState = new OmDBTenantState(
           tenantId, bucketNamespaceName, userRoleName, adminRoleName,
           bucketNamespacePolicyName, bucketPolicyName);
@@ -323,17 +334,15 @@ public class OMTenantCreateRequest extends OMVolumeRequest {
           createErrorOMResponse(omResponse, ex));
       exception = ex;
     } finally {
-      if (omClientResponse != null) {
-        omClientResponse.setFlushFuture(ozoneManagerDoubleBufferHelper
-            .add(omClientResponse, transactionLogIndex));
-      }
+      addResponseToDoubleBuffer(transactionLogIndex, omClientResponse,
+          ozoneManagerDoubleBufferHelper);
       if (acquiredUserLock) {
         omMetadataManager.getLock().releaseWriteLock(USER_LOCK, owner);
       }
       if (acquiredVolumeLock) {
-        Preconditions.checkNotNull(volumeName);
         omMetadataManager.getLock().releaseWriteLock(VOLUME_LOCK, volumeName);
       }
+      // TODO: Release some lock
     }
 
     // Perform audit logging
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/tenant/OMTenantDeleteRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/tenant/OMTenantDeleteRequest.java
index 8bedef485d..6c09c6a192 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/tenant/OMTenantDeleteRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/tenant/OMTenantDeleteRequest.java
@@ -72,6 +72,7 @@ public class OMTenantDeleteRequest extends OMVolumeRequest {
     // Check Ozone cluster admin privilege
     ozoneManager.getMultiTenantManager().checkAdmin();
 
+    // TODO: Acquire some lock
     // TODO: TBD: Call ozoneManager.getMultiTenantManager().deleteTenant() ?
 
     return super.preExecute(ozoneManager);
@@ -114,7 +115,7 @@ public class OMTenantDeleteRequest extends OMVolumeRequest {
       final OmDBTenantState dbTenantState =
           omMetadataManager.getTenantStateTable().get(tenantId);
       volumeName = dbTenantState.getBucketNamespaceName();
-      assert (volumeName != null);
+      Preconditions.checkNotNull(volumeName);
 
       LOG.debug("Tenant '{}' has volume '{}'", tenantId, volumeName);
       // decVolumeRefCount is true if volumeName is not empty string
@@ -185,9 +186,9 @@ public class OMTenantDeleteRequest extends OMVolumeRequest {
       addResponseToDoubleBuffer(transactionLogIndex, omClientResponse,
           ozoneManagerDoubleBufferHelper);
       if (acquiredVolumeLock) {
-        Preconditions.checkNotNull(volumeName);
         omMetadataManager.getLock().releaseWriteLock(VOLUME_LOCK, volumeName);
       }
+      // TODO: Release some lock
     }
 
     // Perform audit logging
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/tenant/OMTenantRevokeAdminRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/tenant/OMTenantRevokeAdminRequest.java
index 991dbe85e3..0b3d974ddb 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/tenant/OMTenantRevokeAdminRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/tenant/OMTenantRevokeAdminRequest.java
@@ -116,6 +116,7 @@ public class OMTenantRevokeAdminRequest extends OMClientRequest {
           OMException.ResultCodes.INVALID_TENANT_ID);
     }
 
+    // TODO: Acquire some lock
     // Remove user (inferred from access ID) from tenant admin role in Ranger
     ozoneManager.getMultiTenantManager().revokeTenantAdmin(accessId);
 
@@ -199,14 +200,13 @@ public class OMTenantRevokeAdminRequest extends OMClientRequest {
       omClientResponse = new OMTenantRevokeAdminResponse(
           createErrorOMResponse(omResponse, ex));
     } finally {
-      if (omClientResponse != null) {
-        omClientResponse.setFlushFuture(ozoneManagerDoubleBufferHelper
-            .add(omClientResponse, transactionLogIndex));
-      }
+      addResponseToDoubleBuffer(transactionLogIndex, omClientResponse,
+          ozoneManagerDoubleBufferHelper);
       if (acquiredVolumeLock) {
         Preconditions.checkNotNull(volumeName);
         omMetadataManager.getLock().releaseWriteLock(VOLUME_LOCK, volumeName);
       }
+      // TODO: Release some lock
     }
 
     // Audit
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/tenant/OMTenantRevokeUserAccessIdRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/tenant/OMTenantRevokeUserAccessIdRequest.java
index 24ff96e103..46f8b0ec69 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/tenant/OMTenantRevokeUserAccessIdRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/tenant/OMTenantRevokeUserAccessIdRequest.java
@@ -127,8 +127,8 @@ public class OMTenantRevokeUserAccessIdRequest extends OMClientRequest {
           ResultCodes.PERMISSION_DENIED);
     }
 
+    // TODO: Acquire some lock
     // Call OMMTM to revoke user access to tenant
-    // TODO: Check destroyUser() behavior
     ozoneManager.getMultiTenantManager().revokeUserAccessId(accessId);
 
     final Builder omRequestBuilder = omRequest.toBuilder()
@@ -219,10 +219,8 @@ public class OMTenantRevokeUserAccessIdRequest extends OMClientRequest {
       omClientResponse = new OMTenantRevokeUserAccessIdResponse(
           createErrorOMResponse(omResponse, ex));
     } finally {
-      if (omClientResponse != null) {
-        omClientResponse.setFlushFuture(ozoneManagerDoubleBufferHelper
-            .add(omClientResponse, transactionLogIndex));
-      }
+      addResponseToDoubleBuffer(transactionLogIndex, omClientResponse,
+          ozoneManagerDoubleBufferHelper);
       if (acquiredS3SecretLock) {
         omMetadataManager.getLock().releaseWriteLock(S3_SECRET_LOCK, accessId);
       }
@@ -230,6 +228,7 @@ public class OMTenantRevokeUserAccessIdRequest extends OMClientRequest {
         Preconditions.checkNotNull(volumeName);
         omMetadataManager.getLock().releaseWriteLock(VOLUME_LOCK, volumeName);
       }
+      // TODO: Release some lock
     }
 
     // Audit
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/s3/tenant/OMSetRangerServiceVersionResponse.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/s3/tenant/OMSetRangerServiceVersionResponse.java
new file mode 100644
index 0000000000..d1b21bc994
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/s3/tenant/OMSetRangerServiceVersionResponse.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.hadoop.ozone.om.response.s3.tenant;
+
+import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.META_TABLE;
+
+import java.io.IOException;
+
+import javax.annotation.Nonnull;
+
+import org.apache.hadoop.hdds.utils.db.BatchOperation;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.response.CleanupTableInfo;
+import org.apache.hadoop.ozone.om.response.OMClientResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Response for OMSetRangerServiceVersionRequest.
+ */
+@CleanupTableInfo(cleanupTables = {META_TABLE})
+public class OMSetRangerServiceVersionResponse extends OMClientResponse {
+  private String serviceVersionKey;
+  private String serviceVersionValueStr;
+
+  public OMSetRangerServiceVersionResponse(@Nonnull OMResponse omResponse,
+                                            @Nonnull String dbKey,
+                                            @Nonnull String versionStr) {
+    super(omResponse);
+    this.serviceVersionKey = dbKey;
+    this.serviceVersionValueStr = versionStr;
+  }
+
+  /**
+   * For when the request is not successful.
+   * For a successful request, the other constructor should be used.
+   */
+  public OMSetRangerServiceVersionResponse(@Nonnull OMResponse omResponse) {
+    super(omResponse);
+    checkStatusNotOK();
+  }
+
+  @Override
+  public void addToDBBatch(OMMetadataManager omMetadataManager,
+      BatchOperation batchOperation) throws IOException {
+
+    omMetadataManager.getMetaTable().putWithBatch(
+        batchOperation, serviceVersionKey, serviceVersionValueStr);
+  }
+
+  @VisibleForTesting
+  public String getNewServiceVersion() {
+    return serviceVersionValueStr;
+  }
+}
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/upgrade/OMLayoutFeature.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/upgrade/OMLayoutFeature.java
index a57d5409d8..60a888e446 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/upgrade/OMLayoutFeature.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/upgrade/OMLayoutFeature.java
@@ -30,6 +30,8 @@ import org.apache.hadoop.ozone.upgrade.LayoutFeature;
 public enum OMLayoutFeature implements LayoutFeature {
   //////////////////////////////  //////////////////////////////
   INITIAL_VERSION(0, "Initial Layout Version"),
+
+  // TODO: Make this 2 after bringing in EC
   MULTITENANCY_SCHEMA(1, "Multi-Tenancy Schema");
 
   ///////////////////////////////  /////////////////////////////
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOMMultiTenantManagerImpl.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOMMultiTenantManagerImpl.java
index f2ea4af21c..3370673315 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOMMultiTenantManagerImpl.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOMMultiTenantManagerImpl.java
@@ -19,12 +19,16 @@
 package org.apache.hadoop.ozone.om;
 
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_DB_DIRS;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_MULTITENANCY_RANGER_SYNC_INTERVAL;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_MULTITENANCY_RANGER_SYNC_INTERVAL_DEFAULT;
 import static org.apache.hadoop.ozone.om.OMMultiTenantManagerImpl.OZONE_OM_TENANT_DEV_SKIP_RANGER;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
+
 import com.google.common.base.Optional;
 
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
@@ -48,7 +52,7 @@ import org.mockito.Mockito;
 public class TestOMMultiTenantManagerImpl {
 
   private OMMultiTenantManagerImpl tenantManager;
-  private static String tenantName = "tenant1";
+  private static final String TENANT_ID = "tenant1";
 
   @Rule
   public TemporaryFolder folder = new TemporaryFolder();
@@ -61,42 +65,52 @@ public class TestOMMultiTenantManagerImpl {
     conf.set(OZONE_OM_TENANT_DEV_SKIP_RANGER, "true");
     OMMetadataManager omMetadataManager = new OmMetadataManagerImpl(conf);
 
-    final String bucketNamespaceName = tenantName;
+    final String bucketNamespaceName = TENANT_ID;
     final String bucketNamespacePolicyName =
-        OMMultiTenantManager.getDefaultBucketNamespacePolicyName(tenantName);
+        OMMultiTenantManager.getDefaultBucketNamespacePolicyName(TENANT_ID);
     final String bucketPolicyName =
-        OMMultiTenantManager.getDefaultBucketPolicyName(tenantName);
+        OMMultiTenantManager.getDefaultBucketPolicyName(TENANT_ID);
     final String userRoleName =
-        OMMultiTenantManager.getDefaultUserRoleName(tenantName);
+        OMMultiTenantManager.getDefaultUserRoleName(TENANT_ID);
     final String adminRoleName =
-        OMMultiTenantManager.getDefaultAdminRoleName(tenantName);
-    final OmDBTenantState omDBTenantState = new OmDBTenantState(
-        tenantName, bucketNamespaceName, userRoleName, adminRoleName,
+        OMMultiTenantManager.getDefaultAdminRoleName(TENANT_ID);
+    final OmDBTenantState omDBTenantState = new OmDBTenantState(TENANT_ID,
+        bucketNamespaceName, userRoleName, adminRoleName,
         bucketNamespacePolicyName, bucketPolicyName);
 
-    omMetadataManager.getTenantStateTable().put(tenantName, omDBTenantState);
+    omMetadataManager.getTenantStateTable().put(TENANT_ID, omDBTenantState);
 
     omMetadataManager.getTenantAccessIdTable().put("seed-accessId1",
-        new OmDBAccessIdInfo(tenantName, "seed-user1", false, false));
+        new OmDBAccessIdInfo(TENANT_ID, "seed-user1", false, false));
 
     OzoneManager ozoneManager = Mockito.mock(OzoneManager.class);
     Mockito.when(ozoneManager.getMetadataManager())
         .thenReturn(omMetadataManager);
 
+    OzoneConfiguration ozoneConfiguration =
+        Mockito.mock(OzoneConfiguration.class);
+    Mockito.when(ozoneConfiguration.getTimeDuration(
+        OZONE_OM_MULTITENANCY_RANGER_SYNC_INTERVAL,
+        OZONE_OM_MULTITENANCY_RANGER_SYNC_INTERVAL_DEFAULT.getDuration(),
+        OZONE_OM_MULTITENANCY_RANGER_SYNC_INTERVAL_DEFAULT.getUnit(),
+        TimeUnit.SECONDS))
+        .thenReturn(10L);
+    Mockito.when(ozoneManager.getConfiguration())
+        .thenReturn(ozoneConfiguration);
+
     tenantManager = new OMMultiTenantManagerImpl(ozoneManager, conf);
     assertEquals(1, tenantManager.getTenantCache().size());
-    assertEquals(1,
-        tenantManager.getTenantCache().get(tenantName).getTenantUsers().size());
-
+    assertEquals(1, tenantManager.getTenantCache().get(TENANT_ID)
+        .getAccessIdInfoMap().size());
   }
 
   @Test
   public void testListUsersInTenant() throws Exception {
-    tenantManager.assignUserToTenant(new BasicUserPrincipal("user1"),
-        tenantName, "accessId1");
+    tenantManager.assignUserToTenant(
+        new BasicUserPrincipal("user1"), TENANT_ID, "accessId1");
 
     TenantUserList tenantUserList =
-        tenantManager.listUsersInTenant(tenantName, "");
+        tenantManager.listUsersInTenant(TENANT_ID, "");
     List<UserAccessIdInfo> userAccessIds = tenantUserList.getUserAccessIds();
     assertEquals(2, userAccessIds.size());
 
@@ -116,7 +130,7 @@ public class TestOMMultiTenantManagerImpl {
           tenantManager.listUsersInTenant("tenant2", null);
         });
 
-    assertTrue(tenantManager.listUsersInTenant(tenantName, "abc")
+    assertTrue(tenantManager.listUsersInTenant(TENANT_ID, "abc")
         .getUserAccessIds().isEmpty());
   }
 
@@ -128,9 +142,9 @@ public class TestOMMultiTenantManagerImpl {
     assertEquals(1, tenantManager.getTenantCache().size());
 
     tenantManager.revokeUserAccessId("seed-accessId1");
-    assertTrue(tenantManager.getTenantCache()
-        .get(tenantName).getTenantUsers().isEmpty());
-    assertTrue(tenantManager.listUsersInTenant(tenantName, null)
+    assertTrue(tenantManager.getTenantCache().get(TENANT_ID)
+        .getAccessIdInfoMap().isEmpty());
+    assertTrue(tenantManager.listUsersInTenant(TENANT_ID, null)
         .getUserAccessIds().isEmpty());
   }
 
@@ -139,6 +153,6 @@ public class TestOMMultiTenantManagerImpl {
     Optional<String> optionalTenant = tenantManager.getTenantForAccessID(
         "seed-accessId1");
     assertTrue(optionalTenant.isPresent());
-    assertEquals(tenantName, optionalTenant.get());
+    assertEquals(TENANT_ID, optionalTenant.get());
   }
 }
\ No newline at end of file
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/security/TestS3GetSecretRequest.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/security/TestS3GetSecretRequest.java
index c7bdd8f994..e23b8f6886 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/security/TestS3GetSecretRequest.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/security/TestS3GetSecretRequest.java
@@ -62,7 +62,6 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.UUID;
 
-import static org.apache.hadoop.ozone.OzoneConsts.TENANT_ID_USERNAME_DELIMITER;
 import static org.apache.hadoop.security.authentication.util.KerberosName.DEFAULT_MECHANISM;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.doNothing;
@@ -92,7 +91,7 @@ public class TestS3GetSecretRequest {
   private static final String TENANT_ID = "finance";
   private static final String USER_BOB = "bob@EXAMPLE.COM";
   private static final String ACCESS_ID_BOB =
-      TENANT_ID + TENANT_ID_USERNAME_DELIMITER + USER_BOB;
+      OMMultiTenantManager.getDefaultAccessId(TENANT_ID, USER_BOB);
 
   private UserGroupInformation ugiAlice;
 
@@ -137,7 +136,9 @@ public class TestS3GetSecretRequest {
     when(ozoneManager.getMultiTenantManager()).thenReturn(omMultiTenantManager);
 
     when(tenant.getTenantAccessPolicies()).thenReturn(new ArrayList<>());
-    when(omMultiTenantManager.createTenantAccessInAuthorizer(TENANT_ID))
+    when(omMultiTenantManager.createTenantAccessInAuthorizer(TENANT_ID,
+            OMMultiTenantManager.getDefaultUserRoleName(TENANT_ID),
+            OMMultiTenantManager.getDefaultAdminRoleName(TENANT_ID)))
         .thenReturn(tenant);
   }
 
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/tenant/TestSetRangerServiceVersionRequest.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/tenant/TestSetRangerServiceVersionRequest.java
new file mode 100644
index 0000000000..b30a607c16
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/tenant/TestSetRangerServiceVersionRequest.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.om.request.s3.tenant;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.om.OMConfigKeys;
+import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerDoubleBufferHelper;
+import org.apache.hadoop.ozone.om.response.OMClientResponse;
+import org.apache.hadoop.ozone.om.response.s3.tenant.OMSetRangerServiceVersionResponse;
+import org.apache.hadoop.ozone.om.upgrade.OMLayoutVersionManager;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SetRangerServiceVersionRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.mockito.Mockito;
+
+import java.io.IOException;
+import java.util.UUID;
+
+/**
+ * Tests OMSetRangerServiceVersionRequest.
+ */
+public class TestSetRangerServiceVersionRequest {
+
+  @Rule
+  public TemporaryFolder folder = new TemporaryFolder();
+
+  private OzoneManager ozoneManager;
+  // Set ozoneManagerDoubleBuffer to do nothing.
+  private final OzoneManagerDoubleBufferHelper ozoneManagerDoubleBufferHelper =
+      ((response, transactionIndex) -> null);
+
+  @Before
+  public void setUp() throws Exception {
+    ozoneManager = Mockito.mock(OzoneManager.class);
+    Mockito.when(ozoneManager.getVersionManager())
+        .thenReturn(new OMLayoutVersionManager(1));
+
+    final OzoneConfiguration conf = new OzoneConfiguration();
+    conf.set(OMConfigKeys.OZONE_OM_DB_DIRS,
+        folder.newFolder().getAbsolutePath());
+    Mockito.when(ozoneManager.getMetadataManager())
+        .thenReturn(new OmMetadataManagerImpl(conf));
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    Mockito.framework().clearInlineMocks();
+  }
+
+  private OMRequest createRangerSyncRequest(long rangerServiceVersion) {
+
+    return OMRequest.newBuilder()
+        .setClientId(UUID.randomUUID().toString())
+        .setCmdType(Type.SetRangerServiceVersion)
+        .setSetRangerServiceVersionRequest(
+            SetRangerServiceVersionRequest.newBuilder()
+                .setRangerServiceVersion(rangerServiceVersion)
+                .build())
+        .build();
+  }
+
+  @Test
+  public void testRequest() throws IOException {
+
+    long txLogIndex = 1;
+
+    // Run preExecute
+    OMSetRangerServiceVersionRequest request =
+        new OMSetRangerServiceVersionRequest(
+            new OMSetRangerServiceVersionRequest(
+                createRangerSyncRequest(10L)).preExecute(ozoneManager));
+
+    // Run validateAndUpdateCaches
+    OMClientResponse clientResponse = request.validateAndUpdateCache(
+            ozoneManager, txLogIndex, ozoneManagerDoubleBufferHelper);
+
+    // Check response type and cast
+    Assert.assertTrue(clientResponse
+        instanceof OMSetRangerServiceVersionResponse);
+    final OMSetRangerServiceVersionResponse omSetRangerServiceVersionResponse =
+        (OMSetRangerServiceVersionResponse) clientResponse;
+
+    // Verify response
+    String verStr = omSetRangerServiceVersionResponse.getNewServiceVersion();
+    Assert.assertEquals(10L, Long.parseLong(verStr));
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org