You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cl...@apache.org on 2019/08/23 18:09:08 UTC

[hadoop] 02/05: HDFS-13566. Add configurable additional RPC listener to NameNode. Contributed by Chen Liang.

This is an automated email from the ASF dual-hosted git repository.

cliang pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit 3835a6c4e82887228e88d6d345b6651de5f01eb4
Author: Chen Liang <cl...@apache.org>
AuthorDate: Tue Oct 23 14:53:45 2018 -0700

    HDFS-13566. Add configurable additional RPC listener to NameNode. Contributed by Chen Liang.
---
 .../main/java/org/apache/hadoop/ipc/Server.java    | 114 +++++++++++++++++++--
 .../hadoop/security/SaslPropertiesResolver.java    |   4 +-
 .../test/java/org/apache/hadoop/ipc/TestIPC.java   |  53 +++++++++-
 .../java/org/apache/hadoop/hdfs/DFSUtilClient.java |  85 ++++++++++++++-
 .../hadoop/hdfs/client/HdfsClientConfigKeys.java   |   5 +
 .../java/org/apache/hadoop/hdfs/DFSConfigKeys.java |   3 +
 .../hadoop/hdfs/server/namenode/NameNode.java      |  30 ++++++
 .../hdfs/server/namenode/NameNodeRpcServer.java    |  16 ++-
 .../src/main/resources/hdfs-default.xml            |  11 ++
 .../org/apache/hadoop/hdfs/MiniDFSCluster.java     |  51 +++++++++
 .../apache/hadoop/hdfs/TestHAAuxiliaryPort.java    | 112 ++++++++++++++++++++
 11 files changed, 468 insertions(+), 16 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
