Posted to commits@ozone.apache.org by ri...@apache.org on 2022/07/14 04:30:04 UTC

[ozone] branch master updated: HDDS-6433. Refactor OMFailoverProxyProvider to provide a base for OM and GrpcOM FailoverProxyProviders (#3389)

This is an automated email from the ASF dual-hosted git repository.

ritesh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 843fac2fb6 HDDS-6433. Refactor OMFailoverProxyProvider to provide a base for OM and GrpcOM FailoverProxyProviders (#3389)
843fac2fb6 is described below

commit 843fac2fb646eecfc33103fdb16eaf77a66ca062
Author: Neil Joshi <ne...@gmail.com>
AuthorDate: Wed Jul 13 22:29:59 2022 -0600

    HDDS-6433. Refactor OMFailoverProxyProvider to provide a base for OM and GrpcOM FailoverProxyProviders (#3389)
---
 .../ozone/om/ha/GrpcOMFailoverProxyProvider.java   | 102 ++++---
 .../om/ha/HadoopRpcOMFailoverProxyProvider.java    | 240 ++++++++++++++++
 ...vider.java => OMFailoverProxyProviderBase.java} | 304 +++++----------------
 .../ozone/om/protocolPB/GrpcOmTransport.java       |  27 +-
 .../ozone/om/protocolPB/Hadoop3OmTransport.java    |  20 +-
 .../protocolPB/OMAdminProtocolClientSideImpl.java  |  10 +-
 .../OMInterServiceProtocolClientSideImpl.java      |  12 +-
 .../ozone/om/ha/TestOMFailoverProxyProvider.java   |  12 +-
 .../ozone/om/protocolPB/TestS3GrpcOmTransport.java |  42 ++-
 .../client/rpc/TestOzoneRpcClientAbstract.java     |   4 +-
 .../hadoop/ozone/om/OmFailoverProxyUtil.java       |   4 +-
 .../apache/hadoop/ozone/om/TestOzoneManagerHA.java |   6 +-
 .../ozone/om/TestOzoneManagerHAMetadataOnly.java   |  36 +--
 .../ozone/om/TestOzoneManagerHAWithData.java       |   6 +-
 .../ozone/om/TestOzoneManagerHAWithFailover.java   |   6 +-
 .../hadoop/ozone/om/failover/TestOMFailovers.java  |  11 +-
 .../hadoop/fs/ozone/Hadoop27RpcTransport.java      |  17 +-
 .../ozone/protocolPB/TestGrpcOmTransport.java      |  28 +-
 18 files changed, 536 insertions(+), 351 deletions(-)
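
Before the per-file diffs, the listing above already shows the shape of the change: the old OMFailoverProxyProvider is split into an abstract base plus one subclass per transport. A minimal sketch of the resulting hierarchy, using the class names from the listing (the bodies here are placeholders, not the committed code):

    // Sketch only; the real classes live in org.apache.hadoop.ozone.om.ha
    // and carry the full state shown in the diffs below.
    abstract class OMFailoverProxyProviderBase<T> {
      // shared failover bookkeeping: current/next node, retry policy, wait times
    }

    class HadoopRpcOMFailoverProxyProvider<T>
        extends OMFailoverProxyProviderBase<T> {
      // Hadoop RPC proxy creation, delegation token service handling
    }

    class GrpcOMFailoverProxyProvider<T>
        extends OMFailoverProxyProviderBase<T> {
      // gRPC host:port resolution, gRPC-specific shouldFailover decisions
    }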

diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/GrpcOMFailoverProxyProvider.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/GrpcOMFailoverProxyProvider.java
index 498f935974..002c2012e7 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/GrpcOMFailoverProxyProvider.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/GrpcOMFailoverProxyProvider.java
@@ -17,18 +17,21 @@
  */
 package org.apache.hadoop.ozone.om.ha;
 
+import io.grpc.Status;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.conf.ConfigurationException;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.HddsUtils;
-import org.apache.hadoop.io.Text;
+import org.apache.hadoop.hdds.utils.LegacyHadoopConfigurationSource;
+import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ozone.OmUtils;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.ha.ConfUtils;
 import org.apache.hadoop.ozone.om.OMConfigKeys;
 import org.apache.hadoop.ozone.om.protocolPB.GrpcOmTransport;
-import org.apache.hadoop.security.UserGroupInformation;
 
 import java.io.IOException;
+import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -36,6 +39,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.OptionalInt;
+import io.grpc.StatusRuntimeException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import static org.apache.hadoop.hdds.HddsUtils.getHostNameFromConfigKeys;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ADDRESS_KEY;
@@ -48,39 +54,29 @@ import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ADDRESS_KEY;
  * connecting to another OM node from the list of proxies.
  */
 public class GrpcOMFailoverProxyProvider<T> extends
-    OMFailoverProxyProvider<T> {
-
-  private Map<String, String> omAddresses;
+    OMFailoverProxyProviderBase<T> {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(GrpcOMFailoverProxyProvider.class);
 
   public GrpcOMFailoverProxyProvider(ConfigurationSource configuration,
-                                     UserGroupInformation ugi,
                                      String omServiceId,
                                      Class<T> protocol) throws IOException {
-    super(configuration, ugi, omServiceId, protocol);
+    super(configuration, omServiceId, protocol);
   }
 
   @Override
   protected void loadOMClientConfigs(ConfigurationSource config, String omSvcId)
       throws IOException {
-    // to be used for base class omProxies,
-    // ProxyInfo not applicable for gRPC, just need key set
-    Map<String, ProxyInfo<T>> omProxiesNodeIdKeyset = new HashMap<>();
-    // to be used for base class omProxyInfos
-    // OMProxyInfo not applicable for gRPC, just need key set
-    Map<String, OMProxyInfo> omProxyInfosNodeIdKeyset = new HashMap<>();
-    List<String> omNodeIDList = new ArrayList<>();
-    omAddresses = new HashMap<>();
 
     Collection<String> omNodeIds = OmUtils.getActiveOMNodeIds(config, omSvcId);
+    Map<String, ProxyInfo<T>> omProxies = new HashMap<>();
+    List<String> omNodeIDList = new ArrayList<>();
 
     for (String nodeId : OmUtils.emptyAsSingletonNull(omNodeIds)) {
-
       String rpcAddrKey = ConfUtils.addKeySuffixes(OZONE_OM_ADDRESS_KEY,
           omSvcId, nodeId);
-
       Optional<String> hostaddr = getHostNameFromConfigKeys(config,
           rpcAddrKey);
-
       OptionalInt hostport = HddsUtils.getNumberFromConfigKeys(config,
           ConfUtils.addKeySuffixes(OMConfigKeys.OZONE_OM_GRPC_PORT_KEY,
               omSvcId, nodeId),
@@ -88,15 +84,15 @@ public class GrpcOMFailoverProxyProvider<T> extends
       if (nodeId == null) {
         nodeId = OzoneConsts.OM_DEFAULT_NODE_ID;
       }
-      omProxiesNodeIdKeyset.put(nodeId, null);
-      omProxyInfosNodeIdKeyset.put(nodeId, null);
       if (hostaddr.isPresent()) {
-        omAddresses.put(nodeId,
-            hostaddr.get() + ":"
-                + hostport.orElse(config
-                .getObject(GrpcOmTransport
-                    .GrpcOmTransportConfig.class)
-                .getPort()));
+        ProxyInfo<T> proxyInfo =
+            new ProxyInfo<>(createOMProxy(),
+                hostaddr.get() + ":"
+                    + hostport.orElse(config
+                    .getObject(GrpcOmTransport
+                        .GrpcOmTransportConfig.class)
+                    .getPort()));
+        omProxies.put(nodeId, proxyInfo);
       } else {
         LOG.error("expected host address not defined for: {}", rpcAddrKey);
         throw new ConfigurationException(rpcAddrKey + "is not defined");
@@ -104,37 +100,63 @@ public class GrpcOMFailoverProxyProvider<T> extends
       omNodeIDList.add(nodeId);
     }
 
-    if (omProxiesNodeIdKeyset.isEmpty()) {
+    if (omProxies.isEmpty()) {
       throw new IllegalArgumentException("Could not find any configured " +
           "addresses for OM. Please configure the system with "
           + OZONE_OM_ADDRESS_KEY);
     }
+    setOmProxies(omProxies);
+    setOmNodeIDList(omNodeIDList);
+  }
 
-    // set base class omProxies, omProxyInfos, omNodeIDList
+  private T createOMProxy() throws IOException {
+    InetSocketAddress addr = new InetSocketAddress(0);
+    Configuration hadoopConf =
+        LegacyHadoopConfigurationSource.asHadoopConfiguration(getConf());
+    return (T) RPC.getProxy(getInterface(), 0, addr, hadoopConf);
+  }
 
-    // omProxies needed in base class
-    // omProxies.size == number of om nodes
-    // omProxies key needs to be valid nodeid
-    // omProxyInfos keyset needed in base class
-    setProxies(omProxiesNodeIdKeyset, omProxyInfosNodeIdKeyset, omNodeIDList);
+  /**
+   * Get the proxy object which should be used until the next failover event
+   * occurs. RPC proxy object is intialized lazily.
+   * @return the OM proxy object to invoke methods upon
+   */
+  @Override
+  public synchronized ProxyInfo<T> getProxy() {
+    return getOMProxyMap().get(getCurrentProxyOMNodeId());
   }
 
   @Override
-  protected Text computeDelegationTokenService() {
-    return new Text();
+  protected synchronized boolean shouldFailover(Exception ex) {
+    if (ex instanceof StatusRuntimeException) {
+      StatusRuntimeException srexp = (StatusRuntimeException)ex;
+      Status status = srexp.getStatus();
+      if (status.getCode() == Status.Code.RESOURCE_EXHAUSTED) {
+        LOG.debug("Grpc response has invalid length, {}", srexp.getMessage());
+        return false;
+      } else if (status.getCode() == Status.Code.DATA_LOSS) {
+        LOG.debug("Grpc unrecoverable data loss or corruption, {}",
+                srexp.getMessage());
+        return false;
+      }
+    }
+    return super.shouldFailover(ex);
   }
 
+  @Override
+  public synchronized void close() throws IOException { }
+
   // need to throw if nodeID not in omAddresses
   public String getGrpcProxyAddress(String nodeId) throws IOException {
-    if (omAddresses.containsKey(nodeId)) {
-      return omAddresses.get(nodeId);
+    Map<String, ProxyInfo<T>> omProxies = getOMProxyMap();
+    if (omProxies.containsKey(nodeId)) {
+      return omProxies.get(nodeId).proxyInfo;
     } else {
-      LOG.error("expected nodeId not found in omAddresses for proxyhost {}",
+      LOG.error("expected nodeId not found in omProxies for proxyhost {}",
           nodeId);
       throw new IOException(
-          "expected nodeId not found in omAddresses for proxyhost");
+          "expected nodeId not found in omProxies for proxyhost");
     }
-
   }
 
   public List<String> getGrpcOmNodeIDList() {
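
The shouldFailover override added above treats RESOURCE_EXHAUSTED (a response larger than the configured maximum message length) and DATA_LOSS as non-retryable, so the client fails immediately instead of cycling through the remaining OMs. A standalone sketch of that decision, assuming only the grpc-api status classes rather than the Ozone classes:

    import io.grpc.Status;
    import io.grpc.StatusRuntimeException;

    final class GrpcFailoverDecisionSketch {
      // Mirrors the override above: these status codes indicate a problem
      // another OM cannot fix, so failing over would only repeat the error.
      static boolean shouldFailover(Exception ex) {
        if (ex instanceof StatusRuntimeException) {
          Status.Code code = ((StatusRuntimeException) ex).getStatus().getCode();
          if (code == Status.Code.RESOURCE_EXHAUSTED   // oversized response
              || code == Status.Code.DATA_LOSS) {      // unrecoverable corruption
            return false;
          }
        }
        return true; // otherwise defer to the shared base-class logic
      }
    }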
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/HadoopRpcOMFailoverProxyProvider.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/HadoopRpcOMFailoverProxyProvider.java
new file mode 100644
index 0000000000..f27db4e04a
--- /dev/null
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/HadoopRpcOMFailoverProxyProvider.java
@@ -0,0 +1,240 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.ha;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.utils.LegacyHadoopConfigurationSource;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.retry.RetryPolicies;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.ozone.OmUtils;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.ha.ConfUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ADDRESS_KEY;
+
+/**
+ * A failover proxy provider implementation which allows clients to configure
+ * multiple OMs to connect to. In case of OM failover, client can try
+ * connecting to another OM node from the list of proxies.
+ */
+public class HadoopRpcOMFailoverProxyProvider<T> extends
+      OMFailoverProxyProviderBase<T> {
+
+  public static final Logger LOG =
+      LoggerFactory.getLogger(HadoopRpcOMFailoverProxyProvider.class);
+
+  private final long omVersion;
+  private final Text delegationTokenService;
+  private final UserGroupInformation ugi;
+  private Map<String, OMProxyInfo> omProxyInfos;
+  private List<String> retryExceptions = new ArrayList<>();
+
+  // HadoopRpcOMFailoverProxyProvider, on encountering certain exception,
+  // tries each OM once in a round robin fashion. After that it waits
+  // for configured time before attempting to contact all the OMs again.
+  // For other exceptions such as LeaderNotReadyException, the same OM
+  // is contacted again with a linearly increasing wait time.
+
+  public HadoopRpcOMFailoverProxyProvider(ConfigurationSource configuration,
+                                 UserGroupInformation ugi,
+                                 String omServiceId,
+                                 Class<T> protocol) throws IOException {
+    super(configuration, omServiceId, protocol);
+    this.ugi = ugi;
+    this.omVersion = RPC.getProtocolVersion(protocol);
+    this.delegationTokenService = computeDelegationTokenService();
+  }
+
+  protected void loadOMClientConfigs(ConfigurationSource config, String omSvcId)
+      throws IOException {
+    Map<String, ProxyInfo<T>> omProxies = new HashMap<>();
+    this.omProxyInfos = new HashMap<>();
+    List<String> omNodeIDList = new ArrayList<>();
+
+    Collection<String> omNodeIds = OmUtils.getActiveOMNodeIds(config,
+        omSvcId);
+
+    for (String nodeId : OmUtils.emptyAsSingletonNull(omNodeIds)) {
+
+      String rpcAddrKey = ConfUtils.addKeySuffixes(OZONE_OM_ADDRESS_KEY,
+          omSvcId, nodeId);
+      String rpcAddrStr = OmUtils.getOmRpcAddress(config, rpcAddrKey);
+      if (rpcAddrStr == null) {
+        continue;
+      }
+
+      OMProxyInfo omProxyInfo = new OMProxyInfo(omSvcId, nodeId,
+          rpcAddrStr);
+
+      if (omProxyInfo.getAddress() != null) {
+        // For a non-HA OM setup, nodeId might be null. If so, we assign it
+        // the default value
+        if (nodeId == null) {
+          nodeId = OzoneConsts.OM_DEFAULT_NODE_ID;
+        }
+        // ProxyInfo will be set during first time call to server.
+        omProxies.put(nodeId, null);
+        omProxyInfos.put(nodeId, omProxyInfo);
+        omNodeIDList.add(nodeId);
+      } else {
+        LOG.error("Failed to create OM proxy for {} at address {}",
+            nodeId, rpcAddrStr);
+      }
+    }
+
+    if (omProxies.isEmpty()) {
+      throw new IllegalArgumentException("Could not find any configured " +
+          "addresses for OM. Please configure the system with "
+          + OZONE_OM_ADDRESS_KEY);
+    }
+    setOmProxies(omProxies);
+    setOmNodeIDList(omNodeIDList);
+  }
+
+  private T createOMProxy(InetSocketAddress omAddress) throws IOException {
+    Configuration hadoopConf =
+        LegacyHadoopConfigurationSource.asHadoopConfiguration(getConf());
+    RPC.setProtocolEngine(hadoopConf, getInterface(), ProtobufRpcEngine.class);
+
+    // FailoverOnNetworkException ensures that the IPC layer does not attempt
+    // retries on the same OM in case of connection exception. This retry
+    // policy essentially results in TRY_ONCE_THEN_FAIL.
+    RetryPolicy connectionRetryPolicy = RetryPolicies
+        .failoverOnNetworkException(0);
+
+    return (T) RPC.getProtocolProxy(getInterface(), omVersion,
+        omAddress, ugi, hadoopConf, NetUtils.getDefaultSocketFactory(
+            hadoopConf), (int) OmUtils.getOMClientRpcTimeOut(getConf()),
+        connectionRetryPolicy).getProxy();
+
+  }
+
+  /**
+   * Get the proxy object which should be used until the next failover event
+   * occurs. RPC proxy object is intialized lazily.
+   * @return the OM proxy object to invoke methods upon
+   */
+  @Override
+  public synchronized ProxyInfo<T> getProxy() {
+    ProxyInfo currentProxyInfo = getOMProxyMap().get(getCurrentProxyOMNodeId());
+    if (currentProxyInfo == null) {
+      currentProxyInfo = createOMProxy(getCurrentProxyOMNodeId());
+    }
+    return currentProxyInfo;
+  }
+
+  /**
+   * Creates proxy object.
+   */
+  protected ProxyInfo createOMProxy(String nodeId) {
+    OMProxyInfo omProxyInfo = omProxyInfos.get(nodeId);
+    InetSocketAddress address = omProxyInfo.getAddress();
+    ProxyInfo proxyInfo;
+    try {
+      T proxy = createOMProxy(address);
+      // Create proxyInfo here, to make it work with all Hadoop versions.
+      proxyInfo = new ProxyInfo<>(proxy, omProxyInfo.toString());
+      getOMProxyMap().put(nodeId, proxyInfo);
+    } catch (IOException ioe) {
+      LOG.error("{} Failed to create RPC proxy to OM at {}",
+          this.getClass().getSimpleName(), address, ioe);
+      throw new RuntimeException(ioe);
+    }
+    return proxyInfo;
+  }
+
+  public Text getCurrentProxyDelegationToken() {
+    return delegationTokenService;
+  }
+
+  protected Text computeDelegationTokenService() {
+    // For HA, this will return "," separated address of all OM's.
+    List<String> addresses = new ArrayList<>();
+
+    for (Map.Entry<String, OMProxyInfo> omProxyInfoSet :
+        omProxyInfos.entrySet()) {
+      Text dtService = omProxyInfoSet.getValue().getDelegationTokenService();
+
+      // During client object creation when one of the OM configured address
+      // in unreachable, dtService can be null.
+      if (dtService != null) {
+        addresses.add(dtService.toString());
+      }
+    }
+
+    if (!addresses.isEmpty()) {
+      Collections.sort(addresses);
+      return new Text(String.join(",", addresses));
+    } else {
+      // If all OM addresses are unresolvable, set dt service to null. Let
+      // this fail in later step when during connection setup.
+      return null;
+    }
+  }
+
+  /**
+   * Close all the proxy objects which have been opened over the lifetime of
+   * the proxy provider.
+   */
+  @Override
+  public synchronized void close() throws IOException {
+    for (ProxyInfo<T> proxyInfo : getOMProxies()) {
+      if (proxyInfo != null) {
+        RPC.stopProxy(proxyInfo.proxy);
+      }
+    }
+  }
+
+  @VisibleForTesting
+  public List<OMProxyInfo> getOMProxyInfos() {
+    return new ArrayList<OMProxyInfo>(omProxyInfos.values());
+  }
+
+  @VisibleForTesting
+  protected void setProxiesForTesting(
+      Map<String, ProxyInfo<T>> setOMProxies,
+      Map<String, OMProxyInfo> setOMProxyInfos,
+      List<String> setOMNodeIDList) {
+    setOmProxies(setOMProxies);
+    this.omProxyInfos = setOMProxyInfos;
+    setOmNodeIDList(setOMNodeIDList);
+  }
+
+}
+
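
With the split, delegation-token handling stays in the Hadoop-RPC subclass: for HA the service string is the sorted, comma-separated list of the resolvable OM addresses, and null when none resolve. A small, self-contained sketch of that computation, using plain Strings instead of org.apache.hadoop.io.Text:

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;

    final class DtServiceSketch {
      // Unresolvable OMs contribute nothing; if none resolve, return null and
      // let connection setup fail later, as the comment in the patch notes.
      static String compute(List<String> resolvedOmAddresses) {
        if (resolvedOmAddresses.isEmpty()) {
          return null;
        }
        List<String> sorted = new ArrayList<>(resolvedOmAddresses);
        Collections.sort(sorted);
        return String.join(",", sorted);
      }

      public static void main(String[] args) {
        // prints om1:9862,om2:9862,om3:9862
        System.out.println(compute(
            List.of("om2:9862", "om1:9862", "om3:9862")));
      }
    }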
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/OMFailoverProxyProvider.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/OMFailoverProxyProviderBase.java
similarity index 65%
rename from hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/OMFailoverProxyProvider.java
rename to hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/OMFailoverProxyProviderBase.java
index 985b3e76ba..b281e22783 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/OMFailoverProxyProvider.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/OMFailoverProxyProviderBase.java
@@ -19,79 +19,57 @@
 package org.apache.hadoop.ozone.om.ha;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 import com.google.protobuf.ServiceException;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.HddsUtils;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
-import org.apache.hadoop.hdds.utils.LegacyHadoopConfigurationSource;
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.retry.FailoverProxyProvider;
-import org.apache.hadoop.io.retry.RetryPolicies;
 import org.apache.hadoop.io.retry.RetryPolicy;
 import org.apache.hadoop.io.retry.RetryPolicy.RetryAction.RetryDecision;
-import org.apache.hadoop.ipc.ProtobufRpcEngine;
-import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.ozone.OmUtils;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
-import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
-import org.apache.hadoop.ozone.ha.ConfUtils;
 import org.apache.hadoop.ozone.om.exceptions.OMLeaderNotReadyException;
 import org.apache.hadoop.ozone.om.exceptions.OMNotLeaderException;
 import org.apache.hadoop.security.AccessControlException;
-import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.SecretManager;
-
 import org.apache.ratis.protocol.exceptions.StateMachineException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ADDRESS_KEY;
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
 /**
- * A failover proxy provider implementation which allows clients to configure
+ * A failover proxy provider base abstract class.
+ * Provides common methods for failover proxy provider
+ * implementations. Failover proxy provider allows clients to configure
  * multiple OMs to connect to. In case of OM failover, client can try
  * connecting to another OM node from the list of proxies.
  */
-public class OMFailoverProxyProvider<T> implements
+public abstract class OMFailoverProxyProviderBase<T> implements
     FailoverProxyProvider<T>, Closeable {
 
   public static final Logger LOG =
-      LoggerFactory.getLogger(OMFailoverProxyProvider.class);
+      LoggerFactory.getLogger(OMFailoverProxyProviderBase.class);
 
-  private final String omServiceId;
   private final ConfigurationSource conf;
+  private final String omServiceId;
   private final Class<T> protocolClass;
-  private final long omVersion;
-  private final UserGroupInformation ugi;
-  private final Text delegationTokenService;
 
   // Map of OMNodeID to its proxy
   private Map<String, ProxyInfo<T>> omProxies;
-  private Map<String, OMProxyInfo> omProxyInfos;
   private List<String> omNodeIDList;
 
-  private String nextProxyOMNodeId;
   private String currentProxyOMNodeId;
-  private int nextProxyIndex;
   private int currentProxyIndex;
-
-  private List<String> retryExceptions = new ArrayList<>();
+  private String nextProxyOMNodeId;
+  private int nextProxyIndex;
 
   // OMFailoverProxyProvider, on encountering certain exception, tries each OM
   // once in a round robin fashion. After that it waits for configured time
@@ -105,70 +83,61 @@ public class OMFailoverProxyProvider<T> implements
   private Set<String> accessControlExceptionOMs = new HashSet<>();
   private boolean performFailoverDone;
 
-  public OMFailoverProxyProvider(ConfigurationSource configuration,
-      UserGroupInformation ugi, String omServiceId, Class<T> protocol)
-      throws IOException {
+  public OMFailoverProxyProviderBase(ConfigurationSource configuration,
+                                     String omServiceId,
+                                     Class<T> protocol) throws IOException {
     this.conf = configuration;
-    this.omVersion = RPC.getProtocolVersion(protocol);
-    this.ugi = ugi;
-    this.omServiceId = omServiceId;
     this.protocolClass = protocol;
     this.performFailoverDone = true;
-    loadOMClientConfigs(conf, this.omServiceId);
-    this.delegationTokenService = computeDelegationTokenService();
-
-    nextProxyIndex = 0;
-    nextProxyOMNodeId = omNodeIDList.get(nextProxyIndex);
-    currentProxyIndex = 0;
-    currentProxyOMNodeId = nextProxyOMNodeId;
+    this.omServiceId = omServiceId;
 
     waitBetweenRetries = conf.getLong(
         OzoneConfigKeys.OZONE_CLIENT_WAIT_BETWEEN_RETRIES_MILLIS_KEY,
         OzoneConfigKeys.OZONE_CLIENT_WAIT_BETWEEN_RETRIES_MILLIS_DEFAULT);
-  }
-
-  protected void loadOMClientConfigs(ConfigurationSource config, String omSvcId)
-      throws IOException {
-    this.omProxies = new HashMap<>();
-    this.omProxyInfos = new HashMap<>();
-    this.omNodeIDList = new ArrayList<>();
 
-    Collection<String> omNodeIds = OmUtils.getActiveOMNodeIds(config,
-        omSvcId);
+    loadOMClientConfigs(conf, omServiceId);
+    Preconditions.checkNotNull(omProxies);
+    Preconditions.checkNotNull(omNodeIDList);
 
-    for (String nodeId : OmUtils.emptyAsSingletonNull(omNodeIds)) {
-
-      String rpcAddrKey = ConfUtils.addKeySuffixes(OZONE_OM_ADDRESS_KEY,
-          omSvcId, nodeId);
-      String rpcAddrStr = OmUtils.getOmRpcAddress(config, rpcAddrKey);
-      if (rpcAddrStr == null) {
-        continue;
-      }
+    nextProxyIndex = 0;
+    nextProxyOMNodeId = omNodeIDList.get(nextProxyIndex);
+    currentProxyIndex = 0;
+    currentProxyOMNodeId = nextProxyOMNodeId;
+  }
 
-      OMProxyInfo omProxyInfo = new OMProxyInfo(omSvcId, nodeId,
-          rpcAddrStr);
+  protected abstract void loadOMClientConfigs(ConfigurationSource config,
+                                              String omSvcId)
+      throws IOException;
 
-      if (omProxyInfo.getAddress() != null) {
-        // For a non-HA OM setup, nodeId might be null. If so, we assign it
-        // the default value
-        if (nodeId == null) {
-          nodeId = OzoneConsts.OM_DEFAULT_NODE_ID;
-        }
-        // ProxyInfo will be set during first time call to server.
-        omProxies.put(nodeId, null);
-        omProxyInfos.put(nodeId, omProxyInfo);
-        omNodeIDList.add(nodeId);
+  protected synchronized boolean shouldFailover(Exception ex) {
+    Throwable unwrappedException = HddsUtils.getUnwrappedException(ex);
+    if (unwrappedException instanceof AccessControlException ||
+        unwrappedException instanceof SecretManager.InvalidToken) {
+      // Retry all available OMs once before failing with
+      // AccessControlException.
+      if (accessControlExceptionOMs.contains(nextProxyOMNodeId)) {
+        accessControlExceptionOMs.clear();
+        return false;
       } else {
-        LOG.error("Failed to create OM proxy for {} at address {}",
-            nodeId, rpcAddrStr);
+        accessControlExceptionOMs.add(nextProxyOMNodeId);
+        if (accessControlExceptionOMs.containsAll(omNodeIDList)) {
+          return false;
+        }
+      }
+    } else if (HddsUtils.shouldNotFailoverOnRpcException(unwrappedException)) {
+      return false;
+    } else if (ex instanceof StateMachineException) {
+      StateMachineException smEx = (StateMachineException) ex;
+      Throwable cause = smEx.getCause();
+      if (cause instanceof OMException) {
+        OMException omEx = (OMException) cause;
+        // Do not failover if the operation was blocked because the OM was
+        // prepared.
+        return omEx.getResult() !=
+           OMException.ResultCodes.NOT_SUPPORTED_OPERATION_WHEN_PREPARED;
       }
     }
-
-    if (omProxies.isEmpty()) {
-      throw new IllegalArgumentException("Could not find any configured " +
-          "addresses for OM. Please configure the system with "
-          + OZONE_OM_ADDRESS_KEY);
-    }
+    return true;
   }
 
   @VisibleForTesting
@@ -176,58 +145,6 @@ public class OMFailoverProxyProvider<T> implements
     return currentProxyOMNodeId;
   }
 
-  private T createOMProxy(InetSocketAddress omAddress) throws IOException {
-    Configuration hadoopConf =
-        LegacyHadoopConfigurationSource.asHadoopConfiguration(conf);
-    RPC.setProtocolEngine(hadoopConf, protocolClass, ProtobufRpcEngine.class);
-
-    // FailoverOnNetworkException ensures that the IPC layer does not attempt
-    // retries on the same OM in case of connection exception. This retry
-    // policy essentially results in TRY_ONCE_THEN_FAIL.
-    RetryPolicy connectionRetryPolicy = RetryPolicies
-        .failoverOnNetworkException(0);
-
-    return (T) RPC.getProtocolProxy(protocolClass, omVersion,
-        omAddress, ugi, hadoopConf, NetUtils.getDefaultSocketFactory(
-            hadoopConf), (int) OmUtils.getOMClientRpcTimeOut(conf),
-        connectionRetryPolicy).getProxy();
-
-  }
-
-  /**
-   * Get the proxy object which should be used until the next failover event
-   * occurs. RPC proxy object is intialized lazily.
-   * @return the OM proxy object to invoke methods upon
-   */
-  @Override
-  public synchronized ProxyInfo<T> getProxy() {
-    ProxyInfo currentProxyInfo = omProxies.get(currentProxyOMNodeId);
-    if (currentProxyInfo == null) {
-      currentProxyInfo = createOMProxy(currentProxyOMNodeId);
-    }
-    return currentProxyInfo;
-  }
-
-  /**
-   * Creates proxy object.
-   */
-  protected ProxyInfo createOMProxy(String nodeId) {
-    OMProxyInfo omProxyInfo = omProxyInfos.get(nodeId);
-    InetSocketAddress address = omProxyInfo.getAddress();
-    ProxyInfo proxyInfo;
-    try {
-      T proxy = createOMProxy(address);
-      // Create proxyInfo here, to make it work with all Hadoop versions.
-      proxyInfo = new ProxyInfo<>(proxy, omProxyInfo.toString());
-      omProxies.put(nodeId, proxyInfo);
-    } catch (IOException ioe) {
-      LOG.error("{} Failed to create RPC proxy to OM at {}",
-          this.getClass().getSimpleName(), address, ioe);
-      throw new RuntimeException(ioe);
-    }
-    return proxyInfo;
-  }
-
   @VisibleForTesting
   public RetryPolicy getRetryPolicy(int maxFailovers) {
     // Client will attempt upto maxFailovers number of failovers between
@@ -292,7 +209,7 @@ public class OMFailoverProxyProvider<T> implements
           return new RetryAction(fallbackAction, getWaitTime());
         } else {
           LOG.error("Failed to connect to OMs: {}. Attempted {} failovers.",
-              getOMProxyInfos(), maxFailovers);
+              omNodeIDList, maxFailovers);
           return RetryAction.FAIL;
         }
       }
@@ -301,37 +218,8 @@ public class OMFailoverProxyProvider<T> implements
     return retryPolicy;
   }
 
-  public Text getCurrentProxyDelegationToken() {
-    return delegationTokenService;
-  }
-
-  protected Text computeDelegationTokenService() {
-    // For HA, this will return "," separated address of all OM's.
-    List<String> addresses = new ArrayList<>();
-
-    for (Map.Entry<String, OMProxyInfo> omProxyInfoSet :
-        omProxyInfos.entrySet()) {
-      Text dtService = omProxyInfoSet.getValue().getDelegationTokenService();
-
-      // During client object creation when one of the OM configured address
-      // in unreachable, dtService can be null.
-      if (dtService != null) {
-        addresses.add(dtService.toString());
-      }
-    }
-
-    if (!addresses.isEmpty()) {
-      Collections.sort(addresses);
-      return new Text(String.join(",", addresses));
-    } else {
-      // If all OM addresses are unresolvable, set dt service to null. Let
-      // this fail in later step when during connection setup.
-      return null;
-    }
-  }
-
   @Override
-  public Class<T> getInterface() {
+  public final Class<T> getInterface() {
     return protocolClass;
   }
 
@@ -441,7 +329,7 @@ public class OMFailoverProxyProvider<T> implements
 
     // OMs are being contacted in Round Robin way. Check if all the OMs have
     // been contacted in this attempt.
-    for (String omNodeID : omProxyInfos.keySet()) {
+    for (String omNodeID : omProxies.keySet()) {
       if (!attemptedOMs.contains(omNodeID)) {
         return 0;
       }
@@ -453,65 +341,16 @@ public class OMFailoverProxyProvider<T> implements
     return waitBetweenRetries;
   }
 
-  private synchronized boolean shouldFailover(Exception ex) {
-    Throwable unwrappedException = HddsUtils.getUnwrappedException(ex);
-    if (unwrappedException instanceof AccessControlException ||
-        unwrappedException instanceof SecretManager.InvalidToken) {
-      // Retry all available OMs once before failing with
-      // AccessControlException.
-      if (accessControlExceptionOMs.contains(nextProxyOMNodeId)) {
-        accessControlExceptionOMs.clear();
-        return false;
-      } else {
-        accessControlExceptionOMs.add(nextProxyOMNodeId);
-        if (accessControlExceptionOMs.containsAll(omNodeIDList)) {
-          return false;
-        }
-      }
-    } else if (HddsUtils.shouldNotFailoverOnRpcException(unwrappedException)) {
-      return false;
-    } else if (ex instanceof StateMachineException) {
-      StateMachineException smEx = (StateMachineException) ex;
-      Throwable cause = smEx.getCause();
-      if (cause instanceof OMException) {
-        OMException omEx = (OMException) cause;
-        // Do not failover if the operation was blocked because the OM was
-        // prepared.
-        return omEx.getResult() !=
-            OMException.ResultCodes.NOT_SUPPORTED_OPERATION_WHEN_PREPARED;
-      }
-    }
-    return true;
-  }
-
-  /**
-   * Close all the proxy objects which have been opened over the lifetime of
-   * the proxy provider.
-   */
-  @Override
-  public synchronized void close() throws IOException {
-    for (ProxyInfo<T> proxyInfo : omProxies.values()) {
-      if (proxyInfo != null) {
-        RPC.stopProxy(proxyInfo.proxy);
-      }
-    }
-  }
 
-  @VisibleForTesting
   public List<ProxyInfo> getOMProxies() {
     return new ArrayList<ProxyInfo>(omProxies.values());
   }
 
-  @VisibleForTesting
+
   public Map<String, ProxyInfo<T>> getOMProxyMap() {
     return omProxies;
   }
 
-  @VisibleForTesting
-  public List<OMProxyInfo> getOMProxyInfos() {
-    return new ArrayList<OMProxyInfo>(omProxyInfos.values());
-  }
-
   /**
    * Check if exception is OMLeaderNotReadyException.
    *
@@ -549,18 +388,21 @@ public class OMFailoverProxyProvider<T> implements
     return null;
   }
 
-  protected void setProxies(
-      Map<String, ProxyInfo<T>> setOMProxies,
-      Map<String, OMProxyInfo> setOMProxyInfos,
-      List<String> setOMNodeIDList) {
-    this.omProxies = setOMProxies;
-    this.omProxyInfos = setOMProxyInfos;
-    this.omNodeIDList = setOMNodeIDList;
+  protected ConfigurationSource getConf() {
+    return conf;
+  }
+
+  protected synchronized void setOmProxies(Map<String,
+      ProxyInfo<T>> omProxies) {
+    this.omProxies = omProxies;
+  }
+
+  protected synchronized void setOmNodeIDList(List<String> omNodeIDList) {
+    this.omNodeIDList = omNodeIDList;
   }
 
-  protected List<String> getOmNodeIDList() {
+  protected synchronized List<String> getOmNodeIDList() {
     return omNodeIDList;
   }
 
 }
-
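
The base class now follows a template-method pattern: the constructor delegates configuration loading to the abstract loadOMClientConfigs hook, checks that the subclass populated the proxy map and node-ID list through the protected setters, and only then selects the initial proxy. A condensed sketch of that flow with simplified names, not the actual class:

    import java.util.List;
    import java.util.Map;
    import java.util.Objects;

    abstract class FailoverProviderSketch<T> {
      private Map<String, T> proxies;
      private List<String> nodeIds;
      private String currentNodeId;

      protected FailoverProviderSketch(String serviceId) {
        loadClientConfigs(serviceId);      // subclass hook fills the state below
        Objects.requireNonNull(proxies);   // mirrors the Preconditions checks
        Objects.requireNonNull(nodeIds);
        currentNodeId = nodeIds.get(0);    // base class then picks the first node
      }

      protected abstract void loadClientConfigs(String serviceId);

      protected void setProxies(Map<String, T> p) { this.proxies = p; }
      protected void setNodeIds(List<String> ids) { this.nodeIds = ids; }

      T current() { return proxies.get(currentNodeId); }
    }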
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/GrpcOmTransport.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/GrpcOmTransport.java
index d32926a88f..b899efcd9d 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/GrpcOmTransport.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/GrpcOmTransport.java
@@ -23,6 +23,7 @@ import java.security.cert.X509Certificate;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.HashMap;
 import java.util.Map;
@@ -83,6 +84,7 @@ public class GrpcOmTransport implements OmTransport {
   private ConfigurationSource conf;
 
   private AtomicReference<String> host;
+  private AtomicInteger syncFailoverCount;
   private final int maxSize;
   private SecurityConfig secConfig;
 
@@ -104,6 +106,9 @@ public class GrpcOmTransport implements OmTransport {
     this.clients = new HashMap<>();
     this.conf = conf;
     this.host = new AtomicReference();
+    this.failoverCount = 0;
+    this.syncFailoverCount = new AtomicInteger();
+
 
     secConfig =  new SecurityConfig(conf);
     maxSize = conf.getInt(OZONE_OM_GRPC_MAXIMUM_RESPONSE_LENGTH,
@@ -111,7 +116,6 @@ public class GrpcOmTransport implements OmTransport {
 
     omFailoverProxyProvider = new GrpcOMFailoverProxyProvider(
         conf,
-        ugi,
         omServiceId,
         OzoneManagerProtocolPB.class);
 
@@ -177,9 +181,11 @@ public class GrpcOmTransport implements OmTransport {
   public OMResponse submitRequest(OMRequest payload) throws IOException {
     OMResponse resp = null;
     boolean tryOtherHost = true;
+    int expectedFailoverCount = 0;
     ResultCodes resultCode = ResultCodes.INTERNAL_ERROR;
     while (tryOtherHost) {
       tryOtherHost = false;
+      expectedFailoverCount = syncFailoverCount.get();
       try {
         resp = clients.get(host.get()).submitRequest(payload);
       } catch (StatusRuntimeException e) {
@@ -187,7 +193,8 @@ public class GrpcOmTransport implements OmTransport {
           resultCode = ResultCodes.TIMEOUT;
         }
         Exception exp = new Exception(e);
-        tryOtherHost = shouldRetry(unwrapException(exp));
+        tryOtherHost = shouldRetry(unwrapException(exp),
+            expectedFailoverCount);
         if (!tryOtherHost) {
           throw new OMException(resultCode);
         }
@@ -223,6 +230,9 @@ public class GrpcOmTransport implements OmTransport {
         } catch (Exception e) {
           LOG.error("cannot get cause for remote exception");
         }
+      } else if ((status.getCode() == Status.Code.RESOURCE_EXHAUSTED) ||
+              (status.getCode() == Status.Code.DATA_LOSS)) {
+        grpcException = srexp;
       } else {
         // exception generated by connection failure, gRPC
         grpcException = ex;
@@ -234,7 +244,7 @@ public class GrpcOmTransport implements OmTransport {
     return grpcException;
   }
 
-  private boolean shouldRetry(Exception ex) {
+  private boolean shouldRetry(Exception ex, int expectedFailoverCount) {
     boolean retry = false;
     RetryPolicy.RetryAction action = null;
     try {
@@ -242,7 +252,8 @@ public class GrpcOmTransport implements OmTransport {
       LOG.debug("grpc failover retry action {}", action.action);
       if (action.action == RetryPolicy.RetryAction.RetryDecision.FAIL) {
         retry = false;
-        LOG.error("Retry request failed. " + action.reason, ex);
+        LOG.error("Retry request failed. Action : {}, {}",
+            action.action, ex.toString());
       } else {
         if (action.action == RetryPolicy.RetryAction.RetryDecision.RETRY ||
             (action.action == RetryPolicy.RetryAction.RetryDecision
@@ -255,7 +266,13 @@ public class GrpcOmTransport implements OmTransport {
             }
           }
           // switch om host to current proxy OMNodeId
-          omFailoverProxyProvider.performFailover(null);
+          if (syncFailoverCount.get() == expectedFailoverCount) {
+            omFailoverProxyProvider.performFailover(null);
+            syncFailoverCount.getAndIncrement();
+          } else {
+            LOG.warn("A failover has occurred since the start of current" +
+                " thread retry, NOT failover using current proxy");
+          }
           host.set(omFailoverProxyProvider
               .getGrpcProxyAddress(
                   omFailoverProxyProvider.getCurrentProxyOMNodeId()));
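
submitRequest now snapshots syncFailoverCount before each attempt and only performs a failover if the counter is still unchanged, so concurrent request threads hitting the same failure do not each advance the proxy. A sketch of that optimistic check, reduced to just the counter logic the patch adds:

    import java.util.concurrent.atomic.AtomicInteger;

    final class FailoverCounterSketch {
      private final AtomicInteger failoverCount = new AtomicInteger();

      int snapshot() {
        return failoverCount.get();   // taken before the request is sent
      }

      // Returns true if this thread performed the failover, false if another
      // thread already failed over since 'expected' was read; in that case
      // the caller simply reuses the proxy the other thread selected.
      boolean failoverIfUnchanged(int expected, Runnable performFailover) {
        if (failoverCount.get() == expected) {
          performFailover.run();
          failoverCount.getAndIncrement();
          return true;
        }
        return false;
      }
    }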
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/Hadoop3OmTransport.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/Hadoop3OmTransport.java
index 9a6040c8ea..297d5d6322 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/Hadoop3OmTransport.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/Hadoop3OmTransport.java
@@ -28,7 +28,7 @@ import org.apache.hadoop.ipc.ProtobufRpcEngine;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.om.exceptions.OMNotLeaderException;
-import org.apache.hadoop.ozone.om.ha.OMFailoverProxyProvider;
+import org.apache.hadoop.ozone.om.ha.HadoopRpcOMFailoverProxyProvider;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -52,7 +52,7 @@ public class Hadoop3OmTransport implements OmTransport {
   private static final Logger LOG =
       LoggerFactory.getLogger(Hadoop3OmTransport.class);
 
-  private final OMFailoverProxyProvider omFailoverProxyProvider;
+  private final HadoopRpcOMFailoverProxyProvider omFailoverProxyProvider;
 
   private final OzoneManagerProtocolPB rpcProxy;
 
@@ -63,8 +63,8 @@ public class Hadoop3OmTransport implements OmTransport {
         OzoneManagerProtocolPB.class,
         ProtobufRpcEngine.class);
 
-    this.omFailoverProxyProvider = new OMFailoverProxyProvider(conf, ugi,
-        omServiceId, OzoneManagerProtocolPB.class);
+    this.omFailoverProxyProvider = new HadoopRpcOMFailoverProxyProvider(
+            conf, ugi, omServiceId, OzoneManagerProtocolPB.class);
 
     int maxFailovers = conf.getInt(
         OzoneConfigKeys.OZONE_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY,
@@ -90,7 +90,7 @@ public class Hadoop3OmTransport implements OmTransport {
       return omResponse;
     } catch (ServiceException e) {
       OMNotLeaderException notLeaderException =
-          OMFailoverProxyProvider.getNotLeaderException(e);
+          HadoopRpcOMFailoverProxyProvider.getNotLeaderException(e);
       if (notLeaderException == null) {
         throw ProtobufHelper.getRemoteException(e);
       }
@@ -105,11 +105,13 @@ public class Hadoop3OmTransport implements OmTransport {
 
   /**
    * Creates a {@link RetryProxy} encapsulating the
-   * {@link OMFailoverProxyProvider}. The retry proxy fails over on network
-   * exception or if the current proxy is not the leader OM.
+   * {@link HadoopRpcOMFailoverProxyProvider}. The retry proxy
+   * fails over on network exception or if the current proxy
+   * is not the leader OM.
    */
   private OzoneManagerProtocolPB createRetryProxy(
-      OMFailoverProxyProvider failoverProxyProvider, int maxFailovers) {
+      HadoopRpcOMFailoverProxyProvider failoverProxyProvider,
+      int maxFailovers) {
 
     OzoneManagerProtocolPB proxy = (OzoneManagerProtocolPB) RetryProxy.create(
         OzoneManagerProtocolPB.class, failoverProxyProvider,
@@ -118,7 +120,7 @@ public class Hadoop3OmTransport implements OmTransport {
   }
 
   @VisibleForTesting
-  public OMFailoverProxyProvider getOmFailoverProxyProvider() {
+  public HadoopRpcOMFailoverProxyProvider getOmFailoverProxyProvider() {
     return omFailoverProxyProvider;
   }
 
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OMAdminProtocolClientSideImpl.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OMAdminProtocolClientSideImpl.java
index 05e73d4b8f..b207657b1f 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OMAdminProtocolClientSideImpl.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OMAdminProtocolClientSideImpl.java
@@ -39,7 +39,7 @@ import org.apache.hadoop.ozone.om.protocol.OMConfiguration;
 import org.apache.hadoop.ozone.om.protocol.OMAdminProtocol;
 import org.apache.hadoop.ozone.om.exceptions.OMLeaderNotReadyException;
 import org.apache.hadoop.ozone.om.exceptions.OMNotLeaderException;
-import org.apache.hadoop.ozone.om.ha.OMFailoverProxyProvider;
+import org.apache.hadoop.ozone.om.ha.HadoopRpcOMFailoverProxyProvider;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerAdminProtocolProtos.DecommissionOMRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerAdminProtocolProtos.DecommissionOMResponse;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerAdminProtocolProtos.OMConfigurationRequest;
@@ -128,8 +128,8 @@ public final class OMAdminProtocolClientSideImpl implements OMAdminProtocol {
     RPC.setProtocolEngine(OzoneConfiguration.of(conf),
         OMAdminProtocolPB.class, ProtobufRpcEngine.class);
 
-    OMFailoverProxyProvider omFailoverProxyProvider =
-        new OMFailoverProxyProvider(conf, ugi, omServiceId,
+    HadoopRpcOMFailoverProxyProvider omFailoverProxyProvider =
+        new HadoopRpcOMFailoverProxyProvider(conf, ugi, omServiceId,
             OMAdminProtocolPB.class);
 
     // Multiple the max number of retries with number of OMs to calculate the
@@ -192,13 +192,13 @@ public final class OMAdminProtocolClientSideImpl implements OMAdminProtocol {
       response = rpcProxy.decommission(NULL_RPC_CONTROLLER, decommOMRequest);
     } catch (ServiceException e) {
       OMNotLeaderException notLeaderException =
-          OMFailoverProxyProvider.getNotLeaderException(e);
+          HadoopRpcOMFailoverProxyProvider.getNotLeaderException(e);
       if (notLeaderException != null) {
         throwException(notLeaderException.getMessage());
       }
 
       OMLeaderNotReadyException leaderNotReadyException =
-          OMFailoverProxyProvider.getLeaderNotReadyException(e);
+          HadoopRpcOMFailoverProxyProvider.getLeaderNotReadyException(e);
       if (leaderNotReadyException != null) {
         throwException(leaderNotReadyException.getMessage());
       }
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OMInterServiceProtocolClientSideImpl.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OMInterServiceProtocolClientSideImpl.java
index a81a19bfd6..66b5f63bc9 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OMInterServiceProtocolClientSideImpl.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OMInterServiceProtocolClientSideImpl.java
@@ -29,7 +29,7 @@ import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.om.exceptions.OMLeaderNotReadyException;
 import org.apache.hadoop.ozone.om.exceptions.OMNotLeaderException;
-import org.apache.hadoop.ozone.om.ha.OMFailoverProxyProvider;
+import org.apache.hadoop.ozone.om.ha.HadoopRpcOMFailoverProxyProvider;
 import org.apache.hadoop.ozone.om.helpers.OMNodeDetails;
 import org.apache.hadoop.ozone.om.protocol.OMInterServiceProtocol;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerInterServiceProtocolProtos.BootstrapOMRequest;
@@ -53,7 +53,7 @@ public class OMInterServiceProtocolClientSideImpl implements
   private static final Logger LOG =
       LoggerFactory.getLogger(OMInterServiceProtocolClientSideImpl.class);
 
-  private final OMFailoverProxyProvider omFailoverProxyProvider;
+  private final HadoopRpcOMFailoverProxyProvider omFailoverProxyProvider;
 
   private final OMInterServiceProtocolPB rpcProxy;
 
@@ -63,8 +63,8 @@ public class OMInterServiceProtocolClientSideImpl implements
     RPC.setProtocolEngine(OzoneConfiguration.of(conf),
         OMInterServiceProtocolPB.class, ProtobufRpcEngine.class);
 
-    this.omFailoverProxyProvider = new OMFailoverProxyProvider(conf, ugi,
-        omServiceId, OMInterServiceProtocolPB.class);
+    this.omFailoverProxyProvider = new HadoopRpcOMFailoverProxyProvider(
+            conf, ugi, omServiceId, OMInterServiceProtocolPB.class);
 
     int maxFailovers = conf.getInt(
         OzoneConfigKeys.OZONE_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY,
@@ -88,14 +88,14 @@ public class OMInterServiceProtocolClientSideImpl implements
       response = rpcProxy.bootstrap(NULL_RPC_CONTROLLER, bootstrapOMRequest);
     } catch (ServiceException e) {
       OMNotLeaderException notLeaderException =
-          OMFailoverProxyProvider.getNotLeaderException(e);
+          HadoopRpcOMFailoverProxyProvider.getNotLeaderException(e);
       if (notLeaderException != null) {
         throwException(ErrorCode.LEADER_UNDETERMINED,
             notLeaderException.getMessage());
       }
 
       OMLeaderNotReadyException leaderNotReadyException =
-          OMFailoverProxyProvider.getLeaderNotReadyException(e);
+          HadoopRpcOMFailoverProxyProvider.getLeaderNotReadyException(e);
       if (leaderNotReadyException != null) {
         throwException(ErrorCode.LEADER_NOT_READY,
             leaderNotReadyException.getMessage());
diff --git a/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/ha/TestOMFailoverProxyProvider.java b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/ha/TestOMFailoverProxyProvider.java
index 78505f7948..d1a3f5c51d 100644
--- a/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/ha/TestOMFailoverProxyProvider.java
+++ b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/ha/TestOMFailoverProxyProvider.java
@@ -48,7 +48,7 @@ public class TestOMFailoverProxyProvider {
   private static final String OM_SERVICE_ID = "om-service-test1";
   private static final String NODE_ID_BASE_STR = "omNode-";
   private static final String DUMMY_NODE_ADDR = "0.0.0.0:8080";
-  private OMFailoverProxyProvider provider;
+  private HadoopRpcOMFailoverProxyProvider provider;
   private long waitBetweenRetries;
   private int numNodes = 3;
   private OzoneConfiguration config;
@@ -68,7 +68,7 @@ public class TestOMFailoverProxyProvider {
     }
     config.set(ConfUtils.addKeySuffixes(OZONE_OM_NODES_KEY, OM_SERVICE_ID),
         allNodeIds.toString());
-    provider = new OMFailoverProxyProvider(config,
+    provider = new HadoopRpcOMFailoverProxyProvider(config,
         UserGroupInformation.getCurrentUser(), OM_SERVICE_ID,
         OzoneManagerProtocolPB.class);
   }
@@ -187,9 +187,11 @@ public class TestOMFailoverProxyProvider {
     }
     ozoneConf.set(ConfUtils.addKeySuffixes(OZONE_OM_NODES_KEY, OM_SERVICE_ID),
         allNodeIds.toString());
-    OMFailoverProxyProvider prov = new OMFailoverProxyProvider(ozoneConf,
-        UserGroupInformation.getCurrentUser(), OM_SERVICE_ID,
-        OzoneManagerProtocolPB.class);
+    HadoopRpcOMFailoverProxyProvider prov =
+            new HadoopRpcOMFailoverProxyProvider(ozoneConf,
+                    UserGroupInformation.getCurrentUser(),
+                    OM_SERVICE_ID,
+                    OzoneManagerProtocolPB.class);
 
     Text dtService = prov.getCurrentProxyDelegationToken();
 
diff --git a/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/protocolPB/TestS3GrpcOmTransport.java b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/protocolPB/TestS3GrpcOmTransport.java
index 1ca5b6470b..421100ffaa 100644
--- a/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/protocolPB/TestS3GrpcOmTransport.java
+++ b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/protocolPB/TestS3GrpcOmTransport.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.ozone.om.protocolPB;
 
 import static org.apache.hadoop.ozone.ClientVersion.CURRENT_VERSION;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_GRPC_MAXIMUM_RESPONSE_LENGTH_DEFAULT;
 import static org.mockito.AdditionalAnswers.delegatesTo;
 import static org.mockito.Mockito.mock;
 
@@ -48,6 +49,8 @@ import com.google.protobuf.ServiceException;
 import org.apache.ratis.protocol.RaftPeerId;
 
 import static org.junit.Assert.fail;
+import static org.apache.hadoop.ozone.om.OMConfigKeys
+    .OZONE_OM_GRPC_MAXIMUM_RESPONSE_LENGTH;
 
 /**
  * Tests for GrpcOmTransport client.
@@ -77,6 +80,8 @@ public class TestS3GrpcOmTransport {
   private UserGroupInformation ugi;
   private ManagedChannel channel;
 
+  private String serverName;
+
 
   private ServiceException createNotLeaderException() {
     RaftPeerId raftPeerId = RaftPeerId.getRaftPeerId("testNodeId");
@@ -126,7 +131,7 @@ public class TestS3GrpcOmTransport {
   @Before
   public void setUp() throws Exception {
     // Generate a unique in-process server name.
-    String serverName = InProcessServerBuilder.generateName();
+    serverName = InProcessServerBuilder.generateName();
 
     // Create a server, add service, start,
     // and register for automatic graceful shutdown.
@@ -219,4 +224,39 @@ public class TestS3GrpcOmTransport {
       Assert.assertTrue(true);
     }
   }
+
+  @Test
+  public void testGrpcFailoverExceedMaxMesgLen() throws Exception {
+    ServiceListRequest req = ServiceListRequest.newBuilder().build();
+
+    final OMRequest omRequest = OMRequest.newBuilder()
+        .setCmdType(Type.ServiceList)
+        .setVersion(CURRENT_VERSION)
+        .setClientId("test")
+        .setServiceListRequest(req)
+        .build();
+
+    conf.setInt(OZONE_OM_GRPC_MAXIMUM_RESPONSE_LENGTH, 1);
+    client = new GrpcOmTransport(conf, ugi, omServiceId);
+    int maxSize = conf.getInt(OZONE_OM_GRPC_MAXIMUM_RESPONSE_LENGTH,
+        OZONE_OM_GRPC_MAXIMUM_RESPONSE_LENGTH_DEFAULT);
+    channel = grpcCleanup.register(
+        InProcessChannelBuilder
+            .forName(serverName)
+            .maxInboundMetadataSize(maxSize)
+            .directExecutor().build());
+    client.startClient(channel);
+
+    doFailover = true;
+    // GrpcOMFailoverProvider returns Fail retry due to mesg response
+    // len > 0, causing RESOURCE_EXHAUSTED exception.
+    // This exception should cause failover to NOT retry,
+    // rather to fail.
+    try {
+      final OMResponse resp = client.submitRequest(omRequest);
+      fail();
+    } catch (Exception e) {
+      Assert.assertTrue(true);
+    }
+  }
 }
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java
index 280eb3573e..49622b3c57 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java
@@ -93,7 +93,7 @@ import org.apache.hadoop.ozone.om.OmFailoverProxyUtil;
 import org.apache.hadoop.ozone.om.OzoneManager;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes;
-import org.apache.hadoop.ozone.om.ha.OMFailoverProxyProvider;
+import org.apache.hadoop.ozone.om.ha.HadoopRpcOMFailoverProxyProvider;
 import org.apache.hadoop.ozone.om.ha.OMProxyInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
@@ -265,7 +265,7 @@ public abstract class TestOzoneRpcClientAbstract {
   @Test
   public void testOMClientProxyProvider() {
 
-    OMFailoverProxyProvider omFailoverProxyProvider =
+    HadoopRpcOMFailoverProxyProvider omFailoverProxyProvider =
         OmFailoverProxyUtil.getFailoverProxyProvider(store.getClientProxy());
 
     List<OMProxyInfo> omProxies = omFailoverProxyProvider.getOMProxyInfos();
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/OmFailoverProxyUtil.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/OmFailoverProxyUtil.java
index 15cb4253d6..a2f8373e6d 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/OmFailoverProxyUtil.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/OmFailoverProxyUtil.java
@@ -18,7 +18,7 @@ package org.apache.hadoop.ozone.om;
 
 import org.apache.hadoop.ozone.client.protocol.ClientProtocol;
 import org.apache.hadoop.ozone.client.rpc.RpcClient;
-import org.apache.hadoop.ozone.om.ha.OMFailoverProxyProvider;
+import org.apache.hadoop.ozone.om.ha.HadoopRpcOMFailoverProxyProvider;
 import org.apache.hadoop.ozone.om.protocolPB.Hadoop3OmTransport;
 import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolClientSideTranslatorPB;
 
@@ -33,7 +33,7 @@ public final class OmFailoverProxyUtil {
   /**
    * Get FailoverProxyProvider from RpcClient / ClientProtocol.
    */
-  public static OMFailoverProxyProvider getFailoverProxyProvider(
+  public static HadoopRpcOMFailoverProxyProvider getFailoverProxyProvider(
       ClientProtocol clientProtocol) {
 
     OzoneManagerProtocolClientSideTranslatorPB ozoneManagerClient =
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java
index 66e1505b59..3095633eb9 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java
@@ -34,7 +34,7 @@ import org.apache.hadoop.ozone.client.OzoneClientFactory;
 import org.apache.hadoop.ozone.client.io.OzoneInputStream;
 import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
 import org.apache.hadoop.ozone.client.rpc.RpcClient;
-import org.apache.hadoop.ozone.om.ha.OMFailoverProxyProvider;
+import org.apache.hadoop.ozone.om.ha.HadoopRpcOMFailoverProxyProvider;
 import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServerConfig;
 import org.apache.ozone.test.GenericTestUtils;
 import org.junit.Assert;
@@ -250,11 +250,11 @@ public abstract class TestOzoneManagerHA {
    */
   protected void stopLeaderOM() {
     //Stop the leader OM.
-    OMFailoverProxyProvider omFailoverProxyProvider =
+    HadoopRpcOMFailoverProxyProvider omFailoverProxyProvider =
         OmFailoverProxyUtil.getFailoverProxyProvider(
             (RpcClient) objectStore.getClientProxy());
 
-    // The OMFailoverProxyProvider will point to the current leader OM node.
+    // The omFailoverProxyProvider will point to the current leader OM node.
     String leaderOMNodeId = omFailoverProxyProvider.getCurrentProxyOMNodeId();
 
     // Stop one of the ozone manager, to see when the OM leader changes
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAMetadataOnly.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAMetadataOnly.java
index e1874c36fb..203abdce5e 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAMetadataOnly.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAMetadataOnly.java
@@ -28,7 +28,7 @@ import org.apache.hadoop.ozone.client.OzoneClientFactory;
 import org.apache.hadoop.ozone.client.OzoneClient;
 import org.apache.hadoop.ozone.client.VolumeArgs;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
-import org.apache.hadoop.ozone.om.ha.OMFailoverProxyProvider;
+import org.apache.hadoop.ozone.om.ha.HadoopRpcOMFailoverProxyProvider;
 import org.apache.hadoop.ozone.om.ha.OMProxyInfo;
 import org.apache.hadoop.ozone.om.helpers.OMRatisHelper;
 import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer;
@@ -150,14 +150,14 @@ public class TestOzoneManagerHAMetadataOnly extends TestOzoneManagerHA {
   }
 
   /**
-   * Test that OMFailoverProxyProvider creates an OM proxy for each OM in the
-   * cluster.
+   * Test that HadoopRpcOMFailoverProxyProvider creates an OM proxy
+   * for each OM in the cluster.
    */
   @Test
   public void testOMProxyProviderInitialization() throws Exception {
     OzoneClient rpcClient = getCluster().getRpcClient();
 
-    OMFailoverProxyProvider omFailoverProxyProvider =
+    HadoopRpcOMFailoverProxyProvider omFailoverProxyProvider =
         OmFailoverProxyUtil.getFailoverProxyProvider(
             rpcClient.getObjectStore().getClientProxy());
 
@@ -183,13 +183,14 @@ public class TestOzoneManagerHAMetadataOnly extends TestOzoneManagerHA {
   }
 
   /**
-   * Test OMFailoverProxyProvider failover on connection exception to OM client.
+   * Test HadoopRpcOMFailoverProxyProvider failover on connection exception
+   * to OM client.
    */
   @Test
   public void testOMProxyProviderFailoverOnConnectionFailure()
       throws Exception {
     ObjectStore objectStore = getObjectStore();
-    OMFailoverProxyProvider omFailoverProxyProvider =
+    HadoopRpcOMFailoverProxyProvider omFailoverProxyProvider =
         OmFailoverProxyUtil
             .getFailoverProxyProvider(objectStore.getClientProxy());
     String firstProxyNodeId = omFailoverProxyProvider.getCurrentProxyOMNodeId();
@@ -215,20 +216,21 @@ public class TestOzoneManagerHAMetadataOnly extends TestOzoneManagerHA {
   }
 
   /**
-   * Test OMFailoverProxyProvider failover when current OM proxy is not
+   * Test HadoopRpcOMFailoverProxyProvider failover when current OM proxy is not
    * the current OM Leader.
    */
   @Test
   public void testOMProxyProviderFailoverToCurrentLeader() throws Exception {
     ObjectStore objectStore = getObjectStore();
-    OMFailoverProxyProvider omFailoverProxyProvider = OmFailoverProxyUtil
-        .getFailoverProxyProvider(objectStore.getClientProxy());
+    HadoopRpcOMFailoverProxyProvider omFailoverProxyProvider =
+            OmFailoverProxyUtil
+                    .getFailoverProxyProvider(objectStore.getClientProxy());
 
     // Run couple of createVolume tests to discover the current Leader OM
     createVolumeTest(true);
     createVolumeTest(true);
 
-    // The OMFailoverProxyProvider will point to the current leader OM node.
+    // The omFailoverProxyProvider will point to the current leader OM node.
     String leaderOMNodeId = omFailoverProxyProvider.getCurrentProxyOMNodeId();
 
     // Perform a manual failover of the proxy provider to move the
@@ -288,21 +290,22 @@ public class TestOzoneManagerHAMetadataOnly extends TestOzoneManagerHA {
     ObjectStore objectStore = getObjectStore();
     objectStore.createVolume(volumeName);
 
-    OMFailoverProxyProvider omFailoverProxyProvider = OmFailoverProxyUtil
-        .getFailoverProxyProvider(objectStore.getClientProxy());
+    HadoopRpcOMFailoverProxyProvider omFailoverProxyProvider =
+            OmFailoverProxyUtil
+                    .getFailoverProxyProvider(objectStore.getClientProxy());
 
     String currentLeaderNodeId = omFailoverProxyProvider
         .getCurrentProxyOMNodeId();
 
     // A read request from any proxy should failover to the current leader OM
     for (int i = 0; i < getNumOfOMs(); i++) {
-      // Failover OMFailoverProxyProvider to OM at index i
+      // Failover omFailoverProxyProvider to OM at index i
       OzoneManager ozoneManager = getCluster().getOzoneManager(i);
 
       // Get the ObjectStore and FailoverProxyProvider for OM at index i
       final ObjectStore store = OzoneClientFactory.getRpcClient(
           getOmServiceId(), getConf()).getObjectStore();
-      final OMFailoverProxyProvider proxyProvider =
+      final HadoopRpcOMFailoverProxyProvider proxyProvider =
           OmFailoverProxyUtil.getFailoverProxyProvider(store.getClientProxy());
 
       // Failover to the OM node that the objectStore points to
@@ -371,8 +374,9 @@ public class TestOzoneManagerHAMetadataOnly extends TestOzoneManagerHA {
     objectStore.createVolume(UUID.randomUUID().toString());
 
 
-    OMFailoverProxyProvider omFailoverProxyProvider = OmFailoverProxyUtil
-        .getFailoverProxyProvider(objectStore.getClientProxy());
+    HadoopRpcOMFailoverProxyProvider omFailoverProxyProvider =
+            OmFailoverProxyUtil
+                    .getFailoverProxyProvider(objectStore.getClientProxy());
 
     String currentLeaderNodeId = omFailoverProxyProvider
         .getCurrentProxyOMNodeId();
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAWithData.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAWithData.java
index b6fc179dab..fbfe6d36e3 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAWithData.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAWithData.java
@@ -27,7 +27,7 @@ import org.apache.hadoop.ozone.client.VolumeArgs;
 import org.apache.hadoop.ozone.client.io.OzoneInputStream;
 import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
-import org.apache.hadoop.ozone.om.ha.OMFailoverProxyProvider;
+import org.apache.hadoop.ozone.om.ha.HadoopRpcOMFailoverProxyProvider;
 import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;
 import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadCompleteInfo;
 import org.apache.ozone.test.GenericTestUtils;
@@ -257,11 +257,11 @@ public class TestOzoneManagerHAWithData extends TestOzoneManagerHA {
     // Stop leader OM, to see when the OM leader changes
     // multipart upload is happening successfully or not.
 
-    OMFailoverProxyProvider omFailoverProxyProvider =
+    HadoopRpcOMFailoverProxyProvider omFailoverProxyProvider =
         OmFailoverProxyUtil
             .getFailoverProxyProvider(getObjectStore().getClientProxy());
 
-    // The OMFailoverProxyProvider will point to the current leader OM node.
+    // The omFailoverProxyProvider will point to the current leader OM node.
     String leaderOMNodeId = omFailoverProxyProvider.getCurrentProxyOMNodeId();
 
     // Stop one of the ozone manager, to see when the OM leader changes
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAWithFailover.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAWithFailover.java
index 39f5f4ead1..3aee013ebb 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAWithFailover.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHAWithFailover.java
@@ -17,7 +17,7 @@
 package org.apache.hadoop.ozone.om;
 
 import org.apache.hadoop.ozone.OzoneConfigKeys;
-import org.apache.hadoop.ozone.om.ha.OMFailoverProxyProvider;
+import org.apache.hadoop.ozone.om.ha.HadoopRpcOMFailoverProxyProvider;
 import org.junit.Assert;
 import org.junit.jupiter.api.Test;
 
@@ -44,11 +44,11 @@ public class TestOzoneManagerHAWithFailover extends TestOzoneManagerHA {
     long waitBetweenRetries = getConf().getLong(
         OzoneConfigKeys.OZONE_CLIENT_WAIT_BETWEEN_RETRIES_MILLIS_KEY,
         OzoneConfigKeys.OZONE_CLIENT_WAIT_BETWEEN_RETRIES_MILLIS_DEFAULT);
-    OMFailoverProxyProvider omFailoverProxyProvider =
+    HadoopRpcOMFailoverProxyProvider omFailoverProxyProvider =
         OmFailoverProxyUtil
             .getFailoverProxyProvider(getObjectStore().getClientProxy());
 
-    // The OMFailoverProxyProvider will point to the current leader OM node.
+    // The omFailoverProxyProvider will point to the current leader OM node.
     String leaderOMNodeId = omFailoverProxyProvider.getCurrentProxyOMNodeId();
 
     getCluster().stopOzoneManager(leaderOMNodeId);
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/failover/TestOMFailovers.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/failover/TestOMFailovers.java
index fe7f6f49ea..79ea1ec170 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/failover/TestOMFailovers.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/failover/TestOMFailovers.java
@@ -29,7 +29,8 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.retry.RetryProxy;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
-import org.apache.hadoop.ozone.om.ha.OMFailoverProxyProvider;
+import org.apache.hadoop.ozone.om.ha.HadoopRpcOMFailoverProxyProvider;
+import org.apache.hadoop.ozone.om.ha.OMFailoverProxyProviderBase;
 import org.apache.hadoop.ozone.om.ha.OMProxyInfo;
 import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolPB;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
@@ -54,9 +55,9 @@ public class TestOMFailovers {
 
     testException = new AccessControlException();
 
-    GenericTestUtils.setLogLevel(OMFailoverProxyProvider.LOG, Level.DEBUG);
+    GenericTestUtils.setLogLevel(OMFailoverProxyProviderBase.LOG, Level.DEBUG);
     GenericTestUtils.LogCapturer logCapturer = GenericTestUtils.LogCapturer
-        .captureLogs(OMFailoverProxyProvider.LOG);
+        .captureLogs(OMFailoverProxyProviderBase.LOG);
 
     MockFailoverProxyProvider failoverProxyProvider =
         new MockFailoverProxyProvider(conf);
@@ -114,7 +115,7 @@ public class TestOMFailovers {
   }
 
   private final class MockFailoverProxyProvider
-      extends OMFailoverProxyProvider {
+      extends HadoopRpcOMFailoverProxyProvider {
 
     private MockFailoverProxyProvider(ConfigurationSource configuration)
         throws IOException {
@@ -143,7 +144,7 @@ public class TestOMFailovers {
         omProxyInfos.put(nodeId, null);
         omNodeIDList.add(nodeId);
       }
-      setProxies(omProxies, omProxyInfos, omNodeIDList);
+      setProxiesForTesting(omProxies, omProxyInfos, omNodeIDList);
     }
 
     @Override
diff --git a/hadoop-ozone/ozonefs-hadoop2/src/main/java/org/apache/hadoop/fs/ozone/Hadoop27RpcTransport.java b/hadoop-ozone/ozonefs-hadoop2/src/main/java/org/apache/hadoop/fs/ozone/Hadoop27RpcTransport.java
index a3b678efa3..408b282c64 100644
--- a/hadoop-ozone/ozonefs-hadoop2/src/main/java/org/apache/hadoop/fs/ozone/Hadoop27RpcTransport.java
+++ b/hadoop-ozone/ozonefs-hadoop2/src/main/java/org/apache/hadoop/fs/ozone/Hadoop27RpcTransport.java
@@ -28,7 +28,7 @@ import org.apache.hadoop.ipc.ProtobufRpcEngine;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.om.exceptions.OMNotLeaderException;
-import org.apache.hadoop.ozone.om.ha.OMFailoverProxyProvider;
+import org.apache.hadoop.ozone.om.ha.HadoopRpcOMFailoverProxyProvider;
 import org.apache.hadoop.ozone.om.protocolPB.OmTransport;
 import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolPB;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
@@ -47,7 +47,7 @@ public class Hadoop27RpcTransport implements OmTransport {
 
   private final OzoneManagerProtocolPB rpcProxy;
 
-  private final OMFailoverProxyProvider omFailoverProxyProvider;
+  private final HadoopRpcOMFailoverProxyProvider omFailoverProxyProvider;
 
   public Hadoop27RpcTransport(ConfigurationSource conf,
       UserGroupInformation ugi, String omServiceId) throws IOException {
@@ -56,8 +56,8 @@ public class Hadoop27RpcTransport implements OmTransport {
         OzoneManagerProtocolPB.class,
         ProtobufRpcEngine.class);
 
-    this.omFailoverProxyProvider = new OMFailoverProxyProvider(conf, ugi,
-        omServiceId, OzoneManagerProtocolPB.class);
+    this.omFailoverProxyProvider = new HadoopRpcOMFailoverProxyProvider(
+            conf, ugi, omServiceId, OzoneManagerProtocolPB.class);
 
     int maxFailovers = conf.getInt(
         OzoneConfigKeys.OZONE_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY,
@@ -84,7 +84,7 @@ public class Hadoop27RpcTransport implements OmTransport {
       return omResponse;
     } catch (ServiceException e) {
       OMNotLeaderException notLeaderException =
-          OMFailoverProxyProvider.getNotLeaderException(e);
+          HadoopRpcOMFailoverProxyProvider.getNotLeaderException(e);
       if (notLeaderException == null) {
         throw ProtobufHelper.getRemoteException(e);
       }
@@ -99,11 +99,12 @@ public class Hadoop27RpcTransport implements OmTransport {
 
   /**
    * Creates a {@link RetryProxy} encapsulating the
-   * {@link OMFailoverProxyProvider}. The retry proxy fails over on network
-   * exception or if the current proxy is not the leader OM.
+   * {@link HadoopRpcOMFailoverProxyProvider}. The retry proxy fails over on
+   * network exception or if the current proxy is not the leader OM.
    */
   private OzoneManagerProtocolPB createRetryProxy(
-      OMFailoverProxyProvider failoverProxyProvider, int maxFailovers) {
+      HadoopRpcOMFailoverProxyProvider failoverProxyProvider,
+      int maxFailovers) {
 
     OzoneManagerProtocolPB proxy = (OzoneManagerProtocolPB) RetryProxy.create(
         OzoneManagerProtocolPB.class, failoverProxyProvider,
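
The javadoc above describes the standard Hadoop retry-proxy pattern. The following is a minimal sketch of that pattern, assuming only the org.apache.hadoop.io.retry API; RetryProxySketch, wrap, and the delay values are illustrative and do not come from this patch.

    import org.apache.hadoop.io.retry.FailoverProxyProvider;
    import org.apache.hadoop.io.retry.RetryPolicies;
    import org.apache.hadoop.io.retry.RetryPolicy;
    import org.apache.hadoop.io.retry.RetryProxy;

    public final class RetryProxySketch {

      private RetryProxySketch() { }

      /**
       * Wraps an interface in a dynamic proxy that fails over to the next
       * proxy from the provider on network exceptions, up to maxFailovers
       * attempts, with a bounded, growing delay between attempts.
       */
      @SuppressWarnings("unchecked")
      static <T> T wrap(Class<T> iface, FailoverProxyProvider<T> provider,
          int maxFailovers) {
        RetryPolicy policy = RetryPolicies.failoverOnNetworkException(
            RetryPolicies.TRY_ONCE_THEN_FAIL, maxFailovers, 1000, 10000);
        return (T) RetryProxy.create(iface, provider, policy);
      }
    }
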
diff --git a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/protocolPB/TestGrpcOmTransport.java b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/protocolPB/TestGrpcOmTransport.java
index 304a2bfb86..4082dc82f8 100644
--- a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/protocolPB/TestGrpcOmTransport.java
+++ b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/protocolPB/TestGrpcOmTransport.java
@@ -19,11 +19,17 @@
 package org.apache.hadoop.ozone.protocolPB;
 
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ozone.om.protocolPB.GrpcOmTransportFactory;
 import org.apache.hadoop.ozone.om.protocolPB.GrpcOmTransport;
+import org.apache.hadoop.ozone.om.protocolPB.Hadoop3OmTransportFactory;
 import org.apache.hadoop.ozone.om.protocolPB.OmTransport;
 import org.apache.hadoop.ozone.om.protocolPB.OmTransportFactory;
+import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolPB;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.junit.Assert;
+import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.Timeout;
@@ -39,34 +45,44 @@ public class TestGrpcOmTransport {
 
   private static final Logger LOG =
       LoggerFactory.getLogger(TestGrpcOmTransport.class);
+  private static OzoneConfiguration conf;
   @Rule
-  public Timeout timeout = Timeout.seconds(30);
+  public Timeout timeout = Timeout.seconds(3000);
 
+  @BeforeClass
+  public static void setUp() {
+    conf = new OzoneConfiguration();
+    RPC.setProtocolEngine(OzoneConfiguration.of(conf),
+        OzoneManagerProtocolPB.class,
+        ProtobufRpcEngine.class);
+  }
 
   @Test
   public void testGrpcOmTransportFactory() throws Exception {
     String omServiceId = "";
-    String transportCls = GrpcOmTransport.class.getName();
-    OzoneConfiguration conf = new OzoneConfiguration();
+    String transportCls = GrpcOmTransportFactory.class.getName();
     conf.set(OZONE_OM_TRANSPORT_CLASS,
         transportCls);
 
     UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
     OmTransport omTransport = OmTransportFactory.create(conf, ugi, omServiceId);
+    omTransport.close();
     Assert.assertEquals(GrpcOmTransport.class.getSimpleName(),
         omTransport.getClass().getSimpleName());
-
   }
 
   @Test
   public void testHrpcOmTransportFactory() throws Exception {
     String omServiceId = "";
-    OzoneConfiguration conf = new OzoneConfiguration();
+    String transportCls = Hadoop3OmTransportFactory.class.getName();
+    conf.set(OZONE_OM_TRANSPORT_CLASS,
+        transportCls);
 
     UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
     OmTransport omTransport = OmTransportFactory.create(conf, ugi, omServiceId);
     // OmTransport should be Hadoop Rpc and
     // fail equality GrpcOmTransport equality test
+    omTransport.close();
     Assert.assertNotEquals(GrpcOmTransport.class.getSimpleName(),
         omTransport.getClass().getSimpleName());
   }
@@ -74,8 +90,6 @@ public class TestGrpcOmTransport {
   @Test
   public void testStartStop() throws Exception {
     String omServiceId = "";
-    OzoneConfiguration conf = new OzoneConfiguration();
-
     UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
     GrpcOmTransport client = new GrpcOmTransport(conf, ugi, omServiceId);
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org