You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cl...@apache.org on 2019/07/25 17:17:39 UTC

[hadoop] 30/50: HDFS-14035. NN status discovery does not leverage delegation token. Contributed by Chen Liang.

This is an automated email from the ASF dual-hosted git repository.

cliang pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit 683daedc1f980cf2cec93a3be4c8ab50a76126fa
Author: Chen Liang <cl...@apache.org>
AuthorDate: Wed Nov 14 13:32:13 2018 -0800

    HDFS-14035. NN status discovery does not leverage delegation token. Contributed by Chen Liang.
---
 .../java/org/apache/hadoop/hdfs/DFSClient.java     | 13 +++++
 .../apache/hadoop/hdfs/NameNodeProxiesClient.java  | 31 ------------
 .../hadoop/hdfs/protocol/ClientProtocol.java       | 13 ++++-
 .../ClientNamenodeProtocolTranslatorPB.java        | 27 ++++++++++
 .../ha/AbstractNNFailoverProxyProvider.java        | 33 ++-----------
 .../namenode/ha/IPFailoverProxyProvider.java       |  2 +-
 .../namenode/ha/ObserverReadProxyProvider.java     |  9 +++-
 .../src/main/proto/ClientNamenodeProtocol.proto    | 10 ++++
 .../apache/hadoop/hdfs/protocol/TestReadOnly.java  |  3 +-
 .../server/federation/router/RouterRpcServer.java  |  9 +++-
 ...ientNamenodeProtocolServerSideTranslatorPB.java | 36 ++++++++++++++
 .../hdfs/server/namenode/NameNodeRpcServer.java    |  6 +++
 .../namenode/ha/TestConsistentReadsObserver.java   |  3 ++
 .../namenode/ha/TestObserverReadProxyProvider.java | 57 +++++++++-------------
 14 files changed, 152 insertions(+), 100 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
index 4f708a5..d9d6f42 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
@@ -91,6 +91,7 @@ import org.apache.hadoop.fs.permission.AclStatus;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsCreateModes;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.ha.HAServiceProtocol;
 import org.apache.hadoop.hdfs.NameNodeProxiesClient.ProxyAndInfo;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
 import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
@@ -3056,4 +3057,16 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
   public void msync() throws IOException {
     namenode.msync();
   }
+
+  /**
+   * An unblocking call to get the HA service state of NameNode.
+   *
+   * @return HA state of NameNode
+   * @throws IOException
+   */
+  @VisibleForTesting
+  public HAServiceProtocol.HAServiceState getHAServiceState()
+      throws IOException {
+    return namenode.getHAServiceState();
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/NameNodeProxiesClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/NameNodeProxiesClient.java
index b71e84d..93227bd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/NameNodeProxiesClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/NameNodeProxiesClient.java
@@ -25,16 +25,13 @@ import java.net.InetSocketAddress;
 import java.net.URI;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.ha.HAServiceProtocol;
-import org.apache.hadoop.ha.protocolPB.HAServiceProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdfs.server.namenode.ha.ClientHAProxyFactory;
 import org.apache.hadoop.hdfs.server.namenode.ha.HAProxyFactory;
 import org.apache.hadoop.ipc.AlignmentContext;
-import org.apache.hadoop.ipc.Client;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -353,34 +350,6 @@ public class NameNodeProxiesClient {
         fallbackToSimpleAuth, null);
   }
 
