Posted to common-commits@hadoop.apache.org by cl...@apache.org on 2019/06/28 23:06:40 UTC

[hadoop] 37/50: HDFS-13873. [SBN read] ObserverNode should reject read requests when it is too far behind. Contributed by Konstantin Shvachko.

This is an automated email from the ASF dual-hosted git repository.

cliang pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit c1c061d767381023eed829ddbb0af4d32db2493b
Author: Erik Krogen <xk...@apache.org>
AuthorDate: Thu Dec 13 14:31:41 2018 -0800

    HDFS-13873. [SBN read] ObserverNode should reject read requests when it is too far behind. Contributed by Konstantin Shvachko.
---
 .../org/apache/hadoop/ipc/AlignmentContext.java    | 12 ++++-
 .../main/java/org/apache/hadoop/ipc/Server.java    | 27 ++++++-----
 .../org/apache/hadoop/hdfs/ClientGSIContext.java   |  7 ++-
 .../hdfs/server/namenode/GlobalStateIdContext.java | 52 +++++++++++++++++++---
 .../hadoop/hdfs/server/namenode/ha/HATestUtil.java | 20 +++++++++
 .../server/namenode/ha/TestMultiObserverNode.java  | 14 ++++++
 6 files changed, 111 insertions(+), 21 deletions(-)
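
At a high level, the patch threads a client wait-time bound from the RPC Server into AlignmentContext#receiveRequestState. On an Observer, GlobalStateIdContext translates that bound into the largest transaction lag it is willing to tolerate and throws RetriableException when the client's state id is further ahead than that, so the client's retry policy can redirect the read to another Observer or to the Active NameNode.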

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AlignmentContext.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AlignmentContext.java
index a435ff6..bcddfbf 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AlignmentContext.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AlignmentContext.java
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.ipc;
 
+import java.io.IOException;
+
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto;
@@ -64,9 +66,15 @@ public interface AlignmentContext {
    * client state info during RPC response header processing.
    *
    * @param header The RPC request header.
-   * @return state id of in the request header.
+   * @param threshold a parameter to verify a condition when server
+   *        should reject client request due to its state being too far
+   *        misaligned with the client state.
+   *        See implementation for more details.
+   * @return state id required for the server to execute the call.
+   * @throws IOException
    */
-  long receiveRequestState(RpcRequestHeaderProto header);
+  long receiveRequestState(RpcRequestHeaderProto header, long threshold)
+      throws IOException;
 
   /**
    * Returns the last seen state id of the alignment context instance.
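
The interface change adds a threshold argument so that a server-side implementation can refuse a call whose client state is too far ahead of its own. A minimal sketch of that intent (illustrative only; the names are invented, and for simplicity the threshold is treated here as a maximum tolerated state-id gap, whereas the real server-side implementation further down derives that gap from a wait time in milliseconds):

    import java.io.IOException;

    // Illustrative sketch, not part of the patch: the contract behind the new
    // two-argument receiveRequestState. All names here are hypothetical.
    final class RequestStateCheckSketch {
      static long receiveRequestState(long clientStateId, long serverStateId,
          long maxToleratedGap) throws IOException {
        if (clientStateId - serverStateId > maxToleratedGap) {
          // Server is too far behind the client's last seen state id; fail the
          // call so the client can retry against another node.
          throw new IOException("Server state " + serverStateId
              + " is too far behind client state " + clientStateId);
        }
        // Otherwise return the state id the server must catch up to before
        // executing the call.
        return clientStateId;
      }
    }
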
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
index 20fee61..4f11541 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
@@ -2536,6 +2536,7 @@ public abstract class Server {
 
       // Save the priority level assignment by the scheduler
       call.setPriorityLevel(callQueue.getPriorityLevel(call));
+      call.markCallCoordinated(false);
       if(alignmentContext != null && call.rpcRequest != null &&
           (call.rpcRequest instanceof ProtobufRpcEngine.RpcProtobufRequest)) {
         // if call.rpcRequest is not RpcProtobufRequest, will skip the following
@@ -2544,23 +2545,21 @@ public abstract class Server {
         // coordinated.
         String methodName;
         String protoName;
+        ProtobufRpcEngine.RpcProtobufRequest req =
+            (ProtobufRpcEngine.RpcProtobufRequest) call.rpcRequest;
         try {
-          ProtobufRpcEngine.RpcProtobufRequest req =
-              (ProtobufRpcEngine.RpcProtobufRequest) call.rpcRequest;
           methodName = req.getRequestHeader().getMethodName();
           protoName = req.getRequestHeader().getDeclaringClassProtocolName();
+          if (alignmentContext.isCoordinatedCall(protoName, methodName)) {
+            call.markCallCoordinated(true);
+            long stateId;
+            stateId = alignmentContext.receiveRequestState(
+                header, getMaxIdleTime());
+            call.setClientStateId(stateId);
+          }
         } catch (IOException ioe) {
-          throw new RpcServerException("Rpc request header check fail", ioe);
-        }
-        if (!alignmentContext.isCoordinatedCall(protoName, methodName)) {
-          call.markCallCoordinated(false);
-        } else {
-          call.markCallCoordinated(true);
-          long stateId = alignmentContext.receiveRequestState(header);
-          call.setClientStateId(stateId);
+          throw new RpcServerException("Processing RPC request caught ", ioe);
         }
-      } else {
-        call.markCallCoordinated(false);
       }
 
       try {
@@ -3613,6 +3612,10 @@ public abstract class Server {
     }
   }
 
+  protected int getMaxIdleTime() {
+    return connectionManager.maxIdleTime;
+  }
+
   public String getServerName() {
     return serverName;
   }
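
The threshold the Server now hands to receiveRequestState is its connection manager's maximum idle time, roughly the longest a client connection is expected to sit waiting for a response, which is derived from ipc.client.connection.maxidletime. A deployment that wants Observers to tolerate a larger lag before rejecting reads could raise that setting; a minimal sketch, assuming the NameNode picks the value up from its normal Configuration:

    import org.apache.hadoop.conf.Configuration;

    // Illustrative only: lengthening the idle-time bound that ends up as the
    // clientWaitTime argument of receiveRequestState. How exactly the Server's
    // ConnectionManager scales this value is an implementation detail.
    public class RaiseIdleTimeExample {
      public static Configuration withLongerIdleTime() {
        Configuration conf = new Configuration();
        // core-default.xml ships this property with a 10000 ms default.
        conf.setInt("ipc.client.connection.maxidletime", 20000);
        return conf;
      }
    }
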
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientGSIContext.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientGSIContext.java
index 6d366a6..a7bdd14 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientGSIContext.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientGSIContext.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.ipc.AlignmentContext;
 import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto;
 import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto;
 
+import java.io.IOException;
 import java.util.concurrent.atomic.LongAccumulator;
 
 /**
@@ -60,7 +61,8 @@ public class ClientGSIContext implements AlignmentContext {
   }
 
   /**
-   * Client side implementation for receiving state alignment info in responses.
+   * Client side implementation for receiving state alignment info
+   * in responses.
    */
   @Override
   public void receiveResponseState(RpcResponseHeaderProto header) {
@@ -80,7 +82,8 @@ public class ClientGSIContext implements AlignmentContext {
    * Client does not receive RPC requests therefore this does nothing.
    */
   @Override
-  public long receiveRequestState(RpcRequestHeaderProto header) {
+  public long receiveRequestState(RpcRequestHeaderProto header, long threshold)
+      throws IOException {
     // Do nothing.
     return 0;
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/GlobalStateIdContext.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/GlobalStateIdContext.java
index ecb9fd3..2e48654 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/GlobalStateIdContext.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/GlobalStateIdContext.java
@@ -20,12 +20,15 @@ package org.apache.hadoop.hdfs.server.namenode;
 
 import java.lang.reflect.Method;
 import java.util.HashSet;
+import java.util.concurrent.TimeUnit;
+
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.ha.HAServiceProtocol;
+import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.server.namenode.ha.ReadOnly;
 import org.apache.hadoop.ipc.AlignmentContext;
+import org.apache.hadoop.ipc.RetriableException;
 import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto;
 import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto;
 
@@ -36,8 +39,23 @@ import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto;
 @InterfaceAudience.Private
 @InterfaceStability.Stable
 class GlobalStateIdContext implements AlignmentContext {
-  private final FSNamesystem namesystem;
+  /**
+   * Estimated number of journal transactions a typical NameNode can execute
+   * per second. The number is used to estimate how long a client's
+   * RPC request will wait in the call queue before the Observer catches up
+   * with its state id.
+   */
+  private static final long ESTIMATED_TRANSACTIONS_PER_SECOND = 10000L;
 
+  /**
+   * The client wait time on an RPC request is composed of
+   * the server execution time plus the communication time.
+   * This is an expected fraction of the total wait time spent on
+   * server execution.
+   */
+  private static final float ESTIMATED_SERVER_TIME_MULTIPLIER = 0.8f;
+
+  private final FSNamesystem namesystem;
   private final HashSet<String> coordinatedMethods;
 
   /**
@@ -88,17 +106,41 @@ class GlobalStateIdContext implements AlignmentContext {
   }
 
   /**
-   * Server side implementation for processing state alignment info in requests.
+   * Server-side implementation for processing state alignment info in
+   * requests.
+   * For Observer it compares the client and the server states and determines
+   * if it makes sense to wait until the server catches up with the client
+   * state. If not the server throws RetriableException so that the client
+   * could retry the call according to the retry policy with another Observer
+   * or the Active NameNode.
+   *
+   * @param header The RPC request header.
+   * @param clientWaitTime time in milliseconds indicating how long client
+   *    waits for the server response. It is used to verify if the client's
+   *    state is too far ahead of the server's
+   * @return the minimum of the state ids of the client or the server.
+   * @throws RetriableException if Observer is too far behind.
    */
   @Override
-  public long receiveRequestState(RpcRequestHeaderProto header) {
+  public long receiveRequestState(RpcRequestHeaderProto header,
+      long clientWaitTime) throws RetriableException {
     long serverStateId =
         namesystem.getFSImage().getCorrectLastAppliedOrWrittenTxId();
     long clientStateId = header.getStateId();
     if (clientStateId > serverStateId &&
-        HAServiceProtocol.HAServiceState.ACTIVE.equals(namesystem.getState())) {
+        HAServiceState.ACTIVE.equals(namesystem.getState())) {
       FSNamesystem.LOG.warn("A client sent stateId: " + clientStateId +
           ", but server state is: " + serverStateId);
+      return serverStateId;
+    }
+    if (HAServiceState.OBSERVER.equals(namesystem.getState()) &&
+        clientStateId - serverStateId >
+        ESTIMATED_TRANSACTIONS_PER_SECOND
+            * TimeUnit.MILLISECONDS.toSeconds(clientWaitTime)
+            * ESTIMATED_SERVER_TIME_MULTIPLIER) {
+      throw new RetriableException(
+          "Observer Node is too far behind: serverStateId = "
+              + serverStateId + " clientStateId = " + clientStateId);
     }
     return clientStateId;
   }
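
Put concretely: the Observer rejects a coordinated call when the client's state id is further ahead than the number of transactions the Observer could plausibly apply while the call waits in the queue. A back-of-the-envelope sketch of that bound, assuming a 10-second wait for the sake of the arithmetic (the real value is whatever the Server passes in as clientWaitTime):

    import java.util.concurrent.TimeUnit;

    // Illustrative arithmetic only; mirrors the constants introduced above.
    public class ObserverLagThresholdSketch {
      public static void main(String[] args) {
        long estimatedTxnsPerSecond = 10_000L;   // ESTIMATED_TRANSACTIONS_PER_SECOND
        float serverTimeMultiplier = 0.8f;       // ESTIMATED_SERVER_TIME_MULTIPLIER
        long clientWaitTimeMs = 10_000L;         // assumed wait bound for this example
        double maxToleratedLag = estimatedTxnsPerSecond
            * TimeUnit.MILLISECONDS.toSeconds(clientWaitTimeMs)
            * serverTimeMultiplier;
        // 10,000 txn/s * 10 s * 0.8 = 80,000 transactions: an Observer lagging
        // further behind the client than this throws RetriableException.
        System.out.println("Max tolerated lag: " + (long) maxToleratedLag
            + " transactions");
      }
    }
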
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java
index d1095ad..9e83fc1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java
@@ -25,6 +25,7 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY;
 import static org.apache.hadoop.hdfs.DFSUtil.createUri;
 
 import java.io.IOException;
+import java.lang.reflect.Field;
 import java.lang.reflect.Proxy;
 import java.net.InetSocketAddress;
 import java.net.URI;
@@ -34,6 +35,7 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.LongAccumulator;
 
 import com.google.common.base.Function;
 import com.google.common.base.Joiner;
@@ -43,6 +45,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hdfs.ClientGSIContext;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
@@ -334,4 +337,21 @@ public abstract class HATestUtil {
       }
     }
   }
+
+  /**
+   * Customize stateId of the client AlignmentContext for testing.
+   */
+  public static long setACStateId(DistributedFileSystem dfs,
+      long stateId) throws Exception {
+    ObserverReadProxyProvider<?> provider = (ObserverReadProxyProvider<?>)
+        ((RetryInvocationHandler<?>) Proxy.getInvocationHandler(
+            dfs.getClient().getNamenode())).getProxyProvider();
+    ClientGSIContext ac = (ClientGSIContext)(provider.getAlignmentContext());
+    Field f = ac.getClass().getDeclaredField("lastSeenStateId");
+    f.setAccessible(true);
+    LongAccumulator lastSeenStateId = (LongAccumulator)f.get(ac);
+    long currentStateId = lastSeenStateId.getThenReset();
+    lastSeenStateId.accumulate(stateId);
+    return currentStateId;
+  }
 }
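
A note on the reflection above: ClientGSIContext keeps its last seen state id in a LongAccumulator, which has no direct setter, so the helper drains the old value with getThenReset() (and returns it so a test can restore it later) and then installs the artificial value with accumulate(stateId).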
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestMultiObserverNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestMultiObserverNode.java
index 4aa3133..a8e1245 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestMultiObserverNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestMultiObserverNode.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.namenode.ha;
 import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
@@ -138,6 +139,19 @@ public class TestMultiObserverNode {
     dfsCluster.transitionToObserver(3);
   }
 
+  @Test
+  public void testObserverFallBehind() throws Exception {
+    dfs.mkdir(testPath, FsPermission.getDefault());
+    assertSentTo(0);
+
+    // Set large state Id on the client
+    long realStateId = HATestUtil.setACStateId(dfs, 500000);
+    dfs.getFileStatus(testPath);
+    // Should end up on ANN
+    assertSentTo(0);
+    HATestUtil.setACStateId(dfs, realStateId);
+  }
+
   private void assertSentTo(int... nnIndices) throws IOException {
     assertTrue("Request was not sent to any of the expected namenodes.",
         HATestUtil.isSentToAnyOfNameNodes(dfs, dfsCluster, nnIndices));
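
In the new test the client's AlignmentContext is forced to state id 500000 on a freshly started mini cluster, so the gap to any Observer far exceeds the tolerated lag: the Observers reject the getFileStatus call, the proxy provider falls back to the Active NameNode (index 0), and the original state id is then restored.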