index 33cfd06..0c4fbb8 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
@@ -380,6 +380,24 @@ public abstract class Server {
   }
 
   /**
+   * Returns the SASL qop for the current call, if the current call is
+   * set, and the SASL negotiation is done. Otherwise return null. Note
+   * that CurCall is thread local object. So in fact, different handler
+   * threads will process different CurCall object.
+   *
+   * Also, only return for RPC calls, not supported for other protocols.
+   * @return the QOP of the current connection.
+   */
+  public static String getEstablishedQOP() {
+    Call call = CurCall.get();
+    if (call == null || !(call instanceof RpcCall)) {
+      return null;
+    }
+    RpcCall rpcCall = (RpcCall)call;
+    return rpcCall.connection.getEstablishedQOP();
+  }
+
+  /**
    * Returns the clientId from the current RPC request
    */
   public static byte[] getClientId() {
@@ -458,6 +476,10 @@ public abstract class Server {
   // maintains the set of client connections and handles idle timeouts
   private ConnectionManager connectionManager;
   private Listener listener = null;
+  // Auxiliary listeners are maintained in a map, to allow an
+  // arbitrary number of auxiliary listeners. The map is from
+  // the port to the listener bound to it.
+  private Map<Integer, Listener> auxiliaryListenerMap;
   private Responder responder = null;
   private Handler[] handlers = null;
 
@@ -1148,11 +1170,12 @@ public abstract class Server {
     private Reader[] readers = null;
     private int currentReader = 0;
     private InetSocketAddress address; //the address we bind at
+    private int listenPort; //the port we bind at
     private int backlogLength = conf.getInt(
         CommonConfigurationKeysPublic.IPC_SERVER_LISTEN_QUEUE_SIZE_KEY,
         CommonConfigurationKeysPublic.IPC_SERVER_LISTEN_QUEUE_SIZE_DEFAULT);
     
-    public Listener() throws IOException {
+    Listener(int port) throws IOException {
       address = new InetSocketAddress(bindAddress, port);
       // Create a new server socket and set to non blocking mode
       acceptChannel = ServerSocketChannel.open();
@@ -1160,7 +1183,10 @@ public abstract class Server {
 
       // Bind the server socket to the local host and port
       bind(acceptChannel.socket(), address, backlogLength, conf, portRangeConfig);
-      port = acceptChannel.socket().getLocalPort(); //Could be an ephemeral port
+      //Could be an ephemeral port
+      this.listenPort = acceptChannel.socket().getLocalPort();
+      Thread.currentThread().setName("Listener at " +
+          bindAddress + "/" + this.listenPort);
       // create a selector;
       selector= Selector.open();
       readers = new Reader[readThreads];
@@ -1343,7 +1369,7 @@ public abstract class Server {
         channel.socket().setKeepAlive(true);
         
         Reader reader = getReader();
-        Connection c = connectionManager.register(channel);
+        Connection c = connectionManager.register(channel, this.listenPort);
         // If the connectionManager can't take it, close the connection.
         if (c == null) {
           if (channel.isOpen()) {
@@ -1765,6 +1791,7 @@ public abstract class Server {
     private ByteBuffer unwrappedDataLengthBuffer;
     private int serviceClass;
     private boolean shouldClose = false;
+    private int ingressPort;
 
     UserGroupInformation user = null;
     public UserGroupInformation attemptingUser = null; // user name before auth
@@ -1776,7 +1803,8 @@ public abstract class Server {
     private boolean sentNegotiate = false;
     private boolean useWrap = false;
     
-    public Connection(SocketChannel channel, long lastContact) {
+    public Connection(SocketChannel channel, long lastContact,
+        int ingressPort) {
       this.channel = channel;
       this.lastContact = lastContact;
       this.data = null;
@@ -1788,6 +1816,7 @@ public abstract class Server {
       this.unwrappedDataLengthBuffer = ByteBuffer.allocate(4);
       this.socket = channel.socket();
       this.addr = socket.getInetAddress();
+      this.ingressPort = ingressPort;
       if (addr == null) {
         this.hostAddress = "*Unknown*";
       } else {
@@ -1822,9 +1851,24 @@ public abstract class Server {
       return hostAddress;
     }
 
+    public int getIngressPort() {
+      return ingressPort;
+    }
+
     public InetAddress getHostInetAddress() {
       return addr;
     }
+
+    public String getEstablishedQOP() {
+      // In practice, saslServer should not be null when this is
+      // called. If it is null, it must be either some
+      // configuration mistake or it is called from unit test.
+      if (saslServer == null) {
+        LOG.warn("SASL server should not be null!");
+        return null;
+      }
+      return (String)saslServer.getNegotiatedProperty(Sasl.QOP);
+    }
     
     public void setLastContact(long lastContact) {
       this.lastContact = lastContact;
@@ -2303,7 +2347,7 @@ public abstract class Server {
     private SaslServer createSaslServer(AuthMethod authMethod)
         throws IOException, InterruptedException {
       final Map<String,?> saslProps =
-                  saslPropsResolver.getServerProperties(addr);
+                  saslPropsResolver.getServerProperties(addr, ingressPort);
       return new SaslRpcServer(authMethod).create(this, saslProps, secretManager);
     }
     
@@ -2822,7 +2866,8 @@ public abstract class Server {
   private class Handler extends Thread {
     public Handler(int instanceNumber) {
       this.setDaemon(true);
-      this.setName("IPC Server handler "+ instanceNumber + " on " + port);
+      this.setName("IPC Server handler "+ instanceNumber +
+          " on default port " + port);
     }
 
     @Override
@@ -2984,6 +3029,7 @@ public abstract class Server {
     this.handlerCount = handlerCount;
     this.socketSendBufferSize = 0;
     this.serverName = serverName;
+    this.auxiliaryListenerMap = null;
     this.maxDataLength = conf.getInt(CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH,
         CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH_DEFAULT);
     if (queueSizePerHandler != -1) {
@@ -3023,8 +3069,9 @@ public abstract class Server {
     this.negotiateResponse = buildNegotiateResponse(enabledAuthMethods);
     
     // Start the listener here and let it bind to the port
-    listener = new Listener();
-    this.port = listener.getAddress().getPort();    
+    listener = new Listener(port);
+    // set the server port to the default listener port.
+    this.port = listener.getAddress().getPort();
     connectionManager = new ConnectionManager();
     this.rpcMetrics = RpcMetrics.create(this, conf);
     this.rpcDetailedMetrics = RpcDetailedMetrics.create(this.port);
@@ -3046,7 +3093,23 @@ public abstract class Server {
     
     this.exceptionsHandler.addTerseLoggingExceptions(StandbyException.class);
   }
-  
+
+  public synchronized void addAuxiliaryListener(int auxiliaryPort)
+      throws IOException {
+    if (auxiliaryListenerMap == null) {
+      auxiliaryListenerMap = new HashMap<>();
+    }
+    if (auxiliaryListenerMap.containsKey(auxiliaryPort) && auxiliaryPort != 0) {
+      throw new IOException(
+          "There is already a listener binding to: " + auxiliaryPort);
+    }
+    Listener newListener = new Listener(auxiliaryPort);
+    // in the case of port = 0, the listener would be on a != 0 port.
+    LOG.info("Adding a server listener on port " +
+        newListener.getAddress().getPort());
+    auxiliaryListenerMap.put(newListener.getAddress().getPort(), newListener);
+  }
+
   private RpcSaslProto buildNegotiateResponse(List<AuthMethod> authMethods)
       throws IOException {
     RpcSaslProto.Builder negotiateBuilder = RpcSaslProto.newBuilder();
@@ -3283,6 +3346,12 @@ public abstract class Server {
   public synchronized void start() {
     responder.start();
     listener.start();
+    if (auxiliaryListenerMap != null && auxiliaryListenerMap.size() > 0) {
+      for (Listener newListener : auxiliaryListenerMap.values()) {
+        newListener.start();
+      }
+    }
+
     handlers = new Handler[handlerCount];
     
     for (int i = 0; i < handlerCount; i++) {
@@ -3304,6 +3373,12 @@ public abstract class Server {
     }
     listener.interrupt();
     listener.doStop();
+    if (auxiliaryListenerMap != null && auxiliaryListenerMap.size() > 0) {
+      for (Listener newListener : auxiliaryListenerMap.values()) {
+        newListener.interrupt();
+        newListener.doStop();
+      }
+    }
     responder.interrupt();
     notifyAll();
     this.rpcMetrics.shutdown();
@@ -3327,6 +3402,23 @@ public abstract class Server {
   public synchronized InetSocketAddress getListenerAddress() {
     return listener.getAddress();
   }
+
+  /**
+   * Return the set of all the configured auxiliary socket addresses NameNode
+   * RPC is listening on. If there are none, or it is not configured at all, an
+   * empty set is returned.
+   * @return the set of all the auxiliary addresses on which the
+   *         RPC server is listening.
+   */
+  public synchronized Set<InetSocketAddress> getAuxiliaryListenerAddresses() {
+    Set<InetSocketAddress> allAddrs = new HashSet<>();
+    if (auxiliaryListenerMap != null && auxiliaryListenerMap.size() > 0) {
+      for (Listener auxListener : auxiliaryListenerMap.values()) {
+        allAddrs.add(auxListener.getAddress());
+      }
+    }
+    return allAddrs;
+  }
   
   /** 
    * Called for each call. 
@@ -3631,11 +3723,11 @@ public abstract class Server {
       return connections.toArray(new Connection[0]);
     }
 
-    Connection register(SocketChannel channel) {
+    Connection register(SocketChannel channel, int ingressPort) {
       if (isFull()) {
         return null;
       }
-      Connection connection = new Connection(channel, Time.now());
+      Connection connection = new Connection(channel, Time.now(), ingressPort);
       add(connection);
       if (LOG.isDebugEnabled()) {
         LOG.debug("Server connection from " + connection +
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SaslPropertiesResolver.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SaslPropertiesResolver.java
index 64b86e3..dd6c42e 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SaslPropertiesResolver.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SaslPropertiesResolver.java
@@ -102,7 +102,7 @@ public class SaslPropertiesResolver implements Configurable{
    */
   public Map<String, String> getServerProperties(InetAddress clientAddress,
       int ingressPort){
-    return properties;
+    return getServerProperties(clientAddress);
   }
 
   /**
@@ -122,7 +122,7 @@ public class SaslPropertiesResolver implements Configurable{
    */
   public Map<String, String> getClientProperties(InetAddress serverAddress,
       int ingressPort) {
-    return properties;
+    return getClientProperties(serverAddress);
   }
 
   /**
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java
index cdbaea4..f275f97 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java
@@ -49,8 +49,10 @@ import java.net.SocketException;
 import java.net.SocketTimeoutException;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Random;
+import java.util.Set;
 import java.util.concurrent.BrokenBarrierException;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.CyclicBarrier;
@@ -171,6 +173,11 @@ public class TestIPC {
       this(handlerCount, sleep, LongWritable.class, null);
     }
 
+    public TestServer(int port, int handlerCount, boolean sleep)
+        throws IOException {
+      this(port, handlerCount, sleep, LongWritable.class, null);
+    }
+
     public TestServer(int handlerCount, boolean sleep, Configuration conf)
         throws IOException {
       this(handlerCount, sleep, LongWritable.class, null, conf);
@@ -182,11 +189,24 @@ public class TestIPC {
       this(handlerCount, sleep, paramClass, responseClass, conf);
     }
 
+    public TestServer(int port, int handlerCount, boolean sleep,
+        Class<? extends Writable> paramClass,
+        Class<? extends Writable> responseClass) throws IOException {
+      this(port, handlerCount, sleep, paramClass, responseClass, conf);
+    }
+
     public TestServer(int handlerCount, boolean sleep,
         Class<? extends Writable> paramClass,
         Class<? extends Writable> responseClass, Configuration conf)
         throws IOException {
-      super(ADDRESS, 0, paramClass, handlerCount, conf);
+      this(0, handlerCount, sleep, paramClass, responseClass, conf);
+    }
+
+    public TestServer(int port, int handlerCount, boolean sleep,
+        Class<? extends Writable> paramClass,
+        Class<? extends Writable> responseClass, Configuration conf)
+        throws IOException {
+      super(ADDRESS, port, paramClass, handlerCount, conf);
       this.sleep = sleep;
       this.responseClass = responseClass;
     }
@@ -338,6 +358,37 @@ public class TestIPC {
     }
     server.stop();
   }
+
+  @Test
+  public void testAuxiliaryPorts() throws IOException, InterruptedException {
+    int defaultPort = 9000;
+    int[] auxiliaryPorts = {9001, 9002, 9003};
+    final int handlerCount = 5;
+    final boolean handlerSleep = false;
+    Server server = new TestServer(defaultPort, handlerCount, handlerSleep);
+    for (int port : auxiliaryPorts) {
+      server.addAuxiliaryListener(port);
+    }
+    Set<InetSocketAddress> listenerAddrs =
+        server.getAuxiliaryListenerAddresses();
+    Set<InetSocketAddress> addrs = new HashSet<>();
+    for (InetSocketAddress addr : listenerAddrs) {
+      addrs.add(NetUtils.getConnectAddress(addr));
+    }
+    server.start();
+
+    Client client = new Client(LongWritable.class, conf);
+    Set<SerialCaller> calls = new HashSet<>();
+    for (InetSocketAddress addr : addrs) {
+      calls.add(new SerialCaller(client, addr, 100));
+    }
+    for (SerialCaller caller : calls) {
+      caller.join();
+      assertFalse(caller.failed);
+    }
+    client.stop();
+    server.stop();
+  }
 	
   @Test(timeout=60000)
   public void testStandAloneClient() throws IOException {
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java
index 7236247..80185b9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java
@@ -21,6 +21,7 @@ import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Maps;
 import com.google.common.primitives.SignedBytes;
+import java.net.URISyntaxException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
@@ -92,6 +93,8 @@ import java.util.Arrays;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DATA_TRANSFER_CLIENT_TCPNODELAY_DEFAULT;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DATA_TRANSFER_CLIENT_TCPNODELAY_KEY;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_NAMESERVICES;
 
 @InterfaceAudience.Private
@@ -432,7 +435,7 @@ public class DFSUtilClient {
     Map<String, InetSocketAddress> ret = Maps.newLinkedHashMap();
     for (String nnId : emptyAsSingletonNull(nnIds)) {
       String suffix = concatSuffixes(nsId, nnId);
-      String address = getConfValue(defaultValue, suffix, conf, keys);
+      String address = checkKeysAndProcess(defaultValue, suffix, conf, keys);
       if (address != null) {
         InetSocketAddress isa = NetUtils.createSocketAddr(address);
         if (isa.isUnresolved()) {
@@ -447,6 +450,86 @@ public class DFSUtilClient {
   }
 
   /**
+   * Return address from configuration. Take a list of keys as preference.
+   * If the address to be returned is the value of DFS_NAMENODE_RPC_ADDRESS_KEY,
+   * will check to see if auxiliary ports are enabled. If so, call to replace
+   * address port with auxiliary port. If the address is not the value of
+   * DFS_NAMENODE_RPC_ADDRESS_KEY, return the original address. If failed to
+   * find any address, return the given default value.
+   *
+   * @param defaultValue the default value if no values found for given keys
+   * @param suffix suffix to append to keys
+   * @param conf the configuration
+   * @param keys a list of keys, ordered by preference
+   * @return the resolved address, or defaultValue if none of the keys is set
+   */
+  private static String checkKeysAndProcess(String defaultValue, String suffix,
+      Configuration conf, String... keys) {
+    String succeededKey = null;
+    String address = null;
+    for (String key : keys) {
+      address = getConfValue(null, suffix, conf, key);
+      if (address != null) {
+        succeededKey = key;
+        break;
+      }
+    }
+    String ret;
+    if (address == null) {
+      ret = defaultValue;
+    } else if(DFS_NAMENODE_RPC_ADDRESS_KEY.equals(succeededKey))  {
+      ret = checkRpcAuxiliary(conf, suffix, address);
+    } else {
+      ret = address;
+    }
+    return ret;
+  }
+
+  /**
+   * Check if auxiliary port is enabled, if yes, check if the given address
+   * should have its port replaced by an auxiliary port. If the given address
+   * does not contain a port, append the auxiliary port to enforce using it.
+   *
+   * @param conf configuration.
+   * @param address the address to check and modify (if needed).
+   * @return the new modified address containing auxiliary port, or original
+   * address if auxiliary port not enabled.
+   */
+  private static String checkRpcAuxiliary(Configuration conf, String suffix,
+      String address) {
+    String key = DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY;
+    key = addSuffix(key, suffix);
+    int[] ports = conf.getInts(key);
+    if (ports == null || ports.length == 0) {
+      return address;
+    }
+    LOG.info("Using server auxiliary ports " + Arrays.toString(ports));
+    URI uri;
+    try {
+      uri = new URI(address);
+    } catch (URISyntaxException e) {
+      // Return the original address untouched if it is not a valid URI. This
+      // happens in unit tests, as MiniDFSCluster sets the value to
+      // 127.0.0.1:0, without a scheme (i.e. "hdfs://"). In practice this
+      // should not be the case, so log a warning message here.
+      LOG.warn("NameNode address is not a valid uri:" + address);
+      return address;
+    }
+    // Ignore the port, only take the scheme (e.g. hdfs) and host (e.g.
+    // localhost), then append the auxiliary port.
+    // TODO : revisit if there is a better way
+    StringBuilder sb = new StringBuilder();
+    sb.append(uri.getScheme());
+    sb.append("://");
+    sb.append(uri.getHost());
+    sb.append(":");
+    // TODO : currently, only the very first auxiliary port is being used.
+    // But actually NN supports running multiple auxiliary ports.
+    sb.append(ports[0]);
+    return sb.toString();
+  }
+
+  /**
    * Given a list of keys in the order of preference, returns a value
    * for the key in the given order from the configuration.
    * @param defaultValue default value to return, when key was not found
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
index f2cec31..1ab71f3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
@@ -68,6 +68,11 @@ public interface HdfsClientConfigKeys {
   String PREFIX = "dfs.client.";
   String  DFS_NAMESERVICES = "dfs.nameservices";
   String DFS_NAMENODE_RPC_ADDRESS_KEY = "dfs.namenode.rpc-address";
+
+  String DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_SUFFIX = "auxiliary-ports";
+  String DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY = DFS_NAMENODE_RPC_ADDRESS_KEY
+      + "." + DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_SUFFIX;
+
   int     DFS_NAMENODE_HTTP_PORT_DEFAULT = 9870;
   String  DFS_NAMENODE_HTTP_ADDRESS_KEY = "dfs.namenode.http-address";
   int     DFS_NAMENODE_HTTPS_PORT_DEFAULT = 9871;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 04046e4..a7106d1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -1231,6 +1231,9 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final Class<DFSNetworkTopology> DFS_NET_TOPOLOGY_IMPL_DEFAULT =
       DFSNetworkTopology.class;
 
+  public static final String DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY =
+      HdfsClientConfigKeys.DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY;
+
   // dfs.client.retry confs are moved to HdfsClientConfigKeys.Retry 
   @Deprecated
   public static final String  DFS_CLIENT_RETRY_POLICY_ENABLED_KEY
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
index 957914c..581ce66 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
@@ -23,6 +23,7 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 
+import java.util.Set;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.HadoopIllegalArgumentException;
@@ -1059,6 +1060,14 @@ public class NameNode extends ReconfigurableBase implements
   }
 
   /**
+   * @return The auxiliary nameNode RPC addresses, or empty set if there
+   * is none.
+   */
+  public Set<InetSocketAddress> getAuxiliaryNameNodeAddresses() {
+    return rpcServer.getAuxiliaryRpcAddresses();
+  }
+
+  /**
    * @return NameNode RPC address in "host:port" string form
    */
   public String getNameNodeAddressHostPortString() {
@@ -1066,6 +1075,27 @@ public class NameNode extends ReconfigurableBase implements
   }
 
   /**
+   * Return a host:port format string corresponding to an auxiliary
+   * port configured on NameNode. If there are multiple auxiliary ports,
+   * an arbitrary one is returned. If there is no auxiliary listener, returns
+   * null.
+   *
+   * @return a string of format host:port that points to an auxiliary NameNode
+   *         address, or null if there is no such address.
+   */
+  @VisibleForTesting
+  public String getNNAuxiliaryRpcAddress() {
+    Set<InetSocketAddress> auxiliaryAddrs = getAuxiliaryNameNodeAddresses();
+    if (auxiliaryAddrs.isEmpty()) {
+      return null;
+    }
+    // since a set has no particular order, returning the first element
+    // from the iterator is effectively arbitrary.
+    InetSocketAddress addr = auxiliaryAddrs.iterator().next();
+    return NetUtils.getHostPortString(addr);
+  }
+
+  /**
    * @return NameNode service RPC address if configured, the
    *    NameNode RPC address otherwise
    */
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
index 3a2ae6b..4c0db09 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
@@ -26,6 +26,7 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIFELINE_HANDLER
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIFELINE_HANDLER_RATIO_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY;
 import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.MAX_PATH_DEPTH;
 import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.MAX_PATH_LENGTH;
 
@@ -538,6 +539,13 @@ public class NameNodeRpcServer implements NamenodeProtocols {
     if (lifelineRpcServer != null) {
       lifelineRpcServer.setTracer(nn.tracer);
     }
+    int[] auxiliaryPorts =
+        conf.getInts(DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY);
+    if (auxiliaryPorts != null && auxiliaryPorts.length != 0) {
+      for (int auxiliaryPort : auxiliaryPorts) {
+        this.clientRpcServer.addAuxiliaryListener(auxiliaryPort);
+      }
+    }
   }
 
   /** Allow access to the lifeline RPC server for testing */
@@ -607,10 +615,16 @@ public class NameNodeRpcServer implements NamenodeProtocols {
     return serviceRPCAddress;
   }
 
-  InetSocketAddress getRpcAddress() {
+  @VisibleForTesting
+  public InetSocketAddress getRpcAddress() {
     return clientRpcAddress;
   }
 
+  @VisibleForTesting
+  public Set<InetSocketAddress> getAuxiliaryRpcAddresses() {
+    return clientRpcServer.getAuxiliaryListenerAddresses();
+  }
+
   private static UserGroupInformation getRemoteUser() throws IOException {
     return NameNode.getRemoteUser();
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 7988502..e70b808 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -5045,6 +5045,17 @@
   </property>
 
   <property>
+    <name>dfs.namenode.rpc-address.auxiliary-ports</name>
+    <value></value>
+    <description>
+      A comma separated list of auxiliary ports for the NameNode to listen on.
+      This allows exposing multiple NN addresses to clients.
+      Particularly, it is used to enforce different SASL levels on different ports.
+      Empty list indicates that auxiliary ports are disabled.
+    </description>
+  </property>
+
+  <property>
     <name>dfs.namenode.blockreport.queue.size</name>
     <value>1024</value>
     <description>
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
index 017f32e..97964ae 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
@@ -1361,6 +1361,21 @@ public class MiniDFSCluster implements AutoCloseable {
     }
     return uri;
   }
+
+  URI getURIForAuxiliaryPort(int nnIndex) {
+    String hostPort =
+        getNN(nnIndex).nameNode.getNNAuxiliaryRpcAddress();
+    if (hostPort == null) {
+      throw new RuntimeException("No auxiliary port found");
+    }
+    URI uri = null;
+    try {
+      uri = new URI("hdfs://" + hostPort);
+    } catch (URISyntaxException e) {
+      NameNode.LOG.warn("unexpected URISyntaxException", e);
+    }
+    return uri;
+  }
   
   public int getInstanceId() {
     return instanceId;
@@ -1961,6 +1976,14 @@ public class MiniDFSCluster implements AutoCloseable {
     checkSingleNameNode();
     return getNameNodePort(0);
   }
+
+  /**
+   * Get the auxiliary port of the NameNode at index 0.
+   */
+  public int getNameNodeAuxiliaryPort() {
+    checkSingleNameNode();
+    return getNameNodeAuxiliaryPort(0);
+  }
     
   /**
    * Gets the rpc port used by the NameNode at the given index, because the
@@ -1971,6 +1994,22 @@ public class MiniDFSCluster implements AutoCloseable {
   }
 
   /**
+   * Gets an auxiliary rpc port used by the NameNode at the given index. If
+   * the NameNode has multiple auxiliary ports configured, an arbitrary
+   * one is returned; if it has none, -1 is returned.
+   */
+  public int getNameNodeAuxiliaryPort(int nnIndex) {
+    Set<InetSocketAddress> allAuxiliaryAddresses =
+        getNN(nnIndex).nameNode.getAuxiliaryNameNodeAddresses();
+    if (allAuxiliaryAddresses.isEmpty()) {
+      return -1;
+    } else {
+      InetSocketAddress addr = allAuxiliaryAddresses.iterator().next();
+      return addr.getPort();
+    }
+  }
+
+  /**
    * @return the service rpc port used by the NameNode at the given index.
    */     
   public int getNameNodeServicePort(int nnIndex) {
@@ -2531,6 +2570,12 @@ public class MiniDFSCluster implements AutoCloseable {
     return getFileSystem(0);
   }
 
+  public DistributedFileSystem getFileSystemFromAuxiliaryPort()
+      throws IOException {
+    checkSingleNameNode();
+    return getFileSystemFromAuxiliaryPort(0);
+  }
+
   /**
    * Get a client handle to the DFS cluster for the namenode at given index.
    */
@@ -2539,6 +2584,12 @@ public class MiniDFSCluster implements AutoCloseable {
         getNN(nnIndex).conf));
   }
 
+  public DistributedFileSystem getFileSystemFromAuxiliaryPort(int nnIndex)
+      throws IOException { // client is built on the auxiliary-port URI
+    return (DistributedFileSystem) addFileSystem(FileSystem.get(
+        getURIForAuxiliaryPort(nnIndex), getNN(nnIndex).conf));
+  }
+
   /**
    * Get another FileSystem instance that is different from FileSystem.get(conf).
    * This simulating different threads working on different FileSystem instances.
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestHAAuxiliaryPort.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestHAAuxiliaryPort.java
new file mode 100644
index 0000000..867fbac
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestHAAuxiliaryPort.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.util.Set;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer;
+import org.junit.Test;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_NAMESERVICES;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+
+/**
+ * Test NN auxiliary port with HA.
+ */
+public class TestHAAuxiliaryPort {
+  /**
+   * Brings up a two-NameNode HA cluster and expects two auxiliary RPC
+   * addresses on each NameNode. A directory created through the primary RPC
+   * port of the active NameNode must be visible through every auxiliary
+   * port, both before and after failing over to the other NameNode.
+   */
+  @Test
+  public void testTest() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set(DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY, "0,0");
+    conf.set(DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY + ".ha-nn-uri-0.nn1",
+        "9000,9001");
+    conf.set(DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY + ".ha-nn-uri-0.nn2",
+        "9000,9001");
+    conf.set(DFS_NAMESERVICES, "ha-nn-uri-0");
+    conf.set(DFS_HA_NAMENODES_KEY_PREFIX + ".ha-nn-uri-0", "nn1,nn2");
+    conf.setBoolean("fs.hdfs.impl.disable.cache", true);
+
+    MiniDFSNNTopology topology = new MiniDFSNNTopology()
+        .addNameservice(new MiniDFSNNTopology.NSConf("ha-nn-uri-0")
+            .addNN(new MiniDFSNNTopology.NNConf("nn1"))
+            .addNN(new MiniDFSNNTopology.NNConf("nn2")));
+
+    // Use try-with-resources so the cluster is shut down even when an
+    // assertion fails part-way through the test.
+    try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .nnTopology(topology)
+        .numDataNodes(0)
+        .build()) {
+      cluster.transitionToActive(0);
+      cluster.waitActive();
+
+      NameNode nn0 = cluster.getNameNode(0);
+      NameNode nn1 = cluster.getNameNode(1);
+
+      // all the addresses below are valid nn0 addresses
+      NameNodeRpcServer rpcServer0 = (NameNodeRpcServer)nn0.getRpcServer();
+      InetSocketAddress server0RpcAddress = rpcServer0.getRpcAddress();
+      Set<InetSocketAddress> auxAddrServer0 =
+          rpcServer0.getAuxiliaryRpcAddresses();
+      assertEquals(2, auxAddrServer0.size());
+
+      // all the addresses below are valid nn1 addresses
+      NameNodeRpcServer rpcServer1 = (NameNodeRpcServer)nn1.getRpcServer();
+      InetSocketAddress server1RpcAddress = rpcServer1.getRpcAddress();
+      Set<InetSocketAddress> auxAddrServer1 =
+          rpcServer1.getAuxiliaryRpcAddresses();
+      assertEquals(2, auxAddrServer1.size());
+
+      // mkdir on nn0 uri 0
+      URI nn0URI = new URI("hdfs://localhost:" +
+          server0RpcAddress.getPort());
+      try (DFSClient client0 = new DFSClient(nn0URI, conf)) {
+        client0.mkdirs("/test", null, true);
+        // should be available on other ports also
+        for (InetSocketAddress auxAddr : auxAddrServer0) {
+          nn0URI = new URI("hdfs://localhost:" + auxAddr.getPort());
+          try (DFSClient clientTmp = new DFSClient(nn0URI, conf)) {
+            assertTrue(clientTmp.exists("/test"));
+          }
+        }
+      }
+
+      // now perform a failover
+      cluster.shutdownNameNode(0);
+      cluster.transitionToActive(1);
+
+      // then try to read the file from the nn1
+      URI nn1URI = new URI("hdfs://localhost:" +
+          server1RpcAddress.getPort());
+      try (DFSClient client1 = new DFSClient(nn1URI, conf)) {
+        assertTrue(client1.exists("/test"));
+        // should be available on other ports also
+        for (InetSocketAddress auxAddr : auxAddrServer1) {
+          nn1URI = new URI("hdfs://localhost:" + auxAddr.getPort());
+          try (DFSClient clientTmp = new DFSClient(nn1URI, conf)) {
+            // query through the auxiliary-port client, not client1
+            assertTrue(clientTmp.exists("/test"));
+          }
+        }
+      }
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org