You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by xk...@apache.org on 2018/09/20 20:30:58 UTC

[50/50] [abbrv] hadoop git commit: HDFS-13749. [SBN read] Use getServiceStatus to discover observer namenodes. Contributed by Chao Sun.

HDFS-13749. [SBN read] Use getServiceStatus to discover observer namenodes. Contributed by Chao Sun.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/77e106f7
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/77e106f7
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/77e106f7

Branch: refs/heads/HDFS-12943
Commit: 77e106f74715a7bb79effe38c245ec7c1566be72
Parents: a3810f7
Author: Erik Krogen <xk...@apache.org>
Authored: Thu Sep 20 13:27:58 2018 -0700
Committer: Erik Krogen <xk...@apache.org>
Committed: Thu Sep 20 13:27:58 2018 -0700

----------------------------------------------------------------------
 .../hadoop/hdfs/NameNodeProxiesClient.java      |  47 ++++++++-
 .../ha/AbstractNNFailoverProxyProvider.java     |  36 +++++--
 .../namenode/ha/IPFailoverProxyProvider.java    |   2 +-
 .../namenode/ha/ObserverReadProxyProvider.java  |  49 +--------
 .../ha/TestObserverReadProxyProvider.java       | 105 ++++++++++++++-----
 5 files changed, 151 insertions(+), 88 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/77e106f7/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/NameNodeProxiesClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/NameNodeProxiesClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/NameNodeProxiesClient.java
index 284e4ef..f90d671 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/NameNodeProxiesClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/NameNodeProxiesClient.java
@@ -25,12 +25,16 @@ import java.net.InetSocketAddress;
 import java.net.URI;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.ha.HAServiceProtocol;
+import org.apache.hadoop.ha.protocolPB.HAServiceProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdfs.server.namenode.ha.ClientHAProxyFactory;
 import org.apache.hadoop.hdfs.server.namenode.ha.HAProxyFactory;
 import org.apache.hadoop.ipc.AlignmentContext;
