You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cl...@apache.org on 2019/06/28 23:06:09 UTC

[hadoop] 06/50: HDFS-13399. [SBN read] Make Client field AlignmentContext non-static. Contributed by Plamen Jeliazkov.

This is an automated email from the ASF dual-hosted git repository.

cliang pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit a3521c53fe76af32fa14f6cc141717e61038da15
Author: Plamen Jeliazkov <pl...@gmail.com>
AuthorDate: Mon Jun 4 14:58:47 2018 -0700

    HDFS-13399. [SBN read] Make Client field AlignmentContext non-static. Contributed by Plamen Jeliazkov.
---
 .../main/java/org/apache/hadoop/ipc/Client.java    |  49 ++-
 .../org/apache/hadoop/ipc/ProtobufRpcEngine.java   |  15 +-
 .../src/main/java/org/apache/hadoop/ipc/RPC.java   |  39 +-
 .../main/java/org/apache/hadoop/ipc/RpcEngine.java |   3 +-
 .../main/java/org/apache/hadoop/ipc/Server.java    |  29 +-
 .../org/apache/hadoop/ipc/WritableRpcEngine.java   |  14 +-
 .../test/java/org/apache/hadoop/ipc/TestRPC.java   |   5 +-
 .../java/org/apache/hadoop/hdfs/DFSClient.java     |   9 -
 .../apache/hadoop/hdfs/NameNodeProxiesClient.java  |  12 +-
 .../ha/AbstractNNFailoverProxyProvider.java        |   7 +-
 .../server/namenode/ha/ClientHAProxyFactory.java   |  12 +
 .../hadoop/hdfs/TestStateAlignmentContext.java     | 212 ----------
 .../hdfs/TestStateAlignmentContextWithHA.java      | 467 +++++++++++++++++++++
 13 files changed, 620 insertions(+), 253 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
index 91a698f..efd77a0 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
@@ -103,12 +103,6 @@ public class Client implements AutoCloseable {
           return false;
         }
       };
-  private static AlignmentContext alignmentContext;
-
-  /** Set alignment context to use to fetch state alignment info from RPC. */
-  public static void setAlignmentContext(AlignmentContext ac) {
-    alignmentContext = ac;
-  }
 
   @SuppressWarnings("unchecked")
   @Unstable
@@ -345,6 +339,7 @@ public class Client implements AutoCloseable {
     final RPC.RpcKind rpcKind;      // Rpc EngineKind
     boolean done;               // true when call is done
     private final Object externalHandler;
+    private AlignmentContext alignmentContext;
 
     private Call(RPC.RpcKind rpcKind, Writable param) {
       this.rpcKind = rpcKind;
@@ -386,6 +381,15 @@ public class Client implements AutoCloseable {
       }
     }
 
+    /**
+     * Set an AlignmentContext for the call to update when call is done.
+     *
+     * @param ac alignment context to update.
+     */
+    public synchronized void setAlignmentContext(AlignmentContext ac) {
+      this.alignmentContext = ac;
+    }
+
     /** Set the exception when there is an error.
      * Notify the caller the call is done.
      * 
@@ -1114,7 +1118,7 @@ public class Client implements AutoCloseable {
       // Items '1' and '2' are prepared here. 
       RpcRequestHeaderProto header = ProtoUtil.makeRpcRequestHeader(
           call.rpcKind, OperationProto.RPC_FINAL_PACKET, call.id, call.retry,
-          clientId, alignmentContext);
+          clientId, call.alignmentContext);
 
       final ResponseBuffer buf = new ResponseBuffer();
       header.writeDelimitedTo(buf);
@@ -1191,9 +1195,9 @@ public class Client implements AutoCloseable {
           Writable value = packet.newInstance(valueClass, conf);
           final Call call = calls.remove(callId);
           call.setRpcResponse(value);
-        }
-        if (alignmentContext != null) {
-          alignmentContext.receiveResponseState(header);
+          if (call.alignmentContext != null) {
+            call.alignmentContext.receiveResponseState(header);
+          }
         }
         // verify that packet length was correct
         if (packet.remaining() > 0) {
@@ -1374,7 +1378,15 @@ public class Client implements AutoCloseable {
       ConnectionId remoteId, AtomicBoolean fallbackToSimpleAuth)
       throws IOException {
     return call(rpcKind, rpcRequest, remoteId, RPC.RPC_SERVICE_CLASS_DEFAULT,
-      fallbackToSimpleAuth);
+      fallbackToSimpleAuth, null);
+  }
+
+  public Writable call(RPC.RpcKind rpcKind, Writable rpcRequest,
+      ConnectionId remoteId, AtomicBoolean fallbackToSimpleAuth,
+      AlignmentContext alignmentContext)
+      throws IOException {
+    return call(rpcKind, rpcRequest, remoteId, RPC.RPC_SERVICE_CLASS_DEFAULT,
+        fallbackToSimpleAuth, alignmentContext);
   }
 
   private void checkAsyncCall() throws IOException {
@@ -1391,6 +1403,14 @@ public class Client implements AutoCloseable {
     }
   }
 
+  Writable call(RPC.RpcKind rpcKind, Writable rpcRequest,
+                ConnectionId remoteId, int serviceClass,
+                AtomicBoolean fallbackToSimpleAuth)
+      throws IOException {
+    return call(rpcKind, rpcRequest, remoteId, serviceClass,
+        fallbackToSimpleAuth, null);
+  }
+
   /**
    * Make a call, passing <code>rpcRequest</code>, to the IPC server defined by
    * <code>remoteId</code>, returning the rpc response.
@@ -1401,14 +1421,17 @@ public class Client implements AutoCloseable {
    * @param serviceClass - service class for RPC
    * @param fallbackToSimpleAuth - set to true or false during this method to
    *   indicate if a secure client falls back to simple auth
-   * @returns the rpc response
+   * @param alignmentContext - state alignment context
+   * @return the rpc response
    * Throws exceptions if there are network problems or if the remote code
    * threw an exception.
    */
   Writable call(RPC.RpcKind rpcKind, Writable rpcRequest,
       ConnectionId remoteId, int serviceClass,
-      AtomicBoolean fallbackToSimpleAuth) throws IOException {
+      AtomicBoolean fallbackToSimpleAuth, AlignmentContext alignmentContext)
+      throws IOException {
     final Call call = createCall(rpcKind, rpcRequest);
+    call.setAlignmentContext(alignmentContext);
     final Connection connection = getConnection(remoteId, call, serviceClass,
         fallbackToSimpleAuth);
 
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
index 2734a95..5548566 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
@@ -86,7 +86,7 @@ public class ProtobufRpcEngine implements RpcEngine {
       SocketFactory factory, int rpcTimeout, RetryPolicy connectionRetryPolicy
       ) throws IOException {
     return getProxy(protocol, clientVersion, addr, ticket, conf, factory,
-      rpcTimeout, connectionRetryPolicy, null);
+      rpcTimeout, connectionRetryPolicy, null, null);
   }
 
   @Override
@@ -94,10 +94,12 @@ public class ProtobufRpcEngine implements RpcEngine {
   public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
       InetSocketAddress addr, UserGroupInformation ticket, Configuration conf,
       SocketFactory factory, int rpcTimeout, RetryPolicy connectionRetryPolicy,
-      AtomicBoolean fallbackToSimpleAuth) throws IOException {
+      AtomicBoolean fallbackToSimpleAuth, AlignmentContext alignmentContext)
+      throws IOException {
 
     final Invoker invoker = new Invoker(protocol, addr, ticket, conf, factory,
-        rpcTimeout, connectionRetryPolicy, fallbackToSimpleAuth);
+        rpcTimeout, connectionRetryPolicy, fallbackToSimpleAuth,
+        alignmentContext);
     return new ProtocolProxy<T>(protocol, (T) Proxy.newProxyInstance(
         protocol.getClassLoader(), new Class[]{protocol}, invoker), false);
   }
