You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ha...@apache.org on 2019/03/07 03:45:02 UTC

[hadoop] branch trunk updated: HDDS-1175. Serve read requests directly from RocksDB. (#557)

This is an automated email from the ASF dual-hosted git repository.

hanishakoneru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new bb12e81  HDDS-1175. Serve read requests directly from RocksDB. (#557)
bb12e81 is described below

commit bb12e81ec84e2a2a1154b756d05336018c70f946
Author: Hanisha Koneru <ko...@gmail.com>
AuthorDate: Wed Mar 6 19:44:55 2019 -0800

    HDDS-1175. Serve read requests directly from RocksDB. (#557)
    
    HDDS-1175. Serve read requests directly from RocksDB.
---
 .../common/src/main/resources/ozone-default.xml    |  10 ++
 .../org/apache/hadoop/ozone/om/OMConfigKeys.java   |   7 +
 .../ozone/om/exceptions/NotLeaderException.java    |  51 +++++++
 .../ozone/om/ha/OMFailoverProxyProvider.java       |  10 +-
 .../hadoop/ozone/om/helpers}/OMRatisHelper.java    |  36 ++---
 ...OzoneManagerProtocolClientSideTranslatorPB.java |  37 +++--
 .../apache/hadoop/ozone/om/TestOzoneManagerHA.java |  42 +++++-
 .../org/apache/hadoop/ozone/om/OzoneManager.java   |   4 +-
 .../ozone/om/ratis/OzoneManagerRatisClient.java    |   1 +
 .../ozone/om/ratis/OzoneManagerRatisServer.java    | 157 ++++++++++++++++++++-
 .../ozone/om/ratis/OzoneManagerStateMachine.java   |  19 ++-
 ...OzoneManagerProtocolServerSideTranslatorPB.java |  45 +++++-
 12 files changed, 374 insertions(+), 45 deletions(-)

diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index a4f49e7..a95d9d1 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -1646,6 +1646,16 @@
     </description>
   </property>
 
+  <property>
+    <name>ozone.om.ratis.server.role.check.interval</name>
+    <value>15s</value>
+    <tag>OZONE, OM, RATIS, MANAGEMENT</tag>
+    <description>The interval between OM leader performing a role
+      check on its ratis server. Ratis server informs OM if it
+      loses the leader role. The scheduled check is a secondary
+      check to ensure that the leader role is updated
+      periodically.</description>
+  </property>
 
   <property>
     <name>ozone.acl.authorizer.class</name>
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
index ab251cb..7b13471 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
@@ -183,6 +183,13 @@ public final class OMConfigKeys {
       OZONE_OM_RATIS_SERVER_FAILURE_TIMEOUT_DURATION_DEFAULT
       = TimeDuration.valueOf(120, TimeUnit.SECONDS);
 
+  // OM Leader server role check interval
+  public static final String OZONE_OM_RATIS_SERVER_ROLE_CHECK_INTERVAL_KEY
+      = "ozone.om.ratis.server.role.check.interval";
+  public static final TimeDuration
+      OZONE_OM_RATIS_SERVER_ROLE_CHECK_INTERVAL_DEFAULT
+      = TimeDuration.valueOf(15, TimeUnit.SECONDS);
+
   public static final String OZONE_OM_KERBEROS_KEYTAB_FILE_KEY = "ozone.om."
       + "kerberos.keytab.file";
   public static final String OZONE_OM_KERBEROS_PRINCIPAL_KEY = "ozone.om"
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/exceptions/NotLeaderException.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/exceptions/NotLeaderException.java
new file mode 100644
index 0000000..974ab0e
--- /dev/null
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/exceptions/NotLeaderException.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.exceptions;
+
+import java.io.IOException;
+
+/**
+ * Exception raised through
+ * {@link org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolPB} when
+ * a read request reaches an OM node that is not the leader.
+ */
+public class NotLeaderException extends IOException {
+
+  private final String currentPeerId; // peer id of the OM that is not leader
+  private final String leaderPeerId;  // suggested leader; null when unknown
+
+  public NotLeaderException(String currentPeerIdStr) {
+    super("OM " + currentPeerIdStr
+        + " is not the leader. Could not determine the leader node.");
+    currentPeerId = currentPeerIdStr;
+    leaderPeerId = null;
+  }
+
+  public NotLeaderException(String currentPeerIdStr,
+      String suggestedLeaderPeerIdStr) {
+    super("OM " + currentPeerIdStr
+        + " is not the leader. Suggested leader is " + suggestedLeaderPeerIdStr);
+    currentPeerId = currentPeerIdStr;
+    leaderPeerId = suggestedLeaderPeerIdStr;
+  }
+
+  public String getSuggestedLeaderNodeId() {
+    return leaderPeerId;
+  }
+}
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/OMFailoverProxyProvider.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/OMFailoverProxyProvider.java
index 5c1b39f..b4a4857 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/OMFailoverProxyProvider.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/OMFailoverProxyProvider.java
@@ -226,8 +226,14 @@ public class OMFailoverProxyProvider implements
    * not match the current leaderOMNodeId cached by the proxy provider.
    */
   public void performFailoverIfRequired(String newLeaderOMNodeId) {
-    if (updateLeaderOMNodeId(newLeaderOMNodeId)) {
-      LOG.debug("Failing over OM proxy to nodeId: {}", newLeaderOMNodeId);
+    if (newLeaderOMNodeId == null) {
+      LOG.debug("No suggested leader nodeId. Performing failover to next peer" +
+          " node");
+      performFailover(null);
+    } else {
+      if (updateLeaderOMNodeId(newLeaderOMNodeId)) {
+        LOG.debug("Failing over OM proxy to nodeId: {}", newLeaderOMNodeId);
+      }
     }
   }
 
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OMRatisHelper.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OMRatisHelper.java
similarity index 77%
rename from hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OMRatisHelper.java
rename to hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OMRatisHelper.java
index 8e4582d..bc64d6c 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OMRatisHelper.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OMRatisHelper.java
@@ -15,7 +15,7 @@
  * the License.
  */
 
-package org.apache.hadoop.ozone.om.ratis;
+package org.apache.hadoop.ozone.om.helpers;
 
 import com.google.protobuf.InvalidProtocolBufferException;
 import org.apache.hadoop.conf.Configuration;
@@ -25,8 +25,6 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
     .OMRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
     .OMResponse;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Status;
-import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
 import org.apache.ratis.RaftConfigKeys;
 import org.apache.ratis.client.RaftClient;
 import org.apache.ratis.conf.RaftProperties;
@@ -54,14 +52,15 @@ public final class OMRatisHelper {
 
   /**
    * Creates a new RaftClient object.
-   * @param rpcType Replication Type
-   * @param omId OM id of the client
-   * @param group RaftGroup
+   *
+   * @param rpcType     Replication Type
+   * @param omId        OM id of the client
+   * @param group       RaftGroup
    * @param retryPolicy Retry policy
    * @return RaftClient object
    */
-  static RaftClient newRaftClient(RpcType rpcType, String omId, RaftGroup
-      group, RetryPolicy retryPolicy,   Configuration conf) {
+  public static RaftClient newRaftClient(RpcType rpcType, String omId, RaftGroup
+      group, RetryPolicy retryPolicy, Configuration conf) {
     LOG.trace("newRaftClient: {}, leader={}, group={}", rpcType, omId, group);
     final RaftProperties properties = new RaftProperties();
     RaftConfigKeys.Rpc.setType(properties, rpcType);
@@ -85,36 +84,27 @@ public final class OMRatisHelper {
     return RaftPeerId.valueOf(omId);
   }
 
-  static ByteString convertRequestToByteString(OMRequest request) {
+  public static ByteString convertRequestToByteString(OMRequest request) {
     byte[] requestBytes = request.toByteArray();
     return ByteString.copyFrom(requestBytes);
   }
 
-  static OMRequest convertByteStringToOMRequest(ByteString byteString)
+  public static OMRequest convertByteStringToOMRequest(ByteString byteString)
       throws InvalidProtocolBufferException {
     byte[] bytes = byteString.toByteArray();
     return OMRequest.parseFrom(bytes);
   }
 
-  static Message convertResponseToMessage(OMResponse response) {
+  public static Message convertResponseToMessage(OMResponse response) {
     byte[] requestBytes = response.toByteArray();
     return Message.valueOf(ByteString.copyFrom(requestBytes));
   }
 
-  static OMResponse getOMResponseFromRaftClientReply(RaftClientReply reply)
-      throws InvalidProtocolBufferException {
+  public static OMResponse getOMResponseFromRaftClientReply(
+      RaftClientReply reply) throws InvalidProtocolBufferException {
     byte[] bytes = reply.getMessage().getContent().toByteArray();
     return OMResponse.newBuilder(OMResponse.parseFrom(bytes))
         .setLeaderOMNodeId(reply.getReplierId())
         .build();
   }
-
-  static OMResponse getErrorResponse(Type cmdType, Exception e) {
-    return OMResponse.newBuilder()
-        .setCmdType(cmdType)
-        .setSuccess(false)
-        .setMessage(e.getMessage())
-        .setStatus(Status.INTERNAL_ERROR)
-        .build();
-  }
-}
+}
\ No newline at end of file
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
index 63a656c..c06efdc 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.io.retry.RetryProxy;
 import org.apache.hadoop.ipc.ProtobufHelper;
 import org.apache.hadoop.ipc.ProtocolTranslator;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.om.exceptions.NotLeaderException;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.ha.OMFailoverProxyProvider;
 import org.apache.hadoop.ozone.om.helpers.KeyValueUtil;
@@ -195,29 +196,49 @@ public final class OzoneManagerProtocolClientSideTranslatorPB
   private OzoneManagerProtocolPB createRetryProxy(
       OMFailoverProxyProvider failoverProxyProvider,
       int maxRetries, int maxFailovers, int delayMillis, int maxDelayBase) {
+
     RetryPolicy retryPolicyOnNetworkException = RetryPolicies
         .failoverOnNetworkException(RetryPolicies.TRY_ONCE_THEN_FAIL,
             maxFailovers, maxRetries, delayMillis, maxDelayBase);
+
     RetryPolicy retryPolicy = new RetryPolicy() {
       @Override
       public RetryAction shouldRetry(Exception exception, int retries,
           int failovers, boolean isIdempotentOrAtMostOnce)
           throws Exception {
-        if (exception instanceof EOFException ||
-            exception instanceof  ServiceException) {
-          if (retries < maxRetries && failovers < maxFailovers) {
-            return RetryAction.FAILOVER_AND_RETRY;
+
+        if (exception instanceof ServiceException) {
+          Throwable cause = exception.getCause();
+          if (cause instanceof NotLeaderException) {
+            NotLeaderException notLeaderException = (NotLeaderException) cause;
+            omFailoverProxyProvider.performFailoverIfRequired(
+                notLeaderException.getSuggestedLeaderNodeId());
+            return getRetryAction(RetryAction.RETRY, retries, failovers);
           } else {
-            FAILOVER_PROXY_PROVIDER_LOG.error("Failed to connect to OM. " +
-                "Attempted {} retries and {} failovers", retries, failovers);
-            return RetryAction.FAIL;
+            return getRetryAction(RetryAction.FAILOVER_AND_RETRY, retries,
+                failovers);
           }
+        } else if (exception instanceof EOFException) {
+          return getRetryAction(RetryAction.FAILOVER_AND_RETRY, retries,
+              failovers);
         } else {
           return retryPolicyOnNetworkException.shouldRetry(
-                  exception, retries, failovers, isIdempotentOrAtMostOnce);
+              exception, retries, failovers, isIdempotentOrAtMostOnce);
+        }
+      }
+
+      private RetryAction getRetryAction(RetryAction fallbackAction,
+          int retries, int failovers) {
+        // Give up once either the retry or the failover budget is used up.
+        if (retries >= maxRetries || failovers >= maxFailovers) {
+          FAILOVER_PROXY_PROVIDER_LOG.error("Failed to connect to OM. " +
+              "Attempted {} retries and {} failovers", retries, failovers);
+          return RetryAction.FAIL;
+        }
+        return fallbackAction;
+      }
     };
+
     OzoneManagerProtocolPB proxy = (OzoneManagerProtocolPB) RetryProxy.create(
         OzoneManagerProtocolPB.class, failoverProxyProvider, retryPolicy);
     return proxy;
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java
index da8f870..86a83b7 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java
@@ -53,8 +53,6 @@ import static org.apache.hadoop.ozone.OzoneConfigKeys
 import static org.apache.hadoop.ozone.OzoneConfigKeys
     .OZONE_CLIENT_FAILOVER_SLEEP_BASE_MILLIS_DEFAULT;
 import static org.apache.hadoop.ozone.OzoneConfigKeys
-    .OZONE_CLIENT_FAILOVER_SLEEP_BASE_MILLIS_KEY;
-import static org.apache.hadoop.ozone.OzoneConfigKeys
     .OZONE_CLIENT_RETRY_MAX_ATTEMPTS_KEY;
 import static org.apache.hadoop.ozone.OzoneConfigKeys
     .OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS;
@@ -75,7 +73,7 @@ public class TestOzoneManagerHA {
   public ExpectedException exception = ExpectedException.none();
 
   @Rule
-  public Timeout timeout = new Timeout(120_000);
+  public Timeout timeout = new Timeout(300_000);
 
   /**
    * Create a MiniDFSCluster for testing.
@@ -93,7 +91,6 @@ public class TestOzoneManagerHA {
     conf.setInt(OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS, 2);
     conf.setInt(OZONE_CLIENT_RETRY_MAX_ATTEMPTS_KEY, 3);
     conf.setInt(OZONE_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY, 3);
-    conf.setInt(OZONE_CLIENT_FAILOVER_SLEEP_BASE_MILLIS_KEY, 50);
 
     cluster = (MiniOzoneHAClusterImpl) MiniOzoneCluster.newHABuilder(conf)
         .setClusterId(clusterId)
@@ -313,4 +310,41 @@ public class TestOzoneManagerHA {
               "3 retries and 3 failovers"));
     }
   }
+
+  @Test
+  public void testReadRequest() throws Exception {
+    String volumeName = "volume" + RandomStringUtils.randomNumeric(5);
+    objectStore.createVolume(volumeName);
+
+    OMFailoverProxyProvider omFailoverProxyProvider =
+        objectStore.getClientProxy().getOMProxyProvider();
+    String currentLeaderNodeId = omFailoverProxyProvider
+        .getCurrentProxyOMNodeId();
+
+    // A read request from any proxy should failover to the current leader OM
+    for (int i = 0; i < numOfOMs; i++) {
+      // Resolve the RPC host and port of the OM at index i
+      OzoneManager ozoneManager = cluster.getOzoneManager(i);
+      String omHostName = ozoneManager.getOmRpcServerAddr().getHostName();
+      int rpcPort = ozoneManager.getOmRpcServerAddr().getPort();
+
+      // Create a client against that address; keep its store and provider
+      final ObjectStore store = OzoneClientFactory.getRpcClient(
+          omHostName, rpcPort, conf).getObjectStore();
+      final OMFailoverProxyProvider proxyProvider =
+          store.getClientProxy().getOMProxyProvider();
+
+      // Fail over the shared provider to OM i. NOTE(review): not proxyProvider?
+      omFailoverProxyProvider.performFailoverIfRequired(
+          ozoneManager.getOMNodId());
+
+      // A read request should result in the proxyProvider failing over to
+      // leader node.
+      OzoneVolume volume = store.getVolume(volumeName);
+      Assert.assertEquals(volumeName, volume.getName());
+
+      Assert.assertEquals(currentLeaderNodeId,
+          proxyProvider.getCurrentProxyOMNodeId());
+    }
+  }
 }
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
index fc4ad01..326b12c 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
@@ -1236,8 +1236,8 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
         ProtobufRpcEngine.class);
 
     BlockingService omService = newReflectiveBlockingService(
-        new OzoneManagerProtocolServerSideTranslatorPB(this, omRatisClient,
-            isRatisEnabled));
+        new OzoneManagerProtocolServerSideTranslatorPB(this, omRatisServer,
+            omRatisClient, isRatisEnabled));
     return startRpcServer(configuration, omNodeRpcAddr,
         OzoneManagerProtocolPB.class, omService,
         handlerCount);
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisClient.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisClient.java
index 1b4c634..c9c48a4 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisClient.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisClient.java
@@ -30,6 +30,7 @@ import com.google.protobuf.ServiceException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.ozone.OmUtils;
 import org.apache.hadoop.ozone.om.OMConfigKeys;
+import org.apache.hadoop.ozone.om.helpers.OMRatisHelper;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
     .OMRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java
index e03293a..a3cde3e 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java
@@ -27,8 +27,13 @@ import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Optional;
 import java.util.UUID;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.StorageUnit;
@@ -41,6 +46,11 @@ import org.apache.ratis.client.RaftClientConfigKeys;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.grpc.GrpcConfigKeys;
 import org.apache.ratis.netty.NettyConfigKeys;
+import org.apache.ratis.proto.RaftProtos.RoleInfoProto;
+import org.apache.ratis.proto.RaftProtos.RaftPeerRole;
+import org.apache.ratis.protocol.ClientId;
+import org.apache.ratis.protocol.GroupInfoReply;
+import org.apache.ratis.protocol.GroupInfoRequest;
 import org.apache.ratis.protocol.RaftGroup;
 import org.apache.ratis.protocol.RaftGroupId;
 import org.apache.ratis.protocol.RaftPeer;
@@ -50,6 +60,7 @@ import org.apache.ratis.rpc.SupportedRpcType;
 import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.statemachine.impl.BaseStateMachine;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import org.apache.ratis.util.LifeCycle;
 import org.apache.ratis.util.SizeInBytes;
 import org.apache.ratis.util.TimeDuration;
@@ -69,7 +80,22 @@ public final class OzoneManagerRatisServer {
   private final RaftGroupId raftGroupId;
   private final RaftGroup raftGroup;
   private final RaftPeerId raftPeerId;
+
   private final OzoneManagerProtocol ozoneManager;
+  private final ClientId clientId = ClientId.randomId();
+
+  private final ScheduledExecutorService scheduledRoleChecker;
+  private long roleCheckInitialDelayMs = 1000; // 1 second default
+  private long roleCheckIntervalMs;
+  private ReentrantReadWriteLock roleCheckLock = new ReentrantReadWriteLock();
+  private Optional<RaftPeerRole> cachedPeerRole = Optional.empty();
+  private Optional<RaftPeerId> cachedLeaderPeerId = Optional.empty();
+
+  private static final AtomicLong CALL_ID_COUNTER = new AtomicLong();
+
+  private static long nextCallId() {
+    return CALL_ID_COUNTER.getAndIncrement() & Long.MAX_VALUE; // sign bit off
+  }
 
   /**
    * Returns an OM Ratis server.
@@ -108,6 +134,20 @@ public final class OzoneManagerRatisServer {
         .setProperties(serverProperties)
         .setStateMachine(getStateMachine(this.raftGroupId))
         .build();
+
+    // Run a scheduler to check and update the server role on the leader
+    // periodically
+    this.scheduledRoleChecker = Executors.newSingleThreadScheduledExecutor();
+    this.scheduledRoleChecker.scheduleWithFixedDelay(new Runnable() {
+      @Override
+      public void run() {
+        // Run this check only on the leader OM
+        if (cachedPeerRole.isPresent() &&
+            cachedPeerRole.get() == RaftPeerRole.LEADER) {
+          updateServerRole();
+        }
+      }
+    }, roleCheckInitialDelayMs, roleCheckIntervalMs, TimeUnit.MILLISECONDS);
   }
 
   /**
@@ -156,7 +196,11 @@ public final class OzoneManagerRatisServer {
    * Returns OzoneManager StateMachine.
    */
   private BaseStateMachine getStateMachine(RaftGroupId gid) {
-    return  new OzoneManagerStateMachine(ozoneManager);
+    return  new OzoneManagerStateMachine(this);
+  }
+
+  public OzoneManagerProtocol getOzoneManager() {
+    return ozoneManager;
   }
 
   /**
@@ -323,6 +367,19 @@ public final class OzoneManagerRatisServer {
     RaftServerConfigKeys.Rpc.setSlownessTimeout(properties,
         nodeFailureTimeout);
 
+    TimeUnit roleCheckIntervalUnit =
+        OMConfigKeys.OZONE_OM_RATIS_SERVER_ROLE_CHECK_INTERVAL_DEFAULT
+            .getUnit();
+    long roleCheckIntervalDuration = conf.getTimeDuration(
+        OMConfigKeys.OZONE_OM_RATIS_SERVER_ROLE_CHECK_INTERVAL_KEY,
+        OMConfigKeys.OZONE_OM_RATIS_SERVER_ROLE_CHECK_INTERVAL_DEFAULT
+            .getDuration(), nodeFailureTimeoutUnit);
+    this.roleCheckIntervalMs = TimeDuration.valueOf(
+        roleCheckIntervalDuration, roleCheckIntervalUnit)
+        .toLong(TimeUnit.MILLISECONDS);
+    this.roleCheckInitialDelayMs = leaderElectionMinTimeout
+        .toLong(TimeUnit.MILLISECONDS);
+
     /**
      * TODO: when ratis snapshots are implemented, set snapshot threshold and
      * queue size.
@@ -331,6 +388,104 @@ public final class OzoneManagerRatisServer {
     return properties;
   }
 
+  /**
+   * Reports whether the role cached by the last role update is LEADER.
+   * Only the cached value is consulted, under the read lock; the Ratis
+   * server itself is not queried here.
+   * @return true if the cached role is present and is LEADER.
+   */
+  private boolean checkCachedPeerRoleIsLeader() {
+    roleCheckLock.readLock().lock();
+    try {
+      // An empty Optional (role unknown) is treated as "not leader".
+      return cachedPeerRole.isPresent()
+          && cachedPeerRole.get() == RaftPeerRole.LEADER;
+    } finally {
+      roleCheckLock.readLock().unlock();
+    }
+  }
+
+  /**
+   * Check if the current OM node is the leader node. A cached LEADER role is
+   * trusted as-is; otherwise the role is refreshed from Ratis and re-checked.
+   * @return true if Leader, false otherwise.
+   */
+  public boolean isLeader() {
+    boolean cachedAsLeader = checkCachedPeerRoleIsLeader();
+    if (!cachedAsLeader) {
+      // The cache says non-leader (or unknown), which may be stale: ask the
+      // ratis server for the current role before answering.
+      updateServerRole();
+      cachedAsLeader = checkCachedPeerRoleIsLeader();
+    }
+    return cachedAsLeader;
+  }
+
+  /**
+   * Returns the leader peer id recorded by the most recent role update.
+   * @return cached leader RaftPeerId, or empty if no leader is recorded.
+   */
+  public Optional<RaftPeerId> getCachedLeaderPeerId() {
+    roleCheckLock.readLock().lock();
+    try {
+      return cachedLeaderPeerId;
+    } finally {
+      roleCheckLock.readLock().unlock();
+    }
+  }
+
+  /**
+   * Get the group info (peer role and leader peer id) from Ratis server and
+   * update the OM server role.
+   */
+  public void updateServerRole() {
+    try {
+      RoleInfoProto roleInfo = getGroupInfo().getRoleInfoProto();
+      RaftPeerRole currentRole = roleInfo.getRole();
+      RaftPeerId leaderId;
+      switch (currentRole) {
+      case LEADER:
+        // This node is the leader, so it records itself as the leader peer.
+        leaderId = raftPeerId;
+        break;
+      case FOLLOWER:
+        // A follower reads the leader's id out of its follower info.
+        ByteString leaderNodeId = roleInfo.getFollowerInfo()
+            .getLeaderInfo().getId().getId();
+        leaderId = RaftPeerId.valueOf(leaderNodeId);
+        break;
+      default:
+        leaderId = null; // any other role: no leader is recorded
+      }
+      setServerRole(currentRole, leaderId);
+    } catch (IOException e) {
+      LOG.error("Failed to retrieve RaftPeerRole. Setting cached role to " +
+          "{} and resetting leader info.", RaftPeerRole.UNRECOGNIZED, e);
+      setServerRole(null, null);
+    }
+  }
+
+  /**
+   * Cache the role and leader id (null becomes empty) under the write lock.
+   */
+  private void setServerRole(RaftPeerRole currentRole,
+      RaftPeerId leaderPeerId) {
+    roleCheckLock.writeLock().lock();
+    try {
+      cachedPeerRole = Optional.ofNullable(currentRole);
+      cachedLeaderPeerId = Optional.ofNullable(leaderPeerId);
+    } finally {
+      roleCheckLock.writeLock().unlock();
+    }
+  }
+
+  private GroupInfoReply getGroupInfo() throws IOException {
+    // Build a request carrying this server's own peer id and group id.
+    GroupInfoRequest request = new GroupInfoRequest(clientId, raftPeerId,
+        raftGroupId, nextCallId());
+    return server.getGroupInfo(request);
+  }
+
   public int getServerPort() {
     return port;
   }
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java
index 701ac16..acbbd34 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java
@@ -20,10 +20,12 @@ package org.apache.hadoop.ozone.om.ratis;
 import com.google.common.base.Preconditions;
 import com.google.protobuf.ServiceException;
 import java.io.IOException;
+import java.util.Collection;
 import java.util.concurrent.CompletableFuture;
 import org.apache.hadoop.ozone.container.common.transport.server.ratis
     .ContainerStateMachine;
 import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.helpers.OMRatisHelper;
 import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
     .OMRequest;
@@ -54,11 +56,15 @@ public class OzoneManagerStateMachine extends BaseStateMachine {
       LoggerFactory.getLogger(ContainerStateMachine.class);
   private final SimpleStateMachineStorage storage =
       new SimpleStateMachineStorage();
+  private final OzoneManagerRatisServer omRatisServer;
+  private final OzoneManagerProtocol ozoneManager;
   private final OzoneManagerRequestHandler handler;
   private RaftGroupId raftGroupId;
 
-  public OzoneManagerStateMachine(OzoneManagerProtocol om) {
-    this.handler = new OzoneManagerRequestHandler(om);
+  public OzoneManagerStateMachine(OzoneManagerRatisServer ratisServer) {
+    this.omRatisServer = ratisServer;
+    this.ozoneManager = omRatisServer.getOzoneManager();
+    this.handler = new OzoneManagerRequestHandler(ozoneManager);
   }
 
   /**
@@ -138,6 +144,15 @@ public class OzoneManagerStateMachine extends BaseStateMachine {
   }
 
   /**
+   * Notifies the state machine that the raft peer is no longer leader.
+   */
+  @Override
+  public void notifyNotLeader(Collection<TransactionContext> pendingEntries)
+      throws IOException {
+    omRatisServer.updateServerRole();
+  }
+
+  /**
    * Submits request to OM and returns the response Message.
    * @param request OMRequest
    * @return response from OM
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java
index 2f1d64d8..395cc42 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java
@@ -17,18 +17,24 @@
 package org.apache.hadoop.ozone.protocolPB;
 
 import org.apache.hadoop.hdds.tracing.TracingUtil;
+import org.apache.hadoop.ozone.OmUtils;
+import org.apache.hadoop.ozone.om.exceptions.NotLeaderException;
 import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
 import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolPB;
 import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisClient;
+import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
 
 import com.google.protobuf.RpcController;
 import com.google.protobuf.ServiceException;
 import io.opentracing.Scope;
+import org.apache.ratis.protocol.RaftPeerId;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Optional;
+
 /**
  * This class is the server-side translator that forwards requests received on
  * {@link OzoneManagerProtocolPB}
@@ -38,6 +44,7 @@ public class OzoneManagerProtocolServerSideTranslatorPB implements
     OzoneManagerProtocolPB {
   private static final Logger LOG = LoggerFactory
       .getLogger(OzoneManagerProtocolServerSideTranslatorPB.class);
+  private final OzoneManagerRatisServer omRatisServer;
   private final OzoneManagerRatisClient omRatisClient;
   private final OzoneManagerRequestHandler handler;
   private final boolean isRatisEnabled;
@@ -48,9 +55,10 @@ public class OzoneManagerProtocolServerSideTranslatorPB implements
    * @param impl OzoneManagerProtocolPB
    */
   public OzoneManagerProtocolServerSideTranslatorPB(
-      OzoneManagerProtocol impl, OzoneManagerRatisClient ratisClient,
-      boolean enableRatis) {
+      OzoneManagerProtocol impl, OzoneManagerRatisServer ratisServer,
+      OzoneManagerRatisClient ratisClient, boolean enableRatis) {
     handler = new OzoneManagerRequestHandler(impl);
+    this.omRatisServer = ratisServer;
     this.omRatisClient = ratisClient;
     this.isRatisEnabled = enableRatis;
   }
@@ -68,7 +76,12 @@ public class OzoneManagerProtocolServerSideTranslatorPB implements
             request.getTraceID());
     try {
       if (isRatisEnabled) {
-        return submitRequestToRatis(request);
+        // Check if the request is a read only request
+        if (OmUtils.isReadOnly(request)) {
+          return submitReadRequestToOM(request);
+        } else {
+          return submitRequestToRatis(request);
+        }
       } else {
         return submitRequestDirectlyToOM(request);
       }
@@ -85,6 +98,32 @@ public class OzoneManagerProtocolServerSideTranslatorPB implements
     return omRatisClient.sendCommand(request);
   }
 
+  /**
+   * Serves a read-only request from this OM without going through Ratis.
+   * @param request read-only OMRequest
+   * @return response from the local request handler, if this OM is leader
+   * @throws ServiceException wrapping NotLeaderException if not the leader
+   */
+  private OMResponse submitReadRequestToOM(OMRequest request)
+      throws ServiceException {
+    if (omRatisServer.isLeader()) {
+      return handler.handle(request);
+    }
+
+    // Pass the cached leader id, when known, so the client can fail over
+    // straight to it; otherwise report that the leader is undetermined.
+    String thisPeerId = omRatisServer.getRaftPeerId().toString();
+    Optional<RaftPeerId> leaderId = omRatisServer.getCachedLeaderPeerId();
+    NotLeaderException notLeaderException = leaderId.isPresent()
+        ? new NotLeaderException(thisPeerId, leaderId.get().toString())
+        : new NotLeaderException(thisPeerId);
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(notLeaderException.getMessage());
+    }
+    throw new ServiceException(notLeaderException);
+  }
+
   /**
    * Submits request directly to OM.
    */


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org