You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ha...@apache.org on 2019/03/01 08:29:16 UTC

[hadoop] branch trunk updated: HDDS-1072. Implement RetryProxy and FailoverProxy for OM client.

This is an automated email from the ASF dual-hosted git repository.

hanishakoneru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 8e12259  HDDS-1072. Implement RetryProxy and FailoverProxy for OM client.
8e12259 is described below

commit 8e1225991d8da7d6801fc3753319139873f23bc9
Author: Hanisha Koneru <ha...@apache.org>
AuthorDate: Fri Mar 1 00:27:39 2019 -0800

    HDDS-1072. Implement RetryProxy and FailoverProxy for OM client.
---
 .../org/apache/hadoop/ozone/OzoneConfigKeys.java   |  17 ++
 .../java/org/apache/hadoop/ozone/OzoneConsts.java  |   3 +
 .../common/src/main/resources/ozone-default.xml    |  43 +++-
 .../ozone/client/protocol/ClientProtocol.java      |   4 +-
 .../hadoop/ozone/client/rest/RestClient.java       |   4 +-
 .../apache/hadoop/ozone/client/rpc/RpcClient.java  |  18 +-
 .../hadoop/ozone/client/rpc/ha/OMProxyInfo.java    |  49 ----
 .../ozone/client/rpc/ha/OMProxyProvider.java       | 177 --------------
 .../ozone/om/ha/OMFailoverProxyProvider.java       | 266 +++++++++++++++++++++
 .../apache/hadoop/ozone/om}/ha/package-info.java   |   2 +-
 .../ozone/om/protocol/OzoneManagerProtocol.java    |   7 +
 ...OzoneManagerProtocolClientSideTranslatorPB.java | 105 +++++++-
 .../src/main/proto/OzoneManagerProtocol.proto      |   2 +
 .../hadoop/ozone/MiniOzoneHAClusterImpl.java       |  38 +--
 .../client/rpc/TestOzoneRpcClientAbstract.java     |   8 +-
 .../apache/hadoop/ozone/om/TestOzoneManagerHA.java | 184 +++++++++++---
 .../org/apache/hadoop/ozone/om/OzoneManager.java   |   6 +
 .../hadoop/ozone/om/ratis/OMRatisHelper.java       |   9 +-
 .../ozone/om/ratis/OzoneManagerRatisClient.java    |  27 ++-
 ...OzoneManagerProtocolServerSideTranslatorPB.java |   3 +-
 .../om/ratis/TestOzoneManagerRatisServer.java      |  23 --
 21 files changed, 653 insertions(+), 342 deletions(-)

diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
index cd40f7c..0d73905 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
@@ -379,6 +379,23 @@ public final class OzoneConfigKeys {
   public static final String OZONE_FS_ISOLATED_CLASSLOADER =
       "ozone.fs.isolated-classloader";
 
+  // Ozone Client Retry and Failover configurations
+  public static final String OZONE_CLIENT_RETRY_MAX_ATTEMPTS_KEY =
+      "ozone.client.retry.max.attempts";
+  public static final int OZONE_CLIENT_RETRY_MAX_ATTEMPTS_DEFAULT =
+      10;
+  public static final String OZONE_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY =
+      "ozone.client.failover.max.attempts";
+  public static final int OZONE_CLIENT_FAILOVER_MAX_ATTEMPTS_DEFAULT =
+      15;
+  public static final String OZONE_CLIENT_FAILOVER_SLEEP_BASE_MILLIS_KEY =
+      "ozone.client.failover.sleep.base.millis";
+  public static final int OZONE_CLIENT_FAILOVER_SLEEP_BASE_MILLIS_DEFAULT =
+      500;
+  public static final String OZONE_CLIENT_FAILOVER_SLEEP_MAX_MILLIS_KEY =
+      "ozone.client.failover.sleep.max.millis";
+  public static final int OZONE_CLIENT_FAILOVER_SLEEP_MAX_MILLIS_DEFAULT =
+      15000;
 
   public static final String OZONE_FREON_HTTP_ENABLED_KEY =
       "ozone.freon.http.enabled";
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
index 37cfb7f..333a032 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
@@ -274,4 +274,7 @@ public final class OzoneConsts {
 
   // Default OMServiceID for OM Ratis servers to use as RaftGroupId
   public static final String OM_SERVICE_ID_DEFAULT = "omServiceIdDefault";
+
+  // Dummy OMNodeID for OM Clients to use for a non-HA OM setup
+  public static final String OM_NODE_ID_DUMMY = "omNodeIdDummy";
 }
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index faf2f89..af22cc1 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -2029,4 +2029,45 @@
     </description>
   </property>
 
-</configuration>
+  <property>
+    <name>ozone.client.retry.max.attempts</name>
+    <value>10</value>
+    <description>
+      Max retry attempts for Ozone RpcClient talking to OzoneManagers.
+    </description>
+  </property>
+  <property>
+    <name>ozone.client.failover.max.attempts</name>
+    <value>15</value>
+    <description>
+      Expert only. The number of client failover attempts that should be
+      made before the failover is considered failed.
+    </description>
+  </property>
+  <property>
+    <name>ozone.client.failover.sleep.base.millis</name>
+    <value>500</value>
+    <description>
+      Expert only. The time to wait, in milliseconds, between failover
+      attempts increases exponentially as a function of the number of
+      attempts made so far, with a random factor of +/- 50%. This option
+      specifies the base value used in the failover calculation. The
+      first failover will retry immediately. The 2nd failover attempt
+      will delay at least ozone.client.failover.sleep.base.millis
+      milliseconds. And so on.
+    </description>
+  </property>
+  <property>
+    <name>ozone.client.failover.sleep.max.millis</name>
+    <value>15000</value>
+    <description>
+      Expert only. The time to wait, in milliseconds, between failover
+      attempts increases exponentially as a function of the number of
+      attempts made so far, with a random factor of +/- 50%. This option
+      specifies the maximum value to wait between failovers.
+      Specifically, the time between two failover attempts will not
+      exceed +/- 50% of ozone.client.failover.sleep.max.millis
+      milliseconds.
+    </description>
+  </property>
+</configuration>
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java
index 494afae..2bf9089 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java
@@ -29,7 +29,7 @@ import org.apache.hadoop.hdds.client.ReplicationFactor;
 import org.apache.hadoop.hdds.client.ReplicationType;
 import org.apache.hadoop.ozone.client.io.OzoneInputStream;
 import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
-import org.apache.hadoop.ozone.client.rpc.ha.OMProxyProvider;
+import org.apache.hadoop.ozone.om.ha.OMFailoverProxyProvider;
 import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;
 import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadCompleteInfo;
 
@@ -510,5 +510,5 @@ public interface ClientProtocol {
   S3SecretValue getS3Secret(String kerberosID) throws IOException;
 
   @VisibleForTesting
-  OMProxyProvider getOMProxyProvider();
+  OMFailoverProxyProvider getOMProxyProvider();
 }
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rest/RestClient.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rest/RestClient.java
index b69d972..eea2809 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rest/RestClient.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rest/RestClient.java
@@ -42,8 +42,8 @@ import org.apache.hadoop.ozone.client.rest.headers.Header;
 import org.apache.hadoop.ozone.client.rest.response.BucketInfo;
 import org.apache.hadoop.ozone.client.rest.response.KeyInfoDetails;
 import org.apache.hadoop.ozone.client.rest.response.VolumeInfo;
-import org.apache.hadoop.ozone.client.rpc.ha.OMProxyProvider;
 import org.apache.hadoop.ozone.om.OMConfigKeys;
+import org.apache.hadoop.ozone.om.ha.OMFailoverProxyProvider;
 import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;
 import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadCompleteInfo;
 import org.apache.hadoop.ozone.om.helpers.S3SecretValue;
@@ -725,7 +725,7 @@ public class RestClient implements ClientProtocol {
   }
 
   @Override
-  public OMProxyProvider getOMProxyProvider() {
+  public OMFailoverProxyProvider getOMProxyProvider() {
     return null;
   }
 
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
index 0875046..4b44770 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
@@ -50,8 +50,8 @@ import org.apache.hadoop.ozone.client.io.LengthInputStream;
 import org.apache.hadoop.ozone.client.io.OzoneInputStream;
 import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
 import org.apache.hadoop.ozone.client.protocol.ClientProtocol;
+import org.apache.hadoop.ozone.om.ha.OMFailoverProxyProvider;
 import org.apache.hadoop.ozone.om.helpers.BucketEncryptionKeyInfo;
-import org.apache.hadoop.ozone.client.rpc.ha.OMProxyProvider;
 import org.apache.hadoop.ozone.om.helpers.OmBucketArgs;
 import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
@@ -66,6 +66,8 @@ import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
 import org.apache.hadoop.ozone.om.helpers.S3SecretValue;
 import org.apache.hadoop.ozone.om.helpers.ServiceInfo;
 import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
+import org.apache.hadoop.ozone.om.protocolPB
+    .OzoneManagerProtocolClientSideTranslatorPB;
 import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolPB;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.ozone.OzoneAcl;
@@ -85,6 +87,7 @@ import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.io.Text;
 import org.apache.logging.log4j.util.Strings;
+import org.apache.ratis.protocol.ClientId;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -107,7 +110,6 @@ public class RpcClient implements ClientProtocol {
   private final OzoneConfiguration conf;
   private final StorageContainerLocationProtocol
       storageContainerLocationClient;
-  private final OMProxyProvider omProxyProvider;
   private final OzoneManagerProtocol ozoneManagerClient;
   private final XceiverClientManager xceiverClientManager;
   private final int chunkSize;
@@ -121,6 +123,7 @@ public class RpcClient implements ClientProtocol {
   private final long streamBufferMaxSize;
   private final long blockSize;
   private final long watchTimeout;
+  private final ClientId clientId = ClientId.randomId();
 
    /**
     * Creates RpcClient instance with the given configuration.
@@ -137,11 +140,8 @@ public class RpcClient implements ClientProtocol {
         OMConfigKeys.OZONE_OM_GROUP_RIGHTS_DEFAULT);
     RPC.setProtocolEngine(conf, OzoneManagerProtocolPB.class,
         ProtobufRpcEngine.class);
-    this.omProxyProvider = new OMProxyProvider(conf, ugi);
-    this.ozoneManagerClient =
-        TracingUtil.createProxy(
-            this.omProxyProvider.getProxy(),
-            OzoneManagerProtocol.class);
+    this.ozoneManagerClient = new OzoneManagerProtocolClientSideTranslatorPB(
+        this.conf, clientId.toString(), ugi);
 
     long scmVersion =
         RPC.getProtocolVersion(StorageContainerLocationProtocolPB.class);
@@ -492,8 +492,8 @@ public class RpcClient implements ClientProtocol {
 
   @Override
   @VisibleForTesting
-  public OMProxyProvider getOMProxyProvider() {
-    return omProxyProvider;
+  public OMFailoverProxyProvider getOMProxyProvider() {
+    return ozoneManagerClient.getOMFailoverProxyProvider();
   }
 
   @Override
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/ha/OMProxyInfo.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/ha/OMProxyInfo.java
deleted file mode 100644
index 01e5562..0000000
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/ha/OMProxyInfo.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.ozone.client.rpc.ha;
-
-import org.apache.hadoop.ozone.om.protocolPB
-    .OzoneManagerProtocolClientSideTranslatorPB;
-
-import java.net.InetSocketAddress;
-
-/**
- * Proxy information of OM.
- */
-public final class OMProxyInfo {
-  private InetSocketAddress address;
-  private OzoneManagerProtocolClientSideTranslatorPB omClient;
-
-  public OMProxyInfo(InetSocketAddress addr) {
-    this.address = addr;
-  }
-
-  public InetSocketAddress getAddress() {
-    return address;
-  }
-
-  public OzoneManagerProtocolClientSideTranslatorPB getOMProxy() {
-    return omClient;
-  }
-
-  public void setOMProxy(
-      OzoneManagerProtocolClientSideTranslatorPB clientProxy) {
-    this.omClient = clientProxy;
-  }
-}
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/ha/OMProxyProvider.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/ha/OMProxyProvider.java
deleted file mode 100644
index 574cb5f..0000000
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/ha/OMProxyProvider.java
+++ /dev/null
@@ -1,177 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.ozone.client.rpc.ha;
-
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.ipc.Client;
-import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.ozone.OmUtils;
-import org.apache.hadoop.ozone.om.protocolPB
-    .OzoneManagerProtocolClientSideTranslatorPB;
-import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolPB;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.ratis.protocol.ClientId;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-
-import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ADDRESS_KEY;
-import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SERVICE_IDS_KEY;
-
-/**
- * A failover proxy provider implementation which allows clients to configure
- * multiple OMs to connect to. In case of OM failover, client can try
- * connecting to another OM node from the list of proxies.
- */
-public class OMProxyProvider implements Closeable {
-
-  public static final Logger LOG =
-      LoggerFactory.getLogger(OMProxyProvider.class);
-
-  private List<OMProxyInfo> omProxies;
-
-  private int currentProxyIndex = 0;
-
-  private final Configuration conf;
-  private final long omVersion;
-  private final UserGroupInformation ugi;
-  private ClientId clientId = ClientId.randomId();
-
-  public OMProxyProvider(Configuration configuration,
-      UserGroupInformation ugi) {
-    this.conf = configuration;
-    this.omVersion = RPC.getProtocolVersion(OzoneManagerProtocolPB.class);
-    this.ugi = ugi;
-    loadOMClientConfigs(conf);
-  }
-
-  private void loadOMClientConfigs(Configuration config) {
-    this.omProxies = new ArrayList<>();
-
-    Collection<String> omServiceIds = config.getTrimmedStringCollection(
-        OZONE_OM_SERVICE_IDS_KEY);
-
-    if (omServiceIds.size() > 1) {
-      throw new IllegalArgumentException("Multi-OM Services is not supported." +
-          " Please configure only one OM Service ID in " +
-          OZONE_OM_SERVICE_IDS_KEY);
-    }
-
-    for (String serviceId : OmUtils.emptyAsSingletonNull(omServiceIds)) {
-      Collection<String> omNodeIds = OmUtils.getOMNodeIds(config, serviceId);
-
-      for (String nodeId : OmUtils.emptyAsSingletonNull(omNodeIds)) {
-
-        String rpcAddrKey = OmUtils.addKeySuffixes(OZONE_OM_ADDRESS_KEY,
-            serviceId, nodeId);
-        String rpcAddrStr = OmUtils.getOmRpcAddress(config, rpcAddrKey);
-        if (rpcAddrStr == null) {
-          continue;
-        }
-
-        InetSocketAddress addr = NetUtils.createSocketAddr(rpcAddrStr);
-
-        // Add the OM client proxy info to list of proxies
-        if (addr != null) {
-          OMProxyInfo omProxyInfo = new OMProxyInfo(addr);
-          omProxies.add(omProxyInfo);
-        } else {
-          LOG.error("Failed to create OM proxy at address {}", rpcAddrStr);
-        }
-      }
-    }
-
-    if (omProxies.isEmpty()) {
-      throw new IllegalArgumentException("Could not find any configured " +
-          "addresses for OM. Please configure the system with "
-          + OZONE_OM_ADDRESS_KEY);
-    }
-  }
-
-  private OzoneManagerProtocolClientSideTranslatorPB getOMClient(
-      InetSocketAddress omAddress) throws IOException {
-    return new OzoneManagerProtocolClientSideTranslatorPB(
-        RPC.getProxy(OzoneManagerProtocolPB.class, omVersion, omAddress, ugi,
-            conf, NetUtils.getDefaultSocketFactory(conf),
-            Client.getRpcTimeout(conf)), clientId.toString());
-  }
-
-  /**
-   * Get the proxy object which should be used until the next failover event
-   * occurs. RPC proxy object is intialized lazily.
-   * @return the OM proxy object to invoke methods upon
-   */
-  public synchronized OzoneManagerProtocolClientSideTranslatorPB getProxy() {
-    OMProxyInfo currentOMProxyInfo = omProxies.get(currentProxyIndex);
-    return createOMClientIfNeeded(currentOMProxyInfo);
-  }
-
-  private OzoneManagerProtocolClientSideTranslatorPB createOMClientIfNeeded(
-      OMProxyInfo proxyInfo) {
-    if (proxyInfo.getOMProxy() == null) {
-      try {
-        proxyInfo.setOMProxy(getOMClient(proxyInfo.getAddress()));
-      } catch (IOException ioe) {
-        LOG.error("{} Failed to create RPC proxy to OM at {}",
-            this.getClass().getSimpleName(), proxyInfo.getAddress(), ioe);
-        throw new RuntimeException(ioe);
-      }
-    }
-    return proxyInfo.getOMProxy();
-  }
-
-  /**
-   * Called whenever an error warrants failing over. It is determined by the
-   * retry policy.
-   */
-  public void performFailover() {
-    incrementProxyIndex();
-  }
-
-  synchronized void incrementProxyIndex() {
-    currentProxyIndex = (currentProxyIndex + 1) % omProxies.size();
-  }
-
-  /**
-   * Close all the proxy objects which have been opened over the lifetime of
-   * the proxy provider.
-   */
-  @Override
-  public synchronized void close() throws IOException {
-    for (OMProxyInfo proxy : omProxies) {
-      OzoneManagerProtocolClientSideTranslatorPB omProxy = proxy.getOMProxy();
-      if (omProxy != null) {
-        RPC.stopProxy(omProxy);
-      }
-    }
-  }
-
-  @VisibleForTesting
-  public List<OMProxyInfo> getOMProxies() {
-    return omProxies;
-  }
-}
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/OMFailoverProxyProvider.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/OMFailoverProxyProvider.java
new file mode 100644
index 0000000..f5fdf6f
--- /dev/null
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/OMFailoverProxyProvider.java
@@ -0,0 +1,266 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.ha;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.io.retry.FailoverProxyProvider;
+import org.apache.hadoop.ipc.Client;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.ozone.OmUtils;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolPB;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ADDRESS_KEY;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SERVICE_IDS_KEY;
+
+/**
+ * A failover proxy provider implementation which allows clients to configure
+ * multiple OMs to connect to. In case of OM failover, client can try
+ * connecting to another OM node from the list of proxies.
+ */
+public class OMFailoverProxyProvider implements
+    FailoverProxyProvider<OzoneManagerProtocolPB>, Closeable {
+
+  public static final Logger LOG =
+      LoggerFactory.getLogger(OMFailoverProxyProvider.class);
+
+  // Map of OMNodeID to its proxy
+  private Map<String, OMProxyInfo> omProxies;
+  private List<String> omNodeIDList;
+
+  private String currentProxyOMNodeId;
+  private int currentProxyIndex;
+
+  private final Configuration conf;
+  private final long omVersion;
+  private final UserGroupInformation ugi;
+
+  public OMFailoverProxyProvider(OzoneConfiguration configuration,
+      UserGroupInformation ugi) throws IOException {
+    this.conf = configuration;
+    this.omVersion = RPC.getProtocolVersion(OzoneManagerProtocolPB.class);
+    this.ugi = ugi;
+    loadOMClientConfigs(conf);
+
+    currentProxyIndex = 0;
+    currentProxyOMNodeId = omNodeIDList.get(currentProxyIndex);
+  }
+
+  /**
+   * Class to store proxy information.
+   */
+  public final class OMProxyInfo
+      extends FailoverProxyProvider.ProxyInfo<OzoneManagerProtocolPB> {
+    private InetSocketAddress address;
+
+    OMProxyInfo(OzoneManagerProtocolPB proxy, String proxyInfoStr,
+        InetSocketAddress addr) {
+      super(proxy, proxyInfoStr);
+      this.address = addr;
+    }
+
+    public InetSocketAddress getAddress() {
+      return address;
+    }
+  }
+
+  private void loadOMClientConfigs(Configuration config) throws IOException {
+    this.omProxies = new HashMap<>();
+    this.omNodeIDList = new ArrayList<>();
+
+    Collection<String> omServiceIds = config.getTrimmedStringCollection(
+        OZONE_OM_SERVICE_IDS_KEY);
+
+    if (omServiceIds.size() > 1) {
+      throw new IllegalArgumentException("Multi-OM Services is not supported." +
+          " Please configure only one OM Service ID in " +
+          OZONE_OM_SERVICE_IDS_KEY);
+    }
+
+    for (String serviceId : OmUtils.emptyAsSingletonNull(omServiceIds)) {
+      Collection<String> omNodeIds = OmUtils.getOMNodeIds(config, serviceId);
+
+      for (String nodeId : OmUtils.emptyAsSingletonNull(omNodeIds)) {
+
+        String rpcAddrKey = OmUtils.addKeySuffixes(OZONE_OM_ADDRESS_KEY,
+            serviceId, nodeId);
+        String rpcAddrStr = OmUtils.getOmRpcAddress(config, rpcAddrKey);
+        if (rpcAddrStr == null) {
+          continue;
+        }
+
+        InetSocketAddress addr = NetUtils.createSocketAddr(rpcAddrStr);
+
+        // Add the OM client proxy info to list of proxies
+        if (addr != null) {
+          StringBuilder proxyInfo = new StringBuilder()
+              .append(nodeId).append("(")
+              .append(NetUtils.getHostPortString(addr)).append(")");
+          OMProxyInfo omProxyInfo = new OMProxyInfo(null,
+              proxyInfo.toString(), addr);
+
+          // For a non-HA OM setup, nodeId might be null. If so, we assign it
+          // a dummy value
+          if (nodeId == null) {
+            nodeId = OzoneConsts.OM_NODE_ID_DUMMY;
+          }
+          omProxies.put(nodeId, omProxyInfo);
+          omNodeIDList.add(nodeId);
+
+        } else {
+          LOG.error("Failed to create OM proxy for {} at address {}",
+              nodeId, rpcAddrStr);
+        }
+      }
+    }
+
+    if (omProxies.isEmpty()) {
+      throw new IllegalArgumentException("Could not find any configured " +
+          "addresses for OM. Please configure the system with "
+          + OZONE_OM_ADDRESS_KEY);
+    }
+  }
+
+  @VisibleForTesting
+  public synchronized String getCurrentProxyOMNodeId() {
+    return currentProxyOMNodeId;
+  }
+
+  private OzoneManagerProtocolPB createOMProxy(InetSocketAddress omAddress)
+      throws IOException {
+    return RPC.getProxy(OzoneManagerProtocolPB.class, omVersion, omAddress, ugi,
+        conf, NetUtils.getDefaultSocketFactory(conf),
+        Client.getRpcTimeout(conf));
+  }
+
+  /**
+   * Get the proxy object which should be used until the next failover event
+   * occurs. RPC proxy object is intialized lazily.
+   * @return the OM proxy object to invoke methods upon
+   */
+  @Override
+  public synchronized OMProxyInfo getProxy() {
+    OMProxyInfo currentOMProxyInfo = omProxies.get(currentProxyOMNodeId);
+    createOMProxyIfNeeded(currentOMProxyInfo);
+    return currentOMProxyInfo;
+  }
+
+  /**
+   * Creates OM proxy object if it does not already exist.
+   */
+  private OMProxyInfo createOMProxyIfNeeded(OMProxyInfo proxyInfo) {
+    if (proxyInfo.proxy == null) {
+      try {
+        proxyInfo.proxy = createOMProxy(proxyInfo.address);
+      } catch (IOException ioe) {
+        LOG.error("{} Failed to create RPC proxy to OM at {}",
+            this.getClass().getSimpleName(), proxyInfo.address, ioe);
+        throw new RuntimeException(ioe);
+      }
+    }
+    return proxyInfo;
+  }
+
+  /**
+   * Called whenever an error warrants failing over. It is determined by the
+   * retry policy.
+   */
+  @Override
+  public void performFailover(OzoneManagerProtocolPB currentProxy) {
+    int newProxyIndex = incrementProxyIndex();
+    LOG.debug("Failing over OM proxy to index: {}, nodeId: {}",
+        newProxyIndex, omNodeIDList.get(newProxyIndex));
+  }
+
+  /**
+   * Update the proxy index to the next proxy in the list.
+   * @return the new proxy index
+   */
+  private synchronized int incrementProxyIndex() {
+    currentProxyIndex = (currentProxyIndex + 1) % omProxies.size();
+    currentProxyOMNodeId = omNodeIDList.get(currentProxyIndex);
+    return currentProxyIndex;
+  }
+
+  @Override
+  public Class<OzoneManagerProtocolPB> getInterface() {
+    return OzoneManagerProtocolPB.class;
+  }
+
+  /**
+   * Performs failover if the leaderOMNodeId returned through OMReponse does
+   * not match the current leaderOMNodeId cached by the proxy provider.
+   */
+  public void performFailoverIfRequired(String newLeaderOMNodeId) {
+    if (updateLeaderOMNodeId(newLeaderOMNodeId)) {
+      LOG.debug("Failing over OM proxy to nodeId: {}", newLeaderOMNodeId);
+    }
+  }
+
+  /**
+   * Failover to the OM proxy specified by the new leader OMNodeId.
+   * @param newLeaderOMNodeId OMNodeId to failover to.
+   * @return true if failover is successful, false otherwise.
+   */
+  synchronized boolean updateLeaderOMNodeId(String newLeaderOMNodeId) {
+    if (!currentProxyOMNodeId.equals(newLeaderOMNodeId)) {
+      if (omProxies.containsKey(newLeaderOMNodeId)) {
+        currentProxyOMNodeId = newLeaderOMNodeId;
+        currentProxyIndex = omNodeIDList.indexOf(currentProxyOMNodeId);
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Close all the proxy objects which have been opened over the lifetime of
+   * the proxy provider.
+   */
+  @Override
+  public synchronized void close() throws IOException {
+    for (OMProxyInfo proxy : omProxies.values()) {
+      OzoneManagerProtocolPB omProxy = proxy.proxy;
+      if (omProxy != null) {
+        RPC.stopProxy(omProxy);
+      }
+    }
+  }
+
+  @VisibleForTesting
+  public List<OMProxyInfo> getOMProxies() {
+    return new ArrayList<>(omProxies.values());
+  }
+}
+
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/ha/package-info.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/package-info.java
similarity index 94%
rename from hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/ha/package-info.java
rename to hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/package-info.java
index df0e69c..a95f09f 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/ha/package-info.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/package-info.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.hadoop.ozone.client.rpc.ha;
+package org.apache.hadoop.ozone.om.ha;
 
 /**
  * This package contains Ozone Client's OM Proxy classes.
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java
index 6834043..9bb83a0 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java
@@ -16,6 +16,7 @@
  * limitations under the License.
  */
 package org.apache.hadoop.ozone.om.protocol;
+import org.apache.hadoop.ozone.om.ha.OMFailoverProxyProvider;
 import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo;
 import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;
 
@@ -380,5 +381,11 @@ public interface OzoneManagerProtocol
    * @throws IOException
    */
   S3SecretValue getS3Secret(String kerberosID) throws IOException;
+
+  /**
+   * Get the OM Client's Retry and Failover Proxy provider.
+   * @return OMFailoverProxyProvider
+   */
+  OMFailoverProxyProvider getOMFailoverProxyProvider();
 }
 
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
index 9136d2b..0d266c6 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
@@ -17,17 +17,25 @@
  */
 package org.apache.hadoop.ozone.om.protocolPB;
 
+import java.io.EOFException;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.stream.Collectors;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.tracing.TracingUtil;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.retry.RetryPolicies;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.io.retry.RetryProxy;
 import org.apache.hadoop.ipc.ProtobufHelper;
 import org.apache.hadoop.ipc.ProtocolTranslator;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.om.ha.OMFailoverProxyProvider;
 import org.apache.hadoop.ozone.om.helpers.KeyValueUtil;
 import org.apache.hadoop.ozone.om.helpers.OmBucketArgs;
 import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
@@ -102,6 +110,7 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.VolumeInfo;
 import org.apache.hadoop.ozone.protocolPB.OMPBHelper;
 import org.apache.hadoop.ozone.security.OzoneTokenIdentifier;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.proto.SecurityProtos.CancelDelegationTokenRequestProto;
 import org.apache.hadoop.security.proto.SecurityProtos.GetDelegationTokenRequestProto;
 import org.apache.hadoop.security.proto.SecurityProtos.RenewDelegationTokenRequestProto;
@@ -111,6 +120,9 @@ import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
 import com.google.protobuf.RpcController;
 import com.google.protobuf.ServiceException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes;
 import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.TOKEN_ERROR_OTHER;
 import static org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Status.ACCESS_DENIED;
@@ -131,20 +143,89 @@ public final class OzoneManagerProtocolClientSideTranslatorPB
    */
   private static final RpcController NULL_RPC_CONTROLLER = null;
 
+  private final OMFailoverProxyProvider omFailoverProxyProvider;
   private final OzoneManagerProtocolPB rpcProxy;
   private final String clientID;
+  private static final Logger FAILOVER_PROXY_PROVIDER_LOG =
+      LoggerFactory.getLogger(OMFailoverProxyProvider.class);
+
+  public OzoneManagerProtocolClientSideTranslatorPB(
+      OzoneManagerProtocolPB proxy, String clientId) {
+    this.rpcProxy = proxy;
+    this.clientID = clientId;
+    this.omFailoverProxyProvider = null;
+  }
 
   /**
-   * Constructor for KeySpaceManger Client.
-   * @param rpcProxy
+   * Constructor for OM Protocol Client. This creates a {@link RetryProxy}
+   * over {@link OMFailoverProxyProvider} proxy. OMFailoverProxyProvider has
+   * one {@link OzoneManagerProtocolPB} proxy pointing to each OM node in the
+   * cluster.
    */
-  public OzoneManagerProtocolClientSideTranslatorPB(
-      OzoneManagerProtocolPB rpcProxy, String clientId) {
-    this.rpcProxy = rpcProxy;
+  public OzoneManagerProtocolClientSideTranslatorPB(OzoneConfiguration conf,
+      String clientId, UserGroupInformation ugi) throws IOException {
+    this.omFailoverProxyProvider = new OMFailoverProxyProvider(conf, ugi);
+
+    int maxRetries = conf.getInt(
+        OzoneConfigKeys.OZONE_CLIENT_RETRY_MAX_ATTEMPTS_KEY,
+        OzoneConfigKeys.OZONE_CLIENT_RETRY_MAX_ATTEMPTS_DEFAULT);
+    int maxFailovers = conf.getInt(
+        OzoneConfigKeys.OZONE_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY,
+        OzoneConfigKeys.OZONE_CLIENT_FAILOVER_MAX_ATTEMPTS_DEFAULT);
+    int sleepBase = conf.getInt(
+        OzoneConfigKeys.OZONE_CLIENT_FAILOVER_SLEEP_BASE_MILLIS_KEY,
+        OzoneConfigKeys.OZONE_CLIENT_FAILOVER_SLEEP_BASE_MILLIS_DEFAULT);
+    int sleepMax = conf.getInt(
+        OzoneConfigKeys.OZONE_CLIENT_FAILOVER_SLEEP_MAX_MILLIS_KEY,
+        OzoneConfigKeys.OZONE_CLIENT_FAILOVER_SLEEP_MAX_MILLIS_DEFAULT);
+
+    this.rpcProxy = createRetryProxy(omFailoverProxyProvider,
+        maxRetries, maxFailovers, sleepBase, sleepMax);
     this.clientID = clientId;
   }
 
   /**
+   * Creates a {@link RetryProxy} encapsulating the
+   * {@link OMFailoverProxyProvider}. The retry proxy fails over on network
+   * exception or if the current proxy is not the leader OM.
+   */
+  private OzoneManagerProtocolPB createRetryProxy(
+      OMFailoverProxyProvider failoverProxyProvider,
+      int maxRetries, int maxFailovers, int delayMillis, int maxDelayBase) {
+    RetryPolicy retryPolicyOnNetworkException = RetryPolicies
+        .failoverOnNetworkException(RetryPolicies.TRY_ONCE_THEN_FAIL,
+            maxFailovers, maxRetries, delayMillis, maxDelayBase);
+    RetryPolicy retryPolicy = new RetryPolicy() {
+      @Override
+      public RetryAction shouldRetry(Exception exception, int retries,
+          int failovers, boolean isIdempotentOrAtMostOnce)
+          throws Exception {
+        if (exception instanceof EOFException ||
+            exception instanceof  ServiceException) {
+          if (retries < maxRetries && failovers < maxFailovers) {
+            return RetryAction.FAILOVER_AND_RETRY;
+          } else {
+            FAILOVER_PROXY_PROVIDER_LOG.error("Failed to connect to OM. " +
+                "Attempted {} retries and {} failovers", retries, failovers);
+            return RetryAction.FAIL;
+          }
+        } else {
+          return retryPolicyOnNetworkException.shouldRetry(
+                  exception, retries, failovers, isIdempotentOrAtMostOnce);
+        }
+      }
+    };
+    OzoneManagerProtocolPB proxy = (OzoneManagerProtocolPB) RetryProxy.create(
+        OzoneManagerProtocolPB.class, failoverProxyProvider, retryPolicy);
+    return proxy;
+  }
+
+  @VisibleForTesting
+  public OMFailoverProxyProvider getOMFailoverProxyProvider() {
+    return omFailoverProxyProvider;
+  }
+
+  /**
    * Closes this stream and releases any system resources associated
    * with it. If the stream is already closed then invoking this
    * method has no effect.
@@ -195,7 +276,19 @@ public final class OzoneManagerProtocolClientSideTranslatorPB
       OMRequest payload = OMRequest.newBuilder(omRequest)
           .setTraceID(TracingUtil.exportCurrentSpan())
           .build();
-      return rpcProxy.submitRequest(NULL_RPC_CONTROLLER, payload);
+
+      OMResponse omResponse =
+          rpcProxy.submitRequest(NULL_RPC_CONTROLLER, payload);
+
+      if (omResponse.hasLeaderOMNodeId() && omFailoverProxyProvider != null) {
+        String leaderOmId = omResponse.getLeaderOMNodeId();
+
+        // Fail over to the OM node named in OMResponse#leaderOMNodeId if the
+        // current proxy is not already pointing to that node.
+        omFailoverProxyProvider.performFailoverIfRequired(leaderOmId);
+      }
+
+      return omResponse;
     } catch (ServiceException e) {
       throw ProtobufHelper.getRemoteException(e);
     }
diff --git a/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto b/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto
index 9a210e6..28a802c 100644
--- a/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto
+++ b/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto
@@ -140,6 +140,8 @@ message OMResponse {
 
   required Status status = 5;
 
+  optional string leaderOMNodeId = 6;
+
   optional CreateVolumeResponse              createVolumeResponse          = 11;
   optional SetVolumePropertyResponse         setVolumePropertyResponse     = 12;
   optional CheckVolumeAccessResponse         checkVolumeAccessResponse     = 13;
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneHAClusterImpl.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneHAClusterImpl.java
index a1ef1f6..f84f95e 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneHAClusterImpl.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneHAClusterImpl.java
@@ -32,7 +32,9 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.net.BindException;
 import java.util.ArrayList;
+import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Random;
 import java.util.concurrent.TimeUnit;
 
@@ -48,6 +50,7 @@ public final class MiniOzoneHAClusterImpl extends MiniOzoneClusterImpl {
   private static final Logger LOG =
       LoggerFactory.getLogger(MiniOzoneHAClusterImpl.class);
 
+  private Map<String, OzoneManager> ozoneManagerMap;
   private List<OzoneManager> ozoneManagers;
 
   private static final Random RANDOM = new Random();
@@ -63,11 +66,12 @@ public final class MiniOzoneHAClusterImpl extends MiniOzoneClusterImpl {
 
   private MiniOzoneHAClusterImpl(
       OzoneConfiguration conf,
-      List<OzoneManager> omList,
+      Map<String, OzoneManager> omMap,
       StorageContainerManager scm,
       List<HddsDatanodeService> hddsDatanodes) {
     super(conf, scm, hddsDatanodes);
-    this.ozoneManagers = omList;
+    this.ozoneManagerMap = omMap;
+    this.ozoneManagers = new ArrayList<>(omMap.values());
   }
 
   /**
@@ -107,6 +111,10 @@ public final class MiniOzoneHAClusterImpl extends MiniOzoneClusterImpl {
     ozoneManagers.get(index).stop();
   }
 
+  public void stopOzoneManager(String omNodeId) {
+    ozoneManagerMap.get(omNodeId).stop();
+  }
+
   /**
    * Builder for configuring the MiniOzoneCluster to run.
    */
@@ -128,17 +136,17 @@ public final class MiniOzoneHAClusterImpl extends MiniOzoneClusterImpl {
       DefaultMetricsSystem.setMiniClusterMode(true);
       initializeConfiguration();
       StorageContainerManager scm;
-      List<OzoneManager> omList;
+      Map<String, OzoneManager> omMap;
       try {
         scm = createSCM();
         scm.start();
-        omList = createOMService();
+        omMap = createOMService();
       } catch (AuthenticationException ex) {
         throw new IOException("Unable to build MiniOzoneCluster. ", ex);
       }
 
       final List<HddsDatanodeService> hddsDatanodes = createHddsDatanodes(scm);
-      MiniOzoneHAClusterImpl cluster = new MiniOzoneHAClusterImpl(conf, omList,
+      MiniOzoneHAClusterImpl cluster = new MiniOzoneHAClusterImpl(conf, omMap,
           scm, hddsDatanodes);
       if (startDataNodes) {
         cluster.startHddsDatanodes();
@@ -171,10 +179,10 @@ public final class MiniOzoneHAClusterImpl extends MiniOzoneClusterImpl {
      * @throws IOException
      * @throws AuthenticationException
      */
-    private List<OzoneManager> createOMService() throws IOException,
+    private Map<String, OzoneManager> createOMService() throws IOException,
         AuthenticationException {
 
-      List<OzoneManager> omList = new ArrayList<>(numOfOMs);
+      Map<String, OzoneManager> omMap = new LinkedHashMap<>(); // insertion order
 
       int retryCount = 0;
       int basePort = 10000;
@@ -186,10 +194,11 @@ public final class MiniOzoneHAClusterImpl extends MiniOzoneClusterImpl {
 
           for (int i = 1; i<= numOfOMs; i++) {
             // Set nodeId
-            conf.set(OMConfigKeys.OZONE_OM_NODE_ID_KEY, nodeIdBaseStr + i);
+            String nodeId = nodeIdBaseStr + i;
+            conf.set(OMConfigKeys.OZONE_OM_NODE_ID_KEY, nodeId);
 
             // Set metadata/DB dir base path
-            String metaDirPath = path + "/" + nodeIdBaseStr + i;
+            String metaDirPath = path + "/" + nodeId;
             conf.set(OZONE_METADATA_DIRS, metaDirPath);
             OMStorage omStore = new OMStorage(conf);
             initializeOmStorage(omStore);
@@ -201,7 +210,7 @@ public final class MiniOzoneHAClusterImpl extends MiniOzoneClusterImpl {
 
             OzoneManager om = OzoneManager.createOm(null, conf);
             om.setCertClient(certClient);
-            omList.add(om);
+            omMap.put(nodeId, om);
 
             om.start();
             LOG.info("Started OzoneManager RPC server at " +
@@ -211,23 +220,24 @@ public final class MiniOzoneHAClusterImpl extends MiniOzoneClusterImpl {
           // Set default OM address to point to the first OM. Clients would
           // try connecting to this address by default
           conf.set(OMConfigKeys.OZONE_OM_ADDRESS_KEY,
-              NetUtils.getHostPortString(omList.get(0).getOmRpcServerAddr()));
+              NetUtils.getHostPortString(omMap.get(nodeIdBaseStr + 1)
+                  .getOmRpcServerAddr()));
 
           break;
         } catch (BindException e) {
-          for (OzoneManager om : omList) {
+          for (OzoneManager om : omMap.values()) {
             om.stop();
             om.join();
             LOG.info("Stopping OzoneManager server at " +
                 om.getOmRpcServerAddr());
           }
-          omList.clear();
+          omMap.clear();
           ++retryCount;
           LOG.info("MiniOzoneHACluster port conflicts, retried " +
               retryCount + " times");
         }
       }
-      return omList;
+      return omMap;
     }
 
     /**
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java
index 32792ae..0828fe8 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java
@@ -65,8 +65,6 @@ import org.apache.hadoop.ozone.client.VolumeArgs;
 import org.apache.hadoop.ozone.client.io.OzoneInputStream;
 import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
 import org.apache.hadoop.ozone.client.rest.OzoneException;
-import org.apache.hadoop.ozone.client.rpc.ha.OMProxyInfo;
-import org.apache.hadoop.ozone.client.rpc.ha.OMProxyProvider;
 import org.apache.hadoop.ozone.common.OzoneChecksumException;
 import org.apache.hadoop.ozone.container.common.helpers.BlockData;
 import org.apache.hadoop.ozone.container.common.interfaces.Container;
@@ -76,6 +74,7 @@ import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerLocat
 import org.apache.hadoop.ozone.om.OzoneManager;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes;
+import org.apache.hadoop.ozone.om.ha.OMFailoverProxyProvider;
 import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
@@ -189,9 +188,10 @@ public abstract class TestOzoneRpcClientAbstract {
    */
   @Test
   public void testOMClientProxyProvider() {
-    OMProxyProvider omProxyProvider = store.getClientProxy()
+    OMFailoverProxyProvider omFailoverProxyProvider = store.getClientProxy()
         .getOMProxyProvider();
-    List<OMProxyInfo> omProxies = omProxyProvider.getOMProxies();
+    List<OMFailoverProxyProvider.OMProxyInfo> omProxies =
+        omFailoverProxyProvider.getOMProxies();
 
     // For a non-HA OM service, there should be only one OM proxy.
     Assert.assertEquals(1, omProxies.size());
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java
index 62cda91..da8f870 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java
@@ -18,30 +18,29 @@ package org.apache.hadoop.ozone.om;
 
 
 import org.apache.commons.lang3.RandomStringUtils;
-import org.apache.hadoop.hdfs.server.datanode.ObjectStoreHandler;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdfs.LogVerificationAppender;
+import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
 import org.apache.hadoop.ozone.MiniOzoneHAClusterImpl;
+import org.apache.hadoop.ozone.client.ObjectStore;
 import org.apache.hadoop.ozone.client.OzoneClient;
-import org.apache.hadoop.ozone.client.rpc.ha.OMProxyInfo;
-import org.apache.hadoop.ozone.client.rpc.ha.OMProxyProvider;
-import org.apache.hadoop.ozone.om.exceptions.OMException;
-import org.apache.hadoop.ozone.web.handlers.UserArgs;
-import org.apache.hadoop.ozone.web.handlers.VolumeArgs;
-import org.apache.hadoop.ozone.web.interfaces.StorageHandler;
-import org.apache.hadoop.ozone.web.response.VolumeInfo;
-import org.apache.hadoop.ozone.web.utils.OzoneUtils;
+import org.apache.hadoop.ozone.om.ha.OMFailoverProxyProvider;
 import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.ozone.client.OzoneClientFactory;
+import org.apache.hadoop.ozone.client.OzoneVolume;
+import org.apache.hadoop.ozone.client.VolumeArgs;
+import org.apache.log4j.Logger;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
-import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
 import org.junit.rules.Timeout;
 
 import java.io.IOException;
+import java.net.ConnectException;
 import java.net.InetSocketAddress;
 import java.util.List;
 import java.util.UUID;
@@ -50,6 +49,14 @@ import static org.apache.hadoop.ozone.MiniOzoneHAClusterImpl
     .NODE_FAILURE_TIMEOUT;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ACL_ENABLED;
 import static org.apache.hadoop.ozone.OzoneConfigKeys
+    .OZONE_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY;
+import static org.apache.hadoop.ozone.OzoneConfigKeys
+    .OZONE_CLIENT_FAILOVER_SLEEP_BASE_MILLIS_DEFAULT;
+import static org.apache.hadoop.ozone.OzoneConfigKeys
+    .OZONE_CLIENT_FAILOVER_SLEEP_BASE_MILLIS_KEY;
+import static org.apache.hadoop.ozone.OzoneConfigKeys
+    .OZONE_CLIENT_RETRY_MAX_ATTEMPTS_KEY;
+import static org.apache.hadoop.ozone.OzoneConfigKeys
     .OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS;
 
 /**
@@ -58,8 +65,7 @@ import static org.apache.hadoop.ozone.OzoneConfigKeys
 public class TestOzoneManagerHA {
 
   private MiniOzoneHAClusterImpl cluster = null;
-  private StorageHandler storageHandler;
-  private UserArgs userArgs;
+  private ObjectStore objectStore;
   private OzoneConfiguration conf;
   private String clusterId;
   private String scmId;
@@ -69,7 +75,7 @@ public class TestOzoneManagerHA {
   public ExpectedException exception = ExpectedException.none();
 
   @Rule
-  public Timeout timeout = new Timeout(60_000);
+  public Timeout timeout = new Timeout(120_000);
 
   /**
    * Create a MiniDFSCluster for testing.
@@ -85,6 +91,9 @@ public class TestOzoneManagerHA {
     scmId = UUID.randomUUID().toString();
     conf.setBoolean(OZONE_ACL_ENABLED, true);
     conf.setInt(OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS, 2);
+    conf.setInt(OZONE_CLIENT_RETRY_MAX_ATTEMPTS_KEY, 3);
+    conf.setInt(OZONE_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY, 3);
+    conf.setInt(OZONE_CLIENT_FAILOVER_SLEEP_BASE_MILLIS_KEY, 50);
 
     cluster = (MiniOzoneHAClusterImpl) MiniOzoneCluster.newHABuilder(conf)
         .setClusterId(clusterId)
@@ -93,9 +102,7 @@ public class TestOzoneManagerHA {
         .setNumOfOzoneManagers(numOfOMs)
         .build();
     cluster.waitForClusterToBeReady();
-    storageHandler = new ObjectStoreHandler(conf).getStorageHandler();
-    userArgs = new UserArgs(null, OzoneUtils.getRequestID(),
-        null, null, null, null);
+    objectStore = OzoneClientFactory.getRpcClient(conf).getObjectStore();
   }
 
   /**
@@ -115,7 +122,7 @@ public class TestOzoneManagerHA {
    */
   @Test
   public void testAllOMNodesRunning() throws Exception {
-    testCreateVolume(true);
+    createVolumeTest(true);
   }
 
   /**
@@ -126,52 +133,56 @@ public class TestOzoneManagerHA {
     cluster.stopOzoneManager(1);
     Thread.sleep(NODE_FAILURE_TIMEOUT * 2);
 
-    testCreateVolume(true);
+    createVolumeTest(true);
   }
 
   /**
    * Test client request fails when 2 OMs are down.
    */
   @Test
-  @Ignore("TODO:HDDS-1158")
   public void testTwoOMNodesDown() throws Exception {
     cluster.stopOzoneManager(1);
     cluster.stopOzoneManager(2);
     Thread.sleep(NODE_FAILURE_TIMEOUT * 2);
 
-    testCreateVolume(false);
+    createVolumeTest(false);
   }
 
   /**
    * Create a volume and test its attribute.
    */
-  private void testCreateVolume(boolean checkSuccess) throws Exception {
+  private void createVolumeTest(boolean checkSuccess) throws Exception {
     String userName = "user" + RandomStringUtils.randomNumeric(5);
     String adminName = "admin" + RandomStringUtils.randomNumeric(5);
     String volumeName = "volume" + RandomStringUtils.randomNumeric(5);
 
-    VolumeArgs createVolumeArgs = new VolumeArgs(volumeName, userArgs);
-    createVolumeArgs.setUserName(userName);
-    createVolumeArgs.setAdminName(adminName);
+    VolumeArgs createVolumeArgs = VolumeArgs.newBuilder()
+        .setOwner(userName)
+        .setAdmin(adminName)
+        .build();
 
     try {
-      storageHandler.createVolume(createVolumeArgs);
+      objectStore.createVolume(volumeName, createVolumeArgs);
 
-      VolumeArgs getVolumeArgs = new VolumeArgs(volumeName, userArgs);
-      VolumeInfo retVolumeinfo = storageHandler.getVolumeInfo(getVolumeArgs);
+      OzoneVolume retVolumeinfo = objectStore.getVolume(volumeName);
 
       if (checkSuccess) {
-        Assert.assertTrue(retVolumeinfo.getVolumeName().equals(volumeName));
-        Assert.assertTrue(retVolumeinfo.getOwner().getName().equals(userName));
+        Assert.assertTrue(retVolumeinfo.getName().equals(volumeName));
+        Assert.assertTrue(retVolumeinfo.getOwner().equals(userName));
+        Assert.assertTrue(retVolumeinfo.getAdmin().equals(adminName));
       } else {
         // Verify that the request failed
-        Assert.assertTrue(retVolumeinfo.getVolumeName().isEmpty());
         Assert.fail("There is no quorum. Request should have failed");
       }
-    } catch (OMException e) {
+    } catch (ConnectException | RemoteException e) {
       if (!checkSuccess) {
-        GenericTestUtils.assertExceptionContains(
-            "RaftRetryFailureException", e);
+        // If the last OM to be tried by the RetryProxy is down, we would get
+        // ConnectException. Otherwise, we would get a RemoteException from the
+        // last running OM as it would fail to get a quorum.
+        if (e instanceof RemoteException) {
+          GenericTestUtils.assertExceptionContains(
+              "RaftRetryFailureException", e);
+        }
       } else {
         throw e;
       }
@@ -179,14 +190,16 @@ public class TestOzoneManagerHA {
   }
 
   /**
-   * Test that OMProxyProvider creates an OM proxy for each OM in the cluster.
+   * Test that OMFailoverProxyProvider creates an OM proxy for each OM in the
+   * cluster.
    */
   @Test
-  public void testOMClientProxyProvide() throws Exception {
+  public void testOMProxyProviderInitialization() throws Exception {
     OzoneClient rpcClient = cluster.getRpcClient();
-    OMProxyProvider omProxyProvider =
+    OMFailoverProxyProvider omFailoverProxyProvider =
         rpcClient.getObjectStore().getClientProxy().getOMProxyProvider();
-    List<OMProxyInfo> omProxies = omProxyProvider.getOMProxies();
+    List<OMFailoverProxyProvider.OMProxyInfo> omProxies =
+        omFailoverProxyProvider.getOMProxies();
 
     Assert.assertEquals(numOfOMs, omProxies.size());
 
@@ -194,7 +207,7 @@ public class TestOzoneManagerHA {
       InetSocketAddress omRpcServerAddr =
           cluster.getOzoneManager(i).getOmRpcServerAddr();
       boolean omClientProxyExists = false;
-      for (OMProxyInfo omProxyInfo : omProxies) {
+      for (OMFailoverProxyProvider.OMProxyInfo omProxyInfo : omProxies) {
         if (omProxyInfo.getAddress().equals(omRpcServerAddr)) {
           omClientProxyExists = true;
           break;
@@ -205,4 +218,99 @@ public class TestOzoneManagerHA {
           omClientProxyExists);
     }
   }
+
+  /**
+   * Test OMFailoverProxyProvider failover when the current OM is unreachable.
+   */
+  @Test
+  public void testOMProxyProviderFailoverOnConnectionFailure()
+      throws Exception {
+    OMFailoverProxyProvider omFailoverProxyProvider =
+        objectStore.getClientProxy().getOMProxyProvider();
+    String firstProxyNodeId = omFailoverProxyProvider.getCurrentProxyOMNodeId();
+
+    createVolumeTest(true);
+
+    // On stopping the current OM Proxy, the next connection attempt should
+    // fail over to another OM proxy.
+    cluster.stopOzoneManager(firstProxyNodeId);
+    Thread.sleep(OZONE_CLIENT_FAILOVER_SLEEP_BASE_MILLIS_DEFAULT * 4);
+
+    // The next request to the proxy provider should result in a failover.
+    createVolumeTest(true);
+    Thread.sleep(OZONE_CLIENT_FAILOVER_SLEEP_BASE_MILLIS_DEFAULT);
+
+    // Get the new OM Proxy NodeId
+    String newProxyNodeId = omFailoverProxyProvider.getCurrentProxyOMNodeId();
+
+    // Verify that a failover occurred: the new proxy nodeId should be
+    // different from the old proxy nodeId.
+    Assert.assertNotEquals("Failover did not occur as expected",
+        firstProxyNodeId, newProxyNodeId);
+  }
+
+  /**
+   * Test OMFailoverProxyProvider failover when current OM proxy is not
+   * the current OM Leader.
+   */
+  @Test
+  public void testOMProxyProviderFailoverToCurrentLeader() throws Exception {
+    OMFailoverProxyProvider omFailoverProxyProvider =
+        objectStore.getClientProxy().getOMProxyProvider();
+
+    // Run a couple of createVolume tests to discover the current leader OM.
+    createVolumeTest(true);
+    createVolumeTest(true);
+
+    // The OMFailoverProxyProvider will point to the current leader OM node.
+    String leaderOMNodeId = omFailoverProxyProvider.getCurrentProxyOMNodeId();
+
+    // Perform a manual failover of the proxy provider to move the
+    // currentProxyIndex to a node other than the leader OM.
+    omFailoverProxyProvider.performFailover(
+        omFailoverProxyProvider.getProxy().proxy);
+
+    String newProxyNodeId = omFailoverProxyProvider.getCurrentProxyOMNodeId();
+    Assert.assertNotEquals(leaderOMNodeId, newProxyNodeId);
+
+    // Once another request is sent to this new proxy node, the leader
+    // information must be returned via the response and a failover must
+    // happen to the leader proxy node.
+    createVolumeTest(true);
+    Thread.sleep(2000);
+
+    String newLeaderOMNodeId =
+        omFailoverProxyProvider.getCurrentProxyOMNodeId();
+
+    // The old and new Leader OM NodeId must match since there was no new
+    // election in the Ratis ring.
+    Assert.assertEquals(leaderOMNodeId, newLeaderOMNodeId);
+  }
+
+  @Test
+  public void testOMRetryProxy() throws Exception {
+    // Stop all the OMs. After making 3 (the configured maxRetries) attempts
+    // at connection, the RpcClient should give up.
+    for (int i = 0; i < numOfOMs; i++) {
+      cluster.stopOzoneManager(i);
+    }
+    final LogVerificationAppender appender = new LogVerificationAppender();
+    final org.apache.log4j.Logger logger = Logger.getRootLogger();
+    logger.addAppender(appender);
+    try {
+      createVolumeTest(true);
+      Assert.fail("TestOMRetryProxy should fail when there are no OMs running");
+    } catch (ConnectException e) {
+      // Each retry attempt tries up to 10 times to connect, so there should be
+      // 3*10 "Retrying connect to server" messages.
+      Assert.assertEquals(30,
+          appender.countLinesWithMessage("Retrying connect to server:"));
+      Assert.assertEquals(1,
+          appender.countLinesWithMessage("Failed to connect to OM. Attempted " +
+              "3 retries and 3 failovers"));
+    } finally {
+      // Detach the appender so it does not capture logs from later tests.
+      logger.removeAppender(appender);
+    }
+  }
 }
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
index 6f7d7ca..d2ade56 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
@@ -58,6 +58,7 @@ import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.ozone.OzoneIllegalArgumentException;
 import org.apache.hadoop.ozone.OzoneSecurityUtil;
+import org.apache.hadoop.ozone.om.ha.OMFailoverProxyProvider;
 import org.apache.hadoop.ozone.om.helpers.S3SecretValue;
 import org.apache.hadoop.ozone.security.OzoneSecurityException;
 import org.apache.hadoop.ozone.security.OzoneTokenIdentifier;
@@ -2461,4 +2462,9 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
   public List<OMNodeDetails> getPeerNodes() {
     return peerNodes;
   }
+
+  @Override
+  public OMFailoverProxyProvider getOMFailoverProxyProvider() {
+    return null;
+  }
 }
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OMRatisHelper.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OMRatisHelper.java
index 9115421..8e4582d 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OMRatisHelper.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OMRatisHelper.java
@@ -32,6 +32,7 @@ import org.apache.ratis.client.RaftClient;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.grpc.GrpcConfigKeys;
 import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftClientReply;
 import org.apache.ratis.protocol.RaftGroup;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.retry.RetryPolicy;
@@ -100,10 +101,12 @@ public final class OMRatisHelper {
     return Message.valueOf(ByteString.copyFrom(requestBytes));
   }
 
-  static OMResponse convertByteStringToOMResponse(ByteString byteString)
+  static OMResponse getOMResponseFromRaftClientReply(RaftClientReply reply)
       throws InvalidProtocolBufferException {
-    byte[] bytes = byteString.toByteArray();
-    return OMResponse.parseFrom(bytes);
+    byte[] bytes = reply.getMessage().getContent().toByteArray();
+    return OMResponse.newBuilder(OMResponse.parseFrom(bytes))
+        .setLeaderOMNodeId(reply.getReplierId())
+        .build();
   }
 
   static OMResponse getErrorResponse(Type cmdType, Exception e) {
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisClient.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisClient.java
index 9e1cafc..1b4c634 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisClient.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisClient.java
@@ -23,7 +23,10 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+
 import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.ServiceException;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.ozone.OmUtils;
 import org.apache.hadoop.ozone.om.OMConfigKeys;
@@ -53,24 +56,24 @@ public final class OzoneManagerRatisClient implements Closeable {
       OzoneManagerRatisClient.class);
 
   private final RaftGroup raftGroup;
-  private final String omID;
+  private final String omNodeID;
   private final RpcType rpcType;
   private RaftClient raftClient;
   private final RetryPolicy retryPolicy;
   private final Configuration conf;
 
-  private OzoneManagerRatisClient(String omId, RaftGroup raftGroup,
+  private OzoneManagerRatisClient(String omNodeId, RaftGroup raftGroup,
       RpcType rpcType, RetryPolicy retryPolicy,
       Configuration config) {
     this.raftGroup = raftGroup;
-    this.omID = omId;
+    this.omNodeID = omNodeId;
     this.rpcType = rpcType;
     this.retryPolicy = retryPolicy;
     this.conf = config;
   }
 
   public static OzoneManagerRatisClient newOzoneManagerRatisClient(
-      String omId, RaftGroup raftGroup, Configuration conf) {
+      String omNodeId, RaftGroup raftGroup, Configuration conf) {
     final String rpcType = conf.get(
         OMConfigKeys.OZONE_OM_RATIS_RPC_TYPE_KEY,
         OMConfigKeys.OZONE_OM_RATIS_RPC_TYPE_DEFAULT);
@@ -87,19 +90,19 @@ public final class OzoneManagerRatisClient implements Closeable {
     final RetryPolicy retryPolicy = RetryPolicies
         .retryUpToMaximumCountWithFixedSleep(maxRetryCount, sleepDuration);
 
-    return new OzoneManagerRatisClient(omId, raftGroup,
+    return new OzoneManagerRatisClient(omNodeId, raftGroup,
         SupportedRpcType.valueOfIgnoreCase(rpcType), retryPolicy, conf);
   }
 
   public void connect() {
     LOG.debug("Connecting to OM Ratis Server GroupId:{} OM:{}",
-        raftGroup.getGroupId().getUuid().toString(), omID);
+        raftGroup.getGroupId().getUuid().toString(), omNodeID);
 
     // TODO : XceiverClient ratis should pass the config value of
     // maxOutstandingRequests so as to set the upper bound on max no of async
     // requests to be handled by raft client
 
-    raftClient = OMRatisHelper.newRaftClient(rpcType, omID, raftGroup,
+    raftClient = OMRatisHelper.newRaftClient(rpcType, omNodeID, raftGroup,
         retryPolicy, conf);
   }
 
@@ -119,13 +122,12 @@ public final class OzoneManagerRatisClient implements Closeable {
    * @param request Request
    * @return Response to the command
    */
-  public OMResponse sendCommand(OMRequest request) {
+  public OMResponse sendCommand(OMRequest request) throws ServiceException {
     try {
       CompletableFuture<OMResponse> reply = sendCommandAsync(request);
       return reply.get();
     } catch (ExecutionException | InterruptedException e) {
-      LOG.error("Failed to execute command: " + request, e);
-      return OMRatisHelper.getErrorResponse(request.getCmdType(), e);
+      throw new ServiceException(e);
     }
   }
 
@@ -152,9 +154,10 @@ public final class OzoneManagerRatisClient implements Closeable {
                 if (raftRetryFailureException != null) {
                   throw new CompletionException(raftRetryFailureException);
                 }
+
                 OMResponse response = OMRatisHelper
-                    .convertByteStringToOMResponse(reply.getMessage()
-                        .getContent());
+                    .getOMResponseFromRaftClientReply(reply);
+
                 return response;
               } catch (InvalidProtocolBufferException e) {
                 throw new CompletionException(e);
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java
index 5684fa5..2f1d64d8 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java
@@ -80,7 +80,8 @@ public class OzoneManagerProtocolServerSideTranslatorPB implements
   /**
    * Submits request to OM's Ratis server.
    */
-  private OMResponse submitRequestToRatis(OMRequest request) {
+  private OMResponse submitRequestToRatis(OMRequest request)
+      throws ServiceException {
     return omRatisClient.sendCommand(request);
   }
 
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerRatisServer.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerRatisServer.java
index 83d2245..8a8be35 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerRatisServer.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerRatisServer.java
@@ -34,8 +34,6 @@ import org.apache.hadoop.ozone.om.OMConfigKeys;
 import org.apache.hadoop.ozone.om.OMNodeDetails;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
     .OMRequest;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
-    .OMResponse;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.ratis.protocol.RaftGroupId;
@@ -110,27 +108,6 @@ public class TestOzoneManagerRatisServer {
   }
 
   /**
-   * Submit any request to OM Ratis server and check that the dummy response
-   * message is received.
-   */
-  @Test
-  public void testSubmitRatisRequest() throws Exception {
-    // Wait for leader election
-    Thread.sleep(LEADER_ELECTION_TIMEOUT * 2);
-    OMRequest request = OMRequest.newBuilder()
-        .setCmdType(OzoneManagerProtocolProtos.Type.CreateVolume)
-        .setClientId(clientId)
-        .build();
-
-    OMResponse response = omRatisClient.sendCommand(request);
-
-    Assert.assertEquals(OzoneManagerProtocolProtos.Type.CreateVolume,
-        response.getCmdType());
-    Assert.assertEquals(false, response.getSuccess());
-    Assert.assertEquals(false, response.hasCreateVolumeResponse());
-  }
-
-  /**
    * Test that all of {@link OzoneManagerProtocolProtos.Type} enum values are
    * categorized in {@link OmUtils#isReadOnly(OMRequest)}.
    */


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org