Posted to issues@ozone.apache.org by GitBox <gi...@apache.org> on 2022/05/11 20:09:24 UTC

[GitHub] [ozone] errose28 commented on a diff in pull request #3131: HDDS-6371. [Multi-Tenant] Provide OM DB to Apache Ranger Sync mechanism

errose28 commented on code in PR #3131:
URL: https://github.com/apache/ozone/pull/3131#discussion_r869449423


##########
hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/multitenant/TestMultiTenantVolume.java:
##########
@@ -241,4 +242,19 @@ private static ObjectStore getStoreForAccessID(String accessID)
         new S3Auth("unused1", "unused2", accessID, accessID));
     return new ObjectStore(conf, client);
   }
+
+  @Test
+  public void testOMRangerBGSyncRatisSetVersion() throws IOException {

Review Comment:
   Can we unit test this to avoid spinning up a whole cluster here?



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMultiTenantManager.java:
##########
@@ -45,12 +48,18 @@ public interface OMMultiTenantManager {
    *
    * @throws IOException
    */
-//  void start() throws IOException;
-//
-//  /**
-//   * Stop multi-tenant manager.
-//   */
-//  void stop() throws Exception;
+
+  /**
+   * Start background thread(s) in the multi-tenant manager.
+   */
+  void start() throws IOException;
+
+  /**
+   * Stop background thread(s) in the multi-tenant manager.
+   */
+  void stop() throws IOException;
+
+  OMRangerBGSyncService getOMRangerBGSyncService();

Review Comment:
   `@VisibleForTesting`



##########
hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java:
##########
@@ -368,6 +368,8 @@ String getMultipartKey(String volume, String bucket, String key, String
 
   Table<String, OmDBTenantState> getTenantStateTable();
 
+  Table<String, Long> getOmRangerStateTable();

Review Comment:
   OM has a meta table that is used for upgrade information, but it was given a generic name in case we needed to put other things there later. This Ranger service version may be a good thing to put there, instead of its own table.



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMultiTenantManagerImpl.java:
##########
@@ -204,24 +256,33 @@ public Tenant createTenantAccessInAuthorizer(String tenantID)
         tenant.addTenantAccessPolicy(tenantBucketCreatePolicy);
       }
 
-      tenantCache.put(tenantID, new CachedTenantState(tenantID));
-    } catch (Exception e) {
+      if (tenantCache.containsKey(tenantId)) {
+        LOG.warn("Cache entry for tenant '{}' somehow already exists, "
+            + "will be overwritten", tenantId);  // TODO: throw exception?
+      }
+
+      // New entry in tenant cache
+      tenantCache.put(tenantId, new CachedTenantState(

Review Comment:
   This is going to be called in pre-execute to create the Ranger state, but this info has not yet been committed to the DB. We need to delay the tenant cache update until validateAndUpdateCache; otherwise we will have inconsistent reads and will need to manage this state in the bg sync. Delaying the update also makes a tenant cache lock not really necessary. This is somewhat outside the scope of this PR, so let's just add a TODO to handle this in the next Jira, where we will update all the requests.
   
   This applies to other places in this file as well.



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMultiTenantManagerImpl.java:
##########
@@ -260,57 +321,75 @@ public void removeTenantAccessFromAuthorizer(Tenant tenant) throws Exception {
    */
   @Override
   public String assignUserToTenant(BasicUserPrincipal principal,
-                                 String tenantId,
-                                 String accessId) throws IOException {
-    ImmutablePair<String, String> userAccessIdPair =
-        new ImmutablePair<>(principal.getName(), accessId);
+      String tenantId, String accessId) throws IOException {
+
+    final CachedAccessIdInfo cacheEntry =
+        new CachedAccessIdInfo(principal.getName(), false);
+
     try {
-      controlPathLock.writeLock().lock();
-
-      LOG.info("Adding user '{}' to tenant '{}' in-memory state.",
-          principal.getName(), tenantId);
-      CachedTenantState cachedTenantState =
-          tenantCache.getOrDefault(tenantId,
-              new CachedTenantState(tenantId));
-      cachedTenantState.getTenantUsers().add(userAccessIdPair);
-
-      final OzoneTenantRolePrincipal roleTenantAllUsers =
-          OzoneTenantRolePrincipal.getUserRole(tenantId);
-      String roleJsonStr = authorizer.getRole(roleTenantAllUsers);
-      String roleId = authorizer.assignUser(principal, roleJsonStr, false);
+      tenantCacheLock.writeLock().lock();
+
+      CachedTenantState cachedTenantState = tenantCache.get(tenantId);
+      Preconditions.checkNotNull(cachedTenantState,
+          "Cache entry for tenant '" + tenantId + "' does not exist");
+
+      LOG.info("Adding user '{}' access ID '{}' to tenant '{}' in-memory cache",
+          principal.getName(), accessId, tenantId);
+      cachedTenantState.getAccessIdInfoMap().put(accessId, cacheEntry);
+
+      final String tenantUserRoleName =
+          tenantCache.get(tenantId).getTenantUserRoleName();
+      final OzoneTenantRolePrincipal tenantUserRolePrincipal =
+          new OzoneTenantRolePrincipal(tenantUserRoleName);
+      String roleJsonStr = authorizer.getRole(tenantUserRolePrincipal);
+      final String roleId =
+          authorizer.assignUserToRole(principal, roleJsonStr, false);
       return roleId;
-    } catch (Exception e) {
+    } catch (IOException e) {
+      // Clean up
       revokeUserAccessId(accessId);
-      tenantCache.get(tenantId).getTenantUsers().remove(userAccessIdPair);
+      tenantCache.get(tenantId).getAccessIdInfoMap().remove(accessId);
+
       throw new OMException(e.getMessage(), TENANT_AUTHORIZER_ERROR);
     } finally {
-      controlPathLock.writeLock().unlock();
+      tenantCacheLock.writeLock().unlock();
     }
   }
 
   @Override
-  public void revokeUserAccessId(String accessID) throws IOException {
+  public void revokeUserAccessId(String accessId) throws IOException {

Review Comment:
   Multi-tenancy writes occur in two phases:
   1. Ranger updates in preExecute
   2. OM DB updates in validateAndUpdateCache
   
   So each write should actually have two different methods: one that does Ranger updates only, and one that does DB updates only. Like above, I think this is best left as a TODO for the request update PR, but it is something I noticed here.
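   A minimal sketch of that split, assuming in-memory maps as stand-ins for Ranger and for the OM-side state. The class and method names (`TwoPhaseTenantWrites`, `assignUserInAuthorizer`, `assignUserInOmState`) are hypothetical and do not come from this PR; the point is only that each phase touches exactly one of the two stores.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: class and method names are illustrative only.
final class TwoPhaseTenantWrites {
  // Stand-in for Ranger: role name -> user principals in that role.
  private final Map<String, Set<String>> authorizerRoles = new HashMap<>();
  // Stand-in for OM-side state: access ID -> user principal.
  private final Map<String, String> omAccessIds = new HashMap<>();

  // Phase 1, would run in preExecute: authorizer update only.
  void assignUserInAuthorizer(String roleName, String userPrincipal) {
    authorizerRoles.computeIfAbsent(roleName, k -> new HashSet<>())
        .add(userPrincipal);
  }

  // Phase 2, would run in validateAndUpdateCache: OM-side update only.
  void assignUserInOmState(String accessId, String userPrincipal) {
    omAccessIds.put(accessId, userPrincipal);
  }

  boolean authorizerHasUser(String roleName, String userPrincipal) {
    Set<String> users = authorizerRoles.get(roleName);
    return users != null && users.contains(userPrincipal);
  }

  boolean omStateHasAccessId(String accessId) {
    return omAccessIds.containsKey(accessId);
  }
}

class TwoPhaseTenantWritesDemo {
  public static void main(String[] args) {
    TwoPhaseTenantWrites writes = new TwoPhaseTenantWrites();
    writes.assignUserInAuthorizer("tenant1-UserRole", "alice");
    // Between the phases, only the authorizer knows about the user.
    System.out.println(writes.omStateHasAccessId("tenant1$alice")); // false
    writes.assignUserInOmState("tenant1$alice", "alice");
    System.out.println(writes.omStateHasAccessId("tenant1$alice")); // true
  }
}
```

   With the methods separated like this, a failure between the phases leaves a well-defined difference (present in the authorizer, absent in OM state) for the background sync to reconcile.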



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMultiTenantManagerImpl.java:
##########
@@ -417,26 +495,41 @@ private boolean isTenantAdmin(String username, String tenantId,
   public TenantUserList listUsersInTenant(String tenantID, String prefix)
       throws IOException {
 
-    if (!omMetadataManager.getTenantStateTable().isExist(tenantID)) {
-      throw new IOException("Tenant '" + tenantID + "' not found!");
-    }
-
     List<UserAccessIdInfo> userAccessIds = new ArrayList<>();
-    CachedTenantState cachedTenantState = tenantCache.get(tenantID);
-    if (cachedTenantState == null) {
-      throw new IOException("Inconsistent in memory Tenant cache '" + tenantID
-          + "' not found in cache, but present in OM DB!");
-    }
 
-    cachedTenantState.getTenantUsers().stream()
-        .filter(
-            k -> StringUtils.isEmpty(prefix) || k.getKey().startsWith(prefix))
-        .forEach(
-            k -> userAccessIds.add(
+    tenantCacheLock.readLock().lock();
+
+    try {
+      if (!omMetadataManager.getTenantStateTable().isExist(tenantID)) {
+        throw new IOException("Tenant '" + tenantID + "' not found!");
+      }
+
+      CachedTenantState cachedTenantState = tenantCache.get(tenantID);
+
+      if (cachedTenantState == null) {
+        throw new IOException("Inconsistent in memory Tenant cache '" + tenantID
+            + "' not found in cache, but present in OM DB!");
+      }
+
+      cachedTenantState.getAccessIdInfoMap().entrySet().stream()
+          .filter(
+              // Include if user principal matches the prefix
+              k -> StringUtils.isEmpty(prefix) ||
+                  k.getValue().getUserPrincipal().startsWith(prefix))
+          .forEach(k -> {
+            final String accessId = k.getKey();
+            final CachedAccessIdInfo cacheEntry = k.getValue();
+            userAccessIds.add(
                 UserAccessIdInfo.newBuilder()
-                    .setUserPrincipal(k.getKey())
-                    .setAccessId(k.getValue())
-                    .build()));
+                    .setUserPrincipal(cacheEntry.getUserPrincipal())
+                    .setAccessId(accessId)
+//                    .setIsAdmin(cacheEntry.isAdmin())

Review Comment:
   Is this needed or not?



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMultiTenantManagerImpl.java:
##########
@@ -93,38 +98,81 @@ public class OMMultiTenantManagerImpl implements OMMultiTenantManager {
   private final OzoneManager ozoneManager;
   private final OMMetadataManager omMetadataManager;
   private final OzoneConfiguration conf;
-  private final ReentrantReadWriteLock controlPathLock;
+  // tenantCache: tenantId -> CachedTenantState
   private final Map<String, CachedTenantState> tenantCache;
+  private final ReentrantReadWriteLock tenantCacheLock;
+  private final OMRangerBGSyncService omRangerBGSyncService;
 
-  OMMultiTenantManagerImpl(OzoneManager ozoneManager, OzoneConfiguration conf)
+  public OMMultiTenantManagerImpl(OzoneManager ozoneManager,
+                                  OzoneConfiguration conf)
       throws IOException {
     this.conf = conf;
-    this.controlPathLock = new ReentrantReadWriteLock();
     this.ozoneManager = ozoneManager;
     this.omMetadataManager = ozoneManager.getMetadataManager();
     this.tenantCache = new ConcurrentHashMap<>();
-    boolean devSkipRanger = conf.getBoolean(OZONE_OM_TENANT_DEV_SKIP_RANGER,
-        false);
+    this.tenantCacheLock = new ReentrantReadWriteLock();
+
+    loadTenantCacheFromDB();
+
+    boolean devSkipRanger =
+        conf.getBoolean(OZONE_OM_TENANT_DEV_SKIP_RANGER, false);
     if (devSkipRanger) {
       this.authorizer = new MultiTenantAccessAuthorizerDummyPlugin();
     } else {
       this.authorizer = new MultiTenantAccessAuthorizerRangerPlugin();
     }
-    this.authorizer.init(conf);
-    loadUsersFromDB();
+    try {
+      this.authorizer.init(conf);
+    } catch (OMException ex) {
+      if (ex.getResult().equals(INTERNAL_ERROR)) {

Review Comment:
   This is ok for now, but I think we will be making config errors fatal in https://github.com/apache/ozone/pull/3397, right?



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMultiTenantManagerImpl.java:
##########
@@ -17,7 +17,11 @@
  */
 package org.apache.hadoop.ozone.om;
 
-import static org.apache.hadoop.ozone.OzoneConsts.TENANT_ID_USERNAME_DELIMITER;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_MULTITENANCY_RANGER_SYNC_INTERVAL;

Review Comment:
   I skimmed this class and left some notes, but I think it should be more thoroughly reviewed as part of the follow-up request update PR, where updates would be more appropriate.



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java:
##########
@@ -189,6 +192,11 @@ public class OmMetadataManagerImpl implements OMMetadataManager {
   public static final String PRINCIPAL_TO_ACCESS_IDS_TABLE =
       "principalToAccessIdsTable";
   public static final String TENANT_STATE_TABLE = "tenantStateTable";
+  public static final String RANGER_STATE_TABLE = "RangerStateTable";
+
+  // TODO: Move this key elsewhere?
+  public static final String RANGER_OZONE_SERVICE_VERSION_KEY =
+      "RangerOzoneServiceVersion";

Review Comment:
   Most existing string keys we use in RocksDB are upper-case with a leading `#`, so this one would be `#RANGER_OZONE_SERVICE_VERSION`.
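   A rough sketch of how the version could sit under such a key in a generic string-to-string table. The `TreeMap` is only a stand-in for a RocksDB table, and `MetaTableSketch` and its methods are hypothetical names; storing the long as a string is an assumption made for the sketch, not something this PR establishes.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: a sorted map stands in for a generic meta table.
final class MetaTableSketch {
  // Key in the '#'-prefixed format suggested in the comment above.
  static final String RANGER_OZONE_SERVICE_VERSION_KEY =
      "#RANGER_OZONE_SERVICE_VERSION";

  private final Map<String, String> metaTable = new TreeMap<>();

  // Assumption: values are stored as strings, so the long is serialized.
  void putRangerServiceVersion(long version) {
    metaTable.put(RANGER_OZONE_SERVICE_VERSION_KEY, Long.toString(version));
  }

  // Returns the stored version, or defaultVersion if none was written yet.
  long getRangerServiceVersion(long defaultVersion) {
    String stored = metaTable.get(RANGER_OZONE_SERVICE_VERSION_KEY);
    return stored == null ? defaultVersion : Long.parseLong(stored);
  }
}

class MetaTableSketchDemo {
  public static void main(String[] args) {
    MetaTableSketch meta = new MetaTableSketch();
    System.out.println(meta.getRangerServiceVersion(-1L)); // -1
    meta.putRangerServiceVersion(42L);
    System.out.println(meta.getRangerServiceVersion(-1L)); // 42
  }
}
```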



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/multitenant/OMRangerBGSyncService.java:
##########
@@ -0,0 +1,761 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.om.multitenant;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.google.common.base.Preconditions;
+import com.google.protobuf.ServiceException;
+import org.apache.hadoop.hdds.utils.BackgroundService;
+import org.apache.hadoop.hdds.utils.BackgroundTask;
+import org.apache.hadoop.hdds.utils.BackgroundTaskQueue;
+import org.apache.hadoop.hdds.utils.BackgroundTaskResult;
+import org.apache.hadoop.hdds.utils.BackgroundTaskResult.EmptyTaskResult;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.TableIterator;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OMMultiTenantManager;
+import org.apache.hadoop.ozone.om.OMMultiTenantManagerImpl;
+import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes;
+import org.apache.hadoop.ozone.om.helpers.OMRatisHelper;
+import org.apache.hadoop.ozone.om.helpers.OmDBAccessIdInfo;
+import org.apache.hadoop.ozone.om.helpers.OmDBTenantState;
+import org.apache.hadoop.ozone.om.multitenant.CachedTenantState.CachedAccessIdInfo;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.RangerServiceVersionSyncRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
+import org.apache.ratis.protocol.ClientId;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftClientRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.gson.JsonArray;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+
+import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.VOLUME_LOCK;
+
+/**
+ * Background Sync thread that reads Multi-Tenancy state from OM DB
+ * and applies it to Ranger.
+ */
+public class OMRangerBGSyncService extends BackgroundService {
+
+  public static final Logger LOG =
+      LoggerFactory.getLogger(OMRangerBGSyncService.class);
+  private static final ClientId CLIENT_ID = ClientId.randomId();
+  private static final long ONE_HOUR_IN_MILLIS = 3600 * 1000;
+
+  private final OzoneManager ozoneManager;
+  private final OMMetadataManager metadataManager;
+  private final OMMultiTenantManager multiTenantManager;
+  private final MultiTenantAccessAuthorizer authorizer;
+
+  // Maximum number of attempts for each sync run
+  private static final int MAX_ATTEMPT = 2;
+  private final AtomicLong runCount = new AtomicLong(0);
+  private int rangerOzoneServiceId;
+
+  private boolean isServiceStarted = false;
+
+  static class BGRole {
+    private final String name;
+    private String id;
+    private final HashSet<String> userSet;
+
+    BGRole(String n) {
+      this.name = n;
+      userSet = new HashSet<>();
+    }
+
+    public void setId(String id) {
+      this.id = id;
+    }
+
+    public String getId() {
+      return id;
+    }
+
+    public void addUserPrincipal(String userPrincipal) {
+      userSet.add(userPrincipal);
+    }
+
+    public HashSet<String> getUserSet() {
+      return userSet;
+    }
+
+    @Override
+    public int hashCode() {
+      return name.hashCode();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      // TODO: Do we care about userSet

Review Comment:
   I think we do. The Ranger user list can diverge from the OM user list, so we want to correct that.
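   One way this could look, as a sketch only: `RoleSketch` is a hypothetical stand-in for `BGRole`, not the class from the PR. It compares the name and the user set in `equals`, and keeps `hashCode` on the name alone, which still satisfies the `equals`/`hashCode` contract because equal roles always share a name.

```java
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

// Hypothetical stand-in for BGRole; not the class from the PR.
final class RoleSketch {
  private final String name;
  private final Set<String> userSet = new HashSet<>();

  RoleSketch(String name) {
    this.name = Objects.requireNonNull(name);
  }

  void addUserPrincipal(String userPrincipal) {
    userSet.add(userPrincipal);
  }

  @Override
  public int hashCode() {
    // Hash on the immutable name only; userSet can change over time.
    return name.hashCode();
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (o == null || getClass() != o.getClass()) {
      return false;
    }
    RoleSketch other = (RoleSketch) o;
    // Compare fields directly instead of comparing hash codes.
    return name.equals(other.name) && userSet.equals(other.userSet);
  }
}

class RoleSketchDemo {
  public static void main(String[] args) {
    RoleSketch ranger = new RoleSketch("tenant1-UserRole");
    RoleSketch omDb = new RoleSketch("tenant1-UserRole");
    omDb.addUserPrincipal("alice");
    System.out.println(ranger.equals(omDb)); // false: user lists diverged
    ranger.addUserPrincipal("alice");
    System.out.println(ranger.equals(omDb)); // true
  }
}
```

   Comparing fields rather than hash codes also avoids treating two differently named roles as equal when their names happen to collide on `hashCode`.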



##########
hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/s3/security/TestS3GetSecretRequest.java:
##########
@@ -444,4 +459,34 @@ public void testGetSecretWithTenant() throws IOException {
     Assert.assertEquals(originalS3Secret.getKerberosID(),
         s3Secret.getKerberosID());
   }
+
+  @Test
+  public void testRangerSyncRequest() throws IOException {

Review Comment:
   Can we move this to its own unit test class? It doesn't involve S3 secrets. In its own class, it can also use a mocked OM and metadataManager to check the service version, instead of adding a getter to the response.



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/multitenant/OMRangerBGSyncService.java:
##########
@@ -0,0 +1,761 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.om.multitenant;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.google.common.base.Preconditions;
+import com.google.protobuf.ServiceException;
+import org.apache.hadoop.hdds.utils.BackgroundService;
+import org.apache.hadoop.hdds.utils.BackgroundTask;
+import org.apache.hadoop.hdds.utils.BackgroundTaskQueue;
+import org.apache.hadoop.hdds.utils.BackgroundTaskResult;
+import org.apache.hadoop.hdds.utils.BackgroundTaskResult.EmptyTaskResult;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.TableIterator;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OMMultiTenantManager;
+import org.apache.hadoop.ozone.om.OMMultiTenantManagerImpl;
+import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes;
+import org.apache.hadoop.ozone.om.helpers.OMRatisHelper;
+import org.apache.hadoop.ozone.om.helpers.OmDBAccessIdInfo;
+import org.apache.hadoop.ozone.om.helpers.OmDBTenantState;
+import org.apache.hadoop.ozone.om.multitenant.CachedTenantState.CachedAccessIdInfo;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.RangerServiceVersionSyncRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
+import org.apache.ratis.protocol.ClientId;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftClientRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.gson.JsonArray;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+
+import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.VOLUME_LOCK;
+
+/**
+ * Background Sync thread that reads Multi-Tenancy state from OM DB
+ * and applies it to Ranger.
+ */
+public class OMRangerBGSyncService extends BackgroundService {
+
+  public static final Logger LOG =
+      LoggerFactory.getLogger(OMRangerBGSyncService.class);
+  private static final ClientId CLIENT_ID = ClientId.randomId();
+  private static final long ONE_HOUR_IN_MILLIS = 3600 * 1000;
+
+  private final OzoneManager ozoneManager;
+  private final OMMetadataManager metadataManager;
+  private final OMMultiTenantManager multiTenantManager;
+  private final MultiTenantAccessAuthorizer authorizer;
+
+  // Maximum number of attempts for each sync run
+  private static final int MAX_ATTEMPT = 2;
+  private final AtomicLong runCount = new AtomicLong(0);
+  private int rangerOzoneServiceId;
+
+  private boolean isServiceStarted = false;
+
+  static class BGRole {
+    private final String name;
+    private String id;
+    private final HashSet<String> userSet;
+
+    BGRole(String n) {
+      this.name = n;
+      userSet = new HashSet<>();
+    }
+
+    public void setId(String id) {
+      this.id = id;
+    }
+
+    public String getId() {
+      return id;
+    }
+
+    public void addUserPrincipal(String userPrincipal) {
+      userSet.add(userPrincipal);
+    }
+
+    public HashSet<String> getUserSet() {
+      return userSet;
+    }
+
+    @Override
+    public int hashCode() {
+      return name.hashCode();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      // TODO: Do we care about userSet
+      return this.hashCode() == o.hashCode();
+    }
+  }
+
+  // This map will be used to keep all the policies that are found in
+  // OM DB and should have been in Ranger. Currently, we are only printing such
+  // policyID. This can result if a tenant is deleted but the system
+  // crashed. Its an easy recovery to retry the "tenant delete" operation.
+  //
+  // Maps from policy name to policy ID in Ranger
+  private final HashMap<String, String> mtRangerPoliciesToBeCreated =
+      new HashMap<>();
+
+  // We will track all the policies in Ranger here. After we have
+  // processed all the policies from OM DB, this map will
+  // be left with policies that we need to delete.
+  //
+  // Maps from policy name to policy ID in Ranger
+  private final HashMap<String, String> mtRangerPoliciesToBeDeleted =
+      new HashMap<>();
+
+  // This map will keep all the Multi-Tenancy related roles from Ranger.
+  private final HashMap<String, BGRole> mtRangerRoles = new HashMap<>();
+
+  // Keep OM DB mapping of Roles -> list of user principals.
+  private final HashMap<String, HashSet<String>> mtOMDBRoles = new HashMap<>();
+
+  // Every BG ranger sync cycle we update this
+  private long lastRangerPolicyLoadTime;
+
+  public OMRangerBGSyncService(OzoneManager ozoneManager,
+      MultiTenantAccessAuthorizer authorizer, long interval,
+      TimeUnit unit, long serviceTimeout)
+      throws IOException {
+    super("OMRangerBGSyncService", interval, unit, 1, serviceTimeout);
+
+    this.ozoneManager = ozoneManager;
+    this.metadataManager = ozoneManager.getMetadataManager();
+    this.multiTenantManager = ozoneManager.getMultiTenantManager();
+
+    this.authorizer = authorizer;
+
+    if (authorizer != null) {
+      if (authorizer instanceof MultiTenantAccessAuthorizerRangerPlugin) {
+        MultiTenantAccessAuthorizerRangerPlugin rangerAuthorizer =
+            (MultiTenantAccessAuthorizerRangerPlugin) authorizer;
+        rangerOzoneServiceId = rangerAuthorizer.getRangerOzoneServiceId();
+      } else if (
+          !(authorizer instanceof MultiTenantAccessAuthorizerDummyPlugin)) {
+        throw new OMException("Unsupported MultiTenantAccessAuthorizer: " +

Review Comment:
   Is this necessary? Can we just accept the interface implementation we are given?
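   A sketch of what accepting any implementation might look like, with hypothetical stand-in types (`AuthorizerSketch`, `RangerAuthorizerSketch`, `SyncServiceSketch`) rather than the PR's classes. The constructor reads the service ID only when the authorizer happens to be Ranger-backed and never rejects other implementations; the value `7` is a placeholder.

```java
import java.util.OptionalInt;

// Hypothetical stand-ins; not the interfaces or plugins from the PR.
interface AuthorizerSketch { }

final class RangerAuthorizerSketch implements AuthorizerSketch {
  int getServiceId() {
    return 7; // placeholder value for the sketch
  }
}

final class SyncServiceSketch {
  private final AuthorizerSketch authorizer;
  private final OptionalInt rangerServiceId;

  SyncServiceSketch(AuthorizerSketch authorizer) {
    // Any implementation is accepted, including null in unit tests.
    this.authorizer = authorizer;
    if (authorizer instanceof RangerAuthorizerSketch) {
      rangerServiceId = OptionalInt.of(
          ((RangerAuthorizerSketch) authorizer).getServiceId());
    } else {
      rangerServiceId = OptionalInt.empty();
    }
  }

  AuthorizerSketch getAuthorizer() {
    return authorizer;
  }

  OptionalInt getRangerServiceId() {
    return rangerServiceId;
  }
}

class SyncServiceSketchDemo {
  public static void main(String[] args) {
    SyncServiceSketch ranger =
        new SyncServiceSketch(new RangerAuthorizerSketch());
    SyncServiceSketch other = new SyncServiceSketch(new AuthorizerSketch() { });
    System.out.println(ranger.getRangerServiceId().isPresent()); // true
    System.out.println(other.getRangerServiceId().isPresent()); // false
  }
}
```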



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/multitenant/OMRangerBGSyncService.java:
##########
@@ -0,0 +1,761 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.om.multitenant;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.google.common.base.Preconditions;
+import com.google.protobuf.ServiceException;
+import org.apache.hadoop.hdds.utils.BackgroundService;
+import org.apache.hadoop.hdds.utils.BackgroundTask;
+import org.apache.hadoop.hdds.utils.BackgroundTaskQueue;
+import org.apache.hadoop.hdds.utils.BackgroundTaskResult;
+import org.apache.hadoop.hdds.utils.BackgroundTaskResult.EmptyTaskResult;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.TableIterator;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OMMultiTenantManager;
+import org.apache.hadoop.ozone.om.OMMultiTenantManagerImpl;
+import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes;
+import org.apache.hadoop.ozone.om.helpers.OMRatisHelper;
+import org.apache.hadoop.ozone.om.helpers.OmDBAccessIdInfo;
+import org.apache.hadoop.ozone.om.helpers.OmDBTenantState;
+import org.apache.hadoop.ozone.om.multitenant.CachedTenantState.CachedAccessIdInfo;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.RangerServiceVersionSyncRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
+import org.apache.ratis.protocol.ClientId;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftClientRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.gson.JsonArray;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+
+import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.VOLUME_LOCK;
+
+/**
+ * Background Sync thread that reads Multi-Tenancy state from OM DB
+ * and applies it to Ranger.
+ */
+public class OMRangerBGSyncService extends BackgroundService {
+
+  public static final Logger LOG =
+      LoggerFactory.getLogger(OMRangerBGSyncService.class);
+  private static final ClientId CLIENT_ID = ClientId.randomId();
+  private static final long ONE_HOUR_IN_MILLIS = 3600 * 1000;
+
+  private final OzoneManager ozoneManager;
+  private final OMMetadataManager metadataManager;
+  private final OMMultiTenantManager multiTenantManager;
+  private final MultiTenantAccessAuthorizer authorizer;
+
+  // Maximum number of attempts for each sync run
+  private static final int MAX_ATTEMPT = 2;
+  private final AtomicLong runCount = new AtomicLong(0);
+  private int rangerOzoneServiceId;
+
+  private boolean isServiceStarted = false;
+
+  static class BGRole {
+    private final String name;
+    private String id;
+    private final HashSet<String> userSet;
+
+    BGRole(String n) {
+      this.name = n;
+      userSet = new HashSet<>();
+    }
+
+    public void setId(String id) {
+      this.id = id;
+    }
+
+    public String getId() {
+      return id;
+    }
+
+    public void addUserPrincipal(String userPrincipal) {
+      userSet.add(userPrincipal);
+    }
+
+    public HashSet<String> getUserSet() {
+      return userSet;
+    }
+
+    @Override
+    public int hashCode() {
+      return name.hashCode();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      // TODO: Do we care about userSet
+      return this.hashCode() == o.hashCode();
+    }
+  }
+
+  // This map will be used to keep all the policies that are found in
+  // OM DB and should have been in Ranger. Currently, we are only printing such
+  // policyID. This can result if a tenant is deleted but the system
+  // crashed. Its an easy recovery to retry the "tenant delete" operation.
+  //
+  // Maps from policy name to policy ID in Ranger
+  private final HashMap<String, String> mtRangerPoliciesToBeCreated =
+      new HashMap<>();
+
+  // We will track all the policies in Ranger here. After we have
+  // processed all the policies from OM DB, this map will
+  // be left with policies that we need to delete.
+  //
+  // Maps from policy name to policy ID in Ranger
+  private final HashMap<String, String> mtRangerPoliciesToBeDeleted =
+      new HashMap<>();
+
+  // This map will keep all the Multi-Tenancy related roles from Ranger.
+  private final HashMap<String, BGRole> mtRangerRoles = new HashMap<>();
+
+  // Keep OM DB mapping of Roles -> list of user principals.
+  private final HashMap<String, HashSet<String>> mtOMDBRoles = new HashMap<>();
+
+  // Every BG ranger sync cycle we update this
+  private long lastRangerPolicyLoadTime;
+
+  public OMRangerBGSyncService(OzoneManager ozoneManager,
+      MultiTenantAccessAuthorizer authorizer, long interval,
+      TimeUnit unit, long serviceTimeout)
+      throws IOException {
+    super("OMRangerBGSyncService", interval, unit, 1, serviceTimeout);
+
+    this.ozoneManager = ozoneManager;
+    this.metadataManager = ozoneManager.getMetadataManager();
+    this.multiTenantManager = ozoneManager.getMultiTenantManager();
+
+    this.authorizer = authorizer;
+
+    if (authorizer != null) {
+      if (authorizer instanceof MultiTenantAccessAuthorizerRangerPlugin) {
+        MultiTenantAccessAuthorizerRangerPlugin rangerAuthorizer =
+            (MultiTenantAccessAuthorizerRangerPlugin) authorizer;
+        rangerOzoneServiceId = rangerAuthorizer.getRangerOzoneServiceId();
+      } else if (
+          !(authorizer instanceof MultiTenantAccessAuthorizerDummyPlugin)) {
+        throw new OMException("Unsupported MultiTenantAccessAuthorizer: " +
+            authorizer.getClass().getSimpleName(),
+            ResultCodes.INTERNAL_ERROR);
+      }
+    } else {
+      // authorizer can be null for unit tests
+      LOG.warn("MultiTenantAccessAuthorizer is not set");
+    }
+  }
+
+  @Override
+  public BackgroundTaskQueue getTasks() {
+    BackgroundTaskQueue queue = new BackgroundTaskQueue();
+    queue.add(new RangerBGSyncTask());
+    return queue;
+  }
+
+  public void start() {
+    if (authorizer == null) {
+      LOG.error("Failed to start the background sync service: "
+          + "null authorizer. Please check OM configuration. Aborting");
+      return;
+    }
+    isServiceStarted = true;
+    super.start();
+  }
+
+  public void shutdown() {
+    isServiceStarted = false;
+    super.shutdown();
+  }
+
+  /**
+   * Returns true if the service run conditions are satisfied, false otherwise.
+   */
+  private boolean shouldRun() {
+    if (ozoneManager == null) {
+      // OzoneManager can be null for testing
+      return true;
+    }
+    // The service only runs if current OM node is leader and is ready
+    //  and the service is marked as started
+    return isServiceStarted && ozoneManager.isLeaderReady();

Review Comment:
   `shouldRun` is called from a different thread than `start`/`shutdown`, so `isServiceStarted` needs to be at least `volatile`. If we want multiple threads to be able to call `start`/`shutdown` while only one instance runs at a time, we will need compare-and-set on an `AtomicBoolean`. See the `isRunning` boolean in `OzoneManagerDoubleBuffer`.
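
   A minimal sketch of the compare-and-set idea, not the PR's code: `SyncServiceLifecycle` is a hypothetical stand-in for the service, and the `leaderReady` parameter stands in for `ozoneManager.isLeaderReady()`. Only the caller that actually flips the flag would go on to call `super.start()` or `super.shutdown()`.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for OMRangerBGSyncService; only the flag handling
// is shown, the real service would call super.start()/super.shutdown().
class SyncServiceLifecycle {
  private final AtomicBoolean isServiceStarted = new AtomicBoolean(false);

  /** Returns true only for the caller that moved the flag false -> true. */
  boolean start() {
    if (!isServiceStarted.compareAndSet(false, true)) {
      return false; // already started by another caller
    }
    // super.start() would go here
    return true;
  }

  /** Returns true only for the caller that moved the flag true -> false. */
  boolean shutdown() {
    if (!isServiceStarted.compareAndSet(true, false)) {
      return false; // not running, nothing to stop
    }
    // super.shutdown() would go here
    return true;
  }

  /** Safe to call from the background task thread: get() is a volatile read. */
  boolean shouldRun(boolean leaderReady) {
    return isServiceStarted.get() && leaderReady;
  }
}
```

   If only visibility matters and a single thread owns `start`/`shutdown`, a plain `volatile boolean` would be enough; the `AtomicBoolean` is for the case where concurrent callers must not start the service twice.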



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/multitenant/OMRangerBGSyncService.java:
##########
@@ -0,0 +1,761 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.om.multitenant;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.google.common.base.Preconditions;
+import com.google.protobuf.ServiceException;
+import org.apache.hadoop.hdds.utils.BackgroundService;
+import org.apache.hadoop.hdds.utils.BackgroundTask;
+import org.apache.hadoop.hdds.utils.BackgroundTaskQueue;
+import org.apache.hadoop.hdds.utils.BackgroundTaskResult;
+import org.apache.hadoop.hdds.utils.BackgroundTaskResult.EmptyTaskResult;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.TableIterator;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OMMultiTenantManager;
+import org.apache.hadoop.ozone.om.OMMultiTenantManagerImpl;
+import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes;
+import org.apache.hadoop.ozone.om.helpers.OMRatisHelper;
+import org.apache.hadoop.ozone.om.helpers.OmDBAccessIdInfo;
+import org.apache.hadoop.ozone.om.helpers.OmDBTenantState;
+import org.apache.hadoop.ozone.om.multitenant.CachedTenantState.CachedAccessIdInfo;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.RangerServiceVersionSyncRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
+import org.apache.ratis.protocol.ClientId;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftClientRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.gson.JsonArray;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+
+import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.VOLUME_LOCK;
+
+/**
+ * Background Sync thread that reads Multi-Tenancy state from OM DB
+ * and applies it to Ranger.
+ */
+public class OMRangerBGSyncService extends BackgroundService {
+
+  public static final Logger LOG =
+      LoggerFactory.getLogger(OMRangerBGSyncService.class);
+  private static final ClientId CLIENT_ID = ClientId.randomId();
+  private static final long ONE_HOUR_IN_MILLIS = 3600 * 1000;
+
+  private final OzoneManager ozoneManager;
+  private final OMMetadataManager metadataManager;
+  private final OMMultiTenantManager multiTenantManager;
+  private final MultiTenantAccessAuthorizer authorizer;
+
+  // Maximum number of attempts for each sync run
+  private static final int MAX_ATTEMPT = 2;
+  private final AtomicLong runCount = new AtomicLong(0);
+  private int rangerOzoneServiceId;
+
+  private boolean isServiceStarted = false;
+
+  static class BGRole {
+    private final String name;
+    private String id;
+    private final HashSet<String> userSet;
+
+    BGRole(String n) {
+      this.name = n;
+      userSet = new HashSet<>();
+    }
+
+    public void setId(String id) {
+      this.id = id;
+    }
+
+    public String getId() {
+      return id;
+    }
+
+    public void addUserPrincipal(String userPrincipal) {
+      userSet.add(userPrincipal);
+    }
+
+    public HashSet<String> getUserSet() {
+      return userSet;
+    }
+
+    @Override
+    public int hashCode() {
+      return name.hashCode();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      // TODO: Do we care about userSet
+      return this.hashCode() == o.hashCode();
+    }
+  }
+
+  // This map will be used to keep all the policies that are found in
+  // OM DB and should have been in Ranger. Currently, we are only printing such
+  // policyID. This can result if a tenant is deleted but the system
+  // crashed. Its an easy recovery to retry the "tenant delete" operation.
+  //
+  // Maps from policy name to policy ID in Ranger
+  private final HashMap<String, String> mtRangerPoliciesToBeCreated =
+      new HashMap<>();
+
+  // We will track all the policies in Ranger here. After we have
+  // processed all the policies from OM DB, this map will
+  // be left with policies that we need to delete.
+  //
+  // Maps from policy name to policy ID in Ranger
+  private final HashMap<String, String> mtRangerPoliciesToBeDeleted =
+      new HashMap<>();
+
+  // This map will keep all the Multi-Tenancy related roles from Ranger.
+  private final HashMap<String, BGRole> mtRangerRoles = new HashMap<>();
+
+  // Keep OM DB mapping of Roles -> list of user principals.
+  private final HashMap<String, HashSet<String>> mtOMDBRoles = new HashMap<>();
+
+  // Every BG ranger sync cycle we update this
+  private long lastRangerPolicyLoadTime;
+
+  public OMRangerBGSyncService(OzoneManager ozoneManager,
+      MultiTenantAccessAuthorizer authorizer, long interval,
+      TimeUnit unit, long serviceTimeout)
+      throws IOException {
+    super("OMRangerBGSyncService", interval, unit, 1, serviceTimeout);
+
+    this.ozoneManager = ozoneManager;
+    this.metadataManager = ozoneManager.getMetadataManager();
+    this.multiTenantManager = ozoneManager.getMultiTenantManager();
+
+    this.authorizer = authorizer;
+
+    if (authorizer != null) {
+      if (authorizer instanceof MultiTenantAccessAuthorizerRangerPlugin) {
+        MultiTenantAccessAuthorizerRangerPlugin rangerAuthorizer =
+            (MultiTenantAccessAuthorizerRangerPlugin) authorizer;
+        rangerOzoneServiceId = rangerAuthorizer.getRangerOzoneServiceId();
+      } else if (
+          !(authorizer instanceof MultiTenantAccessAuthorizerDummyPlugin)) {
+        throw new OMException("Unsupported MultiTenantAccessAuthorizer: " +
+            authorizer.getClass().getSimpleName(),
+            ResultCodes.INTERNAL_ERROR);
+      }
+    } else {
+      // authorizer can be null for unit tests

Review Comment:
   It might be safer to just use the dummy authorizer (`MultiTenantAccessAuthorizerDummyPlugin`) for tests instead of allowing `null`, right?
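
   Roughly what I have in mind, as a sketch under assumptions: `Authorizer`, `DummyAuthorizer` and `SyncServiceWithAuthorizer` are hypothetical, simplified stand-ins for the types in this PR, and the dummy returning version 0 is an assumed behavior, not what the real dummy plugin does.

```java
import java.util.Objects;

// Hypothetical, simplified stand-in for MultiTenantAccessAuthorizer.
interface Authorizer {
  long getCurrentOzoneServiceVersion(int serviceId);
}

// Hypothetical test double: a no-op authorizer that always reports
// version 0 (assumed behavior for illustration only).
class DummyAuthorizer implements Authorizer {
  @Override
  public long getCurrentOzoneServiceVersion(int serviceId) {
    return 0L;
  }
}

// Hypothetical stand-in for the sync service constructor logic.
class SyncServiceWithAuthorizer {
  private final Authorizer authorizer;

  SyncServiceWithAuthorizer(Authorizer authorizer) {
    // Reject null up front so the rest of the class needs no null checks.
    this.authorizer = Objects.requireNonNull(authorizer, "authorizer");
  }

  long retrieveRangerServiceVersion() {
    return authorizer.getCurrentOzoneServiceVersion(0);
  }
}
```

   With this shape, tests pass a dummy explicitly and the `authorizer == null` branches in the constructor and in `start()` can go away.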



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMultiTenantManager.java:
##########
@@ -45,12 +48,18 @@ public interface OMMultiTenantManager {
    *
    * @throws IOException
    */
-//  void start() throws IOException;
-//
-//  /**
-//   * Stop multi-tenant manager.
-//   */
-//  void stop() throws Exception;
+
+  /**
+   * Start background thread(s) in the multi-tenant manager.
+   */
+  void start() throws IOException;

Review Comment:
   Discussed offline already, but we need to double-check how this integrates with the OM install snapshot flow. `start` and `stop` need to be connected to the OzoneManager services start/stop (we can't be doing RocksDB ops while a snapshot is being installed), and the metadata manager instance needs to be refreshed once the snapshot install is complete. Even on a follower, the bg sync thread will be running; it will just be a no-op while waiting to become leader.
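
   To illustrate the ordering I mean, here is a rough sketch. Everything in it is hypothetical (`MetadataStore`, `SnapshotAwareSyncService`, `installSnapshot`); it is not how OzoneManager wires this up, only the stop, refresh, restart sequence.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for the metadata manager; only identity matters here.
class MetadataStore {
  final String name;

  MetadataStore(String name) {
    this.name = name;
  }
}

// Hypothetical service that must not touch the DB during a snapshot install.
class SnapshotAwareSyncService {
  private volatile boolean running = false;
  private final AtomicReference<MetadataStore> store;

  SnapshotAwareSyncService(MetadataStore initial) {
    this.store = new AtomicReference<>(initial);
  }

  void start() {
    running = true;
  }

  void stop() {
    running = false;
  }

  boolean isRunning() {
    return running;
  }

  MetadataStore currentStore() {
    return store.get();
  }

  // Called by the (hypothetical) owner around a snapshot install.
  void installSnapshot(MetadataStore reloaded) {
    stop();              // no DB access while the snapshot is swapped in
    store.set(reloaded); // point the service at the reloaded DB
    start();             // resume; on a follower the task stays a no-op
  }
}
```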



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/multitenant/OMRangerBGSyncService.java:
##########
@@ -0,0 +1,761 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.om.multitenant;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.google.common.base.Preconditions;
+import com.google.protobuf.ServiceException;
+import org.apache.hadoop.hdds.utils.BackgroundService;
+import org.apache.hadoop.hdds.utils.BackgroundTask;
+import org.apache.hadoop.hdds.utils.BackgroundTaskQueue;
+import org.apache.hadoop.hdds.utils.BackgroundTaskResult;
+import org.apache.hadoop.hdds.utils.BackgroundTaskResult.EmptyTaskResult;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.TableIterator;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OMMultiTenantManager;
+import org.apache.hadoop.ozone.om.OMMultiTenantManagerImpl;
+import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes;
+import org.apache.hadoop.ozone.om.helpers.OMRatisHelper;
+import org.apache.hadoop.ozone.om.helpers.OmDBAccessIdInfo;
+import org.apache.hadoop.ozone.om.helpers.OmDBTenantState;
+import org.apache.hadoop.ozone.om.multitenant.CachedTenantState.CachedAccessIdInfo;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.RangerServiceVersionSyncRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
+import org.apache.ratis.protocol.ClientId;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftClientRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.gson.JsonArray;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+
+import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.VOLUME_LOCK;
+
+/**
+ * Background Sync thread that reads Multi-Tenancy state from OM DB
+ * and applies it to Ranger.
+ */
+public class OMRangerBGSyncService extends BackgroundService {
+
+  public static final Logger LOG =
+      LoggerFactory.getLogger(OMRangerBGSyncService.class);
+  private static final ClientId CLIENT_ID = ClientId.randomId();
+  private static final long ONE_HOUR_IN_MILLIS = 3600 * 1000;
+
+  private final OzoneManager ozoneManager;
+  private final OMMetadataManager metadataManager;
+  private final OMMultiTenantManager multiTenantManager;
+  private final MultiTenantAccessAuthorizer authorizer;
+
+  // Maximum number of attempts for each sync run
+  private static final int MAX_ATTEMPT = 2;
+  private final AtomicLong runCount = new AtomicLong(0);
+  private int rangerOzoneServiceId;
+
+  private boolean isServiceStarted = false;
+
+  static class BGRole {
+    private final String name;
+    private String id;
+    private final HashSet<String> userSet;
+
+    BGRole(String n) {
+      this.name = n;
+      userSet = new HashSet<>();
+    }
+
+    public void setId(String id) {
+      this.id = id;
+    }
+
+    public String getId() {
+      return id;
+    }
+
+    public void addUserPrincipal(String userPrincipal) {
+      userSet.add(userPrincipal);
+    }
+
+    public HashSet<String> getUserSet() {
+      return userSet;
+    }
+
+    @Override
+    public int hashCode() {
+      return name.hashCode();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      // TODO: Do we care about userSet
+      return this.hashCode() == o.hashCode();
+    }
+  }
+
+  // This map will be used to keep all the policies that are found in
+  // OM DB and should have been in Ranger. Currently, we are only printing such
+  // policyID. This can result if a tenant is deleted but the system
+  // crashed. Its an easy recovery to retry the "tenant delete" operation.
+  //
+  // Maps from policy name to policy ID in Ranger
+  private final HashMap<String, String> mtRangerPoliciesToBeCreated =
+      new HashMap<>();
+
+  // We will track all the policies in Ranger here. After we have
+  // processed all the policies from OM DB, this map will
+  // be left with policies that we need to delete.
+  //
+  // Maps from policy name to policy ID in Ranger
+  private final HashMap<String, String> mtRangerPoliciesToBeDeleted =
+      new HashMap<>();
+
+  // This map will keep all the Multi-Tenancy related roles from Ranger.
+  private final HashMap<String, BGRole> mtRangerRoles = new HashMap<>();
+
+  // Keep OM DB mapping of Roles -> list of user principals.
+  private final HashMap<String, HashSet<String>> mtOMDBRoles = new HashMap<>();
+
+  // Every BG ranger sync cycle we update this
+  private long lastRangerPolicyLoadTime;
+
+  public OMRangerBGSyncService(OzoneManager ozoneManager,
+      MultiTenantAccessAuthorizer authorizer, long interval,
+      TimeUnit unit, long serviceTimeout)
+      throws IOException {
+    super("OMRangerBGSyncService", interval, unit, 1, serviceTimeout);
+
+    this.ozoneManager = ozoneManager;
+    this.metadataManager = ozoneManager.getMetadataManager();
+    this.multiTenantManager = ozoneManager.getMultiTenantManager();
+
+    this.authorizer = authorizer;
+
+    if (authorizer != null) {
+      if (authorizer instanceof MultiTenantAccessAuthorizerRangerPlugin) {
+        MultiTenantAccessAuthorizerRangerPlugin rangerAuthorizer =
+            (MultiTenantAccessAuthorizerRangerPlugin) authorizer;
+        rangerOzoneServiceId = rangerAuthorizer.getRangerOzoneServiceId();
+      } else if (
+          !(authorizer instanceof MultiTenantAccessAuthorizerDummyPlugin)) {
+        throw new OMException("Unsupported MultiTenantAccessAuthorizer: " +
+            authorizer.getClass().getSimpleName(),
+            ResultCodes.INTERNAL_ERROR);
+      }
+    } else {
+      // authorizer can be null for unit tests
+      LOG.warn("MultiTenantAccessAuthorizer is not set");
+    }
+  }
+
+  @Override
+  public BackgroundTaskQueue getTasks() {
+    BackgroundTaskQueue queue = new BackgroundTaskQueue();
+    queue.add(new RangerBGSyncTask());
+    return queue;
+  }
+
+  public void start() {
+    if (authorizer == null) {
+      LOG.error("Failed to start the background sync service: "
+          + "null authorizer. Please check OM configuration. Aborting");
+      return;
+    }
+    isServiceStarted = true;
+    super.start();
+  }
+
+  public void shutdown() {
+    isServiceStarted = false;
+    super.shutdown();
+  }
+
+  /**
+   * Returns true if the service run conditions are satisfied, false otherwise.
+   */
+  private boolean shouldRun() {
+    if (ozoneManager == null) {
+      // OzoneManager can be null for testing
+      return true;
+    }
+    // The service only runs if current OM node is leader and is ready
+    //  and the service is marked as started
+    return isServiceStarted && ozoneManager.isLeaderReady();
+  }
+
+  private class RangerBGSyncTask implements BackgroundTask {
+
+    @Override
+    public int getPriority() {
+      return 0;
+    }
+
+    @Override
+    public BackgroundTaskResult call() {
+      // Check OM leader and readiness
+      if (shouldRun()) {
+        runCount.incrementAndGet();
+        triggerRangerSyncOnce();
+      }
+
+      return EmptyTaskResult.newResult();
+    }
+  }
+
+  private void triggerRangerSyncOnce() {
+    int attempt = 0;
+    try {
+      // TODO: Acquire lock
+      long currentOzoneServiceVerInDB = getOMDBRangerServiceVersion();
+      long proposedOzoneServiceVerInDB = retrieveRangerServiceVersion();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Database version: {}, Ranger version: {}",
+            currentOzoneServiceVerInDB, proposedOzoneServiceVerInDB);
+      }
+      while (currentOzoneServiceVerInDB != proposedOzoneServiceVerInDB) {
+        // TODO: Release lock
+        if (++attempt > MAX_ATTEMPT) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Reached maximum number of attempts ({}). Abort",
+                MAX_ATTEMPT);
+          }
+          break;
+        }
+
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Ranger Ozone service version ({}) differs from DB's ({}). "

Review Comment:
   I think we should take the service versions from this message and put them in the info message below. It shouldn't add too much verbosity but may be helpful.
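
   Something along these lines, as a sketch only. `SyncLogMessage.format` is a hypothetical helper so the example runs without SLF4J; in the PR this would simply be one `LOG.info` call with four `{}` placeholders, and the exact wording is up to you.

```java
import java.util.Locale;

// Hypothetical helper: builds the combined info message in one place.
class SyncLogMessage {
  static String format(long run, int attempt, long rangerVersion,
      long dbVersion) {
    // Locale.ROOT keeps the digits stable regardless of the JVM locale.
    return String.format(Locale.ROOT,
        "Executing Multi-Tenancy Ranger Sync: run #%d, attempt #%d. "
            + "Ranger service version: %d, DB service version: %d",
        run, attempt, rangerVersion, dbVersion);
  }
}
```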



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/multitenant/OMRangerBGSyncService.java:
##########
@@ -0,0 +1,761 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.om.multitenant;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.google.common.base.Preconditions;
+import com.google.protobuf.ServiceException;
+import org.apache.hadoop.hdds.utils.BackgroundService;
+import org.apache.hadoop.hdds.utils.BackgroundTask;
+import org.apache.hadoop.hdds.utils.BackgroundTaskQueue;
+import org.apache.hadoop.hdds.utils.BackgroundTaskResult;
+import org.apache.hadoop.hdds.utils.BackgroundTaskResult.EmptyTaskResult;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.TableIterator;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OMMultiTenantManager;
+import org.apache.hadoop.ozone.om.OMMultiTenantManagerImpl;
+import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes;
+import org.apache.hadoop.ozone.om.helpers.OMRatisHelper;
+import org.apache.hadoop.ozone.om.helpers.OmDBAccessIdInfo;
+import org.apache.hadoop.ozone.om.helpers.OmDBTenantState;
+import org.apache.hadoop.ozone.om.multitenant.CachedTenantState.CachedAccessIdInfo;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.RangerServiceVersionSyncRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
+import org.apache.ratis.protocol.ClientId;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftClientRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.gson.JsonArray;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+
+import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.VOLUME_LOCK;
+
+/**
+ * Background Sync thread that reads Multi-Tenancy state from OM DB
+ * and applies it to Ranger.
+ */
+public class OMRangerBGSyncService extends BackgroundService {
+
+  public static final Logger LOG =
+      LoggerFactory.getLogger(OMRangerBGSyncService.class);
+  private static final ClientId CLIENT_ID = ClientId.randomId();
+  private static final long ONE_HOUR_IN_MILLIS = 3600 * 1000;
+
+  private final OzoneManager ozoneManager;
+  private final OMMetadataManager metadataManager;
+  private final OMMultiTenantManager multiTenantManager;
+  private final MultiTenantAccessAuthorizer authorizer;
+
+  // Maximum number of attempts for each sync run
+  private static final int MAX_ATTEMPT = 2;
+  private final AtomicLong runCount = new AtomicLong(0);
+  private int rangerOzoneServiceId;
+
+  private boolean isServiceStarted = false;
+
+  static class BGRole {
+    private final String name;
+    private String id;
+    private final HashSet<String> userSet;
+
+    BGRole(String n) {
+      this.name = n;
+      userSet = new HashSet<>();
+    }
+
+    public void setId(String id) {
+      this.id = id;
+    }
+
+    public String getId() {
+      return id;
+    }
+
+    public void addUserPrincipal(String userPrincipal) {
+      userSet.add(userPrincipal);
+    }
+
+    public HashSet<String> getUserSet() {
+      return userSet;
+    }
+
+    @Override
+    public int hashCode() {
+      return name.hashCode();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      // TODO: Do we care about userSet
+      return this.hashCode() == o.hashCode();
+    }
+  }
+
+  // This map will be used to keep all the policies that are found in
+  // OM DB and should have been in Ranger. Currently, we are only printing such
+  // policyID. This can result if a tenant is deleted but the system
+  // crashed. Its an easy recovery to retry the "tenant delete" operation.
+  //
+  // Maps from policy name to policy ID in Ranger
+  private final HashMap<String, String> mtRangerPoliciesToBeCreated =
+      new HashMap<>();
+
+  // We will track all the policies in Ranger here. After we have
+  // processed all the policies from OM DB, this map will
+  // be left with policies that we need to delete.
+  //
+  // Maps from policy name to policy ID in Ranger
+  private final HashMap<String, String> mtRangerPoliciesToBeDeleted =
+      new HashMap<>();
+
+  // This map will keep all the Multi-Tenancy related roles from Ranger.
+  private final HashMap<String, BGRole> mtRangerRoles = new HashMap<>();
+
+  // Keep OM DB mapping of Roles -> list of user principals.
+  private final HashMap<String, HashSet<String>> mtOMDBRoles = new HashMap<>();
+
+  // Every BG ranger sync cycle we update this
+  private long lastRangerPolicyLoadTime;
+
+  public OMRangerBGSyncService(OzoneManager ozoneManager,
+      MultiTenantAccessAuthorizer authorizer, long interval,
+      TimeUnit unit, long serviceTimeout)
+      throws IOException {
+    super("OMRangerBGSyncService", interval, unit, 1, serviceTimeout);
+
+    this.ozoneManager = ozoneManager;
+    this.metadataManager = ozoneManager.getMetadataManager();
+    this.multiTenantManager = ozoneManager.getMultiTenantManager();
+
+    this.authorizer = authorizer;
+
+    if (authorizer != null) {
+      if (authorizer instanceof MultiTenantAccessAuthorizerRangerPlugin) {
+        MultiTenantAccessAuthorizerRangerPlugin rangerAuthorizer =
+            (MultiTenantAccessAuthorizerRangerPlugin) authorizer;
+        rangerOzoneServiceId = rangerAuthorizer.getRangerOzoneServiceId();
+      } else if (
+          !(authorizer instanceof MultiTenantAccessAuthorizerDummyPlugin)) {
+        throw new OMException("Unsupported MultiTenantAccessAuthorizer: " +
+            authorizer.getClass().getSimpleName(),
+            ResultCodes.INTERNAL_ERROR);
+      }
+    } else {
+      // authorizer can be null for unit tests
+      LOG.warn("MultiTenantAccessAuthorizer is not set");
+    }
+  }
+
+  @Override
+  public BackgroundTaskQueue getTasks() {
+    BackgroundTaskQueue queue = new BackgroundTaskQueue();
+    queue.add(new RangerBGSyncTask());
+    return queue;
+  }
+
+  public void start() {
+    if (authorizer == null) {
+      LOG.error("Failed to start the background sync service: "
+          + "null authorizer. Please check OM configuration. Aborting");
+      return;
+    }
+    isServiceStarted = true;
+    super.start();
+  }
+
+  public void shutdown() {
+    isServiceStarted = false;
+    super.shutdown();
+  }
+
+  /**
+   * Returns true if the service run conditions are satisfied, false otherwise.
+   */
+  private boolean shouldRun() {
+    if (ozoneManager == null) {
+      // OzoneManager can be null for testing
+      return true;
+    }
+    // The service only runs if current OM node is leader and is ready
+    //  and the service is marked as started
+    return isServiceStarted && ozoneManager.isLeaderReady();
+  }
+
+  private class RangerBGSyncTask implements BackgroundTask {
+
+    @Override
+    public int getPriority() {
+      return 0;
+    }
+
+    @Override
+    public BackgroundTaskResult call() {
+      // Check OM leader and readiness
+      if (shouldRun()) {
+        runCount.incrementAndGet();
+        triggerRangerSyncOnce();
+      }
+
+      return EmptyTaskResult.newResult();
+    }
+  }
+
+  private void triggerRangerSyncOnce() {
+    int attempt = 0;
+    try {
+      // TODO: Acquire lock
+      long currentOzoneServiceVerInDB = getOMDBRangerServiceVersion();
+      long proposedOzoneServiceVerInDB = retrieveRangerServiceVersion();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Database version: {}, Ranger version: {}",
+            currentOzoneServiceVerInDB, proposedOzoneServiceVerInDB);
+      }
+      while (currentOzoneServiceVerInDB != proposedOzoneServiceVerInDB) {
+        // TODO: Release lock
+        if (++attempt > MAX_ATTEMPT) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Reached maximum number of attempts ({}). Abort",
+                MAX_ATTEMPT);
+          }
+          break;
+        }
+
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Ranger Ozone service version ({}) differs from DB's ({}). "
+                  + "Starting to sync.",
+              proposedOzoneServiceVerInDB, currentOzoneServiceVerInDB);
+        }
+
+        LOG.info("Executing Multi-Tenancy Ranger Sync: run #{}, attempt #{}",
+            runCount.get(), attempt);
+
+        executeOMDBToRangerSync(currentOzoneServiceVerInDB);
+
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Setting OM DB Ranger Service Version to {} (was {})",
+              proposedOzoneServiceVerInDB, currentOzoneServiceVerInDB);
+        }
+        // Submit Ratis Request to sync the new service version in OM DB
+        setOMDBRangerServiceVersion(proposedOzoneServiceVerInDB);
+
+        // TODO: Acquire lock
+
+        // Check Ranger ozone service version again
+        currentOzoneServiceVerInDB = proposedOzoneServiceVerInDB;
+        proposedOzoneServiceVerInDB = retrieveRangerServiceVersion();
+      }
+    } catch (IOException e) {
+      LOG.warn("Exception during Ranger Sync", e);
+      // TODO: Check specific exception once switched to
+      //  RangerRestMultiTenantAccessController
+//    } finally {
+//      // TODO: Release lock
+    }
+
+  }
+
+  long retrieveRangerServiceVersion() throws IOException {
+    return authorizer.getCurrentOzoneServiceVersion(rangerOzoneServiceId);
+  }
+
+  private RaftClientRequest newRaftClientRequest(OMRequest omRequest) {
+    return RaftClientRequest.newBuilder()
+        .setClientId(CLIENT_ID)
+        .setServerId(ozoneManager.getOmRatisServer().getRaftPeerId())
+        .setGroupId(ozoneManager.getOmRatisServer().getRaftGroupId())
+        .setCallId(runCount.get())
+        .setMessage(
+            Message.valueOf(
+                OMRatisHelper.convertRequestToByteString(omRequest)))
+        .setType(RaftClientRequest.writeRequestType())
+        .build();
+  }
+
+  void setOMDBRangerServiceVersion(long version) {
+    // OM DB update goes through Ratis
+    RangerServiceVersionSyncRequest.Builder versionSyncRequest =
+        RangerServiceVersionSyncRequest.newBuilder()
+            .setRangerServiceVersion(version);
+
+    OMRequest omRequest = OMRequest.newBuilder()
+        .setCmdType(Type.RangerServiceVersionSync)
+        .setRangerServiceVersionSyncRequest(versionSyncRequest)
+        .setClientId(CLIENT_ID.toString())
+        .build();
+
+    // Submit PurgeKeys request to OM

Review Comment:
   Leftover comment: this method submits a `RangerServiceVersionSync` request, not a PurgeKeys request.



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/multitenant/OMRangerBGSyncService.java:
##########
@@ -0,0 +1,761 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.om.multitenant;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.google.common.base.Preconditions;
+import com.google.protobuf.ServiceException;
+import org.apache.hadoop.hdds.utils.BackgroundService;
+import org.apache.hadoop.hdds.utils.BackgroundTask;
+import org.apache.hadoop.hdds.utils.BackgroundTaskQueue;
+import org.apache.hadoop.hdds.utils.BackgroundTaskResult;
+import org.apache.hadoop.hdds.utils.BackgroundTaskResult.EmptyTaskResult;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.TableIterator;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OMMultiTenantManager;
+import org.apache.hadoop.ozone.om.OMMultiTenantManagerImpl;
+import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes;
+import org.apache.hadoop.ozone.om.helpers.OMRatisHelper;
+import org.apache.hadoop.ozone.om.helpers.OmDBAccessIdInfo;
+import org.apache.hadoop.ozone.om.helpers.OmDBTenantState;
+import org.apache.hadoop.ozone.om.multitenant.CachedTenantState.CachedAccessIdInfo;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.RangerServiceVersionSyncRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
+import org.apache.ratis.protocol.ClientId;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftClientRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.gson.JsonArray;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+
+import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.VOLUME_LOCK;
+
+/**
+ * Background Sync thread that reads Multi-Tenancy state from OM DB
+ * and applies it to Ranger.
+ */
+public class OMRangerBGSyncService extends BackgroundService {
+
+  public static final Logger LOG =
+      LoggerFactory.getLogger(OMRangerBGSyncService.class);
+  private static final ClientId CLIENT_ID = ClientId.randomId();
+  private static final long ONE_HOUR_IN_MILLIS = 3600 * 1000;
+
+  private final OzoneManager ozoneManager;
+  private final OMMetadataManager metadataManager;
+  private final OMMultiTenantManager multiTenantManager;
+  private final MultiTenantAccessAuthorizer authorizer;
+
+  // Maximum number of attempts for each sync run
+  private static final int MAX_ATTEMPT = 2;
+  private final AtomicLong runCount = new AtomicLong(0);
+  private int rangerOzoneServiceId;
+
+  private boolean isServiceStarted = false;
+
+  static class BGRole {
+    private final String name;
+    private String id;
+    private final HashSet<String> userSet;
+
+    BGRole(String n) {
+      this.name = n;
+      userSet = new HashSet<>();
+    }
+
+    public void setId(String id) {
+      this.id = id;
+    }
+
+    public String getId() {
+      return id;
+    }
+
+    public void addUserPrincipal(String userPrincipal) {
+      userSet.add(userPrincipal);
+    }
+
+    public HashSet<String> getUserSet() {
+      return userSet;
+    }
+
+    @Override
+    public int hashCode() {
+      return name.hashCode();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      // TODO: Do we care about userSet
+      return this.hashCode() == o.hashCode();
+    }
+  }
+
+  // This map will be used to keep all the policies that are found in
+  // OM DB and should have been in Ranger. Currently, we are only printing such
+  // policyID. This can result if a tenant is deleted but the system
+  // crashed. Its an easy recovery to retry the "tenant delete" operation.
+  //
+  // Maps from policy name to policy ID in Ranger
+  private final HashMap<String, String> mtRangerPoliciesToBeCreated =
+      new HashMap<>();
+
+  // We will track all the policies in Ranger here. After we have
+  // processed all the policies from OM DB, this map will
+  // be left with policies that we need to delete.
+  //
+  // Maps from policy name to policy ID in Ranger
+  private final HashMap<String, String> mtRangerPoliciesToBeDeleted =
+      new HashMap<>();
+
+  // This map will keep all the Multi-Tenancy related roles from Ranger.
+  private final HashMap<String, BGRole> mtRangerRoles = new HashMap<>();
+
+  // Keep OM DB mapping of Roles -> list of user principals.
+  private final HashMap<String, HashSet<String>> mtOMDBRoles = new HashMap<>();
+
+  // Every BG ranger sync cycle we update this
+  private long lastRangerPolicyLoadTime;
+
+  public OMRangerBGSyncService(OzoneManager ozoneManager,
+      MultiTenantAccessAuthorizer authorizer, long interval,
+      TimeUnit unit, long serviceTimeout)
+      throws IOException {
+    super("OMRangerBGSyncService", interval, unit, 1, serviceTimeout);
+
+    this.ozoneManager = ozoneManager;
+    this.metadataManager = ozoneManager.getMetadataManager();
+    this.multiTenantManager = ozoneManager.getMultiTenantManager();
+
+    this.authorizer = authorizer;
+
+    if (authorizer != null) {
+      if (authorizer instanceof MultiTenantAccessAuthorizerRangerPlugin) {
+        MultiTenantAccessAuthorizerRangerPlugin rangerAuthorizer =
+            (MultiTenantAccessAuthorizerRangerPlugin) authorizer;
+        rangerOzoneServiceId = rangerAuthorizer.getRangerOzoneServiceId();
+      } else if (
+          !(authorizer instanceof MultiTenantAccessAuthorizerDummyPlugin)) {
+        throw new OMException("Unsupported MultiTenantAccessAuthorizer: " +
+            authorizer.getClass().getSimpleName(),
+            ResultCodes.INTERNAL_ERROR);
+      }
+    } else {
+      // authorizer can be null for unit tests
+      LOG.warn("MultiTenantAccessAuthorizer is not set");
+    }
+  }
+
+  @Override
+  public BackgroundTaskQueue getTasks() {
+    BackgroundTaskQueue queue = new BackgroundTaskQueue();
+    queue.add(new RangerBGSyncTask());
+    return queue;
+  }
+
+  public void start() {
+    if (authorizer == null) {
+      LOG.error("Failed to start the background sync service: "
+          + "null authorizer. Please check OM configuration. Aborting");
+      return;
+    }
+    isServiceStarted = true;
+    super.start();
+  }
+
+  public void shutdown() {
+    isServiceStarted = false;
+    super.shutdown();
+  }
+
+  /**
+   * Returns true if the service run conditions are satisfied, false otherwise.
+   */
+  private boolean shouldRun() {
+    if (ozoneManager == null) {
+      // OzoneManager can be null for testing
+      return true;
+    }
+    // The service only runs if current OM node is leader and is ready
+    //  and the service is marked as started
+    return isServiceStarted && ozoneManager.isLeaderReady();
+  }
+
+  private class RangerBGSyncTask implements BackgroundTask {
+
+    @Override
+    public int getPriority() {
+      return 0;
+    }
+
+    @Override
+    public BackgroundTaskResult call() {
+      // Check OM leader and readiness
+      if (shouldRun()) {
+        runCount.incrementAndGet();
+        triggerRangerSyncOnce();
+      }
+
+      return EmptyTaskResult.newResult();
+    }
+  }
+
+  private void triggerRangerSyncOnce() {
+    int attempt = 0;
+    try {
+      // TODO: Acquire lock
+      long currentOzoneServiceVerInDB = getOMDBRangerServiceVersion();
+      long proposedOzoneServiceVerInDB = retrieveRangerServiceVersion();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Database version: {}, Ranger version: {}",
+            currentOzoneServiceVerInDB, proposedOzoneServiceVerInDB);
+      }
+      while (currentOzoneServiceVerInDB != proposedOzoneServiceVerInDB) {

Review Comment:
   Let's add a comment explaining the conditions in this loop, including how the service versions are used and that the loop exits either when it completes a no-op run or when `MAX_ATTEMPT` is reached.
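
   As a rough illustration of what such a comment could say, here is a simplified, self-contained model of the loop. `SyncLoopSketch`, `syncUntilStable`, and the `LongSupplier` standing in for `retrieveRangerServiceVersion()` are hypothetical names, not part of the PR, and the wording of the comment reflects my reading of the loop rather than confirmed behavior.

```java
import java.util.function.LongSupplier;

// Hypothetical, simplified model of the loop in triggerRangerSyncOnce().
class SyncLoopSketch {
  static final int MAX_ATTEMPT = 2;

  /** Returns the number of sync passes that were executed. */
  static int syncUntilStable(long dbVersion, LongSupplier rangerVersion) {
    int attempt = 0;
    long current = dbVersion;
    long proposed = rangerVersion.getAsLong();
    // The OM DB records the Ranger service version observed before the last
    // completed sync pass. A mismatch means Ranger has changed since then
    // (assumption: possibly because of writes made by our own previous pass),
    // so another pass is needed. The loop exits when a pass leaves the Ranger
    // version unchanged (a no-op run) or after MAX_ATTEMPT passes.
    while (current != proposed && attempt < MAX_ATTEMPT) {
      attempt++;
      // Placeholder: sync OM DB state to Ranger, then persist `proposed`.
      current = proposed;
      proposed = rangerVersion.getAsLong();
    }
    return attempt;
  }

  public static void main(String[] args) {
    // Ranger stays at version 5 while the OM DB is at 3.
    System.out.println(syncUntilStable(3L, () -> 5L));
  }
}
```

   With a Ranger version that changes on every read, this model stops after `MAX_ATTEMPT` passes, which is the second exit condition the comment should mention.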



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/multitenant/OMRangerBGSyncService.java:
##########
@@ -0,0 +1,761 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.om.multitenant;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.google.common.base.Preconditions;
+import com.google.protobuf.ServiceException;
+import org.apache.hadoop.hdds.utils.BackgroundService;
+import org.apache.hadoop.hdds.utils.BackgroundTask;
+import org.apache.hadoop.hdds.utils.BackgroundTaskQueue;
+import org.apache.hadoop.hdds.utils.BackgroundTaskResult;
+import org.apache.hadoop.hdds.utils.BackgroundTaskResult.EmptyTaskResult;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.TableIterator;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OMMultiTenantManager;
+import org.apache.hadoop.ozone.om.OMMultiTenantManagerImpl;
+import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes;
+import org.apache.hadoop.ozone.om.helpers.OMRatisHelper;
+import org.apache.hadoop.ozone.om.helpers.OmDBAccessIdInfo;
+import org.apache.hadoop.ozone.om.helpers.OmDBTenantState;
+import org.apache.hadoop.ozone.om.multitenant.CachedTenantState.CachedAccessIdInfo;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.RangerServiceVersionSyncRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
+import org.apache.ratis.protocol.ClientId;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftClientRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.gson.JsonArray;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+
+import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.VOLUME_LOCK;
+
+/**
+ * Background Sync thread that reads Multi-Tenancy state from OM DB
+ * and applies it to Ranger.
+ */
+public class OMRangerBGSyncService extends BackgroundService {
+
+  public static final Logger LOG =
+      LoggerFactory.getLogger(OMRangerBGSyncService.class);
+  private static final ClientId CLIENT_ID = ClientId.randomId();
+  private static final long ONE_HOUR_IN_MILLIS = 3600 * 1000;
+
+  private final OzoneManager ozoneManager;
+  private final OMMetadataManager metadataManager;
+  private final OMMultiTenantManager multiTenantManager;
+  private final MultiTenantAccessAuthorizer authorizer;
+
+  // Maximum number of attempts for each sync run
+  private static final int MAX_ATTEMPT = 2;
+  private final AtomicLong runCount = new AtomicLong(0);
+  private int rangerOzoneServiceId;
+
+  private boolean isServiceStarted = false;
+
+  static class BGRole {
+    private final String name;
+    private String id;
+    private final HashSet<String> userSet;
+
+    BGRole(String n) {
+      this.name = n;
+      userSet = new HashSet<>();
+    }
+
+    public void setId(String id) {
+      this.id = id;
+    }
+
+    public String getId() {
+      return id;
+    }
+
+    public void addUserPrincipal(String userPrincipal) {
+      userSet.add(userPrincipal);
+    }
+
+    public HashSet<String> getUserSet() {
+      return userSet;
+    }
+
+    @Override
+    public int hashCode() {
+      return name.hashCode();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      // TODO: Do we care about userSet
+      return this.hashCode() == o.hashCode();
+    }
+  }
+
+  // This map will be used to keep all the policies that are found in
+  // OM DB and should have been in Ranger. Currently, we are only printing such
+  // policyID. This can result if a tenant is deleted but the system
+  // crashed. Its an easy recovery to retry the "tenant delete" operation.
+  //
+  // Maps from policy name to policy ID in Ranger
+  private final HashMap<String, String> mtRangerPoliciesToBeCreated =
+      new HashMap<>();
+
+  // We will track all the policies in Ranger here. After we have
+  // processed all the policies from OM DB, this map will
+  // be left with policies that we need to delete.
+  //
+  // Maps from policy name to policy ID in Ranger
+  private final HashMap<String, String> mtRangerPoliciesToBeDeleted =
+      new HashMap<>();
+
+  // This map will keep all the Multi-Tenancy related roles from Ranger.
+  private final HashMap<String, BGRole> mtRangerRoles = new HashMap<>();
+
+  // Keep OM DB mapping of Roles -> list of user principals.
+  private final HashMap<String, HashSet<String>> mtOMDBRoles = new HashMap<>();
+
+  // Every BG ranger sync cycle we update this
+  private long lastRangerPolicyLoadTime;
+
+  public OMRangerBGSyncService(OzoneManager ozoneManager,
+      MultiTenantAccessAuthorizer authorizer, long interval,
+      TimeUnit unit, long serviceTimeout)
+      throws IOException {
+    super("OMRangerBGSyncService", interval, unit, 1, serviceTimeout);
+
+    this.ozoneManager = ozoneManager;
+    this.metadataManager = ozoneManager.getMetadataManager();
+    this.multiTenantManager = ozoneManager.getMultiTenantManager();
+
+    this.authorizer = authorizer;
+
+    if (authorizer != null) {
+      if (authorizer instanceof MultiTenantAccessAuthorizerRangerPlugin) {
+        MultiTenantAccessAuthorizerRangerPlugin rangerAuthorizer =
+            (MultiTenantAccessAuthorizerRangerPlugin) authorizer;
+        rangerOzoneServiceId = rangerAuthorizer.getRangerOzoneServiceId();
+      } else if (
+          !(authorizer instanceof MultiTenantAccessAuthorizerDummyPlugin)) {
+        throw new OMException("Unsupported MultiTenantAccessAuthorizer: " +
+            authorizer.getClass().getSimpleName(),
+            ResultCodes.INTERNAL_ERROR);
+      }
+    } else {
+      // authorizer can be null for unit tests
+      LOG.warn("MultiTenantAccessAuthorizer is not set");
+    }
+  }
+
+  @Override
+  public BackgroundTaskQueue getTasks() {
+    BackgroundTaskQueue queue = new BackgroundTaskQueue();
+    queue.add(new RangerBGSyncTask());
+    return queue;
+  }
+
+  public void start() {
+    if (authorizer == null) {
+      LOG.error("Failed to start the background sync service: "
+          + "null authorizer. Please check OM configuration. Aborting");
+      return;
+    }
+    isServiceStarted = true;
+    super.start();
+  }
+
+  public void shutdown() {
+    isServiceStarted = false;
+    super.shutdown();
+  }
+
+  /**
+   * Returns true if the service run conditions are satisfied, false otherwise.
+   */
+  private boolean shouldRun() {
+    if (ozoneManager == null) {
+      // OzoneManager can be null for testing
+      return true;
+    }
+    // The service only runs if current OM node is leader and is ready
+    //  and the service is marked as started
+    return isServiceStarted && ozoneManager.isLeaderReady();
+  }
+
+  private class RangerBGSyncTask implements BackgroundTask {
+
+    @Override
+    public int getPriority() {
+      return 0;
+    }
+
+    @Override
+    public BackgroundTaskResult call() {
+      // Check OM leader and readiness
+      if (shouldRun()) {
+        runCount.incrementAndGet();
+        triggerRangerSyncOnce();
+      }
+
+      return EmptyTaskResult.newResult();
+    }
+  }
+
+  private void triggerRangerSyncOnce() {
+    int attempt = 0;
+    try {
+      // TODO: Acquire lock
+      long currentOzoneServiceVerInDB = getOMDBRangerServiceVersion();
+      long proposedOzoneServiceVerInDB = retrieveRangerServiceVersion();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Database version: {}, Ranger version: {}",
+            currentOzoneServiceVerInDB, proposedOzoneServiceVerInDB);
+      }
+      while (currentOzoneServiceVerInDB != proposedOzoneServiceVerInDB) {
+        // TODO: Release lock
+        if (++attempt > MAX_ATTEMPT) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Reached maximum number of attempts ({}). Abort",

Review Comment:
   Hitting the attempt limit should be rare, so I think `warn` would be a better level than `debug` here.
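
   A minimal sketch of the idea, using `java.util.logging` as a stand-in so the example is self-contained. In the PR itself this would presumably be a plain `LOG.warn(...)` call on the existing SLF4J logger, without the `isDebugEnabled()` guard. `MaxAttemptLogSketch` and `attemptsExhausted` are hypothetical names.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical stand-in: logs at WARNING when the attempt limit is exceeded.
class MaxAttemptLogSketch {
  static final int MAX_ATTEMPT = 2;
  static final Logger LOG = Logger.getLogger("MaxAttemptLogSketch");

  /** Returns true when the caller should stop retrying. */
  static boolean attemptsExhausted(int attempt) {
    if (attempt > MAX_ATTEMPT) {
      // WARNING instead of a debug-level message: this is expected to be
      // rare and worth surfacing in default logs.
      LOG.log(Level.WARNING,
          "Reached maximum number of attempts ({0}). Abort", MAX_ATTEMPT);
      return true;
    }
    return false;
  }

  public static void main(String[] args) {
    System.out.println(attemptsExhausted(MAX_ATTEMPT + 1));
  }
}
```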



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/multitenant/OMRangerBGSyncService.java:
##########
@@ -0,0 +1,761 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.om.multitenant;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.google.common.base.Preconditions;
+import com.google.protobuf.ServiceException;
+import org.apache.hadoop.hdds.utils.BackgroundService;
+import org.apache.hadoop.hdds.utils.BackgroundTask;
+import org.apache.hadoop.hdds.utils.BackgroundTaskQueue;
+import org.apache.hadoop.hdds.utils.BackgroundTaskResult;
+import org.apache.hadoop.hdds.utils.BackgroundTaskResult.EmptyTaskResult;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.TableIterator;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OMMultiTenantManager;
+import org.apache.hadoop.ozone.om.OMMultiTenantManagerImpl;
+import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes;
+import org.apache.hadoop.ozone.om.helpers.OMRatisHelper;
+import org.apache.hadoop.ozone.om.helpers.OmDBAccessIdInfo;
+import org.apache.hadoop.ozone.om.helpers.OmDBTenantState;
+import org.apache.hadoop.ozone.om.multitenant.CachedTenantState.CachedAccessIdInfo;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.RangerServiceVersionSyncRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
+import org.apache.ratis.protocol.ClientId;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftClientRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.gson.JsonArray;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+
+import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.VOLUME_LOCK;
+
+/**
+ * Background Sync thread that reads Multi-Tenancy state from OM DB
+ * and applies it to Ranger.
+ */
+public class OMRangerBGSyncService extends BackgroundService {
+
+  public static final Logger LOG =
+      LoggerFactory.getLogger(OMRangerBGSyncService.class);
+  private static final ClientId CLIENT_ID = ClientId.randomId();
+  private static final long ONE_HOUR_IN_MILLIS = 3600 * 1000;
+
+  private final OzoneManager ozoneManager;
+  private final OMMetadataManager metadataManager;
+  private final OMMultiTenantManager multiTenantManager;
+  private final MultiTenantAccessAuthorizer authorizer;
+
+  // Maximum number of attempts for each sync run
+  private static final int MAX_ATTEMPT = 2;
+  private final AtomicLong runCount = new AtomicLong(0);
+  private int rangerOzoneServiceId;
+
+  private boolean isServiceStarted = false;
+
+  static class BGRole {
+    private final String name;
+    private String id;
+    private final HashSet<String> userSet;
+
+    BGRole(String n) {
+      this.name = n;
+      userSet = new HashSet<>();
+    }
+
+    public void setId(String id) {
+      this.id = id;
+    }
+
+    public String getId() {
+      return id;
+    }
+
+    public void addUserPrincipal(String userPrincipal) {
+      userSet.add(userPrincipal);
+    }
+
+    public HashSet<String> getUserSet() {
+      return userSet;
+    }
+
+    @Override
+    public int hashCode() {
+      return name.hashCode();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      // TODO: Do we care about userSet
+      return this.hashCode() == o.hashCode();
+    }
+  }
+
+  // This map will be used to keep all the policies that are found in
+  // OM DB and should have been in Ranger. Currently, we are only printing such
+  // policyID. This can result if a tenant is deleted but the system
+  // crashed. Its an easy recovery to retry the "tenant delete" operation.
+  //
+  // Maps from policy name to policy ID in Ranger
+  private final HashMap<String, String> mtRangerPoliciesToBeCreated =
+      new HashMap<>();
+
+  // We will track all the policies in Ranger here. After we have
+  // processed all the policies from OM DB, this map will
+  // be left with policies that we need to delete.
+  //
+  // Maps from policy name to policy ID in Ranger
+  private final HashMap<String, String> mtRangerPoliciesToBeDeleted =
+      new HashMap<>();
+
+  // This map will keep all the Multi-Tenancy related roles from Ranger.
+  private final HashMap<String, BGRole> mtRangerRoles = new HashMap<>();
+
+  // Keep OM DB mapping of Roles -> list of user principals.
+  private final HashMap<String, HashSet<String>> mtOMDBRoles = new HashMap<>();
+
+  // Every BG ranger sync cycle we update this
+  private long lastRangerPolicyLoadTime;
+
+  public OMRangerBGSyncService(OzoneManager ozoneManager,
+      MultiTenantAccessAuthorizer authorizer, long interval,
+      TimeUnit unit, long serviceTimeout)
+      throws IOException {
+    super("OMRangerBGSyncService", interval, unit, 1, serviceTimeout);
+
+    this.ozoneManager = ozoneManager;
+    this.metadataManager = ozoneManager.getMetadataManager();
+    this.multiTenantManager = ozoneManager.getMultiTenantManager();
+
+    this.authorizer = authorizer;
+
+    if (authorizer != null) {
+      if (authorizer instanceof MultiTenantAccessAuthorizerRangerPlugin) {
+        MultiTenantAccessAuthorizerRangerPlugin rangerAuthorizer =
+            (MultiTenantAccessAuthorizerRangerPlugin) authorizer;
+        rangerOzoneServiceId = rangerAuthorizer.getRangerOzoneServiceId();
+      } else if (
+          !(authorizer instanceof MultiTenantAccessAuthorizerDummyPlugin)) {
+        throw new OMException("Unsupported MultiTenantAccessAuthorizer: " +
+            authorizer.getClass().getSimpleName(),
+            ResultCodes.INTERNAL_ERROR);
+      }
+    } else {
+      // authorizer can be null for unit tests
+      LOG.warn("MultiTenantAccessAuthorizer is not set");
+    }
+  }
+
+  @Override
+  public BackgroundTaskQueue getTasks() {
+    BackgroundTaskQueue queue = new BackgroundTaskQueue();
+    queue.add(new RangerBGSyncTask());
+    return queue;
+  }
+
+  public void start() {
+    if (authorizer == null) {
+      LOG.error("Failed to start the background sync service: "
+          + "null authorizer. Please check OM configuration. Aborting");
+      return;
+    }
+    isServiceStarted = true;
+    super.start();
+  }
+
+  public void shutdown() {
+    isServiceStarted = false;
+    super.shutdown();
+  }
+
+  /**
+   * Returns true if the service run conditions are satisfied, false otherwise.
+   */
+  private boolean shouldRun() {
+    if (ozoneManager == null) {
+      // OzoneManager can be null for testing
+      return true;
+    }
+    // The service only runs if current OM node is leader and is ready
+    //  and the service is marked as started
+    return isServiceStarted && ozoneManager.isLeaderReady();
+  }
+
+  private class RangerBGSyncTask implements BackgroundTask {
+
+    @Override
+    public int getPriority() {
+      return 0;
+    }
+
+    @Override
+    public BackgroundTaskResult call() {
+      // Check OM leader and readiness
+      if (shouldRun()) {
+        runCount.incrementAndGet();
+        triggerRangerSyncOnce();
+      }
+
+      return EmptyTaskResult.newResult();
+    }
+  }
+
+  private void triggerRangerSyncOnce() {
+    int attempt = 0;
+    try {
+      // TODO: Acquire lock
+      long currentOzoneServiceVerInDB = getOMDBRangerServiceVersion();
+      long proposedOzoneServiceVerInDB = retrieveRangerServiceVersion();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Database version: {}, Ranger version: {}",
+            currentOzoneServiceVerInDB, proposedOzoneServiceVerInDB);
+      }
+      while (currentOzoneServiceVerInDB != proposedOzoneServiceVerInDB) {
+        // TODO: Release lock
+        if (++attempt > MAX_ATTEMPT) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Reached maximum number of attempts ({}). Abort",
+                MAX_ATTEMPT);
+          }
+          break;
+        }
+
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Ranger Ozone service version ({}) differs from DB's ({}). "
+                  + "Starting to sync.",
+              proposedOzoneServiceVerInDB, currentOzoneServiceVerInDB);
+        }
+
+        LOG.info("Executing Multi-Tenancy Ranger Sync: run #{}, attempt #{}",
+            runCount.get(), attempt);
+
+        executeOMDBToRangerSync(currentOzoneServiceVerInDB);
+
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Setting OM DB Ranger Service Version to {} (was {})",
+              proposedOzoneServiceVerInDB, currentOzoneServiceVerInDB);
+        }
+        // Submit Ratis Request to sync the new service version in OM DB
+        setOMDBRangerServiceVersion(proposedOzoneServiceVerInDB);
+
+        // TODO: Acquire lock
+
+        // Check Ranger ozone service version again
+        currentOzoneServiceVerInDB = proposedOzoneServiceVerInDB;
+        proposedOzoneServiceVerInDB = retrieveRangerServiceVersion();
+      }
+    } catch (IOException e) {
+      LOG.warn("Exception during Ranger Sync", e);
+      // TODO: Check specific exception once switched to
+      //  RangerRestMultiTenantAccessController
+//    } finally {
+//      // TODO: Release lock
+    }
+
+  }
+
+  long retrieveRangerServiceVersion() throws IOException {
+    return authorizer.getCurrentOzoneServiceVersion(rangerOzoneServiceId);
+  }
+
+  private RaftClientRequest newRaftClientRequest(OMRequest omRequest) {
+    return RaftClientRequest.newBuilder()
+        .setClientId(CLIENT_ID)
+        .setServerId(ozoneManager.getOmRatisServer().getRaftPeerId())
+        .setGroupId(ozoneManager.getOmRatisServer().getRaftGroupId())
+        .setCallId(runCount.get())
+        .setMessage(
+            Message.valueOf(
+                OMRatisHelper.convertRequestToByteString(omRequest)))
+        .setType(RaftClientRequest.writeRequestType())
+        .build();
+  }
+
+  void setOMDBRangerServiceVersion(long version) {
+    // OM DB update goes through Ratis
+    RangerServiceVersionSyncRequest.Builder versionSyncRequest =
+        RangerServiceVersionSyncRequest.newBuilder()
+            .setRangerServiceVersion(version);
+
+    OMRequest omRequest = OMRequest.newBuilder()
+        .setCmdType(Type.RangerServiceVersionSync)
+        .setRangerServiceVersionSyncRequest(versionSyncRequest)
+        .setClientId(CLIENT_ID.toString())
+        .build();
+
+    // Submit PurgeKeys request to OM
+    try {
+      RaftClientRequest raftClientRequest = newRaftClientRequest(omRequest);

Review Comment:
   This exception should be propagated to the caller. On the next loop iteration we treat the proposed version as the current OM DB version, which assumes the write to OM DB already succeeded. The request can also fail if we are no longer the leader, in which case we want to break out of the main loop.
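
   A minimal sketch of what propagating the failure could look like. `VersionReader` and `VersionWriter` are hypothetical stand-ins for the Ranger lookup and the Ratis write; they are not names from the PR, and the actual sync work is elided. The point is only that the DB version is advanced after a successful write and the loop is left as soon as the write throws:

   ```java
   import java.io.IOException;

   /** Illustrative only; class and interface names are assumptions. */
   class VersionSyncSketch {

     /** Hypothetical stand-in for the Ranger service version lookup. */
     interface VersionReader {
       long read() throws IOException;
     }

     /** Hypothetical stand-in for the Ratis write of the version to OM DB. */
     interface VersionWriter {
       void write(long version) throws IOException;
     }

     static final int MAX_ATTEMPT = 2;

     /** Returns the version last known to be persisted in the DB. */
     static long syncOnce(long dbVersion, VersionReader ranger,
         VersionWriter db) {
       long current = dbVersion;
       int attempt = 0;
       try {
         long proposed = ranger.read();
         while (current != proposed && ++attempt <= MAX_ATTEMPT) {
           // The OM DB to Ranger sync itself would run here.
           db.write(proposed);  // throws instead of logging and returning
           current = proposed;  // reached only if the write succeeded
           proposed = ranger.read();
         }
       } catch (IOException e) {
         // Write or read failed (for example, leadership was lost):
         // leave the loop and keep the last persisted version.
       }
       return current;
     }
   }
   ```

   With this shape, a failed write leaves `current` untouched, so the next run starts from the version that is actually in the DB.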



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/multitenant/OMRangerBGSyncService.java:
##########
@@ -0,0 +1,761 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.om.multitenant;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.google.common.base.Preconditions;
+import com.google.protobuf.ServiceException;
+import org.apache.hadoop.hdds.utils.BackgroundService;
+import org.apache.hadoop.hdds.utils.BackgroundTask;
+import org.apache.hadoop.hdds.utils.BackgroundTaskQueue;
+import org.apache.hadoop.hdds.utils.BackgroundTaskResult;
+import org.apache.hadoop.hdds.utils.BackgroundTaskResult.EmptyTaskResult;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.TableIterator;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OMMultiTenantManager;
+import org.apache.hadoop.ozone.om.OMMultiTenantManagerImpl;
+import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes;
+import org.apache.hadoop.ozone.om.helpers.OMRatisHelper;
+import org.apache.hadoop.ozone.om.helpers.OmDBAccessIdInfo;
+import org.apache.hadoop.ozone.om.helpers.OmDBTenantState;
+import org.apache.hadoop.ozone.om.multitenant.CachedTenantState.CachedAccessIdInfo;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.RangerServiceVersionSyncRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
+import org.apache.ratis.protocol.ClientId;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftClientRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.gson.JsonArray;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+
+import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.VOLUME_LOCK;
+
+/**
+ * Background Sync thread that reads Multi-Tenancy state from OM DB
+ * and applies it to Ranger.
+ */
+public class OMRangerBGSyncService extends BackgroundService {
+
+  public static final Logger LOG =
+      LoggerFactory.getLogger(OMRangerBGSyncService.class);
+  private static final ClientId CLIENT_ID = ClientId.randomId();
+  private static final long ONE_HOUR_IN_MILLIS = 3600 * 1000;
+
+  private final OzoneManager ozoneManager;
+  private final OMMetadataManager metadataManager;
+  private final OMMultiTenantManager multiTenantManager;
+  private final MultiTenantAccessAuthorizer authorizer;
+
+  // Maximum number of attempts for each sync run
+  private static final int MAX_ATTEMPT = 2;
+  private final AtomicLong runCount = new AtomicLong(0);
+  private int rangerOzoneServiceId;
+
+  private boolean isServiceStarted = false;
+
+  static class BGRole {
+    private final String name;
+    private String id;
+    private final HashSet<String> userSet;
+
+    BGRole(String n) {
+      this.name = n;
+      userSet = new HashSet<>();
+    }
+
+    public void setId(String id) {
+      this.id = id;
+    }
+
+    public String getId() {
+      return id;
+    }
+
+    public void addUserPrincipal(String userPrincipal) {
+      userSet.add(userPrincipal);
+    }
+
+    public HashSet<String> getUserSet() {
+      return userSet;
+    }
+
+    @Override
+    public int hashCode() {
+      return name.hashCode();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      // TODO: Do we care about userSet
+      return this.hashCode() == o.hashCode();
+    }
+  }
+
+  // This map will be used to keep all the policies that are found in
+  // OM DB and should have been in Ranger. Currently, we are only printing such
+  // policyID. This can result if a tenant is deleted but the system
+  // crashed. It's an easy recovery to retry the "tenant delete" operation.
+  //
+  // Maps from policy name to policy ID in Ranger
+  private final HashMap<String, String> mtRangerPoliciesToBeCreated =
+      new HashMap<>();
+
+  // We will track all the policies in Ranger here. After we have
+  // processed all the policies from OM DB, this map will
+  // be left with policies that we need to delete.
+  //
+  // Maps from policy name to policy ID in Ranger
+  private final HashMap<String, String> mtRangerPoliciesToBeDeleted =
+      new HashMap<>();
+
+  // This map will keep all the Multi-Tenancy related roles from Ranger.
+  private final HashMap<String, BGRole> mtRangerRoles = new HashMap<>();
+
+  // Keep OM DB mapping of Roles -> list of user principals.
+  private final HashMap<String, HashSet<String>> mtOMDBRoles = new HashMap<>();
+
+  // Every BG ranger sync cycle we update this
+  private long lastRangerPolicyLoadTime;
+
+  public OMRangerBGSyncService(OzoneManager ozoneManager,
+      MultiTenantAccessAuthorizer authorizer, long interval,
+      TimeUnit unit, long serviceTimeout)
+      throws IOException {
+    super("OMRangerBGSyncService", interval, unit, 1, serviceTimeout);
+
+    this.ozoneManager = ozoneManager;
+    this.metadataManager = ozoneManager.getMetadataManager();
+    this.multiTenantManager = ozoneManager.getMultiTenantManager();
+
+    this.authorizer = authorizer;
+
+    if (authorizer != null) {
+      if (authorizer instanceof MultiTenantAccessAuthorizerRangerPlugin) {
+        MultiTenantAccessAuthorizerRangerPlugin rangerAuthorizer =
+            (MultiTenantAccessAuthorizerRangerPlugin) authorizer;
+        rangerOzoneServiceId = rangerAuthorizer.getRangerOzoneServiceId();
+      } else if (
+          !(authorizer instanceof MultiTenantAccessAuthorizerDummyPlugin)) {
+        throw new OMException("Unsupported MultiTenantAccessAuthorizer: " +
+            authorizer.getClass().getSimpleName(),
+            ResultCodes.INTERNAL_ERROR);
+      }
+    } else {
+      // authorizer can be null for unit tests
+      LOG.warn("MultiTenantAccessAuthorizer is not set");
+    }
+  }
+
+  @Override
+  public BackgroundTaskQueue getTasks() {
+    BackgroundTaskQueue queue = new BackgroundTaskQueue();
+    queue.add(new RangerBGSyncTask());
+    return queue;
+  }
+
+  public void start() {
+    if (authorizer == null) {
+      LOG.error("Failed to start the background sync service: "
+          + "null authorizer. Please check OM configuration. Aborting");
+      return;
+    }
+    isServiceStarted = true;
+    super.start();
+  }
+
+  public void shutdown() {
+    isServiceStarted = false;
+    super.shutdown();
+  }
+
+  /**
+   * Returns true if the service run conditions are satisfied, false otherwise.
+   */
+  private boolean shouldRun() {
+    if (ozoneManager == null) {
+      // OzoneManager can be null for testing
+      return true;
+    }
+    // The service only runs if current OM node is leader and is ready
+    //  and the service is marked as started
+    return isServiceStarted && ozoneManager.isLeaderReady();
+  }
+
+  private class RangerBGSyncTask implements BackgroundTask {
+
+    @Override
+    public int getPriority() {
+      return 0;
+    }
+
+    @Override
+    public BackgroundTaskResult call() {
+      // Check OM leader and readiness
+      if (shouldRun()) {
+        runCount.incrementAndGet();
+        triggerRangerSyncOnce();
+      }
+
+      return EmptyTaskResult.newResult();
+    }
+  }
+
+  private void triggerRangerSyncOnce() {
+    int attempt = 0;
+    try {
+      // TODO: Acquire lock
+      long currentOzoneServiceVerInDB = getOMDBRangerServiceVersion();
+      long proposedOzoneServiceVerInDB = retrieveRangerServiceVersion();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Database version: {}, Ranger version: {}",
+            currentOzoneServiceVerInDB, proposedOzoneServiceVerInDB);
+      }
+      while (currentOzoneServiceVerInDB != proposedOzoneServiceVerInDB) {
+        // TODO: Release lock
+        if (++attempt > MAX_ATTEMPT) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Reached maximum number of attempts ({}). Abort",
+                MAX_ATTEMPT);
+          }
+          break;
+        }
+
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Ranger Ozone service version ({}) differs from DB's ({}). "
+                  + "Starting to sync.",
+              proposedOzoneServiceVerInDB, currentOzoneServiceVerInDB);
+        }
+
+        LOG.info("Executing Multi-Tenancy Ranger Sync: run #{}, attempt #{}",
+            runCount.get(), attempt);
+
+        executeOMDBToRangerSync(currentOzoneServiceVerInDB);
+
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Setting OM DB Ranger Service Version to {} (was {})",
+              proposedOzoneServiceVerInDB, currentOzoneServiceVerInDB);
+        }
+        // Submit Ratis Request to sync the new service version in OM DB
+        setOMDBRangerServiceVersion(proposedOzoneServiceVerInDB);
+
+        // TODO: Acquire lock
+
+        // Check Ranger ozone service version again
+        currentOzoneServiceVerInDB = proposedOzoneServiceVerInDB;
+        proposedOzoneServiceVerInDB = retrieveRangerServiceVersion();
+      }
+    } catch (IOException e) {
+      LOG.warn("Exception during Ranger Sync", e);
+      // TODO: Check specific exception once switched to
+      //  RangerRestMultiTenantAccessController
+//    } finally {
+//      // TODO: Release lock
+    }
+
+  }
+
+  long retrieveRangerServiceVersion() throws IOException {
+    return authorizer.getCurrentOzoneServiceVersion(rangerOzoneServiceId);
+  }
+
+  private RaftClientRequest newRaftClientRequest(OMRequest omRequest) {
+    return RaftClientRequest.newBuilder()
+        .setClientId(CLIENT_ID)
+        .setServerId(ozoneManager.getOmRatisServer().getRaftPeerId())
+        .setGroupId(ozoneManager.getOmRatisServer().getRaftGroupId())
+        .setCallId(runCount.get())
+        .setMessage(
+            Message.valueOf(
+                OMRatisHelper.convertRequestToByteString(omRequest)))
+        .setType(RaftClientRequest.writeRequestType())
+        .build();
+  }
+
+  void setOMDBRangerServiceVersion(long version) {
+    // OM DB update goes through Ratis
+    RangerServiceVersionSyncRequest.Builder versionSyncRequest =
+        RangerServiceVersionSyncRequest.newBuilder()
+            .setRangerServiceVersion(version);
+
+    OMRequest omRequest = OMRequest.newBuilder()
+        .setCmdType(Type.RangerServiceVersionSync)
+        .setRangerServiceVersionSyncRequest(versionSyncRequest)
+        .setClientId(CLIENT_ID.toString())
+        .build();
+
+    // Submit RangerServiceVersionSync request to OM
+    try {
+      RaftClientRequest raftClientRequest = newRaftClientRequest(omRequest);
+      ozoneManager.getOmRatisServer().submitRequest(omRequest,
+          raftClientRequest);
+    } catch (ServiceException e) {
+      LOG.error("RangerServiceVersionSync request failed. "
+          + "Will retry at next run.");
+    }
+  }
+
+  long getOMDBRangerServiceVersion() throws IOException {
+    final Long dbValue = ozoneManager.getMetadataManager()
+        .getOmRangerStateTable()
+        .get(OmMetadataManagerImpl.RANGER_OZONE_SERVICE_VERSION_KEY);
+    if (dbValue == null) {
+      return -1;
+    } else {
+      return dbValue;
+    }
+  }
+
+  private void executeOMDBToRangerSync(long baseVersion) throws IOException {
+    clearPolicyAndRoleMaps();
+
+    // TODO: Acquire global lock
+    loadAllPoliciesRolesFromRanger(baseVersion);
+    loadAllRolesFromRanger();
+    loadAllRolesFromOM();
+    // TODO: Release global lock
+
+    // This should isolate policies into two groups
+    // 1. mtRangerPoliciesTobeDeleted and
+    // 2. mtRangerPoliciesTobeCreated
+    processAllPoliciesFromOMDB();
+
+    // This should isolate roles that need fixing into a list of
+    // roles that need to be replayed back into ranger to get in sync with OMDB.
+    processAllRolesFromOMDB();
+  }
+
+  private void clearPolicyAndRoleMaps() {
+    mtRangerPoliciesToBeCreated.clear();
+    mtRangerPoliciesToBeDeleted.clear();
+    mtRangerRoles.clear();
+    mtOMDBRoles.clear();
+  }
+
+  /**
+   * TODO: Test and make sure invalid JSON response from Ranger won't crash OM.
+   */
+  private void loadAllPoliciesRolesFromRanger(long baseVersion)
+      throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("baseVersion is {}", baseVersion);
+    }
+    // TODO: incremental policies API is broken. We are getting all the
+    //  Multi-Tenant policies using Ranger labels.
+    String allPolicies = authorizer.getAllMultiTenantPolicies(
+        rangerOzoneServiceId);
+    JsonObject jObject = new JsonParser().parse(allPolicies).getAsJsonObject();
+    lastRangerPolicyLoadTime = jObject.get("queryTimeMS").getAsLong();
+    JsonArray policyArray = jObject.getAsJsonArray("policies");
+    for (int i = 0; i < policyArray.size(); ++i) {
+      JsonObject newPolicy = policyArray.get(i).getAsJsonObject();
+      if (!newPolicy.getAsJsonArray("policyLabels").get(0)
+          .getAsString().equals("OzoneMultiTenant")) {
+        // Shouldn't get policies without the tag here very often as it is
+        // specified in the query param, unless a user removed the tag during
+        // the sync
+        LOG.warn("Received a Ranger policy without OzoneMultiTenant tag: {}",
+            newPolicy.get("name").getAsString());
+        continue;
+      }
+      // Temporarily put the policy in the to-delete list,
+      // valid entries will be removed later
+      mtRangerPoliciesToBeDeleted.put(
+          newPolicy.get("name").getAsString(),
+          newPolicy.get("id").getAsString());
+
+      final JsonArray policyItems = newPolicy.getAsJsonArray("policyItems");
+      for (int j = 0; j < policyItems.size(); ++j) {
+        JsonObject policy = policyItems.get(j).getAsJsonObject();
+        JsonArray roles = policy.getAsJsonArray("roles");
+        for (int k = 0; k < roles.size(); ++k) {
+          if (!mtRangerRoles.containsKey(roles.get(k).getAsString())) {
+            // We only get the role name here. We need to query and populate it.
+            mtRangerRoles.put(roles.get(k).getAsString(),
+                new BGRole(roles.get(k).getAsString()));
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Helper function to throw benign exception if the current OM is no longer
+   * the leader in case a leader transition happened during the sync. So the
+   * sync run can abort earlier.
+   *
+   * Note: EACH Ranger request can take 3-7 seconds as tested in UT.
+   */
+  private void checkLeader() throws IOException {
+    if (!ozoneManager.isLeaderReady()) {
+      throw new OMException("Not leader. Abort", ResultCodes.INTERNAL_ERROR);
+    }
+  }
+
+  private void loadAllRolesFromRanger() throws IOException {
+    for (Map.Entry<String, BGRole> entry: mtRangerRoles.entrySet()) {
+      final String roleName = entry.getKey();
+      checkLeader();
+      final String roleDataString = authorizer.getRole(roleName);
+      final JsonObject roleObject =
+          new JsonParser().parse(roleDataString).getAsJsonObject();
+      final BGRole role = entry.getValue();
+      role.setId(roleObject.get("id").getAsString());
+      final JsonArray userArray = roleObject.getAsJsonArray("users");
+      for (int i = 0; i < userArray.size(); ++i) {
+        role.addUserPrincipal(userArray.get(i).getAsJsonObject().get("name")
+            .getAsString());
+      }
+    }
+  }
+
+  /**
+   * Helper function to add/remove a policy name to/from mtRangerPolicies lists.
+   */
+  private void mtRangerPoliciesOpHelper(String policyName) {
+    if (mtRangerPoliciesToBeDeleted.containsKey(policyName)) {
+      // This entry is in sync with ranger, remove it from the set
+      // Eventually mtRangerPolicies will only contain entries that
+      // are not in OMDB and should be removed from Ranger.
+      mtRangerPoliciesToBeDeleted.remove(policyName);
+    } else {
+      // We could not find a policy in ranger that should have been there.
+      mtRangerPoliciesToBeCreated.put(policyName, null);
+    }
+  }
+
+  private void processAllPoliciesFromOMDB() throws IOException {
+
+    // Iterate all DB tenant states. For each tenant,
+    // queue or dequeue bucketNamespacePolicyName and bucketPolicyName
+    TableIterator<String, ? extends Table.KeyValue<String, OmDBTenantState>>
+        tenantStateTableIter = metadataManager.getTenantStateTable().iterator();
+    while (tenantStateTableIter.hasNext()) {
+      final Table.KeyValue<String, OmDBTenantState> tableKeyValue =
+          tenantStateTableIter.next();
+      final OmDBTenantState dbTenantState = tableKeyValue.getValue();
+      final String volumeName = dbTenantState.getBucketNamespaceName();
+      Preconditions.checkNotNull(volumeName);
+
+      boolean acquiredVolumeLock = false;
+      try {
+        acquiredVolumeLock = metadataManager.getLock().acquireWriteLock(
+            VOLUME_LOCK, volumeName);
+
+        mtRangerPoliciesOpHelper(dbTenantState.getBucketNamespacePolicyName());
+        mtRangerPoliciesOpHelper(dbTenantState.getBucketPolicyName());
+      } finally {
+        if (acquiredVolumeLock) {
+          metadataManager.getLock().releaseWriteLock(VOLUME_LOCK, volumeName);
+        }
+      }
+    }
+
+    for (Map.Entry<String, String> entry :
+        mtRangerPoliciesToBeCreated.entrySet()) {
+      final String policyName = entry.getKey();
+      LOG.warn("Expected policy not found in Ranger: {}", policyName);
+      // Attempt to recreate the default volume/bucket policy if it's missing
+      attemptToRecreateDefaultPolicy(policyName);
+    }
+
+    for (Map.Entry<String, String> entry :
+        mtRangerPoliciesToBeDeleted.entrySet()) {
+      final String policyName = entry.getKey();
+      checkLeader();
+      // TODO: Use AccessController instead of AccessPolicy
+//      MultiTenantAccessController accessController =
+//          authorizer.getMultiTenantAccessController();
+      AccessPolicy accessPolicy = authorizer.getAccessPolicyByName(policyName);
+
+      if (lastRangerPolicyLoadTime >
+          (accessPolicy.getPolicyLastUpdateTime() + ONE_HOUR_IN_MILLIS)) {

Review Comment:
   Is this check here to prevent deleting a policy from Ranger that is in the cache but has not yet been flushed to the DB via the double buffer? We should add a code comment explaining why it is necessary.
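
   As a rough sketch of the kind of comment meant here, the condition could be pulled into a small documented helper. The helper name is hypothetical, and the rationale in the Javadoc is only my guess from the question above; it needs to be confirmed or corrected by the author:

   ```java
   /** Illustrative only; class and method names are assumptions. */
   class PolicyGracePeriodSketch {

     static final long ONE_HOUR_IN_MILLIS = 3600L * 1000L;

     /**
      * Returns true only if the policy was last updated more than one hour
      * before the policy list was loaded from Ranger.
      *
      * Assumed rationale (to be confirmed): a recently updated policy may
      * belong to a tenant whose state is in the OM cache but not yet
      * flushed to the DB, so it must not be deleted from Ranger yet.
      */
     static boolean isPastGracePeriod(long policyLoadTimeMs,
         long policyLastUpdateTimeMs) {
       return policyLoadTimeMs > policyLastUpdateTimeMs + ONE_HOUR_IN_MILLIS;
     }
   }
   ```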



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/multitenant/OMRangerBGSyncService.java:
##########
@@ -0,0 +1,761 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.om.multitenant;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.google.common.base.Preconditions;
+import com.google.protobuf.ServiceException;
+import org.apache.hadoop.hdds.utils.BackgroundService;
+import org.apache.hadoop.hdds.utils.BackgroundTask;
+import org.apache.hadoop.hdds.utils.BackgroundTaskQueue;
+import org.apache.hadoop.hdds.utils.BackgroundTaskResult;
+import org.apache.hadoop.hdds.utils.BackgroundTaskResult.EmptyTaskResult;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.TableIterator;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OMMultiTenantManager;
+import org.apache.hadoop.ozone.om.OMMultiTenantManagerImpl;
+import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes;
+import org.apache.hadoop.ozone.om.helpers.OMRatisHelper;
+import org.apache.hadoop.ozone.om.helpers.OmDBAccessIdInfo;
+import org.apache.hadoop.ozone.om.helpers.OmDBTenantState;
+import org.apache.hadoop.ozone.om.multitenant.CachedTenantState.CachedAccessIdInfo;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.RangerServiceVersionSyncRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
+import org.apache.ratis.protocol.ClientId;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftClientRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.gson.JsonArray;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+
+import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.VOLUME_LOCK;
+
+/**
+ * Background Sync thread that reads Multi-Tenancy state from OM DB
+ * and applies it to Ranger.
+ */
+public class OMRangerBGSyncService extends BackgroundService {
+
+  public static final Logger LOG =
+      LoggerFactory.getLogger(OMRangerBGSyncService.class);
+  private static final ClientId CLIENT_ID = ClientId.randomId();
+  private static final long ONE_HOUR_IN_MILLIS = 3600 * 1000;
+
+  private final OzoneManager ozoneManager;
+  private final OMMetadataManager metadataManager;
+  private final OMMultiTenantManager multiTenantManager;
+  private final MultiTenantAccessAuthorizer authorizer;
+
+  // Maximum number of attempts for each sync run
+  private static final int MAX_ATTEMPT = 2;
+  private final AtomicLong runCount = new AtomicLong(0);
+  private int rangerOzoneServiceId;
+
+  private boolean isServiceStarted = false;
+
+  static class BGRole {
+    private final String name;
+    private String id;
+    private final HashSet<String> userSet;
+
+    BGRole(String n) {
+      this.name = n;
+      userSet = new HashSet<>();
+    }
+
+    public void setId(String id) {
+      this.id = id;
+    }
+
+    public String getId() {
+      return id;
+    }
+
+    public void addUserPrincipal(String userPrincipal) {
+      userSet.add(userPrincipal);
+    }
+
+    public HashSet<String> getUserSet() {
+      return userSet;
+    }
+
+    @Override
+    public int hashCode() {
+      return name.hashCode();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      // TODO: Do we care about userSet
+      return this.hashCode() == o.hashCode();
+    }
+  }
+
+  // This map will be used to keep all the policies that are found in
+  // OM DB and should have been in Ranger. Currently, we are only printing such
+  // policyID. This can result if a tenant is deleted but the system
+  // crashed. It's an easy recovery to retry the "tenant delete" operation.
+  //
+  // Maps from policy name to policy ID in Ranger
+  private final HashMap<String, String> mtRangerPoliciesToBeCreated =
+      new HashMap<>();
+
+  // We will track all the policies in Ranger here. After we have
+  // processed all the policies from OM DB, this map will
+  // be left with policies that we need to delete.
+  //
+  // Maps from policy name to policy ID in Ranger
+  private final HashMap<String, String> mtRangerPoliciesToBeDeleted =
+      new HashMap<>();
+
+  // This map will keep all the Multi-Tenancy related roles from Ranger.
+  private final HashMap<String, BGRole> mtRangerRoles = new HashMap<>();
+
+  // Keep OM DB mapping of Roles -> list of user principals.
+  private final HashMap<String, HashSet<String>> mtOMDBRoles = new HashMap<>();
+
+  // Every BG ranger sync cycle we update this
+  private long lastRangerPolicyLoadTime;
+
+  public OMRangerBGSyncService(OzoneManager ozoneManager,
+      MultiTenantAccessAuthorizer authorizer, long interval,
+      TimeUnit unit, long serviceTimeout)
+      throws IOException {
+    super("OMRangerBGSyncService", interval, unit, 1, serviceTimeout);
+
+    this.ozoneManager = ozoneManager;
+    this.metadataManager = ozoneManager.getMetadataManager();
+    this.multiTenantManager = ozoneManager.getMultiTenantManager();
+
+    this.authorizer = authorizer;
+
+    if (authorizer != null) {
+      if (authorizer instanceof MultiTenantAccessAuthorizerRangerPlugin) {
+        MultiTenantAccessAuthorizerRangerPlugin rangerAuthorizer =
+            (MultiTenantAccessAuthorizerRangerPlugin) authorizer;
+        rangerOzoneServiceId = rangerAuthorizer.getRangerOzoneServiceId();
+      } else if (
+          !(authorizer instanceof MultiTenantAccessAuthorizerDummyPlugin)) {
+        throw new OMException("Unsupported MultiTenantAccessAuthorizer: " +
+            authorizer.getClass().getSimpleName(),
+            ResultCodes.INTERNAL_ERROR);
+      }
+    } else {
+      // authorizer can be null for unit tests
+      LOG.warn("MultiTenantAccessAuthorizer is not set");
+    }
+  }
+
+  @Override
+  public BackgroundTaskQueue getTasks() {
+    BackgroundTaskQueue queue = new BackgroundTaskQueue();
+    queue.add(new RangerBGSyncTask());
+    return queue;
+  }
+
+  public void start() {
+    if (authorizer == null) {
+      LOG.error("Failed to start the background sync service: "
+          + "null authorizer. Please check OM configuration. Aborting");
+      return;
+    }
+    isServiceStarted = true;
+    super.start();
+  }
+
+  public void shutdown() {
+    isServiceStarted = false;
+    super.shutdown();
+  }
+
+  /**
+   * Returns true if the service run conditions are satisfied, false otherwise.
+   */
+  private boolean shouldRun() {
+    if (ozoneManager == null) {
+      // OzoneManager can be null for testing
+      return true;
+    }
+    // The service only runs if current OM node is leader and is ready
+    //  and the service is marked as started
+    return isServiceStarted && ozoneManager.isLeaderReady();
+  }
+
+  private class RangerBGSyncTask implements BackgroundTask {
+
+    @Override
+    public int getPriority() {
+      return 0;
+    }
+
+    @Override
+    public BackgroundTaskResult call() {
+      // Check OM leader and readiness
+      if (shouldRun()) {
+        runCount.incrementAndGet();
+        triggerRangerSyncOnce();
+      }
+
+      return EmptyTaskResult.newResult();
+    }
+  }
+
+  private void triggerRangerSyncOnce() {
+    int attempt = 0;
+    try {
+      // TODO: Acquire lock
+      long currentOzoneServiceVerInDB = getOMDBRangerServiceVersion();
+      long proposedOzoneServiceVerInDB = retrieveRangerServiceVersion();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Database version: {}, Ranger version: {}",
+            currentOzoneServiceVerInDB, proposedOzoneServiceVerInDB);
+      }
+      while (currentOzoneServiceVerInDB != proposedOzoneServiceVerInDB) {
+        // TODO: Release lock
+        if (++attempt > MAX_ATTEMPT) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Reached maximum number of attempts ({}). Abort",
+                MAX_ATTEMPT);
+          }
+          break;
+        }
+
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Ranger Ozone service version ({}) differs from DB's ({}). "
+                  + "Starting to sync.",
+              proposedOzoneServiceVerInDB, currentOzoneServiceVerInDB);
+        }
+
+        LOG.info("Executing Multi-Tenancy Ranger Sync: run #{}, attempt #{}",
+            runCount.get(), attempt);
+
+        executeOMDBToRangerSync(currentOzoneServiceVerInDB);
+
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Setting OM DB Ranger Service Version to {} (was {})",
+              proposedOzoneServiceVerInDB, currentOzoneServiceVerInDB);
+        }
+        // Submit Ratis Request to sync the new service version in OM DB
+        setOMDBRangerServiceVersion(proposedOzoneServiceVerInDB);
+
+        // TODO: Acquire lock
+
+        // Check Ranger ozone service version again
+        currentOzoneServiceVerInDB = proposedOzoneServiceVerInDB;
+        proposedOzoneServiceVerInDB = retrieveRangerServiceVersion();
+      }
+    } catch (IOException e) {
+      LOG.warn("Exception during Ranger Sync", e);
+      // TODO: Check specific exception once switched to
+      //  RangerRestMultiTenantAccessController
+//    } finally {
+//      // TODO: Release lock
+    }
+
+  }
+
+  long retrieveRangerServiceVersion() throws IOException {
+    return authorizer.getCurrentOzoneServiceVersion(rangerOzoneServiceId);
+  }
+
+  private RaftClientRequest newRaftClientRequest(OMRequest omRequest) {
+    return RaftClientRequest.newBuilder()
+        .setClientId(CLIENT_ID)
+        .setServerId(ozoneManager.getOmRatisServer().getRaftPeerId())
+        .setGroupId(ozoneManager.getOmRatisServer().getRaftGroupId())
+        .setCallId(runCount.get())
+        .setMessage(
+            Message.valueOf(
+                OMRatisHelper.convertRequestToByteString(omRequest)))
+        .setType(RaftClientRequest.writeRequestType())
+        .build();
+  }
+
+  void setOMDBRangerServiceVersion(long version) {
+    // OM DB update goes through Ratis
+    RangerServiceVersionSyncRequest.Builder versionSyncRequest =
+        RangerServiceVersionSyncRequest.newBuilder()
+            .setRangerServiceVersion(version);
+
+    OMRequest omRequest = OMRequest.newBuilder()
+        .setCmdType(Type.RangerServiceVersionSync)
+        .setRangerServiceVersionSyncRequest(versionSyncRequest)
+        .setClientId(CLIENT_ID.toString())
+        .build();
+
+    // Submit RangerServiceVersionSync request to OM
+    try {
+      RaftClientRequest raftClientRequest = newRaftClientRequest(omRequest);
+      ozoneManager.getOmRatisServer().submitRequest(omRequest,
+          raftClientRequest);
+    } catch (ServiceException e) {
+      LOG.error("RangerServiceVersionSync request failed. "
+          + "Will retry at next run.");
+    }
+  }
+
+  long getOMDBRangerServiceVersion() throws IOException {
+    final Long dbValue = ozoneManager.getMetadataManager()
+        .getOmRangerStateTable()
+        .get(OmMetadataManagerImpl.RANGER_OZONE_SERVICE_VERSION_KEY);
+    if (dbValue == null) {
+      return -1;
+    } else {
+      return dbValue;
+    }
+  }
+
+  private void executeOMDBToRangerSync(long baseVersion) throws IOException {
+    clearPolicyAndRoleMaps();
+
+    // TODO: Acquire global lock
+    loadAllPoliciesRolesFromRanger(baseVersion);
+    loadAllRolesFromRanger();
+    loadAllRolesFromOM();
+    // TODO: Release global lock
+
+    // This should isolate policies into two groups
+    // 1. mtRangerPoliciesToBeDeleted and
+    // 2. mtRangerPoliciesToBeCreated
+    processAllPoliciesFromOMDB();
+
+    // This should isolate roles that need fixing into a list of
+    // roles that need to be replayed back into ranger to get in sync with OMDB.
+    processAllRolesFromOMDB();
+  }
+
+  private void clearPolicyAndRoleMaps() {
+    mtRangerPoliciesToBeCreated.clear();
+    mtRangerPoliciesToBeDeleted.clear();
+    mtRangerRoles.clear();
+    mtOMDBRoles.clear();
+  }
+
+  /**
+   * TODO: Test and make sure invalid JSON response from Ranger won't crash OM.
+   */
+  private void loadAllPoliciesRolesFromRanger(long baseVersion)
+      throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("baseVersion is {}", baseVersion);
+    }
+    // TODO: incremental policies API is broken. We are getting all the
+    //  Multi-Tenant policies using Ranger labels.
+    String allPolicies = authorizer.getAllMultiTenantPolicies(
+        rangerOzoneServiceId);
+    JsonObject jObject = new JsonParser().parse(allPolicies).getAsJsonObject();
+    lastRangerPolicyLoadTime = jObject.get("queryTimeMS").getAsLong();
+    JsonArray policyArray = jObject.getAsJsonArray("policies");
+    for (int i = 0; i < policyArray.size(); ++i) {
+      JsonObject newPolicy = policyArray.get(i).getAsJsonObject();
+      if (!newPolicy.getAsJsonArray("policyLabels").get(0)
+          .getAsString().equals("OzoneMultiTenant")) {
+        // Shouldn't get policies without the tag here very often as it is
+        // specified in the query param, unless a user removed the tag during
+        // the sync
+        LOG.warn("Received a Ranger policy without OzoneMultiTenant tag: {}",
+            newPolicy.get("name").getAsString());
+        continue;
+      }
+      // Temporarily put the policy in the to-delete list,
+      // valid entries will be removed later
+      mtRangerPoliciesToBeDeleted.put(
+          newPolicy.get("name").getAsString(),
+          newPolicy.get("id").getAsString());
+
+      final JsonArray policyItems = newPolicy.getAsJsonArray("policyItems");
+      for (int j = 0; j < policyItems.size(); ++j) {
+        JsonObject policy = policyItems.get(j).getAsJsonObject();
+        JsonArray roles = policy.getAsJsonArray("roles");
+        for (int k = 0; k < roles.size(); ++k) {
+          if (!mtRangerRoles.containsKey(roles.get(k).getAsString())) {
+            // We only get the role name here. We need to query and populate it.
+            mtRangerRoles.put(roles.get(k).getAsString(),
+                new BGRole(roles.get(k).getAsString()));
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Helper function to throw benign exception if the current OM is no longer
+   * the leader in case a leader transition happened during the sync. So the
+   * sync run can abort earlier.
+   *
+   * Note: EACH Ranger request can take 3-7 seconds as tested in UT.
+   */
+  private void checkLeader() throws IOException {
+    if (!ozoneManager.isLeaderReady()) {
+      throw new OMException("Not leader. Abort", ResultCodes.INTERNAL_ERROR);
+    }
+  }
+
+  private void loadAllRolesFromRanger() throws IOException {
+    for (Map.Entry<String, BGRole> entry: mtRangerRoles.entrySet()) {
+      final String roleName = entry.getKey();
+      checkLeader();
+      final String roleDataString = authorizer.getRole(roleName);
+      final JsonObject roleObject =
+          new JsonParser().parse(roleDataString).getAsJsonObject();
+      final BGRole role = entry.getValue();
+      role.setId(roleObject.get("id").getAsString());
+      final JsonArray userArray = roleObject.getAsJsonArray("users");
+      for (int i = 0; i < userArray.size(); ++i) {
+        role.addUserPrincipal(userArray.get(i).getAsJsonObject().get("name")
+            .getAsString());
+      }
+    }
+  }
+
+  /**
+   * Helper function to add/remove a policy name to/from mtRangerPolicies lists.
+   */
+  private void mtRangerPoliciesOpHelper(String policyName) {
+    if (mtRangerPoliciesToBeDeleted.containsKey(policyName)) {
+      // This entry is in sync with ranger, remove it from the set
+      // Eventually mtRangerPolicies will only contain entries that
+      // are not in OMDB and should be removed from Ranger.
+      mtRangerPoliciesToBeDeleted.remove(policyName);
+    } else {
+      // We could not find a policy in ranger that should have been there.
+      mtRangerPoliciesToBeCreated.put(policyName, null);
+    }
+  }
+
+  private void processAllPoliciesFromOMDB() throws IOException {
+
+    // Iterate all DB tenant states. For each tenant,
+    // queue or dequeue bucketNamespacePolicyName and bucketPolicyName
+    TableIterator<String, ? extends Table.KeyValue<String, OmDBTenantState>>
+        tenantStateTableIter = metadataManager.getTenantStateTable().iterator();
+    while (tenantStateTableIter.hasNext()) {
+      final Table.KeyValue<String, OmDBTenantState> tableKeyValue =
+          tenantStateTableIter.next();
+      final OmDBTenantState dbTenantState = tableKeyValue.getValue();
+      final String volumeName = dbTenantState.getBucketNamespaceName();
+      Preconditions.checkNotNull(volumeName);
+
+      boolean acquiredVolumeLock = false;
+      try {
+        acquiredVolumeLock = metadataManager.getLock().acquireWriteLock(
+            VOLUME_LOCK, volumeName);
+
+        mtRangerPoliciesOpHelper(dbTenantState.getBucketNamespacePolicyName());
+        mtRangerPoliciesOpHelper(dbTenantState.getBucketPolicyName());
+      } finally {
+        if (acquiredVolumeLock) {
+          metadataManager.getLock().releaseWriteLock(VOLUME_LOCK, volumeName);
+        }
+      }
+    }
+
+    for (Map.Entry<String, String> entry :
+        mtRangerPoliciesToBeCreated.entrySet()) {
+      final String policyName = entry.getKey();
+      LOG.warn("Expected policy not found in Ranger: {}", policyName);
+      // Attempt to recreate the default volume/bucket policy if it's missing
+      attemptToRecreateDefaultPolicy(policyName);
+    }
+
+    for (Map.Entry<String, String> entry :
+        mtRangerPoliciesToBeDeleted.entrySet()) {
+      final String policyName = entry.getKey();
+      checkLeader();
+      // TODO: Use AccessController instead of AccessPolicy
+//      MultiTenantAccessController accessController =
+//          authorizer.getMultiTenantAccessController();
+      AccessPolicy accessPolicy = authorizer.getAccessPolicyByName(policyName);
+
+      if (lastRangerPolicyLoadTime >
+          (accessPolicy.getPolicyLastUpdateTime() + ONE_HOUR_IN_MILLIS)) {
+        LOG.warn("Deleting policy from Ranger: {}", policyName);
+        authorizer.deletePolicybyName(policyName);
+      }
+    }
+
+  }
+
+  private String getTenantIdFromPolicyName(
+      String policyName, String policySuffix) {
+    return policyName.substring(0, policyName.length() - policySuffix.length());
+  }
+
+  /**
+   * Takes a policy name (e.g. tenant1-VolumeAccess), attempts to (re)create
+   * the policy in Ranger by the policy name alone.
+   */
+  private void attemptToRecreateDefaultPolicy(String policyName)
+      throws IOException {
+    String policySuffix, tenantId;
+
+    policySuffix = OzoneConsts.DEFAULT_TENANT_BUCKET_NAMESPACE_POLICY_SUFFIX;
+    if (policyName.endsWith(policySuffix)) {
+      // Recreate default VolumeAccess policy
+      tenantId = getTenantIdFromPolicyName(policyName, policySuffix);

Review Comment:
   In the loop where we detected that this policy was missing, we knew which tenant it was needed for. We can save that and use it here as a more reliable way to get the tenant name.
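
A minimal sketch of that idea, assuming the to-be-created map is changed to hold the tenant ID as its value. It is a standalone illustration rather than code from the PR: `MissingPolicyTracker`, `addRangerPolicy` and `reconcile` are hypothetical names, and the real change would live in `mtRangerPoliciesOpHelper` and `processAllPoliciesFromOMDB`.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Standalone sketch, not code from the PR. All names here are hypothetical.
 */
public class MissingPolicyTracker {

  // Policy name -> Ranger policy ID, filled while loading policies from Ranger.
  private final Map<String, String> toBeDeleted = new HashMap<>();

  // Policy name -> tenant ID, recorded at the moment the policy is found
  // missing, so the tenant never has to be parsed back out of the name.
  private final Map<String, String> toBeCreated = new HashMap<>();

  public void addRangerPolicy(String policyName, String policyId) {
    toBeDeleted.put(policyName, policyId);
  }

  /** Called for each policy name while iterating tenant states in OM DB. */
  public void reconcile(String tenantId, String policyName) {
    if (toBeDeleted.containsKey(policyName)) {
      // Present on both sides: in sync, so it must not be deleted.
      toBeDeleted.remove(policyName);
    } else {
      // Expected by OM DB but absent from Ranger: remember the owner.
      toBeCreated.put(policyName, tenantId);
    }
  }

  public Map<String, String> getToBeCreated() {
    return toBeCreated;
  }

  public Map<String, String> getToBeDeleted() {
    return toBeDeleted;
  }
}
```

With something like this, the recreate step could iterate the to-be-created entries and pass each entry's value as the tenant ID, instead of stripping a suffix from the policy name.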



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/multitenant/OMRangerBGSyncService.java:
##########
@@ -0,0 +1,761 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.om.multitenant;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.google.common.base.Preconditions;
+import com.google.protobuf.ServiceException;
+import org.apache.hadoop.hdds.utils.BackgroundService;
+import org.apache.hadoop.hdds.utils.BackgroundTask;
+import org.apache.hadoop.hdds.utils.BackgroundTaskQueue;
+import org.apache.hadoop.hdds.utils.BackgroundTaskResult;
+import org.apache.hadoop.hdds.utils.BackgroundTaskResult.EmptyTaskResult;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.TableIterator;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OMMultiTenantManager;
+import org.apache.hadoop.ozone.om.OMMultiTenantManagerImpl;
+import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes;
+import org.apache.hadoop.ozone.om.helpers.OMRatisHelper;
+import org.apache.hadoop.ozone.om.helpers.OmDBAccessIdInfo;
+import org.apache.hadoop.ozone.om.helpers.OmDBTenantState;
+import org.apache.hadoop.ozone.om.multitenant.CachedTenantState.CachedAccessIdInfo;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.RangerServiceVersionSyncRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
+import org.apache.ratis.protocol.ClientId;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftClientRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.gson.JsonArray;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+
+import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.VOLUME_LOCK;
+
+/**
+ * Background Sync thread that reads Multi-Tenancy state from OM DB
+ * and applies it to Ranger.
+ */
+public class OMRangerBGSyncService extends BackgroundService {
+
+  public static final Logger LOG =
+      LoggerFactory.getLogger(OMRangerBGSyncService.class);
+  private static final ClientId CLIENT_ID = ClientId.randomId();
+  private static final long ONE_HOUR_IN_MILLIS = 3600 * 1000;
+
+  private final OzoneManager ozoneManager;
+  private final OMMetadataManager metadataManager;
+  private final OMMultiTenantManager multiTenantManager;
+  private final MultiTenantAccessAuthorizer authorizer;
+
+  // Maximum number of attempts for each sync run
+  private static final int MAX_ATTEMPT = 2;
+  private final AtomicLong runCount = new AtomicLong(0);
+  private int rangerOzoneServiceId;
+
+  private boolean isServiceStarted = false;
+
+  static class BGRole {
+    private final String name;
+    private String id;
+    private final HashSet<String> userSet;
+
+    BGRole(String n) {
+      this.name = n;
+      userSet = new HashSet<>();
+    }
+
+    public void setId(String id) {
+      this.id = id;
+    }
+
+    public String getId() {
+      return id;
+    }
+
+    public void addUserPrincipal(String userPrincipal) {
+      userSet.add(userPrincipal);
+    }
+
+    public HashSet<String> getUserSet() {
+      return userSet;
+    }
+
+    @Override
+    public int hashCode() {
+      return name.hashCode();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      // TODO: Do we care about userSet
+      return this.hashCode() == o.hashCode();
+    }
+  }
+
+  // This map will be used to keep all the policies that are found in
+  // OM DB and should have been in Ranger. Currently, we are only printing such
+  // policyID. This can result if a tenant is deleted but the system
+  // crashed. It's an easy recovery to retry the "tenant delete" operation.
+  //
+  // Maps from policy name to policy ID in Ranger
+  private final HashMap<String, String> mtRangerPoliciesToBeCreated =
+      new HashMap<>();
+
+  // We will track all the policies in Ranger here. After we have
+  // processed all the policies from OM DB, this map will
+  // be left with policies that we need to delete.
+  //
+  // Maps from policy name to policy ID in Ranger
+  private final HashMap<String, String> mtRangerPoliciesToBeDeleted =
+      new HashMap<>();
+
+  // This map will keep all the Multi-Tenancy related roles from Ranger.
+  private final HashMap<String, BGRole> mtRangerRoles = new HashMap<>();
+
+  // Keep OM DB mapping of Roles -> list of user principals.
+  private final HashMap<String, HashSet<String>> mtOMDBRoles = new HashMap<>();
+
+  // Every BG ranger sync cycle we update this
+  private long lastRangerPolicyLoadTime;
+
+  public OMRangerBGSyncService(OzoneManager ozoneManager,
+      MultiTenantAccessAuthorizer authorizer, long interval,
+      TimeUnit unit, long serviceTimeout)
+      throws IOException {
+    super("OMRangerBGSyncService", interval, unit, 1, serviceTimeout);
+
+    this.ozoneManager = ozoneManager;
+    this.metadataManager = ozoneManager.getMetadataManager();
+    this.multiTenantManager = ozoneManager.getMultiTenantManager();
+
+    this.authorizer = authorizer;
+
+    if (authorizer != null) {
+      if (authorizer instanceof MultiTenantAccessAuthorizerRangerPlugin) {
+        MultiTenantAccessAuthorizerRangerPlugin rangerAuthorizer =
+            (MultiTenantAccessAuthorizerRangerPlugin) authorizer;
+        rangerOzoneServiceId = rangerAuthorizer.getRangerOzoneServiceId();
+      } else if (
+          !(authorizer instanceof MultiTenantAccessAuthorizerDummyPlugin)) {
+        throw new OMException("Unsupported MultiTenantAccessAuthorizer: " +
+            authorizer.getClass().getSimpleName(),
+            ResultCodes.INTERNAL_ERROR);
+      }
+    } else {
+      // authorizer can be null for unit tests
+      LOG.warn("MultiTenantAccessAuthorizer is not set");
+    }
+  }
+
+  @Override
+  public BackgroundTaskQueue getTasks() {
+    BackgroundTaskQueue queue = new BackgroundTaskQueue();
+    queue.add(new RangerBGSyncTask());
+    return queue;
+  }
+
+  public void start() {
+    if (authorizer == null) {
+      LOG.error("Failed to start the background sync service: "
+          + "null authorizer. Please check OM configuration. Aborting");
+      return;
+    }
+    isServiceStarted = true;
+    super.start();
+  }
+
+  public void shutdown() {
+    isServiceStarted = false;
+    super.shutdown();
+  }
+
+  /**
+   * Returns true if the service run conditions are satisfied, false otherwise.
+   */
+  private boolean shouldRun() {
+    if (ozoneManager == null) {
+      // OzoneManager can be null for testing
+      return true;
+    }
+    // The service only runs if current OM node is leader and is ready
+    //  and the service is marked as started
+    return isServiceStarted && ozoneManager.isLeaderReady();
+  }
+
+  private class RangerBGSyncTask implements BackgroundTask {
+
+    @Override
+    public int getPriority() {
+      return 0;
+    }
+
+    @Override
+    public BackgroundTaskResult call() {
+      // Check OM leader and readiness
+      if (shouldRun()) {
+        runCount.incrementAndGet();
+        triggerRangerSyncOnce();
+      }
+
+      return EmptyTaskResult.newResult();
+    }
+  }
+
+  private void triggerRangerSyncOnce() {
+    int attempt = 0;
+    try {
+      // TODO: Acquire lock
+      long currentOzoneServiceVerInDB = getOMDBRangerServiceVersion();
+      long proposedOzoneServiceVerInDB = retrieveRangerServiceVersion();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Database version: {}, Ranger version: {}",
+            currentOzoneServiceVerInDB, proposedOzoneServiceVerInDB);
+      }
+      while (currentOzoneServiceVerInDB != proposedOzoneServiceVerInDB) {
+        // TODO: Release lock
+        if (++attempt > MAX_ATTEMPT) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Reached maximum number of attempts ({}). Abort",
+                MAX_ATTEMPT);
+          }
+          break;
+        }
+
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Ranger Ozone service version ({}) differs from DB's ({}). "
+                  + "Starting to sync.",
+              proposedOzoneServiceVerInDB, currentOzoneServiceVerInDB);
+        }
+
+        LOG.info("Executing Multi-Tenancy Ranger Sync: run #{}, attempt #{}",
+            runCount.get(), attempt);
+
+        executeOMDBToRangerSync(currentOzoneServiceVerInDB);
+
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Setting OM DB Ranger Service Version to {} (was {})",
+              proposedOzoneServiceVerInDB, currentOzoneServiceVerInDB);
+        }
+        // Submit Ratis Request to sync the new service version in OM DB
+        setOMDBRangerServiceVersion(proposedOzoneServiceVerInDB);
+
+        // TODO: Acquire lock
+
+        // Check Ranger ozone service version again
+        currentOzoneServiceVerInDB = proposedOzoneServiceVerInDB;
+        proposedOzoneServiceVerInDB = retrieveRangerServiceVersion();
+      }
+    } catch (IOException e) {
+      LOG.warn("Exception during Ranger Sync", e);
+      // TODO: Check specific exception once switched to
+      //  RangerRestMultiTenantAccessController
+//    } finally {
+//      // TODO: Release lock
+    }
+
+  }
+
+  long retrieveRangerServiceVersion() throws IOException {
+    return authorizer.getCurrentOzoneServiceVersion(rangerOzoneServiceId);
+  }
+
+  private RaftClientRequest newRaftClientRequest(OMRequest omRequest) {
+    return RaftClientRequest.newBuilder()
+        .setClientId(CLIENT_ID)
+        .setServerId(ozoneManager.getOmRatisServer().getRaftPeerId())
+        .setGroupId(ozoneManager.getOmRatisServer().getRaftGroupId())
+        .setCallId(runCount.get())
+        .setMessage(
+            Message.valueOf(
+                OMRatisHelper.convertRequestToByteString(omRequest)))
+        .setType(RaftClientRequest.writeRequestType())
+        .build();
+  }
+
+  void setOMDBRangerServiceVersion(long version) {
+    // OM DB update goes through Ratis
+    RangerServiceVersionSyncRequest.Builder versionSyncRequest =
+        RangerServiceVersionSyncRequest.newBuilder()
+            .setRangerServiceVersion(version);
+
+    OMRequest omRequest = OMRequest.newBuilder()
+        .setCmdType(Type.RangerServiceVersionSync)
+        .setRangerServiceVersionSyncRequest(versionSyncRequest)
+        .setClientId(CLIENT_ID.toString())
+        .build();
+
+    // Submit RangerServiceVersionSync request to OM
+    try {
+      RaftClientRequest raftClientRequest = newRaftClientRequest(omRequest);
+      ozoneManager.getOmRatisServer().submitRequest(omRequest,
+          raftClientRequest);
+    } catch (ServiceException e) {
+      LOG.error("RangerServiceVersionSync request failed. "
+          + "Will retry at next run.");
+    }
+  }
+
+  long getOMDBRangerServiceVersion() throws IOException {
+    final Long dbValue = ozoneManager.getMetadataManager()
+        .getOmRangerStateTable()
+        .get(OmMetadataManagerImpl.RANGER_OZONE_SERVICE_VERSION_KEY);
+    if (dbValue == null) {
+      return -1;
+    } else {
+      return dbValue;
+    }
+  }
+
+  private void executeOMDBToRangerSync(long baseVersion) throws IOException {
+    clearPolicyAndRoleMaps();
+
+    // TODO: Acquire global lock
+    loadAllPoliciesRolesFromRanger(baseVersion);
+    loadAllRolesFromRanger();
+    loadAllRolesFromOM();
+    // TODO: Release global lock
+
+    // This should isolate policies into two groups
+    // 1. mtRangerPoliciesToBeDeleted and
+    // 2. mtRangerPoliciesToBeCreated
+    processAllPoliciesFromOMDB();
+
+    // This should isolate roles that need fixing into a list of
+    // roles that need to be replayed back into ranger to get in sync with OMDB.
+    processAllRolesFromOMDB();
+  }
+
+  private void clearPolicyAndRoleMaps() {
+    mtRangerPoliciesToBeCreated.clear();
+    mtRangerPoliciesToBeDeleted.clear();
+    mtRangerRoles.clear();
+    mtOMDBRoles.clear();
+  }
+
+  /**
+   * TODO: Test and make sure invalid JSON response from Ranger won't crash OM.
+   */
+  private void loadAllPoliciesRolesFromRanger(long baseVersion)
+      throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("baseVersion is {}", baseVersion);
+    }
+    // TODO: incremental policies API is broken. We are getting all the
+    //  Multi-Tenant policies using Ranger labels.
+    String allPolicies = authorizer.getAllMultiTenantPolicies(
+        rangerOzoneServiceId);
+    JsonObject jObject = new JsonParser().parse(allPolicies).getAsJsonObject();
+    lastRangerPolicyLoadTime = jObject.get("queryTimeMS").getAsLong();
+    JsonArray policyArray = jObject.getAsJsonArray("policies");
+    for (int i = 0; i < policyArray.size(); ++i) {
+      JsonObject newPolicy = policyArray.get(i).getAsJsonObject();
+      if (!newPolicy.getAsJsonArray("policyLabels").get(0)
+          .getAsString().equals("OzoneMultiTenant")) {
+        // Shouldn't get policies without the tag here very often as it is
+        // specified in the query param, unless a user removed the tag during
+        // the sync
+        LOG.warn("Received a Ranger policy without OzoneMultiTenant tag: {}",
+            newPolicy.get("name").getAsString());
+        continue;
+      }
+      // Temporarily put the policy in the to-delete list,
+      // valid entries will be removed later
+      mtRangerPoliciesToBeDeleted.put(
+          newPolicy.get("name").getAsString(),
+          newPolicy.get("id").getAsString());
+
+      final JsonArray policyItems = newPolicy.getAsJsonArray("policyItems");
+      for (int j = 0; j < policyItems.size(); ++j) {
+        JsonObject policy = policyItems.get(j).getAsJsonObject();
+        JsonArray roles = policy.getAsJsonArray("roles");
+        for (int k = 0; k < roles.size(); ++k) {
+          if (!mtRangerRoles.containsKey(roles.get(k).getAsString())) {
+            // We only get the role name here. We need to query and populate it.
+            mtRangerRoles.put(roles.get(k).getAsString(),
+                new BGRole(roles.get(k).getAsString()));
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Helper function to throw benign exception if the current OM is no longer
+   * the leader in case a leader transition happened during the sync. So the
+   * sync run can abort earlier.
+   *
+   * Note: EACH Ranger request can take 3-7 seconds as tested in UT.
+   */
+  private void checkLeader() throws IOException {
+    if (!ozoneManager.isLeaderReady()) {
+      throw new OMException("Not leader. Abort", ResultCodes.INTERNAL_ERROR);

Review Comment:
   Leader changes aren't errors. We should probably throw NotLeaderException here instead, which will exit the main loop.
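
A rough sketch of the shape this could take, written as a standalone class so it compiles on its own. `NotLeaderSketchException`, `Step` and `runSyncOnce` are hypothetical stand-ins: the real code would throw whichever not-leader exception type the project settles on and catch it in `triggerRangerSyncOnce`.

```java
import java.io.IOException;
import java.util.function.BooleanSupplier;

/**
 * Standalone sketch, not code from the PR. All names here are hypothetical.
 */
public class LeaderCheckSketch {

  /** Hypothetical stand-in for the project's not-leader exception type. */
  public static class NotLeaderSketchException extends IOException {
    public NotLeaderSketchException(String message) {
      super(message);
    }
  }

  /** One unit of sync work, e.g. a single Ranger request. */
  public interface Step {
    void run() throws IOException;
  }

  private final BooleanSupplier leaderReady;

  public LeaderCheckSketch(BooleanSupplier leaderReady) {
    this.leaderReady = leaderReady;
  }

  private void checkLeader() throws NotLeaderSketchException {
    if (!leaderReady.getAsBoolean()) {
      throw new NotLeaderSketchException("No longer leader, aborting sync run");
    }
  }

  /** Runs the steps and reports how the run ended. */
  public String runSyncOnce(int steps, Step step) {
    try {
      for (int i = 0; i < steps; i++) {
        checkLeader();
        step.run();
      }
      return "completed";
    } catch (NotLeaderSketchException e) {
      // Expected during a leader transition: stop quietly, no error log.
      return "aborted: not leader";
    } catch (IOException e) {
      // Anything else is a genuine failure and can be logged as such.
      return "failed: " + e.getMessage();
    }
  }
}
```

Catching the dedicated type ahead of the generic `IOException` lets a leader transition end the run without being reported as an internal error.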



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/multitenant/OMRangerBGSyncService.java:
##########
@@ -0,0 +1,761 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.om.multitenant;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.google.common.base.Preconditions;
+import com.google.protobuf.ServiceException;
+import org.apache.hadoop.hdds.utils.BackgroundService;
+import org.apache.hadoop.hdds.utils.BackgroundTask;
+import org.apache.hadoop.hdds.utils.BackgroundTaskQueue;
+import org.apache.hadoop.hdds.utils.BackgroundTaskResult;
+import org.apache.hadoop.hdds.utils.BackgroundTaskResult.EmptyTaskResult;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.TableIterator;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OMMultiTenantManager;
+import org.apache.hadoop.ozone.om.OMMultiTenantManagerImpl;
+import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes;
+import org.apache.hadoop.ozone.om.helpers.OMRatisHelper;
+import org.apache.hadoop.ozone.om.helpers.OmDBAccessIdInfo;
+import org.apache.hadoop.ozone.om.helpers.OmDBTenantState;
+import org.apache.hadoop.ozone.om.multitenant.CachedTenantState.CachedAccessIdInfo;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.RangerServiceVersionSyncRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
+import org.apache.ratis.protocol.ClientId;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftClientRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.gson.JsonArray;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+
+import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.VOLUME_LOCK;
+
+/**
+ * Background Sync thread that reads Multi-Tenancy state from OM DB
+ * and applies it to Ranger.
+ */
+public class OMRangerBGSyncService extends BackgroundService {
+
+  public static final Logger LOG =
+      LoggerFactory.getLogger(OMRangerBGSyncService.class);
+  private static final ClientId CLIENT_ID = ClientId.randomId();
+  private static final long ONE_HOUR_IN_MILLIS = 3600 * 1000;
+
+  private final OzoneManager ozoneManager;
+  private final OMMetadataManager metadataManager;
+  private final OMMultiTenantManager multiTenantManager;
+  private final MultiTenantAccessAuthorizer authorizer;
+
+  // Maximum number of attempts for each sync run
+  private static final int MAX_ATTEMPT = 2;
+  private final AtomicLong runCount = new AtomicLong(0);
+  private int rangerOzoneServiceId;
+
+  private boolean isServiceStarted = false;
+
+  static class BGRole {
+    private final String name;
+    private String id;
+    private final HashSet<String> userSet;
+
+    BGRole(String n) {
+      this.name = n;
+      userSet = new HashSet<>();
+    }
+
+    public void setId(String id) {
+      this.id = id;
+    }
+
+    public String getId() {
+      return id;
+    }
+
+    public void addUserPrincipal(String userPrincipal) {
+      userSet.add(userPrincipal);
+    }
+
+    public HashSet<String> getUserSet() {
+      return userSet;
+    }
+
+    @Override
+    public int hashCode() {
+      return name.hashCode();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      // TODO: Do we care about userSet
+      return this.hashCode() == o.hashCode();
+    }
+  }
+
+  // This map will be used to keep all the policies that are found in
+  // OM DB and should have been in Ranger. Currently, we are only printing such
+  // policyID. This can result if a tenant is deleted but the system
+  // crashed. It's an easy recovery to retry the "tenant delete" operation.
+  //
+  // Maps from policy name to policy ID in Ranger
+  private final HashMap<String, String> mtRangerPoliciesToBeCreated =
+      new HashMap<>();
+
+  // We will track all the policies in Ranger here. After we have
+  // processed all the policies from OM DB, this map will
+  // be left with policies that we need to delete.
+  //
+  // Maps from policy name to policy ID in Ranger
+  private final HashMap<String, String> mtRangerPoliciesToBeDeleted =
+      new HashMap<>();
+
+  // This map will keep all the Multi-Tenancy related roles from Ranger.
+  private final HashMap<String, BGRole> mtRangerRoles = new HashMap<>();
+
+  // Keep OM DB mapping of Roles -> list of user principals.
+  private final HashMap<String, HashSet<String>> mtOMDBRoles = new HashMap<>();
+
+  // Every BG ranger sync cycle we update this
+  private long lastRangerPolicyLoadTime;
+
+  public OMRangerBGSyncService(OzoneManager ozoneManager,
+      MultiTenantAccessAuthorizer authorizer, long interval,
+      TimeUnit unit, long serviceTimeout)
+      throws IOException {
+    super("OMRangerBGSyncService", interval, unit, 1, serviceTimeout);
+
+    this.ozoneManager = ozoneManager;
+    this.metadataManager = ozoneManager.getMetadataManager();
+    this.multiTenantManager = ozoneManager.getMultiTenantManager();
+
+    this.authorizer = authorizer;
+
+    if (authorizer != null) {
+      if (authorizer instanceof MultiTenantAccessAuthorizerRangerPlugin) {
+        MultiTenantAccessAuthorizerRangerPlugin rangerAuthorizer =
+            (MultiTenantAccessAuthorizerRangerPlugin) authorizer;
+        rangerOzoneServiceId = rangerAuthorizer.getRangerOzoneServiceId();
+      } else if (
+          !(authorizer instanceof MultiTenantAccessAuthorizerDummyPlugin)) {
+        throw new OMException("Unsupported MultiTenantAccessAuthorizer: " +
+            authorizer.getClass().getSimpleName(),
+            ResultCodes.INTERNAL_ERROR);
+      }
+    } else {
+      // authorizer can be null for unit tests
+      LOG.warn("MultiTenantAccessAuthorizer is not set");
+    }
+  }
+
+  @Override
+  public BackgroundTaskQueue getTasks() {
+    BackgroundTaskQueue queue = new BackgroundTaskQueue();
+    queue.add(new RangerBGSyncTask());
+    return queue;
+  }
+
+  public void start() {
+    if (authorizer == null) {
+      LOG.error("Failed to start the background sync service: "
+          + "null authorizer. Please check OM configuration. Aborting");
+      return;
+    }
+    isServiceStarted = true;
+    super.start();
+  }
+
+  public void shutdown() {
+    isServiceStarted = false;
+    super.shutdown();
+  }
+
+  /**
+   * Returns true if the service run conditions are satisfied, false otherwise.
+   */
+  private boolean shouldRun() {
+    if (ozoneManager == null) {
+      // OzoneManager can be null for testing
+      return true;
+    }
+    // The service only runs if current OM node is leader and is ready
+    //  and the service is marked as started
+    return isServiceStarted && ozoneManager.isLeaderReady();
+  }
+
+  private class RangerBGSyncTask implements BackgroundTask {
+
+    @Override
+    public int getPriority() {
+      return 0;
+    }
+
+    @Override
+    public BackgroundTaskResult call() {
+      // Check OM leader and readiness
+      if (shouldRun()) {
+        runCount.incrementAndGet();
+        triggerRangerSyncOnce();
+      }
+
+      return EmptyTaskResult.newResult();
+    }
+  }
+
+  private void triggerRangerSyncOnce() {
+    int attempt = 0;
+    try {
+      // TODO: Acquire lock
+      long currentOzoneServiceVerInDB = getOMDBRangerServiceVersion();
+      long proposedOzoneServiceVerInDB = retrieveRangerServiceVersion();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Database version: {}, Ranger version: {}",
+            currentOzoneServiceVerInDB, proposedOzoneServiceVerInDB);
+      }
+      while (currentOzoneServiceVerInDB != proposedOzoneServiceVerInDB) {
+        // TODO: Release lock
+        if (++attempt > MAX_ATTEMPT) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Reached maximum number of attempts ({}). Abort",
+                MAX_ATTEMPT);
+          }
+          break;
+        }
+
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Ranger Ozone service version ({}) differs from DB's ({}). "
+                  + "Starting to sync.",
+              proposedOzoneServiceVerInDB, currentOzoneServiceVerInDB);
+        }
+
+        LOG.info("Executing Multi-Tenancy Ranger Sync: run #{}, attempt #{}",
+            runCount.get(), attempt);
+
+        executeOMDBToRangerSync(currentOzoneServiceVerInDB);
+
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Setting OM DB Ranger Service Version to {} (was {})",
+              proposedOzoneServiceVerInDB, currentOzoneServiceVerInDB);
+        }
+        // Submit Ratis Request to sync the new service version in OM DB
+        setOMDBRangerServiceVersion(proposedOzoneServiceVerInDB);
+
+        // TODO: Acquire lock
+
+        // Check Ranger ozone service version again
+        currentOzoneServiceVerInDB = proposedOzoneServiceVerInDB;
+        proposedOzoneServiceVerInDB = retrieveRangerServiceVersion();
+      }
+    } catch (IOException e) {
+      LOG.warn("Exception during Ranger Sync", e);
+      // TODO: Check specific exception once switched to
+      //  RangerRestMultiTenantAccessController
+//    } finally {
+//      // TODO: Release lock
+    }
+
+  }
+
+  long retrieveRangerServiceVersion() throws IOException {
+    return authorizer.getCurrentOzoneServiceVersion(rangerOzoneServiceId);
+  }
+
+  private RaftClientRequest newRaftClientRequest(OMRequest omRequest) {
+    return RaftClientRequest.newBuilder()
+        .setClientId(CLIENT_ID)
+        .setServerId(ozoneManager.getOmRatisServer().getRaftPeerId())
+        .setGroupId(ozoneManager.getOmRatisServer().getRaftGroupId())
+        .setCallId(runCount.get())
+        .setMessage(
+            Message.valueOf(
+                OMRatisHelper.convertRequestToByteString(omRequest)))
+        .setType(RaftClientRequest.writeRequestType())
+        .build();
+  }
+
+  void setOMDBRangerServiceVersion(long version) {
+    // OM DB update goes through Ratis
+    RangerServiceVersionSyncRequest.Builder versionSyncRequest =
+        RangerServiceVersionSyncRequest.newBuilder()
+            .setRangerServiceVersion(version);
+
+    OMRequest omRequest = OMRequest.newBuilder()
+        .setCmdType(Type.RangerServiceVersionSync)
+        .setRangerServiceVersionSyncRequest(versionSyncRequest)
+        .setClientId(CLIENT_ID.toString())
+        .build();
+
+    // Submit PurgeKeys request to OM
+    try {
+      RaftClientRequest raftClientRequest = newRaftClientRequest(omRequest);
+      ozoneManager.getOmRatisServer().submitRequest(omRequest,
+          raftClientRequest);
+    } catch (ServiceException e) {
+      LOG.error("RangerServiceVersionSync request failed. "
+          + "Will retry at next run.");
+    }
+  }
+
+  long getOMDBRangerServiceVersion() throws IOException {
+    final Long dbValue = ozoneManager.getMetadataManager()
+        .getOmRangerStateTable()
+        .get(OmMetadataManagerImpl.RANGER_OZONE_SERVICE_VERSION_KEY);
+    if (dbValue == null) {
+      return -1;
+    } else {
+      return dbValue;
+    }
+  }
+
+  private void executeOMDBToRangerSync(long baseVersion) throws IOException {
+    clearPolicyAndRoleMaps();
+
+    // TODO: Acquire global lock
+    loadAllPoliciesRolesFromRanger(baseVersion);
+    loadAllRolesFromRanger();
+    loadAllRolesFromOM();
+    // TODO: Release global lock
+
+    // This should isolate policies into two groups
+    // 1. mtRangerPoliciesTobeDeleted and
+    // 2. mtRangerPoliciesTobeCreated
+    processAllPoliciesFromOMDB();
+
+    // This should isolate roles that need fixing into a list of
+    // roles that need to be replayed back into ranger to get in sync with OMDB.
+    processAllRolesFromOMDB();
+  }
+
+  private void clearPolicyAndRoleMaps() {
+    mtRangerPoliciesToBeCreated.clear();
+    mtRangerPoliciesToBeDeleted.clear();
+    mtRangerRoles.clear();
+    mtOMDBRoles.clear();
+  }
+
+  /**
+   * TODO: Test and make sure invalid JSON response from Ranger won't crash OM.
+   */
+  private void loadAllPoliciesRolesFromRanger(long baseVersion)
+      throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("baseVersion is {}", baseVersion);
+    }
+    // TODO: incremental policies API is broken. We are getting all the
+    //  Multi-Tenant policies using Ranger labels.
+    String allPolicies = authorizer.getAllMultiTenantPolicies(
+        rangerOzoneServiceId);
+    JsonObject jObject = new JsonParser().parse(allPolicies).getAsJsonObject();
+    lastRangerPolicyLoadTime = jObject.get("queryTimeMS").getAsLong();
+    JsonArray policyArray = jObject.getAsJsonArray("policies");
+    for (int i = 0; i < policyArray.size(); ++i) {
+      JsonObject newPolicy = policyArray.get(i).getAsJsonObject();
+      if (!newPolicy.getAsJsonArray("policyLabels").get(0)
+          .getAsString().equals("OzoneMultiTenant")) {
+        // Shouldn't get policies without the tag here very often as it is
+        // specified in the query param, unless a user removed the tag during
+        // the sync
+        LOG.warn("Received a Ranger policy without OzoneMultiTenant tag: {}",
+            newPolicy.get("name").getAsString());
+        continue;
+      }
+      // Temporarily put the policy in the to-delete list,
+      // valid entries will be removed later
+      mtRangerPoliciesToBeDeleted.put(
+          newPolicy.get("name").getAsString(),
+          newPolicy.get("id").getAsString());
+
+      final JsonArray policyItems = newPolicy.getAsJsonArray("policyItems");
+      for (int j = 0; j < policyItems.size(); ++j) {
+        JsonObject policy = policyItems.get(j).getAsJsonObject();
+        JsonArray roles = policy.getAsJsonArray("roles");
+        for (int k = 0; k < roles.size(); ++k) {
+          if (!mtRangerRoles.containsKey(roles.get(k).getAsString())) {
+            // We only get the role name here. We need to query and populate it.
+            mtRangerRoles.put(roles.get(k).getAsString(),
+                new BGRole(roles.get(k).getAsString()));
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Helper function to throw benign exception if the current OM is no longer
+   * the leader in case a leader transition happened during the sync. So the
+   * sync run can abort earlier.
+   *
+   * Note: EACH Ranger request can take 3-7 seconds as tested in UT.
+   */
+  private void checkLeader() throws IOException {
+    if (!ozoneManager.isLeaderReady()) {
+      throw new OMException("Not leader. Abort", ResultCodes.INTERNAL_ERROR);
+    }
+  }
+
+  private void loadAllRolesFromRanger() throws IOException {
+    for (Map.Entry<String, BGRole> entry: mtRangerRoles.entrySet()) {
+      final String roleName = entry.getKey();
+      checkLeader();
+      final String roleDataString = authorizer.getRole(roleName);
+      final JsonObject roleObject =
+          new JsonParser().parse(roleDataString).getAsJsonObject();
+      final BGRole role = entry.getValue();
+      role.setId(roleObject.get("id").getAsString());
+      final JsonArray userArray = roleObject.getAsJsonArray("users");
+      for (int i = 0; i < userArray.size(); ++i) {
+        role.addUserPrincipal(userArray.get(i).getAsJsonObject().get("name")
+            .getAsString());
+      }
+    }
+  }
+
+  /**
+   * Helper function to add/remove a policy name to/from mtRangerPolicies lists.
+   */
+  private void mtRangerPoliciesOpHelper(String policyName) {
+    if (mtRangerPoliciesToBeDeleted.containsKey(policyName)) {
+      // This entry is in sync with ranger, remove it from the set
+      // Eventually mtRangerPolicies will only contain entries that
+      // are not in OMDB and should be removed from Ranger.
+      mtRangerPoliciesToBeDeleted.remove(policyName);
+    } else {
+      // We could not find a policy in ranger that should have been there.
+      mtRangerPoliciesToBeCreated.put(policyName, null);
+    }
+  }
+
+  private void processAllPoliciesFromOMDB() throws IOException {
+
+    // Iterate all DB tenant states. For each tenant,
+    // queue or dequeue bucketNamespacePolicyName and bucketPolicyName
+    TableIterator<String, ? extends Table.KeyValue<String, OmDBTenantState>>
+        tenantStateTableIter = metadataManager.getTenantStateTable().iterator();
+    while (tenantStateTableIter.hasNext()) {
+      final Table.KeyValue<String, OmDBTenantState> tableKeyValue =
+          tenantStateTableIter.next();
+      final OmDBTenantState dbTenantState = tableKeyValue.getValue();
+      final String volumeName = dbTenantState.getBucketNamespaceName();
+      Preconditions.checkNotNull(volumeName);
+
+      boolean acquiredVolumeLock = false;
+      try {
+        acquiredVolumeLock = metadataManager.getLock().acquireWriteLock(
+            VOLUME_LOCK, volumeName);
+
+        mtRangerPoliciesOpHelper(dbTenantState.getBucketNamespacePolicyName());
+        mtRangerPoliciesOpHelper(dbTenantState.getBucketPolicyName());
+      } finally {
+        if (acquiredVolumeLock) {
+          metadataManager.getLock().releaseWriteLock(VOLUME_LOCK, volumeName);
+        }
+      }
+    }
+
+    for (Map.Entry<String, String> entry :
+        mtRangerPoliciesToBeCreated.entrySet()) {
+      final String policyName = entry.getKey();
+      LOG.warn("Expected policy not found in Ranger: {}", policyName);
+      // Attempt to recreate the default volume/bucket policy if it's missing
+      attemptToRecreateDefaultPolicy(policyName);
+    }
+
+    for (Map.Entry<String, String> entry :
+        mtRangerPoliciesToBeDeleted.entrySet()) {
+      final String policyName = entry.getKey();
+      checkLeader();
+      // TODO: Use AccessController instead of AccessPolicy
+//      MultiTenantAccessController accessController =
+//          authorizer.getMultiTenantAccessController();
+      AccessPolicy accessPolicy = authorizer.getAccessPolicyByName(policyName);
+
+      if (lastRangerPolicyLoadTime >
+          (accessPolicy.getPolicyLastUpdateTime() + ONE_HOUR_IN_MILLIS)) {
+        LOG.warn("Deleting policy from Ranger: {}", policyName);
+        authorizer.deletePolicybyName(policyName);
+      }
+    }
+
+  }
+
+  private String getTenantIdFromPolicyName(
+      String policyName, String policySuffix) {
+    return policyName.substring(0, policyName.length() - policySuffix.length());
+  }
+
+  /**
+   * Takes a policy name (e.g. tenant1-VolumeAccess), attempts to (re)create
+   * the policy in Ranger by the policy name alone.
+   */
+  private void attemptToRecreateDefaultPolicy(String policyName)
+      throws IOException {
+    String policySuffix, tenantId;
+
+    policySuffix = OzoneConsts.DEFAULT_TENANT_BUCKET_NAMESPACE_POLICY_SUFFIX;
+    if (policyName.endsWith(policySuffix)) {
+      // Recreate default VolumeAccess policy
+      tenantId = getTenantIdFromPolicyName(policyName, policySuffix);
+      LOG.info("Recovering VolumeAccess policy for tenant: {}", tenantId);
+
+      final String volumeName =
+          multiTenantManager.getTenantVolumeName(tenantId);
+      final String userRoleName =
+          multiTenantManager.getTenantUserRoleName(tenantId);
+      final String adminRoleName =
+          multiTenantManager.getTenantAdminRoleName(tenantId);
+
+      final AccessPolicy tenantVolumeAccessPolicy =
+          multiTenantManager.newDefaultVolumeAccessPolicy(volumeName,
+              new OzoneTenantRolePrincipal(userRoleName),
+              new OzoneTenantRolePrincipal(adminRoleName));
+
+      String id = authorizer.createAccessPolicy(tenantVolumeAccessPolicy);
+      LOG.info("Created policy, ID: {}", id);
+
+      return;
+    }
+
+    policySuffix = OzoneConsts.DEFAULT_TENANT_BUCKET_POLICY_SUFFIX;
+    if (policyName.endsWith(policySuffix)) {
+      // Recreate default BucketAccess policy
+      tenantId = getTenantIdFromPolicyName(policyName, policySuffix);
+      LOG.info("Recovering BucketAccess policy for tenant: {}", tenantId);
+
+      final String volumeName =
+          multiTenantManager.getTenantVolumeName(tenantId);
+      final String userRoleName =
+          multiTenantManager.getTenantUserRoleName(tenantId);
+
+      final AccessPolicy tenantBucketCreatePolicy =
+          multiTenantManager.newDefaultBucketAccessPolicy(volumeName,
+              new OzoneTenantRolePrincipal(userRoleName));
+
+      String id = authorizer.createAccessPolicy(tenantBucketCreatePolicy);
+      LOG.info("Created policy, ID: {}", id);
+
+      return;
+    }
+
+    LOG.error("Unable to recover Ranger policy: {}", policyName);
+  }
+
+  /**
+   * Helper function to add user principal to a role in mtOMDBRoles.
+   */
+  private void addUserToMtOMDBRoles(String roleName, String userPrincipal) {
+    if (!mtOMDBRoles.containsKey(roleName)) {
+      mtOMDBRoles.put(roleName, new HashSet<>(
+          Collections.singletonList(userPrincipal)));
+    } else {
+      final HashSet<String> usersInTheRole = mtOMDBRoles.get(roleName);
+      usersInTheRole.add(userPrincipal);
+    }
+  }
+
+  private void loadAllRolesFromOM() throws IOException {
+    if (multiTenantManager instanceof OMMultiTenantManagerImpl) {
+      loadAllRolesFromCache();
+    } else {
+      LOG.warn("Cache is not supported for {}. Loading roles directly from DB",
+          multiTenantManager.getClass().getSimpleName());
+      loadAllRolesFromDB();
+    }
+  }
+
+  private void loadAllRolesFromCache() {
+
+    final OMMultiTenantManagerImpl impl =
+        (OMMultiTenantManagerImpl) multiTenantManager;
+    final Map<String, CachedTenantState> tenantCache = impl.getTenantCache();

Review Comment:
   I think it would be better to have the multi-tenant manager compute and return the role -> user map, instead of exposing its cache and locks and having this method use them to compute the map.
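
   A minimal sketch of what that could look like, assuming a stand-in class rather than the real `OMMultiTenantManagerImpl`. The class name `TenantRoleIndexSketch` and the methods `assignRole` and `getRoleToUsersSnapshot` are hypothetical and do not come from this PR. The point is only that the lock and the cache stay private, and the caller receives a finished role -> users map:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/** Hypothetical stand-in for the multi-tenant manager; not the real Ozone class. */
class TenantRoleIndexSketch {
  // user principal -> role names; stands in for the manager's private cache.
  private final Map<String, Set<String>> rolesByUser = new HashMap<>();
  // The lock never leaves this class.
  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

  void assignRole(String userPrincipal, String roleName) {
    lock.writeLock().lock();
    try {
      rolesByUser.computeIfAbsent(userPrincipal, k -> new HashSet<>())
          .add(roleName);
    } finally {
      lock.writeLock().unlock();
    }
  }

  /**
   * Builds role name -> user principals while holding the read lock and
   * returns a copy wrapped as a read-only outer map.
   */
  Map<String, Set<String>> getRoleToUsersSnapshot() {
    Map<String, Set<String>> result = new HashMap<>();
    lock.readLock().lock();
    try {
      for (Map.Entry<String, Set<String>> e : rolesByUser.entrySet()) {
        for (String role : e.getValue()) {
          result.computeIfAbsent(role, k -> new HashSet<>()).add(e.getKey());
        }
      }
    } finally {
      lock.readLock().unlock();
    }
    return Collections.unmodifiableMap(result);
  }
}
```

   With something along these lines, the sync service would call one method on the manager and would not need the `instanceof OMMultiTenantManagerImpl` branch or direct access to the tenant cache.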



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/multitenant/OMRangerBGSyncService.java:
##########
@@ -0,0 +1,761 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.om.multitenant;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.google.common.base.Preconditions;
+import com.google.protobuf.ServiceException;
+import org.apache.hadoop.hdds.utils.BackgroundService;
+import org.apache.hadoop.hdds.utils.BackgroundTask;
+import org.apache.hadoop.hdds.utils.BackgroundTaskQueue;
+import org.apache.hadoop.hdds.utils.BackgroundTaskResult;
+import org.apache.hadoop.hdds.utils.BackgroundTaskResult.EmptyTaskResult;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.TableIterator;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OMMultiTenantManager;
+import org.apache.hadoop.ozone.om.OMMultiTenantManagerImpl;
+import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes;
+import org.apache.hadoop.ozone.om.helpers.OMRatisHelper;
+import org.apache.hadoop.ozone.om.helpers.OmDBAccessIdInfo;
+import org.apache.hadoop.ozone.om.helpers.OmDBTenantState;
+import org.apache.hadoop.ozone.om.multitenant.CachedTenantState.CachedAccessIdInfo;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.RangerServiceVersionSyncRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
+import org.apache.ratis.protocol.ClientId;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftClientRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.gson.JsonArray;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+
+import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.VOLUME_LOCK;
+
+/**
+ * Background Sync thread that reads Multi-Tenancy state from OM DB
+ * and applies it to Ranger.
+ */
+public class OMRangerBGSyncService extends BackgroundService {
+
+  public static final Logger LOG =
+      LoggerFactory.getLogger(OMRangerBGSyncService.class);
+  private static final ClientId CLIENT_ID = ClientId.randomId();
+  private static final long ONE_HOUR_IN_MILLIS = 3600 * 1000;
+
+  private final OzoneManager ozoneManager;
+  private final OMMetadataManager metadataManager;
+  private final OMMultiTenantManager multiTenantManager;
+  private final MultiTenantAccessAuthorizer authorizer;
+
+  // Maximum number of attempts for each sync run
+  private static final int MAX_ATTEMPT = 2;
+  private final AtomicLong runCount = new AtomicLong(0);
+  private int rangerOzoneServiceId;
+
+  private boolean isServiceStarted = false;
+
+  static class BGRole {
+    private final String name;
+    private String id;
+    private final HashSet<String> userSet;
+
+    BGRole(String n) {
+      this.name = n;
+      userSet = new HashSet<>();
+    }
+
+    public void setId(String id) {
+      this.id = id;
+    }
+
+    public String getId() {
+      return id;
+    }
+
+    public void addUserPrincipal(String userPrincipal) {
+      userSet.add(userPrincipal);
+    }
+
+    public HashSet<String> getUserSet() {
+      return userSet;
+    }
+
+    @Override
+    public int hashCode() {
+      return name.hashCode();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      // TODO: Do we care about userSet
+      return this.hashCode() == o.hashCode();
+    }
+  }
+
+  // This map will be used to keep all the policies that are found in
+  // OM DB and should have been in Ranger. Currently, we are only printing such
+  // policyID. This can result if a tenant is deleted but the system
+  // crashed. Its an easy recovery to retry the "tenant delete" operation.
+  //
+  // Maps from policy name to policy ID in Ranger
+  private final HashMap<String, String> mtRangerPoliciesToBeCreated =
+      new HashMap<>();
+
+  // We will track all the policies in Ranger here. After we have
+  // processed all the policies from OM DB, this map will
+  // be left with policies that we need to delete.
+  //
+  // Maps from policy name to policy ID in Ranger
+  private final HashMap<String, String> mtRangerPoliciesToBeDeleted =
+      new HashMap<>();
+
+  // This map will keep all the Multi-Tenancy related roles from Ranger.
+  private final HashMap<String, BGRole> mtRangerRoles = new HashMap<>();
+
+  // Keep OM DB mapping of Roles -> list of user principals.
+  private final HashMap<String, HashSet<String>> mtOMDBRoles = new HashMap<>();
+
+  // Every BG ranger sync cycle we update this
+  private long lastRangerPolicyLoadTime;
+
+  public OMRangerBGSyncService(OzoneManager ozoneManager,
+      MultiTenantAccessAuthorizer authorizer, long interval,
+      TimeUnit unit, long serviceTimeout)
+      throws IOException {
+    super("OMRangerBGSyncService", interval, unit, 1, serviceTimeout);
+
+    this.ozoneManager = ozoneManager;
+    this.metadataManager = ozoneManager.getMetadataManager();
+    this.multiTenantManager = ozoneManager.getMultiTenantManager();
+
+    this.authorizer = authorizer;
+
+    if (authorizer != null) {
+      if (authorizer instanceof MultiTenantAccessAuthorizerRangerPlugin) {
+        MultiTenantAccessAuthorizerRangerPlugin rangerAuthorizer =
+            (MultiTenantAccessAuthorizerRangerPlugin) authorizer;
+        rangerOzoneServiceId = rangerAuthorizer.getRangerOzoneServiceId();
+      } else if (
+          !(authorizer instanceof MultiTenantAccessAuthorizerDummyPlugin)) {
+        throw new OMException("Unsupported MultiTenantAccessAuthorizer: " +
+            authorizer.getClass().getSimpleName(),
+            ResultCodes.INTERNAL_ERROR);
+      }
+    } else {
+      // authorizer can be null for unit tests
+      LOG.warn("MultiTenantAccessAuthorizer is not set");
+    }
+  }
+
+  @Override
+  public BackgroundTaskQueue getTasks() {
+    BackgroundTaskQueue queue = new BackgroundTaskQueue();
+    queue.add(new RangerBGSyncTask());
+    return queue;
+  }
+
+  public void start() {
+    if (authorizer == null) {
+      LOG.error("Failed to start the background sync service: "
+          + "null authorizer. Please check OM configuration. Aborting");
+      return;
+    }
+    isServiceStarted = true;
+    super.start();
+  }
+
+  public void shutdown() {
+    isServiceStarted = false;
+    super.shutdown();
+  }
+
+  /**
+   * Returns true if the service run conditions are satisfied, false otherwise.
+   */
+  private boolean shouldRun() {
+    if (ozoneManager == null) {
+      // OzoneManager can be null for testing
+      return true;
+    }
+    // The service only runs if current OM node is leader and is ready
+    //  and the service is marked as started
+    return isServiceStarted && ozoneManager.isLeaderReady();
+  }
+
+  private class RangerBGSyncTask implements BackgroundTask {
+
+    @Override
+    public int getPriority() {
+      return 0;
+    }
+
+    @Override
+    public BackgroundTaskResult call() {
+      // Check OM leader and readiness
+      if (shouldRun()) {
+        runCount.incrementAndGet();
+        triggerRangerSyncOnce();
+      }
+
+      return EmptyTaskResult.newResult();
+    }
+  }
+
+  private void triggerRangerSyncOnce() {
+    int attempt = 0;
+    try {
+      // TODO: Acquire lock
+      long currentOzoneServiceVerInDB = getOMDBRangerServiceVersion();
+      long proposedOzoneServiceVerInDB = retrieveRangerServiceVersion();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Database version: {}, Ranger version: {}",
+            currentOzoneServiceVerInDB, proposedOzoneServiceVerInDB);
+      }
+      while (currentOzoneServiceVerInDB != proposedOzoneServiceVerInDB) {
+        // TODO: Release lock
+        if (++attempt > MAX_ATTEMPT) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Reached maximum number of attempts ({}). Abort",
+                MAX_ATTEMPT);
+          }
+          break;
+        }
+
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Ranger Ozone service version ({}) differs from DB's ({}). "
+                  + "Starting to sync.",
+              proposedOzoneServiceVerInDB, currentOzoneServiceVerInDB);
+        }
+
+        LOG.info("Executing Multi-Tenancy Ranger Sync: run #{}, attempt #{}",
+            runCount.get(), attempt);
+
+        executeOMDBToRangerSync(currentOzoneServiceVerInDB);
+
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Setting OM DB Ranger Service Version to {} (was {})",
+              proposedOzoneServiceVerInDB, currentOzoneServiceVerInDB);
+        }
+        // Submit Ratis Request to sync the new service version in OM DB
+        setOMDBRangerServiceVersion(proposedOzoneServiceVerInDB);
+
+        // TODO: Acquire lock
+
+        // Check Ranger ozone service version again
+        currentOzoneServiceVerInDB = proposedOzoneServiceVerInDB;
+        proposedOzoneServiceVerInDB = retrieveRangerServiceVersion();
+      }
+    } catch (IOException e) {
+      LOG.warn("Exception during Ranger Sync", e);
+      // TODO: Check specific exception once switched to
+      //  RangerRestMultiTenantAccessController
+//    } finally {
+//      // TODO: Release lock
+    }
+
+  }
+
+  long retrieveRangerServiceVersion() throws IOException {
+    return authorizer.getCurrentOzoneServiceVersion(rangerOzoneServiceId);
+  }
+
+  private RaftClientRequest newRaftClientRequest(OMRequest omRequest) {
+    return RaftClientRequest.newBuilder()
+        .setClientId(CLIENT_ID)
+        .setServerId(ozoneManager.getOmRatisServer().getRaftPeerId())
+        .setGroupId(ozoneManager.getOmRatisServer().getRaftGroupId())
+        .setCallId(runCount.get())
+        .setMessage(
+            Message.valueOf(
+                OMRatisHelper.convertRequestToByteString(omRequest)))
+        .setType(RaftClientRequest.writeRequestType())
+        .build();
+  }
+
+  void setOMDBRangerServiceVersion(long version) {
+    // OM DB update goes through Ratis
+    RangerServiceVersionSyncRequest.Builder versionSyncRequest =
+        RangerServiceVersionSyncRequest.newBuilder()
+            .setRangerServiceVersion(version);
+
+    OMRequest omRequest = OMRequest.newBuilder()
+        .setCmdType(Type.RangerServiceVersionSync)
+        .setRangerServiceVersionSyncRequest(versionSyncRequest)
+        .setClientId(CLIENT_ID.toString())
+        .build();
+
+    // Submit PurgeKeys request to OM
+    try {
+      RaftClientRequest raftClientRequest = newRaftClientRequest(omRequest);
+      ozoneManager.getOmRatisServer().submitRequest(omRequest,
+          raftClientRequest);
+    } catch (ServiceException e) {
+      LOG.error("RangerServiceVersionSync request failed. "
+          + "Will retry at next run.");
+    }
+  }
+
+  long getOMDBRangerServiceVersion() throws IOException {
+    final Long dbValue = ozoneManager.getMetadataManager()
+        .getOmRangerStateTable()
+        .get(OmMetadataManagerImpl.RANGER_OZONE_SERVICE_VERSION_KEY);
+    if (dbValue == null) {
+      return -1;
+    } else {
+      return dbValue;
+    }
+  }
+
+  private void executeOMDBToRangerSync(long baseVersion) throws IOException {
+    clearPolicyAndRoleMaps();
+
+    // TODO: Acquire global lock
+    loadAllPoliciesRolesFromRanger(baseVersion);
+    loadAllRolesFromRanger();
+    loadAllRolesFromOM();
+    // TODO: Release global lock
+
+    // This should isolate policies into two groups
+    // 1. mtRangerPoliciesTobeDeleted and
+    // 2. mtRangerPoliciesTobeCreated
+    processAllPoliciesFromOMDB();
+
+    // This should isolate roles that need fixing into a list of
+    // roles that need to be replayed back into ranger to get in sync with OMDB.
+    processAllRolesFromOMDB();
+  }
+
+  private void clearPolicyAndRoleMaps() {
+    mtRangerPoliciesToBeCreated.clear();
+    mtRangerPoliciesToBeDeleted.clear();
+    mtRangerRoles.clear();
+    mtOMDBRoles.clear();
+  }
+
+  /**
+   * TODO: Test and make sure invalid JSON response from Ranger won't crash OM.
+   */
+  private void loadAllPoliciesRolesFromRanger(long baseVersion)
+      throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("baseVersion is {}", baseVersion);
+    }
+    // TODO: incremental policies API is broken. We are getting all the
+    //  Multi-Tenant policies using Ranger labels.
+    String allPolicies = authorizer.getAllMultiTenantPolicies(
+        rangerOzoneServiceId);
+    JsonObject jObject = new JsonParser().parse(allPolicies).getAsJsonObject();
+    lastRangerPolicyLoadTime = jObject.get("queryTimeMS").getAsLong();
+    JsonArray policyArray = jObject.getAsJsonArray("policies");
+    for (int i = 0; i < policyArray.size(); ++i) {
+      JsonObject newPolicy = policyArray.get(i).getAsJsonObject();
+      if (!newPolicy.getAsJsonArray("policyLabels").get(0)
+          .getAsString().equals("OzoneMultiTenant")) {

Review Comment:
   Shouldn't this be a `contains` check? Also, let's define a constant for the policy label we are using.
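
   A rough sketch of the idea, using a plain `List<String>` in place of the Gson `JsonArray` so that it is self-contained. The class name `PolicyLabelCheckSketch`, the constant name `OZONE_MULTI_TENANT_POLICY_LABEL`, and the helper `hasMultiTenantLabel` are hypothetical; only the label value `"OzoneMultiTenant"` is taken from the diff:

```java
import java.util.List;

final class PolicyLabelCheckSketch {
  // Hypothetical constant name; the diff hard-codes this string inline.
  static final String OZONE_MULTI_TENANT_POLICY_LABEL = "OzoneMultiTenant";

  private PolicyLabelCheckSketch() {
  }

  /**
   * Returns true if any label on the policy is the multi-tenant label,
   * regardless of its position. A null or empty list yields false.
   */
  static boolean hasMultiTenantLabel(List<String> policyLabels) {
    return policyLabels != null
        && policyLabels.contains(OZONE_MULTI_TENANT_POLICY_LABEL);
  }
}
```

   Unlike `get(0)`, this does not depend on the label being first and does not throw on an empty label list. With Gson's `JsonArray`, the equivalent is probably `contains(new JsonPrimitive(label))`; treat that as an assumption to verify against the Gson version in use.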



##########
hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/multitenant/AccessPolicy.java:
##########
@@ -18,6 +18,7 @@
 

Review Comment:
   You can ignore this for now. We will do the switch in a follow-up PR when we switch to the Ranger client.



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/multitenant/OMRangerBGSyncService.java:
##########
@@ -0,0 +1,761 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.om.multitenant;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.google.common.base.Preconditions;
+import com.google.protobuf.ServiceException;
+import org.apache.hadoop.hdds.utils.BackgroundService;
+import org.apache.hadoop.hdds.utils.BackgroundTask;
+import org.apache.hadoop.hdds.utils.BackgroundTaskQueue;
+import org.apache.hadoop.hdds.utils.BackgroundTaskResult;
+import org.apache.hadoop.hdds.utils.BackgroundTaskResult.EmptyTaskResult;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.TableIterator;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OMMultiTenantManager;
+import org.apache.hadoop.ozone.om.OMMultiTenantManagerImpl;
+import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes;
+import org.apache.hadoop.ozone.om.helpers.OMRatisHelper;
+import org.apache.hadoop.ozone.om.helpers.OmDBAccessIdInfo;
+import org.apache.hadoop.ozone.om.helpers.OmDBTenantState;
+import org.apache.hadoop.ozone.om.multitenant.CachedTenantState.CachedAccessIdInfo;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.RangerServiceVersionSyncRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
+import org.apache.ratis.protocol.ClientId;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftClientRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.gson.JsonArray;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+
+import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.VOLUME_LOCK;
+
+/**
+ * Background Sync thread that reads Multi-Tenancy state from OM DB
+ * and applies it to Ranger.

Review Comment:
   Let's fill out this Javadoc with the current consistency guarantees of the background sync with respect to in-progress tenant operations. For example, the current implementation acknowledges that a background sync run and an in-progress operation may conflict in some cases, but a future background sync run will correct the issue.
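
   One way the Javadoc could phrase this is as eventual consistency: each run recomputes the difference between OM DB and Ranger from scratch, so a mismatch left behind by a racing tenant operation shows up again in the next run. A minimal sketch of that idea over plain sets of policy names, with hypothetical class and method names (this is not code from the PR):

   ```java
   import java.util.Set;
   import java.util.TreeSet;

   final class PolicyReconciler {
     private PolicyReconciler() { }

     // Names present in OM DB but missing from Ranger: candidates to create.
     static Set<String> toCreate(Set<String> omDb, Set<String> ranger) {
       Set<String> result = new TreeSet<>(omDb);
       result.removeAll(ranger);
       return result;
     }

     // Names present in Ranger but absent from OM DB: candidates to delete.
     static Set<String> toDelete(Set<String> omDb, Set<String> ranger) {
       Set<String> result = new TreeSet<>(ranger);
       result.removeAll(omDb);
       return result;
     }
   }
   ```

   If an in-progress operation changes either side after a run has taken its snapshot, the leftover difference appears in the next run's `toCreate` or `toDelete` result.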



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/tenant/OMRangerServiceVersionSyncRequest.java:
##########
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.hadoop.ozone.om.request.s3.tenant;
+
+import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.RESOURCE_BUSY;
+import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.TENANT_ALREADY_EXISTS;
+import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.VOLUME_ALREADY_EXISTS;
+import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.USER_LOCK;
+import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.VOLUME_LOCK;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
+import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
+import org.apache.hadoop.ozone.OmUtils;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.audit.OMAction;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.om.helpers.OmDBTenantInfo;
+import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
+import org.apache.hadoop.ozone.om.multitenant.AccessPolicy;
+import org.apache.hadoop.ozone.om.multitenant.Tenant;
+import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerDoubleBufferHelper;
+import org.apache.hadoop.ozone.om.request.OMClientRequest;
+import org.apache.hadoop.ozone.om.request.util.OmResponseUtil;
+import org.apache.hadoop.ozone.om.request.volume.OMVolumeRequest;
+import org.apache.hadoop.ozone.om.response.OMClientResponse;
+import org.apache.hadoop.ozone.om.response.s3.tenant.OMRangerServiceVersionSyncResponse;
+import org.apache.hadoop.ozone.om.response.s3.tenant.OMTenantCreateResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.RangerServiceVersionSyncRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.RangerServiceVersionSyncResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.VolumeInfo;
+import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
+import org.apache.hadoop.ozone.security.acl.OzoneObj;
+import org.apache.hadoop.ozone.storage.proto.OzoneManagerStorageProtos.PersistedUserVolumeInfo;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+
+/*
+ * This request is issued by the RangerSync Background thread to update the
+ * OzoneServiceVersion as read from the Ranger during the  most up-to-date
+ * ranger-to-OMDB sync operation.
+ */
+
+/**
+ * Handles OMRangerServiceVersionSync request.
+ */
+public class OMRangerServiceVersionSyncRequest extends OMClientRequest {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(OMRangerServiceVersionSyncRequest.class);
+
+  public OMRangerServiceVersionSyncRequest(OMRequest omRequest) {
+    super(omRequest);
+  }
+
+  @Override
+  public OMRequest preExecute(OzoneManager ozoneManager) throws IOException {
+
+    final RangerServiceVersionSyncRequest request =
+        getOmRequest().getRangerServiceVersionSyncRequest();
+    final long proposedVersion = request.getRangerServiceVersion();
+
+    if (!ozoneManager.getMultiTenantManager()
+        .tryAcquireInProgressMtOp(WAIT_MILISECONDS)) {
+      throw new OMException("Only One MultiTenant operation allowed at a " +
+          "time", RESOURCE_BUSY);
+    }
+
+    final OMRequest.Builder omRequestBuilder = getOmRequest().toBuilder()

Review Comment:
   Why are we building a new OM request here?



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/multitenant/OMRangerBGSyncService.java:
##########
@@ -0,0 +1,761 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.om.multitenant;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.google.common.base.Preconditions;
+import com.google.protobuf.ServiceException;
+import org.apache.hadoop.hdds.utils.BackgroundService;
+import org.apache.hadoop.hdds.utils.BackgroundTask;
+import org.apache.hadoop.hdds.utils.BackgroundTaskQueue;
+import org.apache.hadoop.hdds.utils.BackgroundTaskResult;
+import org.apache.hadoop.hdds.utils.BackgroundTaskResult.EmptyTaskResult;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.TableIterator;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OMMultiTenantManager;
+import org.apache.hadoop.ozone.om.OMMultiTenantManagerImpl;
+import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes;
+import org.apache.hadoop.ozone.om.helpers.OMRatisHelper;
+import org.apache.hadoop.ozone.om.helpers.OmDBAccessIdInfo;
+import org.apache.hadoop.ozone.om.helpers.OmDBTenantState;
+import org.apache.hadoop.ozone.om.multitenant.CachedTenantState.CachedAccessIdInfo;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.RangerServiceVersionSyncRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
+import org.apache.ratis.protocol.ClientId;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftClientRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.gson.JsonArray;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+
+import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.VOLUME_LOCK;
+
+/**
+ * Background Sync thread that reads Multi-Tenancy state from OM DB
+ * and applies it to Ranger.
+ */
+public class OMRangerBGSyncService extends BackgroundService {
+
+  public static final Logger LOG =
+      LoggerFactory.getLogger(OMRangerBGSyncService.class);
+  private static final ClientId CLIENT_ID = ClientId.randomId();
+  private static final long ONE_HOUR_IN_MILLIS = 3600 * 1000;
+
+  private final OzoneManager ozoneManager;
+  private final OMMetadataManager metadataManager;
+  private final OMMultiTenantManager multiTenantManager;
+  private final MultiTenantAccessAuthorizer authorizer;
+
+  // Maximum number of attempts for each sync run
+  private static final int MAX_ATTEMPT = 2;
+  private final AtomicLong runCount = new AtomicLong(0);
+  private int rangerOzoneServiceId;
+
+  private boolean isServiceStarted = false;
+
+  static class BGRole {
+    private final String name;
+    private String id;
+    private final HashSet<String> userSet;
+
+    BGRole(String n) {
+      this.name = n;
+      userSet = new HashSet<>();
+    }
+
+    public void setId(String id) {
+      this.id = id;
+    }
+
+    public String getId() {
+      return id;
+    }
+
+    public void addUserPrincipal(String userPrincipal) {
+      userSet.add(userPrincipal);
+    }
+
+    public HashSet<String> getUserSet() {
+      return userSet;
+    }
+
+    @Override
+    public int hashCode() {
+      return name.hashCode();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      // TODO: Do we care about userSet
+      return name.equals(((BGRole) o).name);
+    }
+  }
+
+  // This map will be used to keep all the policies that are found in
+  // OM DB and should have been in Ranger. Currently, we are only printing such
+  // policyID. This can result if a tenant is deleted but the system
+  // crashed. It's an easy recovery to retry the "tenant delete" operation.
+  //
+  // Maps from policy name to policy ID in Ranger
+  private final HashMap<String, String> mtRangerPoliciesToBeCreated =
+      new HashMap<>();
+
+  // We will track all the policies in Ranger here. After we have
+  // processed all the policies from OM DB, this map will
+  // be left with policies that we need to delete.
+  //
+  // Maps from policy name to policy ID in Ranger
+  private final HashMap<String, String> mtRangerPoliciesToBeDeleted =
+      new HashMap<>();
+
+  // This map will keep all the Multi-Tenancy related roles from Ranger.
+  private final HashMap<String, BGRole> mtRangerRoles = new HashMap<>();
+
+  // Keep OM DB mapping of Roles -> list of user principals.
+  private final HashMap<String, HashSet<String>> mtOMDBRoles = new HashMap<>();
+
+  // Every BG ranger sync cycle we update this
+  private long lastRangerPolicyLoadTime;
+
+  public OMRangerBGSyncService(OzoneManager ozoneManager,
+      MultiTenantAccessAuthorizer authorizer, long interval,
+      TimeUnit unit, long serviceTimeout)
+      throws IOException {
+    super("OMRangerBGSyncService", interval, unit, 1, serviceTimeout);
+
+    this.ozoneManager = ozoneManager;
+    this.metadataManager = ozoneManager.getMetadataManager();
+    this.multiTenantManager = ozoneManager.getMultiTenantManager();
+
+    this.authorizer = authorizer;
+
+    if (authorizer != null) {
+      if (authorizer instanceof MultiTenantAccessAuthorizerRangerPlugin) {
+        MultiTenantAccessAuthorizerRangerPlugin rangerAuthorizer =
+            (MultiTenantAccessAuthorizerRangerPlugin) authorizer;
+        rangerOzoneServiceId = rangerAuthorizer.getRangerOzoneServiceId();
+      } else if (
+          !(authorizer instanceof MultiTenantAccessAuthorizerDummyPlugin)) {
+        throw new OMException("Unsupported MultiTenantAccessAuthorizer: " +
+            authorizer.getClass().getSimpleName(),
+            ResultCodes.INTERNAL_ERROR);
+      }
+    } else {
+      // authorizer can be null for unit tests
+      LOG.warn("MultiTenantAccessAuthorizer is not set");
+    }
+  }
+
+  @Override
+  public BackgroundTaskQueue getTasks() {
+    BackgroundTaskQueue queue = new BackgroundTaskQueue();
+    queue.add(new RangerBGSyncTask());
+    return queue;
+  }
+
+  public void start() {
+    if (authorizer == null) {
+      LOG.error("Failed to start the background sync service: "
+          + "null authorizer. Please check OM configuration. Aborting");
+      return;
+    }
+    isServiceStarted = true;
+    super.start();
+  }
+
+  public void shutdown() {
+    isServiceStarted = false;
+    super.shutdown();
+  }
+
+  /**
+   * Returns true if the service run conditions are satisfied, false otherwise.
+   */
+  private boolean shouldRun() {
+    if (ozoneManager == null) {
+      // OzoneManager can be null for testing
+      return true;
+    }
+    // The service only runs if current OM node is leader and is ready
+    //  and the service is marked as started
+    return isServiceStarted && ozoneManager.isLeaderReady();
+  }
+
+  private class RangerBGSyncTask implements BackgroundTask {
+
+    @Override
+    public int getPriority() {
+      return 0;
+    }
+
+    @Override
+    public BackgroundTaskResult call() {
+      // Check OM leader and readiness
+      if (shouldRun()) {
+        runCount.incrementAndGet();
+        triggerRangerSyncOnce();
+      }
+
+      return EmptyTaskResult.newResult();
+    }
+  }
+
+  private void triggerRangerSyncOnce() {
+    int attempt = 0;
+    try {
+      // TODO: Acquire lock
+      long currentOzoneServiceVerInDB = getOMDBRangerServiceVersion();
+      long proposedOzoneServiceVerInDB = retrieveRangerServiceVersion();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Database version: {}, Ranger version: {}",
+            currentOzoneServiceVerInDB, proposedOzoneServiceVerInDB);
+      }
+      while (currentOzoneServiceVerInDB != proposedOzoneServiceVerInDB) {
+        // TODO: Release lock
+        if (++attempt > MAX_ATTEMPT) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Reached maximum number of attempts ({}). Abort",
+                MAX_ATTEMPT);
+          }
+          break;
+        }
+
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Ranger Ozone service version ({}) differs from DB's ({}). "
+                  + "Starting to sync.",
+              proposedOzoneServiceVerInDB, currentOzoneServiceVerInDB);
+        }
+
+        LOG.info("Executing Multi-Tenancy Ranger Sync: run #{}, attempt #{}",
+            runCount.get(), attempt);
+
+        executeOMDBToRangerSync(currentOzoneServiceVerInDB);
+
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Setting OM DB Ranger Service Version to {} (was {})",
+              proposedOzoneServiceVerInDB, currentOzoneServiceVerInDB);
+        }
+        // Submit Ratis Request to sync the new service version in OM DB
+        setOMDBRangerServiceVersion(proposedOzoneServiceVerInDB);
+
+        // TODO: Acquire lock
+
+        // Check Ranger ozone service version again
+        currentOzoneServiceVerInDB = proposedOzoneServiceVerInDB;
+        proposedOzoneServiceVerInDB = retrieveRangerServiceVersion();
+      }
+    } catch (IOException e) {
+      LOG.warn("Exception during Ranger Sync", e);
+      // TODO: Check specific exception once switched to
+      //  RangerRestMultiTenantAccessController
+//    } finally {
+//      // TODO: Release lock
+    }
+
+  }
+
+  long retrieveRangerServiceVersion() throws IOException {
+    return authorizer.getCurrentOzoneServiceVersion(rangerOzoneServiceId);
+  }
+
+  private RaftClientRequest newRaftClientRequest(OMRequest omRequest) {
+    return RaftClientRequest.newBuilder()
+        .setClientId(CLIENT_ID)
+        .setServerId(ozoneManager.getOmRatisServer().getRaftPeerId())
+        .setGroupId(ozoneManager.getOmRatisServer().getRaftGroupId())
+        .setCallId(runCount.get())
+        .setMessage(
+            Message.valueOf(
+                OMRatisHelper.convertRequestToByteString(omRequest)))
+        .setType(RaftClientRequest.writeRequestType())
+        .build();
+  }
+
+  void setOMDBRangerServiceVersion(long version) {
+    // OM DB update goes through Ratis
+    RangerServiceVersionSyncRequest.Builder versionSyncRequest =
+        RangerServiceVersionSyncRequest.newBuilder()
+            .setRangerServiceVersion(version);
+
+    OMRequest omRequest = OMRequest.newBuilder()
+        .setCmdType(Type.RangerServiceVersionSync)
+        .setRangerServiceVersionSyncRequest(versionSyncRequest)
+        .setClientId(CLIENT_ID.toString())
+        .build();
+
+    // Submit RangerServiceVersionSync request to OM
+    try {
+      RaftClientRequest raftClientRequest = newRaftClientRequest(omRequest);
+      ozoneManager.getOmRatisServer().submitRequest(omRequest,
+          raftClientRequest);
+    } catch (ServiceException e) {
+      LOG.error("RangerServiceVersionSync request failed. "
+          + "Will retry at next run.", e);
+    }
+  }
+
+  long getOMDBRangerServiceVersion() throws IOException {
+    final Long dbValue = ozoneManager.getMetadataManager()
+        .getOmRangerStateTable()
+        .get(OmMetadataManagerImpl.RANGER_OZONE_SERVICE_VERSION_KEY);
+    if (dbValue == null) {
+      return -1;
+    } else {
+      return dbValue;
+    }
+  }
+
+  private void executeOMDBToRangerSync(long baseVersion) throws IOException {
+    clearPolicyAndRoleMaps();
+
+    // TODO: Acquire global lock
+    loadAllPoliciesRolesFromRanger(baseVersion);
+    loadAllRolesFromRanger();
+    loadAllRolesFromOM();
+    // TODO: Release global lock
+
+    // This should isolate policies into two groups
+    // 1. mtRangerPoliciesToBeDeleted and
+    // 2. mtRangerPoliciesToBeCreated
+    processAllPoliciesFromOMDB();
+
+    // This should isolate roles that need fixing into a list of
+    // roles that need to be replayed back into ranger to get in sync with OMDB.
+    processAllRolesFromOMDB();
+  }
+
+  private void clearPolicyAndRoleMaps() {
+    mtRangerPoliciesToBeCreated.clear();
+    mtRangerPoliciesToBeDeleted.clear();
+    mtRangerRoles.clear();
+    mtOMDBRoles.clear();
+  }
+
+  /**
+   * TODO: Test and make sure invalid JSON response from Ranger won't crash OM.
+   */
+  private void loadAllPoliciesRolesFromRanger(long baseVersion)
+      throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("baseVersion is {}", baseVersion);
+    }
+    // TODO: incremental policies API is broken. We are getting all the
+    //  Multi-Tenant policies using Ranger labels.
+    String allPolicies = authorizer.getAllMultiTenantPolicies(
+        rangerOzoneServiceId);
+    JsonObject jObject = new JsonParser().parse(allPolicies).getAsJsonObject();
+    lastRangerPolicyLoadTime = jObject.get("queryTimeMS").getAsLong();
+    JsonArray policyArray = jObject.getAsJsonArray("policies");
+    for (int i = 0; i < policyArray.size(); ++i) {
+      JsonObject newPolicy = policyArray.get(i).getAsJsonObject();
+      if (!newPolicy.getAsJsonArray("policyLabels").get(0)
+          .getAsString().equals("OzoneMultiTenant")) {
+        // Shouldn't get policies without the tag here very often as it is
+        // specified in the query param, unless a user removed the tag during
+        // the sync
+        LOG.warn("Received a Ranger policy without OzoneMultiTenant tag: {}",
+            newPolicy.get("name").getAsString());
+        continue;
+      }
+      // Temporarily put the policy in the to-delete list,
+      // valid entries will be removed later
+      mtRangerPoliciesToBeDeleted.put(
+          newPolicy.get("name").getAsString(),
+          newPolicy.get("id").getAsString());
+
+      final JsonArray policyItems = newPolicy.getAsJsonArray("policyItems");
+      for (int j = 0; j < policyItems.size(); ++j) {
+        JsonObject policy = policyItems.get(j).getAsJsonObject();
+        JsonArray roles = policy.getAsJsonArray("roles");
+        for (int k = 0; k < roles.size(); ++k) {
+          if (!mtRangerRoles.containsKey(roles.get(k).getAsString())) {
+            // We only get the role name here. We need to query and populate it.
+            mtRangerRoles.put(roles.get(k).getAsString(),
+                new BGRole(roles.get(k).getAsString()));
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Helper function to throw benign exception if the current OM is no longer
+   * the leader in case a leader transition happened during the sync. So the
+   * sync run can abort earlier.
+   *
+   * Note: EACH Ranger request can take 3-7 seconds as tested in UT.
+   */
+  private void checkLeader() throws IOException {
+    if (!ozoneManager.isLeaderReady()) {
+      throw new OMException("Not leader. Abort", ResultCodes.INTERNAL_ERROR);
+    }
+  }
+
+  private void loadAllRolesFromRanger() throws IOException {
+    for (Map.Entry<String, BGRole> entry: mtRangerRoles.entrySet()) {
+      final String roleName = entry.getKey();
+      checkLeader();
+      final String roleDataString = authorizer.getRole(roleName);
+      final JsonObject roleObject =
+          new JsonParser().parse(roleDataString).getAsJsonObject();
+      final BGRole role = entry.getValue();
+      role.setId(roleObject.get("id").getAsString());
+      final JsonArray userArray = roleObject.getAsJsonArray("users");
+      for (int i = 0; i < userArray.size(); ++i) {
+        role.addUserPrincipal(userArray.get(i).getAsJsonObject().get("name")
+            .getAsString());
+      }
+    }
+  }
+
+  /**
+   * Helper function to add/remove a policy name to/from mtRangerPolicies lists.
+   */
+  private void mtRangerPoliciesOpHelper(String policyName) {
+    if (mtRangerPoliciesToBeDeleted.containsKey(policyName)) {
+      // This entry is in sync with Ranger, remove it from the map.
+      // Eventually mtRangerPoliciesToBeDeleted will only contain entries that
+      // are not in OM DB and should be removed from Ranger.
+      mtRangerPoliciesToBeDeleted.remove(policyName);
+    } else {
+      // We could not find a policy in ranger that should have been there.
+      mtRangerPoliciesToBeCreated.put(policyName, null);
+    }
+  }
+
+  private void processAllPoliciesFromOMDB() throws IOException {
+
+    // Iterate all DB tenant states. For each tenant,
+    // queue or dequeue bucketNamespacePolicyName and bucketPolicyName
+    TableIterator<String, ? extends Table.KeyValue<String, OmDBTenantState>>
+        tenantStateTableIter = metadataManager.getTenantStateTable().iterator();
+    while (tenantStateTableIter.hasNext()) {
+      final Table.KeyValue<String, OmDBTenantState> tableKeyValue =
+          tenantStateTableIter.next();
+      final OmDBTenantState dbTenantState = tableKeyValue.getValue();
+      final String volumeName = dbTenantState.getBucketNamespaceName();
+      Preconditions.checkNotNull(volumeName);
+
+      boolean acquiredVolumeLock = false;
+      try {
+        acquiredVolumeLock = metadataManager.getLock().acquireWriteLock(
+            VOLUME_LOCK, volumeName);
+
+        mtRangerPoliciesOpHelper(dbTenantState.getBucketNamespacePolicyName());
+        mtRangerPoliciesOpHelper(dbTenantState.getBucketPolicyName());
+      } finally {
+        if (acquiredVolumeLock) {
+          metadataManager.getLock().releaseWriteLock(VOLUME_LOCK, volumeName);
+        }
+      }
+    }
+
+    for (Map.Entry<String, String> entry :
+        mtRangerPoliciesToBeCreated.entrySet()) {
+      final String policyName = entry.getKey();
+      LOG.warn("Expected policy not found in Ranger: {}", policyName);
+      // Attempt to recreate the default volume/bucket policy if it's missing
+      attemptToRecreateDefaultPolicy(policyName);
+    }
+
+    for (Map.Entry<String, String> entry :
+        mtRangerPoliciesToBeDeleted.entrySet()) {
+      final String policyName = entry.getKey();
+      checkLeader();
+      // TODO: Use AccessController instead of AccessPolicy
+//      MultiTenantAccessController accessController =
+//          authorizer.getMultiTenantAccessController();
+      AccessPolicy accessPolicy = authorizer.getAccessPolicyByName(policyName);
+
+      if (lastRangerPolicyLoadTime >
+          (accessPolicy.getPolicyLastUpdateTime() + ONE_HOUR_IN_MILLIS)) {
+        LOG.warn("Deleting policy from Ranger: {}", policyName);
+        authorizer.deletePolicybyName(policyName);
+      }
+    }
+
+  }
+
+  private String getTenantIdFromPolicyName(
+      String policyName, String policySuffix) {
+    return policyName.substring(0, policyName.length() - policySuffix.length());
+  }
+
+  /**
+   * Takes a policy name (e.g. tenant1-VolumeAccess), attempts to (re)create
+   * the policy in Ranger by the policy name alone.
+   */
+  private void attemptToRecreateDefaultPolicy(String policyName)
+      throws IOException {
+    String policySuffix, tenantId;
+
+    policySuffix = OzoneConsts.DEFAULT_TENANT_BUCKET_NAMESPACE_POLICY_SUFFIX;
+    if (policyName.endsWith(policySuffix)) {
+      // Recreate default VolumeAccess policy
+      tenantId = getTenantIdFromPolicyName(policyName, policySuffix);
+      LOG.info("Recovering VolumeAccess policy for tenant: {}", tenantId);
+
+      final String volumeName =
+          multiTenantManager.getTenantVolumeName(tenantId);
+      final String userRoleName =
+          multiTenantManager.getTenantUserRoleName(tenantId);
+      final String adminRoleName =
+          multiTenantManager.getTenantAdminRoleName(tenantId);
+
+      final AccessPolicy tenantVolumeAccessPolicy =
+          multiTenantManager.newDefaultVolumeAccessPolicy(volumeName,
+              new OzoneTenantRolePrincipal(userRoleName),
+              new OzoneTenantRolePrincipal(adminRoleName));
+
+      String id = authorizer.createAccessPolicy(tenantVolumeAccessPolicy);
+      LOG.info("Created policy, ID: {}", id);
+
+      return;
+    }
+
+    policySuffix = OzoneConsts.DEFAULT_TENANT_BUCKET_POLICY_SUFFIX;
+    if (policyName.endsWith(policySuffix)) {
+      // Recreate default BucketAccess policy
+      tenantId = getTenantIdFromPolicyName(policyName, policySuffix);
+      LOG.info("Recovering BucketAccess policy for tenant: {}", tenantId);
+
+      final String volumeName =
+          multiTenantManager.getTenantVolumeName(tenantId);
+      final String userRoleName =
+          multiTenantManager.getTenantUserRoleName(tenantId);
+
+      final AccessPolicy tenantBucketCreatePolicy =
+          multiTenantManager.newDefaultBucketAccessPolicy(volumeName,
+              new OzoneTenantRolePrincipal(userRoleName));
+
+      String id = authorizer.createAccessPolicy(tenantBucketCreatePolicy);
+      LOG.info("Created policy, ID: {}", id);
+
+      return;
+    }
+
+    LOG.error("Unable to recover Ranger policy: {}", policyName);
+  }
+
+  /**
+   * Helper function to add user principal to a role in mtOMDBRoles.
+   */
+  private void addUserToMtOMDBRoles(String roleName, String userPrincipal) {
+    if (!mtOMDBRoles.containsKey(roleName)) {
+      mtOMDBRoles.put(roleName, new HashSet<>(
+          Collections.singletonList(userPrincipal)));
+    } else {
+      final HashSet<String> usersInTheRole = mtOMDBRoles.get(roleName);
+      usersInTheRole.add(userPrincipal);
+    }
+  }
+
+  private void loadAllRolesFromOM() throws IOException {
+    if (multiTenantManager instanceof OMMultiTenantManagerImpl) {
+      loadAllRolesFromCache();
+    } else {
+      LOG.warn("Cache is not supported for {}. Loading roles directly from DB",
+          multiTenantManager.getClass().getSimpleName());
+      loadAllRolesFromDB();
+    }
+  }
+
+  private void loadAllRolesFromCache() {
+
+    final OMMultiTenantManagerImpl impl =
+        (OMMultiTenantManagerImpl) multiTenantManager;
+    final Map<String, CachedTenantState> tenantCache = impl.getTenantCache();
+
+    impl.acquireTenantCacheReadLock();
+
+    try {
+      // tenantCache: tenantId -> CachedTenantState
+      for (Map.Entry<String, CachedTenantState> e1 : tenantCache.entrySet()) {
+        final CachedTenantState cachedTenantState = e1.getValue();
+
+        final String userRoleName = cachedTenantState.getTenantUserRoleName();
+        mtOMDBRoles.computeIfAbsent(userRoleName, any -> new HashSet<>());
+        final String adminRoleName = cachedTenantState.getTenantAdminRoleName();
+        mtOMDBRoles.computeIfAbsent(adminRoleName, any -> new HashSet<>());
+
+        final Map<String, CachedAccessIdInfo> accessIdInfoMap =
+            cachedTenantState.getAccessIdInfoMap();
+
+        // accessIdInfoMap: accessId -> CachedAccessIdInfo
+        for (Map.Entry<String, CachedAccessIdInfo> e2 :
+            accessIdInfoMap.entrySet()) {
+          final CachedAccessIdInfo cachedAccessIdInfo = e2.getValue();
+
+          final String userPrincipal = cachedAccessIdInfo.getUserPrincipal();
+          final boolean isAdmin = cachedAccessIdInfo.getIsAdmin();
+
+          addUserToMtOMDBRoles(userRoleName, userPrincipal);
+
+          if (isAdmin) {
+            addUserToMtOMDBRoles(adminRoleName, userPrincipal);
+          }
+        }
+      }
+    } finally {
+      impl.releaseTenantCacheReadLock();
+    }
+
+  }
+
+  private void loadAllRolesFromDB() throws IOException {
+    // We have the following in OM DB
+    //  tenantStateTable: tenantId -> TenantState (has user and admin role name)
+    //  tenantAccessIdTable : accessId -> OmDBAccessIdInfo
+
+    final Table<String, OmDBTenantState> tenantStateTable =
+        metadataManager.getTenantStateTable();
+
+    // Iterate all DB ExtendedUserAccessIdInfo. For each accessId,
+    // add to userRole. And add to adminRole if isAdmin is set.
+    TableIterator<String, ? extends Table.KeyValue<String, OmDBAccessIdInfo>>
+        tenantAccessIdTableIter =
+        metadataManager.getTenantAccessIdTable().iterator();
+    while (tenantAccessIdTableIter.hasNext()) {
+      final Table.KeyValue<String, OmDBAccessIdInfo> tableKeyValue =
+          tenantAccessIdTableIter.next();
+      final OmDBAccessIdInfo dbAccessIdInfo = tableKeyValue.getValue();
+
+      final String tenantId = dbAccessIdInfo.getTenantId();
+      final String userPrincipal = dbAccessIdInfo.getUserPrincipal();
+
+      final OmDBTenantState dbTenantState = tenantStateTable.get(tenantId);
+      final String userRoleName = dbTenantState.getUserRoleName();
+      mtOMDBRoles.computeIfAbsent(userRoleName, any -> new HashSet<>());
+      final String adminRoleName = dbTenantState.getAdminRoleName();
+      mtOMDBRoles.computeIfAbsent(adminRoleName, any -> new HashSet<>());
+
+      // Every tenant user should be in the tenant's userRole
+      addUserToMtOMDBRoles(userRoleName, userPrincipal);
+
+      // If the accessId has admin flag set, add to adminRole as well
+      if (dbAccessIdInfo.getIsAdmin()) {
+        addUserToMtOMDBRoles(adminRoleName, userPrincipal);
+      }
+    }
+  }
+
+  private void processAllRolesFromOMDB() throws IOException {
+    // First make sure that all the roles in OM DB are present in Ranger
+    // and that the corresponding user lists match.
+    for (Map.Entry<String, HashSet<String>> entry : mtOMDBRoles.entrySet()) {
+      final String roleName = entry.getKey();
+      boolean pushRoleToRanger = false;
+      if (mtRangerRoles.containsKey(roleName)) {
+        final HashSet<String> rangerUserList =
+            mtRangerRoles.get(roleName).getUserSet();
+        final HashSet<String> userSet = entry.getValue();
+        for (String userPrincipal : userSet) {
+          if (rangerUserList.contains(userPrincipal)) {
+            rangerUserList.remove(userPrincipal);
+          } else {
+            // We found a user in OM DB Role that doesn't exist in Ranger Role.
+            // Lets just push the role from OM DB to Ranger
+            pushRoleToRanger = true;
+            break;
+          }
+        }
+        // We have processed all the Userlist entries in the OMDB. If
+        // ranger Userlist is not empty, we are not in sync with ranger.

Review Comment:
   ```suggestion
           // ranger Userlist is not empty, Ranger has users that OM DB does not.
   ```
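
   To illustrate the state the reworded comment describes, here is a minimal
   standalone sketch of the same comparison done with set differences. The
   class and method names (`RoleUserDiff`, `extraInRanger`,
   `missingFromRanger`, `inSync`) are hypothetical and not part of this PR;
   it only assumes both sides can be viewed as a `Set<String>` of user
   principals, as `mtOMDBRoles` and `BGRole#getUserSet()` already are.

   ```java
   import java.util.Arrays;
   import java.util.HashSet;
   import java.util.Set;

   // Hypothetical helper, not part of the PR: compares one role's user list
   // in OM DB against the same role's user list in Ranger.
   final class RoleUserDiff {

     private RoleUserDiff() { }

     // Users that Ranger has for the role but OM DB does not.
     static Set<String> extraInRanger(Set<String> omdbUsers,
         Set<String> rangerUsers) {
       final Set<String> extra = new HashSet<>(rangerUsers);
       extra.removeAll(omdbUsers);
       return extra;
     }

     // Users that OM DB has for the role but Ranger does not.
     static Set<String> missingFromRanger(Set<String> omdbUsers,
         Set<String> rangerUsers) {
       final Set<String> missing = new HashSet<>(omdbUsers);
       missing.removeAll(rangerUsers);
       return missing;
     }

     // The role needs to be pushed to Ranger whenever the two sets differ.
     static boolean inSync(Set<String> omdbUsers, Set<String> rangerUsers) {
       return omdbUsers.equals(rangerUsers);
     }

     public static void main(String[] args) {
       final Set<String> omdb = new HashSet<>(Arrays.asList("alice", "bob"));
       final Set<String> ranger =
           new HashSet<>(Arrays.asList("alice", "carol"));

       System.out.println("Extra in Ranger: " + extraInRanger(omdb, ranger));
       System.out.println("Missing from Ranger: "
           + missingFromRanger(omdb, ranger));
       System.out.println("In sync: " + inSync(omdb, ranger));
     }
   }
   ```

   Unlike the loop in the diff, this copies the Ranger set instead of
   removing entries from it in place. Also note that the loop breaks on the
   first user missing from Ranger, so the "Ranger has users that OM DB does
   not" reading of a non-empty list strictly holds only when the loop ran to
   completion. The outcome is the same either way, since `pushRoleToRanger`
   is already true after the break.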



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/multitenant/OMRangerBGSyncService.java:
##########
@@ -0,0 +1,761 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.om.multitenant;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.google.common.base.Preconditions;
+import com.google.protobuf.ServiceException;
+import org.apache.hadoop.hdds.utils.BackgroundService;
+import org.apache.hadoop.hdds.utils.BackgroundTask;
+import org.apache.hadoop.hdds.utils.BackgroundTaskQueue;
+import org.apache.hadoop.hdds.utils.BackgroundTaskResult;
+import org.apache.hadoop.hdds.utils.BackgroundTaskResult.EmptyTaskResult;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.TableIterator;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OMMultiTenantManager;
+import org.apache.hadoop.ozone.om.OMMultiTenantManagerImpl;
+import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes;
+import org.apache.hadoop.ozone.om.helpers.OMRatisHelper;
+import org.apache.hadoop.ozone.om.helpers.OmDBAccessIdInfo;
+import org.apache.hadoop.ozone.om.helpers.OmDBTenantState;
+import org.apache.hadoop.ozone.om.multitenant.CachedTenantState.CachedAccessIdInfo;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.RangerServiceVersionSyncRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
+import org.apache.ratis.protocol.ClientId;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftClientRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.gson.JsonArray;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+
+import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.VOLUME_LOCK;
+
+/**
+ * Background Sync thread that reads Multi-Tenancy state from OM DB
+ * and applies it to Ranger.
+ */
+public class OMRangerBGSyncService extends BackgroundService {
+
+  public static final Logger LOG =
+      LoggerFactory.getLogger(OMRangerBGSyncService.class);
+  private static final ClientId CLIENT_ID = ClientId.randomId();
+  private static final long ONE_HOUR_IN_MILLIS = 3600 * 1000;
+
+  private final OzoneManager ozoneManager;
+  private final OMMetadataManager metadataManager;
+  private final OMMultiTenantManager multiTenantManager;
+  private final MultiTenantAccessAuthorizer authorizer;
+
+  // Maximum number of attempts for each sync run
+  private static final int MAX_ATTEMPT = 2;
+  private final AtomicLong runCount = new AtomicLong(0);
+  private int rangerOzoneServiceId;
+
+  private boolean isServiceStarted = false;
+
+  static class BGRole {
+    private final String name;
+    private String id;
+    private final HashSet<String> userSet;
+
+    BGRole(String n) {
+      this.name = n;
+      userSet = new HashSet<>();
+    }
+
+    public void setId(String id) {
+      this.id = id;
+    }
+
+    public String getId() {
+      return id;
+    }
+
+    public void addUserPrincipal(String userPrincipal) {
+      userSet.add(userPrincipal);
+    }
+
+    public HashSet<String> getUserSet() {
+      return userSet;
+    }
+
+    @Override
+    public int hashCode() {
+      return name.hashCode();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      // TODO: Do we care about userSet
+      return this.hashCode() == o.hashCode();
+    }
+  }
+
+  // This map will be used to keep all the policies that are found in
+  // OM DB and should have been in Ranger. Currently, we are only printing such
+  // policyID. This can result if a tenant is deleted but the system
+  // crashed. It's an easy recovery to retry the "tenant delete" operation.
+  //
+  // Maps from policy name to policy ID in Ranger
+  private final HashMap<String, String> mtRangerPoliciesToBeCreated =
+      new HashMap<>();
+
+  // We will track all the policies in Ranger here. After we have
+  // processed all the policies from OM DB, this map will
+  // be left with policies that we need to delete.
+  //
+  // Maps from policy name to policy ID in Ranger
+  private final HashMap<String, String> mtRangerPoliciesToBeDeleted =
+      new HashMap<>();
+
+  // This map will keep all the Multi-Tenancy related roles from Ranger.
+  private final HashMap<String, BGRole> mtRangerRoles = new HashMap<>();
+
+  // Keep OM DB mapping of Roles -> list of user principals.
+  private final HashMap<String, HashSet<String>> mtOMDBRoles = new HashMap<>();
+
+  // Every BG ranger sync cycle we update this
+  private long lastRangerPolicyLoadTime;
+
+  public OMRangerBGSyncService(OzoneManager ozoneManager,
+      MultiTenantAccessAuthorizer authorizer, long interval,
+      TimeUnit unit, long serviceTimeout)
+      throws IOException {
+    super("OMRangerBGSyncService", interval, unit, 1, serviceTimeout);
+
+    this.ozoneManager = ozoneManager;
+    this.metadataManager = ozoneManager.getMetadataManager();
+    this.multiTenantManager = ozoneManager.getMultiTenantManager();
+
+    this.authorizer = authorizer;
+
+    if (authorizer != null) {
+      if (authorizer instanceof MultiTenantAccessAuthorizerRangerPlugin) {
+        MultiTenantAccessAuthorizerRangerPlugin rangerAuthorizer =
+            (MultiTenantAccessAuthorizerRangerPlugin) authorizer;
+        rangerOzoneServiceId = rangerAuthorizer.getRangerOzoneServiceId();
+      } else if (
+          !(authorizer instanceof MultiTenantAccessAuthorizerDummyPlugin)) {
+        throw new OMException("Unsupported MultiTenantAccessAuthorizer: " +
+            authorizer.getClass().getSimpleName(),
+            ResultCodes.INTERNAL_ERROR);
+      }
+    } else {
+      // authorizer can be null for unit tests
+      LOG.warn("MultiTenantAccessAuthorizer is not set");
+    }
+  }
+
+  @Override
+  public BackgroundTaskQueue getTasks() {
+    BackgroundTaskQueue queue = new BackgroundTaskQueue();
+    queue.add(new RangerBGSyncTask());
+    return queue;
+  }
+
+  public void start() {
+    if (authorizer == null) {
+      LOG.error("Failed to start the background sync service: "
+          + "null authorizer. Please check OM configuration. Aborting");
+      return;
+    }
+    isServiceStarted = true;
+    super.start();
+  }
+
+  public void shutdown() {
+    isServiceStarted = false;
+    super.shutdown();
+  }
+
+  /**
+   * Returns true if the service run conditions are satisfied, false otherwise.
+   */
+  private boolean shouldRun() {
+    if (ozoneManager == null) {
+      // OzoneManager can be null for testing
+      return true;
+    }
+    // The service only runs if current OM node is leader and is ready
+    //  and the service is marked as started
+    return isServiceStarted && ozoneManager.isLeaderReady();
+  }
+
+  private class RangerBGSyncTask implements BackgroundTask {
+
+    @Override
+    public int getPriority() {
+      return 0;
+    }
+
+    @Override
+    public BackgroundTaskResult call() {
+      // Check OM leader and readiness
+      if (shouldRun()) {
+        runCount.incrementAndGet();
+        triggerRangerSyncOnce();
+      }
+
+      return EmptyTaskResult.newResult();
+    }
+  }
+
+  private void triggerRangerSyncOnce() {
+    int attempt = 0;
+    try {
+      // TODO: Acquire lock
+      long currentOzoneServiceVerInDB = getOMDBRangerServiceVersion();
+      long proposedOzoneServiceVerInDB = retrieveRangerServiceVersion();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Database version: {}, Ranger version: {}",
+            currentOzoneServiceVerInDB, proposedOzoneServiceVerInDB);
+      }
+      while (currentOzoneServiceVerInDB != proposedOzoneServiceVerInDB) {
+        // TODO: Release lock
+        if (++attempt > MAX_ATTEMPT) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Reached maximum number of attempts ({}). Abort",
+                MAX_ATTEMPT);
+          }
+          break;
+        }
+
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Ranger Ozone service version ({}) differs from DB's ({}). "
+                  + "Starting to sync.",
+              proposedOzoneServiceVerInDB, currentOzoneServiceVerInDB);
+        }
+
+        LOG.info("Executing Multi-Tenancy Ranger Sync: run #{}, attempt #{}",
+            runCount.get(), attempt);
+
+        executeOMDBToRangerSync(currentOzoneServiceVerInDB);
+
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Setting OM DB Ranger Service Version to {} (was {})",
+              proposedOzoneServiceVerInDB, currentOzoneServiceVerInDB);
+        }
+        // Submit Ratis Request to sync the new service version in OM DB
+        setOMDBRangerServiceVersion(proposedOzoneServiceVerInDB);
+
+        // TODO: Acquire lock
+
+        // Check Ranger ozone service version again
+        currentOzoneServiceVerInDB = proposedOzoneServiceVerInDB;
+        proposedOzoneServiceVerInDB = retrieveRangerServiceVersion();
+      }
+    } catch (IOException e) {
+      LOG.warn("Exception during Ranger Sync", e);
+      // TODO: Check specific exception once switched to
+      //  RangerRestMultiTenantAccessController
+//    } finally {
+//      // TODO: Release lock
+    }
+
+  }
+
+  long retrieveRangerServiceVersion() throws IOException {
+    return authorizer.getCurrentOzoneServiceVersion(rangerOzoneServiceId);
+  }
+
+  private RaftClientRequest newRaftClientRequest(OMRequest omRequest) {
+    return RaftClientRequest.newBuilder()
+        .setClientId(CLIENT_ID)
+        .setServerId(ozoneManager.getOmRatisServer().getRaftPeerId())
+        .setGroupId(ozoneManager.getOmRatisServer().getRaftGroupId())
+        .setCallId(runCount.get())
+        .setMessage(
+            Message.valueOf(
+                OMRatisHelper.convertRequestToByteString(omRequest)))
+        .setType(RaftClientRequest.writeRequestType())
+        .build();
+  }
+
+  void setOMDBRangerServiceVersion(long version) {
+    // OM DB update goes through Ratis
+    RangerServiceVersionSyncRequest.Builder versionSyncRequest =
+        RangerServiceVersionSyncRequest.newBuilder()
+            .setRangerServiceVersion(version);
+
+    OMRequest omRequest = OMRequest.newBuilder()
+        .setCmdType(Type.RangerServiceVersionSync)
+        .setRangerServiceVersionSyncRequest(versionSyncRequest)
+        .setClientId(CLIENT_ID.toString())
+        .build();
+
+    // Submit RangerServiceVersionSync request to OM
+    try {
+      RaftClientRequest raftClientRequest = newRaftClientRequest(omRequest);
+      ozoneManager.getOmRatisServer().submitRequest(omRequest,
+          raftClientRequest);
+    } catch (ServiceException e) {
+      LOG.error("RangerServiceVersionSync request failed. "
+          + "Will retry at next run.");
+    }
+  }
+
+  long getOMDBRangerServiceVersion() throws IOException {
+    final Long dbValue = ozoneManager.getMetadataManager()
+        .getOmRangerStateTable()
+        .get(OmMetadataManagerImpl.RANGER_OZONE_SERVICE_VERSION_KEY);
+    if (dbValue == null) {
+      return -1;
+    } else {
+      return dbValue;
+    }
+  }
+
+  private void executeOMDBToRangerSync(long baseVersion) throws IOException {
+    clearPolicyAndRoleMaps();
+
+    // TODO: Acquire global lock
+    loadAllPoliciesRolesFromRanger(baseVersion);
+    loadAllRolesFromRanger();
+    loadAllRolesFromOM();
+    // TODO: Release global lock
+
+    // This should isolate policies into two groups
+    // 1. mtRangerPoliciesTobeDeleted and
+    // 2. mtRangerPoliciesTobeCreated
+    processAllPoliciesFromOMDB();
+
+    // This should isolate roles that need fixing into a list of
+    // roles that need to be replayed back into ranger to get in sync with OMDB.
+    processAllRolesFromOMDB();
+  }
+
+  private void clearPolicyAndRoleMaps() {
+    mtRangerPoliciesToBeCreated.clear();
+    mtRangerPoliciesToBeDeleted.clear();
+    mtRangerRoles.clear();
+    mtOMDBRoles.clear();
+  }
+
+  /**
+   * TODO: Test and make sure invalid JSON response from Ranger won't crash OM.
+   */
+  private void loadAllPoliciesRolesFromRanger(long baseVersion)
+      throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("baseVersion is {}", baseVersion);
+    }
+    // TODO: incremental policies API is broken. We are getting all the
+    //  Multi-Tenant policies using Ranger labels.
+    String allPolicies = authorizer.getAllMultiTenantPolicies(
+        rangerOzoneServiceId);
+    JsonObject jObject = new JsonParser().parse(allPolicies).getAsJsonObject();
+    lastRangerPolicyLoadTime = jObject.get("queryTimeMS").getAsLong();
+    JsonArray policyArray = jObject.getAsJsonArray("policies");
+    for (int i = 0; i < policyArray.size(); ++i) {
+      JsonObject newPolicy = policyArray.get(i).getAsJsonObject();
+      if (!newPolicy.getAsJsonArray("policyLabels").get(0)
+          .getAsString().equals("OzoneMultiTenant")) {
+        // Shouldn't get policies without the tag here very often as it is
+        // specified in the query param, unless a user removed the tag during
+        // the sync
+        LOG.warn("Received a Ranger policy without OzoneMultiTenant tag: {}",
+            newPolicy.get("name").getAsString());
+        continue;
+      }
+      // Temporarily put the policy in the to-delete list,
+      // valid entries will be removed later
+      mtRangerPoliciesToBeDeleted.put(
+          newPolicy.get("name").getAsString(),
+          newPolicy.get("id").getAsString());
+
+      final JsonArray policyItems = newPolicy.getAsJsonArray("policyItems");
+      for (int j = 0; j < policyItems.size(); ++j) {
+        JsonObject policy = policyItems.get(j).getAsJsonObject();
+        JsonArray roles = policy.getAsJsonArray("roles");
+        for (int k = 0; k < roles.size(); ++k) {
+          if (!mtRangerRoles.containsKey(roles.get(k).getAsString())) {
+            // We only get the role name here. We need to query and populate it.
+            mtRangerRoles.put(roles.get(k).getAsString(),
+                new BGRole(roles.get(k).getAsString()));
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Helper function to throw benign exception if the current OM is no longer
+   * the leader in case a leader transition happened during the sync. So the
+   * sync run can abort earlier.
+   *
+   * Note: EACH Ranger request can take 3-7 seconds as tested in UT.
+   */
+  private void checkLeader() throws IOException {
+    if (!ozoneManager.isLeaderReady()) {
+      throw new OMException("Not leader. Abort", ResultCodes.INTERNAL_ERROR);
+    }
+  }
+
+  private void loadAllRolesFromRanger() throws IOException {
+    for (Map.Entry<String, BGRole> entry: mtRangerRoles.entrySet()) {
+      final String roleName = entry.getKey();
+      checkLeader();
+      final String roleDataString = authorizer.getRole(roleName);
+      final JsonObject roleObject =
+          new JsonParser().parse(roleDataString).getAsJsonObject();
+      final BGRole role = entry.getValue();
+      role.setId(roleObject.get("id").getAsString());
+      final JsonArray userArray = roleObject.getAsJsonArray("users");
+      for (int i = 0; i < userArray.size(); ++i) {
+        role.addUserPrincipal(userArray.get(i).getAsJsonObject().get("name")
+            .getAsString());
+      }
+    }
+  }
+
+  /**
+   * Helper function to add/remove a policy name to/from mtRangerPolicies lists.
+   */
+  private void mtRangerPoliciesOpHelper(String policyName) {
+    if (mtRangerPoliciesToBeDeleted.containsKey(policyName)) {
+      // This entry is in sync with ranger, remove it from the set
+      // Eventually mtRangerPolicies will only contain entries that
+      // are not in OMDB and should be removed from Ranger.
+      mtRangerPoliciesToBeDeleted.remove(policyName);
+    } else {
+      // We could not find a policy in ranger that should have been there.
+      mtRangerPoliciesToBeCreated.put(policyName, null);
+    }
+  }
+
+  private void processAllPoliciesFromOMDB() throws IOException {
+
+    // Iterate all DB tenant states. For each tenant,
+    // queue or dequeue bucketNamespacePolicyName and bucketPolicyName
+    TableIterator<String, ? extends Table.KeyValue<String, OmDBTenantState>>
+        tenantStateTableIter = metadataManager.getTenantStateTable().iterator();
+    while (tenantStateTableIter.hasNext()) {
+      final Table.KeyValue<String, OmDBTenantState> tableKeyValue =
+          tenantStateTableIter.next();
+      final OmDBTenantState dbTenantState = tableKeyValue.getValue();
+      final String volumeName = dbTenantState.getBucketNamespaceName();
+      Preconditions.checkNotNull(volumeName);
+
+      boolean acquiredVolumeLock = false;
+      try {
+        acquiredVolumeLock = metadataManager.getLock().acquireWriteLock(
+            VOLUME_LOCK, volumeName);
+
+        mtRangerPoliciesOpHelper(dbTenantState.getBucketNamespacePolicyName());
+        mtRangerPoliciesOpHelper(dbTenantState.getBucketPolicyName());
+      } finally {
+        if (acquiredVolumeLock) {
+          metadataManager.getLock().releaseWriteLock(VOLUME_LOCK, volumeName);
+        }
+      }
+    }
+
+    for (Map.Entry<String, String> entry :
+        mtRangerPoliciesToBeCreated.entrySet()) {
+      final String policyName = entry.getKey();
+      LOG.warn("Expected policy not found in Ranger: {}", policyName);
+      // Attempt to recreate the default volume/bucket policy if it's missing
+      attemptToRecreateDefaultPolicy(policyName);
+    }
+
+    for (Map.Entry<String, String> entry :
+        mtRangerPoliciesToBeDeleted.entrySet()) {
+      final String policyName = entry.getKey();
+      checkLeader();
+      // TODO: Use AccessController instead of AccessPolicy
+//      MultiTenantAccessController accessController =
+//          authorizer.getMultiTenantAccessController();
+      AccessPolicy accessPolicy = authorizer.getAccessPolicyByName(policyName);
+
+      if (lastRangerPolicyLoadTime >
+          (accessPolicy.getPolicyLastUpdateTime() + ONE_HOUR_IN_MILLIS)) {
+        LOG.warn("Deleting policy from Ranger: {}", policyName);
+        authorizer.deletePolicybyName(policyName);
+      }
+    }
+
+  }
+
+  private String getTenantIdFromPolicyName(
+      String policyName, String policySuffix) {
+    return policyName.substring(0, policyName.length() - policySuffix.length());
+  }
+
+  /**
+   * Takes a policy name (e.g. tenant1-VolumeAccess), attempts to (re)create
+   * the policy in Ranger by the policy name alone.
+   */
+  private void attemptToRecreateDefaultPolicy(String policyName)
+      throws IOException {
+    String policySuffix, tenantId;
+
+    policySuffix = OzoneConsts.DEFAULT_TENANT_BUCKET_NAMESPACE_POLICY_SUFFIX;
+    if (policyName.endsWith(policySuffix)) {
+      // Recreate default VolumeAccess policy
+      tenantId = getTenantIdFromPolicyName(policyName, policySuffix);
+      LOG.info("Recovering VolumeAccess policy for tenant: {}", tenantId);
+
+      final String volumeName =
+          multiTenantManager.getTenantVolumeName(tenantId);
+      final String userRoleName =
+          multiTenantManager.getTenantUserRoleName(tenantId);
+      final String adminRoleName =
+          multiTenantManager.getTenantAdminRoleName(tenantId);
+
+      final AccessPolicy tenantVolumeAccessPolicy =
+          multiTenantManager.newDefaultVolumeAccessPolicy(volumeName,
+              new OzoneTenantRolePrincipal(userRoleName),
+              new OzoneTenantRolePrincipal(adminRoleName));
+
+      String id = authorizer.createAccessPolicy(tenantVolumeAccessPolicy);
+      LOG.info("Created policy, ID: {}", id);
+
+      return;
+    }
+
+    policySuffix = OzoneConsts.DEFAULT_TENANT_BUCKET_POLICY_SUFFIX;
+    if (policyName.endsWith(policySuffix)) {
+      // Recreate default BucketAccess policy
+      tenantId = getTenantIdFromPolicyName(policyName, policySuffix);
+      LOG.info("Recovering BucketAccess policy for tenant: {}", tenantId);
+
+      final String volumeName =
+          multiTenantManager.getTenantVolumeName(tenantId);
+      final String userRoleName =
+          multiTenantManager.getTenantUserRoleName(tenantId);
+
+      final AccessPolicy tenantBucketCreatePolicy =
+          multiTenantManager.newDefaultBucketAccessPolicy(volumeName,
+              new OzoneTenantRolePrincipal(userRoleName));
+
+      String id = authorizer.createAccessPolicy(tenantBucketCreatePolicy);
+      LOG.info("Created policy, ID: {}", id);
+
+      return;
+    }
+
+    LOG.error("Unable to recover Ranger policy: {}", policyName);
+  }
+
+  /**
+   * Helper function to add user principal to a role in mtOMDBRoles.
+   */
+  private void addUserToMtOMDBRoles(String roleName, String userPrincipal) {
+    if (!mtOMDBRoles.containsKey(roleName)) {
+      mtOMDBRoles.put(roleName, new HashSet<>(
+          Collections.singletonList(userPrincipal)));
+    } else {
+      final HashSet<String> usersInTheRole = mtOMDBRoles.get(roleName);
+      usersInTheRole.add(userPrincipal);
+    }
+  }
+
+  private void loadAllRolesFromOM() throws IOException {
+    if (multiTenantManager instanceof OMMultiTenantManagerImpl) {
+      loadAllRolesFromCache();
+    } else {
+      LOG.warn("Cache is not supported for {}. Loading roles directly from DB",
+          multiTenantManager.getClass().getSimpleName());
+      loadAllRolesFromDB();
+    }
+  }
+
+  private void loadAllRolesFromCache() {
+
+    final OMMultiTenantManagerImpl impl =
+        (OMMultiTenantManagerImpl) multiTenantManager;
+    final Map<String, CachedTenantState> tenantCache = impl.getTenantCache();
+
+    impl.acquireTenantCacheReadLock();
+
+    try {
+      // tenantCache: tenantId -> CachedTenantState
+      for (Map.Entry<String, CachedTenantState> e1 : tenantCache.entrySet()) {
+        final CachedTenantState cachedTenantState = e1.getValue();
+
+        final String userRoleName = cachedTenantState.getTenantUserRoleName();
+        mtOMDBRoles.computeIfAbsent(userRoleName, any -> new HashSet<>());
+        final String adminRoleName = cachedTenantState.getTenantAdminRoleName();
+        mtOMDBRoles.computeIfAbsent(adminRoleName, any -> new HashSet<>());
+
+        final Map<String, CachedAccessIdInfo> accessIdInfoMap =
+            cachedTenantState.getAccessIdInfoMap();
+
+        // accessIdInfoMap: accessId -> CachedAccessIdInfo
+        for (Map.Entry<String, CachedAccessIdInfo> e2 :
+            accessIdInfoMap.entrySet()) {
+          final CachedAccessIdInfo cachedAccessIdInfo = e2.getValue();
+
+          final String userPrincipal = cachedAccessIdInfo.getUserPrincipal();
+          final boolean isAdmin = cachedAccessIdInfo.getIsAdmin();
+
+          addUserToMtOMDBRoles(userRoleName, userPrincipal);
+
+          if (isAdmin) {
+            addUserToMtOMDBRoles(adminRoleName, userPrincipal);
+          }
+        }
+      }
+    } finally {
+      impl.releaseTenantCacheReadLock();
+    }
+
+  }
+
+  private void loadAllRolesFromDB() throws IOException {
+    // We have the following in OM DB
+    //  tenantStateTable: tenantId -> TenantState (has user and admin role name)
+    //  tenantAccessIdTable : accessId -> OmDBAccessIdInfo
+
+    final Table<String, OmDBTenantState> tenantStateTable =
+        metadataManager.getTenantStateTable();
+
+    // Iterate all DB ExtendedUserAccessIdInfo. For each accessId,
+    // add to userRole. And add to adminRole if isAdmin is set.
+    TableIterator<String, ? extends Table.KeyValue<String, OmDBAccessIdInfo>>
+        tenantAccessIdTableIter =
+        metadataManager.getTenantAccessIdTable().iterator();
+    while (tenantAccessIdTableIter.hasNext()) {
+      final Table.KeyValue<String, OmDBAccessIdInfo> tableKeyValue =
+          tenantAccessIdTableIter.next();
+      final OmDBAccessIdInfo dbAccessIdInfo = tableKeyValue.getValue();
+
+      final String tenantId = dbAccessIdInfo.getTenantId();
+      final String userPrincipal = dbAccessIdInfo.getUserPrincipal();
+
+      final OmDBTenantState dbTenantState = tenantStateTable.get(tenantId);
+      final String userRoleName = dbTenantState.getUserRoleName();
+      mtOMDBRoles.computeIfAbsent(userRoleName, any -> new HashSet<>());
+      final String adminRoleName = dbTenantState.getAdminRoleName();
+      mtOMDBRoles.computeIfAbsent(adminRoleName, any -> new HashSet<>());
+
+      // Every tenant user should be in the tenant's userRole
+      addUserToMtOMDBRoles(userRoleName, userPrincipal);
+
+      // If the accessId has admin flag set, add to adminRole as well
+      if (dbAccessIdInfo.getIsAdmin()) {
+        addUserToMtOMDBRoles(adminRoleName, userPrincipal);
+      }
+    }
+  }
+
+  private void processAllRolesFromOMDB() throws IOException {
+    // First make sure that all the roles in OM DB are present in Ranger
+    // and that the corresponding user lists match.
+    for (Map.Entry<String, HashSet<String>> entry : mtOMDBRoles.entrySet()) {
+      final String roleName = entry.getKey();
+      boolean pushRoleToRanger = false;
+      if (mtRangerRoles.containsKey(roleName)) {
+        final HashSet<String> rangerUserList =
+            mtRangerRoles.get(roleName).getUserSet();
+        final HashSet<String> userSet = entry.getValue();
+        for (String userPrincipal : userSet) {
+          if (rangerUserList.contains(userPrincipal)) {
+            rangerUserList.remove(userPrincipal);
+          } else {
+            // We found a user in OM DB Role that doesn't exist in Ranger Role.
+            // Lets just push the role from OM DB to Ranger
+            pushRoleToRanger = true;
+            break;
+          }
+        }
+        // We have processed all the Userlist entries in the OMDB. If
+        // ranger Userlist is not empty, we are not in sync with ranger.
+        if (!rangerUserList.isEmpty()) {
+          pushRoleToRanger = true;
+        }
+      } else {
+        // 1. The role is missing from Ranger, or;
+        // 2. A policy (that uses this role) is missing from Ranger, causing
+        // mtRangerRoles to be populated incorrectly. In this case the roles
+        // are there just fine. If not, will be corrected in the next run anyway
+        checkLeader();
+        try {
+          authorizer.createRole(roleName, null);
+        } catch (IOException e) {
+          // Tolerate create role failure, possibly due to role already exists
+          LOG.error(e.getMessage());
+        }
+        pushRoleToRanger = true;
+      }
+      if (pushRoleToRanger) {
+        LOG.info("Updating role in Ranger: {}", roleName);
+        checkLeader();
+        pushOMDBRoleToRanger(roleName);
+      }
+      // We have processed this role in OM DB now. Lets remove it from
+      // mtRangerRoles. Eventually whatever is left in mtRangerRoles
+      // are extra entries, that should not have been in Ranger.
+      mtRangerRoles.remove(roleName);
+    }
+
+    // A hack (for now) to delete UserRole before AdminRole

Review Comment:
   Why do we need to delete user role before admin role?



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/multitenant/OMRangerBGSyncService.java:
##########
@@ -0,0 +1,761 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.om.multitenant;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.google.common.base.Preconditions;
+import com.google.protobuf.ServiceException;
+import org.apache.hadoop.hdds.utils.BackgroundService;
+import org.apache.hadoop.hdds.utils.BackgroundTask;
+import org.apache.hadoop.hdds.utils.BackgroundTaskQueue;
+import org.apache.hadoop.hdds.utils.BackgroundTaskResult;
+import org.apache.hadoop.hdds.utils.BackgroundTaskResult.EmptyTaskResult;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.TableIterator;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OMMultiTenantManager;
+import org.apache.hadoop.ozone.om.OMMultiTenantManagerImpl;
+import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes;
+import org.apache.hadoop.ozone.om.helpers.OMRatisHelper;
+import org.apache.hadoop.ozone.om.helpers.OmDBAccessIdInfo;
+import org.apache.hadoop.ozone.om.helpers.OmDBTenantState;
+import org.apache.hadoop.ozone.om.multitenant.CachedTenantState.CachedAccessIdInfo;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.RangerServiceVersionSyncRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
+import org.apache.ratis.protocol.ClientId;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftClientRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.gson.JsonArray;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+
+import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.VOLUME_LOCK;
+
+/**
+ * Background Sync thread that reads Multi-Tenancy state from OM DB
+ * and applies it to Ranger.
+ */
+public class OMRangerBGSyncService extends BackgroundService {
+
+  public static final Logger LOG =
+      LoggerFactory.getLogger(OMRangerBGSyncService.class);
+  private static final ClientId CLIENT_ID = ClientId.randomId();
+  private static final long ONE_HOUR_IN_MILLIS = 3600 * 1000;

Review Comment:
   ```suggestion
     private static final long STALE_POLICY_THRESHOLD = Duration.ofHours(1).toMillis();
   ```
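
   A minimal, standalone sketch of how the renamed constant could read at the
   comparison site. It mirrors the `lastRangerPolicyLoadTime >
   accessPolicy.getPolicyLastUpdateTime() + ONE_HOUR_IN_MILLIS` check in
   `processAllPoliciesFromOMDB()`. The class `StalePolicyCheck` and the method
   `isStale` are hypothetical names used only for illustration, and the change
   would also need `import java.time.Duration;` in the real file.

   ```java
   import java.time.Duration;

   public class StalePolicyCheck {

     // Same value as 3600 * 1000, but the unit is explicit in the expression.
     static final long STALE_POLICY_THRESHOLD = Duration.ofHours(1).toMillis();

     /**
      * Hypothetical helper: a policy counts as stale when the time at which
      * the policies were loaded is more than the threshold past the policy's
      * last update time. Both arguments are in milliseconds.
      */
     static boolean isStale(long policyLoadTimeMs, long policyLastUpdateMs) {
       return policyLoadTimeMs > policyLastUpdateMs + STALE_POLICY_THRESHOLD;
     }
   }
   ```

   Naming the constant after its purpose rather than its value keeps the
   comparison readable if the threshold is tuned later.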



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/multitenant/OMRangerBGSyncService.java:
##########
@@ -0,0 +1,761 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.om.multitenant;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.google.common.base.Preconditions;
+import com.google.protobuf.ServiceException;
+import org.apache.hadoop.hdds.utils.BackgroundService;
+import org.apache.hadoop.hdds.utils.BackgroundTask;
+import org.apache.hadoop.hdds.utils.BackgroundTaskQueue;
+import org.apache.hadoop.hdds.utils.BackgroundTaskResult;
+import org.apache.hadoop.hdds.utils.BackgroundTaskResult.EmptyTaskResult;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.TableIterator;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OMMultiTenantManager;
+import org.apache.hadoop.ozone.om.OMMultiTenantManagerImpl;
+import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes;
+import org.apache.hadoop.ozone.om.helpers.OMRatisHelper;
+import org.apache.hadoop.ozone.om.helpers.OmDBAccessIdInfo;
+import org.apache.hadoop.ozone.om.helpers.OmDBTenantState;
+import org.apache.hadoop.ozone.om.multitenant.CachedTenantState.CachedAccessIdInfo;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.RangerServiceVersionSyncRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
+import org.apache.ratis.protocol.ClientId;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftClientRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.gson.JsonArray;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+
+import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.VOLUME_LOCK;
+
+/**
+ * Background Sync thread that reads Multi-Tenancy state from OM DB
+ * and applies it to Ranger.
+ */
+public class OMRangerBGSyncService extends BackgroundService {
+
+  public static final Logger LOG =
+      LoggerFactory.getLogger(OMRangerBGSyncService.class);
+  private static final ClientId CLIENT_ID = ClientId.randomId();
+  private static final long ONE_HOUR_IN_MILLIS = 3600 * 1000;
+
+  private final OzoneManager ozoneManager;
+  private final OMMetadataManager metadataManager;
+  private final OMMultiTenantManager multiTenantManager;
+  private final MultiTenantAccessAuthorizer authorizer;
+
+  // Maximum number of attempts for each sync run
+  private static final int MAX_ATTEMPT = 2;
+  private final AtomicLong runCount = new AtomicLong(0);
+  private int rangerOzoneServiceId;
+
+  private boolean isServiceStarted = false;
+
+  static class BGRole {
+    private final String name;
+    private String id;
+    private final HashSet<String> userSet;
+
+    BGRole(String n) {
+      this.name = n;
+      userSet = new HashSet<>();
+    }
+
+    public void setId(String id) {
+      this.id = id;
+    }
+
+    public String getId() {
+      return id;
+    }
+
+    public void addUserPrincipal(String userPrincipal) {
+      userSet.add(userPrincipal);
+    }
+
+    public HashSet<String> getUserSet() {
+      return userSet;
+    }
+
+    @Override
+    public int hashCode() {
+      return name.hashCode();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      // TODO: Do we care about userSet
+      return this.hashCode() == o.hashCode();
+    }
+  }
+
+  // This map will be used to keep all the policies that are found in
+  // OM DB and should have been in Ranger. Currently, we are only printing such
+  // policyID. This can result if a tenant is deleted but the system
+  // crashed. Its an easy recovery to retry the "tenant delete" operation.
+  //
+  // Maps from policy name to policy ID in Ranger
+  private final HashMap<String, String> mtRangerPoliciesToBeCreated =
+      new HashMap<>();
+
+  // We will track all the policies in Ranger here. After we have
+  // processed all the policies from OM DB, this map will
+  // be left with policies that we need to delete.
+  //
+  // Maps from policy name to policy ID in Ranger
+  private final HashMap<String, String> mtRangerPoliciesToBeDeleted =
+      new HashMap<>();
+
+  // This map will keep all the Multi-Tenancy related roles from Ranger.
+  private final HashMap<String, BGRole> mtRangerRoles = new HashMap<>();
+
+  // Keep OM DB mapping of Roles -> list of user principals.
+  private final HashMap<String, HashSet<String>> mtOMDBRoles = new HashMap<>();
+
+  // Every BG ranger sync cycle we update this
+  private long lastRangerPolicyLoadTime;
+
+  public OMRangerBGSyncService(OzoneManager ozoneManager,
+      MultiTenantAccessAuthorizer authorizer, long interval,
+      TimeUnit unit, long serviceTimeout)
+      throws IOException {
+    super("OMRangerBGSyncService", interval, unit, 1, serviceTimeout);
+
+    this.ozoneManager = ozoneManager;
+    this.metadataManager = ozoneManager.getMetadataManager();
+    this.multiTenantManager = ozoneManager.getMultiTenantManager();
+
+    this.authorizer = authorizer;
+
+    if (authorizer != null) {
+      if (authorizer instanceof MultiTenantAccessAuthorizerRangerPlugin) {
+        MultiTenantAccessAuthorizerRangerPlugin rangerAuthorizer =
+            (MultiTenantAccessAuthorizerRangerPlugin) authorizer;
+        rangerOzoneServiceId = rangerAuthorizer.getRangerOzoneServiceId();
+      } else if (
+          !(authorizer instanceof MultiTenantAccessAuthorizerDummyPlugin)) {
+        throw new OMException("Unsupported MultiTenantAccessAuthorizer: " +
+            authorizer.getClass().getSimpleName(),
+            ResultCodes.INTERNAL_ERROR);
+      }
+    } else {
+      // authorizer can be null for unit tests
+      LOG.warn("MultiTenantAccessAuthorizer is not set");
+    }
+  }
+
+  @Override
+  public BackgroundTaskQueue getTasks() {
+    BackgroundTaskQueue queue = new BackgroundTaskQueue();
+    queue.add(new RangerBGSyncTask());
+    return queue;
+  }
+
+  public void start() {
+    if (authorizer == null) {
+      LOG.error("Failed to start the background sync service: "
+          + "null authorizer. Please check OM configuration. Aborting");
+      return;
+    }
+    isServiceStarted = true;
+    super.start();
+  }
+
+  public void shutdown() {
+    isServiceStarted = false;
+    super.shutdown();
+  }
+
+  /**
+   * Returns true if the service run conditions are satisfied, false otherwise.
+   */
+  private boolean shouldRun() {
+    if (ozoneManager == null) {
+      // OzoneManager can be null for testing
+      return true;
+    }
+    // The service only runs if current OM node is leader and is ready
+    //  and the service is marked as started
+    return isServiceStarted && ozoneManager.isLeaderReady();
+  }
+
+  private class RangerBGSyncTask implements BackgroundTask {
+
+    @Override
+    public int getPriority() {
+      return 0;
+    }
+
+    @Override
+    public BackgroundTaskResult call() {
+      // Check OM leader and readiness
+      if (shouldRun()) {
+        runCount.incrementAndGet();
+        triggerRangerSyncOnce();
+      }
+
+      return EmptyTaskResult.newResult();
+    }
+  }
+
+  private void triggerRangerSyncOnce() {
+    int attempt = 0;
+    try {
+      // TODO: Acquire lock
+      long currentOzoneServiceVerInDB = getOMDBRangerServiceVersion();
+      long proposedOzoneServiceVerInDB = retrieveRangerServiceVersion();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Database version: {}, Ranger version: {}",
+            currentOzoneServiceVerInDB, proposedOzoneServiceVerInDB);
+      }
+      while (currentOzoneServiceVerInDB != proposedOzoneServiceVerInDB) {
+        // TODO: Release lock
+        if (++attempt > MAX_ATTEMPT) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Reached maximum number of attempts ({}). Abort",
+                MAX_ATTEMPT);
+          }
+          break;
+        }
+
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Ranger Ozone service version ({}) differs from DB's ({}). "
+                  + "Starting to sync.",
+              proposedOzoneServiceVerInDB, currentOzoneServiceVerInDB);
+        }
+
+        LOG.info("Executing Multi-Tenancy Ranger Sync: run #{}, attempt #{}",
+            runCount.get(), attempt);
+
+        executeOMDBToRangerSync(currentOzoneServiceVerInDB);
+
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Setting OM DB Ranger Service Version to {} (was {})",
+              proposedOzoneServiceVerInDB, currentOzoneServiceVerInDB);
+        }
+        // Submit Ratis Request to sync the new service version in OM DB
+        setOMDBRangerServiceVersion(proposedOzoneServiceVerInDB);
+
+        // TODO: Acquire lock
+
+        // Check Ranger ozone service version again
+        currentOzoneServiceVerInDB = proposedOzoneServiceVerInDB;
+        proposedOzoneServiceVerInDB = retrieveRangerServiceVersion();
+      }
+    } catch (IOException e) {
+      LOG.warn("Exception during Ranger Sync", e);
+      // TODO: Check specific exception once switched to
+      //  RangerRestMultiTenantAccessController
+//    } finally {
+//      // TODO: Release lock
+    }
+
+  }
+
+  long retrieveRangerServiceVersion() throws IOException {
+    return authorizer.getCurrentOzoneServiceVersion(rangerOzoneServiceId);
+  }
+
+  private RaftClientRequest newRaftClientRequest(OMRequest omRequest) {
+    return RaftClientRequest.newBuilder()
+        .setClientId(CLIENT_ID)
+        .setServerId(ozoneManager.getOmRatisServer().getRaftPeerId())
+        .setGroupId(ozoneManager.getOmRatisServer().getRaftGroupId())
+        .setCallId(runCount.get())
+        .setMessage(
+            Message.valueOf(
+                OMRatisHelper.convertRequestToByteString(omRequest)))
+        .setType(RaftClientRequest.writeRequestType())
+        .build();
+  }
+
+  void setOMDBRangerServiceVersion(long version) {
+    // OM DB update goes through Ratis
+    RangerServiceVersionSyncRequest.Builder versionSyncRequest =
+        RangerServiceVersionSyncRequest.newBuilder()
+            .setRangerServiceVersion(version);
+
+    OMRequest omRequest = OMRequest.newBuilder()
+        .setCmdType(Type.RangerServiceVersionSync)
+        .setRangerServiceVersionSyncRequest(versionSyncRequest)
+        .setClientId(CLIENT_ID.toString())
+        .build();
+
+    // Submit PurgeKeys request to OM
+    try {
+      RaftClientRequest raftClientRequest = newRaftClientRequest(omRequest);
+      ozoneManager.getOmRatisServer().submitRequest(omRequest,
+          raftClientRequest);
+    } catch (ServiceException e) {
+      LOG.error("RangerServiceVersionSync request failed. "
+          + "Will retry at next run.");
+    }
+  }
+
+  long getOMDBRangerServiceVersion() throws IOException {
+    final Long dbValue = ozoneManager.getMetadataManager()
+        .getOmRangerStateTable()
+        .get(OmMetadataManagerImpl.RANGER_OZONE_SERVICE_VERSION_KEY);
+    if (dbValue == null) {
+      return -1;
+    } else {
+      return dbValue;
+    }
+  }
+
+  private void executeOMDBToRangerSync(long baseVersion) throws IOException {
+    clearPolicyAndRoleMaps();
+
+    // TODO: Acquire global lock
+    loadAllPoliciesRolesFromRanger(baseVersion);
+    loadAllRolesFromRanger();
+    loadAllRolesFromOM();
+    // TODO: Release global lock
+
+    // This should isolate policies into two groups
+    // 1. mtRangerPoliciesTobeDeleted and
+    // 2. mtRangerPoliciesTobeCreated
+    processAllPoliciesFromOMDB();
+
+    // This should isolate roles that need fixing into a list of
+    // roles that need to be replayed back into ranger to get in sync with OMDB.
+    processAllRolesFromOMDB();
+  }
+
+  private void clearPolicyAndRoleMaps() {
+    mtRangerPoliciesToBeCreated.clear();
+    mtRangerPoliciesToBeDeleted.clear();
+    mtRangerRoles.clear();
+    mtOMDBRoles.clear();
+  }
+
+  /**
+   * TODO: Test and make sure invalid JSON response from Ranger won't crash OM.
+   */
+  private void loadAllPoliciesRolesFromRanger(long baseVersion)
+      throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("baseVersion is {}", baseVersion);
+    }
+    // TODO: incremental policies API is broken. We are getting all the
+    //  Multi-Tenant policies using Ranger labels.
+    String allPolicies = authorizer.getAllMultiTenantPolicies(
+        rangerOzoneServiceId);
+    JsonObject jObject = new JsonParser().parse(allPolicies).getAsJsonObject();
+    lastRangerPolicyLoadTime = jObject.get("queryTimeMS").getAsLong();
+    JsonArray policyArray = jObject.getAsJsonArray("policies");
+    for (int i = 0; i < policyArray.size(); ++i) {
+      JsonObject newPolicy = policyArray.get(i).getAsJsonObject();
+      if (!newPolicy.getAsJsonArray("policyLabels").get(0)
+          .getAsString().equals("OzoneMultiTenant")) {
+        // Shouldn't get policies without the tag here very often as it is
+        // specified in the query param, unless a user removed the tag during
+        // the sync
+        LOG.warn("Received a Ranger policy without OzoneMultiTenant tag: {}",
+            newPolicy.get("name").getAsString());
+        continue;
+      }
+      // Temporarily put the policy in the to-delete list,
+      // valid entries will be removed later
+      mtRangerPoliciesToBeDeleted.put(
+          newPolicy.get("name").getAsString(),
+          newPolicy.get("id").getAsString());
+
+      final JsonArray policyItems = newPolicy.getAsJsonArray("policyItems");
+      for (int j = 0; j < policyItems.size(); ++j) {
+        JsonObject policy = policyItems.get(j).getAsJsonObject();
+        JsonArray roles = policy.getAsJsonArray("roles");
+        for (int k = 0; k < roles.size(); ++k) {
+          if (!mtRangerRoles.containsKey(roles.get(k).getAsString())) {
+            // We only get the role name here. We need to query and populate it.
+            mtRangerRoles.put(roles.get(k).getAsString(),
+                new BGRole(roles.get(k).getAsString()));
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Helper function to throw benign exception if the current OM is no longer
+   * the leader in case a leader transition happened during the sync. So the
+   * sync run can abort earlier.
+   *
+   * Note: EACH Ranger request can take 3-7 seconds as tested in UT.
+   */
+  private void checkLeader() throws IOException {
+    if (!ozoneManager.isLeaderReady()) {
+      throw new OMException("Not leader. Abort", ResultCodes.INTERNAL_ERROR);
+    }
+  }
+
+  private void loadAllRolesFromRanger() throws IOException {
+    for (Map.Entry<String, BGRole> entry: mtRangerRoles.entrySet()) {
+      final String roleName = entry.getKey();
+      checkLeader();
+      final String roleDataString = authorizer.getRole(roleName);
+      final JsonObject roleObject =
+          new JsonParser().parse(roleDataString).getAsJsonObject();
+      final BGRole role = entry.getValue();
+      role.setId(roleObject.get("id").getAsString());
+      final JsonArray userArray = roleObject.getAsJsonArray("users");
+      for (int i = 0; i < userArray.size(); ++i) {
+        role.addUserPrincipal(userArray.get(i).getAsJsonObject().get("name")
+            .getAsString());
+      }
+    }
+  }
+
+  /**
+   * Helper function to add/remove a policy name to/from mtRangerPolicies lists.
+   */
+  private void mtRangerPoliciesOpHelper(String policyName) {
+    if (mtRangerPoliciesToBeDeleted.containsKey(policyName)) {
+      // This entry is in sync with ranger, remove it from the set
+      // Eventually mtRangerPolicies will only contain entries that
+      // are not in OMDB and should be removed from Ranger.
+      mtRangerPoliciesToBeDeleted.remove(policyName);
+    } else {
+      // We could not find a policy in ranger that should have been there.
+      mtRangerPoliciesToBeCreated.put(policyName, null);
+    }
+  }
+
+  private void processAllPoliciesFromOMDB() throws IOException {
+
+    // Iterate all DB tenant states. For each tenant,
+    // queue or dequeue bucketNamespacePolicyName and bucketPolicyName
+    TableIterator<String, ? extends Table.KeyValue<String, OmDBTenantState>>
+        tenantStateTableIter = metadataManager.getTenantStateTable().iterator();
+    while (tenantStateTableIter.hasNext()) {
+      final Table.KeyValue<String, OmDBTenantState> tableKeyValue =
+          tenantStateTableIter.next();
+      final OmDBTenantState dbTenantState = tableKeyValue.getValue();
+      final String volumeName = dbTenantState.getBucketNamespaceName();
+      Preconditions.checkNotNull(volumeName);
+
+      boolean acquiredVolumeLock = false;
+      try {
+        acquiredVolumeLock = metadataManager.getLock().acquireWriteLock(
+            VOLUME_LOCK, volumeName);
+
+        mtRangerPoliciesOpHelper(dbTenantState.getBucketNamespacePolicyName());
+        mtRangerPoliciesOpHelper(dbTenantState.getBucketPolicyName());
+      } finally {
+        if (acquiredVolumeLock) {
+          metadataManager.getLock().releaseWriteLock(VOLUME_LOCK, volumeName);
+        }
+      }
+    }
+
+    for (Map.Entry<String, String> entry :
+        mtRangerPoliciesToBeCreated.entrySet()) {
+      final String policyName = entry.getKey();
+      LOG.warn("Expected policy not found in Ranger: {}", policyName);
+      // Attempt to recreate the default volume/bucket policy if it's missing
+      attemptToRecreateDefaultPolicy(policyName);
+    }
+
+    for (Map.Entry<String, String> entry :
+        mtRangerPoliciesToBeDeleted.entrySet()) {
+      final String policyName = entry.getKey();
+      checkLeader();
+      // TODO: Use AccessController instead of AccessPolicy
+//      MultiTenantAccessController accessController =
+//          authorizer.getMultiTenantAccessController();
+      AccessPolicy accessPolicy = authorizer.getAccessPolicyByName(policyName);
+
+      if (lastRangerPolicyLoadTime >
+          (accessPolicy.getPolicyLastUpdateTime() + ONE_HOUR_IN_MILLIS)) {
+        LOG.warn("Deleting policy from Ranger: {}", policyName);
+        authorizer.deletePolicybyName(policyName);
+      }
+    }
+
+  }
+
+  private String getTenantIdFromPolicyName(
+      String policyName, String policySuffix) {
+    return policyName.substring(0, policyName.length() - policySuffix.length());
+  }
+
+  /**
+   * Takes a policy name (e.g. tenant1-VolumeAccess), attempts to (re)create
+   * the policy in Ranger by the policy name alone.
+   */
+  private void attemptToRecreateDefaultPolicy(String policyName)
+      throws IOException {
+    String policySuffix, tenantId;
+
+    policySuffix = OzoneConsts.DEFAULT_TENANT_BUCKET_NAMESPACE_POLICY_SUFFIX;
+    if (policyName.endsWith(policySuffix)) {
+      // Recreate default VolumeAccess policy
+      tenantId = getTenantIdFromPolicyName(policyName, policySuffix);
+      LOG.info("Recovering VolumeAccess policy for tenant: {}", tenantId);
+
+      final String volumeName =
+          multiTenantManager.getTenantVolumeName(tenantId);
+      final String userRoleName =
+          multiTenantManager.getTenantUserRoleName(tenantId);
+      final String adminRoleName =
+          multiTenantManager.getTenantAdminRoleName(tenantId);
+
+      final AccessPolicy tenantVolumeAccessPolicy =
+          multiTenantManager.newDefaultVolumeAccessPolicy(volumeName,
+              new OzoneTenantRolePrincipal(userRoleName),
+              new OzoneTenantRolePrincipal(adminRoleName));
+
+      String id = authorizer.createAccessPolicy(tenantVolumeAccessPolicy);
+      LOG.info("Created policy, ID: {}", id);
+
+      return;
+    }
+
+    policySuffix = OzoneConsts.DEFAULT_TENANT_BUCKET_POLICY_SUFFIX;
+    if (policyName.endsWith(policySuffix)) {
+      // Recreate default BucketAccess policy
+      tenantId = getTenantIdFromPolicyName(policyName, policySuffix);
+      LOG.info("Recovering BucketAccess policy for tenant: {}", tenantId);
+
+      final String volumeName =
+          multiTenantManager.getTenantVolumeName(tenantId);
+      final String userRoleName =
+          multiTenantManager.getTenantUserRoleName(tenantId);
+
+      final AccessPolicy tenantBucketCreatePolicy =
+          multiTenantManager.newDefaultBucketAccessPolicy(volumeName,
+              new OzoneTenantRolePrincipal(userRoleName));
+
+      String id = authorizer.createAccessPolicy(tenantBucketCreatePolicy);
+      LOG.info("Created policy, ID: {}", id);
+
+      return;
+    }
+
+    LOG.error("Unable to recover Ranger policy: {}", policyName);
+  }
+
+  /**
+   * Helper function to add user principal to a role in mtOMDBRoles.
+   */
+  private void addUserToMtOMDBRoles(String roleName, String userPrincipal) {
+    if (!mtOMDBRoles.containsKey(roleName)) {
+      mtOMDBRoles.put(roleName, new HashSet<>(
+          Collections.singletonList(userPrincipal)));
+    } else {
+      final HashSet<String> usersInTheRole = mtOMDBRoles.get(roleName);
+      usersInTheRole.add(userPrincipal);
+    }
+  }
+
+  private void loadAllRolesFromOM() throws IOException {
+    if (multiTenantManager instanceof OMMultiTenantManagerImpl) {
+      loadAllRolesFromCache();
+    } else {
+      LOG.warn("Cache is not supported for {}. Loading roles directly from DB",
+          multiTenantManager.getClass().getSimpleName());
+      loadAllRolesFromDB();
+    }
+  }
+
+  private void loadAllRolesFromCache() {
+
+    final OMMultiTenantManagerImpl impl =
+        (OMMultiTenantManagerImpl) multiTenantManager;
+    final Map<String, CachedTenantState> tenantCache = impl.getTenantCache();
+
+    impl.acquireTenantCacheReadLock();
+
+    try {
+      // tenantCache: tenantId -> CachedTenantState
+      for (Map.Entry<String, CachedTenantState> e1 : tenantCache.entrySet()) {
+        final CachedTenantState cachedTenantState = e1.getValue();
+
+        final String userRoleName = cachedTenantState.getTenantUserRoleName();
+        mtOMDBRoles.computeIfAbsent(userRoleName, any -> new HashSet<>());
+        final String adminRoleName = cachedTenantState.getTenantAdminRoleName();
+        mtOMDBRoles.computeIfAbsent(adminRoleName, any -> new HashSet<>());
+
+        final Map<String, CachedAccessIdInfo> accessIdInfoMap =
+            cachedTenantState.getAccessIdInfoMap();
+
+        // accessIdInfoMap: accessId -> CachedAccessIdInfo
+        for (Map.Entry<String, CachedAccessIdInfo> e2 :
+            accessIdInfoMap.entrySet()) {
+          final CachedAccessIdInfo cachedAccessIdInfo = e2.getValue();
+
+          final String userPrincipal = cachedAccessIdInfo.getUserPrincipal();
+          final boolean isAdmin = cachedAccessIdInfo.getIsAdmin();
+
+          addUserToMtOMDBRoles(userRoleName, userPrincipal);
+
+          if (isAdmin) {
+            addUserToMtOMDBRoles(adminRoleName, userPrincipal);
+          }
+        }
+      }
+    } finally {
+      impl.releaseTenantCacheReadLock();
+    }
+
+  }
+
+  private void loadAllRolesFromDB() throws IOException {
+    // We have the following in OM DB
+    //  tenantStateTable: tenantId -> TenantState (has user and admin role name)
+    //  tenantAccessIdTable : accessId -> OmDBAccessIdInfo
+
+    final Table<String, OmDBTenantState> tenantStateTable =
+        metadataManager.getTenantStateTable();
+
+    // Iterate all DB ExtendedUserAccessIdInfo. For each accessId,
+    // add to userRole. And add to adminRole if isAdmin is set.
+    TableIterator<String, ? extends Table.KeyValue<String, OmDBAccessIdInfo>>
+        tenantAccessIdTableIter =
+        metadataManager.getTenantAccessIdTable().iterator();
+    while (tenantAccessIdTableIter.hasNext()) {
+      final Table.KeyValue<String, OmDBAccessIdInfo> tableKeyValue =
+          tenantAccessIdTableIter.next();
+      final OmDBAccessIdInfo dbAccessIdInfo = tableKeyValue.getValue();
+
+      final String tenantId = dbAccessIdInfo.getTenantId();
+      final String userPrincipal = dbAccessIdInfo.getUserPrincipal();
+
+      final OmDBTenantState dbTenantState = tenantStateTable.get(tenantId);
+      final String userRoleName = dbTenantState.getUserRoleName();
+      mtOMDBRoles.computeIfAbsent(userRoleName, any -> new HashSet<>());
+      final String adminRoleName = dbTenantState.getAdminRoleName();
+      mtOMDBRoles.computeIfAbsent(adminRoleName, any -> new HashSet<>());
+
+      // Every tenant user should be in the tenant's userRole
+      addUserToMtOMDBRoles(userRoleName, userPrincipal);
+
+      // If the accessId has admin flag set, add to adminRole as well
+      if (dbAccessIdInfo.getIsAdmin()) {
+        addUserToMtOMDBRoles(adminRoleName, userPrincipal);
+      }
+    }
+  }
+
+  private void processAllRolesFromOMDB() throws IOException {
+    // Lets First make sure that all the Roles in OM DB are present in Ranger
+    // as well as the corresponding userlist matches matches.
+    for (Map.Entry<String, HashSet<String>> entry : mtOMDBRoles.entrySet()) {
+      final String roleName = entry.getKey();
+      boolean pushRoleToRanger = false;
+      if (mtRangerRoles.containsKey(roleName)) {
+        final HashSet<String> rangerUserList =
+            mtRangerRoles.get(roleName).getUserSet();
+        final HashSet<String> userSet = entry.getValue();
+        for (String userPrincipal : userSet) {
+          if (rangerUserList.contains(userPrincipal)) {
+            rangerUserList.remove(userPrincipal);
+          } else {
+            // We found a user in OM DB Role that doesn't exist in Ranger Role.
+            // Lets just push the role from OM DB to Ranger
+            pushRoleToRanger = true;
+            break;
+          }
+        }
+        // We have processed all the Userlist entries in the OMDB. If
+        // ranger Userlist is not empty, we are not in sync with ranger.
+        if (!rangerUserList.isEmpty()) {
+          pushRoleToRanger = true;
+        }
+      } else {
+        // 1. The role is missing from Ranger, or;
+        // 2. A policy (that uses this role) is missing from Ranger, causing
+        // mtRangerRoles to be populated incorrectly. In this case the roles
+        // are there just fine. If not, will be corrected in the next run anyway
+        checkLeader();
+        try {
+          authorizer.createRole(roleName, null);
+        } catch (IOException e) {
+          // Tolerate create role failure, possibly due to role already exists
+          LOG.error(e.getMessage());

Review Comment:
   We need to check the error code here. We should not tolerate network errors; otherwise we will update our Ranger sync version incorrectly. We must tolerate a create failure only when it is due to a 400 return code (role already exists).
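
   A rough, standalone sketch of the handling described above. Everything in it
   is an assumption made for illustration: `RangerHttpException`, `RoleCreator`,
   and `createRoleIfAbsent` are hypothetical names, and the diff does not show
   whether the authorizer currently exposes the HTTP status code at all.

   ```java
   import java.io.IOException;

   public class CreateRoleErrorHandling {

     static final int HTTP_BAD_REQUEST = 400;

     /** Hypothetical exception carrying the HTTP status returned by Ranger. */
     static class RangerHttpException extends IOException {
       private final int statusCode;

       RangerHttpException(int statusCode, String message) {
         super(message);
         this.statusCode = statusCode;
       }

       int getStatusCode() {
         return statusCode;
       }
     }

     /** Hypothetical stand-in for the authorizer's createRole call. */
     interface RoleCreator {
       void createRole(String roleName) throws IOException;
     }

     /**
      * Returns true if the role was created and false if Ranger answered 400,
      * which this sketch treats as "role already exists". Any other failure,
      * including a plain IOException from the network, propagates so that the
      * sync run aborts before the service version is recorded in OM DB.
      */
     static boolean createRoleIfAbsent(RoleCreator creator, String roleName)
         throws IOException {
       try {
         creator.createRole(roleName);
         return true;
       } catch (RangerHttpException e) {
         if (e.getStatusCode() == HTTP_BAD_REQUEST) {
           return false;
         }
         throw e;
       }
     }
   }
   ```

   Because only the 400 case is swallowed, a timeout or connection failure
   reaches the caller's existing `catch (IOException e)` and the version update
   is skipped until the next run.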



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

