You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by xk...@apache.org on 2018/09/20 20:30:58 UTC
[50/50] [abbrv] hadoop git commit: HDFS-13749. [SBN read] Use
getServiceStatus to discover observer namenodes. Contributed by Chao Sun.
HDFS-13749. [SBN read] Use getServiceStatus to discover observer namenodes. Contributed by Chao Sun.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/77e106f7
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/77e106f7
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/77e106f7
Branch: refs/heads/HDFS-12943
Commit: 77e106f74715a7bb79effe38c245ec7c1566be72
Parents: a3810f7
Author: Erik Krogen <xk...@apache.org>
Authored: Thu Sep 20 13:27:58 2018 -0700
Committer: Erik Krogen <xk...@apache.org>
Committed: Thu Sep 20 13:27:58 2018 -0700
----------------------------------------------------------------------
.../hadoop/hdfs/NameNodeProxiesClient.java | 47 ++++++++-
.../ha/AbstractNNFailoverProxyProvider.java | 36 +++++--
.../namenode/ha/IPFailoverProxyProvider.java | 2 +-
.../namenode/ha/ObserverReadProxyProvider.java | 49 +--------
.../ha/TestObserverReadProxyProvider.java | 105 ++++++++++++++-----
5 files changed, 151 insertions(+), 88 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/77e106f7/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/NameNodeProxiesClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/NameNodeProxiesClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/NameNodeProxiesClient.java
index 284e4ef..f90d671 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/NameNodeProxiesClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/NameNodeProxiesClient.java
@@ -25,12 +25,16 @@ import java.net.InetSocketAddress;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.ha.HAServiceProtocol;
+import org.apache.hadoop.ha.protocolPB.HAServiceProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.server.namenode.ha.ClientHAProxyFactory;
import org.apache.hadoop.hdfs.server.namenode.ha.HAProxyFactory;
import org.apache.hadoop.ipc.AlignmentContext;
+import org.apache.hadoop.ipc.Client;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -62,13 +66,14 @@ import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
/**
- * Create proxy objects with {@link ClientProtocol} to communicate with a remote
- * NN. Generally use {@link NameNodeProxiesClient#createProxyWithClientProtocol(
+ * Create proxy objects with {@link ClientProtocol} and
+ * {@link HAServiceProtocol} to communicate with a remote NN. For the former,
+ * generally use {@link NameNodeProxiesClient#createProxyWithClientProtocol(
* Configuration, URI, AtomicBoolean)}, which will create either an HA- or
* non-HA-enabled client proxy as appropriate.
*
- * For creating proxy objects with other protocols, please see
- * {@link NameNodeProxies#createProxy(Configuration, URI, Class)}.
+ * For creating proxy objects with other protocols, please see the server-side
+ * counterpart {@code NameNodeProxies#createProxy}
*/
@InterfaceAudience.Private
public class NameNodeProxiesClient {
@@ -76,6 +81,11 @@ public class NameNodeProxiesClient {
private static final Logger LOG = LoggerFactory.getLogger(
NameNodeProxiesClient.class);
+ /** Maximum # of retries for HAProxy with HAServiceProtocol. */
+ private static final int MAX_RETRIES = 3;
+ /** Initial retry delay for HAProxy with HAServiceProtocol. */
+ private static final int DELAY_MILLISECONDS = 200;
+
/**
* Wrapper for a client proxy as well as its associated service ID.
* This is simply used as a tuple-like return type for created NN proxy.
@@ -119,7 +129,6 @@ public class NameNodeProxiesClient {
* @return an object containing both the proxy and the associated
* delegation token service it corresponds to
* @throws IOException if there is an error creating the proxy
- * @see {@link NameNodeProxies#createProxy(Configuration, URI, Class)}.
*/
public static ProxyAndInfo<ClientProtocol> createProxyWithClientProtocol(
Configuration conf, URI nameNodeUri, AtomicBoolean fallbackToSimpleAuth)
@@ -343,6 +352,34 @@ public class NameNodeProxiesClient {
fallbackToSimpleAuth, null);
}
+ /**
+ * Creates a non-HA proxy object with {@link HAServiceProtocol} to the
+ * given NameNode address, using the provided configuration. The proxy will
+ * use the RPC timeout configuration specified via {@link
+ * org.apache.hadoop.fs.CommonConfigurationKeys#IPC_CLIENT_RPC_TIMEOUT_KEY}.
+ * Upon failures, this will retry up to certain times with {@link RetryProxy}.
+ *
+ * @param address the NameNode address
+ * @param conf the configuration to be used
+ * @return a non-HA proxy with {@link HAServiceProtocol}.
+ */
+ public static HAServiceProtocol createNonHAProxyWithHAServiceProtocol(
+ InetSocketAddress address, Configuration conf) throws IOException {
+ RetryPolicy timeoutPolicy = RetryPolicies.exponentialBackoffRetry(
+ MAX_RETRIES, DELAY_MILLISECONDS, TimeUnit.MILLISECONDS);
+
+ HAServiceProtocol proxy =
+ new HAServiceProtocolClientSideTranslatorPB(
+ address, conf, NetUtils.getDefaultSocketFactory(conf),
+ Client.getRpcTimeout(conf));
+ return (HAServiceProtocol) RetryProxy.create(
+ HAServiceProtocol.class,
+ new DefaultFailoverProxyProvider<>(HAServiceProtocol.class, proxy),
+ new HashMap<>(),
+ timeoutPolicy
+ );
+ }
+
public static ClientProtocol createProxyWithAlignmentContext(
InetSocketAddress address, Configuration conf, UserGroupInformation ugi,
boolean withRetries, AtomicBoolean fallbackToSimpleAuth,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/77e106f7/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/AbstractNNFailoverProxyProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/AbstractNNFailoverProxyProvider.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/AbstractNNFailoverProxyProvider.java
index 32edb36..1b5ad16 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/AbstractNNFailoverProxyProvider.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/AbstractNNFailoverProxyProvider.java
@@ -28,11 +28,14 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.HAUtilClient;
+import org.apache.hadoop.hdfs.NameNodeProxiesClient;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.io.retry.FailoverProxyProvider;
import org.apache.hadoop.security.UserGroupInformation;
@@ -119,23 +122,44 @@ public abstract class AbstractNNFailoverProxyProvider<T> implements
*/
private HAServiceState cachedState;
- public NNProxyInfo(InetSocketAddress address) {
+ /** Proxy for getting HA service status from the given NameNode. */
+ private HAServiceProtocol serviceProxy;
+
+ public NNProxyInfo(InetSocketAddress address, Configuration conf) {
super(null, address.toString());
this.address = address;
+ try {
+ serviceProxy = NameNodeProxiesClient
+ .createNonHAProxyWithHAServiceProtocol(address, conf);
+ } catch (IOException ioe) {
+ LOG.error("Failed to create HAServiceProtocol proxy to NameNode" +
+ " at {}", address, ioe);
+ throw new RuntimeException(ioe);
+ }
}
public InetSocketAddress getAddress() {
return address;
}
- public void setCachedState(HAServiceState state) {
- cachedState = state;
+ public void refreshCachedState() {
+ try {
+ cachedState = serviceProxy.getServiceStatus().getState();
+ } catch (IOException e) {
+ LOG.warn("Failed to connect to {}. Setting cached state to Standby",
+ address, e);
+ cachedState = HAServiceState.STANDBY;
+ }
}
public HAServiceState getCachedState() {
return cachedState;
}
+ @VisibleForTesting
+ public void setServiceProxyForTesting(HAServiceProtocol proxy) {
+ this.serviceProxy = proxy;
+ }
}
@Override
@@ -153,8 +177,8 @@ public abstract class AbstractNNFailoverProxyProvider<T> implements
pi.proxy = factory.createProxy(conf,
pi.getAddress(), xface, ugi, false, getFallbackToSimpleAuth());
} catch (IOException ioe) {
- LOG.error("{} Failed to create RPC proxy to NameNode",
- this.getClass().getSimpleName(), ioe);
+ LOG.error("{} Failed to create RPC proxy to NameNode at {}",
+ this.getClass().getSimpleName(), pi.address, ioe);
throw new RuntimeException(ioe);
}
}
@@ -178,7 +202,7 @@ public abstract class AbstractNNFailoverProxyProvider<T> implements
Collection<InetSocketAddress> addressesOfNns = addressesInNN.values();
for (InetSocketAddress address : addressesOfNns) {
- proxies.add(new NNProxyInfo<T>(address));
+ proxies.add(new NNProxyInfo<T>(address, conf));
}
// Randomize the list to prevent all clients pointing to the same one
boolean randomized = getRandomOrder(conf, uri);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/77e106f7/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/IPFailoverProxyProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/IPFailoverProxyProvider.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/IPFailoverProxyProvider.java
index e703740..8062e79 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/IPFailoverProxyProvider.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/IPFailoverProxyProvider.java
@@ -48,7 +48,7 @@ public class IPFailoverProxyProvider<T> extends
public IPFailoverProxyProvider(Configuration conf, URI uri,
Class<T> xface, HAProxyFactory<T> factory) {
super(conf, uri, xface, factory);
- this.nnProxyInfo = new NNProxyInfo<T>(DFSUtilClient.getNNAddress(uri));
+ this.nnProxyInfo = new NNProxyInfo<>(DFSUtilClient.getNNAddress(uri), conf);
}
@Override
http://git-wip-us.apache.org/repos/asf/hadoop/blob/77e106f7/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java
index e819282..690ee0b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java
@@ -27,12 +27,10 @@ import java.net.URI;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.hdfs.ClientGSIContext;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
-import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.io.retry.AtMostOnce;
import org.apache.hadoop.io.retry.Idempotent;
import org.apache.hadoop.io.retry.RetryPolicies;
@@ -40,8 +38,6 @@ import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.io.retry.RetryPolicy.RetryAction;
import org.apache.hadoop.ipc.AlignmentContext;
import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.ipc.RemoteException;
-import org.apache.hadoop.ipc.StandbyException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -181,49 +177,6 @@ public class ObserverReadProxyProvider<T extends ClientProtocol>
return lastProxy;
}
- private static <T extends ClientProtocol> HAServiceState getServiceState(
- NNProxyInfo<T> pi) {
- // TODO: should introduce new ClientProtocol method to verify the
- // underlying service state, which does not require superuser access
- // The is a workaround
- IOException ioe = null;
- try {
- // Verify write access first
- pi.proxy.reportBadBlocks(new LocatedBlock[0]);
- return HAServiceState.ACTIVE; // Only active NameNode allows write
- } catch (RemoteException re) {
- IOException sbe = re.unwrapRemoteException(StandbyException.class);
- if (!(sbe instanceof StandbyException)) {
- ioe = re;
- }
- } catch (IOException e) {
- ioe = e;
- }
- if (ioe != null) {
- LOG.warn("Failed to connect to {}", pi.getAddress(), ioe);
- return HAServiceState.STANDBY; // Just assume standby in this case
- // Anything besides observer is fine
- }
- // Verify read access
- // For now we assume only Observer nodes allow reads
- // Stale reads on StandbyNode should be turned off
- try {
- pi.proxy.checkAccess("/", FsAction.READ);
- return HAServiceState.OBSERVER;
- } catch (RemoteException re) {
- IOException sbe = re.unwrapRemoteException(StandbyException.class);
- if (!(sbe instanceof StandbyException)) {
- ioe = re;
- }
- } catch (IOException e) {
- ioe = e;
- }
- if (ioe != null) {
- LOG.warn("Failed to connect to {}", pi.getAddress(), ioe);
- }
- return HAServiceState.STANDBY;
- }
-
/**
* Return the currently used proxy. If there is none, first calls
* {@link #changeProxy(NNProxyInfo)} to initialize one.
@@ -254,7 +207,7 @@ public class ObserverReadProxyProvider<T extends ClientProtocol>
currentProxy = null;
currentIndex = (currentIndex + 1) % nameNodeProxies.size();
currentProxy = createProxyIfNeeded(nameNodeProxies.get(currentIndex));
- currentProxy.setCachedState(getServiceState(currentProxy));
+ currentProxy.refreshCachedState();
LOG.debug("Changed current proxy from {} to {}",
initial == null ? "none" : initial.proxyInfo,
currentProxy.proxyInfo);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/77e106f7/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverReadProxyProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverReadProxyProvider.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverReadProxyProvider.java
index 4d5bc13..3f56c96 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverReadProxyProvider.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverReadProxyProvider.java
@@ -22,10 +22,13 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.ha.HAServiceProtocol;
+import org.apache.hadoop.ha.HAServiceStatus;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
@@ -38,10 +41,12 @@ import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
+import static org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
+
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
-
+import static org.mockito.Mockito.when;
/**
* Tests for {@link ObserverReadProxyProvider} under various configurations of
@@ -56,7 +61,7 @@ public class TestObserverReadProxyProvider {
private Configuration conf;
private ObserverReadProxyProvider<ClientProtocol> proxyProvider;
- private ClientProtocolAnswer[] namenodeAnswers;
+ private NameNodeAnswer[] namenodeAnswers;
private String[] namenodeAddrs;
@Before
@@ -70,32 +75,53 @@ public class TestObserverReadProxyProvider {
private void setupProxyProvider(int namenodeCount) throws Exception {
String[] namenodeIDs = new String[namenodeCount];
namenodeAddrs = new String[namenodeCount];
- namenodeAnswers = new ClientProtocolAnswer[namenodeCount];
+ namenodeAnswers = new NameNodeAnswer[namenodeCount];
ClientProtocol[] proxies = new ClientProtocol[namenodeCount];
Map<String, ClientProtocol> proxyMap = new HashMap<>();
+ HAServiceProtocol[] serviceProxies = new HAServiceProtocol[namenodeCount];
+ Map<String, HAServiceProtocol> serviceProxyMap = new HashMap<>();
for (int i = 0; i < namenodeCount; i++) {
namenodeIDs[i] = "nn" + i;
namenodeAddrs[i] = "namenode" + i + ".test:8020";
conf.set(HdfsClientConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY + "." + ns +
"." + namenodeIDs[i], namenodeAddrs[i]);
- namenodeAnswers[i] = new ClientProtocolAnswer();
+ namenodeAnswers[i] = new NameNodeAnswer();
proxies[i] = mock(ClientProtocol.class);
- doWrite(Mockito.doAnswer(namenodeAnswers[i]).when(proxies[i]));
- doRead(Mockito.doAnswer(namenodeAnswers[i]).when(proxies[i]));
+ doWrite(Mockito.doAnswer(namenodeAnswers[i].clientAnswer)
+ .when(proxies[i]));
+ doRead(Mockito.doAnswer(namenodeAnswers[i].clientAnswer)
+ .when(proxies[i]));
+ serviceProxies[i] = mock(HAServiceProtocol.class);
+ Mockito.doAnswer(namenodeAnswers[i].serviceAnswer)
+ .when(serviceProxies[i]).getServiceStatus();
proxyMap.put(namenodeAddrs[i], proxies[i]);
+ serviceProxyMap.put(namenodeAddrs[i], serviceProxies[i]);
}
conf.set(HdfsClientConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX + "." + ns,
Joiner.on(",").join(namenodeIDs));
- proxyProvider = new ObserverReadProxyProvider<>(conf, nnURI,
- ClientProtocol.class, new ClientHAProxyFactory<ClientProtocol>() {
+ proxyProvider = new ObserverReadProxyProvider<ClientProtocol>(conf, nnURI,
+ ClientProtocol.class,
+ new ClientHAProxyFactory<ClientProtocol>() {
+ @Override
+ public ClientProtocol createProxy(Configuration config,
+ InetSocketAddress nnAddr, Class<ClientProtocol> xface,
+ UserGroupInformation ugi, boolean withRetries,
+ AtomicBoolean fallbackToSimpleAuth) {
+ return proxyMap.get(nnAddr.toString());
+ }
+ }) {
@Override
- public ClientProtocol createProxy(Configuration conf,
- InetSocketAddress nnAddr, Class<ClientProtocol> xface,
- UserGroupInformation ugi, boolean withRetries,
- AtomicBoolean fallbackToSimpleAuth) {
- return proxyMap.get(nnAddr.toString());
+ protected List<NNProxyInfo<ClientProtocol>> getProxyAddresses(
+ URI uri, String addressKey) {
+ List<NNProxyInfo<ClientProtocol>> nnProxies =
+ super.getProxyAddresses(uri, addressKey);
+ for (NNProxyInfo<ClientProtocol> nnProxy : nnProxies) {
+ String addressStr = nnProxy.getAddress().toString();
+ nnProxy.setServiceProxyForTesting(serviceProxyMap.get(addressStr));
+ }
+ return nnProxies;
}
- });
+ };
proxyProvider.setObserverReadEnabled(true);
}
@@ -275,39 +301,62 @@ public class TestObserverReadProxyProvider {
}
/**
- * An {@link Answer} used for mocking of a {@link ClientProtocol}. Setting
- * the state or unreachability of this Answer will make the linked
- * ClientProtocol respond as if it was communicating with a NameNode of
- * the corresponding state. It is in Standby state by default.
+ * An {@link Answer} used for mocking of {@link ClientProtocol} and
+ * {@link HAServiceProtocol}. Setting the state or unreachability of this
+ * Answer will make the linked ClientProtocol respond as if it was
+ * communicating with a NameNode of the corresponding state. It is in Standby
+ * state by default.
*/
- private static class ClientProtocolAnswer implements Answer<Void> {
+ private static class NameNodeAnswer {
private volatile boolean unreachable = false;
// Standby state by default
private volatile boolean allowWrites = false;
private volatile boolean allowReads = false;
- @Override
- public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
- if (unreachable) {
- throw new IOException("Unavailable");
+ private ClientProtocolAnswer clientAnswer = new ClientProtocolAnswer();
+ private HAServiceProtocolAnswer serviceAnswer =
+ new HAServiceProtocolAnswer();
+
+ private class HAServiceProtocolAnswer implements Answer<HAServiceStatus> {
+ @Override
+ public HAServiceStatus answer(InvocationOnMock invocation)
+ throws Throwable {
+ HAServiceStatus status = mock(HAServiceStatus.class);
+ if (allowReads && allowWrites) {
+ when(status.getState()).thenReturn(HAServiceState.ACTIVE);
+ } else if (allowReads) {
+ when(status.getState()).thenReturn(HAServiceState.OBSERVER);
+ } else {
+ when(status.getState()).thenReturn(HAServiceState.STANDBY);
+ }
+ return status;
}
- switch (invocationOnMock.getMethod().getName()) {
+ }
+
+ private class ClientProtocolAnswer implements Answer<Void> {
+ @Override
+ public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
+ if (unreachable) {
+ throw new IOException("Unavailable");
+ }
+ switch (invocationOnMock.getMethod().getName()) {
case "reportBadBlocks":
if (!allowWrites) {
- throw new RemoteException(StandbyException.class.getCanonicalName(),
- "No writes!");
+ throw new RemoteException(
+ StandbyException.class.getCanonicalName(), "No writes!");
}
return null;
case "checkAccess":
if (!allowReads) {
- throw new RemoteException(StandbyException.class.getCanonicalName(),
- "No reads!");
+ throw new RemoteException(
+ StandbyException.class.getCanonicalName(), "No reads!");
}
return null;
default:
throw new IllegalArgumentException(
"Only reportBadBlocks and checkAccess supported!");
+ }
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org