You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by xk...@apache.org on 2019/10/04 18:41:26 UTC
[hadoop] branch branch-3.1 updated: HDFS-14245. [SBN read] Enable
ObserverReadProxyProvider to work with non-ClientProtocol proxy types.
Contributed by Erik Krogen.
This is an automated email from the ASF dual-hosted git repository.
xkrogen pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new 9fdb849 HDFS-14245. [SBN read] Enable ObserverReadProxyProvider to work with non-ClientProtocol proxy types. Contributed by Erik Krogen.
9fdb849 is described below
commit 9fdb849e034573bb44abd593eefa1e13a3261376
Author: Erik Krogen <xk...@apache.org>
AuthorDate: Wed Apr 17 14:38:24 2019 -0700
HDFS-14245. [SBN read] Enable ObserverReadProxyProvider to work with non-ClientProtocol proxy types. Contributed by Erik Krogen.
(cherry picked from 5847e0014343f60f853cb796781ca1fa03a72efd)
(cherry picked from 6630c9b75d65deefb5550e355eef7783909a57bc)
---
.../ha/AbstractNNFailoverProxyProvider.java | 3 +-
.../namenode/ha/ObserverReadProxyProvider.java | 54 ++++++++++++++++------
.../namenode/ha/TestDelegationTokensWithHA.java | 2 +-
.../hdfs/server/namenode/ha/TestObserverNode.java | 12 +++++
.../namenode/ha/TestObserverReadProxyProvider.java | 29 ++++++++++++
5 files changed, 83 insertions(+), 17 deletions(-)
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/AbstractNNFailoverProxyProvider.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/AbstractNNFailoverProxyProvider.java
index 572cb1c..646b100 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/AbstractNNFailoverProxyProvider.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/AbstractNNFailoverProxyProvider.java
@@ -115,7 +115,8 @@ public abstract class AbstractNNFailoverProxyProvider<T> implements
/**
* The currently known state of the NameNode represented by this ProxyInfo.
* This may be out of date if the NameNode has changed state since the last
- * time the state was checked.
+ * time the state was checked. If the NameNode could not be contacted, this
+ * will store null to indicate an unknown state.
*/
private HAServiceState cachedState;
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java
index 2ccf885..ac4b1e7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java
@@ -66,7 +66,7 @@ import com.google.common.annotations.VisibleForTesting;
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
-public class ObserverReadProxyProvider<T extends ClientProtocol>
+public class ObserverReadProxyProvider<T>
extends AbstractNNFailoverProxyProvider<T> {
@VisibleForTesting
static final Logger LOG = LoggerFactory.getLogger(
@@ -189,7 +189,13 @@ public class ObserverReadProxyProvider<T extends ClientProtocol>
AUTO_MSYNC_PERIOD_DEFAULT, TimeUnit.MILLISECONDS);
// TODO : make this configurable or remove this variable
- this.observerReadEnabled = true;
+ if (wrappedProxy instanceof ClientProtocol) {
+ this.observerReadEnabled = true;
+ } else {
+ LOG.info("Disabling observer reads for {} because the requested proxy "
+ + "class does not implement {}", uri, ClientProtocol.class.getName());
+ this.observerReadEnabled = false;
+ }
}
public AlignmentContext getAlignmentContext() {
@@ -267,7 +273,7 @@ public class ObserverReadProxyProvider<T extends ClientProtocol>
private HAServiceState getHAServiceState(NNProxyInfo<T> proxyInfo) {
IOException ioe;
try {
- return proxyInfo.proxy.getHAServiceState();
+ return getProxyAsClientProtocol(proxyInfo.proxy).getHAServiceState();
} catch (RemoteException re) {
// Though a Standby will allow a getHAServiceState call, it won't allow
// delegation token lookup, so if DT is used it throws StandbyException
@@ -284,7 +290,19 @@ public class ObserverReadProxyProvider<T extends ClientProtocol>
LOG.debug("Failed to connect to {} while fetching HAServiceState",
proxyInfo.getAddress(), ioe);
}
- return HAServiceState.STANDBY;
+ return null;
+ }
+
+ /**
+ * Return the input proxy, cast as a {@link ClientProtocol}. This catches any
+ * {@link ClassCastException} and wraps it in a more helpful message. This
+ * should ONLY be called if the caller is certain that the proxy is, in fact,
+ * a {@link ClientProtocol}.
+ */
+ private ClientProtocol getProxyAsClientProtocol(T proxy) {
+ assert proxy instanceof ClientProtocol : "BUG: Attempted to use proxy "
+ + "of class " + proxy.getClass() + " as if it was a ClientProtocol.";
+ return (ClientProtocol) proxy;
}
/**
@@ -299,7 +317,7 @@ public class ObserverReadProxyProvider<T extends ClientProtocol>
if (msynced) {
return; // No need for an msync
}
- failoverProxy.getProxy().proxy.msync();
+ getProxyAsClientProtocol(failoverProxy.getProxy().proxy).msync();
msynced = true;
lastMsyncTimeMs = Time.monotonicNow();
}
@@ -315,7 +333,7 @@ public class ObserverReadProxyProvider<T extends ClientProtocol>
private void autoMsyncIfNecessary() throws IOException {
if (autoMsyncPeriodMs == 0) {
// Always msync
- failoverProxy.getProxy().proxy.msync();
+ getProxyAsClientProtocol(failoverProxy.getProxy().proxy).msync();
} else if (autoMsyncPeriodMs > 0) {
if (Time.monotonicNow() - lastMsyncTimeMs > autoMsyncPeriodMs) {
synchronized (this) {
@@ -324,7 +342,7 @@ public class ObserverReadProxyProvider<T extends ClientProtocol>
// Re-check the entry criterion since the status may have changed
// while waiting for the lock.
if (Time.monotonicNow() - lastMsyncTimeMs > autoMsyncPeriodMs) {
- failoverProxy.getProxy().proxy.msync();
+ getProxyAsClientProtocol(failoverProxy.getProxy().proxy).msync();
lastMsyncTimeMs = Time.monotonicNow();
}
}
@@ -363,6 +381,7 @@ public class ObserverReadProxyProvider<T extends ClientProtocol>
int failedObserverCount = 0;
int activeCount = 0;
int standbyCount = 0;
+ int unreachableCount = 0;
for (int i = 0; i < nameNodeProxies.size(); i++) {
NNProxyInfo<T> current = getCurrentProxy();
HAServiceState currState = current.getCachedState();
@@ -371,9 +390,12 @@ public class ObserverReadProxyProvider<T extends ClientProtocol>
activeCount++;
} else if (currState == HAServiceState.STANDBY) {
standbyCount++;
+ } else if (currState == null) {
+ unreachableCount++;
}
LOG.debug("Skipping proxy {} for {} because it is in state {}",
- current.proxyInfo, method.getName(), currState);
+ current.proxyInfo, method.getName(),
+ currState == null ? "unreachable" : currState);
changeProxy(current);
continue;
}
@@ -420,10 +442,10 @@ public class ObserverReadProxyProvider<T extends ClientProtocol>
// be that there is simply no Observer node running at all.
if (failedObserverCount > 0) {
// If we get here, it means all observers have failed.
- LOG.warn("{} observers have failed for read request {}; "
- + "also found {} standby, {} active. "
- + "Falling back to active.", failedObserverCount,
- method.getName(), standbyCount, activeCount);
+ LOG.warn("{} observers have failed for read request {}; also found "
+ + "{} standby, {} active, and {} unreachable. Falling back "
+ + "to active.", failedObserverCount, method.getName(),
+ standbyCount, activeCount, unreachableCount);
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Read falling back to active without observer read "
@@ -432,8 +454,9 @@ public class ObserverReadProxyProvider<T extends ClientProtocol>
}
}
- // Either all observers have failed, or that it is a write request.
- // In either case, we'll forward the request to active NameNode.
+ // Either all observers have failed, observer reads are disabled,
+ // or this is a write request. In any case, forward the request to
+ // the active NameNode.
LOG.debug("Using failoverProxy to service {}", method.getName());
ProxyInfo<T> activeProxy = failoverProxy.getProxy();
try {
@@ -455,7 +478,8 @@ public class ObserverReadProxyProvider<T extends ClientProtocol>
@Override
public ConnectionId getConnectionId() {
- return RPC.getConnectionIdForProxy(getCurrentProxy().proxy);
+ return RPC.getConnectionIdForProxy(observerReadEnabled
+ ? getCurrentProxy().proxy : failoverProxy.getProxy().proxy);
}
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDelegationTokensWithHA.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDelegationTokensWithHA.java
index 2a41ec9..2186052 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDelegationTokensWithHA.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDelegationTokensWithHA.java
@@ -152,7 +152,7 @@ public class TestDelegationTokensWithHA {
cluster.shutdownNameNode(0);
logCapture.clearOutput();
dfs.access(new Path("/"), FsAction.READ);
- assertTrue(logCapture.getOutput().contains("Assuming Standby state"));
+ assertTrue(logCapture.getOutput().contains("Failed to connect to"));
} finally {
logCapture.stopCapturing();
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java
index b923473..f1eb5a8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java
@@ -51,6 +51,7 @@ import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.hdfs.server.namenode.TestFsck;
+import org.apache.hadoop.hdfs.tools.GetGroups;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
@@ -115,6 +116,17 @@ public class TestObserverNode {
fail("active cannot be transitioned to observer");
}
+ /**
+ * Test that non-ClientProtocol proxies such as
+ * {@link org.apache.hadoop.tools.GetUserMappingsProtocol} still work
+ * when run in an environment with observers.
+ */
+ @Test
+ public void testGetGroups() throws Exception {
+ GetGroups getGroups = new GetGroups(conf);
+ assertEquals(0, getGroups.run(new String[0]));
+ }
+
@Test
public void testNoObserverToActive() throws Exception {
try {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverReadProxyProvider.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverReadProxyProvider.java
index caf7d00..13b5774 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverReadProxyProvider.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverReadProxyProvider.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.ipc.ObserverRetryOnActiveException;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.StandbyException;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.tools.GetUserMappingsProtocol;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
@@ -42,9 +43,12 @@ import org.mockito.stubbing.Answer;
import static org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
+import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
/**
* Tests for {@link ObserverReadProxyProvider} under various configurations of
@@ -116,6 +120,31 @@ public class TestObserverReadProxyProvider {
}
@Test
+ public void testWithNonClientProxy() throws Exception {
+ setupProxyProvider(2); // This will initialize all of the instance fields
+ final String fakeUser = "fakeUser";
+ final String[] fakeGroups = {"fakeGroup"};
+ HAProxyFactory<GetUserMappingsProtocol> proxyFactory =
+ new NameNodeHAProxyFactory<GetUserMappingsProtocol>() {
+ @Override
+ public GetUserMappingsProtocol createProxy(Configuration config,
+ InetSocketAddress addr, Class<GetUserMappingsProtocol> xface,
+ UserGroupInformation ugi, boolean withRetries,
+ AtomicBoolean fallbackToSimpleAuth) throws IOException {
+ GetUserMappingsProtocol proxy =
+ mock(GetUserMappingsProtocol.class);
+ when(proxy.getGroupsForUser(fakeUser)).thenReturn(fakeGroups);
+ return proxy;
+ }
+ };
+ ObserverReadProxyProvider<GetUserMappingsProtocol> userProxyProvider =
+ new ObserverReadProxyProvider<>(conf, nnURI,
+ GetUserMappingsProtocol.class, proxyFactory);
+ assertArrayEquals(fakeGroups,
+ userProxyProvider.getProxy().proxy.getGroupsForUser(fakeUser));
+ }
+
+ @Test
public void testReadOperationOnObserver() throws Exception {
setupProxyProvider(3);
namenodeAnswers[0].setActiveState();
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org