You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cl...@apache.org on 2019/08/23 18:09:08 UTC
[hadoop] 02/05: HDFS-13566. Add configurable additional RPC
listener to NameNode. Contributed by Chen Liang.
This is an automated email from the ASF dual-hosted git repository.
cliang pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/hadoop.git
commit 3835a6c4e82887228e88d6d345b6651de5f01eb4
Author: Chen Liang <cl...@apache.org>
AuthorDate: Tue Oct 23 14:53:45 2018 -0700
HDFS-13566. Add configurable additional RPC listener to NameNode. Contributed by Chen Liang.
---
.../main/java/org/apache/hadoop/ipc/Server.java | 114 +++++++++++++++++++--
.../hadoop/security/SaslPropertiesResolver.java | 4 +-
.../test/java/org/apache/hadoop/ipc/TestIPC.java | 53 +++++++++-
.../java/org/apache/hadoop/hdfs/DFSUtilClient.java | 85 ++++++++++++++-
.../hadoop/hdfs/client/HdfsClientConfigKeys.java | 5 +
.../java/org/apache/hadoop/hdfs/DFSConfigKeys.java | 3 +
.../hadoop/hdfs/server/namenode/NameNode.java | 30 ++++++
.../hdfs/server/namenode/NameNodeRpcServer.java | 16 ++-
.../src/main/resources/hdfs-default.xml | 11 ++
.../org/apache/hadoop/hdfs/MiniDFSCluster.java | 51 +++++++++
.../apache/hadoop/hdfs/TestHAAuxiliaryPort.java | 112 ++++++++++++++++++++
11 files changed, 468 insertions(+), 16 deletions(-)
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
index 33cfd06..0c4fbb8 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
@@ -380,6 +380,24 @@ public abstract class Server {
}
/**
+ * Returns the SASL qop for the current call, if the current call is
+ * set, and the SASL negotiation is done. Otherwise return null. Note
+ * that CurCall is thread local object. So in fact, different handler
+ * threads will process different CurCall object.
+ *
+ * Also, only return for RPC calls, not supported for other protocols.
+ * @return the QOP of the current connection.
+ */
+ public static String getEstablishedQOP() {
+ Call call = CurCall.get();
+ if (call == null || !(call instanceof RpcCall)) {
+ return null;
+ }
+ RpcCall rpcCall = (RpcCall)call;
+ return rpcCall.connection.getEstablishedQOP();
+ }
+
+ /**
* Returns the clientId from the current RPC request
*/
public static byte[] getClientId() {
@@ -458,6 +476,10 @@ public abstract class Server {
// maintains the set of client connections and handles idle timeouts
private ConnectionManager connectionManager;
private Listener listener = null;
+ // Auxiliary listeners maintained as in a map, to allow
+ // arbitrary number of of auxiliary listeners. A map from
+ // the port to the listener binding to it.
+ private Map<Integer, Listener> auxiliaryListenerMap;
private Responder responder = null;
private Handler[] handlers = null;
@@ -1148,11 +1170,12 @@ public abstract class Server {
private Reader[] readers = null;
private int currentReader = 0;
private InetSocketAddress address; //the address we bind at
+ private int listenPort; //the port we bind at
private int backlogLength = conf.getInt(
CommonConfigurationKeysPublic.IPC_SERVER_LISTEN_QUEUE_SIZE_KEY,
CommonConfigurationKeysPublic.IPC_SERVER_LISTEN_QUEUE_SIZE_DEFAULT);
- public Listener() throws IOException {
+ Listener(int port) throws IOException {
address = new InetSocketAddress(bindAddress, port);
// Create a new server socket and set to non blocking mode
acceptChannel = ServerSocketChannel.open();
@@ -1160,7 +1183,10 @@ public abstract class Server {
// Bind the server socket to the local host and port
bind(acceptChannel.socket(), address, backlogLength, conf, portRangeConfig);
- port = acceptChannel.socket().getLocalPort(); //Could be an ephemeral port
+ //Could be an ephemeral port
+ this.listenPort = acceptChannel.socket().getLocalPort();
+ Thread.currentThread().setName("Listener at " +
+ bindAddress + "/" + this.listenPort);
// create a selector;
selector= Selector.open();
readers = new Reader[readThreads];
@@ -1343,7 +1369,7 @@ public abstract class Server {
channel.socket().setKeepAlive(true);
Reader reader = getReader();
- Connection c = connectionManager.register(channel);
+ Connection c = connectionManager.register(channel, this.listenPort);
// If the connectionManager can't take it, close the connection.
if (c == null) {
if (channel.isOpen()) {
@@ -1765,6 +1791,7 @@ public abstract class Server {
private ByteBuffer unwrappedDataLengthBuffer;
private int serviceClass;
private boolean shouldClose = false;
+ private int ingressPort;
UserGroupInformation user = null;
public UserGroupInformation attemptingUser = null; // user name before auth
@@ -1776,7 +1803,8 @@ public abstract class Server {
private boolean sentNegotiate = false;
private boolean useWrap = false;
- public Connection(SocketChannel channel, long lastContact) {
+ public Connection(SocketChannel channel, long lastContact,
+ int ingressPort) {
this.channel = channel;
this.lastContact = lastContact;
this.data = null;
@@ -1788,6 +1816,7 @@ public abstract class Server {
this.unwrappedDataLengthBuffer = ByteBuffer.allocate(4);
this.socket = channel.socket();
this.addr = socket.getInetAddress();
+ this.ingressPort = ingressPort;
if (addr == null) {
this.hostAddress = "*Unknown*";
} else {
@@ -1822,9 +1851,24 @@ public abstract class Server {
return hostAddress;
}
+ public int getIngressPort() {
+ return ingressPort;
+ }
+
public InetAddress getHostInetAddress() {
return addr;
}
+
+ public String getEstablishedQOP() {
+ // In practice, saslServer should not be null when this is
+ // called. If it is null, it must be either some
+ // configuration mistake or it is called from unit test.
+ if (saslServer == null) {
+ LOG.warn("SASL server should not be null!");
+ return null;
+ }
+ return (String)saslServer.getNegotiatedProperty(Sasl.QOP);
+ }
public void setLastContact(long lastContact) {
this.lastContact = lastContact;
@@ -2303,7 +2347,7 @@ public abstract class Server {
private SaslServer createSaslServer(AuthMethod authMethod)
throws IOException, InterruptedException {
final Map<String,?> saslProps =
- saslPropsResolver.getServerProperties(addr);
+ saslPropsResolver.getServerProperties(addr, ingressPort);
return new SaslRpcServer(authMethod).create(this, saslProps, secretManager);
}
@@ -2822,7 +2866,8 @@ public abstract class Server {
private class Handler extends Thread {
public Handler(int instanceNumber) {
this.setDaemon(true);
- this.setName("IPC Server handler "+ instanceNumber + " on " + port);
+ this.setName("IPC Server handler "+ instanceNumber +
+ " on default port " + port);
}
@Override
@@ -2984,6 +3029,7 @@ public abstract class Server {
this.handlerCount = handlerCount;
this.socketSendBufferSize = 0;
this.serverName = serverName;
+ this.auxiliaryListenerMap = null;
this.maxDataLength = conf.getInt(CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH,
CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH_DEFAULT);
if (queueSizePerHandler != -1) {
@@ -3023,8 +3069,9 @@ public abstract class Server {
this.negotiateResponse = buildNegotiateResponse(enabledAuthMethods);
// Start the listener here and let it bind to the port
- listener = new Listener();
- this.port = listener.getAddress().getPort();
+ listener = new Listener(port);
+ // set the server port to the default listener port.
+ this.port = listener.getAddress().getPort();
connectionManager = new ConnectionManager();
this.rpcMetrics = RpcMetrics.create(this, conf);
this.rpcDetailedMetrics = RpcDetailedMetrics.create(this.port);
@@ -3046,7 +3093,23 @@ public abstract class Server {
this.exceptionsHandler.addTerseLoggingExceptions(StandbyException.class);
}
-
+
+ public synchronized void addAuxiliaryListener(int auxiliaryPort)
+ throws IOException {
+ if (auxiliaryListenerMap == null) {
+ auxiliaryListenerMap = new HashMap<>();
+ }
+ if (auxiliaryListenerMap.containsKey(auxiliaryPort) && auxiliaryPort != 0) {
+ throw new IOException(
+ "There is already a listener binding to: " + auxiliaryPort);
+ }
+ Listener newListener = new Listener(auxiliaryPort);
+ // in the case of port = 0, the listener would be on a != 0 port.
+ LOG.info("Adding a server listener on port " +
+ newListener.getAddress().getPort());
+ auxiliaryListenerMap.put(newListener.getAddress().getPort(), newListener);
+ }
+
private RpcSaslProto buildNegotiateResponse(List<AuthMethod> authMethods)
throws IOException {
RpcSaslProto.Builder negotiateBuilder = RpcSaslProto.newBuilder();
@@ -3283,6 +3346,12 @@ public abstract class Server {
public synchronized void start() {
responder.start();
listener.start();
+ if (auxiliaryListenerMap != null && auxiliaryListenerMap.size() > 0) {
+ for (Listener newListener : auxiliaryListenerMap.values()) {
+ newListener.start();
+ }
+ }
+
handlers = new Handler[handlerCount];
for (int i = 0; i < handlerCount; i++) {
@@ -3304,6 +3373,12 @@ public abstract class Server {
}
listener.interrupt();
listener.doStop();
+ if (auxiliaryListenerMap != null && auxiliaryListenerMap.size() > 0) {
+ for (Listener newListener : auxiliaryListenerMap.values()) {
+ newListener.interrupt();
+ newListener.doStop();
+ }
+ }
responder.interrupt();
notifyAll();
this.rpcMetrics.shutdown();
@@ -3327,6 +3402,23 @@ public abstract class Server {
public synchronized InetSocketAddress getListenerAddress() {
return listener.getAddress();
}
+
+ /**
+ * Return the set of all the configured auxiliary socket addresses NameNode
+ * RPC is listening on. If there are none, or it is not configured at all, an
+ * empty set is returned.
+ * @return the set of all the auxiliary addresses on which the
+ * RPC server is listening on.
+ */
+ public synchronized Set<InetSocketAddress> getAuxiliaryListenerAddresses() {
+ Set<InetSocketAddress> allAddrs = new HashSet<>();
+ if (auxiliaryListenerMap != null && auxiliaryListenerMap.size() > 0) {
+ for (Listener auxListener : auxiliaryListenerMap.values()) {
+ allAddrs.add(auxListener.getAddress());
+ }
+ }
+ return allAddrs;
+ }
/**
* Called for each call.
@@ -3631,11 +3723,11 @@ public abstract class Server {
return connections.toArray(new Connection[0]);
}
- Connection register(SocketChannel channel) {
+ Connection register(SocketChannel channel, int ingressPort) {
if (isFull()) {
return null;
}
- Connection connection = new Connection(channel, Time.now());
+ Connection connection = new Connection(channel, Time.now(), ingressPort);
add(connection);
if (LOG.isDebugEnabled()) {
LOG.debug("Server connection from " + connection +
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SaslPropertiesResolver.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SaslPropertiesResolver.java
index 64b86e3..dd6c42e 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SaslPropertiesResolver.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SaslPropertiesResolver.java
@@ -102,7 +102,7 @@ public class SaslPropertiesResolver implements Configurable{
*/
public Map<String, String> getServerProperties(InetAddress clientAddress,
int ingressPort){
- return properties;
+ return getServerProperties(clientAddress);
}
/**
@@ -122,7 +122,7 @@ public class SaslPropertiesResolver implements Configurable{
*/
public Map<String, String> getClientProperties(InetAddress serverAddress,
int ingressPort) {
- return properties;
+ return getClientProperties(serverAddress);
}
/**
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java
index cdbaea4..f275f97 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java
@@ -49,8 +49,10 @@ import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
import java.util.Random;
+import java.util.Set;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
@@ -171,6 +173,11 @@ public class TestIPC {
this(handlerCount, sleep, LongWritable.class, null);
}
+ public TestServer(int port, int handlerCount, boolean sleep)
+ throws IOException {
+ this(port, handlerCount, sleep, LongWritable.class, null);
+ }
+
public TestServer(int handlerCount, boolean sleep, Configuration conf)
throws IOException {
this(handlerCount, sleep, LongWritable.class, null, conf);
@@ -182,11 +189,24 @@ public class TestIPC {
this(handlerCount, sleep, paramClass, responseClass, conf);
}
+ public TestServer(int port, int handlerCount, boolean sleep,
+ Class<? extends Writable> paramClass,
+ Class<? extends Writable> responseClass) throws IOException {
+ this(port, handlerCount, sleep, paramClass, responseClass, conf);
+ }
+
public TestServer(int handlerCount, boolean sleep,
Class<? extends Writable> paramClass,
Class<? extends Writable> responseClass, Configuration conf)
throws IOException {
- super(ADDRESS, 0, paramClass, handlerCount, conf);
+ this(0, handlerCount, sleep, paramClass, responseClass, conf);
+ }
+
+ public TestServer(int port, int handlerCount, boolean sleep,
+ Class<? extends Writable> paramClass,
+ Class<? extends Writable> responseClass, Configuration conf)
+ throws IOException {
+ super(ADDRESS, port, paramClass, handlerCount, conf);
this.sleep = sleep;
this.responseClass = responseClass;
}
@@ -338,6 +358,37 @@ public class TestIPC {
}
server.stop();
}
+
+ @Test
+ public void testAuxiliaryPorts() throws IOException, InterruptedException {
+ int defaultPort = 9000;
+ int[] auxiliaryPorts = {9001, 9002, 9003};
+ final int handlerCount = 5;
+ final boolean handlerSleep = false;
+ Server server = new TestServer(defaultPort, handlerCount, handlerSleep);
+ for (int port : auxiliaryPorts) {
+ server.addAuxiliaryListener(port);
+ }
+ Set<InetSocketAddress> listenerAddrs =
+ server.getAuxiliaryListenerAddresses();
+ Set<InetSocketAddress> addrs = new HashSet<>();
+ for (InetSocketAddress addr : listenerAddrs) {
+ addrs.add(NetUtils.getConnectAddress(addr));
+ }
+ server.start();
+
+ Client client = new Client(LongWritable.class, conf);
+ Set<SerialCaller> calls = new HashSet<>();
+ for (InetSocketAddress addr : addrs) {
+ calls.add(new SerialCaller(client, addr, 100));
+ }
+ for (SerialCaller caller : calls) {
+ caller.join();
+ assertFalse(caller.failed);
+ }
+ client.stop();
+ server.stop();
+ }
@Test(timeout=60000)
public void testStandAloneClient() throws IOException {
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java
index 7236247..80185b9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java
@@ -21,6 +21,7 @@ import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import com.google.common.primitives.SignedBytes;
+import java.net.URISyntaxException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
@@ -92,6 +93,8 @@ import java.util.Arrays;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DATA_TRANSFER_CLIENT_TCPNODELAY_DEFAULT;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DATA_TRANSFER_CLIENT_TCPNODELAY_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_NAMESERVICES;
@InterfaceAudience.Private
@@ -432,7 +435,7 @@ public class DFSUtilClient {
Map<String, InetSocketAddress> ret = Maps.newLinkedHashMap();
for (String nnId : emptyAsSingletonNull(nnIds)) {
String suffix = concatSuffixes(nsId, nnId);
- String address = getConfValue(defaultValue, suffix, conf, keys);
+ String address = checkKeysAndProcess(defaultValue, suffix, conf, keys);
if (address != null) {
InetSocketAddress isa = NetUtils.createSocketAddr(address);
if (isa.isUnresolved()) {
@@ -447,6 +450,86 @@ public class DFSUtilClient {
}
/**
+ * Return address from configuration. Take a list of keys as preference.
+ * If the address to be returned is the value of DFS_NAMENODE_RPC_ADDRESS_KEY,
+ * will check to see if auxiliary ports are enabled. If so, call to replace
+ * address port with auxiliary port. If the address is not the value of
+ * DFS_NAMENODE_RPC_ADDRESS_KEY, return the original address. If failed to
+ * find any address, return the given default value.
+ *
+ * @param defaultValue the default value if no values found for given keys
+ * @param suffix suffix to append to keys
+ * @param conf the configuration
+ * @param keys a list of keys, ordered by preference
+ * @return
+ */
+ private static String checkKeysAndProcess(String defaultValue, String suffix,
+ Configuration conf, String... keys) {
+ String succeededKey = null;
+ String address = null;
+ for (String key : keys) {
+ address = getConfValue(null, suffix, conf, key);
+ if (address != null) {
+ succeededKey = key;
+ break;
+ }
+ }
+ String ret;
+ if (address == null) {
+ ret = defaultValue;
+ } else if(DFS_NAMENODE_RPC_ADDRESS_KEY.equals(succeededKey)) {
+ ret = checkRpcAuxiliary(conf, suffix, address);
+ } else {
+ ret = address;
+ }
+ return ret;
+ }
+
+ /**
+ * Check if auxiliary port is enabled, if yes, check if the given address
+ * should have its port replaced by an auxiliary port. If the given address
+ * does not contain a port, append the auxiliary port to enforce using it.
+ *
+ * @param conf configuration.
+ * @param address the address to check and modify (if needed).
+ * @return the new modified address containing auxiliary port, or original
+ * address if auxiliary port not enabled.
+ */
+ private static String checkRpcAuxiliary(Configuration conf, String suffix,
+ String address) {
+ String key = DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY;
+ key = addSuffix(key, suffix);
+ int[] ports = conf.getInts(key);
+ if (ports == null || ports.length == 0) {
+ return address;
+ }
+ LOG.info("Using server auxiliary ports " + Arrays.toString(ports));
+ URI uri;
+ try {
+ uri = new URI(address);
+ } catch (URISyntaxException e) {
+ // return the original address untouched if it is not a valid URI. This
+ // happens in unit test, as MiniDFSCluster sets the value to
+ // 127.0.0.1:0, without schema (i.e. "hdfs://"). While in practice, this
+ // should not be the case. So log a warning message here.
+ LOG.warn("NameNode address is not a valid uri:" + address);
+ return address;
+ }
+ // Ignore the port, only take the schema(e.g. hdfs) and host (e.g.
+ // localhost), then append port
+ // TODO : revisit if there is a better way
+ StringBuilder sb = new StringBuilder();
+ sb.append(uri.getScheme());
+ sb.append("://");
+ sb.append(uri.getHost());
+ sb.append(":");
+ // TODO : currently, only the very first auxiliary port is being used.
+ // But actually NN supports running multiple auxiliary
+ sb.append(ports[0]);
+ return sb.toString();
+ }
+
+ /**
* Given a list of keys in the order of preference, returns a value
* for the key in the given order from the configuration.
* @param defaultValue default value to return, when key was not found
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
index f2cec31..1ab71f3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
@@ -68,6 +68,11 @@ public interface HdfsClientConfigKeys {
String PREFIX = "dfs.client.";
String DFS_NAMESERVICES = "dfs.nameservices";
String DFS_NAMENODE_RPC_ADDRESS_KEY = "dfs.namenode.rpc-address";
+
+ String DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_SUFFIX = "auxiliary-ports";
+ String DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY = DFS_NAMENODE_RPC_ADDRESS_KEY
+ + "." + DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_SUFFIX;
+
int DFS_NAMENODE_HTTP_PORT_DEFAULT = 9870;
String DFS_NAMENODE_HTTP_ADDRESS_KEY = "dfs.namenode.http-address";
int DFS_NAMENODE_HTTPS_PORT_DEFAULT = 9871;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 04046e4..a7106d1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -1231,6 +1231,9 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final Class<DFSNetworkTopology> DFS_NET_TOPOLOGY_IMPL_DEFAULT =
DFSNetworkTopology.class;
+ public static final String DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY =
+ HdfsClientConfigKeys.DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY;
+
// dfs.client.retry confs are moved to HdfsClientConfigKeys.Retry
@Deprecated
public static final String DFS_CLIENT_RETRY_POLICY_ENABLED_KEY
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
index 957914c..581ce66 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
@@ -23,6 +23,7 @@ import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
+import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.HadoopIllegalArgumentException;
@@ -1059,6 +1060,14 @@ public class NameNode extends ReconfigurableBase implements
}
/**
+ * @return The auxiliary nameNode RPC addresses, or empty set if there
+ * is none.
+ */
+ public Set<InetSocketAddress> getAuxiliaryNameNodeAddresses() {
+ return rpcServer.getAuxiliaryRpcAddresses();
+ }
+
+ /**
* @return NameNode RPC address in "host:port" string form
*/
public String getNameNodeAddressHostPortString() {
@@ -1066,6 +1075,27 @@ public class NameNode extends ReconfigurableBase implements
}
/**
+ * Return a host:port format string corresponds to an auxiliary
+ * port configured on NameNode. If there are multiple auxiliary ports,
+ * an arbitrary one is returned. If there is no auxiliary listener, returns
+ * null.
+ *
+ * @return a string of format host:port that points to an auxiliary NameNode
+ * address, or null if there is no such address.
+ */
+ @VisibleForTesting
+ public String getNNAuxiliaryRpcAddress() {
+ Set<InetSocketAddress> auxiliaryAddrs = getAuxiliaryNameNodeAddresses();
+ if (auxiliaryAddrs.isEmpty()) {
+ return null;
+ }
+ // since set has no particular order, returning the first element of
+ // from the iterator is effectively arbitrary.
+ InetSocketAddress addr = auxiliaryAddrs.iterator().next();
+ return NetUtils.getHostPortString(addr);
+ }
+
+ /**
* @return NameNode service RPC address if configured, the
* NameNode RPC address otherwise
*/
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
index 3a2ae6b..4c0db09 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
@@ -26,6 +26,7 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIFELINE_HANDLER
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIFELINE_HANDLER_RATIO_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY;
import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.MAX_PATH_DEPTH;
import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.MAX_PATH_LENGTH;
@@ -538,6 +539,13 @@ public class NameNodeRpcServer implements NamenodeProtocols {
if (lifelineRpcServer != null) {
lifelineRpcServer.setTracer(nn.tracer);
}
+ int[] auxiliaryPorts =
+ conf.getInts(DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY);
+ if (auxiliaryPorts != null && auxiliaryPorts.length != 0) {
+ for (int auxiliaryPort : auxiliaryPorts) {
+ this.clientRpcServer.addAuxiliaryListener(auxiliaryPort);
+ }
+ }
}
/** Allow access to the lifeline RPC server for testing */
@@ -607,10 +615,16 @@ public class NameNodeRpcServer implements NamenodeProtocols {
return serviceRPCAddress;
}
- InetSocketAddress getRpcAddress() {
+ @VisibleForTesting
+ public InetSocketAddress getRpcAddress() {
return clientRpcAddress;
}
+ @VisibleForTesting
+ public Set<InetSocketAddress> getAuxiliaryRpcAddresses() {
+ return clientRpcServer.getAuxiliaryListenerAddresses();
+ }
+
private static UserGroupInformation getRemoteUser() throws IOException {
return NameNode.getRemoteUser();
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 7988502..e70b808 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -5045,6 +5045,17 @@
</property>
<property>
+ <name>dfs.namenode.rpc-address.auxiliary-ports</name>
+ <value></value>
+ <description>
+ A comma separated list of auxiliary ports for the NameNode to listen on.
+ This allows exposing multiple NN addresses to clients.
+ Particularly, it is used to enforce different SASL levels on different ports.
+ Empty list indicates that auxiliary ports are disabled.
+ </description>
+ </property>
+
+ <property>
<name>dfs.namenode.blockreport.queue.size</name>
<value>1024</value>
<description>
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
index 017f32e..97964ae 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
@@ -1361,6 +1361,21 @@ public class MiniDFSCluster implements AutoCloseable {
}
return uri;
}
+
+ URI getURIForAuxiliaryPort(int nnIndex) {
+ String hostPort =
+ getNN(nnIndex).nameNode.getNNAuxiliaryRpcAddress();
+ if (hostPort == null) {
+ throw new RuntimeException("No auxiliary port found");
+ }
+ URI uri = null;
+ try {
+ uri = new URI("hdfs://" + hostPort);
+ } catch (URISyntaxException e) {
+ NameNode.LOG.warn("unexpected URISyntaxException", e);
+ }
+ return uri;
+ }
public int getInstanceId() {
return instanceId;
@@ -1961,6 +1976,14 @@ public class MiniDFSCluster implements AutoCloseable {
checkSingleNameNode();
return getNameNodePort(0);
}
+
+ /**
+ * Get the auxiliary port of NameNode, NameNode specified by index.
+ */
+ public int getNameNodeAuxiliaryPort() {
+ checkSingleNameNode();
+ return getNameNodeAuxiliaryPort(0);
+ }
/**
* Gets the rpc port used by the NameNode at the given index, because the
@@ -1971,6 +1994,22 @@ public class MiniDFSCluster implements AutoCloseable {
}
/**
+ * Gets the rpc port used by the NameNode at the given index, if the
+ * NameNode has multiple auxiliary ports configured, a arbitrary
+ * one is returned.
+ */
+ public int getNameNodeAuxiliaryPort(int nnIndex) {
+ Set<InetSocketAddress> allAuxiliaryAddresses =
+ getNN(nnIndex).nameNode.getAuxiliaryNameNodeAddresses();
+ if (allAuxiliaryAddresses.isEmpty()) {
+ return -1;
+ } else {
+ InetSocketAddress addr = allAuxiliaryAddresses.iterator().next();
+ return addr.getPort();
+ }
+ }
+
+ /**
* @return the service rpc port used by the NameNode at the given index.
*/
public int getNameNodeServicePort(int nnIndex) {
@@ -2531,6 +2570,12 @@ public class MiniDFSCluster implements AutoCloseable {
return getFileSystem(0);
}
+ public DistributedFileSystem getFileSystemFromAuxiliaryPort()
+ throws IOException {
+ checkSingleNameNode();
+ return getFileSystemFromAuxiliaryPort(0);
+ }
+
/**
* Get a client handle to the DFS cluster for the namenode at given index.
*/
@@ -2539,6 +2584,12 @@ public class MiniDFSCluster implements AutoCloseable {
getNN(nnIndex).conf));
}
+ public DistributedFileSystem getFileSystemFromAuxiliaryPort(int nnIndex)
+ throws IOException {
+ return (DistributedFileSystem) addFileSystem(FileSystem.get(
+ getURIForAuxiliaryPort(nnIndex), getNN(nnIndex).conf));
+ }
+
/**
* Get another FileSystem instance that is different from FileSystem.get(conf).
* This simulating different threads working on different FileSystem instances.
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestHAAuxiliaryPort.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestHAAuxiliaryPort.java
new file mode 100644
index 0000000..867fbac
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestHAAuxiliaryPort.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.util.Set;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer;
+import org.junit.Test;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_NAMESERVICES;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+
+/**
+ * Test NN auxiliary port with HA.
+ */
+public class TestHAAuxiliaryPort {
+ @Test
+ public void testTest() throws Exception {
+ Configuration conf = new Configuration();
+ conf.set(DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY, "0,0");
+ conf.set(DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY + ".ha-nn-uri-0.nn1",
+ "9000,9001");
+ conf.set(DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY + ".ha-nn-uri-0.nn2",
+ "9000,9001");
+ conf.set(DFS_NAMESERVICES, "ha-nn-uri-0");
+ conf.set(DFS_HA_NAMENODES_KEY_PREFIX + ".ha-nn-uri-0", "nn1,nn2");
+ conf.setBoolean("fs.hdfs.impl.disable.cache", true);
+
+ MiniDFSNNTopology topology = new MiniDFSNNTopology()
+ .addNameservice(new MiniDFSNNTopology.NSConf("ha-nn-uri-0")
+ .addNN(new MiniDFSNNTopology.NNConf("nn1"))
+ .addNN(new MiniDFSNNTopology.NNConf("nn2")));
+
+ MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+ .nnTopology(topology)
+ .numDataNodes(0)
+ .build();
+ cluster.transitionToActive(0);
+ cluster.waitActive();
+
+ NameNode nn0 = cluster.getNameNode(0);
+ NameNode nn1 = cluster.getNameNode(1);
+
+ // all the addresses below are valid nn0 addresses
+ NameNodeRpcServer rpcServer0 = (NameNodeRpcServer)nn0.getRpcServer();
+ InetSocketAddress server0RpcAddress = rpcServer0.getRpcAddress();
+ Set<InetSocketAddress> auxAddrServer0 =
+ rpcServer0.getAuxiliaryRpcAddresses();
+ assertEquals(2, auxAddrServer0.size());
+
+ // all the addresses below are valid nn1 addresses
+ NameNodeRpcServer rpcServer1 = (NameNodeRpcServer)nn1.getRpcServer();
+ InetSocketAddress server1RpcAddress = rpcServer1.getRpcAddress();
+ Set<InetSocketAddress> auxAddrServer1 =
+ rpcServer1.getAuxiliaryRpcAddresses();
+ assertEquals(2, auxAddrServer1.size());
+
+ // mkdir on nn0 uri 0
+ URI nn0URI = new URI("hdfs://localhost:" +
+ server0RpcAddress.getPort());
+ try (DFSClient client0 = new DFSClient(nn0URI, conf)){
+ client0.mkdirs("/test", null, true);
+ // should be available on other ports also
+ for (InetSocketAddress auxAddr : auxAddrServer0) {
+ nn0URI = new URI("hdfs://localhost:" + auxAddr.getPort());
+ try (DFSClient clientTmp = new DFSClient(nn0URI, conf)) {
+ assertTrue(clientTmp.exists("/test"));
+ }
+ }
+ }
+
+ // now perform a failover
+ cluster.shutdownNameNode(0);
+ cluster.transitionToActive(1);
+
+ // then try to read the file from the nn1
+ URI nn1URI = new URI("hdfs://localhost:" +
+ server1RpcAddress.getPort());
+ try (DFSClient client1 = new DFSClient(nn1URI, conf)) {
+ assertTrue(client1.exists("/test"));
+ // should be available on other ports also
+ for (InetSocketAddress auxAddr : auxAddrServer1) {
+ nn1URI = new URI("hdfs://localhost:" + auxAddr.getPort());
+ try (DFSClient clientTmp = new DFSClient(nn1URI, conf)) {
+ assertTrue(client1.exists("/test"));
+ }
+ }
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org