+import org.apache.hadoop.ipc.Client;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -62,13 +66,14 @@ import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 
 /**
- * Create proxy objects with {@link ClientProtocol} to communicate with a remote
- * NN. Generally use {@link NameNodeProxiesClient#createProxyWithClientProtocol(
+ * Create proxy objects with {@link ClientProtocol} and
+ * {@link HAServiceProtocol} to communicate with a remote NN. For the former,
+ * generally use {@link NameNodeProxiesClient#createProxyWithClientProtocol(
  * Configuration, URI, AtomicBoolean)}, which will create either an HA- or
  * non-HA-enabled client proxy as appropriate.
  *
- * For creating proxy objects with other protocols, please see
- * {@link NameNodeProxies#createProxy(Configuration, URI, Class)}.
+ * For creating proxy objects with other protocols, please see the server-side
+ * counterpart {@code NameNodeProxies#createProxy}
  */
 @InterfaceAudience.Private
 public class NameNodeProxiesClient {
@@ -76,6 +81,11 @@ public class NameNodeProxiesClient {
   private static final Logger LOG = LoggerFactory.getLogger(
       NameNodeProxiesClient.class);
 
+  /** Maximum # of retries for HAProxy with HAServiceProtocol. */
+  private static final int MAX_RETRIES = 3;
+  /** Initial retry delay for HAProxy with HAServiceProtocol. */
+  private static final int DELAY_MILLISECONDS = 200;
+
   /**
    * Wrapper for a client proxy as well as its associated service ID.
    * This is simply used as a tuple-like return type for created NN proxy.
@@ -119,7 +129,6 @@ public class NameNodeProxiesClient {
    * @return an object containing both the proxy and the associated
    *         delegation token service it corresponds to
    * @throws IOException if there is an error creating the proxy
-   * @see {@link NameNodeProxies#createProxy(Configuration, URI, Class)}.
    */
   public static ProxyAndInfo<ClientProtocol> createProxyWithClientProtocol(
       Configuration conf, URI nameNodeUri, AtomicBoolean fallbackToSimpleAuth)
@@ -343,6 +352,34 @@ public class NameNodeProxiesClient {
         fallbackToSimpleAuth, null);
   }
 
+  /**
+   * Creates a non-HA proxy object with {@link HAServiceProtocol} to the
+   * given NameNode address, using the provided configuration. The proxy will
+   * use the RPC timeout configuration specified via {@link
+   * org.apache.hadoop.fs.CommonConfigurationKeys#IPC_CLIENT_RPC_TIMEOUT_KEY}.
+   * Upon failures, this will retry up to certain times with {@link RetryProxy}.
+   *
+   * @param address the NameNode address
+   * @param conf the configuration to be used
+   * @return a non-HA proxy with {@link HAServiceProtocol}.
+   */
+  public static HAServiceProtocol createNonHAProxyWithHAServiceProtocol(
+      InetSocketAddress address, Configuration conf) throws IOException {
+    RetryPolicy timeoutPolicy = RetryPolicies.exponentialBackoffRetry(
+        MAX_RETRIES, DELAY_MILLISECONDS, TimeUnit.MILLISECONDS);
+
+    HAServiceProtocol proxy =
+        new HAServiceProtocolClientSideTranslatorPB(
+            address, conf, NetUtils.getDefaultSocketFactory(conf),
+            Client.getRpcTimeout(conf));
+    return (HAServiceProtocol) RetryProxy.create(
+        HAServiceProtocol.class,
+        new DefaultFailoverProxyProvider<>(HAServiceProtocol.class, proxy),
+        new HashMap<>(),
+        timeoutPolicy
+    );
+  }
+
   public static ClientProtocol createProxyWithAlignmentContext(
       InetSocketAddress address, Configuration conf, UserGroupInformation ugi,
       boolean withRetries, AtomicBoolean fallbackToSimpleAuth,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/77e106f7/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/AbstractNNFailoverProxyProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/AbstractNNFailoverProxyProvider.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/AbstractNNFailoverProxyProvider.java
index 32edb36..1b5ad16 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/AbstractNNFailoverProxyProvider.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/AbstractNNFailoverProxyProvider.java
@@ -28,11 +28,14 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.ha.HAServiceProtocol;
 import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
 import org.apache.hadoop.hdfs.DFSUtilClient;
 import org.apache.hadoop.hdfs.HAUtilClient;
+import org.apache.hadoop.hdfs.NameNodeProxiesClient;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
 import org.apache.hadoop.io.retry.FailoverProxyProvider;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -119,23 +122,44 @@ public abstract class AbstractNNFailoverProxyProvider<T> implements
      */
     private HAServiceState cachedState;
 
-    public NNProxyInfo(InetSocketAddress address) {
+    /** Proxy for getting HA service status from the given NameNode. */
+    private HAServiceProtocol serviceProxy;
+
+    public NNProxyInfo(InetSocketAddress address, Configuration conf) {
       super(null, address.toString());
       this.address = address;
+      try {
+        serviceProxy = NameNodeProxiesClient
+            .createNonHAProxyWithHAServiceProtocol(address, conf);
+      } catch (IOException ioe) {
+        LOG.error("Failed to create HAServiceProtocol proxy to NameNode" +
+            " at {}", address, ioe);
+        throw new RuntimeException(ioe);
+      }
     }
 
     public InetSocketAddress getAddress() {
       return address;
     }
 
-    public void setCachedState(HAServiceState state) {
-      cachedState = state;
+    public void refreshCachedState() {
+      try {
+        cachedState = serviceProxy.getServiceStatus().getState();
+      } catch (IOException e) {
+        LOG.warn("Failed to connect to {}. Setting cached state to Standby",
+            address, e);
+        cachedState = HAServiceState.STANDBY;
+      }
     }
 
     public HAServiceState getCachedState() {
       return cachedState;
     }
 
+    @VisibleForTesting
+    public void setServiceProxyForTesting(HAServiceProtocol proxy) {
+      this.serviceProxy = proxy;
+    }
   }
 
   @Override
@@ -153,8 +177,8 @@ public abstract class AbstractNNFailoverProxyProvider<T> implements
         pi.proxy = factory.createProxy(conf,
             pi.getAddress(), xface, ugi, false, getFallbackToSimpleAuth());
       } catch (IOException ioe) {
-        LOG.error("{} Failed to create RPC proxy to NameNode",
-            this.getClass().getSimpleName(), ioe);
+        LOG.error("{} Failed to create RPC proxy to NameNode at {}",
+            this.getClass().getSimpleName(), pi.address, ioe);
         throw new RuntimeException(ioe);
       }
     }
@@ -178,7 +202,7 @@ public abstract class AbstractNNFailoverProxyProvider<T> implements
 
     Collection<InetSocketAddress> addressesOfNns = addressesInNN.values();
     for (InetSocketAddress address : addressesOfNns) {
-      proxies.add(new NNProxyInfo<T>(address));
+      proxies.add(new NNProxyInfo<T>(address, conf));
     }
     // Randomize the list to prevent all clients pointing to the same one
     boolean randomized = getRandomOrder(conf, uri);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/77e106f7/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/IPFailoverProxyProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/IPFailoverProxyProvider.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/IPFailoverProxyProvider.java
index e703740..8062e79 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/IPFailoverProxyProvider.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/IPFailoverProxyProvider.java
@@ -48,7 +48,7 @@ public class IPFailoverProxyProvider<T> extends
   public IPFailoverProxyProvider(Configuration conf, URI uri,
       Class<T> xface, HAProxyFactory<T> factory) {
     super(conf, uri, xface, factory);
-    this.nnProxyInfo = new NNProxyInfo<T>(DFSUtilClient.getNNAddress(uri));
+    this.nnProxyInfo = new NNProxyInfo<>(DFSUtilClient.getNNAddress(uri), conf);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/77e106f7/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java
index e819282..690ee0b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java
@@ -27,12 +27,10 @@ import java.net.URI;
 import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
 import org.apache.hadoop.hdfs.ClientGSIContext;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
-import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.io.retry.AtMostOnce;
 import org.apache.hadoop.io.retry.Idempotent;
 import org.apache.hadoop.io.retry.RetryPolicies;
@@ -40,8 +38,6 @@ import org.apache.hadoop.io.retry.RetryPolicy;
 import org.apache.hadoop.io.retry.RetryPolicy.RetryAction;
 import org.apache.hadoop.ipc.AlignmentContext;
 import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.ipc.RemoteException;
-import org.apache.hadoop.ipc.StandbyException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -181,49 +177,6 @@ public class ObserverReadProxyProvider<T extends ClientProtocol>
     return lastProxy;
   }
 
-  private static <T extends ClientProtocol> HAServiceState getServiceState(
-      NNProxyInfo<T> pi) {
-    // TODO: should introduce new ClientProtocol method to verify the
-    // underlying service state, which does not require superuser access
-    // The is a workaround
-    IOException ioe = null;
-    try {
-      // Verify write access first
-      pi.proxy.reportBadBlocks(new LocatedBlock[0]);
-      return HAServiceState.ACTIVE; // Only active NameNode allows write
-    } catch (RemoteException re) {
-      IOException sbe = re.unwrapRemoteException(StandbyException.class);
-      if (!(sbe instanceof StandbyException)) {
-        ioe = re;
-      }
-    } catch (IOException e) {
-      ioe = e;
-    }
-    if (ioe != null) {
-      LOG.warn("Failed to connect to {}", pi.getAddress(), ioe);
-      return HAServiceState.STANDBY; // Just assume standby in this case
-                                     // Anything besides observer is fine
-    }
-    // Verify read access
-    // For now we assume only Observer nodes allow reads
-    // Stale reads on StandbyNode should be turned off
-    try {
-      pi.proxy.checkAccess("/", FsAction.READ);
-      return HAServiceState.OBSERVER;
-    } catch (RemoteException re) {
-      IOException sbe = re.unwrapRemoteException(StandbyException.class);
-      if (!(sbe instanceof StandbyException)) {
-        ioe = re;
-      }
-    } catch (IOException e) {
-      ioe = e;
-    }
-    if (ioe != null) {
-      LOG.warn("Failed to connect to {}", pi.getAddress(), ioe);
-    }
-    return HAServiceState.STANDBY;
-  }
-
   /**
    * Return the currently used proxy. If there is none, first calls
    * {@link #changeProxy(NNProxyInfo)} to initialize one.
@@ -254,7 +207,7 @@ public class ObserverReadProxyProvider<T extends ClientProtocol>
     currentProxy = null;
     currentIndex = (currentIndex + 1) % nameNodeProxies.size();
     currentProxy = createProxyIfNeeded(nameNodeProxies.get(currentIndex));
-    currentProxy.setCachedState(getServiceState(currentProxy));
+    currentProxy.refreshCachedState();
     LOG.debug("Changed current proxy from {} to {}",
         initial == null ? "none" : initial.proxyInfo,
         currentProxy.proxyInfo);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/77e106f7/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverReadProxyProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverReadProxyProvider.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverReadProxyProvider.java
index 4d5bc13..3f56c96 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverReadProxyProvider.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverReadProxyProvider.java
@@ -22,10 +22,13 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.URI;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.ha.HAServiceProtocol;
+import org.apache.hadoop.ha.HAServiceStatus;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
@@ -38,10 +41,12 @@ import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
+import static org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
+
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
-
+import static org.mockito.Mockito.when;
 
 /**
  * Tests for {@link ObserverReadProxyProvider} under various configurations of
@@ -56,7 +61,7 @@ public class TestObserverReadProxyProvider {
   private Configuration conf;
 
   private ObserverReadProxyProvider<ClientProtocol> proxyProvider;
-  private ClientProtocolAnswer[] namenodeAnswers;
+  private NameNodeAnswer[] namenodeAnswers;
   private String[] namenodeAddrs;
 
   @Before
@@ -70,32 +75,53 @@ public class TestObserverReadProxyProvider {
   private void setupProxyProvider(int namenodeCount) throws Exception {
     String[] namenodeIDs = new String[namenodeCount];
     namenodeAddrs = new String[namenodeCount];
-    namenodeAnswers = new ClientProtocolAnswer[namenodeCount];
+    namenodeAnswers = new NameNodeAnswer[namenodeCount];
     ClientProtocol[] proxies = new ClientProtocol[namenodeCount];
     Map<String, ClientProtocol> proxyMap = new HashMap<>();
+    HAServiceProtocol[] serviceProxies = new HAServiceProtocol[namenodeCount];
+    Map<String, HAServiceProtocol> serviceProxyMap = new HashMap<>();
     for (int i  = 0; i < namenodeCount; i++) {
       namenodeIDs[i] = "nn" + i;
       namenodeAddrs[i] = "namenode" + i + ".test:8020";
       conf.set(HdfsClientConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY + "." + ns +
           "." + namenodeIDs[i], namenodeAddrs[i]);
-      namenodeAnswers[i] = new ClientProtocolAnswer();
+      namenodeAnswers[i] = new NameNodeAnswer();
       proxies[i] = mock(ClientProtocol.class);
-      doWrite(Mockito.doAnswer(namenodeAnswers[i]).when(proxies[i]));
-      doRead(Mockito.doAnswer(namenodeAnswers[i]).when(proxies[i]));
+      doWrite(Mockito.doAnswer(namenodeAnswers[i].clientAnswer)
+          .when(proxies[i]));
+      doRead(Mockito.doAnswer(namenodeAnswers[i].clientAnswer)
+          .when(proxies[i]));
+      serviceProxies[i] = mock(HAServiceProtocol.class);
+      Mockito.doAnswer(namenodeAnswers[i].serviceAnswer)
+          .when(serviceProxies[i]).getServiceStatus();
       proxyMap.put(namenodeAddrs[i], proxies[i]);
+      serviceProxyMap.put(namenodeAddrs[i], serviceProxies[i]);
     }
     conf.set(HdfsClientConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX + "." + ns,
         Joiner.on(",").join(namenodeIDs));
-    proxyProvider = new ObserverReadProxyProvider<>(conf, nnURI,
-        ClientProtocol.class, new ClientHAProxyFactory<ClientProtocol>() {
+    proxyProvider = new ObserverReadProxyProvider<ClientProtocol>(conf, nnURI,
+        ClientProtocol.class,
+        new ClientHAProxyFactory<ClientProtocol>() {
+          @Override
+          public ClientProtocol createProxy(Configuration config,
+              InetSocketAddress nnAddr, Class<ClientProtocol> xface,
+              UserGroupInformation ugi, boolean withRetries,
+              AtomicBoolean fallbackToSimpleAuth) {
+            return proxyMap.get(nnAddr.toString());
+          }
+        })  {
       @Override
-      public ClientProtocol createProxy(Configuration conf,
-          InetSocketAddress nnAddr, Class<ClientProtocol> xface,
-          UserGroupInformation ugi, boolean withRetries,
-          AtomicBoolean fallbackToSimpleAuth) {
-        return proxyMap.get(nnAddr.toString());
+      protected List<NNProxyInfo<ClientProtocol>> getProxyAddresses(
+          URI uri, String addressKey) {
+        List<NNProxyInfo<ClientProtocol>> nnProxies =
+            super.getProxyAddresses(uri, addressKey);
+        for (NNProxyInfo<ClientProtocol> nnProxy : nnProxies) {
+          String addressStr = nnProxy.getAddress().toString();
+          nnProxy.setServiceProxyForTesting(serviceProxyMap.get(addressStr));
+        }
+        return nnProxies;
       }
-    });
+    };
     proxyProvider.setObserverReadEnabled(true);
   }
 
@@ -275,39 +301,62 @@ public class TestObserverReadProxyProvider {
   }
 
   /**
-   * An {@link Answer} used for mocking of a {@link ClientProtocol}. Setting
-   * the state or unreachability of this Answer will make the linked
-   * ClientProtocol respond as if it was communicating with a NameNode of
-   * the corresponding state. It is in Standby state by default.
+   * An {@link Answer} used for mocking of {@link ClientProtocol} and
+   * {@link HAServiceProtocol}. Setting the state or unreachability of this
+   * Answer will make the linked ClientProtocol respond as if it was
+   * communicating with a NameNode of the corresponding state. It is in Standby
+   * state by default.
    */
-  private static class ClientProtocolAnswer implements Answer<Void> {
+  private static class NameNodeAnswer {
 
     private volatile boolean unreachable = false;
     // Standby state by default
     private volatile boolean allowWrites = false;
     private volatile boolean allowReads = false;
 
-    @Override
-    public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
-      if (unreachable) {
-        throw new IOException("Unavailable");
+    private ClientProtocolAnswer clientAnswer = new ClientProtocolAnswer();
+    private HAServiceProtocolAnswer serviceAnswer =
+        new HAServiceProtocolAnswer();
+
+    private class HAServiceProtocolAnswer implements Answer<HAServiceStatus> {
+      @Override
+      public HAServiceStatus answer(InvocationOnMock invocation)
+          throws Throwable {
+        HAServiceStatus status = mock(HAServiceStatus.class);
+        if (allowReads && allowWrites) {
+          when(status.getState()).thenReturn(HAServiceState.ACTIVE);
+        } else if (allowReads) {
+          when(status.getState()).thenReturn(HAServiceState.OBSERVER);
+        } else {
+          when(status.getState()).thenReturn(HAServiceState.STANDBY);
+        }
+        return status;
       }
-      switch (invocationOnMock.getMethod().getName()) {
+    }
+
+    private class ClientProtocolAnswer implements Answer<Void> {
+      @Override
+      public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
+        if (unreachable) {
+          throw new IOException("Unavailable");
+        }
+        switch (invocationOnMock.getMethod().getName()) {
         case "reportBadBlocks":
           if (!allowWrites) {
-            throw new RemoteException(StandbyException.class.getCanonicalName(),
-                "No writes!");
+            throw new RemoteException(
+                StandbyException.class.getCanonicalName(), "No writes!");
           }
           return null;
         case "checkAccess":
           if (!allowReads) {
-            throw new RemoteException(StandbyException.class.getCanonicalName(),
-                "No reads!");
+            throw new RemoteException(
+                StandbyException.class.getCanonicalName(), "No reads!");
           }
           return null;
         default:
           throw new IllegalArgumentException(
               "Only reportBadBlocks and checkAccess supported!");
+        }
       }
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org