You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by xk...@apache.org on 2018/08/31 16:11:13 UTC
[47/47] hadoop git commit: HDFS-13779. [SBN read] Implement proper
failover and observer failure handling logic for
ObserverReadProxyProvider. Contributed by Erik Krogen.
HDFS-13779. [SBN read] Implement proper failover and observer failure handling logic for ObserverReadProxyProvider. Contributed by Erik Krogen.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/039c158d
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/039c158d
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/039c158d
Branch: refs/heads/HDFS-12943
Commit: 039c158d2c8a45906e6ea5f9661391bc541ab0cb
Parents: 5320173
Author: Erik Krogen <xk...@apache.org>
Authored: Fri Aug 24 05:04:27 2018 -0700
Committer: Erik Krogen <xk...@apache.org>
Committed: Fri Aug 31 09:09:59 2018 -0700
----------------------------------------------------------------------
.../ha/AbstractNNFailoverProxyProvider.java | 16 +
.../namenode/ha/ObserverReadProxyProvider.java | 255 ++++++++------
.../server/namenode/ha/TestObserverNode.java | 27 +-
.../ha/TestObserverReadProxyProvider.java | 335 +++++++++++++++++++
4 files changed, 532 insertions(+), 101 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/039c158d/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/AbstractNNFailoverProxyProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/AbstractNNFailoverProxyProvider.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/AbstractNNFailoverProxyProvider.java
index 252b70d..32edb36 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/AbstractNNFailoverProxyProvider.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/AbstractNNFailoverProxyProvider.java
@@ -30,6 +30,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.HAUtilClient;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
@@ -111,6 +112,12 @@ public abstract class AbstractNNFailoverProxyProvider<T> implements
*/
public static class NNProxyInfo<T> extends ProxyInfo<T> {
private InetSocketAddress address;
+ /**
+ * The currently known state of the NameNode represented by this ProxyInfo.
+ * This may be out of date if the NameNode has changed state since the last
+ * time the state was checked.
+ */
+ private HAServiceState cachedState;
public NNProxyInfo(InetSocketAddress address) {
super(null, address.toString());
@@ -120,6 +127,15 @@ public abstract class AbstractNNFailoverProxyProvider<T> implements
public InetSocketAddress getAddress() {
return address;
}
+
+ public void setCachedState(HAServiceState state) {
+ cachedState = state;
+ }
+
+ public HAServiceState getCachedState() {
+ return cachedState;
+ }
+
}
@Override
http://git-wip-us.apache.org/repos/asf/hadoop/blob/039c158d/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java
index dcae2db..e819282 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java
@@ -20,18 +20,24 @@ package org.apache.hadoop.hdfs.server.namenode.ha;
import java.io.Closeable;
import java.io.IOException;
import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.URI;
-import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.hdfs.ClientGSIContext;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.io.retry.AtMostOnce;
+import org.apache.hadoop.io.retry.Idempotent;
+import org.apache.hadoop.io.retry.RetryPolicies;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.io.retry.RetryPolicy.RetryAction;
import org.apache.hadoop.ipc.AlignmentContext;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException;
@@ -59,16 +65,18 @@ public class ObserverReadProxyProvider<T extends ClientProtocol>
private static final Logger LOG = LoggerFactory.getLogger(
ObserverReadProxyProvider.class);
- /** Client-side context for syncing with the NameNode server side */
- private AlignmentContext alignmentContext;
+ /** Client-side context for syncing with the NameNode server side. */
+ private final AlignmentContext alignmentContext;
- private AbstractNNFailoverProxyProvider<T> failoverProxy;
- /** All NameNdoe proxies */
- private List<NNProxyInfo<T>> nameNodeProxies =
- new ArrayList<NNProxyInfo<T>>();
- /** Proxies for the observer namenodes */
- private final List<NNProxyInfo<T>> observerProxies =
- new ArrayList<NNProxyInfo<T>>();
+ /** The inner proxy provider used for active/standby failover. */
+ private final AbstractNNFailoverProxyProvider<T> failoverProxy;
+ /** List of all NameNode proxies. */
+ private final List<NNProxyInfo<T>> nameNodeProxies;
+
+ /** The policy used to determine if an exception is fatal or retriable. */
+ private final RetryPolicy observerRetryPolicy;
+ /** The combined proxy which redirects to other proxies as necessary. */
+ private final ProxyInfo<T> combinedProxy;
/**
* Whether reading from observer is enabled. If this is false, all read
@@ -77,12 +85,19 @@ public class ObserverReadProxyProvider<T extends ClientProtocol>
private boolean observerReadEnabled;
/**
- * Thread-local index to record the current index in the observer list.
+ * The index into the nameNodeProxies list currently being used. Should only
+ * be accessed in synchronized methods.
+ */
+ private int currentIndex = -1;
+ /**
+ * The proxy being used currently; this will match with currentIndex above.
+ * This field is volatile to allow reads without synchronization; updates
+ * should still be performed synchronously to maintain consistency between
+ * currentIndex and this field.
*/
- private static final ThreadLocal<Integer> currentIndex =
- ThreadLocal.withInitial(() -> 0);
+ private volatile NNProxyInfo<T> currentProxy;
- /** The last proxy that has been used. Only used for testing */
+ /** The last proxy that has been used. Only used for testing. */
private volatile ProxyInfo<T> lastProxy = null;
/**
@@ -90,63 +105,53 @@ public class ObserverReadProxyProvider<T extends ClientProtocol>
* {@link ConfiguredFailoverProxyProvider} for failover.
*/
public ObserverReadProxyProvider(
- Configuration conf, URI uri, Class<T> xface, HAProxyFactory<T> factory)
- throws IOException {
+ Configuration conf, URI uri, Class<T> xface, HAProxyFactory<T> factory) {
this(conf, uri, xface, factory,
- new ConfiguredFailoverProxyProvider<T>(conf, uri, xface,factory));
+ new ConfiguredFailoverProxyProvider<>(conf, uri, xface,factory));
}
+ @SuppressWarnings("unchecked")
public ObserverReadProxyProvider(
Configuration conf, URI uri, Class<T> xface, HAProxyFactory<T> factory,
- AbstractNNFailoverProxyProvider<T> failoverProxy)
- throws IOException {
+ AbstractNNFailoverProxyProvider<T> failoverProxy) {
super(conf, uri, xface, factory);
this.failoverProxy = failoverProxy;
this.alignmentContext = new ClientGSIContext();
((ClientHAProxyFactory<T>) factory).setAlignmentContext(alignmentContext);
+ // Don't bother configuring the number of retries and such on the retry
+ // policy since it is mainly only used for determining whether or not an
+ // exception is retriable or fatal
+ observerRetryPolicy = RetryPolicies.failoverOnNetworkException(
+ RetryPolicies.TRY_ONCE_THEN_FAIL, 1);
+
// Get all NameNode proxies
nameNodeProxies = getProxyAddresses(uri,
HdfsClientConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY);
- // Find out all the observer proxies
- for (NNProxyInfo<T> pi : nameNodeProxies) {
- createProxyIfNeeded(pi);
- if (isObserverState(pi)) {
- observerProxies.add(pi);
- }
- }
-
- // TODO: No observers is not an error
- // Just direct all reads go to the active NameNode
- if (observerProxies.isEmpty()) {
- throw new RuntimeException("Couldn't find any namenode proxy in " +
- "OBSERVER state");
- }
- }
-
- public synchronized AlignmentContext getAlignmentContext() {
- return alignmentContext;
- }
- @SuppressWarnings("unchecked")
- @Override
- public synchronized ProxyInfo<T> getProxy() {
- // We just create a wrapped proxy containing all the proxies
+ // Create a wrapped proxy containing all the proxies. Since this combined
+ // proxy is just redirecting to other proxies, all invocations can share it.
StringBuilder combinedInfo = new StringBuilder("[");
-
- for (int i = 0; i < this.observerProxies.size(); i++) {
+ for (int i = 0; i < nameNodeProxies.size(); i++) {
if (i > 0) {
combinedInfo.append(",");
}
- combinedInfo.append(observerProxies.get(i).proxyInfo);
+ combinedInfo.append(nameNodeProxies.get(i).proxyInfo);
}
-
combinedInfo.append(']');
T wrappedProxy = (T) Proxy.newProxyInstance(
ObserverReadInvocationHandler.class.getClassLoader(),
- new Class<?>[]{xface},
- new ObserverReadInvocationHandler(observerProxies));
- return new ProxyInfo<>(wrappedProxy, combinedInfo.toString());
+ new Class<?>[] { xface }, new ObserverReadInvocationHandler());
+ combinedProxy = new ProxyInfo<>(wrappedProxy, combinedInfo.toString());
+ }
+
+ public AlignmentContext getAlignmentContext() {
+ return alignmentContext;
+ }
+
+ @Override
+ public ProxyInfo<T> getProxy() {
+ return combinedProxy;
}
@Override
@@ -159,8 +164,11 @@ public class ObserverReadProxyProvider<T extends ClientProtocol>
*
* @return whether the 'method' is a read-only operation.
*/
- private boolean isRead(Method method) {
- return method.isAnnotationPresent(ReadOnly.class);
+ private static boolean isRead(Method method) {
+ if (!method.isAnnotationPresent(ReadOnly.class)) {
+ return false;
+ }
+ return !method.getAnnotationsByType(ReadOnly.class)[0].activeOnly();
}
@VisibleForTesting
@@ -168,21 +176,13 @@ public class ObserverReadProxyProvider<T extends ClientProtocol>
this.observerReadEnabled = flag;
}
- /**
- * After getting exception 'ex', whether we should retry the current request
- * on a different observer.
- */
- private boolean shouldRetry(Exception ex) throws Exception {
- // TODO: implement retry policy
- return true;
- }
-
@VisibleForTesting
ProxyInfo<T> getLastProxy() {
return lastProxy;
}
- boolean isObserverState(NNProxyInfo<T> pi) {
+ private static <T extends ClientProtocol> HAServiceState getServiceState(
+ NNProxyInfo<T> pi) {
// TODO: should introduce new ClientProtocol method to verify the
// underlying service state, which does not require superuser access
// The is a workaround
@@ -190,7 +190,7 @@ public class ObserverReadProxyProvider<T extends ClientProtocol>
try {
// Verify write access first
pi.proxy.reportBadBlocks(new LocatedBlock[0]);
- return false; // Only active NameNode allows write
+ return HAServiceState.ACTIVE; // Only active NameNode allows write
} catch (RemoteException re) {
IOException sbe = re.unwrapRemoteException(StandbyException.class);
if (!(sbe instanceof StandbyException)) {
@@ -200,15 +200,16 @@ public class ObserverReadProxyProvider<T extends ClientProtocol>
ioe = e;
}
if (ioe != null) {
- LOG.error("Failed to connect to {}", pi.getAddress(), ioe);
- return false;
+ LOG.warn("Failed to connect to {}", pi.getAddress(), ioe);
+ return HAServiceState.STANDBY; // Just assume standby in this case
+ // Anything besides observer is fine
}
// Verify read access
// For now we assume only Observer nodes allow reads
// Stale reads on StandbyNode should be turned off
try {
pi.proxy.checkAccess("/", FsAction.READ);
- return true;
+ return HAServiceState.OBSERVER;
} catch (RemoteException re) {
IOException sbe = re.unwrapRemoteException(StandbyException.class);
if (!(sbe instanceof StandbyException)) {
@@ -218,29 +219,60 @@ public class ObserverReadProxyProvider<T extends ClientProtocol>
ioe = e;
}
if (ioe != null) {
- LOG.error("Failed to connect to {}", pi.getAddress(), ioe);
+ LOG.warn("Failed to connect to {}", pi.getAddress(), ioe);
}
- return false;
+ return HAServiceState.STANDBY;
}
+ /**
+ * Return the currently used proxy. If there is none, first calls
+ * {@link #changeProxy(NNProxyInfo)} to initialize one.
+ */
+ private NNProxyInfo<T> getCurrentProxy() {
+ if (currentProxy == null) {
+ changeProxy(null);
+ }
+ return currentProxy;
+ }
- class ObserverReadInvocationHandler implements InvocationHandler {
- final List<NNProxyInfo<T>> observerProxies;
- final ProxyInfo<T> activeProxy;
-
- ObserverReadInvocationHandler(List<NNProxyInfo<T>> observerProxies) {
- this.observerProxies = observerProxies;
- this.activeProxy = failoverProxy.getProxy();
+ /**
+ * Move to the next proxy in the proxy list. If the NNProxyInfo supplied by
+ * the caller does not match the current proxy, the call is ignored; this is
+ * to handle concurrent calls (to avoid changing the proxy multiple times).
+ * The service state of the newly selected proxy will be updated before
+ * returning.
+ *
+ * @param initial The expected current proxy
+ */
+ private synchronized void changeProxy(NNProxyInfo<T> initial) {
+ if (currentProxy != initial) {
+ // Must have been a concurrent modification; ignore the move request
+ return;
}
+ // Attempt to force concurrent callers of getCurrentProxy to wait for the
+ // new proxy; best-effort by setting currentProxy to null
+ currentProxy = null;
+ currentIndex = (currentIndex + 1) % nameNodeProxies.size();
+ currentProxy = createProxyIfNeeded(nameNodeProxies.get(currentIndex));
+ currentProxy.setCachedState(getServiceState(currentProxy));
+ LOG.debug("Changed current proxy from {} to {}",
+ initial == null ? "none" : initial.proxyInfo,
+ currentProxy.proxyInfo);
+ }
+
+ /**
+ * An InvocationHandler to handle incoming requests. This class's invoke
+ * method contains the primary logic for redirecting to observers.
+ *
+ * If observer reads are enabled, attempt to send read operations to the
+ * current proxy. If it is not an observer, or the observer fails, adjust
+ * the current proxy and retry on the next one. If all proxies are tried
+ * without success, the request is forwarded to the active.
+ *
+ * Write requests are always forwarded to the active.
+ */
+ private class ObserverReadInvocationHandler implements InvocationHandler {
- /**
- * Sends read operations to the observer (if enabled) specified by the
- * current index, and send write operations to the active. If a observer
- * fails, we increment the index and retry the next one. If all observers
- * fail, the request is forwarded to the active.
- *
- * Write requests are always forwarded to the active.
- */
@Override
public Object invoke(Object proxy, final Method method, final Object[] args)
throws Throwable {
@@ -248,33 +280,65 @@ public class ObserverReadProxyProvider<T extends ClientProtocol>
Object retVal;
if (observerReadEnabled && isRead(method)) {
- // Loop through all the proxies, starting from the current index.
- for (int i = 0; i < observerProxies.size(); i++) {
- NNProxyInfo<T> current = observerProxies.get(currentIndex.get());
+ int failedObserverCount = 0;
+ int activeCount = 0;
+ int standbyCount = 0;
+ for (int i = 0; i < nameNodeProxies.size(); i++) {
+ NNProxyInfo<T> current = getCurrentProxy();
+ HAServiceState currState = current.getCachedState();
+ if (currState != HAServiceState.OBSERVER) {
+ if (currState == HAServiceState.ACTIVE) {
+ activeCount++;
+ } else if (currState == HAServiceState.STANDBY) {
+ standbyCount++;
+ }
+ LOG.debug("Skipping proxy {} for {} because it is in state {}",
+ current.proxyInfo, method.getName(), currState);
+ changeProxy(current);
+ continue;
+ }
+ LOG.debug("Attempting to service {} using proxy {}",
+ method.getName(), current.proxyInfo);
try {
retVal = method.invoke(current.proxy, args);
lastProxy = current;
+ LOG.debug("Invocation of {} using {} was successful",
+ method.getName(), current.proxyInfo);
return retVal;
- } catch (Exception e) {
- if (!shouldRetry(e)) {
+ } catch (InvocationTargetException ite) {
+ if (!(ite.getCause() instanceof Exception)) {
+ throw ite.getCause();
+ }
+ Exception e = (Exception) ite.getCause();
+ RetryAction retryInfo = observerRetryPolicy.shouldRetry(e, 0, 0,
+ method.isAnnotationPresent(Idempotent.class)
+ || method.isAnnotationPresent(AtMostOnce.class));
+ if (retryInfo.action == RetryAction.RetryDecision.FAIL) {
throw e;
+ } else {
+ failedObserverCount++;
+ LOG.warn(
+ "Invocation returned exception on [{}]; {} failure(s) so far",
+ current.proxyInfo, failedObserverCount, e);
+ changeProxy(current);
}
- currentIndex.set((currentIndex.get() + 1) % observerProxies.size());
- LOG.warn("Invocation returned exception on [{}]",
- current.proxyInfo, e.getCause());
}
}
// If we get here, it means all observers have failed.
- LOG.warn("All observers have failed for read request {}. " +
- "Fall back on active: {}", method.getName(), activeProxy);
+ LOG.warn("{} observers have failed for read request {}; also found " +
+ "{} standby and {} active. Falling back to active.",
+ failedObserverCount, standbyCount, activeCount, method.getName());
}
// Either all observers have failed, or that it is a write request.
// In either case, we'll forward the request to active NameNode.
+ LOG.debug("Using failoverProxy to service {}", method.getName());
+ ProxyInfo<T> activeProxy = failoverProxy.getProxy();
try {
retVal = method.invoke(activeProxy.proxy, args);
- } catch (Exception e) {
+ } catch (InvocationTargetException e) {
+ // This exception will be handled by higher layers
throw e.getCause();
}
lastProxy = activeProxy;
@@ -284,7 +348,6 @@ public class ObserverReadProxyProvider<T extends ClientProtocol>
@Override
public synchronized void close() throws IOException {
- failoverProxy.close();
for (ProxyInfo<T> pi : nameNodeProxies) {
if (pi.proxy != null) {
if (pi.proxy instanceof Closeable) {
@@ -292,8 +355,12 @@ public class ObserverReadProxyProvider<T extends ClientProtocol>
} else {
RPC.stopProxy(pi.proxy);
}
+ // Set to null to avoid the failoverProxy having to re-do the close
+ // if it is sharing a proxy instance
+ pi.proxy = null;
}
}
+ failoverProxy.close();
}
@Override
http://git-wip-us.apache.org/repos/asf/hadoop/blob/039c158d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java
index de34454..16371b1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java
@@ -27,22 +27,23 @@ import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.io.retry.FailoverProxyProvider;
import org.apache.hadoop.io.retry.RetryInvocationHandler;
+import org.apache.hadoop.test.GenericTestUtils;
import org.junit.After;
import org.junit.Before;
+import org.junit.Ignore;
import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.lang.reflect.Proxy;
import java.net.URI;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_LOGROLL_PERIOD_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -200,6 +201,9 @@ public class TestObserverNode {
// Start the observer again - requests should go to observer
dfsCluster.restartNameNode(2);
dfsCluster.transitionToObserver(2);
+ // The first request goes to the active because it has not refreshed yet;
+ // the second will properly go to the observer
+ dfs.getFileStatus(testPath);
dfs.getFileStatus(testPath);
assertSentTo(2);
}
@@ -231,6 +235,9 @@ public class TestObserverNode {
dfsCluster.transitionToObserver(2);
dfs.getFileStatus(testPath);
+ // The first request goes to the active because it has not refreshed yet;
+ // the second will properly go to the observer
+ dfs.getFileStatus(testPath);
assertSentTo(2);
}
@@ -291,6 +298,10 @@ public class TestObserverNode {
assertEquals(0, rc);
}
+ // TODO this does not currently work because fetching the service state from
+ // e.g. the StandbyNameNode also waits for the transaction ID to catch up.
+ // This is disabled pending HDFS-13872 and HDFS-13749.
+ @Ignore("Disabled until HDFS-13872 and HDFS-13749 are committed")
@Test
public void testMsyncSimple() throws Exception {
// disable fast path here because this test's assertions are based on the
@@ -304,7 +315,8 @@ public class TestObserverNode {
setUpCluster(1);
setObserverRead(true);
- AtomicBoolean readSucceed = new AtomicBoolean(false);
+ // 0 == not completed, 1 == succeeded, -1 == failed
+ AtomicInteger readStatus = new AtomicInteger(0);
dfs.mkdir(testPath, FsPermission.getDefault());
assertSentTo(0);
@@ -313,20 +325,21 @@ public class TestObserverNode {
try {
// this read will block until roll and tail edits happen.
dfs.getFileStatus(testPath);
- readSucceed.set(true);
+ readStatus.set(1);
} catch (IOException e) {
e.printStackTrace();
+ readStatus.set(-1);
}
});
reader.start();
// the reader is still blocking, not succeeded yet.
- assertFalse(readSucceed.get());
+ assertEquals(0, readStatus.get());
rollEditLogAndTail(0);
// wait a while for all the change to be done
- Thread.sleep(100);
+ GenericTestUtils.waitFor(() -> readStatus.get() != 0, 100, 10000);
// the reader should have succeed.
- assertTrue(readSucceed.get());
+ assertEquals(1, readStatus.get());
}
private void setUpCluster(int numObservers) throws Exception {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/039c158d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverReadProxyProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverReadProxyProvider.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverReadProxyProvider.java
new file mode 100644
index 0000000..4d5bc13
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverReadProxyProvider.java
@@ -0,0 +1,335 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode.ha;
+
+import com.google.common.base.Joiner;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.ipc.StandbyException;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+
+
+/**
+ * Tests for {@link ObserverReadProxyProvider} under various configurations of
+ * NameNode states. Mainly testing that the proxy provider picks the correct
+ * NameNode to communicate with.
+ */
+public class TestObserverReadProxyProvider {
+
+ private static final LocatedBlock[] EMPTY_BLOCKS = new LocatedBlock[0];
+ private String ns;
+ private URI nnURI;
+ private Configuration conf;
+
+ private ObserverReadProxyProvider<ClientProtocol> proxyProvider;
+ private ClientProtocolAnswer[] namenodeAnswers;
+ private String[] namenodeAddrs;
+
+ @Before
+ public void setup() throws Exception {
+ ns = "testcluster";
+ nnURI = URI.create("hdfs://" + ns);
+ conf = new Configuration();
+ conf.set(HdfsClientConfigKeys.DFS_NAMESERVICES, ns);
+ }
+
+ private void setupProxyProvider(int namenodeCount) throws Exception {
+ String[] namenodeIDs = new String[namenodeCount];
+ namenodeAddrs = new String[namenodeCount];
+ namenodeAnswers = new ClientProtocolAnswer[namenodeCount];
+ ClientProtocol[] proxies = new ClientProtocol[namenodeCount];
+ Map<String, ClientProtocol> proxyMap = new HashMap<>();
+ for (int i = 0; i < namenodeCount; i++) {
+ namenodeIDs[i] = "nn" + i;
+ namenodeAddrs[i] = "namenode" + i + ".test:8020";
+ conf.set(HdfsClientConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY + "." + ns +
+ "." + namenodeIDs[i], namenodeAddrs[i]);
+ namenodeAnswers[i] = new ClientProtocolAnswer();
+ proxies[i] = mock(ClientProtocol.class);
+ doWrite(Mockito.doAnswer(namenodeAnswers[i]).when(proxies[i]));
+ doRead(Mockito.doAnswer(namenodeAnswers[i]).when(proxies[i]));
+ proxyMap.put(namenodeAddrs[i], proxies[i]);
+ }
+ conf.set(HdfsClientConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX + "." + ns,
+ Joiner.on(",").join(namenodeIDs));
+ proxyProvider = new ObserverReadProxyProvider<>(conf, nnURI,
+ ClientProtocol.class, new ClientHAProxyFactory<ClientProtocol>() {
+ @Override
+ public ClientProtocol createProxy(Configuration conf,
+ InetSocketAddress nnAddr, Class<ClientProtocol> xface,
+ UserGroupInformation ugi, boolean withRetries,
+ AtomicBoolean fallbackToSimpleAuth) {
+ return proxyMap.get(nnAddr.toString());
+ }
+ });
+ proxyProvider.setObserverReadEnabled(true);
+ }
+
+ @Test
+ public void testReadOperationOnObserver() throws Exception {
+ setupProxyProvider(3);
+ namenodeAnswers[0].setActiveState();
+ namenodeAnswers[2].setObserverState();
+
+ doRead();
+ assertHandledBy(2);
+ }
+
+ @Test
+ public void testWriteOperationOnActive() throws Exception {
+ setupProxyProvider(3);
+ namenodeAnswers[0].setActiveState();
+ namenodeAnswers[2].setObserverState();
+
+ doWrite();
+ assertHandledBy(0);
+ }
+
+ @Test
+ public void testUnreachableObserverWithNoBackup() throws Exception {
+ setupProxyProvider(2);
+ namenodeAnswers[0].setActiveState();
+ namenodeAnswers[1].setObserverState();
+
+ namenodeAnswers[1].setUnreachable(true);
+ // Confirm that read still succeeds even though observer is not available
+ doRead();
+ assertHandledBy(0);
+ }
+
+ @Test
+ public void testUnreachableObserverWithMultiple() throws Exception {
+ setupProxyProvider(4);
+ namenodeAnswers[0].setActiveState();
+ namenodeAnswers[2].setObserverState();
+ namenodeAnswers[3].setObserverState();
+
+ doRead();
+ assertHandledBy(2);
+
+ namenodeAnswers[2].setUnreachable(true);
+ doRead();
+ // Fall back to the second observer node
+ assertHandledBy(3);
+
+ namenodeAnswers[2].setUnreachable(false);
+ doRead();
+ // Current index has changed, so although the first observer is back,
+ // it should continue requesting from the second observer
+ assertHandledBy(3);
+
+ namenodeAnswers[3].setUnreachable(true);
+ doRead();
+ // Now that second is unavailable, go back to using the first observer
+ assertHandledBy(2);
+
+ namenodeAnswers[2].setUnreachable(true);
+ doRead();
+ // Both observers are now unavailable, so it should fall back to active
+ assertHandledBy(0);
+ }
+
+ @Test
+ public void testObserverToActive() throws Exception {
+ setupProxyProvider(3);
+ namenodeAnswers[0].setActiveState();
+ namenodeAnswers[1].setObserverState();
+ namenodeAnswers[2].setObserverState();
+
+ doWrite();
+ assertHandledBy(0);
+
+ // Transition an observer to active
+ namenodeAnswers[0].setStandbyState();
+ namenodeAnswers[1].setActiveState();
+ try {
+ doWrite();
+ fail("Write should fail; failover required");
+ } catch (RemoteException re) {
+ assertEquals(re.getClassName(),
+ StandbyException.class.getCanonicalName());
+ }
+ proxyProvider.performFailover(proxyProvider.getProxy().proxy);
+ doWrite();
+ // After failover, previous observer is now active
+ assertHandledBy(1);
+ doRead();
+ assertHandledBy(2);
+
+ // Transition back to original state but second observer not available
+ namenodeAnswers[0].setActiveState();
+ namenodeAnswers[1].setObserverState();
+ namenodeAnswers[2].setUnreachable(true);
+ for (int i = 0; i < 2; i++) {
+ try {
+ doWrite();
+ fail("Should have failed");
+ } catch (IOException ioe) {
+ proxyProvider.performFailover(proxyProvider.getProxy().proxy);
+ }
+ }
+ doWrite();
+ assertHandledBy(0);
+
+ doRead();
+ assertHandledBy(1);
+ }
+
+ @Test
+ public void testObserverToStandby() throws Exception {
+ setupProxyProvider(3);
+ namenodeAnswers[0].setActiveState();
+ namenodeAnswers[1].setObserverState();
+ namenodeAnswers[2].setObserverState();
+
+ doRead();
+ assertHandledBy(1);
+
+ namenodeAnswers[1].setStandbyState();
+ doRead();
+ assertHandledBy(2);
+
+ namenodeAnswers[2].setStandbyState();
+ doRead();
+ assertHandledBy(0);
+
+ namenodeAnswers[1].setObserverState();
+ doRead();
+ assertHandledBy(1);
+ }
+
+ @Test
+ public void testSingleObserverToStandby() throws Exception {
+ setupProxyProvider(2);
+ namenodeAnswers[0].setActiveState();
+ namenodeAnswers[1].setObserverState();
+
+ doRead();
+ assertHandledBy(1);
+
+ namenodeAnswers[1].setStandbyState();
+ doRead();
+ assertHandledBy(0);
+
+ namenodeAnswers[1].setObserverState();
+ // The proxy provider's cached state for the second NN is still standby,
+ // so it will take a second call for it to notice the new observer
+ doRead();
+ doRead();
+ assertHandledBy(1);
+ }
+
+ private void doRead() throws Exception {
+ doRead(proxyProvider.getProxy().proxy);
+ }
+
+ private void doWrite() throws Exception {
+ doWrite(proxyProvider.getProxy().proxy);
+ }
+
+ private void assertHandledBy(int namenodeIdx) {
+ assertEquals(namenodeAddrs[namenodeIdx],
+ proxyProvider.getLastProxy().proxyInfo);
+ }
+
+ private static void doWrite(ClientProtocol client) throws Exception {
+ client.reportBadBlocks(EMPTY_BLOCKS);
+ }
+
+ private static void doRead(ClientProtocol client) throws Exception {
+ client.checkAccess("/", FsAction.READ);
+ }
+
+ /**
+ * An {@link Answer} used for mocking of a {@link ClientProtocol}. Setting
+ * the state or unreachability of this Answer will make the linked
+ * ClientProtocol respond as if it was communicating with a NameNode of
+ * the corresponding state. It is in Standby state by default.
+ */
+ private static class ClientProtocolAnswer implements Answer<Void> {
+
+ private volatile boolean unreachable = false;
+ // Standby state by default
+ private volatile boolean allowWrites = false;
+ private volatile boolean allowReads = false;
+
+ @Override
+ public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
+ if (unreachable) {
+ throw new IOException("Unavailable");
+ }
+ switch (invocationOnMock.getMethod().getName()) {
+ case "reportBadBlocks":
+ if (!allowWrites) {
+ throw new RemoteException(StandbyException.class.getCanonicalName(),
+ "No writes!");
+ }
+ return null;
+ case "checkAccess":
+ if (!allowReads) {
+ throw new RemoteException(StandbyException.class.getCanonicalName(),
+ "No reads!");
+ }
+ return null;
+ default:
+ throw new IllegalArgumentException(
+ "Only reportBadBlocks and checkAccess supported!");
+ }
+ }
+
+ void setUnreachable(boolean unreachable) {
+ this.unreachable = unreachable;
+ }
+
+ void setActiveState() {
+ allowReads = true;
+ allowWrites = true;
+ }
+
+ void setStandbyState() {
+ allowReads = false;
+ allowWrites = false;
+ }
+
+ void setObserverState() {
+ allowReads = true;
+ allowWrites = false;
+ }
+
+ }
+
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org