@@ -122,15 +124,18 @@ public class ProtobufRpcEngine implements RpcEngine {
     private final long clientProtocolVersion;
     private final String protocolName;
     private AtomicBoolean fallbackToSimpleAuth;
+    private AlignmentContext alignmentContext;
 
     private Invoker(Class<?> protocol, InetSocketAddress addr,
         UserGroupInformation ticket, Configuration conf, SocketFactory factory,
         int rpcTimeout, RetryPolicy connectionRetryPolicy,
-        AtomicBoolean fallbackToSimpleAuth) throws IOException {
+        AtomicBoolean fallbackToSimpleAuth, AlignmentContext alignmentContext)
+        throws IOException {
       this(protocol, Client.ConnectionId.getConnectionId(
           addr, protocol, ticket, rpcTimeout, connectionRetryPolicy, conf),
           conf, factory);
       this.fallbackToSimpleAuth = fallbackToSimpleAuth;
+      this.alignmentContext = alignmentContext;
     }
     
     /**
@@ -227,7 +232,7 @@ public class ProtobufRpcEngine implements RpcEngine {
       try {
         val = (RpcWritable.Buffer) client.call(RPC.RpcKind.RPC_PROTOCOL_BUFFER,
             new RpcProtobufRequest(rpcRequestHeader, theRequest), remoteId,
-            fallbackToSimpleAuth);
+            fallbackToSimpleAuth, alignmentContext);
 
       } catch (Throwable e) {
         if (LOG.isTraceEnabled()) {
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java
index 36d5400..5440780 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java
@@ -586,7 +586,44 @@ public class RPC {
     }
     return getProtocolEngine(protocol, conf).getProxy(protocol, clientVersion,
         addr, ticket, conf, factory, rpcTimeout, connectionRetryPolicy,
-        fallbackToSimpleAuth);
+        fallbackToSimpleAuth, null);
+  }
+
+  /**
+   * Get a protocol proxy that contains a proxy connection to a remote server
+   * and a set of methods that are supported by the server
+   *
+   * @param protocol protocol
+   * @param clientVersion client's version
+   * @param addr server address
+   * @param ticket security ticket
+   * @param conf configuration
+   * @param factory socket factory
+   * @param rpcTimeout max time for each rpc; 0 means no timeout
+   * @param connectionRetryPolicy retry policy
+   * @param fallbackToSimpleAuth set to true or false during calls to indicate
+   *   if a secure client falls back to simple auth
+   * @param alignmentContext state alignment context
+   * @return the proxy
+   * @throws IOException if any error occurs
+   */
+  public static <T> ProtocolProxy<T> getProtocolProxy(Class<T> protocol,
+                                long clientVersion,
+                                InetSocketAddress addr,
+                                UserGroupInformation ticket,
+                                Configuration conf,
+                                SocketFactory factory,
+                                int rpcTimeout,
+                                RetryPolicy connectionRetryPolicy,
+                                AtomicBoolean fallbackToSimpleAuth,
+                                AlignmentContext alignmentContext)
+       throws IOException {
+    if (UserGroupInformation.isSecurityEnabled()) {
+      SaslRpcServer.init(conf);
+    }
+    return getProtocolEngine(protocol, conf).getProxy(protocol, clientVersion,
+        addr, ticket, conf, factory, rpcTimeout, connectionRetryPolicy,
+        fallbackToSimpleAuth, alignmentContext);
   }
 
    /**
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcEngine.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcEngine.java
index 8a43172..0f5769e 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcEngine.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcEngine.java
@@ -50,7 +50,8 @@ public interface RpcEngine {
                   UserGroupInformation ticket, Configuration conf,
                   SocketFactory factory, int rpcTimeout,
                   RetryPolicy connectionRetryPolicy,
-                  AtomicBoolean fallbackToSimpleAuth) throws IOException;
+                  AtomicBoolean fallbackToSimpleAuth,
+                  AlignmentContext alignmentContext) throws IOException;
 
   /** 
    * Construct a server for a protocol implementation instance.
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
index 6b54352..7aa8001 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
@@ -835,10 +835,15 @@ public abstract class Server {
     final Writable rpcRequest;    // Serialized Rpc request from client
     ByteBuffer rpcResponse;       // the response for this call
 
+    private RpcResponseHeaderProto bufferedHeader; // the response header
+    private Writable bufferedRv;                   // the byte response
+
     RpcCall(RpcCall call) {
       super(call);
       this.connection = call.connection;
       this.rpcRequest = call.rpcRequest;
+      this.bufferedRv = call.bufferedRv;
+      this.bufferedHeader = call.bufferedHeader;
     }
 
     RpcCall(Connection connection, int id) {
@@ -859,6 +864,14 @@ public abstract class Server {
       this.rpcRequest = param;
     }
 
+    public void setBufferedHeader(RpcResponseHeaderProto header) {
+      this.bufferedHeader = header;
+    }
+
+    public void setBufferedRv(Writable rv) {
+      this.bufferedRv = rv;
+    }
+
     @Override
     public String getProtocol() {
       return "rpc";
@@ -947,6 +960,13 @@ public abstract class Server {
         setupResponse(call,
             RpcStatusProto.FATAL, RpcErrorCodeProto.ERROR_RPC_SERVER,
             null, t.getClass().getName(), StringUtils.stringifyException(t));
+      } else if (alignmentContext != null) {
+        // rebuild response with state context in header
+        RpcResponseHeaderProto.Builder responseHeader =
+            call.bufferedHeader.toBuilder();
+        alignmentContext.updateResponseState(responseHeader);
+        RpcResponseHeaderProto builtHeader = responseHeader.build();
+        setupResponse(call, builtHeader, call.bufferedRv);
       }
       connection.sendResponse(call);
     }
@@ -2936,9 +2956,6 @@ public abstract class Server {
     headerBuilder.setRetryCount(call.retryCount);
     headerBuilder.setStatus(status);
     headerBuilder.setServerIpcVersionNum(CURRENT_VERSION);
-    if(alignmentContext != null) {
-      alignmentContext.updateResponseState(headerBuilder);
-    }
 
     if (status == RpcStatusProto.SUCCESS) {
       RpcResponseHeaderProto header = headerBuilder.build();
@@ -2965,6 +2982,12 @@ public abstract class Server {
 
   private void setupResponse(RpcCall call,
       RpcResponseHeaderProto header, Writable rv) throws IOException {
+    if (alignmentContext != null && call.bufferedHeader == null
+        && call.bufferedRv == null) {
+      call.setBufferedHeader(header);
+      call.setBufferedRv(rv);
+    }
+
     final byte[] response;
     if (rv == null || (rv instanceof RpcWritable.ProtobufWrapper)) {
       response = setupResponseForProtobuf(header, rv);
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java
index 507517b..2e3b559 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java
@@ -214,16 +214,19 @@ public class WritableRpcEngine implements RpcEngine {
     private Client client;
     private boolean isClosed = false;
     private final AtomicBoolean fallbackToSimpleAuth;
+    private final AlignmentContext alignmentContext;
 
     public Invoker(Class<?> protocol,
                    InetSocketAddress address, UserGroupInformation ticket,
                    Configuration conf, SocketFactory factory,
-                   int rpcTimeout, AtomicBoolean fallbackToSimpleAuth)
+                   int rpcTimeout, AtomicBoolean fallbackToSimpleAuth,
+                   AlignmentContext alignmentContext)
         throws IOException {
       this.remoteId = Client.ConnectionId.getConnectionId(address, protocol,
           ticket, rpcTimeout, null, conf);
       this.client = CLIENTS.getClient(conf, factory);
       this.fallbackToSimpleAuth = fallbackToSimpleAuth;
+      this.alignmentContext = alignmentContext;
     }
 
     @Override
@@ -246,7 +249,7 @@ public class WritableRpcEngine implements RpcEngine {
       try {
         value = (ObjectWritable)
           client.call(RPC.RpcKind.RPC_WRITABLE, new Invocation(method, args),
-            remoteId, fallbackToSimpleAuth);
+            remoteId, fallbackToSimpleAuth, alignmentContext);
       } finally {
         if (traceScope != null) traceScope.close();
       }
@@ -289,7 +292,7 @@ public class WritableRpcEngine implements RpcEngine {
                          int rpcTimeout, RetryPolicy connectionRetryPolicy)
     throws IOException {
     return getProxy(protocol, clientVersion, addr, ticket, conf, factory,
-      rpcTimeout, connectionRetryPolicy, null);
+      rpcTimeout, connectionRetryPolicy, null, null);
   }
 
   /** Construct a client-side proxy object that implements the named protocol,
@@ -301,7 +304,8 @@ public class WritableRpcEngine implements RpcEngine {
                          InetSocketAddress addr, UserGroupInformation ticket,
                          Configuration conf, SocketFactory factory,
                          int rpcTimeout, RetryPolicy connectionRetryPolicy,
-                         AtomicBoolean fallbackToSimpleAuth)
+                         AtomicBoolean fallbackToSimpleAuth,
+                         AlignmentContext alignmentContext)
     throws IOException {    
 
     if (connectionRetryPolicy != null) {
@@ -311,7 +315,7 @@ public class WritableRpcEngine implements RpcEngine {
 
     T proxy = (T) Proxy.newProxyInstance(protocol.getClassLoader(),
         new Class[] { protocol }, new Invoker(protocol, addr, ticket, conf,
-            factory, rpcTimeout, fallbackToSimpleAuth));
+            factory, rpcTimeout, fallbackToSimpleAuth, alignmentContext));
     return new ProtocolProxy<T>(protocol, proxy, true);
   }
   
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
index 3b93a1e..7b4f690 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
@@ -279,7 +279,7 @@ public class TestRPC extends TestRpcBase {
         SocketFactory factory, int rpcTimeout,
         RetryPolicy connectionRetryPolicy) throws IOException {
       return getProxy(protocol, clientVersion, addr, ticket, conf, factory,
-          rpcTimeout, connectionRetryPolicy, null);
+          rpcTimeout, connectionRetryPolicy, null, null);
     }
 
     @SuppressWarnings("unchecked")
@@ -288,7 +288,8 @@ public class TestRPC extends TestRpcBase {
         Class<T> protocol, long clientVersion, InetSocketAddress addr,
         UserGroupInformation ticket, Configuration conf, SocketFactory factory,
         int rpcTimeout, RetryPolicy connectionRetryPolicy,
-        AtomicBoolean fallbackToSimpleAuth) throws IOException {
+        AtomicBoolean fallbackToSimpleAuth, AlignmentContext alignmentContext)
+        throws IOException {
       T proxy = (T) Proxy.newProxyInstance(protocol.getClassLoader(),
           new Class[] { protocol }, new StoppedInvocationHandler());
       return new ProtocolProxy<T>(protocol, proxy, false);
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
index 0f4d5c1..ce1083d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
@@ -166,7 +166,6 @@ import org.apache.hadoop.io.EnumSetWritable;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.retry.LossyRetryInvocationHandler;
-import org.apache.hadoop.ipc.Client;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.ipc.RetriableException;
@@ -242,7 +241,6 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
   private static volatile ThreadPoolExecutor STRIPED_READ_THREAD_POOL;
   private final int smallBufferSize;
   private final long serverDefaultsValidityPeriod;
-  private final ClientGCIContext alignmentContext;
 
   public DfsClientConf getConf() {
     return dfsClientConf;
@@ -398,8 +396,6 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
     this.saslClient = new SaslDataTransferClient(
         conf, DataTransferSaslUtil.getSaslPropertiesResolver(conf),
         TrustedChannelResolver.getInstance(conf), nnFallbackToSimpleAuth);
-    this.alignmentContext = new ClientGCIContext();
-    Client.setAlignmentContext(alignmentContext);
   }
 
   /**
@@ -548,11 +544,6 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
     return clientRunning;
   }
 
-  @VisibleForTesting
-  ClientGCIContext getAlignmentContext() {
-    return alignmentContext;
-  }
-
   long getLastLeaseRenewal() {
     return lastLeaseRenewal;
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/NameNodeProxiesClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/NameNodeProxiesClient.java
index 897ecc8..65c79df 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/NameNodeProxiesClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/NameNodeProxiesClient.java
@@ -30,6 +30,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdfs.server.namenode.ha.ClientHAProxyFactory;
 import org.apache.hadoop.hdfs.server.namenode.ha.HAProxyFactory;
+import org.apache.hadoop.ipc.AlignmentContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -337,6 +338,15 @@ public class NameNodeProxiesClient {
       InetSocketAddress address, Configuration conf, UserGroupInformation ugi,
       boolean withRetries, AtomicBoolean fallbackToSimpleAuth)
       throws IOException {
+    return createProxyWithAlignmentContext(address, conf, ugi, withRetries,
+        fallbackToSimpleAuth, null);
+  }
+
+  public static ClientProtocol createProxyWithAlignmentContext(
+      InetSocketAddress address, Configuration conf, UserGroupInformation ugi,
+      boolean withRetries, AtomicBoolean fallbackToSimpleAuth,
+      AlignmentContext alignmentContext)
+      throws IOException {
     RPC.setProtocolEngine(conf, ClientNamenodeProtocolPB.class,
         ProtobufRpcEngine.class);
 
@@ -354,7 +364,7 @@ public class NameNodeProxiesClient {
         ClientNamenodeProtocolPB.class, version, address, ugi, conf,
         NetUtils.getDefaultSocketFactory(conf),
         org.apache.hadoop.ipc.Client.getTimeout(conf), defaultPolicy,
-        fallbackToSimpleAuth).getProxy();
+        fallbackToSimpleAuth, alignmentContext).getProxy();
 
     if (withRetries) { // create the proxy with retries
       Map<String, RetryPolicy> methodNameToPolicyMap = new HashMap<>();
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/AbstractNNFailoverProxyProvider.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/AbstractNNFailoverProxyProvider.java
index 252b70d..1cf00cf 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/AbstractNNFailoverProxyProvider.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/AbstractNNFailoverProxyProvider.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.hdfs.DFSUtilClient;
 import org.apache.hadoop.hdfs.HAUtilClient;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
 import org.apache.hadoop.io.retry.FailoverProxyProvider;
+import org.apache.hadoop.ipc.AlignmentContext;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -106,7 +107,11 @@ public abstract class AbstractNNFailoverProxyProvider<T> implements
     return fallbackToSimpleAuth;
   }
 
-  /**
+  public synchronized AlignmentContext getAlignmentContext() {
+    return null; // by default the context is null
+  }
+
+      /**
    * ProxyInfo to a NameNode. Includes its address.
    */
   public static class NNProxyInfo<T> extends ProxyInfo<T> {
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ClientHAProxyFactory.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ClientHAProxyFactory.java
index b887d87..7b251d8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ClientHAProxyFactory.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ClientHAProxyFactory.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.namenode.ha;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.NameNodeProxiesClient;
+import org.apache.hadoop.ipc.AlignmentContext;
 import org.apache.hadoop.security.UserGroupInformation;
 
 import java.io.IOException;
@@ -26,11 +27,22 @@ import java.net.InetSocketAddress;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 public class ClientHAProxyFactory<T> implements HAProxyFactory<T> {
+
+  private AlignmentContext alignmentContext;
+
+  public void setAlignmentContext(AlignmentContext alignmentContext) {
+    this.alignmentContext = alignmentContext;
+  }
+
   @Override
   @SuppressWarnings("unchecked")
   public T createProxy(Configuration conf, InetSocketAddress nnAddr,
       Class<T> xface, UserGroupInformation ugi, boolean withRetries,
       AtomicBoolean fallbackToSimpleAuth) throws IOException {
+    if (alignmentContext != null) {
+      return (T) NameNodeProxiesClient.createProxyWithAlignmentContext(
+        nnAddr, conf, ugi, false, fallbackToSimpleAuth, alignmentContext);
+    }
     return (T) NameNodeProxiesClient.createNonHAProxyWithClientProtocol(
       nnAddr, conf, ugi, false, fallbackToSimpleAuth);
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestStateAlignmentContext.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestStateAlignmentContext.java
deleted file mode 100644
index ce4639f..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestStateAlignmentContext.java
+++ /dev/null
@@ -1,212 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hdfs;
-
-import static org.hamcrest.CoreMatchers.containsString;
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.CoreMatchers.not;
-import static org.junit.Assert.assertThat;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
-import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
-import org.apache.hadoop.ipc.AlignmentContext;
-import org.apache.hadoop.ipc.Client;
-import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos;
-import org.apache.hadoop.test.GenericTestUtils;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.mockito.Mockito;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * Class is used to test server sending state alignment information to clients
- * via RPC and likewise clients receiving and updating their last known
- * state alignment info.
- * These tests check that after a single RPC call a client will have caught up
- * to the most recent alignment state of the server.
- */
-public class TestStateAlignmentContext {
-
-  static final long BLOCK_SIZE = 64 * 1024;
-  private static final int NUMDATANODES = 3;
-  private static final Configuration CONF = new HdfsConfiguration();
-
-  private static MiniDFSCluster cluster;
-  private static DistributedFileSystem dfs;
-
-  @BeforeClass
-  public static void startUpCluster() throws IOException {
-    // disable block scanner
-    CONF.setInt(DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, -1);
-    // Set short retry timeouts so this test runs faster
-    CONF.setInt(HdfsClientConfigKeys.Retry.WINDOW_BASE_KEY, 10);
-    CONF.setBoolean("fs.hdfs.impl.disable.cache", true);
-    cluster = new MiniDFSCluster.Builder(CONF).numDataNodes(NUMDATANODES)
-        .build();
-    cluster.waitActive();
-  }
-
-  @Before
-  public void before() throws IOException {
-    dfs = cluster.getFileSystem();
-  }
-
-  @AfterClass
-  public static void shutDownCluster() throws IOException {
-    if (dfs != null) {
-      dfs.close();
-      dfs = null;
-    }
-    if (cluster != null) {
-      cluster.shutdown();
-      cluster = null;
-    }
-  }
-
-  @After
-  public void after() throws IOException {
-    dfs.close();
-  }
-
-  /**
-   * This test checks if after a client writes we can see the state id in
-   * updated via the response.
-   */
-  @Test
-  public void testStateTransferOnWrite() throws Exception {
-    long preWriteState = cluster.getNamesystem().getLastWrittenTransactionId();
-    DFSTestUtil.writeFile(dfs, new Path("/testFile1"), "abc");
-    long clientState = dfs.dfs.getAlignmentContext().getLastSeenStateId();
-    long postWriteState = cluster.getNamesystem().getLastWrittenTransactionId();
-    // Write(s) should have increased state. Check for greater than.
-    assertThat(clientState > preWriteState, is(true));
-    // Client and server state should be equal.
-    assertThat(clientState, is(postWriteState));
-  }
-
-  /**
-   * This test checks if after a client reads we can see the state id in
-   * updated via the response.
-   */
-  @Test
-  public void testStateTransferOnRead() throws Exception {
-    DFSTestUtil.writeFile(dfs, new Path("/testFile2"), "123");
-    long lastWrittenId = cluster.getNamesystem().getLastWrittenTransactionId();
-    DFSTestUtil.readFile(dfs, new Path("/testFile2"));
-    // Read should catch client up to last written state.
-    long clientState = dfs.dfs.getAlignmentContext().getLastSeenStateId();
-    assertThat(clientState, is(lastWrittenId));
-  }
-
-  /**
-   * This test checks that a fresh client starts with no state and becomes
-   * updated of state from RPC call.
-   */
-  @Test
-  public void testStateTransferOnFreshClient() throws Exception {
-    DFSTestUtil.writeFile(dfs, new Path("/testFile3"), "ezpz");
-    long lastWrittenId = cluster.getNamesystem().getLastWrittenTransactionId();
-    try (DistributedFileSystem clearDfs =
-             (DistributedFileSystem) FileSystem.get(CONF)) {
-      ClientGCIContext clientState = clearDfs.dfs.getAlignmentContext();
-      assertThat(clientState.getLastSeenStateId(), is(Long.MIN_VALUE));
-      DFSTestUtil.readFile(clearDfs, new Path("/testFile3"));
-      assertThat(clientState.getLastSeenStateId(), is(lastWrittenId));
-    }
-  }
-
-  /**
-   * This test mocks an AlignmentContext and ensures that DFSClient
-   * writes its lastSeenStateId into RPC requests.
-   */
-  @Test
-  public void testClientSendsState() throws Exception {
-    AlignmentContext alignmentContext = dfs.dfs.getAlignmentContext();
-    AlignmentContext spiedAlignContext = Mockito.spy(alignmentContext);
-    Client.setAlignmentContext(spiedAlignContext);
-
-    // Collect RpcRequestHeaders for verification later.
-    final List<RpcHeaderProtos.RpcRequestHeaderProto.Builder> collectedHeaders =
-        new ArrayList<>();
-    Mockito.doAnswer(a -> {
-      Object[] arguments = a.getArguments();
-      RpcHeaderProtos.RpcRequestHeaderProto.Builder header =
-          (RpcHeaderProtos.RpcRequestHeaderProto.Builder) arguments[0];
-      collectedHeaders.add(header);
-      return a.callRealMethod();
-    }).when(spiedAlignContext).updateRequestState(Mockito.any());
-
-    DFSTestUtil.writeFile(dfs, new Path("/testFile4"), "shv");
-
-    // Ensure first header and last header have different state.
-    assertThat(collectedHeaders.size() > 1, is(true));
-    assertThat(collectedHeaders.get(0).getStateId(),
-        is(not(collectedHeaders.get(collectedHeaders.size() - 1))));
-
-    // Ensure collected RpcRequestHeaders are in increasing order.
-    long lastHeader = collectedHeaders.get(0).getStateId();
-    for(RpcHeaderProtos.RpcRequestHeaderProto.Builder header :
-        collectedHeaders.subList(1, collectedHeaders.size())) {
-      long currentHeader = header.getStateId();
-      assertThat(currentHeader >= lastHeader, is(true));
-      lastHeader = header.getStateId();
-    }
-  }
-
-  /**
-   * This test mocks an AlignmentContext to send stateIds greater than
-   * server's stateId in RPC requests.
-   */
-  @Test
-  public void testClientSendsGreaterState() throws Exception {
-    AlignmentContext alignmentContext = dfs.dfs.getAlignmentContext();
-    AlignmentContext spiedAlignContext = Mockito.spy(alignmentContext);
-    Client.setAlignmentContext(spiedAlignContext);
-
-    // Make every client call have a stateId > server's stateId.
-    Mockito.doAnswer(a -> {
-      Object[] arguments = a.getArguments();
-      RpcHeaderProtos.RpcRequestHeaderProto.Builder header =
-          (RpcHeaderProtos.RpcRequestHeaderProto.Builder) arguments[0];
-      try {
-        return a.callRealMethod();
-      } finally {
-        header.setStateId(Long.MAX_VALUE);
-      }
-    }).when(spiedAlignContext).updateRequestState(Mockito.any());
-
-    GenericTestUtils.LogCapturer logCapturer =
-        GenericTestUtils.LogCapturer.captureLogs(FSNamesystem.LOG);
-    DFSTestUtil.writeFile(dfs, new Path("/testFile4"), "shv");
-    logCapturer.stopCapturing();
-
-    String output = logCapturer.getOutput();
-    assertThat(output, containsString("A client sent stateId: "));
-  }
-
-}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestStateAlignmentContextWithHA.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestStateAlignmentContextWithHA.java
new file mode 100644
index 0000000..3437bb0
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestStateAlignmentContextWithHA.java
@@ -0,0 +1,467 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs;
+
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.not;
+import static org.junit.Assert.assertThat;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.ha.ClientHAProxyFactory;
+import org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider;
+import org.apache.hadoop.hdfs.server.namenode.ha.HAProxyFactory;
+import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
+import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Class is used to test server sending state alignment information to clients
+ * via RPC and likewise clients receiving and updating their last known
+ * state alignment info.
+ * These tests check that after a single RPC call a client will have caught up
+ * to the most recent alignment state of the server.
+ */
+public class TestStateAlignmentContextWithHA {
+
+  private static final int NUMDATANODES = 1;
+  private static final int NUMCLIENTS = 10;
+  private static final int NUMFILES = 300;
+  private static final Configuration CONF = new HdfsConfiguration();
+  private static final String NAMESERVICE = "nameservice";
+  private static final List<ClientGCIContext> AC_LIST = new ArrayList<>();
+
+  private static MiniDFSCluster cluster;
+  private static List<Worker> clients;
+  private static ClientGCIContext spy;
+
+  private DistributedFileSystem dfs;
+  private int active = 0;
+  private int standby = 1;
+
+  static class AlignmentContextProxyProvider<T>
+      extends ConfiguredFailoverProxyProvider<T> {
+
+    private ClientGCIContext alignmentContext;
+
+    public AlignmentContextProxyProvider(
+        Configuration conf, URI uri, Class<T> xface,
+        HAProxyFactory<T> factory) throws IOException {
+      super(conf, uri, xface, factory);
+
+      // Create and set AlignmentContext in HAProxyFactory.
+      // All proxies by factory will now have AlignmentContext assigned.
+      this.alignmentContext = (spy != null ? spy : new ClientGCIContext());
+      ((ClientHAProxyFactory) factory).setAlignmentContext(alignmentContext);
+
+      AC_LIST.add(alignmentContext);
+    }
+
+    @Override // AbstractNNFailoverProxyProvider
+    public synchronized ClientGCIContext getAlignmentContext() {
+      return this.alignmentContext;
+    }
+  }
+
+  static class SpyConfiguredContextProxyProvider<T>
+      extends ConfiguredFailoverProxyProvider<T> {
+
+    private ClientGCIContext alignmentContext;
+
+    public SpyConfiguredContextProxyProvider(
+        Configuration conf, URI uri, Class<T> xface,
+        HAProxyFactory<T> factory) throws IOException {
+      super(conf, uri, xface, factory);
+
+      // Create but DON'T set in HAProxyFactory.
+      this.alignmentContext = (spy != null ? spy : new ClientGCIContext());
+
+      AC_LIST.add(alignmentContext);
+    }
+  }
+
+  @BeforeClass
+  public static void startUpCluster() throws IOException {
+    // disable block scanner
+    CONF.setInt(DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, -1);
+    // Set short retry timeouts so this test runs faster
+    CONF.setInt(HdfsClientConfigKeys.Retry.WINDOW_BASE_KEY, 10);
+    CONF.setBoolean("fs.hdfs.impl.disable.cache", true);
+
+    MiniDFSNNTopology.NSConf nsConf = new MiniDFSNNTopology.NSConf(NAMESERVICE);
+    nsConf.addNN(new MiniDFSNNTopology.NNConf("nn1"));
+    nsConf.addNN(new MiniDFSNNTopology.NNConf("nn2"));
+
+    cluster = new MiniDFSCluster.Builder(CONF).numDataNodes(NUMDATANODES)
+        .nnTopology(MiniDFSNNTopology.simpleHATopology().addNameservice(nsConf))
+        .build();
+    cluster.waitActive();
+    cluster.transitionToActive(0);
+  }
+
+  @Before
+  public void before() throws IOException, URISyntaxException {
+    killWorkers();
+    HATestUtil.setFailoverConfigurations(cluster, CONF, NAMESERVICE, 0);
+    CONF.set(HdfsClientConfigKeys.Failover.PROXY_PROVIDER_KEY_PREFIX +
+        "." + NAMESERVICE, AlignmentContextProxyProvider.class.getName());
+    dfs = (DistributedFileSystem) FileSystem.get(CONF);
+  }
+
+  @AfterClass
+  public static void shutDownCluster() throws IOException {
+    if (cluster != null) {
+      cluster.shutdown();
+      cluster = null;
+    }
+  }
+
+  @After
+  public void after() throws IOException {
+    cluster.transitionToStandby(1);
+    cluster.transitionToActive(0);
+    active = 0;
+    standby = 1;
+    if (dfs != null) {
+      dfs.close();
+      dfs = null;
+    }
+    AC_LIST.clear();
+    spy = null;
+  }
+
+  /**
+   * This test checks if after a client writes we can see the state id in
+   * updated via the response.
+   */
+  @Test
+  public void testNoStateOnConfiguredProxyProvider() throws Exception {
+    Configuration confCopy = new Configuration(CONF);
+    confCopy.set(HdfsClientConfigKeys.Failover.PROXY_PROVIDER_KEY_PREFIX +
+        "." + NAMESERVICE, SpyConfiguredContextProxyProvider.class.getName());
+
+    try (DistributedFileSystem clearDfs =
+             (DistributedFileSystem) FileSystem.get(confCopy)) {
+      ClientGCIContext clientState = getContext(1);
+      assertThat(clientState.getLastSeenStateId(), is(Long.MIN_VALUE));
+      DFSTestUtil.writeFile(clearDfs, new Path("/testFileNoState"), "no_state");
+      assertThat(clientState.getLastSeenStateId(), is(Long.MIN_VALUE));
+    }
+  }
+
+  /**
+   * This test checks if after a client writes we can see the state id in
+   * updated via the response.
+   */
+  @Test
+  public void testStateTransferOnWrite() throws Exception {
+    long preWriteState =
+        cluster.getNamesystem(active).getLastWrittenTransactionId();
+    DFSTestUtil.writeFile(dfs, new Path("/testFile1"), "abc");
+    long clientState = getContext(0).getLastSeenStateId();
+    long postWriteState =
+        cluster.getNamesystem(active).getLastWrittenTransactionId();
+    // Write(s) should have increased state. Check for greater than.
+    assertThat(clientState > preWriteState, is(true));
+    // Client and server state should be equal.
+    assertThat(clientState, is(postWriteState));
+  }
+
+  /**
+   * This test checks if after a client reads we can see the state id in
+   * updated via the response.
+   */
+  @Test
+  public void testStateTransferOnRead() throws Exception {
+    DFSTestUtil.writeFile(dfs, new Path("/testFile2"), "123");
+    long lastWrittenId =
+        cluster.getNamesystem(active).getLastWrittenTransactionId();
+    DFSTestUtil.readFile(dfs, new Path("/testFile2"));
+    // Read should catch client up to last written state.
+    long clientState = getContext(0).getLastSeenStateId();
+    assertThat(clientState, is(lastWrittenId));
+  }
+
+  /**
+   * This test checks that a fresh client starts with no state and becomes
+   * updated of state from RPC call.
+   */
+  @Test
+  public void testStateTransferOnFreshClient() throws Exception {
+    DFSTestUtil.writeFile(dfs, new Path("/testFile3"), "ezpz");
+    long lastWrittenId =
+        cluster.getNamesystem(active).getLastWrittenTransactionId();
+    try (DistributedFileSystem clearDfs =
+             (DistributedFileSystem) FileSystem.get(CONF)) {
+      ClientGCIContext clientState = getContext(1);
+      assertThat(clientState.getLastSeenStateId(), is(Long.MIN_VALUE));
+      DFSTestUtil.readFile(clearDfs, new Path("/testFile3"));
+      assertThat(clientState.getLastSeenStateId(), is(lastWrittenId));
+    }
+  }
+
+  /**
+   * This test mocks an AlignmentContext and ensures that DFSClient
+   * writes its lastSeenStateId into RPC requests.
+   */
+  @Test
+  public void testClientSendsState() throws Exception {
+    ClientGCIContext alignmentContext = new ClientGCIContext();
+    ClientGCIContext spiedAlignContext = Mockito.spy(alignmentContext);
+    spy = spiedAlignContext;
+
+    try (DistributedFileSystem clearDfs =
+             (DistributedFileSystem) FileSystem.get(CONF)) {
+
+      // Collect RpcRequestHeaders for verification later.
+      final List<RpcHeaderProtos.RpcRequestHeaderProto.Builder> headers =
+          new ArrayList<>();
+      Mockito.doAnswer(a -> {
+        Object[] arguments = a.getArguments();
+        RpcHeaderProtos.RpcRequestHeaderProto.Builder header =
+            (RpcHeaderProtos.RpcRequestHeaderProto.Builder) arguments[0];
+        headers.add(header);
+        return a.callRealMethod();
+      }).when(spiedAlignContext).updateRequestState(Mockito.any());
+
+      DFSTestUtil.writeFile(clearDfs, new Path("/testFile4"), "shv");
+
+      // Ensure first header and last header have different state.
+      assertThat(headers.size() > 1, is(true));
+      assertThat(headers.get(0).getStateId(),
+          is(not(headers.get(headers.size() - 1))));
+
+      // Ensure collected RpcRequestHeaders are in increasing order.
+      long lastHeader = headers.get(0).getStateId();
+      for (RpcHeaderProtos.RpcRequestHeaderProto.Builder header :
+          headers.subList(1, headers.size())) {
+        long currentHeader = header.getStateId();
+        assertThat(currentHeader >= lastHeader, is(true));
+        lastHeader = header.getStateId();
+      }
+    }
+  }
+
+  /**
+   * This test mocks an AlignmentContext to send stateIds greater than
+   * server's stateId in RPC requests.
+   */
+  @Test
+  public void testClientSendsGreaterState() throws Exception {
+    ClientGCIContext alignmentContext = new ClientGCIContext();
+    ClientGCIContext spiedAlignContext = Mockito.spy(alignmentContext);
+    spy = spiedAlignContext;
+
+    try (DistributedFileSystem clearDfs =
+             (DistributedFileSystem) FileSystem.get(CONF)) {
+
+      // Make every client call have a stateId > server's stateId.
+      Mockito.doAnswer(a -> {
+        Object[] arguments = a.getArguments();
+        RpcHeaderProtos.RpcRequestHeaderProto.Builder header =
+            (RpcHeaderProtos.RpcRequestHeaderProto.Builder) arguments[0];
+        try {
+          return a.callRealMethod();
+        } finally {
+          header.setStateId(Long.MAX_VALUE);
+        }
+      }).when(spiedAlignContext).updateRequestState(Mockito.any());
+
+      GenericTestUtils.LogCapturer logCapturer =
+          GenericTestUtils.LogCapturer.captureLogs(FSNamesystem.LOG);
+
+      DFSTestUtil.writeFile(clearDfs, new Path("/testFile4"), "shv");
+      logCapturer.stopCapturing();
+
+      String output = logCapturer.getOutput();
+      assertThat(output, containsString("A client sent stateId: "));
+    }
+  }
+
+  /**
+   * This test checks if after a client writes we can see the state id in
+   * updated via the response.
+   */
+  @Test
+  public void testStateTransferOnWriteWithFailover() throws Exception {
+    long preWriteState =
+        cluster.getNamesystem(active).getLastWrittenTransactionId();
+    // Write using HA client.
+    DFSTestUtil.writeFile(dfs, new Path("/testFile1FO"), "123");
+    long clientState = getContext(0).getLastSeenStateId();
+    long postWriteState =
+        cluster.getNamesystem(active).getLastWrittenTransactionId();
+    // Write(s) should have increased state. Check for greater than.
+    assertThat(clientState > preWriteState, is(true));
+    // Client and server state should be equal.
+    assertThat(clientState, is(postWriteState));
+
+    // Failover NameNode.
+    failOver();
+
+    // Write using HA client.
+    DFSTestUtil.writeFile(dfs, new Path("/testFile2FO"), "456");
+    long clientStateFO = getContext(0).getLastSeenStateId();
+    long writeStateFO =
+        cluster.getNamesystem(active).getLastWrittenTransactionId();
+
+    // Write(s) should have increased state. Check for greater than.
+    assertThat(clientStateFO > postWriteState, is(true));
+    // Client and server state should be equal.
+    assertThat(clientStateFO, is(writeStateFO));
+  }
+
+  @Test(timeout=300000)
+  public void testMultiClientStatesWithRandomFailovers() throws Exception {
+    // We want threads to run during failovers; assuming at minimum 4 cores,
+    // would like to see 2 clients competing against 2 NameNodes.
+    ExecutorService execService = Executors.newFixedThreadPool(2);
+    clients = new ArrayList<>(NUMCLIENTS);
+    for (int i = 1; i <= NUMCLIENTS; i++) {
+      DistributedFileSystem haClient =
+          (DistributedFileSystem) FileSystem.get(CONF);
+      clients.add(new Worker(haClient, NUMFILES, "/testFile3FO_", i));
+    }
+
+    // Execute workers in threadpool with random failovers.
+    List<Future<STATE>> futures = submitAll(execService, clients);
+    execService.shutdown();
+
+    boolean finished = false;
+    while (!finished) {
+      failOver();
+      finished = execService.awaitTermination(1L, TimeUnit.SECONDS);
+    }
+
+    // Validation.
+    for (Future<STATE> future : futures) {
+      assertThat(future.get(), is(STATE.SUCCESS));
+    }
+  }
+
+  private ClientGCIContext getContext(int clientCreationIndex) {
+    return AC_LIST.get(clientCreationIndex);
+  }
+
+  private void failOver() throws IOException {
+    cluster.transitionToStandby(active);
+    cluster.transitionToActive(standby);
+    int tempActive = active;
+    active = standby;
+    standby = tempActive;
+  }
+
+  /* Executor.invokeAll() is blocking so utilizing submit instead. */
+  private static List<Future<STATE>> submitAll(ExecutorService executor,
+                                              Collection<Worker> calls) {
+    List<Future<STATE>> futures = new ArrayList<>(calls.size());
+    for (Worker call : calls) {
+      Future<STATE> future = executor.submit(call);
+      futures.add(future);
+    }
+    return futures;
+  }
+
+  private void killWorkers() throws IOException {
+    if (clients != null) {
+      for(Worker worker : clients) {
+        worker.kill();
+      }
+      clients = null;
+    }
+  }
+
+  private enum STATE { SUCCESS, FAIL, ERROR }
+
+  private class Worker implements Callable<STATE> {
+    private final DistributedFileSystem client;
+    private final int filesToMake;
+    private String filePath;
+    private final int nonce;
+
+    Worker(DistributedFileSystem client,
+           int filesToMake,
+           String filePath,
+           int nonce) {
+      this.client = client;
+      this.filesToMake = filesToMake;
+      this.filePath = filePath;
+      this.nonce = nonce;
+    }
+
+    @Override
+    public STATE call() {
+      try {
+        for (int i = 0; i < filesToMake; i++) {
+          long preClientStateFO =
+              getContext(nonce).getLastSeenStateId();
+
+          // Write using HA client.
+          Path path = new Path(filePath + nonce + i);
+          DFSTestUtil.writeFile(client, path, "erk");
+
+          long postClientStateFO =
+              getContext(nonce).getLastSeenStateId();
+
+          // Write(s) should have increased state. Check for greater than.
+          if (postClientStateFO <= preClientStateFO) {
+            System.out.println("FAIL: Worker started with: " +
+                preClientStateFO + ", but finished with: " + postClientStateFO);
+            return STATE.FAIL;
+          }
+        }
+        client.close();
+        return STATE.SUCCESS;
+      } catch (IOException e) {
+        System.out.println("ERROR: Worker failed with: " + e);
+        return STATE.ERROR;
+      }
+    }
+
+    public void kill() throws IOException {
+      client.dfs.closeAllFilesBeingWritten(true);
+      client.dfs.closeOutputStreams(true);
+      client.dfs.closeConnectionToNamenode();
+      client.dfs.close();
+      client.close();
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org