Posted to issues@ozone.apache.org by GitBox <gi...@apache.org> on 2022/02/23 19:22:09 UTC

[GitHub] [ozone] prashantpogde opened a new pull request #3131: HDDS-6371. provide OMDB to Apache Ranger Sync mechanism.

prashantpogde opened a new pull request #3131:
URL: https://github.com/apache/ozone/pull/3131


   ## What changes were proposed in this pull request?
   
   This PR adds a background sync from the Ozone OM DB to Apache Ranger for the multi-tenancy feature.
   
   ## What is the link to the Apache JIRA
   
   https://issues.apache.org/jira/browse/HDDS-6371
   
   ## How was this patch tested?
   
   Unit tests and integration tests were added to verify that it works as intended.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org
For additional commands, e-mail: issues-help@ozone.apache.org


[GitHub] [ozone] errose28 commented on a change in pull request #3131: HDDS-6371. [Multi-Tenant] Provide OM DB to Apache Ranger Sync mechanism

Posted by GitBox <gi...@apache.org>.
errose28 commented on a change in pull request #3131:
URL: https://github.com/apache/ozone/pull/3131#discussion_r822189574



##########
File path: hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/multitenant/OMRangerBGSync.java
##########
@@ -0,0 +1,475 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.om.multitenant;
+
+import static java.lang.Thread.sleep;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RANGER_SYNC_INTERVAL;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RANGER_SYNC_INTERVAL_DEFAULT;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.TableIterator;
+import org.apache.hadoop.ozone.client.OzoneClient;
+import org.apache.hadoop.ozone.client.OzoneClientFactory;
+import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.helpers.OmDBAccessIdInfo;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.ConcurrentHashMultiset;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.google.gson.JsonArray;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+
+/**
+ * Background Sync thread that reads Multitenancy state from OMDB
+ * and applies it to Ranger.
+ */
+public class OMRangerBGSync implements Runnable, Closeable {
+
+  private OzoneManager ozoneManager;
+  private OzoneClient ozoneClient;
+
+  private static final Logger LOG = LoggerFactory
+      .getLogger(OMRangerBGSync.class);
+  private static int WAIT_MILI = 1000;
+  private static int MAX_ATTEMPT = 2;
+
+  /**
+   * ExecutorService used for scheduling sync operation.
+   */
+  private final ScheduledExecutorService executorService;
+  private ScheduledFuture<?> rangerSyncFuture;
+  private final int rangerSyncInterval;
+
+  long rangerBGSyncCounter = 0;
+  long currentOzoneServiceVersionInOMDB;
+  long proposedOzoneServiceVersionInOMDB;
+  public static int ozoneServiceId;
+
+  MultiTenantAccessAuthorizerRangerPlugin authorizer;
+
+  class BGRole{
+    String name;
+    String id;
+    HashSet<String> users;
+
+    BGRole(String n) {
+      this.name = n;
+      users = new HashSet<>();
+    }
+
+    void addId(String i) {
+      this.id =i;
+    }
+
+    void addUsers(String u) {
+      users.add(u);
+    }
+
+    HashSet<String> getUsers() {
+      return users;
+    }
+
+    @Override
+    public int hashCode() {
+      return name.hashCode();
+    }
+  }
+
+  // we will track all the policies in Ranger here. After we have
+  // processed all the policies from OMDB, this map will
+  // be left with policies that we need to delete.
+  // Its a map of Policy ID to policy names
+  ConcurrentHashMap<String, String> mtRangerPoliciesTobeDeleted =
+      new ConcurrentHashMap<>();
+
+  // This map will be used to keep all the policies that are found in
+  // OMDB and should have been in Ranger. Currently, we are only printing such
+  // policyID. This can result if a tenant is deleted but the system
+  // crashed. Its an easy recovery to retry the "tenant delete" operation.
+  // Its a map of policy ID to policy names
+  ConcurrentHashMap<String, String> mtRangerPoliciesTobeCreated =
+      new ConcurrentHashMap<>();
+
+  // This map will keep all the Multitenancy-related roles from Ranger.
+  ConcurrentHashMap<String, BGRole> mtRangerRoles = new ConcurrentHashMap<>();
+
+  // keep OMDB mapping of Roles -> list of user principals.
+  ConcurrentHashMap<String, HashSet<String>> mtOMDBRoles =
+      new ConcurrentHashMap<>();
+
+  // Every BG ranger sync cycle we update this
+  long lastRangerPolicyLoadTime;
+
+  public OMRangerBGSync(OzoneManager om) throws Exception {
+    try {
+      ozoneManager = om;
+      authorizer = new MultiTenantAccessAuthorizerRangerPlugin();
+      authorizer.init(om.getConfiguration());
+      ozoneClient =
+          OzoneClientFactory.getRpcClient(ozoneManager.getConfiguration());
+      executorService = HadoopExecutors.newScheduledThreadPool(1,
+          new ThreadFactoryBuilder().setDaemon(true)
+              .setNameFormat("OM Ranger Sync Thread - %d").build());
+      rangerSyncInterval =
+          ozoneManager.getConfiguration().getInt(OZONE_OM_RANGER_SYNC_INTERVAL,
+              OZONE_OM_RANGER_SYNC_INTERVAL_DEFAULT);
+      scheduleNextRangerSync();
+      ozoneServiceId = authorizer.getOzoneServiceId();
+    } catch (Exception e) {
+      LOG.warn("Failed to Initialize Ranger Background Sync");
+      throw e;
+    }
+  }
+
+  public int getOzoneServiceId() throws Exception {
+    return ozoneServiceId;
+  }
+
+  public int getRangerSyncInterval() throws Exception {
+    return rangerSyncInterval;
+  }
+
+  @Override
+  public void run() {
+    try {
+      Thread.sleep(10);
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    }
+    try {
+      if (ozoneManager.isLeaderReady()) {
+        executeOneRangerSyncCycle();
+      }
+    } catch (Exception e) {
+      LOG.warn("Exception during OM Ranger Background Sync." + e.getMessage());
+    } finally {
+      // Let's schedule the next cycle now. We deliberately do not schedule at
+      // a fixed interval, to account for Ranger sync processing time.
+      scheduleNextRangerSync();
+    }
+  }
+
+  private void scheduleNextRangerSync() {
+
+    if (!Thread.currentThread().isInterrupted() &&
+        !executorService.isShutdown()) {
+      rangerSyncFuture = executorService.schedule(this,
+          rangerSyncInterval, TimeUnit.SECONDS);
+    } else {
+      LOG.warn("Current Thread is interrupted, shutting down Ranger Sync " +
+          "processing thread for Ozone MultiTenant Manager.");
+    }
+  }
+
+  @Override
+  public void close() {
+    executorService.shutdown();
+    try {
+      if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) {
+        executorService.shutdownNow();
+      }
+
+      if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) {
+        LOG.error("Unable to shutdown OM Ranger Background Sync properly.");
+      }
+    } catch (InterruptedException e) {
+      executorService.shutdownNow();
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  private void executeOneRangerSyncCycle() {
+    int attempt = 0;
+    try {
+      // Taking the lock only makes sure that while we are reading
+      // the current Ozone service version, another multitenancy
+      // request is not changing it. We can drop the lock after that.
+      while (!ozoneManager.getMultiTenantManager()
+          .tryAcquireInProgressMtOp(WAIT_MILI)) {
+        sleep(10);
+      }
+      currentOzoneServiceVersionInOMDB = getOmdbRangerServiceVersion();
+      proposedOzoneServiceVersionInOMDB = getRangerServiceVersion();
+      while (currentOzoneServiceVersionInOMDB !=
+          proposedOzoneServiceVersionInOMDB) {
+        if (++attempt > MAX_ATTEMPT) {
+          break;
+        }
+        ozoneManager.getMultiTenantManager().resetInProgressMtOpState();
+        if (!ozoneManager.isLeaderReady()) {
+          return;
+        }
+        LOG.warn("Executing Ranger Sync Cycle.");
+
+        executeOmdbToRangerSync(currentOzoneServiceVersionInOMDB);
+
+        if (currentOzoneServiceVersionInOMDB !=
+            proposedOzoneServiceVersionInOMDB) {
+          // Submit Ratis Request to sync the new ozone service version in OMDB
+          setOmdbRangerServiceVersion(proposedOzoneServiceVersionInOMDB);
+        }
+        while (!ozoneManager.getMultiTenantManager()
+            .tryAcquireInProgressMtOp(WAIT_MILI)) {
+          sleep(10);
+        }
+        currentOzoneServiceVersionInOMDB = proposedOzoneServiceVersionInOMDB;
+        proposedOzoneServiceVersionInOMDB = getRangerServiceVersion();
+      }
+    } catch (Exception e) {
+      LOG.warn("Exception during a Ranger Sync Cycle. " + e.getMessage() );

Review comment:
       Also, if/when we switch to using the `RangerRestMultiTenantAccessController`, there should only be a small set of checked exceptions that we need to catch here.
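
A minimal sketch of what narrower handling could look like, in place of a blanket `catch (Exception e)`. The names `NarrowCatchSketch`, `SyncStep`, and `runCycle` are hypothetical and introduced only for illustration. The exception types are also assumed, since the checked exceptions actually declared by `RangerRestMultiTenantAccessController` are not shown in this thread.

```java
import java.io.IOException;

/** Illustration only: hypothetical names, not the Ozone or Ranger API. */
final class NarrowCatchSketch {

  /** Hypothetical stand-in for one sync step that declares its checked exceptions. */
  interface SyncStep {
    void run() throws IOException, InterruptedException;
  }

  /** Runs one step and reports the outcome, catching only what the step declares. */
  static String runCycle(SyncStep step) {
    try {
      step.run();
      return "ok";
    } catch (IOException e) {
      // Expected failure mode (for example, a remote call failed): report it.
      return "io-failure: " + e.getMessage();
    } catch (InterruptedException e) {
      // Restore the interrupt flag so the caller can observe the shutdown request.
      Thread.currentThread().interrupt();
      return "interrupted";
    }
    // Unchecked exceptions are deliberately not caught here, so programming
    // errors surface instead of being logged as a generic warning.
  }
}
```

With this shape, an `InterruptedException` raised while waiting keeps the interrupt flag set, rather than being swallowed along with everything else.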






[GitHub] [ozone] errose28 commented on a change in pull request #3131: HDDS-6371. [Multi-Tenant] Provide OM DB to Apache Ranger Sync mechanism

Posted by GitBox <gi...@apache.org>.
errose28 commented on a change in pull request #3131:
URL: https://github.com/apache/ozone/pull/3131#discussion_r821201194



##########
File path: hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/ObjectStore.java
##########
@@ -210,6 +210,15 @@ public void revokeS3Secret(String kerberosID) throws IOException {
     proxy.revokeS3Secret(kerberosID);
   }
 
+  /**
+   * Sync Ozone Service version in Ranger to OMDB.
+   * @param version
+   * @throws IOException
+   */
+  public void rangerServiceVersionSync(long version) throws IOException {

Review comment:
       Why should this be exposed to the client? This seems internal to OM like the purge keys request.

##########
File path: hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/tenant/OMRangerServiceVersionSyncRequest.java
##########
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.hadoop.ozone.om.request.s3.tenant;
+
+import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.RESOURCE_BUSY;
+import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.TENANT_ALREADY_EXISTS;
+import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.VOLUME_ALREADY_EXISTS;
+import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.USER_LOCK;
+import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.VOLUME_LOCK;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
+import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
+import org.apache.hadoop.ozone.OmUtils;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.audit.OMAction;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.om.helpers.OmDBTenantInfo;
+import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
+import org.apache.hadoop.ozone.om.multitenant.AccessPolicy;
+import org.apache.hadoop.ozone.om.multitenant.Tenant;
+import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerDoubleBufferHelper;
+import org.apache.hadoop.ozone.om.request.OMClientRequest;
+import org.apache.hadoop.ozone.om.request.util.OmResponseUtil;
+import org.apache.hadoop.ozone.om.request.volume.OMVolumeRequest;
+import org.apache.hadoop.ozone.om.response.OMClientResponse;
+import org.apache.hadoop.ozone.om.response.s3.tenant.OMRangerServiceVersionSyncResponse;
+import org.apache.hadoop.ozone.om.response.s3.tenant.OMTenantCreateResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.RangerServiceVersionSyncRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.RangerServiceVersionSyncResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.VolumeInfo;
+import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
+import org.apache.hadoop.ozone.security.acl.OzoneObj;
+import org.apache.hadoop.ozone.storage.proto.OzoneManagerStorageProtos.PersistedUserVolumeInfo;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+
+/*
+ * This request is issued by the RangerSync Background thread to update the
+ * OzoneServiceVersion as read from the Ranger during the  most up-to-date
+ * ranger-to-OMDB sync operation.
+ */
+
+/**
+ * Handles OMRangerServiceVersionSync request.
+ */
+public class OMRangerServiceVersionSyncRequest extends OMClientRequest {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(OMRangerServiceVersionSyncRequest.class);
+
+  public OMRangerServiceVersionSyncRequest(OMRequest omRequest) {
+    super(omRequest);
+  }
+
+  @Override
+  public OMRequest preExecute(OzoneManager ozoneManager) throws IOException {
+
+    final RangerServiceVersionSyncRequest request =
+        getOmRequest().getRangerServiceVersionSyncRequest();
+    final long proposedVersion = request.getRangerServiceVersion();
+
+    if (!ozoneManager.getMultiTenantManager()
+        .tryAcquireInProgressMtOp(WAIT_MILISECONDS)) {
+      throw new OMException("Only One MultiTenant operation allowed at a " +
+          "time", RESOURCE_BUSY);
+    }
+
+    final OMRequest.Builder omRequestBuilder = getOmRequest().toBuilder()
+        .setRangerServiceVersionSyncRequest(
+            RangerServiceVersionSyncRequest.newBuilder()
+                .setRangerServiceVersion(proposedVersion))
+        // TODO: Can the three lines below be ignored?
+        .setUserInfo(getUserInfo())
+        .setCmdType(getOmRequest().getCmdType())
+        .setClientId(getOmRequest().getClientId());
+
+    if (getOmRequest().hasTraceID()) {
+      omRequestBuilder.setTraceID(getOmRequest().getTraceID());
+    }
+
+    return omRequestBuilder.build();
+
+  }
+
+  @Override
+  public void handleRequestFailure(OzoneManager ozoneManager) {
+  }
+
+  @Override
+  @SuppressWarnings("methodlength")
+  public OMClientResponse validateAndUpdateCache(
+      OzoneManager ozoneManager, long transactionLogIndex,
+      OzoneManagerDoubleBufferHelper ozoneManagerDoubleBufferHelper) {
+
+    OMClientResponse omClientResponse = null;
+    final OMResponse.Builder omResponse =
+        OmResponseUtil.getOMResponseBuilder(getOmRequest());
+    OMMetadataManager omMetadataManager = ozoneManager.getMetadataManager();
+    final RangerServiceVersionSyncRequest request
+        = getOmRequest().getRangerServiceVersionSyncRequest();
+    final long proposedVersion = request.getRangerServiceVersion();
+    Exception exception = null;
+
+    try {
+      omMetadataManager.getOmRangerStateTable().addCacheEntry(
+          new CacheKey<>(OmMetadataManagerImpl.RangerOzoneServiceVersionKey),
+          new CacheValue<>(Optional.of(proposedVersion), transactionLogIndex));
+      omResponse.setRangerServiceVersionSyncResponse(
+          RangerServiceVersionSyncResponse.newBuilder().build()
+      );
+
+      omClientResponse = new OMRangerServiceVersionSyncResponse(
+          omResponse.build(), proposedVersion,
+          OmMetadataManagerImpl.RangerOzoneServiceVersionKey);
+
+    } catch (Exception ex) {

Review comment:
       I don't think anything in this method throws, since it is just a cache update.

##########
File path: hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/multitenant/OMRangerBGSync.java
##########
@@ -0,0 +1,475 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.om.multitenant;
+
+import static java.lang.Thread.sleep;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RANGER_SYNC_INTERVAL;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RANGER_SYNC_INTERVAL_DEFAULT;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.TableIterator;
+import org.apache.hadoop.ozone.client.OzoneClient;
+import org.apache.hadoop.ozone.client.OzoneClientFactory;
+import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.helpers.OmDBAccessIdInfo;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.ConcurrentHashMultiset;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.google.gson.JsonArray;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+
+/**
+ * Background Sync thread that reads Multitenancy state from OMDB
+ * and applies it to Ranger.
+ */
+public class OMRangerBGSync implements Runnable, Closeable {
+
+  private OzoneManager ozoneManager;
+  private OzoneClient ozoneClient;
+
+  private static final Logger LOG = LoggerFactory
+      .getLogger(OMRangerBGSync.class);
+  private static int WAIT_MILI = 1000;
+  private static int MAX_ATTEMPT = 2;
+
+  /**
+   * ExecutorService used for scheduling sync operation.
+   */
+  private final ScheduledExecutorService executorService;
+  private ScheduledFuture<?> rangerSyncFuture;

Review comment:
       This isn't being read anywhere. Is there some future use planned for this field?
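
One possible use for the stored future, sketched under assumptions: cancel the pending cycle in `close()` so that shutdown does not have to wait out the remaining delay. This assumes the executor keeps the JDK default of still running already-scheduled delayed tasks after `shutdown()`. The class `ReschedulingTaskSketch` and all of its members are hypothetical and are not the code in this PR. If no such use is planned, dropping the field is the simpler option.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Illustration only: a self-rescheduling task that uses its ScheduledFuture. */
final class ReschedulingTaskSketch implements Runnable, AutoCloseable {

  private final ScheduledExecutorService executor =
      Executors.newSingleThreadScheduledExecutor(r -> {
        Thread t = new Thread(r, "sync-sketch");
        t.setDaemon(true);
        return t;
      });
  private final long intervalMillis;
  private final AtomicInteger cycles = new AtomicInteger();
  private ScheduledFuture<?> pending; // guarded by "this"

  ReschedulingTaskSketch(long intervalMillis) {
    this.intervalMillis = intervalMillis;
  }

  /** Schedules the first cycle; kept out of the constructor so "this" does not escape early. */
  void start() {
    scheduleNext();
  }

  @Override
  public void run() {
    try {
      cycles.incrementAndGet(); // placeholder for one sync cycle
    } finally {
      scheduleNext(); // fixed delay after completion, not fixed rate
    }
  }

  private synchronized void scheduleNext() {
    if (!executor.isShutdown()) {
      pending = executor.schedule(this, intervalMillis, TimeUnit.MILLISECONDS);
    }
  }

  /** Cancels the pending cycle so shutdown does not wait for the remaining delay. */
  @Override
  public synchronized void close() {
    if (pending != null) {
      pending.cancel(false);
    }
    executor.shutdown();
  }

  synchronized boolean isPendingCancelled() {
    return pending != null && pending.isCancelled();
  }

  int completedCycles() {
    return cycles.get();
  }

  /** Returns true if the executor terminated within the timeout. */
  boolean awaitStopped(long timeoutMillis) {
    try {
      return executor.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
  }
}
```

Making `scheduleNext()` and `close()` synchronized on the same object also closes the window in which a cycle could try to reschedule itself on an executor that has just been shut down.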

##########
File path: hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/multitenant/OMRangerBGSync.java
##########
@@ -0,0 +1,475 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.om.multitenant;
+
+import static java.lang.Thread.sleep;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RANGER_SYNC_INTERVAL;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RANGER_SYNC_INTERVAL_DEFAULT;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.TableIterator;
+import org.apache.hadoop.ozone.client.OzoneClient;
+import org.apache.hadoop.ozone.client.OzoneClientFactory;
+import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.helpers.OmDBAccessIdInfo;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.ConcurrentHashMultiset;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.google.gson.JsonArray;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+
+/**
+ * Background Sync thread that reads Multitenancy state from OMDB
+ * and applies it to Ranger.
+ */
+public class OMRangerBGSync implements Runnable, Closeable {
+
+  private OzoneManager ozoneManager;
+  private OzoneClient ozoneClient;
+
+  private static final Logger LOG = LoggerFactory
+      .getLogger(OMRangerBGSync.class);
+  private static int WAIT_MILI = 1000;
+  private static int MAX_ATTEMPT = 2;
+
+  /**
+   * ExecutorService used for scheduling sync operation.
+   */
+  private final ScheduledExecutorService executorService;
+  private ScheduledFuture<?> rangerSyncFuture;
+  private final int rangerSyncInterval;
+
+  long rangerBGSyncCounter = 0;
+  long currentOzoneServiceVersionInOMDB;
+  long proposedOzoneServiceVersionInOMDB;
+  public static int ozoneServiceId;
+
+  MultiTenantAccessAuthorizerRangerPlugin authorizer;
+
+  class BGRole{
+    String name;
+    String id;
+    HashSet<String> users;
+
+    BGRole(String n) {
+      this.name = n;
+      users = new HashSet<>();
+    }
+
+    void addId(String i) {
+      this.id =i;
+    }
+
+    void addUsers(String u) {
+      users.add(u);
+    }
+
+    HashSet<String> getUsers() {
+      return users;
+    }
+
+    @Override
+    public int hashCode() {
+      return name.hashCode();
+    }
+  }
+
+  // we will track all the policies in Ranger here. After we have
+  // processed all the policies from OMDB, this map will
+  // be left with policies that we need to delete.
+  // Its a map of Policy ID to policy names
+  ConcurrentHashMap<String, String> mtRangerPoliciesTobeDeleted =
+      new ConcurrentHashMap<>();
+
+  // This map will be used to keep all the policies that are found in
+  // OMDB and should have been in Ranger. Currently, we are only printing such
+  // policyID. This can result if a tenant is deleted but the system
+  // crashed. Its an easy recovery to retry the "tenant delete" operation.
+  // Its a map of policy ID to policy names
+  ConcurrentHashMap<String, String> mtRangerPoliciesTobeCreated =
+      new ConcurrentHashMap<>();
+
+  // This map will keep all the Multitenancy-related roles from Ranger.
+  ConcurrentHashMap<String, BGRole> mtRangerRoles = new ConcurrentHashMap<>();
+
+  // keep OMDB mapping of Roles -> list of user principals.
+  ConcurrentHashMap<String, HashSet<String>> mtOMDBRoles =
+      new ConcurrentHashMap<>();
+
+  // Every BG ranger sync cycle we update this
+  long lastRangerPolicyLoadTime;
+
+  public OMRangerBGSync(OzoneManager om) throws Exception {
+    try {
+      ozoneManager = om;
+      authorizer = new MultiTenantAccessAuthorizerRangerPlugin();
+      authorizer.init(om.getConfiguration());
+      ozoneClient =
+          OzoneClientFactory.getRpcClient(ozoneManager.getConfiguration());
+      executorService = HadoopExecutors.newScheduledThreadPool(1,
+          new ThreadFactoryBuilder().setDaemon(true)
+              .setNameFormat("OM Ranger Sync Thread - %d").build());
+      rangerSyncInterval =
+          ozoneManager.getConfiguration().getInt(OZONE_OM_RANGER_SYNC_INTERVAL,
+              OZONE_OM_RANGER_SYNC_INTERVAL_DEFAULT);
+      scheduleNextRangerSync();
+      ozoneServiceId = authorizer.getOzoneServiceId();
+    } catch (Exception e) {
+      LOG.warn("Failed to Initialize Ranger Background Sync");
+      throw e;
+    }
+  }
+
+  public int getOzoneServiceId() throws Exception {
+    return ozoneServiceId;
+  }
+
+  public int getRangerSyncInterval() throws Exception {
+    return rangerSyncInterval;
+  }
+
+  @Override
+  public void run() {
+    try {
+      Thread.sleep(10);
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    }
+    try {
+      if (ozoneManager.isLeaderReady()) {
+        executeOneRangerSyncCycle();
+      }
+    } catch (Exception e) {
+      LOG.warn("Exception during OM Ranger Background Sync." + e.getMessage());
+    } finally {
+      // Let's schedule the next cycle now. We deliberately do not schedule at
+      // a fixed interval, to account for Ranger sync processing time.
+      scheduleNextRangerSync();
+    }
+  }
+
+  private void scheduleNextRangerSync() {
+
+    if (!Thread.currentThread().isInterrupted() &&
+        !executorService.isShutdown()) {
+      rangerSyncFuture = executorService.schedule(this,
+          rangerSyncInterval, TimeUnit.SECONDS);
+    } else {
+      LOG.warn("Current Thread is interrupted, shutting down Ranger Sync " +
+          "processing thread for Ozone MultiTenant Manager.");
+    }
+  }
+
+  @Override
+  public void close() {
+    executorService.shutdown();
+    try {
+      if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) {
+        executorService.shutdownNow();
+      }
+
+      if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) {
+        LOG.error("Unable to shutdown OM Ranger Background Sync properly.");
+      }
+    } catch (InterruptedException e) {
+      executorService.shutdownNow();
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  private void executeOneRangerSyncCycle() {
+    int attempt = 0;
+    try {
+      // Taking the lock only makes sure that while we are reading
+      // the current Ozone service version, another multitenancy
+      // request is not changing it. We can drop the lock after that.
+      while (!ozoneManager.getMultiTenantManager()
+          .tryAcquireInProgressMtOp(WAIT_MILI)) {
+        sleep(10);
+      }
+      currentOzoneServiceVersionInOMDB = getOmdbRangerServiceVersion();
+      proposedOzoneServiceVersionInOMDB = getRangerServiceVersion();
+      while (currentOzoneServiceVersionInOMDB !=
+          proposedOzoneServiceVersionInOMDB) {
+        if (++attempt > MAX_ATTEMPT) {
+          break;
+        }
+        ozoneManager.getMultiTenantManager().resetInProgressMtOpState();
+        if (!ozoneManager.isLeaderReady()) {
+          return;
+        }
+        LOG.warn("Executing Ranger Sync Cycle.");
+
+        executeOmdbToRangerSync(currentOzoneServiceVersionInOMDB);
+
+        if (currentOzoneServiceVersionInOMDB !=
+            proposedOzoneServiceVersionInOMDB) {
+          // Submit Ratis Request to sync the new ozone service version in OMDB
+          setOmdbRangerServiceVersion(proposedOzoneServiceVersionInOMDB);
+        }
+        while (!ozoneManager.getMultiTenantManager()
+            .tryAcquireInProgressMtOp(WAIT_MILI)) {
+          sleep(10);
+        }
+        currentOzoneServiceVersionInOMDB = proposedOzoneServiceVersionInOMDB;
+        proposedOzoneServiceVersionInOMDB = getRangerServiceVersion();
+      }
+    } catch (Exception e) {
+      LOG.warn("Exception during a Ranger Sync Cycle. " + e.getMessage() );
+    }finally {
+      ozoneManager.getMultiTenantManager().resetInProgressMtOpState();
+    }
+  }
+
+  public long getRangerServiceVersion() throws Exception {
+    return authorizer.getCurrentOzoneServiceVersion(ozoneServiceId);
+  }
+
+  public void setOmdbRangerServiceVersion(long version) throws IOException {

Review comment:
       It looks like this and many of the methods below (that aren't used for tests) can be private. I think that would be preferred to avoid exposing write methods.
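
A rough sketch of the visibility split suggested here, using hypothetical names (`VersionSyncSketch`, `syncTo`) rather than the actual `OMRangerBGSync` methods: the write method is private, the read accessor stays package-private so tests in the same package can still observe state, and a single entry point drives the update.

```java
/** Illustration only: hypothetical names, not the actual OMRangerBGSync API. */
final class VersionSyncSketch {

  private long omdbVersion;

  /** Read accessor left package-private so same-package tests can observe state. */
  long getOmdbVersion() {
    return omdbVersion;
  }

  /** Write method kept private: only the sync entry point below may change the version. */
  private void setOmdbVersion(long version) {
    this.omdbVersion = version;
  }

  /** The single entry point; returns true if the stored version was updated. */
  boolean syncTo(long rangerVersion) {
    if (rangerVersion == omdbVersion) {
      return false;
    }
    setOmdbVersion(rangerVersion);
    return true;
  }
}
```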

##########
File path: hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
##########
@@ -574,6 +582,12 @@ protected void initializeOmTables() throws IOException {
         String.class, OmDBTenantInfo.class);
     checkTableStatus(tenantStateTable, TENANT_STATE_TABLE);
 
+    // String("RangerOzoneServiceVersion")> Long (current service version
+    // in Ranger)
+    rangerStateTable = this.store.getTable(RANGER_STATE_TABLE,

Review comment:
       Does this work? I don't see an entry in the OMDBDefinition for this table.

##########
File path: hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/multitenant/OMRangerBGSync.java
##########
@@ -0,0 +1,475 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.om.multitenant;
+
+import static java.lang.Thread.sleep;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RANGER_SYNC_INTERVAL;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RANGER_SYNC_INTERVAL_DEFAULT;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.TableIterator;
+import org.apache.hadoop.ozone.client.OzoneClient;
+import org.apache.hadoop.ozone.client.OzoneClientFactory;
+import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.helpers.OmDBAccessIdInfo;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.ConcurrentHashMultiset;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.google.gson.JsonArray;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+
+/**
+ * Background Sync thread that reads Multitnancy state from OMDB
+ * and applies it to Ranger.
+ */
+public class OMRangerBGSync implements Runnable, Closeable {
+
+  private OzoneManager ozoneManager;
+  private OzoneClient ozoneClient;
+
+  private static final Logger LOG = LoggerFactory
+      .getLogger(OMRangerBGSync.class);
+  private static int WAIT_MILI = 1000;
+  private static int MAX_ATTEMPT = 2;
+
+  /**
+   * ExecutorService used for scheduling sync operation.
+   */
+  private final ScheduledExecutorService executorService;
+  private ScheduledFuture<?> rangerSyncFuture;
+  private final int rangerSyncInterval;
+
+  long rangerBGSyncCounter = 0;
+  long currentOzoneServiceVersionInOMDB;
+  long proposedOzoneServiceVersionInOMDB;
+  public static int ozoneServiceId;
+
+  MultiTenantAccessAuthorizerRangerPlugin authorizer;
+
+  class BGRole{
+    String name;
+    String id;
+    HashSet<String> users;
+
+    BGRole(String n) {
+      this.name = n;
+      users = new HashSet<>();
+    }
+
+    void addId(String i) {
+      this.id =i;
+    }
+
+    void addUsers(String u) {
+      users.add(u);
+    }
+
+    HashSet<String> getUsers() {
+      return users;
+    }
+
+    @Override
+    public int hashCode() {
+      return name.hashCode();
+    }
+  }
+
+  // we will track all the policies in Ranger here. After we have
+  // processed all the policies from OMDB, this map will
+  // be left with policies that we need to delete.
+  // Its a map of Policy ID to policy names
+  ConcurrentHashMap<String, String> mtRangerPoliciesTobeDeleted =
+      new ConcurrentHashMap<>();
+
+  // This map will be used to keep all the policies that are found in
+  // OMDB and should have been in Ranger. Currently, we are only printing such
+  // policyID. This can result if a tenant is deleted but the system
+  // crashed. Its an easy recovery to retry the "tenant delete" operation.
+  // Its a map of policy ID to policy names
+  ConcurrentHashMap<String, String> mtRangerPoliciesTobeCreated =
+      new ConcurrentHashMap<>();
+
+  // This map will keep all the Multiotenancy related roles from Ranger.
+  ConcurrentHashMap<String, BGRole> mtRangerRoles = new ConcurrentHashMap<>();
+
+  // keep OMDB mapping of Roles -> list of user principals.
+  ConcurrentHashMap<String, HashSet<String>> mtOMDBRoles =
+      new ConcurrentHashMap<>();
+
+  // Every BG ranger sync cycle we update this
+  long lastRangerPolicyLoadTime;
+
+  public OMRangerBGSync(OzoneManager om) throws Exception {
+    try {
+      ozoneManager = om;
+      authorizer = new MultiTenantAccessAuthorizerRangerPlugin();
+      authorizer.init(om.getConfiguration());
+      ozoneClient =
+          OzoneClientFactory.getRpcClient(ozoneManager.getConfiguration());
+      executorService = HadoopExecutors.newScheduledThreadPool(1,
+          new ThreadFactoryBuilder().setDaemon(true)
+              .setNameFormat("OM Ranger Sync Thread - %d").build());
+      rangerSyncInterval =
+          ozoneManager.getConfiguration().getInt(OZONE_OM_RANGER_SYNC_INTERVAL,
+              OZONE_OM_RANGER_SYNC_INTERVAL_DEFAULT);
+      scheduleNextRangerSync();
+      ozoneServiceId = authorizer.getOzoneServiceId();
+    } catch (Exception e) {
+      LOG.warn("Failed to Initialize Ranger Background Sync");
+      throw e;
+    }
+  }
+
+  public int getOzoneServiceId() throws Exception {
+    return ozoneServiceId;
+  }
+
+  public int getRangerSyncInterval() throws Exception {
+    return rangerSyncInterval;
+  }
+
+  @Override
+  public void run() {
+    try {
+      Thread.sleep(10);
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    }
+    try {
+      if (ozoneManager.isLeaderReady()) {
+        executeOneRangerSyncCycle();
+      }
+    } catch (Exception e) {
+      LOG.warn("Exception during OM Ranger Background Sync." + e.getMessage());
+    } finally {
+      // Lets Schedule the next cycle now. We do not deliberaty schedule at
+      // fixed interval to account for ranger sync processing time.
+      scheduleNextRangerSync();
+    }
+  }
+
+  private void scheduleNextRangerSync() {
+
+    if (!Thread.currentThread().isInterrupted() &&
+        !executorService.isShutdown()) {
+      rangerSyncFuture = executorService.schedule(this,
+          rangerSyncInterval, TimeUnit.SECONDS);
+    } else {
+      LOG.warn("Current Thread is interrupted, shutting down Ranger Sync " +
+          "processing thread for Ozone MultiTenant Manager.");
+    }
+  }
+
+  @Override
+  public void close() {
+    executorService.shutdown();
+    try {
+      if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) {
+        executorService.shutdownNow();
+      }
+
+      if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) {
+        LOG.error("Unable to shutdown OM Ranger Background Sync properly.");
+      }
+    } catch (InterruptedException e) {
+      executorService.shutdownNow();
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  private void executeOneRangerSyncCycle() {
+    int attempt = 0;
+    try {
+      // Taking the lock only makes sure that while we are reading
+      // the current Ozone service version, another multitenancy
+      // request is not changing it. We can drop the lock after that.
+      while (!ozoneManager.getMultiTenantManager()
+          .tryAcquireInProgressMtOp(WAIT_MILI)) {
+        sleep(10);
+      }
+      currentOzoneServiceVersionInOMDB = getOmdbRangerServiceVersion();
+      proposedOzoneServiceVersionInOMDB = getRangerServiceVersion();
+      while (currentOzoneServiceVersionInOMDB !=
+          proposedOzoneServiceVersionInOMDB) {
+        if (++attempt > MAX_ATTEMPT) {
+          break;
+        }
+        ozoneManager.getMultiTenantManager().resetInProgressMtOpState();
+        if (!ozoneManager.isLeaderReady()) {
+          return;
+        }
+        LOG.warn("Executing Ranger Sync Cycle.");
+
+        executeOmdbToRangerSync(currentOzoneServiceVersionInOMDB);
+
+        if (currentOzoneServiceVersionInOMDB !=
+            proposedOzoneServiceVersionInOMDB) {
+          // Submit Ratis Request to sync the new ozone service version in OMDB
+          setOmdbRangerServiceVersion(proposedOzoneServiceVersionInOMDB);
+        }
+        while (!ozoneManager.getMultiTenantManager()
+            .tryAcquireInProgressMtOp(WAIT_MILI)) {
+          sleep(10);
+        }
+        currentOzoneServiceVersionInOMDB = proposedOzoneServiceVersionInOMDB;
+        proposedOzoneServiceVersionInOMDB = getRangerServiceVersion();
+      }
+    } catch (Exception e) {
+      LOG.warn("Exception during a Ranger Sync Cycle. " + e.getMessage() );

Review comment:
       Also, once we switch to using the `RangerRestMultiTenantAccessController`, there should only be a small set of checked exceptions that we need to catch here.

##########
File path: hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMultiTenantManagerImpl.java
##########
@@ -637,4 +641,18 @@ public void loadUsersFromDB() {
   Map<String, CachedTenantInfo> getTenantCache() {
     return tenantCache;
   }
+
+  @Override
+  public boolean tryAcquireInProgressMtOp(long milli) {
+    try {
+      return inProgressMtOp.tryAcquire(milli, TimeUnit.MILLISECONDS);

Review comment:
       Is there any advantage to each caller passing its own timeout, vs. having one shared timeout as a config key?
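
A minimal sketch of the shared-timeout variant, using only `java.util.concurrent`. The class name `MtOpGuard` and its constructor parameter are hypothetical and not part of this PR; the timeout would be read once from a config key and reused by every caller.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/** Hypothetical guard: one in-progress multi-tenant op, one shared timeout. */
final class MtOpGuard {
  private final Semaphore inProgressMtOp = new Semaphore(1);
  private final long timeoutMillis;

  /** timeoutMillis is assumed to come from a single config key. */
  MtOpGuard(long timeoutMillis) {
    this.timeoutMillis = timeoutMillis;
  }

  /** Tries to take the single permit, waiting at most the shared timeout. */
  boolean tryAcquire() {
    try {
      return inProgressMtOp.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      // Preserve the interrupt status for the caller and report failure.
      Thread.currentThread().interrupt();
      return false;
    }
  }

  /** Callers must only release after a successful tryAcquire(). */
  void release() {
    inProgressMtOp.release();
  }
}
```

With this shape, callers no longer choose a wait time, so the background sync and the request path cannot drift apart on how long they block.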

##########
File path: hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
##########
@@ -315,4 +315,8 @@ private OMConfigKeys() {
   public static final String OZONE_RANGER_SERVICE =
       "ozone.om.ranger.service";
 
+  public static final String OZONE_OM_RANGER_SYNC_INTERVAL
+      = "ozone.om.ranger.sync.interval";
+  public static final int OZONE_OM_RANGER_SYNC_INTERVAL_DEFAULT
+      = 600;

Review comment:
       Can we make this a `TimeDuration` object? That looks to be the preferred type for existing time-related configs in this class.
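
To illustrate the point of a typed default, here is a small sketch that uses `java.time.Duration` as a stand-in for `TimeDuration` (an assumption made so the example stays self-contained; the class and constant names are hypothetical). The value carries its own unit, so a reader does not have to guess whether 600 means seconds or milliseconds.

```java
import java.time.Duration;
import java.util.concurrent.TimeUnit;

final class SyncIntervalDefaults {
  // Hypothetical typed default mirroring the bare 600 in the diff,
  // which the scheduling code interprets as seconds.
  static final Duration RANGER_SYNC_INTERVAL_DEFAULT = Duration.ofSeconds(600);

  /** Converts a duration to whatever unit the consumer needs. */
  static long toUnit(Duration duration, TimeUnit unit) {
    return unit.convert(duration.toMillis(), TimeUnit.MILLISECONDS);
  }
}
```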

##########
File path: hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/tenant/OMRangerServiceVersionSyncRequest.java
##########
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.hadoop.ozone.om.request.s3.tenant;
+
+import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.RESOURCE_BUSY;
+import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.TENANT_ALREADY_EXISTS;
+import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.VOLUME_ALREADY_EXISTS;
+import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.USER_LOCK;
+import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.VOLUME_LOCK;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
+import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
+import org.apache.hadoop.ozone.OmUtils;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.audit.OMAction;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.om.helpers.OmDBTenantInfo;
+import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
+import org.apache.hadoop.ozone.om.multitenant.AccessPolicy;
+import org.apache.hadoop.ozone.om.multitenant.Tenant;
+import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerDoubleBufferHelper;
+import org.apache.hadoop.ozone.om.request.OMClientRequest;
+import org.apache.hadoop.ozone.om.request.util.OmResponseUtil;
+import org.apache.hadoop.ozone.om.request.volume.OMVolumeRequest;
+import org.apache.hadoop.ozone.om.response.OMClientResponse;
+import org.apache.hadoop.ozone.om.response.s3.tenant.OMRangerServiceVersionSyncResponse;
+import org.apache.hadoop.ozone.om.response.s3.tenant.OMTenantCreateResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.RangerServiceVersionSyncRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.RangerServiceVersionSyncResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.VolumeInfo;
+import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
+import org.apache.hadoop.ozone.security.acl.OzoneObj;
+import org.apache.hadoop.ozone.storage.proto.OzoneManagerStorageProtos.PersistedUserVolumeInfo;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+
+/*
+ * This request is issued by the RangerSync Background thread to update the
+ * OzoneServiceVersion as read from the Ranger during the  most up-to-date
+ * ranger-to-OMDB sync operation.
+ */
+
+/**
+ * Handles OMRangerServiceVersionSync request.
+ */
+public class OMRangerServiceVersionSyncRequest extends OMClientRequest {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(OMRangerServiceVersionSyncRequest.class);
+
+  public OMRangerServiceVersionSyncRequest(OMRequest omRequest) {
+    super(omRequest);
+  }
+
+  @Override
+  public OMRequest preExecute(OzoneManager ozoneManager) throws IOException {
+
+    final RangerServiceVersionSyncRequest request =
+        getOmRequest().getRangerServiceVersionSyncRequest();
+    final long proposedVersion = request.getRangerServiceVersion();
+
+    if (!ozoneManager.getMultiTenantManager()
+        .tryAcquireInProgressMtOp(WAIT_MILISECONDS)) {
+      throw new OMException("Only One MultiTenant operation allowed at a " +
+          "time", RESOURCE_BUSY);
+    }
+
+    final OMRequest.Builder omRequestBuilder = getOmRequest().toBuilder()
+        .setRangerServiceVersionSyncRequest(
+            RangerServiceVersionSyncRequest.newBuilder()
+                .setRangerServiceVersion(proposedVersion))
+        // TODO: Can the three lines below be ignored?
+        .setUserInfo(getUserInfo())
+        .setCmdType(getOmRequest().getCmdType())
+        .setClientId(getOmRequest().getClientId());
+
+    if (getOmRequest().hasTraceID()) {
+      omRequestBuilder.setTraceID(getOmRequest().getTraceID());
+    }
+
+    return omRequestBuilder.build();
+
+  }
+
+  @Override
+  public void handleRequestFailure(OzoneManager ozoneManager) {
+  }
+
+  @Override
+  @SuppressWarnings("methodlength")
+  public OMClientResponse validateAndUpdateCache(
+      OzoneManager ozoneManager, long transactionLogIndex,
+      OzoneManagerDoubleBufferHelper ozoneManagerDoubleBufferHelper) {
+
+    OMClientResponse omClientResponse = null;
+    final OMResponse.Builder omResponse =
+        OmResponseUtil.getOMResponseBuilder(getOmRequest());
+    OMMetadataManager omMetadataManager = ozoneManager.getMetadataManager();
+    final RangerServiceVersionSyncRequest request
+        = getOmRequest().getRangerServiceVersionSyncRequest();
+    final long proposedVersion = request.getRangerServiceVersion();
+    Exception exception = null;
+
+    try {
+      omMetadataManager.getOmRangerStateTable().addCacheEntry(
+          new CacheKey<>(OmMetadataManagerImpl.RangerOzoneServiceVersionKey),
+          new CacheValue<>(Optional.of(proposedVersion), transactionLogIndex));
+      omResponse.setRangerServiceVersionSyncResponse(
+          RangerServiceVersionSyncResponse.newBuilder().build()
+      );
+
+      omClientResponse = new OMRangerServiceVersionSyncResponse(
+          omResponse.build(), proposedVersion,
+          OmMetadataManagerImpl.RangerOzoneServiceVersionKey);
+
+    } catch (Exception ex) {
+      // Prepare omClientResponse
+      omResponse.setRangerServiceVersionSyncResponse(
+          RangerServiceVersionSyncResponse.newBuilder().build());
+      omClientResponse = new OMTenantCreateResponse(
+          createErrorOMResponse(omResponse, new IOException(ex.getMessage())));
+    } finally {
+      if (omClientResponse != null) {
+        omClientResponse.setFlushFuture(ozoneManagerDoubleBufferHelper
+            .add(omClientResponse, transactionLogIndex));
+      }

Review comment:
       Looks like we could use `addResponseToDoubleBuffer` instead of these lines.

##########
File path: hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/multitenant/TestMultiTenantVolume.java
##########
@@ -241,4 +244,20 @@ private static ObjectStore getStoreForAccessID(String accessID)
         new S3Auth("unused1", "unused2", accessID, accessID));
     return new ObjectStore(conf, client);
   }
+
+  @Test
+  public void testBGSync() throws Exception {
+    final int TRIAL_VERSION = 10;
+
+    // Default client not belonging to a tenant should end up in the S3 volume.
+    ObjectStore store = cluster.getClient().getObjectStore();
+    store.rangerServiceVersionSync(TRIAL_VERSION);
+    long version =
+        cluster.getOzoneManager().getMetadataManager()
+            .getOmRangerStateTable()
+            .get(OmMetadataManagerImpl.RangerOzoneServiceVersionKey);
+    if (version != TRIAL_VERSION) {

Review comment:
       ```suggestion
   Assert.assertEquals(TRIAL_VERSION, version);
   ```

##########
File path: hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/multitenant/OMRangerBGSync.java
##########
@@ -0,0 +1,475 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.om.multitenant;
+
+import static java.lang.Thread.sleep;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RANGER_SYNC_INTERVAL;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RANGER_SYNC_INTERVAL_DEFAULT;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.TableIterator;
+import org.apache.hadoop.ozone.client.OzoneClient;
+import org.apache.hadoop.ozone.client.OzoneClientFactory;
+import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.helpers.OmDBAccessIdInfo;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.ConcurrentHashMultiset;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.google.gson.JsonArray;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+
+/**
+ * Background Sync thread that reads Multitnancy state from OMDB
+ * and applies it to Ranger.
+ */
+public class OMRangerBGSync implements Runnable, Closeable {
+
+  private OzoneManager ozoneManager;
+  private OzoneClient ozoneClient;
+
+  private static final Logger LOG = LoggerFactory
+      .getLogger(OMRangerBGSync.class);
+  private static int WAIT_MILI = 1000;
+  private static int MAX_ATTEMPT = 2;
+
+  /**
+   * ExecutorService used for scheduling sync operation.
+   */
+  private final ScheduledExecutorService executorService;
+  private ScheduledFuture<?> rangerSyncFuture;
+  private final int rangerSyncInterval;
+
+  long rangerBGSyncCounter = 0;
+  long currentOzoneServiceVersionInOMDB;
+  long proposedOzoneServiceVersionInOMDB;

Review comment:
       It looks like the above two vars are only used in `executeOneRangerSyncCycle`. Making them local would make that method easier to reason about.
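
A simplified sketch of the loop with both versions as locals. Locking and the leader check are left out, and the three functional parameters are hypothetical stand-ins for `getOmdbRangerServiceVersion`, `getRangerServiceVersion`, and the sync-then-persist step; only the retry structure and the `MAX_ATTEMPT` bound are taken from the diff.

```java
import java.util.function.LongConsumer;
import java.util.function.LongSupplier;

final class SyncCycleSketch {
  static final int MAX_ATTEMPT = 2;

  /**
   * Runs sync passes until the OMDB and Ranger versions agree or
   * MAX_ATTEMPT passes have run. Returns the number of passes executed.
   */
  static int runOneCycle(LongSupplier omdbVersion, LongSupplier rangerVersion,
      LongConsumer syncAndPersist) {
    // Both versions live only for the duration of this cycle.
    long current = omdbVersion.getAsLong();
    long proposed = rangerVersion.getAsLong();
    int attempt = 0;
    while (current != proposed && attempt < MAX_ATTEMPT) {
      attempt++;
      // Stand-in for: sync OMDB state to Ranger, then persist the version.
      syncAndPersist.accept(proposed);
      current = proposed;
      // Ranger may have changed while we were syncing, so read it again.
      proposed = rangerVersion.getAsLong();
    }
    return attempt;
  }
}
```

With no fields involved, nothing outside the method can observe a half-updated pair of versions.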

##########
File path: hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/multitenant/OMRangerBGSync.java
##########
@@ -0,0 +1,475 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.om.multitenant;
+
+import static java.lang.Thread.sleep;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RANGER_SYNC_INTERVAL;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RANGER_SYNC_INTERVAL_DEFAULT;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.TableIterator;
+import org.apache.hadoop.ozone.client.OzoneClient;
+import org.apache.hadoop.ozone.client.OzoneClientFactory;
+import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.helpers.OmDBAccessIdInfo;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.ConcurrentHashMultiset;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.google.gson.JsonArray;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+
+/**
+ * Background Sync thread that reads Multitnancy state from OMDB
+ * and applies it to Ranger.
+ */
+public class OMRangerBGSync implements Runnable, Closeable {

Review comment:
       It might be better to extend the standard `BackgroundService` abstract class for jobs that run on the leader. See `KeyDeletingService` for an example. This will handle constructing the thread pool, run interval, and timeout from values we provide.
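
For illustration only, here is what delegating the scheduling looks like with the plain JDK. This is not Ozone's `BackgroundService`; it is a stdlib sketch (class name hypothetical) showing that `scheduleWithFixedDelay` already measures the delay from the end of one run to the start of the next, which is the behavior the hand-rolled `scheduleNextRangerSync` re-implements.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

final class FixedDelaySyncSketch implements AutoCloseable {
  private final ScheduledExecutorService executor =
      Executors.newSingleThreadScheduledExecutor(r -> {
        Thread t = new Thread(r, "ranger-sync-sketch");
        t.setDaemon(true);
        return t;
      });

  FixedDelaySyncSketch(Runnable task, long delay, TimeUnit unit) {
    executor.scheduleWithFixedDelay(() -> {
      try {
        task.run();
      } catch (RuntimeException e) {
        // An exception escaping the task would cancel all later runs,
        // so it is swallowed here; real code would log it.
      }
    }, delay, delay, unit);
  }

  @Override
  public void close() throws InterruptedException {
    executor.shutdown();
    if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
      executor.shutdownNow();
    }
  }
}
```

The task itself no longer needs to reschedule anything in a `finally` block; it only does one sync pass.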

##########
File path: hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/tenant/OMRangerServiceVersionSyncRequest.java
##########
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.hadoop.ozone.om.request.s3.tenant;
+
+import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.RESOURCE_BUSY;
+import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.TENANT_ALREADY_EXISTS;
+import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.VOLUME_ALREADY_EXISTS;
+import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.USER_LOCK;
+import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.VOLUME_LOCK;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
+import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
+import org.apache.hadoop.ozone.OmUtils;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.audit.OMAction;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.om.helpers.OmDBTenantInfo;
+import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
+import org.apache.hadoop.ozone.om.multitenant.AccessPolicy;
+import org.apache.hadoop.ozone.om.multitenant.Tenant;
+import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerDoubleBufferHelper;
+import org.apache.hadoop.ozone.om.request.OMClientRequest;
+import org.apache.hadoop.ozone.om.request.util.OmResponseUtil;
+import org.apache.hadoop.ozone.om.request.volume.OMVolumeRequest;
+import org.apache.hadoop.ozone.om.response.OMClientResponse;
+import org.apache.hadoop.ozone.om.response.s3.tenant.OMRangerServiceVersionSyncResponse;
+import org.apache.hadoop.ozone.om.response.s3.tenant.OMTenantCreateResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.RangerServiceVersionSyncRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.RangerServiceVersionSyncResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.VolumeInfo;
+import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
+import org.apache.hadoop.ozone.security.acl.OzoneObj;
+import org.apache.hadoop.ozone.storage.proto.OzoneManagerStorageProtos.PersistedUserVolumeInfo;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+
+/*
+ * This request is issued by the RangerSync Background thread to update the
+ * OzoneServiceVersion as read from the Ranger during the  most up-to-date
+ * ranger-to-OMDB sync operation.
+ */
+
+/**
+ * Handles OMRangerServiceVersionSync request.
+ */
+public class OMRangerServiceVersionSyncRequest extends OMClientRequest {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(OMRangerServiceVersionSyncRequest.class);
+
+  public OMRangerServiceVersionSyncRequest(OMRequest omRequest) {
+    super(omRequest);
+  }
+
+  @Override
+  public OMRequest preExecute(OzoneManager ozoneManager) throws IOException {
+
+    final RangerServiceVersionSyncRequest request =
+        getOmRequest().getRangerServiceVersionSyncRequest();
+    final long proposedVersion = request.getRangerServiceVersion();
+
+    if (!ozoneManager.getMultiTenantManager()
+        .tryAcquireInProgressMtOp(WAIT_MILISECONDS)) {
+      throw new OMException("Only One MultiTenant operation allowed at a " +

Review comment:
       Why is this lock necessary? This request does not contact Ranger; it only persists the version to the DB. This DB version is not used by any tenant requests.

##########
File path: hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/tenant/OMRangerServiceVersionSyncRequest.java
##########
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.hadoop.ozone.om.request.s3.tenant;
+
+import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.RESOURCE_BUSY;
+import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.TENANT_ALREADY_EXISTS;
+import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.VOLUME_ALREADY_EXISTS;
+import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.USER_LOCK;
+import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.VOLUME_LOCK;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
+import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
+import org.apache.hadoop.ozone.OmUtils;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.audit.OMAction;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.om.helpers.OmDBTenantInfo;
+import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
+import org.apache.hadoop.ozone.om.multitenant.AccessPolicy;
+import org.apache.hadoop.ozone.om.multitenant.Tenant;
+import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerDoubleBufferHelper;
+import org.apache.hadoop.ozone.om.request.OMClientRequest;
+import org.apache.hadoop.ozone.om.request.util.OmResponseUtil;
+import org.apache.hadoop.ozone.om.request.volume.OMVolumeRequest;
+import org.apache.hadoop.ozone.om.response.OMClientResponse;
+import org.apache.hadoop.ozone.om.response.s3.tenant.OMRangerServiceVersionSyncResponse;
+import org.apache.hadoop.ozone.om.response.s3.tenant.OMTenantCreateResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.RangerServiceVersionSyncRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.RangerServiceVersionSyncResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.VolumeInfo;
+import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
+import org.apache.hadoop.ozone.security.acl.OzoneObj;
+import org.apache.hadoop.ozone.storage.proto.OzoneManagerStorageProtos.PersistedUserVolumeInfo;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+
+/*
+ * This request is issued by the RangerSync Background thread to update the
+ * OzoneServiceVersion as read from the Ranger during the  most up-to-date
+ * ranger-to-OMDB sync operation.
+ */
+
+/**
+ * Handles OMRangerServiceVersionSync request.
+ */
+public class OMRangerServiceVersionSyncRequest extends OMClientRequest {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(OMRangerServiceVersionSyncRequest.class);
+
+  public OMRangerServiceVersionSyncRequest(OMRequest omRequest) {
+    super(omRequest);
+  }
+
+  @Override
+  public OMRequest preExecute(OzoneManager ozoneManager) throws IOException {
+
+    final RangerServiceVersionSyncRequest request =
+        getOmRequest().getRangerServiceVersionSyncRequest();
+    final long proposedVersion = request.getRangerServiceVersion();
+
+    if (!ozoneManager.getMultiTenantManager()
+        .tryAcquireInProgressMtOp(WAIT_MILISECONDS)) {
+      throw new OMException("Only One MultiTenant operation allowed at a " +
+          "time", RESOURCE_BUSY);
+    }
+
+    final OMRequest.Builder omRequestBuilder = getOmRequest().toBuilder()

Review comment:
       Why do we need to rebuild the existing request? It does not look like we are adding new information here.

##########
File path: hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/multitenant/OMRangerBGSync.java
##########
@@ -0,0 +1,475 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.om.multitenant;
+
+import static java.lang.Thread.sleep;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RANGER_SYNC_INTERVAL;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RANGER_SYNC_INTERVAL_DEFAULT;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.TableIterator;
+import org.apache.hadoop.ozone.client.OzoneClient;
+import org.apache.hadoop.ozone.client.OzoneClientFactory;
+import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.helpers.OmDBAccessIdInfo;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.ConcurrentHashMultiset;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.google.gson.JsonArray;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+
+/**
+ * Background Sync thread that reads Multitnancy state from OMDB
+ * and applies it to Ranger.
+ */
+public class OMRangerBGSync implements Runnable, Closeable {
+
+  private OzoneManager ozoneManager;
+  private OzoneClient ozoneClient;
+
+  private static final Logger LOG = LoggerFactory
+      .getLogger(OMRangerBGSync.class);
+  private static int WAIT_MILI = 1000;
+  private static int MAX_ATTEMPT = 2;

Review comment:
       Were the above two fields supposed to be final?
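
       If so, the declarations might look like the sketch below. The holder class `RangerSyncConstants` and the name `WAIT_MILLIS` are hypothetical and exist only to keep the example self-contained; in the PR these fields live in `OMRangerBGSync`, and the values are the ones from the diff.

```java
/** Hypothetical holder class; in the PR these fields live in OMRangerBGSync. */
final class RangerSyncConstants {
  // `final` turns these into compile-time constants, so no code path
  // (including tests) can reassign them at runtime.
  static final int WAIT_MILLIS = 1000;
  static final int MAX_ATTEMPT = 2;

  private RangerSyncConstants() {
    // constants only, not meant to be instantiated
  }
}
```

       Without `final`, an upper-case name only suggests a constant; the compiler does not enforce it.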

##########
File path: hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/multitenant/OMRangerBGSync.java
##########
@@ -0,0 +1,475 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.om.multitenant;
+
+import static java.lang.Thread.sleep;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RANGER_SYNC_INTERVAL;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RANGER_SYNC_INTERVAL_DEFAULT;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.TableIterator;
+import org.apache.hadoop.ozone.client.OzoneClient;
+import org.apache.hadoop.ozone.client.OzoneClientFactory;
+import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.helpers.OmDBAccessIdInfo;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.ConcurrentHashMultiset;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.google.gson.JsonArray;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+
+/**
+ * Background Sync thread that reads Multitnancy state from OMDB
+ * and applies it to Ranger.
+ */
+public class OMRangerBGSync implements Runnable, Closeable {
+
+  private OzoneManager ozoneManager;
+  private OzoneClient ozoneClient;
+
+  private static final Logger LOG = LoggerFactory
+      .getLogger(OMRangerBGSync.class);
+  private static int WAIT_MILI = 1000;
+  private static int MAX_ATTEMPT = 2;
+
+  /**
+   * ExecutorService used for scheduling sync operation.
+   */
+  private final ScheduledExecutorService executorService;
+  private ScheduledFuture<?> rangerSyncFuture;
+  private final int rangerSyncInterval;
+
+  long rangerBGSyncCounter = 0;

Review comment:
       Why is this field package-private?

##########
File path: hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/multitenant/AccessPolicy.java
##########
@@ -18,6 +18,7 @@
 

Review comment:
       Why did we switch back to using this class? I thought we were using `MultiTenantAccessController` after HDDS-5942 and would remove this set of classes once background sync was complete.

##########
File path: hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/multitenant/OMRangerBGSync.java
##########
@@ -0,0 +1,475 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.om.multitenant;
+
+import static java.lang.Thread.sleep;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RANGER_SYNC_INTERVAL;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RANGER_SYNC_INTERVAL_DEFAULT;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.TableIterator;
+import org.apache.hadoop.ozone.client.OzoneClient;
+import org.apache.hadoop.ozone.client.OzoneClientFactory;
+import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.helpers.OmDBAccessIdInfo;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.ConcurrentHashMultiset;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.google.gson.JsonArray;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+
+/**
+ * Background Sync thread that reads Multitnancy state from OMDB
+ * and applies it to Ranger.
+ */
+public class OMRangerBGSync implements Runnable, Closeable {
+
+  private OzoneManager ozoneManager;
+  private OzoneClient ozoneClient;
+
+  private static final Logger LOG = LoggerFactory
+      .getLogger(OMRangerBGSync.class);
+  private static int WAIT_MILI = 1000;
+  private static int MAX_ATTEMPT = 2;
+
+  /**
+   * ExecutorService used for scheduling sync operation.
+   */
+  private final ScheduledExecutorService executorService;
+  private ScheduledFuture<?> rangerSyncFuture;
+  private final int rangerSyncInterval;
+
+  long rangerBGSyncCounter = 0;
+  long currentOzoneServiceVersionInOMDB;
+  long proposedOzoneServiceVersionInOMDB;
+  public static int ozoneServiceId;
+
+  MultiTenantAccessAuthorizerRangerPlugin authorizer;
+
+  class BGRole{
+    String name;
+    String id;
+    HashSet<String> users;
+
+    BGRole(String n) {
+      this.name = n;
+      users = new HashSet<>();
+    }
+
+    void addId(String i) {
+      this.id =i;
+    }
+
+    void addUsers(String u) {
+      users.add(u);
+    }
+
+    HashSet<String> getUsers() {
+      return users;
+    }
+
+    @Override
+    public int hashCode() {
+      return name.hashCode();
+    }
+  }
+
+  // we will track all the policies in Ranger here. After we have
+  // processed all the policies from OMDB, this map will
+  // be left with policies that we need to delete.
+  // Its a map of Policy ID to policy names
+  ConcurrentHashMap<String, String> mtRangerPoliciesTobeDeleted =
+      new ConcurrentHashMap<>();
+
+  // This map will be used to keep all the policies that are found in
+  // OMDB and should have been in Ranger. Currently, we are only printing such
+  // policyID. This can result if a tenant is deleted but the system
+  // crashed. Its an easy recovery to retry the "tenant delete" operation.
+  // Its a map of policy ID to policy names
+  ConcurrentHashMap<String, String> mtRangerPoliciesTobeCreated =
+      new ConcurrentHashMap<>();
+
+  // This map will keep all the Multiotenancy related roles from Ranger.
+  ConcurrentHashMap<String, BGRole> mtRangerRoles = new ConcurrentHashMap<>();
+
+  // keep OMDB mapping of Roles -> list of user principals.
+  ConcurrentHashMap<String, HashSet<String>> mtOMDBRoles =
+      new ConcurrentHashMap<>();
+
+  // Every BG ranger sync cycle we update this
+  long lastRangerPolicyLoadTime;
+
+  public OMRangerBGSync(OzoneManager om) throws Exception {
+    try {
+      ozoneManager = om;
+      authorizer = new MultiTenantAccessAuthorizerRangerPlugin();
+      authorizer.init(om.getConfiguration());
+      ozoneClient =
+          OzoneClientFactory.getRpcClient(ozoneManager.getConfiguration());
+      executorService = HadoopExecutors.newScheduledThreadPool(1,
+          new ThreadFactoryBuilder().setDaemon(true)
+              .setNameFormat("OM Ranger Sync Thread - %d").build());
+      rangerSyncInterval =
+          ozoneManager.getConfiguration().getInt(OZONE_OM_RANGER_SYNC_INTERVAL,
+              OZONE_OM_RANGER_SYNC_INTERVAL_DEFAULT);
+      scheduleNextRangerSync();
+      ozoneServiceId = authorizer.getOzoneServiceId();
+    } catch (Exception e) {
+      LOG.warn("Failed to Initialize Ranger Background Sync");
+      throw e;
+    }
+  }
+
+  public int getOzoneServiceId() throws Exception {
+    return ozoneServiceId;
+  }
+
+  public int getRangerSyncInterval() throws Exception {
+    return rangerSyncInterval;
+  }
+
+  @Override
+  public void run() {
+    try {
+      Thread.sleep(10);
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    }
+    try {
+      if (ozoneManager.isLeaderReady()) {
+        executeOneRangerSyncCycle();
+      }
+    } catch (Exception e) {
+      LOG.warn("Exception during OM Ranger Background Sync." + e.getMessage());
+    } finally {
+      // Lets Schedule the next cycle now. We do not deliberaty schedule at
+      // fixed interval to account for ranger sync processing time.
+      scheduleNextRangerSync();
+    }
+  }
+
+  private void scheduleNextRangerSync() {
+
+    if (!Thread.currentThread().isInterrupted() &&
+        !executorService.isShutdown()) {
+      rangerSyncFuture = executorService.schedule(this,
+          rangerSyncInterval, TimeUnit.SECONDS);
+    } else {
+      LOG.warn("Current Thread is interrupted, shutting down Ranger Sync " +
+          "processing thread for Ozone MultiTenant Manager.");
+    }
+  }
+
+  @Override
+  public void close() {
+    executorService.shutdown();
+    try {
+      if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) {
+        executorService.shutdownNow();
+      }
+
+      if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) {
+        LOG.error("Unable to shutdown OM Ranger Background Sync properly.");
+      }
+    } catch (InterruptedException e) {
+      executorService.shutdownNow();
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  private void executeOneRangerSyncCycle() {
+    int attempt = 0;
+    try {
+      // Taking the lock only makes sure that while we are reading
+      // the current Ozone service version, another multitenancy
+      // request is not changing it. We can drop the lock after that.
+      while (!ozoneManager.getMultiTenantManager()
+          .tryAcquireInProgressMtOp(WAIT_MILI)) {
+        sleep(10);
+      }
+      currentOzoneServiceVersionInOMDB = getOmdbRangerServiceVersion();
+      proposedOzoneServiceVersionInOMDB = getRangerServiceVersion();
+      while (currentOzoneServiceVersionInOMDB !=
+          proposedOzoneServiceVersionInOMDB) {
+        if (++attempt > MAX_ATTEMPT) {
+          break;
+        }
+        ozoneManager.getMultiTenantManager().resetInProgressMtOpState();
+        if (!ozoneManager.isLeaderReady()) {
+          return;
+        }
+        LOG.warn("Executing Ranger Sync Cycle.");
+
+        executeOmdbToRangerSync(currentOzoneServiceVersionInOMDB);
+
+        if (currentOzoneServiceVersionInOMDB !=
+            proposedOzoneServiceVersionInOMDB) {
+          // Submit Ratis Request to sync the new ozone service version in OMDB
+          setOmdbRangerServiceVersion(proposedOzoneServiceVersionInOMDB);
+        }
+        while (!ozoneManager.getMultiTenantManager()
+            .tryAcquireInProgressMtOp(WAIT_MILI)) {
+          sleep(10);
+        }
+        currentOzoneServiceVersionInOMDB = proposedOzoneServiceVersionInOMDB;
+        proposedOzoneServiceVersionInOMDB = getRangerServiceVersion();
+      }
+    } catch (Exception e) {
+      LOG.warn("Exception during a Ranger Sync Cycle. " + e.getMessage() );

Review comment:
       Let's pass the exception directly as the second parameter to preserve the stack trace. Our future selves will thank us.
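
       With SLF4J that would be `LOG.warn("Exception during a Ranger Sync Cycle.", e);`. The sketch below shows the difference in a self-contained form. It is an illustration only: `java.util.logging` stands in for SLF4J so that it runs without extra dependencies, and the class and method names (`StackTraceLoggingSketch`, `CapturingHandler`, `logBothWays`) are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.LogRecord;
import java.util.logging.Logger;

/** Hypothetical demo class; java.util.logging stands in for SLF4J here. */
final class StackTraceLoggingSketch {

  /** Collects log records in memory so the attached throwable can be inspected. */
  static final class CapturingHandler extends Handler {
    final List<LogRecord> records = new ArrayList<>();

    @Override
    public void publish(LogRecord record) {
      records.add(record);
    }

    @Override
    public void flush() { }

    @Override
    public void close() { }
  }

  /** Logs the same failure both ways and returns the captured records. */
  static List<LogRecord> logBothWays(Exception e) {
    Logger log = Logger.getAnonymousLogger();
    log.setUseParentHandlers(false); // keep the demo quiet on the console
    CapturingHandler handler = new CapturingHandler();
    log.addHandler(handler);

    // Message-only: the exception type and stack trace are lost.
    log.log(Level.WARNING,
        "Exception during a Ranger Sync Cycle. " + e.getMessage());

    // Throwable passed as its own argument: the record keeps the exception,
    // so a formatter can print the full stack trace.
    log.log(Level.WARNING, "Exception during a Ranger Sync Cycle.", e);

    return handler.records;
  }
}
```

       Only the second record carries the throwable; the first keeps just the message text, which is often not enough to locate the failing call.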

##########
File path: hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/s3/tenant/OMRangerServiceVersionSyncRequest.java
##########
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.hadoop.ozone.om.request.s3.tenant;
+
+import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.RESOURCE_BUSY;
+import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.TENANT_ALREADY_EXISTS;
+import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.VOLUME_ALREADY_EXISTS;
+import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.USER_LOCK;
+import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.VOLUME_LOCK;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
+import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
+import org.apache.hadoop.ozone.OmUtils;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.audit.OMAction;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.om.helpers.OmDBTenantInfo;
+import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
+import org.apache.hadoop.ozone.om.multitenant.AccessPolicy;
+import org.apache.hadoop.ozone.om.multitenant.Tenant;
+import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerDoubleBufferHelper;
+import org.apache.hadoop.ozone.om.request.OMClientRequest;
+import org.apache.hadoop.ozone.om.request.util.OmResponseUtil;
+import org.apache.hadoop.ozone.om.request.volume.OMVolumeRequest;
+import org.apache.hadoop.ozone.om.response.OMClientResponse;
+import org.apache.hadoop.ozone.om.response.s3.tenant.OMRangerServiceVersionSyncResponse;
+import org.apache.hadoop.ozone.om.response.s3.tenant.OMTenantCreateResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.RangerServiceVersionSyncRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.RangerServiceVersionSyncResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.VolumeInfo;
+import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
+import org.apache.hadoop.ozone.security.acl.OzoneObj;
+import org.apache.hadoop.ozone.storage.proto.OzoneManagerStorageProtos.PersistedUserVolumeInfo;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+
+/*
+ * This request is issued by the RangerSync Background thread to update the
+ * OzoneServiceVersion as read from the Ranger during the  most up-to-date
+ * ranger-to-OMDB sync operation.
+ */
+
+/**
+ * Handles OMRangerServiceVersionSync request.
+ */
+public class OMRangerServiceVersionSyncRequest extends OMClientRequest {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(OMRangerServiceVersionSyncRequest.class);
+
+  public OMRangerServiceVersionSyncRequest(OMRequest omRequest) {
+    super(omRequest);
+  }
+
+  @Override
+  public OMRequest preExecute(OzoneManager ozoneManager) throws IOException {
+
+    final RangerServiceVersionSyncRequest request =
+        getOmRequest().getRangerServiceVersionSyncRequest();
+    final long proposedVersion = request.getRangerServiceVersion();
+
+    if (!ozoneManager.getMultiTenantManager()
+        .tryAcquireInProgressMtOp(WAIT_MILISECONDS)) {
+      throw new OMException("Only One MultiTenant operation allowed at a " +
+          "time", RESOURCE_BUSY);
+    }
+
+    final OMRequest.Builder omRequestBuilder = getOmRequest().toBuilder()
+        .setRangerServiceVersionSyncRequest(
+            RangerServiceVersionSyncRequest.newBuilder()
+                .setRangerServiceVersion(proposedVersion))
+        // TODO: Can the three lines below be ignored?
+        .setUserInfo(getUserInfo())
+        .setCmdType(getOmRequest().getCmdType())
+        .setClientId(getOmRequest().getClientId());
+
+    if (getOmRequest().hasTraceID()) {
+      omRequestBuilder.setTraceID(getOmRequest().getTraceID());
+    }
+
+    return omRequestBuilder.build();
+
+  }
+
+  @Override
+  public void handleRequestFailure(OzoneManager ozoneManager) {
+  }
+
+  @Override
+  @SuppressWarnings("methodlength")

Review comment:
       I think this annotation was copied from another method. Luckily this validateAndUpdateCache is not too big 😃

##########
File path: hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/multitenant/OMRangerBGSync.java
##########
@@ -0,0 +1,475 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.om.multitenant;
+
+import static java.lang.Thread.sleep;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RANGER_SYNC_INTERVAL;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RANGER_SYNC_INTERVAL_DEFAULT;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.TableIterator;
+import org.apache.hadoop.ozone.client.OzoneClient;
+import org.apache.hadoop.ozone.client.OzoneClientFactory;
+import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.helpers.OmDBAccessIdInfo;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.ConcurrentHashMultiset;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.google.gson.JsonArray;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+
+/**
+ * Background Sync thread that reads Multitnancy state from OMDB
+ * and applies it to Ranger.
+ */
+public class OMRangerBGSync implements Runnable, Closeable {

Review comment:
       Also, I don't see any usages of this class outside of tests. Is the integration with the OM code finished?

##########
File path: hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/multitenant/OMRangerBGSync.java
##########
@@ -0,0 +1,475 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.om.multitenant;
+
+import static java.lang.Thread.sleep;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RANGER_SYNC_INTERVAL;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RANGER_SYNC_INTERVAL_DEFAULT;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.TableIterator;
+import org.apache.hadoop.ozone.client.OzoneClient;
+import org.apache.hadoop.ozone.client.OzoneClientFactory;
+import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.helpers.OmDBAccessIdInfo;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.ConcurrentHashMultiset;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.google.gson.JsonArray;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+
+/**
+ * Background Sync thread that reads Multitnancy state from OMDB
+ * and applies it to Ranger.
+ */
+public class OMRangerBGSync implements Runnable, Closeable {
+
+  private OzoneManager ozoneManager;
+  private OzoneClient ozoneClient;
+
+  private static final Logger LOG = LoggerFactory
+      .getLogger(OMRangerBGSync.class);
+  private static int WAIT_MILI = 1000;
+  private static int MAX_ATTEMPT = 2;
+
+  /**
+   * ExecutorService used for scheduling sync operation.
+   */
+  private final ScheduledExecutorService executorService;
+  private ScheduledFuture<?> rangerSyncFuture;
+  private final int rangerSyncInterval;
+
+  long rangerBGSyncCounter = 0;
+  long currentOzoneServiceVersionInOMDB;
+  long proposedOzoneServiceVersionInOMDB;
+  public static int ozoneServiceId;
+
+  MultiTenantAccessAuthorizerRangerPlugin authorizer;
+
+  class BGRole{
+    String name;
+    String id;
+    HashSet<String> users;
+
+    BGRole(String n) {
+      this.name = n;
+      users = new HashSet<>();
+    }
+
+    void addId(String i) {
+      this.id =i;
+    }
+
+    void addUsers(String u) {
+      users.add(u);
+    }
+
+    HashSet<String> getUsers() {
+      return users;
+    }
+
+    @Override
+    public int hashCode() {
+      return name.hashCode();
+    }
+  }
+
+  // we will track all the policies in Ranger here. After we have
+  // processed all the policies from OMDB, this map will
+  // be left with policies that we need to delete.
+  // Its a map of Policy ID to policy names
+  ConcurrentHashMap<String, String> mtRangerPoliciesTobeDeleted =
+      new ConcurrentHashMap<>();
+
+  // This map will be used to keep all the policies that are found in
+  // OMDB and should have been in Ranger. Currently, we are only printing such
+  // policyID. This can result if a tenant is deleted but the system
+  // crashed. Its an easy recovery to retry the "tenant delete" operation.
+  // Its a map of policy ID to policy names
+  ConcurrentHashMap<String, String> mtRangerPoliciesTobeCreated =
+      new ConcurrentHashMap<>();
+
+  // This map will keep all the Multiotenancy related roles from Ranger.
+  ConcurrentHashMap<String, BGRole> mtRangerRoles = new ConcurrentHashMap<>();
+
+  // keep OMDB mapping of Roles -> list of user principals.
+  ConcurrentHashMap<String, HashSet<String>> mtOMDBRoles =
+      new ConcurrentHashMap<>();
+
+  // Every BG ranger sync cycle we update this
+  long lastRangerPolicyLoadTime;
+
+  public OMRangerBGSync(OzoneManager om) throws Exception {
+    try {
+      ozoneManager = om;
+      authorizer = new MultiTenantAccessAuthorizerRangerPlugin();
+      authorizer.init(om.getConfiguration());
+      ozoneClient =
+          OzoneClientFactory.getRpcClient(ozoneManager.getConfiguration());
+      executorService = HadoopExecutors.newScheduledThreadPool(1,
+          new ThreadFactoryBuilder().setDaemon(true)
+              .setNameFormat("OM Ranger Sync Thread - %d").build());
+      rangerSyncInterval =
+          ozoneManager.getConfiguration().getInt(OZONE_OM_RANGER_SYNC_INTERVAL,
+              OZONE_OM_RANGER_SYNC_INTERVAL_DEFAULT);
+      scheduleNextRangerSync();
+      ozoneServiceId = authorizer.getOzoneServiceId();
+    } catch (Exception e) {
+      LOG.warn("Failed to Initialize Ranger Background Sync");
+      throw e;
+    }
+  }
+
+  public int getOzoneServiceId() throws Exception {
+    return ozoneServiceId;
+  }
+
+  public int getRangerSyncInterval() throws Exception {
+    return rangerSyncInterval;
+  }
+
+  @Override
+  public void run() {
+    try {
+      Thread.sleep(10);
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    }
+    try {
+      if (ozoneManager.isLeaderReady()) {
+        executeOneRangerSyncCycle();
+      }
+    } catch (Exception e) {
+      LOG.warn("Exception during OM Ranger Background Sync." + e.getMessage());
+    } finally {
+      // Lets Schedule the next cycle now. We do not deliberaty schedule at
+      // fixed interval to account for ranger sync processing time.
+      scheduleNextRangerSync();
+    }
+  }
+
+  private void scheduleNextRangerSync() {
+
+    if (!Thread.currentThread().isInterrupted() &&
+        !executorService.isShutdown()) {
+      rangerSyncFuture = executorService.schedule(this,
+          rangerSyncInterval, TimeUnit.SECONDS);
+    } else {
+      LOG.warn("Current Thread is interrupted, shutting down Ranger Sync " +
+          "processing thread for Ozone MultiTenant Manager.");
+    }
+  }
+
+  @Override
+  public void close() {
+    executorService.shutdown();
+    try {
+      if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) {
+        executorService.shutdownNow();
+      }
+
+      if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) {
+        LOG.error("Unable to shutdown OM Ranger Background Sync properly.");
+      }
+    } catch (InterruptedException e) {
+      executorService.shutdownNow();
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  private void executeOneRangerSyncCycle() {
+    int attempt = 0;
+    try {
+      // Taking the lock only makes sure that while we are reading
+      // the current Ozone service version, another multitenancy
+      // request is not changing it. We can drop the lock after that.

Review comment:
       I don't understand why we are locking here. Requests should not be updating the DB value. Ranger's version is free to change due to other modifications we did not make.

##########
File path: hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/multitenant/OMRangerBGSync.java
##########
@@ -0,0 +1,475 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.om.multitenant;
+
+import static java.lang.Thread.sleep;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RANGER_SYNC_INTERVAL;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RANGER_SYNC_INTERVAL_DEFAULT;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.TableIterator;
+import org.apache.hadoop.ozone.client.OzoneClient;
+import org.apache.hadoop.ozone.client.OzoneClientFactory;
+import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.helpers.OmDBAccessIdInfo;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.ConcurrentHashMultiset;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.google.gson.JsonArray;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+
+/**
+ * Background Sync thread that reads Multitnancy state from OMDB
+ * and applies it to Ranger.
+ */
+public class OMRangerBGSync implements Runnable, Closeable {
+
+  private OzoneManager ozoneManager;
+  private OzoneClient ozoneClient;
+
+  private static final Logger LOG = LoggerFactory
+      .getLogger(OMRangerBGSync.class);
+  private static int WAIT_MILI = 1000;
+  private static int MAX_ATTEMPT = 2;
+
+  /**
+   * ExecutorService used for scheduling sync operation.
+   */
+  private final ScheduledExecutorService executorService;
+  private ScheduledFuture<?> rangerSyncFuture;
+  private final int rangerSyncInterval;
+
+  long rangerBGSyncCounter = 0;
+  long currentOzoneServiceVersionInOMDB;
+  long proposedOzoneServiceVersionInOMDB;
+  public static int ozoneServiceId;
+
+  MultiTenantAccessAuthorizerRangerPlugin authorizer;
+
+  class BGRole{
+    String name;
+    String id;
+    HashSet<String> users;
+
+    BGRole(String n) {
+      this.name = n;
+      users = new HashSet<>();
+    }
+
+    void addId(String i) {
+      this.id =i;
+    }
+
+    void addUsers(String u) {
+      users.add(u);
+    }
+
+    HashSet<String> getUsers() {
+      return users;
+    }
+
+    @Override
+    public int hashCode() {
+      return name.hashCode();
+    }
+  }
+
+  // we will track all the policies in Ranger here. After we have
+  // processed all the policies from OMDB, this map will
+  // be left with policies that we need to delete.
+  // Its a map of Policy ID to policy names
+  ConcurrentHashMap<String, String> mtRangerPoliciesTobeDeleted =
+      new ConcurrentHashMap<>();
+
+  // This map will be used to keep all the policies that are found in
+  // OMDB and should have been in Ranger. Currently, we are only printing such
+  // policyID. This can result if a tenant is deleted but the system
+  // crashed. Its an easy recovery to retry the "tenant delete" operation.
+  // Its a map of policy ID to policy names
+  ConcurrentHashMap<String, String> mtRangerPoliciesTobeCreated =
+      new ConcurrentHashMap<>();
+
+  // This map will keep all the Multiotenancy related roles from Ranger.
+  ConcurrentHashMap<String, BGRole> mtRangerRoles = new ConcurrentHashMap<>();
+
+  // keep OMDB mapping of Roles -> list of user principals.
+  ConcurrentHashMap<String, HashSet<String>> mtOMDBRoles =
+      new ConcurrentHashMap<>();
+
+  // Every BG ranger sync cycle we update this
+  long lastRangerPolicyLoadTime;
+
+  public OMRangerBGSync(OzoneManager om) throws Exception {
+    try {
+      ozoneManager = om;
+      authorizer = new MultiTenantAccessAuthorizerRangerPlugin();
+      authorizer.init(om.getConfiguration());
+      ozoneClient =
+          OzoneClientFactory.getRpcClient(ozoneManager.getConfiguration());
+      executorService = HadoopExecutors.newScheduledThreadPool(1,
+          new ThreadFactoryBuilder().setDaemon(true)
+              .setNameFormat("OM Ranger Sync Thread - %d").build());
+      rangerSyncInterval =
+          ozoneManager.getConfiguration().getInt(OZONE_OM_RANGER_SYNC_INTERVAL,
+              OZONE_OM_RANGER_SYNC_INTERVAL_DEFAULT);
+      scheduleNextRangerSync();
+      ozoneServiceId = authorizer.getOzoneServiceId();
+    } catch (Exception e) {
+      LOG.warn("Failed to Initialize Ranger Background Sync");
+      throw e;
+    }
+  }
+
+  public int getOzoneServiceId() throws Exception {
+    return ozoneServiceId;
+  }
+
+  public int getRangerSyncInterval() throws Exception {
+    return rangerSyncInterval;
+  }
+
+  @Override
+  public void run() {
+    try {
+      Thread.sleep(10);
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    }
+    try {
+      if (ozoneManager.isLeaderReady()) {
+        executeOneRangerSyncCycle();
+      }
+    } catch (Exception e) {
+      LOG.warn("Exception during OM Ranger Background Sync." + e.getMessage());
+    } finally {
+      // Lets Schedule the next cycle now. We do not deliberaty schedule at
+      // fixed interval to account for ranger sync processing time.
+      scheduleNextRangerSync();
+    }
+  }
+
+  private void scheduleNextRangerSync() {
+
+    if (!Thread.currentThread().isInterrupted() &&
+        !executorService.isShutdown()) {
+      rangerSyncFuture = executorService.schedule(this,
+          rangerSyncInterval, TimeUnit.SECONDS);
+    } else {
+      LOG.warn("Current Thread is interrupted, shutting down Ranger Sync " +
+          "processing thread for Ozone MultiTenant Manager.");
+    }
+  }
+
+  @Override
+  public void close() {
+    executorService.shutdown();
+    try {
+      if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) {
+        executorService.shutdownNow();
+      }
+
+      if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) {
+        LOG.error("Unable to shutdown OM Ranger Background Sync properly.");
+      }
+    } catch (InterruptedException e) {
+      executorService.shutdownNow();
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  private void executeOneRangerSyncCycle() {
+    int attempt = 0;
+    try {
+      // Taking the lock only makes sure that while we are reading
+      // the current Ozone service version, another multitenancy
+      // request is not changing it. We can drop the lock after that.
+      while (!ozoneManager.getMultiTenantManager()
+          .tryAcquireInProgressMtOp(WAIT_MILI)) {
+        sleep(10);
+      }
+      currentOzoneServiceVersionInOMDB = getOmdbRangerServiceVersion();
+      proposedOzoneServiceVersionInOMDB = getRangerServiceVersion();
+      while (currentOzoneServiceVersionInOMDB !=
+          proposedOzoneServiceVersionInOMDB) {
+        if (++attempt > MAX_ATTEMPT) {
+          break;
+        }
+        ozoneManager.getMultiTenantManager().resetInProgressMtOpState();
+        if (!ozoneManager.isLeaderReady()) {
+          return;
+        }
+        LOG.warn("Executing Ranger Sync Cycle.");
+
+        executeOmdbToRangerSync(currentOzoneServiceVersionInOMDB);
+
+        if (currentOzoneServiceVersionInOMDB !=
+            proposedOzoneServiceVersionInOMDB) {
+          // Submit Ratis Request to sync the new ozone service version in OMDB
+          setOmdbRangerServiceVersion(proposedOzoneServiceVersionInOMDB);
+        }
+        while (!ozoneManager.getMultiTenantManager()
+            .tryAcquireInProgressMtOp(WAIT_MILI)) {
+          sleep(10);
+        }
+        currentOzoneServiceVersionInOMDB = proposedOzoneServiceVersionInOMDB;
+        proposedOzoneServiceVersionInOMDB = getRangerServiceVersion();
+      }
+    } catch (Exception e) {
+      LOG.warn("Exception during a Ranger Sync Cycle. " + e.getMessage() );
+    }finally {
+      ozoneManager.getMultiTenantManager().resetInProgressMtOpState();
+    }
+  }
+
+  public long getRangerServiceVersion() throws Exception {
+    return authorizer.getCurrentOzoneServiceVersion(ozoneServiceId);
+  }
+
+  public void setOmdbRangerServiceVersion(long version) throws IOException {
+    // OMDB update goes through RATIS
+    ozoneClient.getObjectStore().rangerServiceVersionSync(version);

Review comment:
       This isn't something we want exposed to the client. See `KeyDeletingService` for an example of submitting a Ratis request directly from the server:
   ```java
           RaftClientRequest raftClientRequest =
               createRaftClientRequestForPurge(omRequest);
           ozoneManager.getOmRatisServer().submitRequest(omRequest,
               raftClientRequest);
   
   ```

##########
File path: hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/multitenant/OMRangerBGSync.java
##########
@@ -0,0 +1,475 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.om.multitenant;
+
+import static java.lang.Thread.sleep;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RANGER_SYNC_INTERVAL;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RANGER_SYNC_INTERVAL_DEFAULT;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.TableIterator;
+import org.apache.hadoop.ozone.client.OzoneClient;
+import org.apache.hadoop.ozone.client.OzoneClientFactory;
+import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.helpers.OmDBAccessIdInfo;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.ConcurrentHashMultiset;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.google.gson.JsonArray;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+
+/**
+ * Background Sync thread that reads Multitnancy state from OMDB
+ * and applies it to Ranger.
+ */
+public class OMRangerBGSync implements Runnable, Closeable {
+
+  private OzoneManager ozoneManager;
+  private OzoneClient ozoneClient;
+
+  private static final Logger LOG = LoggerFactory
+      .getLogger(OMRangerBGSync.class);
+  private static int WAIT_MILI = 1000;
+  private static int MAX_ATTEMPT = 2;
+
+  /**
+   * ExecutorService used for scheduling sync operation.
+   */
+  private final ScheduledExecutorService executorService;
+  private ScheduledFuture<?> rangerSyncFuture;
+  private final int rangerSyncInterval;
+
+  long rangerBGSyncCounter = 0;
+  long currentOzoneServiceVersionInOMDB;
+  long proposedOzoneServiceVersionInOMDB;
+  public static int ozoneServiceId;
+
+  MultiTenantAccessAuthorizerRangerPlugin authorizer;
+
+  class BGRole{
+    String name;
+    String id;
+    HashSet<String> users;
+
+    BGRole(String n) {
+      this.name = n;
+      users = new HashSet<>();
+    }
+
+    void addId(String i) {
+      this.id =i;
+    }
+
+    void addUsers(String u) {
+      users.add(u);
+    }
+
+    HashSet<String> getUsers() {
+      return users;
+    }
+
+    @Override
+    public int hashCode() {
+      return name.hashCode();
+    }
+  }
+
+  // we will track all the policies in Ranger here. After we have
+  // processed all the policies from OMDB, this map will
+  // be left with policies that we need to delete.
+  // Its a map of Policy ID to policy names
+  ConcurrentHashMap<String, String> mtRangerPoliciesTobeDeleted =
+      new ConcurrentHashMap<>();
+
+  // This map will be used to keep all the policies that are found in
+  // OMDB and should have been in Ranger. Currently, we are only printing such
+  // policyID. This can result if a tenant is deleted but the system
+  // crashed. Its an easy recovery to retry the "tenant delete" operation.
+  // Its a map of policy ID to policy names
+  ConcurrentHashMap<String, String> mtRangerPoliciesTobeCreated =
+      new ConcurrentHashMap<>();
+
+  // This map will keep all the Multiotenancy related roles from Ranger.
+  ConcurrentHashMap<String, BGRole> mtRangerRoles = new ConcurrentHashMap<>();
+
+  // keep OMDB mapping of Roles -> list of user principals.
+  ConcurrentHashMap<String, HashSet<String>> mtOMDBRoles =
+      new ConcurrentHashMap<>();
+
+  // Every BG ranger sync cycle we update this
+  long lastRangerPolicyLoadTime;
+
+  public OMRangerBGSync(OzoneManager om) throws Exception {
+    try {
+      ozoneManager = om;
+      authorizer = new MultiTenantAccessAuthorizerRangerPlugin();
+      authorizer.init(om.getConfiguration());
+      ozoneClient =
+          OzoneClientFactory.getRpcClient(ozoneManager.getConfiguration());
+      executorService = HadoopExecutors.newScheduledThreadPool(1,
+          new ThreadFactoryBuilder().setDaemon(true)
+              .setNameFormat("OM Ranger Sync Thread - %d").build());
+      rangerSyncInterval =
+          ozoneManager.getConfiguration().getInt(OZONE_OM_RANGER_SYNC_INTERVAL,
+              OZONE_OM_RANGER_SYNC_INTERVAL_DEFAULT);
+      scheduleNextRangerSync();
+      ozoneServiceId = authorizer.getOzoneServiceId();
+    } catch (Exception e) {
+      LOG.warn("Failed to Initialize Ranger Background Sync");
+      throw e;
+    }
+  }
+
+  public int getOzoneServiceId() throws Exception {
+    return ozoneServiceId;
+  }
+
+  public int getRangerSyncInterval() throws Exception {
+    return rangerSyncInterval;
+  }
+
+  @Override
+  public void run() {
+    try {
+      Thread.sleep(10);
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    }
+    try {
+      if (ozoneManager.isLeaderReady()) {
+        executeOneRangerSyncCycle();
+      }
+    } catch (Exception e) {
+      LOG.warn("Exception during OM Ranger Background Sync." + e.getMessage());
+    } finally {
+      // Lets Schedule the next cycle now. We do not deliberaty schedule at
+      // fixed interval to account for ranger sync processing time.
+      scheduleNextRangerSync();
+    }
+  }
+
+  private void scheduleNextRangerSync() {
+
+    if (!Thread.currentThread().isInterrupted() &&
+        !executorService.isShutdown()) {
+      rangerSyncFuture = executorService.schedule(this,
+          rangerSyncInterval, TimeUnit.SECONDS);
+    } else {
+      LOG.warn("Current Thread is interrupted, shutting down Ranger Sync " +
+          "processing thread for Ozone MultiTenant Manager.");
+    }
+  }
+
+  @Override
+  public void close() {
+    executorService.shutdown();
+    try {
+      if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) {
+        executorService.shutdownNow();
+      }
+
+      if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) {
+        LOG.error("Unable to shutdown OM Ranger Background Sync properly.");
+      }
+    } catch (InterruptedException e) {
+      executorService.shutdownNow();
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  private void executeOneRangerSyncCycle() {
+    int attempt = 0;
+    try {
+      // Taking the lock only makes sure that while we are reading
+      // the current Ozone service version, another multitenancy
+      // request is not changing it. We can drop the lock after that.
+      while (!ozoneManager.getMultiTenantManager()
+          .tryAcquireInProgressMtOp(WAIT_MILI)) {
+        sleep(10);
+      }
+      currentOzoneServiceVersionInOMDB = getOmdbRangerServiceVersion();
+      proposedOzoneServiceVersionInOMDB = getRangerServiceVersion();
+      while (currentOzoneServiceVersionInOMDB !=
+          proposedOzoneServiceVersionInOMDB) {
+        if (++attempt > MAX_ATTEMPT) {
+          break;
+        }
+        ozoneManager.getMultiTenantManager().resetInProgressMtOpState();
+        if (!ozoneManager.isLeaderReady()) {
+          return;
+        }
+        LOG.warn("Executing Ranger Sync Cycle.");

Review comment:
       I think `info` level would be a better fit here, and maybe include the attempt number in the message.
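
A minimal sketch of what such a log line could look like. It uses `java.util.logging` only so that it runs without extra dependencies; the patch itself uses SLF4J, where the equivalent would be a parameterized `LOG.info` call. The class name and the `syncCycleMessage` helper are hypothetical and exist only for illustration.

```java
import java.util.logging.Logger;

public class SyncCycleLogging {
  private static final Logger LOG =
      Logger.getLogger(SyncCycleLogging.class.getName());

  // Mirrors MAX_ATTEMPT from the patch.
  static final int MAX_ATTEMPT = 2;

  // Hypothetical helper: builds the message so it can be checked separately.
  static String syncCycleMessage(int attempt) {
    return "Executing Ranger Sync Cycle. Attempt " + attempt
        + " of " + MAX_ATTEMPT + ".";
  }

  public static void main(String[] args) {
    for (int attempt = 1; attempt <= MAX_ATTEMPT; attempt++) {
      // Routine progress is logged at INFO rather than WARN.
      LOG.info(syncCycleMessage(attempt));
    }
  }
}
```

Keeping the attempt number in the message makes it possible to tell a first pass from a retry when reading the OM log.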

##########
File path: hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/multitenant/OMRangerBGSync.java
##########
@@ -0,0 +1,475 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.om.multitenant;
+
+import static java.lang.Thread.sleep;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RANGER_SYNC_INTERVAL;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RANGER_SYNC_INTERVAL_DEFAULT;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.TableIterator;
+import org.apache.hadoop.ozone.client.OzoneClient;
+import org.apache.hadoop.ozone.client.OzoneClientFactory;
+import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.helpers.OmDBAccessIdInfo;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.ConcurrentHashMultiset;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.google.gson.JsonArray;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+
+/**
+ * Background Sync thread that reads Multitnancy state from OMDB
+ * and applies it to Ranger.
+ */
+public class OMRangerBGSync implements Runnable, Closeable {
+
+  private OzoneManager ozoneManager;
+  private OzoneClient ozoneClient;
+
+  private static final Logger LOG = LoggerFactory
+      .getLogger(OMRangerBGSync.class);
+  private static int WAIT_MILI = 1000;
+  private static int MAX_ATTEMPT = 2;
+
+  /**
+   * ExecutorService used for scheduling sync operation.
+   */
+  private final ScheduledExecutorService executorService;
+  private ScheduledFuture<?> rangerSyncFuture;
+  private final int rangerSyncInterval;
+
+  long rangerBGSyncCounter = 0;
+  long currentOzoneServiceVersionInOMDB;
+  long proposedOzoneServiceVersionInOMDB;
+  public static int ozoneServiceId;
+
+  MultiTenantAccessAuthorizerRangerPlugin authorizer;
+
+  class BGRole{
+    String name;
+    String id;
+    HashSet<String> users;
+
+    BGRole(String n) {
+      this.name = n;
+      users = new HashSet<>();
+    }
+
+    void addId(String i) {
+      this.id =i;
+    }
+
+    void addUsers(String u) {
+      users.add(u);
+    }
+
+    HashSet<String> getUsers() {
+      return users;
+    }
+
+    @Override
+    public int hashCode() {
+      return name.hashCode();
+    }
+  }
+
+  // we will track all the policies in Ranger here. After we have
+  // processed all the policies from OMDB, this map will
+  // be left with policies that we need to delete.
+  // Its a map of Policy ID to policy names
+  ConcurrentHashMap<String, String> mtRangerPoliciesTobeDeleted =
+      new ConcurrentHashMap<>();
+
+  // This map will be used to keep all the policies that are found in
+  // OMDB and should have been in Ranger. Currently, we are only printing such
+  // policyID. This can result if a tenant is deleted but the system
+  // crashed. Its an easy recovery to retry the "tenant delete" operation.
+  // Its a map of policy ID to policy names
+  ConcurrentHashMap<String, String> mtRangerPoliciesTobeCreated =
+      new ConcurrentHashMap<>();
+
+  // This map will keep all the Multiotenancy related roles from Ranger.
+  ConcurrentHashMap<String, BGRole> mtRangerRoles = new ConcurrentHashMap<>();
+
+  // keep OMDB mapping of Roles -> list of user principals.
+  ConcurrentHashMap<String, HashSet<String>> mtOMDBRoles =
+      new ConcurrentHashMap<>();
+
+  // Every BG ranger sync cycle we update this
+  long lastRangerPolicyLoadTime;
+
+  public OMRangerBGSync(OzoneManager om) throws Exception {
+    try {
+      ozoneManager = om;
+      authorizer = new MultiTenantAccessAuthorizerRangerPlugin();
+      authorizer.init(om.getConfiguration());
+      ozoneClient =
+          OzoneClientFactory.getRpcClient(ozoneManager.getConfiguration());
+      executorService = HadoopExecutors.newScheduledThreadPool(1,
+          new ThreadFactoryBuilder().setDaemon(true)
+              .setNameFormat("OM Ranger Sync Thread - %d").build());
+      rangerSyncInterval =
+          ozoneManager.getConfiguration().getInt(OZONE_OM_RANGER_SYNC_INTERVAL,
+              OZONE_OM_RANGER_SYNC_INTERVAL_DEFAULT);
+      scheduleNextRangerSync();
+      ozoneServiceId = authorizer.getOzoneServiceId();
+    } catch (Exception e) {
+      LOG.warn("Failed to Initialize Ranger Background Sync");
+      throw e;
+    }
+  }
+
+  public int getOzoneServiceId() throws Exception {
+    return ozoneServiceId;
+  }
+
+  public int getRangerSyncInterval() throws Exception {
+    return rangerSyncInterval;
+  }
+
+  @Override
+  public void run() {
+    try {
+      Thread.sleep(10);
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    }
+    try {
+      if (ozoneManager.isLeaderReady()) {
+        executeOneRangerSyncCycle();
+      }
+    } catch (Exception e) {
+      LOG.warn("Exception during OM Ranger Background Sync." + e.getMessage());
+    } finally {
+      // Lets Schedule the next cycle now. We do not deliberaty schedule at
+      // fixed interval to account for ranger sync processing time.
+      scheduleNextRangerSync();
+    }
+  }
+
+  private void scheduleNextRangerSync() {
+
+    if (!Thread.currentThread().isInterrupted() &&
+        !executorService.isShutdown()) {
+      rangerSyncFuture = executorService.schedule(this,
+          rangerSyncInterval, TimeUnit.SECONDS);
+    } else {
+      LOG.warn("Current Thread is interrupted, shutting down Ranger Sync " +
+          "processing thread for Ozone MultiTenant Manager.");
+    }
+  }
+
+  @Override
+  public void close() {
+    executorService.shutdown();
+    try {
+      if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) {
+        executorService.shutdownNow();
+      }
+
+      if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) {
+        LOG.error("Unable to shutdown OM Ranger Background Sync properly.");
+      }
+    } catch (InterruptedException e) {
+      executorService.shutdownNow();
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  private void executeOneRangerSyncCycle() {
+    int attempt = 0;
+    try {
+      // Taking the lock only makes sure that while we are reading
+      // the current Ozone service version, another multitenancy
+      // request is not changing it. We can drop the lock after that.
+      while (!ozoneManager.getMultiTenantManager()
+          .tryAcquireInProgressMtOp(WAIT_MILI)) {
+        sleep(10);
+      }
+      currentOzoneServiceVersionInOMDB = getOmdbRangerServiceVersion();
+      proposedOzoneServiceVersionInOMDB = getRangerServiceVersion();
+      while (currentOzoneServiceVersionInOMDB !=
+          proposedOzoneServiceVersionInOMDB) {
+        if (++attempt > MAX_ATTEMPT) {
+          break;
+        }
+        ozoneManager.getMultiTenantManager().resetInProgressMtOpState();
+        if (!ozoneManager.isLeaderReady()) {
+          return;
+        }
+        LOG.warn("Executing Ranger Sync Cycle.");
+
+        executeOmdbToRangerSync(currentOzoneServiceVersionInOMDB);
+
+        if (currentOzoneServiceVersionInOMDB !=
+            proposedOzoneServiceVersionInOMDB) {
+          // Submit Ratis Request to sync the new ozone service version in OMDB
+          setOmdbRangerServiceVersion(proposedOzoneServiceVersionInOMDB);
+        }
+        while (!ozoneManager.getMultiTenantManager()
+            .tryAcquireInProgressMtOp(WAIT_MILI)) {
+          sleep(10);
+        }
+        currentOzoneServiceVersionInOMDB = proposedOzoneServiceVersionInOMDB;
+        proposedOzoneServiceVersionInOMDB = getRangerServiceVersion();
+      }
+    } catch (Exception e) {
+      LOG.warn("Exception during a Ranger Sync Cycle. " + e.getMessage() );
+    }finally {
+      ozoneManager.getMultiTenantManager().resetInProgressMtOpState();
+    }
+  }
+
+  public long getRangerServiceVersion() throws Exception {
+    return authorizer.getCurrentOzoneServiceVersion(ozoneServiceId);
+  }
+
+  public void setOmdbRangerServiceVersion(long version) throws IOException {
+    // OMDB update goes through RATIS
+    ozoneClient.getObjectStore().rangerServiceVersionSync(version);
+  }
+
+  public long getOmdbRangerServiceVersion() {
+    long lastKnownVersion = 0;
+    try {
+      lastKnownVersion =
+          ozoneManager.getMetadataManager().getOmRangerStateTable()
+              .get(OmMetadataManagerImpl.RangerOzoneServiceVersionKey);
+    } catch (Exception ex) {
+      return 0;

Review comment:
       I think we should propagate this IOException. The service's current iteration will abort and log a warning. I'm not sure we want the service still running with 0 as a placeholder.
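
A rough sketch of the propagating variant, under stated assumptions: `StateTable` is a hypothetical stand-in for the OM DB table, `VERSION_KEY` is a placeholder key, and treating a missing key as version 0 is an assumption made here, not something the patch defines. The point is only that a read failure reaches the caller, which logs it and aborts the current iteration.

```java
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class VersionReadSketch {
  /** Hypothetical stand-in for the OM DB table holding the Ranger state. */
  interface StateTable {
    Long get(String key) throws IOException;
  }

  // Placeholder key name, for illustration only.
  static final String VERSION_KEY = "RangerOzoneServiceVersionKey";

  // The read failure is declared and propagated instead of being mapped to 0.
  static long getOmdbRangerServiceVersion(StateTable table)
      throws IOException {
    Long version = table.get(VERSION_KEY);
    // Assumption: a missing key means nothing has been synced yet.
    return version == null ? 0L : version;
  }

  // One iteration of the service; returns false if it had to abort.
  static boolean runOneIteration(StateTable table) {
    try {
      long current = getOmdbRangerServiceVersion(table);
      System.out.println("Syncing from OM DB version " + current);
      return true;
    } catch (IOException e) {
      System.err.println("Aborting this sync iteration: " + e.getMessage());
      return false;
    }
  }

  public static void main(String[] args) {
    Map<String, Long> backing = new HashMap<>();
    backing.put(VERSION_KEY, 7L);
    runOneIteration(backing::get); // healthy table
    runOneIteration(key -> {
      throw new IOException("DB read failed");
    }); // failing table: the iteration aborts, the service keeps running
  }
}
```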

##########
File path: hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/s3/tenant/OMRangerServiceVersionSyncResponse.java
##########
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.hadoop.ozone.om.response.s3.tenant;
+
+import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.RANGER_STATE_TABLE;
+
+import java.io.IOException;
+
+import javax.annotation.Nonnull;
+
+import org.apache.hadoop.hdds.utils.db.BatchOperation;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.response.CleanupTableInfo;
+import org.apache.hadoop.ozone.om.response.OMClientResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Response for OMRangerServiceVersionSync request.
+ */
+@CleanupTableInfo(cleanupTables = {RANGER_STATE_TABLE})
+public class OMRangerServiceVersionSyncResponse extends OMClientResponse {
+  private long newServiceVersion;
+  private String serviceVersionkey;
+
+  public OMRangerServiceVersionSyncResponse(@Nonnull OMResponse omResponse,
+                                            @Nonnull long proposedVersion,
+                                            @Nonnull String key
+  ) {
+    super(omResponse);
+    this.newServiceVersion = proposedVersion;
+    this.serviceVersionkey = key;
+  }
+
+  /**
+   * For when the request is not successful.
+   * For a successful request, the other constructor should be used.
+   */
+  public OMRangerServiceVersionSyncResponse(@Nonnull OMResponse omResponse) {
+    super(omResponse);
+    checkStatusNotOK();
+  }
+
+  @Override
+  public void addToDBBatch(OMMetadataManager omMetadataManager,
+      BatchOperation batchOperation) throws IOException {
+
+    omMetadataManager.getOmRangerStateTable().putWithBatch(
+        batchOperation, serviceVersionkey,
+        new Long(newServiceVersion));

Review comment:
       Nit: `newServiceVersion` will be autoboxed here, so the explicit `new Long(...)` is not needed.
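
To illustrate the point, here is a minimal sketch of autoboxing at a call site that expects a `Long`. `VersionTable` is a hypothetical stand-in written for this example, not the Ozone `Table` API; only the boxing behaviour is the point. As a side note, the `Long(long)` constructor has been deprecated since Java 9.

```java
import java.util.HashMap;
import java.util.Map;

class AutoboxingDemo {
  // Stores a primitive long in a table whose value type is Long.
  static Long store(long version) {
    VersionTable table = new VersionTable();
    // No "new Long(version)" needed: the compiler boxes the primitive.
    table.put("serviceVersion", version);
    return table.get("serviceVersion");
  }

  public static void main(String[] args) {
    System.out.println(store(42L));
  }
}

// Hypothetical typed table used only for this illustration.
class VersionTable {
  private final Map<String, Long> rows = new HashMap<>();

  void put(String key, Long value) {
    rows.put(key, value);
  }

  Long get(String key) {
    return rows.get(key);
  }
}
```

The same applies to any generic method whose value type parameter is bound to `Long`.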

##########
File path: hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/multitenant/OMRangerBGSync.java
##########
@@ -0,0 +1,475 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.om.multitenant;
+
+import static java.lang.Thread.sleep;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RANGER_SYNC_INTERVAL;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RANGER_SYNC_INTERVAL_DEFAULT;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.TableIterator;
+import org.apache.hadoop.ozone.client.OzoneClient;
+import org.apache.hadoop.ozone.client.OzoneClientFactory;
+import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.helpers.OmDBAccessIdInfo;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.ConcurrentHashMultiset;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.google.gson.JsonArray;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+
+/**
+ * Background Sync thread that reads Multitnancy state from OMDB
+ * and applies it to Ranger.
+ */
+public class OMRangerBGSync implements Runnable, Closeable {
+
+  private OzoneManager ozoneManager;
+  private OzoneClient ozoneClient;
+
+  private static final Logger LOG = LoggerFactory
+      .getLogger(OMRangerBGSync.class);
+  private static int WAIT_MILI = 1000;
+  private static int MAX_ATTEMPT = 2;
+
+  /**
+   * ExecutorService used for scheduling sync operation.
+   */
+  private final ScheduledExecutorService executorService;
+  private ScheduledFuture<?> rangerSyncFuture;
+  private final int rangerSyncInterval;
+
+  long rangerBGSyncCounter = 0;
+  long currentOzoneServiceVersionInOMDB;
+  long proposedOzoneServiceVersionInOMDB;
+  public static int ozoneServiceId;
+
+  MultiTenantAccessAuthorizerRangerPlugin authorizer;
+
+  class BGRole{
+    String name;
+    String id;
+    HashSet<String> users;
+
+    BGRole(String n) {
+      this.name = n;
+      users = new HashSet<>();
+    }
+
+    void addId(String i) {
+      this.id =i;
+    }
+
+    void addUsers(String u) {
+      users.add(u);
+    }
+
+    HashSet<String> getUsers() {
+      return users;
+    }
+
+    @Override
+    public int hashCode() {
+      return name.hashCode();
+    }
+  }
+
+  // we will track all the policies in Ranger here. After we have
+  // processed all the policies from OMDB, this map will
+  // be left with policies that we need to delete.
+  // Its a map of Policy ID to policy names
+  ConcurrentHashMap<String, String> mtRangerPoliciesTobeDeleted =
+      new ConcurrentHashMap<>();
+
+  // This map will be used to keep all the policies that are found in
+  // OMDB and should have been in Ranger. Currently, we are only printing such
+  // policyID. This can result if a tenant is deleted but the system
+  // crashed. Its an easy recovery to retry the "tenant delete" operation.
+  // Its a map of policy ID to policy names
+  ConcurrentHashMap<String, String> mtRangerPoliciesTobeCreated =
+      new ConcurrentHashMap<>();
+
+  // This map will keep all the Multiotenancy related roles from Ranger.
+  ConcurrentHashMap<String, BGRole> mtRangerRoles = new ConcurrentHashMap<>();
+
+  // keep OMDB mapping of Roles -> list of user principals.
+  ConcurrentHashMap<String, HashSet<String>> mtOMDBRoles =
+      new ConcurrentHashMap<>();
+
+  // Every BG ranger sync cycle we update this
+  long lastRangerPolicyLoadTime;
+
+  public OMRangerBGSync(OzoneManager om) throws Exception {
+    try {
+      ozoneManager = om;
+      authorizer = new MultiTenantAccessAuthorizerRangerPlugin();
+      authorizer.init(om.getConfiguration());
+      ozoneClient =
+          OzoneClientFactory.getRpcClient(ozoneManager.getConfiguration());
+      executorService = HadoopExecutors.newScheduledThreadPool(1,
+          new ThreadFactoryBuilder().setDaemon(true)
+              .setNameFormat("OM Ranger Sync Thread - %d").build());
+      rangerSyncInterval =
+          ozoneManager.getConfiguration().getInt(OZONE_OM_RANGER_SYNC_INTERVAL,
+              OZONE_OM_RANGER_SYNC_INTERVAL_DEFAULT);
+      scheduleNextRangerSync();
+      ozoneServiceId = authorizer.getOzoneServiceId();
+    } catch (Exception e) {
+      LOG.warn("Failed to Initialize Ranger Background Sync");
+      throw e;
+    }
+  }
+
+  public int getOzoneServiceId() throws Exception {
+    return ozoneServiceId;
+  }
+
+  public int getRangerSyncInterval() throws Exception {
+    return rangerSyncInterval;
+  }
+
+  @Override
+  public void run() {
+    try {
+      Thread.sleep(10);
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    }
+    try {
+      if (ozoneManager.isLeaderReady()) {
+        executeOneRangerSyncCycle();
+      }
+    } catch (Exception e) {
+      LOG.warn("Exception during OM Ranger Background Sync." + e.getMessage());
+    } finally {
+      // Lets Schedule the next cycle now. We do not deliberaty schedule at
+      // fixed interval to account for ranger sync processing time.
+      scheduleNextRangerSync();
+    }
+  }
+
+  private void scheduleNextRangerSync() {
+
+    if (!Thread.currentThread().isInterrupted() &&
+        !executorService.isShutdown()) {
+      rangerSyncFuture = executorService.schedule(this,
+          rangerSyncInterval, TimeUnit.SECONDS);
+    } else {
+      LOG.warn("Current Thread is interrupted, shutting down Ranger Sync " +
+          "processing thread for Ozone MultiTenant Manager.");
+    }
+  }
+
+  @Override
+  public void close() {
+    executorService.shutdown();
+    try {
+      if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) {
+        executorService.shutdownNow();
+      }
+
+      if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) {
+        LOG.error("Unable to shutdown OM Ranger Background Sync properly.");
+      }
+    } catch (InterruptedException e) {
+      executorService.shutdownNow();
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  private void executeOneRangerSyncCycle() {
+    int attempt = 0;
+    try {
+      // Taking the lock only makes sure that while we are reading
+      // the current Ozone service version, another multitenancy
+      // request is not changing it. We can drop the lock after that.
+      while (!ozoneManager.getMultiTenantManager()
+          .tryAcquireInProgressMtOp(WAIT_MILI)) {
+        sleep(10);
+      }
+      currentOzoneServiceVersionInOMDB = getOmdbRangerServiceVersion();
+      proposedOzoneServiceVersionInOMDB = getRangerServiceVersion();
+      while (currentOzoneServiceVersionInOMDB !=
+          proposedOzoneServiceVersionInOMDB) {
+        if (++attempt > MAX_ATTEMPT) {
+          break;
+        }
+        ozoneManager.getMultiTenantManager().resetInProgressMtOpState();
+        if (!ozoneManager.isLeaderReady()) {
+          return;
+        }
+        LOG.warn("Executing Ranger Sync Cycle.");
+
+        executeOmdbToRangerSync(currentOzoneServiceVersionInOMDB);
+
+        if (currentOzoneServiceVersionInOMDB !=
+            proposedOzoneServiceVersionInOMDB) {
+          // Submit Ratis Request to sync the new ozone service version in OMDB
+          setOmdbRangerServiceVersion(proposedOzoneServiceVersionInOMDB);
+        }
+        while (!ozoneManager.getMultiTenantManager()
+            .tryAcquireInProgressMtOp(WAIT_MILI)) {
+          sleep(10);
+        }
+        currentOzoneServiceVersionInOMDB = proposedOzoneServiceVersionInOMDB;
+        proposedOzoneServiceVersionInOMDB = getRangerServiceVersion();
+      }
+    } catch (Exception e) {
+      LOG.warn("Exception during a Ranger Sync Cycle. " + e.getMessage() );
+    }finally {
+      ozoneManager.getMultiTenantManager().resetInProgressMtOpState();

Review comment:
       There is no check here for whether we are actually holding the semaphore. Depending on where the exception is thrown or `return` is called, we could drain someone else's permits.
   
   If we are not planning to increase the number of concurrent Ranger ops beyond 1 (I don't see how we could do this), a 0/1 semaphore is largely equivalent to a lock, except more dangerous, because you can accidentally release another thread's acquisition without an exception, as seen here. For this reason I think we should use a normal lock instead.
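
A small sketch of the difference, using only `java.util.concurrent` classes. It does not model `resetInProgressMtOpState()`, whose implementation is not shown in this diff; it only shows that a stray `Semaphore.release()` succeeds silently while a stray `ReentrantLock.unlock()` fails loudly.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.locks.ReentrantLock;

class SemaphoreVsLockDemo {
  // Releasing a semaphore we never acquired silently adds a permit,
  // so a "0/1" semaphore can end up with two permits.
  static int permitsAfterStrayRelease() {
    Semaphore semaphore = new Semaphore(1);
    semaphore.release();
    return semaphore.availablePermits();
  }

  // Unlocking a ReentrantLock we do not hold throws instead.
  static boolean strayUnlockThrows() {
    ReentrantLock lock = new ReentrantLock();
    try {
      lock.unlock();
      return false;
    } catch (IllegalMonitorStateException e) {
      return true;
    }
  }

  // The usual pattern: unlock in finally, only after lock() succeeded.
  static void runLocked(ReentrantLock lock, Runnable work) {
    lock.lock();
    try {
      work.run();
    } finally {
      lock.unlock();
    }
  }

  public static void main(String[] args) {
    System.out.println(permitsAfterStrayRelease());
    System.out.println(strayUnlockThrows());
    runLocked(new ReentrantLock(), () -> System.out.println("synced"));
  }
}
```

With a lock, a code path that reaches the `finally` block without having acquired it shows up as an exception rather than as a silently corrupted permit count.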

##########
File path: hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/multitenant/OMRangerBGSync.java
##########
@@ -0,0 +1,480 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.om.multitenant;
+
+import static java.lang.Thread.sleep;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RANGER_SYNC_INTERVAL;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RANGER_SYNC_INTERVAL_DEFAULT;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.TableIterator;
+import org.apache.hadoop.ozone.client.OzoneClient;
+import org.apache.hadoop.ozone.client.OzoneClientFactory;
+import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.helpers.OmDBAccessIdInfo;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.google.gson.JsonArray;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+
+/**
+ * Background Sync thread that reads Multitnancy state from OMDB
+ * and applies it to Ranger.
+ */
+public class OMRangerBGSync implements Runnable, Closeable {
+
+  private OzoneManager ozoneManager;
+  private OzoneClient ozoneClient;
+
+  private static final Logger LOG = LoggerFactory
+      .getLogger(OMRangerBGSync.class);
+  private static final int WAIT_MILI = 1000;
+  private static final int MAX_ATTEMPT = 2;
+
+  /**
+   * ExecutorService used for scheduling sync operation.
+   */
+  private final ScheduledExecutorService executorService;
+  private ScheduledFuture<?> rangerSyncFuture;
+  private final int rangerSyncInterval;
+
+  private long rangerBGSyncCounter = 0;
+  private long currentOzoneServiceVersionInOMDB;
+  private long proposedOzoneServiceVersionInOMDB;
+  private static int ozoneServiceId;
+
+  private MultiTenantAccessAuthorizerRangerPlugin authorizer;
+
+  class BGRole {
+    private String name;
+    private String id;
+    private HashSet<String> users;
+
+    BGRole(String n) {
+      this.name = n;
+      users = new HashSet<>();
+    }
+
+    public void addId(String i) {
+      this.id = i;
+    }
+
+    public void addUsers(String u) {
+      users.add(u);
+    }
+
+    public HashSet<String> getUsers() {
+      return users;
+    }
+
+    @Override
+    public int hashCode() {
+      return name.hashCode();
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      return (this.hashCode() == obj.hashCode());
+    }
+  }
+
+  // we will track all the policies in Ranger here. After we have
+  // processed all the policies from OMDB, this map will
+  // be left with policies that we need to delete.
+  // Its a map of Policy ID to policy names
+  private ConcurrentHashMap<String, String> mtRangerPoliciesTobeDeleted =
+      new ConcurrentHashMap<>();
+
+  // This map will be used to keep all the policies that are found in
+  // OMDB and should have been in Ranger. Currently, we are only printing such
+  // policyID. This can result if a tenant is deleted but the system
+  // crashed. Its an easy recovery to retry the "tenant delete" operation.
+  // Its a map of policy ID to policy names
+  private ConcurrentHashMap<String, String> mtRangerPoliciesTobeCreated =
+      new ConcurrentHashMap<>();
+
+  // This map will keep all the Multiotenancy related roles from Ranger.
+  private ConcurrentHashMap<String, BGRole> mtRangerRoles =
+      new ConcurrentHashMap<>();
+
+  // keep OMDB mapping of Roles -> list of user principals.
+  private ConcurrentHashMap<String, HashSet<String>> mtOMDBRoles =
+      new ConcurrentHashMap<>();
+
+  // Every BG ranger sync cycle we update this
+  private long lastRangerPolicyLoadTime;
+
+  public OMRangerBGSync(OzoneManager om) throws Exception {
+    try {
+      ozoneManager = om;
+      authorizer = new MultiTenantAccessAuthorizerRangerPlugin();
+      authorizer.init(om.getConfiguration());
+      ozoneClient =
+          OzoneClientFactory.getRpcClient(ozoneManager.getConfiguration());
+      executorService = HadoopExecutors.newScheduledThreadPool(1,
+          new ThreadFactoryBuilder().setDaemon(true)
+              .setNameFormat("OM Ranger Sync Thread - %d").build());
+      rangerSyncInterval =
+          ozoneManager.getConfiguration().getInt(OZONE_OM_RANGER_SYNC_INTERVAL,
+              OZONE_OM_RANGER_SYNC_INTERVAL_DEFAULT);
+      scheduleNextRangerSync();
+      ozoneServiceId = authorizer.getOzoneServiceId();
+    } catch (Exception e) {
+      LOG.warn("Failed to Initialize Ranger Background Sync");
+      throw e;
+    }
+  }
+
+  public int getOzoneServiceId() throws Exception {
+    return ozoneServiceId;
+  }
+
+  public int getRangerSyncInterval() throws Exception {
+    return rangerSyncInterval;
+  }
+
+  @Override
+  public void run() {
+    try {
+      Thread.sleep(10);
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    }
+    try {
+      if (ozoneManager.isLeaderReady()) {
+        executeOneRangerSyncCycle();
+      }
+    } catch (Exception e) {
+      LOG.warn("Exception during OM Ranger Background Sync." + e.getMessage());
+    } finally {
+      // Lets Schedule the next cycle now. We do not deliberaty schedule at
+      // fixed interval to account for ranger sync processing time.
+      scheduleNextRangerSync();
+    }
+  }
+
+  private void scheduleNextRangerSync() {
+
+    if (!Thread.currentThread().isInterrupted() &&
+        !executorService.isShutdown()) {
+      rangerSyncFuture = executorService.schedule(this,
+          rangerSyncInterval, TimeUnit.SECONDS);
+    } else {
+      LOG.warn("Current Thread is interrupted, shutting down Ranger Sync " +
+          "processing thread for Ozone MultiTenant Manager.");
+    }
+  }
+
+  @Override
+  public void close() {
+    executorService.shutdown();
+    try {
+      if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) {
+        executorService.shutdownNow();
+      }
+
+      if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) {
+        LOG.error("Unable to shutdown OM Ranger Background Sync properly.");
+      }
+    } catch (InterruptedException e) {
+      executorService.shutdownNow();
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  private void executeOneRangerSyncCycle() {
+    int attempt = 0;
+    try {
+      // Taking the lock only makes sure that while we are reading
+      // the current Ozone service version, another multitenancy
+      // request is not changing it. We can drop the lock after that.
+      while (!ozoneManager.getMultiTenantManager()
+          .tryAcquireInProgressMtOp(WAIT_MILI)) {
+        sleep(10);
+      }
+      currentOzoneServiceVersionInOMDB = getOmdbRangerServiceVersion();
+      proposedOzoneServiceVersionInOMDB = getRangerServiceVersion();
+      while (currentOzoneServiceVersionInOMDB !=
+          proposedOzoneServiceVersionInOMDB) {
+        if (++attempt > MAX_ATTEMPT) {
+          break;
+        }
+        ozoneManager.getMultiTenantManager().resetInProgressMtOpState();
+        if (!ozoneManager.isLeaderReady()) {
+          return;
+        }
+        LOG.warn("Executing Ranger Sync Cycle.");
+
+        executeOmdbToRangerSync(currentOzoneServiceVersionInOMDB);
+
+        if (currentOzoneServiceVersionInOMDB !=
+            proposedOzoneServiceVersionInOMDB) {
+          // Submit Ratis Request to sync the new ozone service version in OMDB
+          setOmdbRangerServiceVersion(proposedOzoneServiceVersionInOMDB);
+        }
+        while (!ozoneManager.getMultiTenantManager()
+            .tryAcquireInProgressMtOp(WAIT_MILI)) {
+          sleep(10);
+        }
+        currentOzoneServiceVersionInOMDB = proposedOzoneServiceVersionInOMDB;
+        proposedOzoneServiceVersionInOMDB = getRangerServiceVersion();
+      }
+    } catch (Exception e) {
+      LOG.warn("Exception during a Ranger Sync Cycle. " + e.getMessage());
+    } finally {
+      ozoneManager.getMultiTenantManager().resetInProgressMtOpState();
+    }
+  }
+
+  public long getRangerServiceVersion() throws Exception {
+    return authorizer.getCurrentOzoneServiceVersion(ozoneServiceId);
+  }
+
+  public void setOmdbRangerServiceVersion(long version) throws IOException {
+    // OMDB update goes through RATIS
+    ozoneClient.getObjectStore().rangerServiceVersionSync(version);
+  }
+
+  public long getOmdbRangerServiceVersion() {
+    long lastKnownVersion = 0;
+    try {
+      lastKnownVersion =
+          ozoneManager.getMetadataManager().getOmRangerStateTable()
+              .get(OmMetadataManagerImpl.RANGER_OZONE_SERVICE_VERSION_KEY);
+    } catch (Exception ex) {
+      return 0;
+    }
+    return lastKnownVersion;
+  }
+
+  private void executeOmdbToRangerSync(long baseVersion) throws Exception {
+    ++rangerBGSyncCounter;
+    cleanupSyncState();

Review comment:
       We need locking here so that the sync does not undo in-progress requests. Something like:
   ```
   // Get a consistent snapshot of what tenants exist.
   lock()
   get tenants/policies from Ranger
   get tenants/policies from OM
   unlock()
   
   for each tenant from OM:
     lock(tenant)
     Update Ranger if needed with policies and their corresponding roles for tenant.
     unlock(tenant)
   
   for each tenant/policy from Ranger:
     lock(tenant)
     delete policy and its roles from Ranger if not in OM
     unlock(tenant)
   ```
   The global lock could be used in place of a tenant-specific lock, but this may lead to unnecessary starvation.
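
The pseudocode above could be sketched in Java roughly as follows. Everything here is hypothetical: `TenantSyncSketch`, its in-memory maps standing in for OM DB and Ranger state, and the per-tenant lock map are illustration only, not code from this PR. It also assumes tenant requests would take the same per-tenant lock.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;

class TenantSyncDemo {
  public static void main(String[] args) {
    TenantSyncSketch sync = new TenantSyncSketch();
    sync.omPolicies.put("tenantA", Set.of("p1", "p2"));
    sync.rangerPolicies.put("tenantA", new HashSet<>(Set.of("p1", "stale")));
    sync.rangerPolicies.put("tenantGone", new HashSet<>(Set.of("old")));
    sync.syncOnce();
    System.out.println(sync.rangerPolicies);
  }
}

class TenantSyncSketch {
  // Tenant -> policy names; in-memory stand-ins for OM DB and Ranger.
  final Map<String, Set<String>> omPolicies = new ConcurrentHashMap<>();
  final Map<String, Set<String>> rangerPolicies = new ConcurrentHashMap<>();
  private final ReentrantLock globalLock = new ReentrantLock();
  private final Map<String, ReentrantLock> tenantLocks =
      new ConcurrentHashMap<>();

  private ReentrantLock lockFor(String tenant) {
    return tenantLocks.computeIfAbsent(tenant, t -> new ReentrantLock());
  }

  void syncOnce() {
    Set<String> omTenants;
    Set<String> rangerTenants;
    // Consistent snapshot of which tenants exist on each side.
    globalLock.lock();
    try {
      omTenants = new HashSet<>(omPolicies.keySet());
      rangerTenants = new HashSet<>(rangerPolicies.keySet());
    } finally {
      globalLock.unlock();
    }
    // Add whatever OM has that Ranger is missing, one tenant at a time.
    for (String tenant : omTenants) {
      ReentrantLock lock = lockFor(tenant);
      lock.lock();
      try {
        Set<String> wanted = omPolicies.get(tenant);
        if (wanted == null) {
          continue; // tenant was deleted after the snapshot
        }
        rangerPolicies.computeIfAbsent(tenant, t -> new HashSet<>())
            .addAll(wanted);
      } finally {
        lock.unlock();
      }
    }
    // Delete from Ranger whatever OM no longer has.
    for (String tenant : rangerTenants) {
      ReentrantLock lock = lockFor(tenant);
      lock.lock();
      try {
        Set<String> wanted = omPolicies.getOrDefault(tenant, Set.of());
        Set<String> present = rangerPolicies.get(tenant);
        if (present != null) {
          present.retainAll(wanted);
          if (present.isEmpty()) {
            rangerPolicies.remove(tenant);
          }
        }
      } finally {
        lock.unlock();
      }
    }
  }
}
```

In this sketch the global lock is held only while the tenant names are copied, and the live state is re-read under each tenant lock, so a request that completed after the snapshot is not undone.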

##########
File path: hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/multitenant/OMRangerBGSync.java
##########
@@ -0,0 +1,475 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.om.multitenant;
+
+import static java.lang.Thread.sleep;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RANGER_SYNC_INTERVAL;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RANGER_SYNC_INTERVAL_DEFAULT;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.TableIterator;
+import org.apache.hadoop.ozone.client.OzoneClient;
+import org.apache.hadoop.ozone.client.OzoneClientFactory;
+import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.helpers.OmDBAccessIdInfo;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.ConcurrentHashMultiset;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.google.gson.JsonArray;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+
+/**
+ * Background Sync thread that reads Multitnancy state from OMDB
+ * and applies it to Ranger.
+ */
+public class OMRangerBGSync implements Runnable, Closeable {
+
+  private OzoneManager ozoneManager;
+  private OzoneClient ozoneClient;
+
+  private static final Logger LOG = LoggerFactory
+      .getLogger(OMRangerBGSync.class);
+  private static int WAIT_MILI = 1000;
+  private static int MAX_ATTEMPT = 2;
+
+  /**
+   * ExecutorService used for scheduling sync operation.
+   */
+  private final ScheduledExecutorService executorService;
+  private ScheduledFuture<?> rangerSyncFuture;
+  private final int rangerSyncInterval;
+
+  long rangerBGSyncCounter = 0;
+  long currentOzoneServiceVersionInOMDB;
+  long proposedOzoneServiceVersionInOMDB;
+  public static int ozoneServiceId;
+
+  MultiTenantAccessAuthorizerRangerPlugin authorizer;
+
+  class BGRole{
+    String name;
+    String id;
+    HashSet<String> users;
+
+    BGRole(String n) {
+      this.name = n;
+      users = new HashSet<>();
+    }
+
+    void addId(String i) {
+      this.id =i;
+    }
+
+    void addUsers(String u) {
+      users.add(u);
+    }
+
+    HashSet<String> getUsers() {
+      return users;
+    }
+
+    @Override
+    public int hashCode() {
+      return name.hashCode();
+    }
+  }
+
+  // we will track all the policies in Ranger here. After we have
+  // processed all the policies from OMDB, this map will
+  // be left with policies that we need to delete.
+  // Its a map of Policy ID to policy names
+  ConcurrentHashMap<String, String> mtRangerPoliciesTobeDeleted =
+      new ConcurrentHashMap<>();
+
+  // This map will be used to keep all the policies that are found in
+  // OMDB and should have been in Ranger. Currently, we are only printing such
+  // policyID. This can result if a tenant is deleted but the system
+  // crashed. Its an easy recovery to retry the "tenant delete" operation.
+  // Its a map of policy ID to policy names
+  ConcurrentHashMap<String, String> mtRangerPoliciesTobeCreated =
+      new ConcurrentHashMap<>();
+
+  // This map will keep all the Multiotenancy related roles from Ranger.
+  ConcurrentHashMap<String, BGRole> mtRangerRoles = new ConcurrentHashMap<>();
+
+  // keep OMDB mapping of Roles -> list of user principals.
+  ConcurrentHashMap<String, HashSet<String>> mtOMDBRoles =
+      new ConcurrentHashMap<>();
+
+  // Every BG ranger sync cycle we update this
+  long lastRangerPolicyLoadTime;
+
+  public OMRangerBGSync(OzoneManager om) throws Exception {
+    try {
+      ozoneManager = om;
+      authorizer = new MultiTenantAccessAuthorizerRangerPlugin();
+      authorizer.init(om.getConfiguration());
+      ozoneClient =
+          OzoneClientFactory.getRpcClient(ozoneManager.getConfiguration());
+      executorService = HadoopExecutors.newScheduledThreadPool(1,
+          new ThreadFactoryBuilder().setDaemon(true)
+              .setNameFormat("OM Ranger Sync Thread - %d").build());
+      rangerSyncInterval =
+          ozoneManager.getConfiguration().getInt(OZONE_OM_RANGER_SYNC_INTERVAL,
+              OZONE_OM_RANGER_SYNC_INTERVAL_DEFAULT);
+      scheduleNextRangerSync();
+      ozoneServiceId = authorizer.getOzoneServiceId();
+    } catch (Exception e) {
+      LOG.warn("Failed to Initialize Ranger Background Sync");
+      throw e;
+    }
+  }
+
+  public int getOzoneServiceId() throws Exception {
+    return ozoneServiceId;
+  }
+
+  public int getRangerSyncInterval() throws Exception {
+    return rangerSyncInterval;
+  }
+
+  @Override
+  public void run() {
+    try {
+      Thread.sleep(10);
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    }
+    try {
+      if (ozoneManager.isLeaderReady()) {
+        executeOneRangerSyncCycle();
+      }
+    } catch (Exception e) {
+      LOG.warn("Exception during OM Ranger Background Sync." + e.getMessage());
+    } finally {
+      // Lets Schedule the next cycle now. We do not deliberaty schedule at
+      // fixed interval to account for ranger sync processing time.
+      scheduleNextRangerSync();
+    }
+  }
+
+  private void scheduleNextRangerSync() {
+
+    if (!Thread.currentThread().isInterrupted() &&
+        !executorService.isShutdown()) {
+      rangerSyncFuture = executorService.schedule(this,
+          rangerSyncInterval, TimeUnit.SECONDS);
+    } else {
+      LOG.warn("Current Thread is interrupted, shutting down Ranger Sync " +
+          "processing thread for Ozone MultiTenant Manager.");
+    }
+  }
+
+  @Override
+  public void close() {
+    executorService.shutdown();
+    try {
+      if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) {
+        executorService.shutdownNow();
+      }
+
+      if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) {
+        LOG.error("Unable to shutdown OM Ranger Background Sync properly.");
+      }
+    } catch (InterruptedException e) {
+      executorService.shutdownNow();
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  private void executeOneRangerSyncCycle() {
+    int attempt = 0;
+    try {
+      // Taking the lock only makes sure that while we are reading
+      // the current Ozone service version, another multitenancy
+      // request is not changing it. We can drop the lock after that.
+      while (!ozoneManager.getMultiTenantManager()
+          .tryAcquireInProgressMtOp(WAIT_MILI)) {
+        sleep(10);
+      }
+      currentOzoneServiceVersionInOMDB = getOmdbRangerServiceVersion();
+      proposedOzoneServiceVersionInOMDB = getRangerServiceVersion();
+      while (currentOzoneServiceVersionInOMDB !=
+          proposedOzoneServiceVersionInOMDB) {
+        if (++attempt > MAX_ATTEMPT) {
+          break;
+        }
+        ozoneManager.getMultiTenantManager().resetInProgressMtOpState();
+        if (!ozoneManager.isLeaderReady()) {
+          return;
+        }
+        LOG.warn("Executing Ranger Sync Cycle.");
+
+        executeOmdbToRangerSync(currentOzoneServiceVersionInOMDB);
+
+        if (currentOzoneServiceVersionInOMDB !=
+            proposedOzoneServiceVersionInOMDB) {
+          // Submit Ratis Request to sync the new ozone service version in OMDB
+          setOmdbRangerServiceVersion(proposedOzoneServiceVersionInOMDB);
+        }
+        while (!ozoneManager.getMultiTenantManager()
+            .tryAcquireInProgressMtOp(WAIT_MILI)) {
+          sleep(10);
+        }
+        currentOzoneServiceVersionInOMDB = proposedOzoneServiceVersionInOMDB;
+        proposedOzoneServiceVersionInOMDB = getRangerServiceVersion();
+      }
+    } catch (Exception e) {
+      LOG.warn("Exception during a Ranger Sync Cycle. " + e.getMessage() );
+    }finally {
+      ozoneManager.getMultiTenantManager().resetInProgressMtOpState();
+    }
+  }
+
+  public long getRangerServiceVersion() throws Exception {
+    return authorizer.getCurrentOzoneServiceVersion(ozoneServiceId);
+  }
+
+  public void setOmdbRangerServiceVersion(long version) throws IOException {
+    // OMDB update goes through RATIS
+    ozoneClient.getObjectStore().rangerServiceVersionSync(version);
+  }
+
+  public long getOmdbRangerServiceVersion() {
+    long lastKnownVersion = 0;
+    try {
+      lastKnownVersion =
+          ozoneManager.getMetadataManager().getOmRangerStateTable()
+              .get(OmMetadataManagerImpl.RangerOzoneServiceVersionKey);
+    } catch (Exception ex) {
+      return 0;
+    }
+    return lastKnownVersion;
+  }
+
+  private void executeOmdbToRangerSync(long baseVersion) throws Exception {
+    ++rangerBGSyncCounter;
+    cleanupSyncState();
+    loadAllPoliciesRolesFromRanger(baseVersion);
+    loadAllRolesFromRanger();
+    loadAllRolesFromOMDB();
+
+    // This should isolate policies into two groups
+    // 1. mtRangerPoliciesTobeDeleted and
+    // 2. mtRangerPoliciesTobeCreated
+    processAllPoliciesFromOMDB();
+
+    // This should isolate roles that need fixing into a list of
+    // roles that need to be replayed back into ranger to get in sync with OMDB.
+    //
+    processAllRolesFromOMDB();
+  }
+
+  public void cleanupSyncState() {
+    mtRangerRoles.clear();
+    mtRangerPoliciesTobeDeleted.clear();
+    mtRangerPoliciesTobeCreated.clear();
+    mtOMDBRoles.clear();
+  }
+
+  public void loadAllPoliciesRolesFromRanger(long baseVersion)
+      throws Exception {
+    // TODO: incremental policies API is broken. We are getting all the
+    //  multitenant policies using Ranger labels.

Review comment:
       This could actually be the preferable way to do it: use the Ranger service version only as a quick check for whether a sync needs to run, and fetch policies by label rather than through incremental updates, so that we avoid fetching every policy. If we have tested that this Ranger API does in fact return only labelled policies, I am +1 for the current approach.
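
A rough sketch of that flow, under stated assumptions: `PolicySource`, `InMemoryPolicySource`, `VersionGatedSync` and the `"OzoneTenant"` label are hypothetical names made up for this example and do not correspond to the Ranger client API or to classes in this PR.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

class LabelSyncDemo {
  public static void main(String[] args) {
    InMemoryPolicySource source = new InMemoryPolicySource(2L);
    source.addPolicy("tenantA-VolumeAccess", "OzoneTenant");
    source.addPolicy("unrelated-policy", "other");
    VersionGatedSync sync = new VersionGatedSync(1L);
    System.out.println(sync.syncIfNeeded(source, "OzoneTenant"));
    System.out.println(sync.syncIfNeeded(source, "OzoneTenant"));
  }
}

// Hypothetical view of the policy store: a version plus a label query.
interface PolicySource {
  long serviceVersion();

  List<String> policiesWithLabel(String label);
}

class InMemoryPolicySource implements PolicySource {
  private final long version;
  private final Map<String, String> labelByPolicy = new LinkedHashMap<>();

  InMemoryPolicySource(long version) {
    this.version = version;
  }

  void addPolicy(String name, String label) {
    labelByPolicy.put(name, label);
  }

  @Override
  public long serviceVersion() {
    return version;
  }

  @Override
  public List<String> policiesWithLabel(String label) {
    List<String> result = new ArrayList<>();
    for (Map.Entry<String, String> e : labelByPolicy.entrySet()) {
      if (e.getValue().equals(label)) {
        result.add(e.getKey());
      }
    }
    return result;
  }
}

class VersionGatedSync {
  private long lastSyncedVersion;

  VersionGatedSync(long lastSyncedVersion) {
    this.lastSyncedVersion = lastSyncedVersion;
  }

  // Returns the labelled policies if the version moved, else empty.
  Optional<List<String>> syncIfNeeded(PolicySource source, String label) {
    long current = source.serviceVersion();
    if (current == lastSyncedVersion) {
      return Optional.empty(); // cheap check: nothing changed
    }
    List<String> labelled = source.policiesWithLabel(label);
    lastSyncedVersion = current;
    return Optional.of(labelled);
  }
}
```

The version comparison keeps the common no-change case cheap, and the label query bounds the work when a sync does run.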

##########
File path: hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/multitenant/OMRangerBGSync.java
##########
@@ -0,0 +1,475 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.om.multitenant;
+
+import static java.lang.Thread.sleep;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RANGER_SYNC_INTERVAL;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RANGER_SYNC_INTERVAL_DEFAULT;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.TableIterator;
+import org.apache.hadoop.ozone.client.OzoneClient;
+import org.apache.hadoop.ozone.client.OzoneClientFactory;
+import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.helpers.OmDBAccessIdInfo;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.ConcurrentHashMultiset;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.google.gson.JsonArray;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+
+/**
+ * Background Sync thread that reads Multitnancy state from OMDB
+ * and applies it to Ranger.
+ */
+public class OMRangerBGSync implements Runnable, Closeable {
+
+  private OzoneManager ozoneManager;
+  private OzoneClient ozoneClient;
+
+  private static final Logger LOG = LoggerFactory
+      .getLogger(OMRangerBGSync.class);
+  private static int WAIT_MILI = 1000;
+  private static int MAX_ATTEMPT = 2;
+
+  /**
+   * ExecutorService used for scheduling sync operation.
+   */
+  private final ScheduledExecutorService executorService;
+  private ScheduledFuture<?> rangerSyncFuture;
+  private final int rangerSyncInterval;
+
+  long rangerBGSyncCounter = 0;
+  long currentOzoneServiceVersionInOMDB;
+  long proposedOzoneServiceVersionInOMDB;
+  public static int ozoneServiceId;
+
+  MultiTenantAccessAuthorizerRangerPlugin authorizer;
+
+  class BGRole{
+    String name;
+    String id;
+    HashSet<String> users;
+
+    BGRole(String n) {
+      this.name = n;
+      users = new HashSet<>();
+    }
+
+    void addId(String i) {
+      this.id =i;
+    }
+
+    void addUsers(String u) {
+      users.add(u);
+    }
+
+    HashSet<String> getUsers() {
+      return users;
+    }
+
+    @Override
+    public int hashCode() {
+      return name.hashCode();
+    }
+  }
+
+  // we will track all the policies in Ranger here. After we have
+  // processed all the policies from OMDB, this map will
+  // be left with policies that we need to delete.
+  // Its a map of Policy ID to policy names
+  ConcurrentHashMap<String, String> mtRangerPoliciesTobeDeleted =
+      new ConcurrentHashMap<>();
+
+  // This map will be used to keep all the policies that are found in
+  // OMDB and should have been in Ranger. Currently, we are only printing such
+  // policyID. This can result if a tenant is deleted but the system
+  // crashed. Its an easy recovery to retry the "tenant delete" operation.
+  // Its a map of policy ID to policy names
+  ConcurrentHashMap<String, String> mtRangerPoliciesTobeCreated =
+      new ConcurrentHashMap<>();
+
+  // This map will keep all the Multiotenancy related roles from Ranger.
+  ConcurrentHashMap<String, BGRole> mtRangerRoles = new ConcurrentHashMap<>();
+
+  // keep OMDB mapping of Roles -> list of user principals.
+  ConcurrentHashMap<String, HashSet<String>> mtOMDBRoles =
+      new ConcurrentHashMap<>();
+
+  // Every BG ranger sync cycle we update this
+  long lastRangerPolicyLoadTime;
+
+  public OMRangerBGSync(OzoneManager om) throws Exception {
+    try {
+      ozoneManager = om;
+      authorizer = new MultiTenantAccessAuthorizerRangerPlugin();
+      authorizer.init(om.getConfiguration());
+      ozoneClient =
+          OzoneClientFactory.getRpcClient(ozoneManager.getConfiguration());
+      executorService = HadoopExecutors.newScheduledThreadPool(1,
+          new ThreadFactoryBuilder().setDaemon(true)
+              .setNameFormat("OM Ranger Sync Thread - %d").build());
+      rangerSyncInterval =
+          ozoneManager.getConfiguration().getInt(OZONE_OM_RANGER_SYNC_INTERVAL,
+              OZONE_OM_RANGER_SYNC_INTERVAL_DEFAULT);
+      scheduleNextRangerSync();
+      ozoneServiceId = authorizer.getOzoneServiceId();
+    } catch (Exception e) {
+      LOG.warn("Failed to Initialize Ranger Background Sync");
+      throw e;
+    }
+  }
+
+  public int getOzoneServiceId() throws Exception {
+    return ozoneServiceId;
+  }
+
+  public int getRangerSyncInterval() throws Exception {
+    return rangerSyncInterval;
+  }
+
+  @Override
+  public void run() {
+    try {
+      Thread.sleep(10);
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    }
+    try {
+      if (ozoneManager.isLeaderReady()) {
+        executeOneRangerSyncCycle();
+      }
+    } catch (Exception e) {
+      LOG.warn("Exception during OM Ranger Background Sync." + e.getMessage());
+    } finally {
+      // Lets Schedule the next cycle now. We do not deliberaty schedule at
+      // fixed interval to account for ranger sync processing time.
+      scheduleNextRangerSync();
+    }
+  }
+
+  private void scheduleNextRangerSync() {
+
+    if (!Thread.currentThread().isInterrupted() &&
+        !executorService.isShutdown()) {
+      rangerSyncFuture = executorService.schedule(this,
+          rangerSyncInterval, TimeUnit.SECONDS);
+    } else {
+      LOG.warn("Current Thread is interrupted, shutting down Ranger Sync " +
+          "processing thread for Ozone MultiTenant Manager.");
+    }
+  }
+
+  @Override
+  public void close() {
+    executorService.shutdown();
+    try {
+      if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) {
+        executorService.shutdownNow();
+      }
+
+      if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) {
+        LOG.error("Unable to shutdown OM Ranger Background Sync properly.");
+      }
+    } catch (InterruptedException e) {
+      executorService.shutdownNow();
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  private void executeOneRangerSyncCycle() {
+    int attempt = 0;
+    try {
+      // Taking the lock only makes sure that while we are reading
+      // the current Ozone service version, another multitenancy
+      // request is not changing it. We can drop the lock after that.
+      while (!ozoneManager.getMultiTenantManager()
+          .tryAcquireInProgressMtOp(WAIT_MILI)) {
+        sleep(10);
+      }
+      currentOzoneServiceVersionInOMDB = getOmdbRangerServiceVersion();
+      proposedOzoneServiceVersionInOMDB = getRangerServiceVersion();
+      while (currentOzoneServiceVersionInOMDB !=
+          proposedOzoneServiceVersionInOMDB) {
+        if (++attempt > MAX_ATTEMPT) {
+          break;
+        }
+        ozoneManager.getMultiTenantManager().resetInProgressMtOpState();
+        if (!ozoneManager.isLeaderReady()) {
+          return;
+        }
+        LOG.warn("Executing Ranger Sync Cycle.");
+
+        executeOmdbToRangerSync(currentOzoneServiceVersionInOMDB);
+
+        if (currentOzoneServiceVersionInOMDB !=
+            proposedOzoneServiceVersionInOMDB) {
+          // Submit Ratis Request to sync the new ozone service version in OMDB
+          setOmdbRangerServiceVersion(proposedOzoneServiceVersionInOMDB);
+        }
+        while (!ozoneManager.getMultiTenantManager()
+            .tryAcquireInProgressMtOp(WAIT_MILI)) {
+          sleep(10);
+        }
+        currentOzoneServiceVersionInOMDB = proposedOzoneServiceVersionInOMDB;
+        proposedOzoneServiceVersionInOMDB = getRangerServiceVersion();
+      }
+    } catch (Exception e) {
+      LOG.warn("Exception during a Ranger Sync Cycle. " + e.getMessage() );
+    }finally {
+      ozoneManager.getMultiTenantManager().resetInProgressMtOpState();
+    }
+  }
+
+  public long getRangerServiceVersion() throws Exception {
+    return authorizer.getCurrentOzoneServiceVersion(ozoneServiceId);
+  }
+
+  public void setOmdbRangerServiceVersion(long version) throws IOException {
+    // OMDB update goes through RATIS
+    ozoneClient.getObjectStore().rangerServiceVersionSync(version);
+  }
+
+  public long getOmdbRangerServiceVersion() {
+    long lastKnownVersion = 0;
+    try {
+      lastKnownVersion =
+          ozoneManager.getMetadataManager().getOmRangerStateTable()
+              .get(OmMetadataManagerImpl.RangerOzoneServiceVersionKey);
+    } catch (Exception ex) {
+      return 0;
+    }
+    return lastKnownVersion;
+  }
+
+  private void executeOmdbToRangerSync(long baseVersion) throws Exception {
+    ++rangerBGSyncCounter;
+    cleanupSyncState();
+    loadAllPoliciesRolesFromRanger(baseVersion);
+    loadAllRolesFromRanger();
+    loadAllRolesFromOMDB();
+
+    // This should isolate policies into two groups
+    // 1. mtRangerPoliciesTobeDeleted and
+    // 2. mtRangerPoliciesTobeCreated
+    processAllPoliciesFromOMDB();
+
+    // This should isolate roles that need fixing into a list of
+    // roles that need to be replayed back into ranger to get in sync with OMDB.
+    //
+    processAllRolesFromOMDB();
+  }
+
+  public void cleanupSyncState() {
+    mtRangerRoles.clear();
+    mtRangerPoliciesTobeDeleted.clear();
+    mtRangerPoliciesTobeCreated.clear();
+    mtOMDBRoles.clear();
+  }
+
+  public void loadAllPoliciesRolesFromRanger(long baseVersion)
+      throws Exception {
+    // TODO: incremental policies API is broken. We are getting all the
+    //  multitenant policies using Ranger labels.
+    String allPolicies = authorizer.getAllMultiTenantPolicies(ozoneServiceId);
+    JsonObject jObject = new JsonParser().parse(allPolicies).getAsJsonObject();
+    lastRangerPolicyLoadTime = jObject.get("queryTimeMS").getAsLong();
+    JsonArray policyArray = jObject.getAsJsonArray("policies");
+    for (int i = 0; i < policyArray.size(); ++i) {
+      JsonObject newPolicy = policyArray.get(i).getAsJsonObject();
+      if (!newPolicy.getAsJsonArray("policyLabels").get(0)
+          .getAsString().equals("OzoneMultiTenant")) {
+        LOG.warn("Apache Ranger BG Sync: received non MultiTenant policy");

Review comment:
       Can we add the policy name to this message for easier debugging?
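
One way the requested message could look, sketched with the JDK only so it runs on its own. `nonMultiTenantWarning` is a hypothetical helper, not something in the PR. In the class under review, which already logs through SLF4J, the same effect would presumably come from a parameterized call such as `LOG.warn("... non MultiTenant policy: {}", newPolicy.get("name").getAsString())`.

```java
class PolicyLogSketch {
  // Builds the warning text with the policy name and id included, so the
  // offending policy can be looked up in Ranger directly from the log.
  static String nonMultiTenantWarning(String policyName, String policyId) {
    return "Apache Ranger BG Sync: received non MultiTenant policy: name="
        + policyName + ", id=" + policyId;
  }

  public static void main(String[] args) {
    // Example values only; real values would come from the policy JSON.
    System.out.println(nonMultiTenantWarning("some-policy", "42"));
  }
}
```

Including the id alongside the name is an extra assumption here; the name alone would already satisfy the request.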

##########
File path: hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/multitenant/OMRangerBGSync.java
##########
@@ -0,0 +1,475 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.om.multitenant;
+
+import static java.lang.Thread.sleep;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RANGER_SYNC_INTERVAL;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RANGER_SYNC_INTERVAL_DEFAULT;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.TableIterator;
+import org.apache.hadoop.ozone.client.OzoneClient;
+import org.apache.hadoop.ozone.client.OzoneClientFactory;
+import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.helpers.OmDBAccessIdInfo;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.ConcurrentHashMultiset;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.google.gson.JsonArray;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+
+/**
+ * Background Sync thread that reads Multitnancy state from OMDB
+ * and applies it to Ranger.
+ */
+public class OMRangerBGSync implements Runnable, Closeable {
+
+  private OzoneManager ozoneManager;
+  private OzoneClient ozoneClient;
+
+  private static final Logger LOG = LoggerFactory
+      .getLogger(OMRangerBGSync.class);
+  private static int WAIT_MILI = 1000;
+  private static int MAX_ATTEMPT = 2;
+
+  /**
+   * ExecutorService used for scheduling sync operation.
+   */
+  private final ScheduledExecutorService executorService;
+  private ScheduledFuture<?> rangerSyncFuture;
+  private final int rangerSyncInterval;
+
+  long rangerBGSyncCounter = 0;
+  long currentOzoneServiceVersionInOMDB;
+  long proposedOzoneServiceVersionInOMDB;
+  public static int ozoneServiceId;
+
+  MultiTenantAccessAuthorizerRangerPlugin authorizer;
+
+  class BGRole{
+    String name;
+    String id;
+    HashSet<String> users;
+
+    BGRole(String n) {
+      this.name = n;
+      users = new HashSet<>();
+    }
+
+    void addId(String i) {
+      this.id =i;
+    }
+
+    void addUsers(String u) {
+      users.add(u);
+    }
+
+    HashSet<String> getUsers() {
+      return users;
+    }
+
+    @Override
+    public int hashCode() {
+      return name.hashCode();
+    }
+  }
+
+  // we will track all the policies in Ranger here. After we have
+  // processed all the policies from OMDB, this map will
+  // be left with policies that we need to delete.
+  // Its a map of Policy ID to policy names
+  ConcurrentHashMap<String, String> mtRangerPoliciesTobeDeleted =
+      new ConcurrentHashMap<>();
+
+  // This map will be used to keep all the policies that are found in
+  // OMDB and should have been in Ranger. Currently, we are only printing such
+  // policyID. This can result if a tenant is deleted but the system
+  // crashed. Its an easy recovery to retry the "tenant delete" operation.
+  // Its a map of policy ID to policy names
+  ConcurrentHashMap<String, String> mtRangerPoliciesTobeCreated =
+      new ConcurrentHashMap<>();
+
+  // This map will keep all the Multiotenancy related roles from Ranger.
+  ConcurrentHashMap<String, BGRole> mtRangerRoles = new ConcurrentHashMap<>();
+
+  // keep OMDB mapping of Roles -> list of user principals.
+  ConcurrentHashMap<String, HashSet<String>> mtOMDBRoles =
+      new ConcurrentHashMap<>();
+
+  // Every BG ranger sync cycle we update this
+  long lastRangerPolicyLoadTime;
+
+  public OMRangerBGSync(OzoneManager om) throws Exception {
+    try {
+      ozoneManager = om;
+      authorizer = new MultiTenantAccessAuthorizerRangerPlugin();
+      authorizer.init(om.getConfiguration());
+      ozoneClient =
+          OzoneClientFactory.getRpcClient(ozoneManager.getConfiguration());
+      executorService = HadoopExecutors.newScheduledThreadPool(1,
+          new ThreadFactoryBuilder().setDaemon(true)
+              .setNameFormat("OM Ranger Sync Thread - %d").build());
+      rangerSyncInterval =
+          ozoneManager.getConfiguration().getInt(OZONE_OM_RANGER_SYNC_INTERVAL,
+              OZONE_OM_RANGER_SYNC_INTERVAL_DEFAULT);
+      scheduleNextRangerSync();
+      ozoneServiceId = authorizer.getOzoneServiceId();
+    } catch (Exception e) {
+      LOG.warn("Failed to Initialize Ranger Background Sync");
+      throw e;
+    }
+  }
+
+  public int getOzoneServiceId() throws Exception {
+    return ozoneServiceId;
+  }
+
+  public int getRangerSyncInterval() throws Exception {
+    return rangerSyncInterval;
+  }
+
+  @Override
+  public void run() {
+    try {
+      Thread.sleep(10);
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    }
+    try {
+      if (ozoneManager.isLeaderReady()) {
+        executeOneRangerSyncCycle();
+      }
+    } catch (Exception e) {
+      LOG.warn("Exception during OM Ranger Background Sync." + e.getMessage());
+    } finally {
+      // Lets Schedule the next cycle now. We do not deliberaty schedule at
+      // fixed interval to account for ranger sync processing time.
+      scheduleNextRangerSync();
+    }
+  }
+
+  private void scheduleNextRangerSync() {
+
+    if (!Thread.currentThread().isInterrupted() &&
+        !executorService.isShutdown()) {
+      rangerSyncFuture = executorService.schedule(this,
+          rangerSyncInterval, TimeUnit.SECONDS);
+    } else {
+      LOG.warn("Current Thread is interrupted, shutting down Ranger Sync " +
+          "processing thread for Ozone MultiTenant Manager.");
+    }
+  }
+
+  @Override
+  public void close() {
+    executorService.shutdown();
+    try {
+      if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) {
+        executorService.shutdownNow();
+      }
+
+      if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) {
+        LOG.error("Unable to shutdown OM Ranger Background Sync properly.");
+      }
+    } catch (InterruptedException e) {
+      executorService.shutdownNow();
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  private void executeOneRangerSyncCycle() {
+    int attempt = 0;
+    try {
+      // Taking the lock only makes sure that while we are reading
+      // the current Ozone service version, another multitenancy
+      // request is not changing it. We can drop the lock after that.
+      while (!ozoneManager.getMultiTenantManager()
+          .tryAcquireInProgressMtOp(WAIT_MILI)) {
+        sleep(10);
+      }
+      currentOzoneServiceVersionInOMDB = getOmdbRangerServiceVersion();
+      proposedOzoneServiceVersionInOMDB = getRangerServiceVersion();
+      while (currentOzoneServiceVersionInOMDB !=
+          proposedOzoneServiceVersionInOMDB) {
+        if (++attempt > MAX_ATTEMPT) {
+          break;
+        }
+        ozoneManager.getMultiTenantManager().resetInProgressMtOpState();
+        if (!ozoneManager.isLeaderReady()) {
+          return;
+        }
+        LOG.warn("Executing Ranger Sync Cycle.");
+
+        executeOmdbToRangerSync(currentOzoneServiceVersionInOMDB);
+
+        if (currentOzoneServiceVersionInOMDB !=
+            proposedOzoneServiceVersionInOMDB) {
+          // Submit Ratis Request to sync the new ozone service version in OMDB
+          setOmdbRangerServiceVersion(proposedOzoneServiceVersionInOMDB);
+        }
+        while (!ozoneManager.getMultiTenantManager()
+            .tryAcquireInProgressMtOp(WAIT_MILI)) {
+          sleep(10);
+        }
+        currentOzoneServiceVersionInOMDB = proposedOzoneServiceVersionInOMDB;
+        proposedOzoneServiceVersionInOMDB = getRangerServiceVersion();
+      }
+    } catch (Exception e) {
+      LOG.warn("Exception during a Ranger Sync Cycle. " + e.getMessage() );
+    }finally {
+      ozoneManager.getMultiTenantManager().resetInProgressMtOpState();
+    }
+  }
+
+  public long getRangerServiceVersion() throws Exception {
+    return authorizer.getCurrentOzoneServiceVersion(ozoneServiceId);
+  }
+
+  public void setOmdbRangerServiceVersion(long version) throws IOException {
+    // OMDB update goes through RATIS
+    ozoneClient.getObjectStore().rangerServiceVersionSync(version);
+  }
+
+  public long getOmdbRangerServiceVersion() {
+    long lastKnownVersion = 0;
+    try {
+      lastKnownVersion =
+          ozoneManager.getMetadataManager().getOmRangerStateTable()
+              .get(OmMetadataManagerImpl.RangerOzoneServiceVersionKey);
+    } catch (Exception ex) {
+      return 0;
+    }
+    return lastKnownVersion;
+  }
+
+  private void executeOmdbToRangerSync(long baseVersion) throws Exception {
+    ++rangerBGSyncCounter;
+    cleanupSyncState();
+    loadAllPoliciesRolesFromRanger(baseVersion);
+    loadAllRolesFromRanger();
+    loadAllRolesFromOMDB();
+
+    // This should isolate policies into two groups
+    // 1. mtRangerPoliciesTobeDeleted and
+    // 2. mtRangerPoliciesTobeCreated
+    processAllPoliciesFromOMDB();
+
+    // This should isolate roles that need fixing into a list of
+    // roles that need to be replayed back into ranger to get in sync with OMDB.
+    //
+    processAllRolesFromOMDB();
+  }
+
+  public void cleanupSyncState() {
+    mtRangerRoles.clear();
+    mtRangerPoliciesTobeDeleted.clear();
+    mtRangerPoliciesTobeCreated.clear();
+    mtOMDBRoles.clear();
+  }
+
+  public void loadAllPoliciesRolesFromRanger(long baseVersion)
+      throws Exception {
+    // TODO: incremental policies API is broken. We are getting all the
+    //  multitenant policies using Ranger labels.
+    String allPolicies = authorizer.getAllMultiTenantPolicies(ozoneServiceId);
+    JsonObject jObject = new JsonParser().parse(allPolicies).getAsJsonObject();
+    lastRangerPolicyLoadTime = jObject.get("queryTimeMS").getAsLong();
+    JsonArray policyArray = jObject.getAsJsonArray("policies");
+    for (int i = 0; i < policyArray.size(); ++i) {
+      JsonObject newPolicy = policyArray.get(i).getAsJsonObject();
+      if (!newPolicy.getAsJsonArray("policyLabels").get(0)
+          .getAsString().equals("OzoneMultiTenant")) {
+        LOG.warn("Apache Ranger BG Sync: received non MultiTenant policy");
+        continue;
+      }
+      mtRangerPoliciesTobeDeleted.put(newPolicy.get("id").getAsString(),
+          newPolicy.get("name").getAsString());
+      JsonArray policyItems = newPolicy
+          .getAsJsonArray("policyItems");
+      for (int j = 0; j < policyItems.size(); ++j) {
+        JsonObject policy = policyItems.get(j).getAsJsonObject();
+        JsonArray roles = policy.getAsJsonArray("roles");
+        for (int k = 0; k < roles.size(); ++k) {
+          if(!mtRangerRoles.containsKey(roles.get(k).getAsString())) {
+            // We only get the role name here. We need to query and populate it.
+            mtRangerRoles.put(roles.get(k).getAsString(),
+            new BGRole(roles.get(k).getAsString()));
+          }
+        }
+      }
+    }
+  }
+
+  public void loadAllRolesFromRanger() throws Exception {
+    for (String rolename: mtRangerRoles.keySet()) {
+      String roleDataString = authorizer.getRole(rolename);
+      JsonObject roleObject =
+          new JsonParser().parse(roleDataString).getAsJsonObject();
+      BGRole role = mtRangerRoles.get(rolename);
+      role.addId(roleObject.get("id").getAsString());
+      JsonArray userArray = roleObject.getAsJsonArray("users");
+      for (int i =0; i< userArray.size(); ++i) {
+        role.addUsers(userArray.get(i).getAsJsonObject().get("name")
+            .getAsString());
+      }
+    }
+  }
+
+  public void processAllPoliciesFromOMDB() throws Exception {
+    TableIterator<String, ? extends Table.KeyValue<String, String>>
+        policyTableIterator =
+        ozoneManager.getMetadataManager().getTenantPolicyTable().iterator();
+    while (policyTableIterator.hasNext()) {
+      Table.KeyValue<String, String> keyValue = policyTableIterator.next();
+      // Value in the table is a list of policy Ids separated by ",". Please
+      // also see OMTenantCreateRequest.java
+      String[] omDBPolicies = keyValue.getValue().split(",");
+
+      for (String omDBPolicy: omDBPolicies) {
+        if (mtRangerPoliciesTobeDeleted.containsKey(omDBPolicy)) {
+          // This entry is in sync with ranger, remove it from the set
+          // Eventually mtRangerPolicies will only contain entries that
+          // are not in OMDB and should be removed from Ranger.
+          mtRangerPoliciesTobeDeleted.remove(omDBPolicy);
+        } else {
+          // We could not find a policy in ranger that should have been there.
+          mtRangerPoliciesTobeCreated.put(omDBPolicy, null);
+        }
+      }
+    }
+
+    for (String policy: mtRangerPoliciesTobeCreated.keySet()) {
+      // TODO : Currently we are not maintaining enough information in OMDB
+      //  to recreate the policies.
+      LOG.warn("Policies not found in Ranger: " + policy);
+    }
+
+    for (String policyId: mtRangerPoliciesTobeDeleted.keySet()) {
+      // TODO : Its best to not create these poilicies automatically and the
+      //  let the user delete the tenant and recreate the tenant.
+      String policy = mtRangerPoliciesTobeDeleted.get(policyId);
+      AccessPolicy accessPolicy = authorizer.getAccessPolicyByName(policy);
+      if (lastRangerPolicyLoadTime >
+          (accessPolicy.getLastUpdateTime() + 3600 *1000)) {
+        LOG.warn("Deleting policies from Ranger: " + policy);
+        authorizer.deletePolicybyName(policy);
+        for (String deletedrole : accessPolicy.getRoleList()) {
+          authorizer.deleteRole(new JsonParser()
+              .parse(authorizer.getRole(deletedrole))
+              .getAsJsonObject().get("id").getAsString());
+        }
+      }
+    }
+  }
+
+  public void loadAllRolesFromOMDB() throws Exception {
+    // We have following info in OMDB
+    //  tenantRoleTable: accessId -> roles [admin, roleB, ...]
+    //  tenantAccessIdTable : accessId -> OmDBAccessIdInfo

Review comment:
       Potential performance improvement? We have already loaded a map of tenantId -> CachedTenantInfo (which contains all the tenant users) into memory in `OMMultiTenantManagerImpl#loadUsersFromDB`. If we store <user, accessId, roles> there instead of just <user, accessId> as we do now, we can read the role mappings from memory rather than from the DB tables.
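
A rough sketch of what reading from such a cache might look like, assuming the cache holds <user, accessId, roles> entries per tenant. `CachedAccessIdInfo` and `rolesToUsers` are hypothetical names; the real `CachedTenantInfo` layout is not shown in this diff and may differ. The output has the same shape as `mtOMDBRoles` in the PR (role name to a set of user principals).

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

class CachedRolesSketch {
  // Hypothetical cache entry: <user, accessId, roles>.
  static final class CachedAccessIdInfo {
    final String user;
    final String accessId;
    final Set<String> roles;

    CachedAccessIdInfo(String user, String accessId, Set<String> roles) {
      this.user = user;
      this.accessId = accessId;
      this.roles = roles;
    }
  }

  // Inverts the per-tenant cache (tenantId -> entries) into
  // role -> user principals without touching any DB table.
  static Map<String, Set<String>> rolesToUsers(
      Map<String, List<CachedAccessIdInfo>> tenantCache) {
    Map<String, Set<String>> result = new HashMap<>();
    for (List<CachedAccessIdInfo> entries : tenantCache.values()) {
      for (CachedAccessIdInfo info : entries) {
        for (String role : info.roles) {
          result.computeIfAbsent(role, r -> new HashSet<>()).add(info.user);
        }
      }
    }
    return result;
  }

  public static void main(String[] args) {
    Map<String, List<CachedAccessIdInfo>> cache = new HashMap<>();
    cache.put("tenant1", Arrays.asList(
        new CachedAccessIdInfo("alice", "tenant1$alice",
            new HashSet<>(Arrays.asList("admin", "user"))),
        new CachedAccessIdInfo("bob", "tenant1$bob",
            new HashSet<>(Arrays.asList("user")))));
    System.out.println(rolesToUsers(cache));
  }
}
```

The trade-off is that the cache then has to be kept current whenever roles are assigned or revoked, which the sketch does not address.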