-  /**
-   * Creates a non-HA proxy object with {@link HAServiceProtocol} to the
-   * given NameNode address, using the provided configuration. The proxy will
-   * use the RPC timeout configuration specified via {@link
-   * org.apache.hadoop.fs.CommonConfigurationKeys#IPC_CLIENT_RPC_TIMEOUT_KEY}.
-   * Upon failures, this will retry up to certain times with {@link RetryProxy}.
-   *
-   * @param address the NameNode address
-   * @param conf the configuration to be used
-   * @return a non-HA proxy with {@link HAServiceProtocol}.
-   */
-  public static HAServiceProtocol createNonHAProxyWithHAServiceProtocol(
-      InetSocketAddress address, Configuration conf) throws IOException {
-    RetryPolicy timeoutPolicy = RetryPolicies.exponentialBackoffRetry(
-        MAX_RETRIES, DELAY_MILLISECONDS, TimeUnit.MILLISECONDS);
-
-    HAServiceProtocol proxy =
-        new HAServiceProtocolClientSideTranslatorPB(
-            address, conf, NetUtils.getDefaultSocketFactory(conf),
-            Client.getRpcTimeout(conf));
-    return (HAServiceProtocol) RetryProxy.create(
-        HAServiceProtocol.class,
-        new DefaultFailoverProxyProvider<>(HAServiceProtocol.class, proxy),
-        new HashMap<>(),
-        timeoutPolicy
-    );
-  }
-
   public static ClientProtocol createProxyWithAlignmentContext(
       InetSocketAddress address, Configuration conf, UserGroupInformation ugi,
       boolean withRetries, AtomicBoolean fallbackToSimpleAuth,
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
index bb7092c..9c4f39e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.crypto.CryptoProtocolVersion;
 import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedEntries;
 import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException;
+import org.apache.hadoop.ha.HAServiceProtocol;
 import org.apache.hadoop.hdfs.AddBlockFlag;
 import org.apache.hadoop.fs.CacheFlag;
 import org.apache.hadoop.fs.ContentSummary;
@@ -1581,7 +1582,7 @@ public interface ClientProtocol {
    * @throws IOException see specific implementation
    */
   @Idempotent
-  @ReadOnly // TODO : after HDFS-13749 is done, change to coordinated call
+  @ReadOnly(isCoordinated = true)
   void checkAccess(String path, FsAction mode) throws IOException;
 
   /**
@@ -1736,6 +1737,16 @@ public interface ClientProtocol {
       EnumSet<OpenFilesType> openFilesTypes, String path) throws IOException;
 
   /**
+   * Get HA service state of the server.
+   *
+   * @return server HA state
+   * @throws IOException
+   */
+  @Idempotent
+  @ReadOnly
+  HAServiceProtocol.HAServiceState getHAServiceState() throws IOException;
+
+  /**
    * Called by client to wait until the server has reached the state id of the
    * client. The client and server state id are given by client side and server
    * side alignment context respectively. This can be a blocking call.
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
index 36e8149..441e039 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
@@ -48,6 +48,8 @@ import org.apache.hadoop.fs.permission.AclEntry;
 import org.apache.hadoop.fs.permission.AclStatus;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.ha.HAServiceProtocol;
+import org.apache.hadoop.ha.proto.HAServiceProtocolProtos.HAServiceStateProto;
 import org.apache.hadoop.hdfs.AddBlockFlag;
 import org.apache.hadoop.hdfs.inotify.EventBatchList;
 import org.apache.hadoop.hdfs.protocol.AddErasureCodingPolicyResponse;
@@ -140,6 +142,7 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSna
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetStoragePoliciesRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetStoragePoliciesResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetStoragePolicyRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.HAServiceStateRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.IsFileClosedRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCacheDirectivesRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCacheDirectivesResponseProto;
@@ -1890,4 +1893,28 @@ public class ClientNamenodeProtocolTranslatorPB implements
       throw ProtobufHelper.getRemoteException(e);
     }
   }
+
+  @Override
+  public HAServiceProtocol.HAServiceState getHAServiceState()
+      throws IOException {
+    HAServiceStateRequestProto req =
+        HAServiceStateRequestProto.newBuilder().build();
+    try {
+      HAServiceStateProto res =
+          rpcProxy.getHAServiceState(null, req).getState();
+      switch(res) {
+      case ACTIVE:
+        return HAServiceProtocol.HAServiceState.ACTIVE;
+      case STANDBY:
+        return HAServiceProtocol.HAServiceState.STANDBY;
+      case OBSERVER:
+        return HAServiceProtocol.HAServiceState.OBSERVER;
+      case INITIALIZING:
+      default:
+        return HAServiceProtocol.HAServiceState.INITIALIZING;
+      }
+    } catch (ServiceException e) {
+      throw ProtobufHelper.getRemoteException(e);
+    }
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/AbstractNNFailoverProxyProvider.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/AbstractNNFailoverProxyProvider.java
index 1b5ad16..572cb1c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/AbstractNNFailoverProxyProvider.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/AbstractNNFailoverProxyProvider.java
@@ -28,14 +28,11 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
-import org.apache.hadoop.ha.HAServiceProtocol;
 import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
 import org.apache.hadoop.hdfs.DFSUtilClient;
 import org.apache.hadoop.hdfs.HAUtilClient;
-import org.apache.hadoop.hdfs.NameNodeProxiesClient;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
 import org.apache.hadoop.io.retry.FailoverProxyProvider;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -122,44 +119,22 @@ public abstract class AbstractNNFailoverProxyProvider<T> implements
      */
     private HAServiceState cachedState;
 
-    /** Proxy for getting HA service status from the given NameNode. */
-    private HAServiceProtocol serviceProxy;
-
-    public NNProxyInfo(InetSocketAddress address, Configuration conf) {
+    public NNProxyInfo(InetSocketAddress address) {
       super(null, address.toString());
       this.address = address;
-      try {
-        serviceProxy = NameNodeProxiesClient
-            .createNonHAProxyWithHAServiceProtocol(address, conf);
-      } catch (IOException ioe) {
-        LOG.error("Failed to create HAServiceProtocol proxy to NameNode" +
-            " at {}", address, ioe);
-        throw new RuntimeException(ioe);
-      }
     }
 
     public InetSocketAddress getAddress() {
       return address;
     }
 
-    public void refreshCachedState() {
-      try {
-        cachedState = serviceProxy.getServiceStatus().getState();
-      } catch (IOException e) {
-        LOG.warn("Failed to connect to {}. Setting cached state to Standby",
-            address, e);
-        cachedState = HAServiceState.STANDBY;
-      }
+    public void setCachedState(HAServiceState state) {
+      cachedState = state;
     }
 
     public HAServiceState getCachedState() {
       return cachedState;
     }
-
-    @VisibleForTesting
-    public void setServiceProxyForTesting(HAServiceProtocol proxy) {
-      this.serviceProxy = proxy;
-    }
   }
 
   @Override
@@ -202,7 +177,7 @@ public abstract class AbstractNNFailoverProxyProvider<T> implements
 
     Collection<InetSocketAddress> addressesOfNns = addressesInNN.values();
     for (InetSocketAddress address : addressesOfNns) {
-      proxies.add(new NNProxyInfo<T>(address, conf));
+      proxies.add(new NNProxyInfo<T>(address));
     }
     // Randomize the list to prevent all clients pointing to the same one
     boolean randomized = getRandomOrder(conf, uri);
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/IPFailoverProxyProvider.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/IPFailoverProxyProvider.java
index 8062e79..3eb181d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/IPFailoverProxyProvider.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/IPFailoverProxyProvider.java
@@ -48,7 +48,7 @@ public class IPFailoverProxyProvider<T> extends
   public IPFailoverProxyProvider(Configuration conf, URI uri,
       Class<T> xface, HAProxyFactory<T> factory) {
     super(conf, uri, xface, factory);
-    this.nnProxyInfo = new NNProxyInfo<>(DFSUtilClient.getNNAddress(uri), conf);
+    this.nnProxyInfo = new NNProxyInfo<>(DFSUtilClient.getNNAddress(uri));
   }
 
   @Override
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java
index 17bad65..1e85a8e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java
@@ -211,7 +211,14 @@ public class ObserverReadProxyProvider<T extends ClientProtocol>
     currentProxy = null;
     currentIndex = (currentIndex + 1) % nameNodeProxies.size();
     currentProxy = createProxyIfNeeded(nameNodeProxies.get(currentIndex));
-    currentProxy.refreshCachedState();
+    try {
+      HAServiceState state = currentProxy.proxy.getHAServiceState();
+      currentProxy.setCachedState(state);
+    } catch (IOException e) {
+      LOG.info("Failed to connect to {}. Setting cached state to Standby",
+          currentProxy.getAddress(), e);
+      currentProxy.setCachedState(HAServiceState.STANDBY);
+    }
     LOG.debug("Changed current proxy from {} to {}",
         initial == null ? "none" : initial.proxyInfo,
         currentProxy.proxyInfo);
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto
index a5aa082..2bbca61 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientNamenodeProtocol.proto
@@ -35,6 +35,7 @@ import "xattr.proto";
 import "encryption.proto";
 import "inotify.proto";
 import "erasurecoding.proto";
+import "HAServiceProtocol.proto";
 
 /**
  * The ClientNamenodeProtocol Service defines the interface between a client 
@@ -808,6 +809,13 @@ message MsyncRequestProto {
 message MsyncResponseProto {
 }
 
+message HAServiceStateRequestProto {
+}
+
+message HAServiceStateResponseProto {
+  required hadoop.common.HAServiceStateProto state = 1;
+}
+
 service ClientNamenodeProtocol {
   rpc getBlockLocations(GetBlockLocationsRequestProto)
       returns(GetBlockLocationsResponseProto);
@@ -990,4 +998,6 @@ service ClientNamenodeProtocol {
       returns(ListOpenFilesResponseProto);
   rpc msync(MsyncRequestProto)
       returns(MsyncResponseProto);
+  rpc getHAServiceState(HAServiceStateRequestProto)
+      returns(HAServiceStateResponseProto);
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/protocol/TestReadOnly.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/protocol/TestReadOnly.java
index 57db8ac..e0432f5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/protocol/TestReadOnly.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/protocol/TestReadOnly.java
@@ -72,7 +72,8 @@ public class TestReadOnly {
           "getCurrentEditLogTxid",
           "getEditsFromTxid",
           "getQuotaUsage",
-          "msync"
+          "msync",
+          "getHAServiceState"
       )
   );
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
index 47dfc46..06edcb4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
@@ -63,6 +63,7 @@ import org.apache.hadoop.fs.permission.AclEntry;
 import org.apache.hadoop.fs.permission.AclStatus;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.ha.HAServiceProtocol;
 import org.apache.hadoop.hdfs.AddBlockFlag;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
@@ -2064,7 +2065,13 @@ public class RouterRpcServer extends AbstractService
         OpenFilesIterator.FILTER_PATH_DEFAULT);
   }
 
-  @Override
+  @Override // ClientProtocol
+  public HAServiceProtocol.HAServiceState getHAServiceState()
+      throws IOException {
+    return null;
+  }
+
+  @Override // ClientProtocol
   public BatchedEntries<OpenFileEntry> listOpenFiles(long prevId,
       EnumSet<OpenFilesType> openFilesTypes, String path) throws IOException {
     checkOperation(OperationCategory.READ, false);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
index d5f7b9b..3584587 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java
@@ -30,6 +30,9 @@ import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedEntries;
 import org.apache.hadoop.fs.permission.FsCreateModes;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.ha.HAServiceProtocol;
+import org.apache.hadoop.ha.proto.HAServiceProtocolProtos;
+import org.apache.hadoop.ha.proto.HAServiceProtocolProtos.HAServiceStateProto;
 import org.apache.hadoop.hdfs.AddBlockFlag;
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.CreateFlag;
@@ -150,6 +153,8 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetSto
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetStoragePoliciesResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetStoragePolicyRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetStoragePolicyResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.HAServiceStateRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.HAServiceStateResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.IsFileClosedRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.IsFileClosedResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ListCacheDirectivesRequestProto;
@@ -1838,4 +1843,35 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
       throw new ServiceException(e);
     }
   }
+
+  @Override
+  public HAServiceStateResponseProto getHAServiceState(
+      RpcController controller,
+      HAServiceStateRequestProto request) throws ServiceException {
+    try {
+      HAServiceProtocol.HAServiceState state = server.getHAServiceState();
+      HAServiceStateProto retState;
+      switch (state) {
+      case ACTIVE:
+        retState = HAServiceProtocolProtos.HAServiceStateProto.ACTIVE;
+        break;
+      case STANDBY:
+        retState = HAServiceProtocolProtos.HAServiceStateProto.STANDBY;
+        break;
+      case OBSERVER:
+        retState = HAServiceProtocolProtos.HAServiceStateProto.OBSERVER;
+        break;
+      case INITIALIZING:
+      default:
+        retState = HAServiceProtocolProtos.HAServiceStateProto.INITIALIZING;
+        break;
+      }
+      HAServiceStateResponseProto.Builder builder =
+          HAServiceStateResponseProto.newBuilder();
+      builder.setState(retState);
+      return builder.build();
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
index 5d8e12b..644a480 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
@@ -1346,6 +1346,12 @@ public class NameNodeRpcServer implements NamenodeProtocols {
   }
 
   @Override // ClientProtocol
+  public HAServiceState getHAServiceState() throws IOException {
+    checkNNStartup();
+    return nn.getServiceStatus().getState();
+  }
+
+  @Override // ClientProtocol
   public CorruptFileBlocks listCorruptFileBlocks(String path, String cookie)
       throws IOException {
     checkNNStartup();
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestConsistentReadsObserver.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestConsistentReadsObserver.java
index 3048842..e1fadaf 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestConsistentReadsObserver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestConsistentReadsObserver.java
@@ -89,6 +89,9 @@ public class TestConsistentReadsObserver {
     // 0 == not completed, 1 == succeeded, -1 == failed
     AtomicInteger readStatus = new AtomicInteger(0);
 
+    // Making an uncoordinated call, which initialize the proxy
+    // to Observer node.
+    dfs.getClient().getHAServiceState();
     dfs.mkdir(testPath, FsPermission.getDefault());
     assertSentTo(0);
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverReadProxyProvider.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverReadProxyProvider.java
index dfd8488..caf7d00 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverReadProxyProvider.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverReadProxyProvider.java
@@ -27,8 +27,6 @@ import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.permission.FsAction;
-import org.apache.hadoop.ha.HAServiceProtocol;
-import org.apache.hadoop.ha.HAServiceStatus;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
@@ -47,7 +45,6 @@ import static org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
 
 /**
  * Tests for {@link ObserverReadProxyProvider} under various configurations of
@@ -79,8 +76,6 @@ public class TestObserverReadProxyProvider {
     namenodeAnswers = new NameNodeAnswer[namenodeCount];
     ClientProtocol[] proxies = new ClientProtocol[namenodeCount];
     Map<String, ClientProtocol> proxyMap = new HashMap<>();
-    HAServiceProtocol[] serviceProxies = new HAServiceProtocol[namenodeCount];
-    Map<String, HAServiceProtocol> serviceProxyMap = new HashMap<>();
     for (int i  = 0; i < namenodeCount; i++) {
       namenodeIDs[i] = "nn" + i;
       namenodeAddrs[i] = "namenode" + i + ".test:8020";
@@ -92,11 +87,9 @@ public class TestObserverReadProxyProvider {
           .when(proxies[i]));
       doRead(Mockito.doAnswer(namenodeAnswers[i].clientAnswer)
           .when(proxies[i]));
-      serviceProxies[i] = mock(HAServiceProtocol.class);
-      Mockito.doAnswer(namenodeAnswers[i].serviceAnswer)
-          .when(serviceProxies[i]).getServiceStatus();
+      Mockito.doAnswer(namenodeAnswers[i].clientAnswer)
+          .when(proxies[i]).getHAServiceState();
       proxyMap.put(namenodeAddrs[i], proxies[i]);
-      serviceProxyMap.put(namenodeAddrs[i], serviceProxies[i]);
     }
     conf.set(HdfsClientConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX + "." + ns,
         Joiner.on(",").join(namenodeIDs));
@@ -116,10 +109,6 @@ public class TestObserverReadProxyProvider {
           URI uri, String addressKey) {
         List<NNProxyInfo<ClientProtocol>> nnProxies =
             super.getProxyAddresses(uri, addressKey);
-        for (NNProxyInfo<ClientProtocol> nnProxy : nnProxies) {
-          String addressStr = nnProxy.getAddress().toString();
-          nnProxy.setServiceProxyForTesting(serviceProxyMap.get(addressStr));
-        }
         return nnProxies;
       }
     };
@@ -322,8 +311,8 @@ public class TestObserverReadProxyProvider {
   }
 
   /**
-   * An {@link Answer} used for mocking of {@link ClientProtocol} and
-   * {@link HAServiceProtocol}. Setting the state or unreachability of this
+   * An {@link Answer} used for mocking of {@link ClientProtocol}.
+   * Setting the state or unreachability of this
    * Answer will make the linked ClientProtocol respond as if it was
    * communicating with a NameNode of the corresponding state. It is in Standby
    * state by default.
@@ -338,31 +327,29 @@ public class TestObserverReadProxyProvider {
     private volatile boolean allowReads = false;
 
     private ClientProtocolAnswer clientAnswer = new ClientProtocolAnswer();
-    private HAServiceProtocolAnswer serviceAnswer =
-        new HAServiceProtocolAnswer();
-
-    private class HAServiceProtocolAnswer implements Answer<HAServiceStatus> {
-      @Override
-      public HAServiceStatus answer(InvocationOnMock invocation)
-          throws Throwable {
-        HAServiceStatus status = mock(HAServiceStatus.class);
-        if (allowReads && allowWrites) {
-          when(status.getState()).thenReturn(HAServiceState.ACTIVE);
-        } else if (allowReads) {
-          when(status.getState()).thenReturn(HAServiceState.OBSERVER);
-        } else {
-          when(status.getState()).thenReturn(HAServiceState.STANDBY);
-        }
-        return status;
-      }
-    }
 
-    private class ClientProtocolAnswer implements Answer<Void> {
+    private class ClientProtocolAnswer implements Answer<Object> {
       @Override
-      public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
+      public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
         if (unreachable) {
           throw new IOException("Unavailable");
         }
+        // retryActive should be checked before getHAServiceState.
+        // Check getHAServiceState first here only because in test,
+        // it relies read call, which relies on getHAServiceState
+        // to have passed already. May revisit future.
+        if (invocationOnMock.getMethod()
+            .getName().equals("getHAServiceState")) {
+          HAServiceState status;
+          if (allowReads && allowWrites) {
+            status = HAServiceState.ACTIVE;
+          } else if (allowReads) {
+            status = HAServiceState.OBSERVER;
+          } else {
+            status = HAServiceState.STANDBY;
+          }
+          return status;
+        }
         if (retryActive) {
           throw new RemoteException(
               ObserverRetryOnActiveException.class.getCanonicalName(),


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org