You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by su...@apache.org on 2018/08/25 15:49:43 UTC
[26/50] [abbrv] hadoop git commit: HDFS-13848. Refactor NameNode failover proxy providers. Contributed by Konstantin Shvachko.
HDFS-13848. Refactor NameNode failover proxy providers. Contributed by Konstantin Shvachko.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a4121c71
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a4121c71
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a4121c71
Branch: refs/heads/YARN-3409
Commit: a4121c71c29d13866a605d9c0d013e5de9c147c3
Parents: a5eba25
Author: Konstantin V Shvachko <sh...@apache.org>
Authored: Fri Aug 24 18:27:30 2018 -0700
Committer: Konstantin V Shvachko <sh...@apache.org>
Committed: Fri Aug 24 18:27:30 2018 -0700
----------------------------------------------------------------------
.../hadoop/io/retry/FailoverProxyProvider.java | 15 +-
.../ha/AbstractNNFailoverProxyProvider.java | 152 ++++++++++++++++++-
.../ha/ConfiguredFailoverProxyProvider.java | 141 ++---------------
.../namenode/ha/IPFailoverProxyProvider.java | 51 +------
4 files changed, 176 insertions(+), 183 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a4121c71/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/FailoverProxyProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/FailoverProxyProvider.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/FailoverProxyProvider.java
index c73e083..f2fa3af 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/FailoverProxyProvider.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/FailoverProxyProvider.java
@@ -30,27 +30,30 @@ import org.apache.hadoop.classification.InterfaceStability;
*/
@InterfaceStability.Evolving
public interface FailoverProxyProvider<T> extends Closeable {
- public static final class ProxyInfo<T> {
- public final T proxy;
+ static class ProxyInfo<T> {
+ public T proxy;
/*
* The information (e.g., the IP address) of the current proxy object. It
* provides information for debugging purposes.
*/
- public final String proxyInfo;
+ public String proxyInfo;
public ProxyInfo(T proxy, String proxyInfo) {
this.proxy = proxy;
this.proxyInfo = proxyInfo;
}
+ private String proxyName() {
+ return proxy != null ? proxy.getClass().getSimpleName() : "UnknownProxy";
+ }
+
public String getString(String methodName) {
- return proxy.getClass().getSimpleName() + "." + methodName
- + " over " + proxyInfo;
+ return proxyName() + "." + methodName + " over " + proxyInfo;
}
@Override
public String toString() {
- return proxy.getClass().getSimpleName() + " over " + proxyInfo;
+ return proxyName() + " over " + proxyInfo;
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a4121c71/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/AbstractNNFailoverProxyProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/AbstractNNFailoverProxyProvider.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/AbstractNNFailoverProxyProvider.java
index e0fdb32..252b70d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/AbstractNNFailoverProxyProvider.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/AbstractNNFailoverProxyProvider.java
@@ -18,14 +18,68 @@
package org.apache.hadoop.hdfs.server.namenode.ha;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.hdfs.DFSUtilClient;
+import org.apache.hadoop.hdfs.HAUtilClient;
+import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.io.retry.FailoverProxyProvider;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public abstract class AbstractNNFailoverProxyProvider<T> implements
FailoverProxyProvider <T> {
+ protected static final Logger LOG =
+ LoggerFactory.getLogger(AbstractNNFailoverProxyProvider.class);
- private AtomicBoolean fallbackToSimpleAuth;
+ protected Configuration conf;
+ protected Class<T> xface;
+ protected HAProxyFactory<T> factory;
+ protected UserGroupInformation ugi;
+ protected AtomicBoolean fallbackToSimpleAuth;
+
+ protected AbstractNNFailoverProxyProvider() {
+ }
+
+ protected AbstractNNFailoverProxyProvider(Configuration conf, URI uri,
+ Class<T> xface, HAProxyFactory<T> factory) {
+ this.conf = new Configuration(conf);
+ this.xface = xface;
+ this.factory = factory;
+ try {
+ this.ugi = UserGroupInformation.getCurrentUser();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+
+ int maxRetries = this.conf.getInt(
+ HdfsClientConfigKeys.Failover.CONNECTION_RETRIES_KEY,
+ HdfsClientConfigKeys.Failover.CONNECTION_RETRIES_DEFAULT);
+ this.conf.setInt(
+ CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY,
+ maxRetries);
+
+ int maxRetriesOnSocketTimeouts = this.conf.getInt(
+ HdfsClientConfigKeys
+ .Failover.CONNECTION_RETRIES_ON_SOCKET_TIMEOUTS_KEY,
+ HdfsClientConfigKeys
+ .Failover.CONNECTION_RETRIES_ON_SOCKET_TIMEOUTS_DEFAULT);
+ this.conf.setInt(
+ CommonConfigurationKeysPublic
+ .IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY,
+ maxRetriesOnSocketTimeouts);
+ }
/**
* Inquire whether logical HA URI is used for the implementation. If it is
@@ -51,4 +105,100 @@ public abstract class AbstractNNFailoverProxyProvider<T> implements
public synchronized AtomicBoolean getFallbackToSimpleAuth() {
return fallbackToSimpleAuth;
}
+
+ /**
+ * ProxyInfo to a NameNode. Includes its address.
+ */
+ public static class NNProxyInfo<T> extends ProxyInfo<T> {
+ private InetSocketAddress address;
+
+ public NNProxyInfo(InetSocketAddress address) {
+ super(null, address.toString());
+ this.address = address;
+ }
+
+ public InetSocketAddress getAddress() {
+ return address;
+ }
+ }
+
+ @Override
+ public Class<T> getInterface() {
+ return xface;
+ }
+
+ /**
+ * Create a proxy if it has not been created yet.
+ */
+ protected NNProxyInfo<T> createProxyIfNeeded(NNProxyInfo<T> pi) {
+ if (pi.proxy == null) {
+ assert pi.getAddress() != null : "Proxy address is null";
+ try {
+ pi.proxy = factory.createProxy(conf,
+ pi.getAddress(), xface, ugi, false, getFallbackToSimpleAuth());
+ } catch (IOException ioe) {
+ LOG.error("{} Failed to create RPC proxy to NameNode",
+ this.getClass().getSimpleName(), ioe);
+ throw new RuntimeException(ioe);
+ }
+ }
+ return pi;
+ }
+
+ /**
+ * Get list of configured NameNode proxy addresses.
+ * Randomize the list if requested.
+ */
+ protected List<NNProxyInfo<T>> getProxyAddresses(URI uri, String addressKey) {
+ final List<NNProxyInfo<T>> proxies = new ArrayList<NNProxyInfo<T>>();
+ Map<String, Map<String, InetSocketAddress>> map =
+ DFSUtilClient.getAddresses(conf, null, addressKey);
+ Map<String, InetSocketAddress> addressesInNN = map.get(uri.getHost());
+
+ if (addressesInNN == null || addressesInNN.size() == 0) {
+ throw new RuntimeException("Could not find any configured addresses " +
+ "for URI " + uri);
+ }
+
+ Collection<InetSocketAddress> addressesOfNns = addressesInNN.values();
+ for (InetSocketAddress address : addressesOfNns) {
+ proxies.add(new NNProxyInfo<T>(address));
+ }
+ // Randomize the list to prevent all clients pointing to the same one
+ boolean randomized = getRandomOrder(conf, uri);
+ if (randomized) {
+ Collections.shuffle(proxies);
+ }
+
+ // The client may have a delegation token set for the logical
+ // URI of the cluster. Clone this token to apply to each of the
+ // underlying IPC addresses so that the IPC code can find it.
+ HAUtilClient.cloneDelegationTokenForLogicalUri(ugi, uri, addressesOfNns);
+ return proxies;
+ }
+
+ /**
+ * Check whether random order is configured for failover proxy provider
+ * for the namenode/nameservice.
+ *
+ * @param conf Configuration
+ * @param nameNodeUri The URI of namenode/nameservice
+ * @return random order configuration
+ */
+ public static boolean getRandomOrder(
+ Configuration conf, URI nameNodeUri) {
+ String host = nameNodeUri.getHost();
+ String configKeyWithHost = HdfsClientConfigKeys.Failover.RANDOM_ORDER
+ + "." + host;
+
+ if (conf.get(configKeyWithHost) != null) {
+ return conf.getBoolean(
+ configKeyWithHost,
+ HdfsClientConfigKeys.Failover.RANDOM_ORDER_DEFAULT);
+ }
+
+ return conf.getBoolean(
+ HdfsClientConfigKeys.Failover.RANDOM_ORDER,
+ HdfsClientConfigKeys.Failover.RANDOM_ORDER_DEFAULT);
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a4121c71/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ConfiguredFailoverProxyProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ConfiguredFailoverProxyProvider.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ConfiguredFailoverProxyProvider.java
index f46532a..92e75ce 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ConfiguredFailoverProxyProvider.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ConfiguredFailoverProxyProvider.java
@@ -19,23 +19,11 @@ package org.apache.hadoop.hdfs.server.namenode.ha;
import java.io.Closeable;
import java.io.IOException;
-import java.net.InetSocketAddress;
import java.net.URI;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
import java.util.List;
-import java.util.Map;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
-import org.apache.hadoop.hdfs.DFSUtilClient;
-import org.apache.hadoop.hdfs.HAUtilClient;
-import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY;
@@ -48,17 +36,9 @@ import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_NAMENODE_RP
public class ConfiguredFailoverProxyProvider<T> extends
AbstractNNFailoverProxyProvider<T> {
- private static final Logger LOG =
- LoggerFactory.getLogger(ConfiguredFailoverProxyProvider.class);
-
- protected final Configuration conf;
- protected final List<AddressRpcProxyPair<T>> proxies =
- new ArrayList<AddressRpcProxyPair<T>>();
- protected final UserGroupInformation ugi;
- protected final Class<T> xface;
+ protected final List<NNProxyInfo<T>> proxies;
private int currentProxyIndex = 0;
- protected final HAProxyFactory<T> factory;
public ConfiguredFailoverProxyProvider(Configuration conf, URI uri,
Class<T> xface, HAProxyFactory<T> factory) {
@@ -67,83 +47,8 @@ public class ConfiguredFailoverProxyProvider<T> extends
public ConfiguredFailoverProxyProvider(Configuration conf, URI uri,
Class<T> xface, HAProxyFactory<T> factory, String addressKey) {
- this.xface = xface;
- this.conf = new Configuration(conf);
- int maxRetries = this.conf.getInt(
- HdfsClientConfigKeys.Failover.CONNECTION_RETRIES_KEY,
- HdfsClientConfigKeys.Failover.CONNECTION_RETRIES_DEFAULT);
- this.conf.setInt(
- CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY,
- maxRetries);
-
- int maxRetriesOnSocketTimeouts = this.conf.getInt(
- HdfsClientConfigKeys.Failover.CONNECTION_RETRIES_ON_SOCKET_TIMEOUTS_KEY,
- HdfsClientConfigKeys.Failover.CONNECTION_RETRIES_ON_SOCKET_TIMEOUTS_DEFAULT);
- this.conf.setInt(
- CommonConfigurationKeysPublic
- .IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY,
- maxRetriesOnSocketTimeouts);
-
- try {
- ugi = UserGroupInformation.getCurrentUser();
-
- Map<String, Map<String, InetSocketAddress>> map =
- DFSUtilClient.getAddresses(conf, null, addressKey);
- Map<String, InetSocketAddress> addressesInNN = map.get(uri.getHost());
-
- if (addressesInNN == null || addressesInNN.size() == 0) {
- throw new RuntimeException("Could not find any configured addresses " +
- "for URI " + uri);
- }
-
- Collection<InetSocketAddress> addressesOfNns = addressesInNN.values();
- for (InetSocketAddress address : addressesOfNns) {
- proxies.add(new AddressRpcProxyPair<T>(address));
- }
- // Randomize the list to prevent all clients pointing to the same one
- boolean randomized = getRandomOrder(conf, uri);
- if (randomized) {
- Collections.shuffle(proxies);
- }
-
- // The client may have a delegation token set for the logical
- // URI of the cluster. Clone this token to apply to each of the
- // underlying IPC addresses so that the IPC code can find it.
- HAUtilClient.cloneDelegationTokenForLogicalUri(ugi, uri, addressesOfNns);
- this.factory = factory;
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
-
- /**
- * Check whether random order is configured for failover proxy provider
- * for the namenode/nameservice.
- *
- * @param conf Configuration
- * @param nameNodeUri The URI of namenode/nameservice
- * @return random order configuration
- */
- private static boolean getRandomOrder(
- Configuration conf, URI nameNodeUri) {
- String host = nameNodeUri.getHost();
- String configKeyWithHost = HdfsClientConfigKeys.Failover.RANDOM_ORDER
- + "." + host;
-
- if (conf.get(configKeyWithHost) != null) {
- return conf.getBoolean(
- configKeyWithHost,
- HdfsClientConfigKeys.Failover.RANDOM_ORDER_DEFAULT);
- }
-
- return conf.getBoolean(
- HdfsClientConfigKeys.Failover.RANDOM_ORDER,
- HdfsClientConfigKeys.Failover.RANDOM_ORDER_DEFAULT);
- }
-
- @Override
- public Class<T> getInterface() {
- return xface;
+ super(conf, uri, xface, factory);
+ this.proxies = getProxyAddresses(uri, addressKey);
}
/**
@@ -151,21 +56,8 @@ public class ConfiguredFailoverProxyProvider<T> extends
*/
@Override
public synchronized ProxyInfo<T> getProxy() {
- AddressRpcProxyPair<T> current = proxies.get(currentProxyIndex);
- return getProxy(current);
- }
-
- protected ProxyInfo<T> getProxy(AddressRpcProxyPair<T> current) {
- if (current.namenode == null) {
- try {
- current.namenode = factory.createProxy(conf,
- current.address, xface, ugi, false, getFallbackToSimpleAuth());
- } catch (IOException e) {
- LOG.error("Failed to create RPC proxy to NameNode", e);
- throw new RuntimeException(e);
- }
- }
- return new ProxyInfo<T>(current.namenode, current.address.toString());
+ NNProxyInfo<T> current = proxies.get(currentProxyIndex);
+ return createProxyIfNeeded(current);
}
@Override
@@ -178,30 +70,17 @@ public class ConfiguredFailoverProxyProvider<T> extends
}
/**
- * A little pair object to store the address and connected RPC proxy object to
- * an NN. Note that {@link AddressRpcProxyPair#namenode} may be null.
- */
- protected static class AddressRpcProxyPair<T> {
- public final InetSocketAddress address;
- public T namenode;
-
- public AddressRpcProxyPair(InetSocketAddress address) {
- this.address = address;
- }
- }
-
- /**
* Close all the proxy objects which have been opened over the lifetime of
* this proxy provider.
*/
@Override
public synchronized void close() throws IOException {
- for (AddressRpcProxyPair<T> proxy : proxies) {
- if (proxy.namenode != null) {
- if (proxy.namenode instanceof Closeable) {
- ((Closeable)proxy.namenode).close();
+ for (ProxyInfo<T> proxy : proxies) {
+ if (proxy.proxy != null) {
+ if (proxy.proxy instanceof Closeable) {
+ ((Closeable)proxy.proxy).close();
} else {
- RPC.stopProxy(proxy.namenode);
+ RPC.stopProxy(proxy.proxy);
}
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a4121c71/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/IPFailoverProxyProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/IPFailoverProxyProvider.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/IPFailoverProxyProvider.java
index ed250a0..e703740 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/IPFailoverProxyProvider.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/IPFailoverProxyProvider.java
@@ -19,15 +19,11 @@ package org.apache.hadoop.hdfs.server.namenode.ha;
import java.io.Closeable;
import java.io.IOException;
-import java.net.InetSocketAddress;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.hdfs.DFSUtilClient;
-import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.security.UserGroupInformation;
/**
* A NNFailoverProxyProvider implementation which works on IP failover setup.
@@ -47,53 +43,18 @@ import org.apache.hadoop.security.UserGroupInformation;
*/
public class IPFailoverProxyProvider<T> extends
AbstractNNFailoverProxyProvider<T> {
- private final Configuration conf;
- private final Class<T> xface;
- private final URI nameNodeUri;
- private final HAProxyFactory<T> factory;
- private ProxyInfo<T> nnProxyInfo = null;
+ private final NNProxyInfo<T> nnProxyInfo;
public IPFailoverProxyProvider(Configuration conf, URI uri,
Class<T> xface, HAProxyFactory<T> factory) {
- this.xface = xface;
- this.nameNodeUri = uri;
- this.factory = factory;
-
- this.conf = new Configuration(conf);
- int maxRetries = this.conf.getInt(
- HdfsClientConfigKeys.Failover.CONNECTION_RETRIES_KEY,
- HdfsClientConfigKeys.Failover.CONNECTION_RETRIES_DEFAULT);
- this.conf.setInt(
- CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY,
- maxRetries);
-
- int maxRetriesOnSocketTimeouts = this.conf.getInt(
- HdfsClientConfigKeys.Failover.CONNECTION_RETRIES_ON_SOCKET_TIMEOUTS_KEY,
- HdfsClientConfigKeys.Failover.CONNECTION_RETRIES_ON_SOCKET_TIMEOUTS_DEFAULT);
- this.conf.setInt(
- CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY,
- maxRetriesOnSocketTimeouts);
- }
-
- @Override
- public Class<T> getInterface() {
- return xface;
+ super(conf, uri, xface, factory);
+ this.nnProxyInfo = new NNProxyInfo<T>(DFSUtilClient.getNNAddress(uri));
}
@Override
- public synchronized ProxyInfo<T> getProxy() {
+ public synchronized NNProxyInfo<T> getProxy() {
// Create a non-ha proxy if not already created.
- if (nnProxyInfo == null) {
- try {
- // Create a proxy that is not wrapped in RetryProxy
- InetSocketAddress nnAddr = DFSUtilClient.getNNAddress(nameNodeUri);
- nnProxyInfo = new ProxyInfo<T>(factory.createProxy(conf, nnAddr, xface,
- UserGroupInformation.getCurrentUser(), false), nnAddr.toString());
- } catch (IOException ioe) {
- throw new RuntimeException(ioe);
- }
- }
- return nnProxyInfo;
+ return createProxyIfNeeded(nnProxyInfo);
}
/** Nothing to do for IP failover */
@@ -106,7 +67,7 @@ public class IPFailoverProxyProvider<T> extends
*/
@Override
public synchronized void close() throws IOException {
- if (nnProxyInfo == null) {
+ if (nnProxyInfo.proxy == null) {
return;
}
if (nnProxyInfo.proxy instanceof Closeable) {
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org