Posted to common-commits@hadoop.apache.org by cl...@apache.org on 2019/09/04 00:20:02 UTC

[hadoop] branch branch-2 updated (493b362 -> 1ff68d0)

This is an automated email from the ASF dual-hosted git repository.

cliang pushed a change to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hadoop.git.


    from 493b362  YARN-9806. TestNMSimulator#testNMSimulator fails in branch-2
     new 94d71ba  HDFS-13547. Add ingress port based sasl resolver. Contributed by Chen Liang.
     new a06e35a  HDFS-13566. Add configurable additional RPC listener to NameNode. Contributed by Chen Liang.
     new e63619a  HDFS-13617. Allow wrapping NN QOP into token in encrypted message. Contributed by Chen Liang
     new 163cb4d  HDFS-13699. Add DFSClient sending handshake token to DataNode, and allow DataNode overwrite downstream QOP. Contributed by Chen Liang.
     new 1ff68d0  HDFS-14611. Move handshake secret field from Token to BlockAccessToken. Contributed by Chen Liang

The 5 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../main/java/org/apache/hadoop/ipc/Server.java    | 109 ++++++++--
 .../hadoop/security/IngressPortBasedResolver.java  | 100 ++++++++++
 .../hadoop/security/SaslPropertiesResolver.java    |  47 ++++-
 .../hadoop/security/WhitelistBasedResolver.java    |  20 +-
 .../hadoop/security/token/SecretManager.java       |   2 +-
 .../org/apache/hadoop/security/token/Token.java    |  12 +-
 .../test/java/org/apache/hadoop/ipc/TestIPC.java   |  53 ++++-
 .../security/TestIngressPortBasedResolver.java     |  59 ++++++
 .../java/org/apache/hadoop/hdfs/DFSUtilClient.java |  96 ++++++++-
 .../hadoop/hdfs/client/HdfsClientConfigKeys.java   |   8 +
 .../datatransfer/sasl/DataTransferSaslUtil.java    |  72 +++++++
 .../datatransfer/sasl/SaslDataTransferClient.java  | 134 +++++++++++--
 .../security/token/block/BlockTokenIdentifier.java |  25 +++
 .../src/main/proto/datatransfer.proto              |   6 +
 .../java/org/apache/hadoop/hdfs/DFSConfigKeys.java |  11 ++
 .../datatransfer/sasl/SaslDataTransferServer.java  |  29 ++-
 .../token/block/BlockTokenSecretManager.java       |  36 +++-
 .../hdfs/server/blockmanagement/BlockManager.java  |  11 +-
 .../apache/hadoop/hdfs/server/datanode/DNConf.java |   6 +
 .../hadoop/hdfs/server/datanode/DataNode.java      |  10 +
 .../hadoop/hdfs/server/datanode/DataXceiver.java   |  18 +-
 .../hadoop/hdfs/server/namenode/NameNode.java      |  30 +++
 .../hdfs/server/namenode/NameNodeRpcServer.java    |  38 +++-
 .../src/main/resources/hdfs-default.xml            |  44 +++++
 .../org/apache/hadoop/hdfs/MiniDFSCluster.java     |  51 +++++
 .../hadoop/hdfs/TestBlockTokenWrappingQOP.java     | 164 +++++++++++++++
 .../apache/hadoop/hdfs/TestHAAuxiliaryPort.java    | 112 +++++++++++
 .../apache/hadoop/hdfs/TestMultipleNNPortQOP.java  | 219 +++++++++++++++++++++
 28 files changed, 1442 insertions(+), 80 deletions(-)
 create mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/IngressPortBasedResolver.java
 create mode 100644 hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/TestIngressPortBasedResolver.java
 create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockTokenWrappingQOP.java
 create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestHAAuxiliaryPort.java
 create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestMultipleNNPortQOP.java


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


[hadoop] 02/05: HDFS-13566. Add configurable additional RPC listener to NameNode. Contributed by Chen Liang.

Posted by cl...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

cliang pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit a06e35a1979a4be336c4d6be05cceb815637e245
Author: Chen Liang <cl...@apache.org>
AuthorDate: Tue Oct 23 14:53:45 2018 -0700

    HDFS-13566. Add configurable additional RPC listener to NameNode. Contributed by Chen Liang.
---
 .../main/java/org/apache/hadoop/ipc/Server.java    | 114 +++++++++++++++++++--
 .../hadoop/security/SaslPropertiesResolver.java    |   4 +-
 .../test/java/org/apache/hadoop/ipc/TestIPC.java   |  53 +++++++++-
 .../java/org/apache/hadoop/hdfs/DFSUtilClient.java |  96 +++++++++++++++--
 .../hadoop/hdfs/client/HdfsClientConfigKeys.java   |   5 +
 .../java/org/apache/hadoop/hdfs/DFSConfigKeys.java |   3 +
 .../hadoop/hdfs/server/namenode/NameNode.java      |  30 ++++++
 .../hdfs/server/namenode/NameNodeRpcServer.java    |  16 ++-
 .../src/main/resources/hdfs-default.xml            |  11 ++
 .../org/apache/hadoop/hdfs/MiniDFSCluster.java     |  51 +++++++++
 .../apache/hadoop/hdfs/TestHAAuxiliaryPort.java    | 112 ++++++++++++++++++++
 11 files changed, 472 insertions(+), 23 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
index c81fcf4..c23390f 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
@@ -374,6 +374,24 @@ public abstract class Server {
   }
 
   /**
+   * Returns the SASL QOP for the current call if the current call is
+   * set and the SASL negotiation is done; otherwise returns null. Note
+   * that CurCall is a thread-local object, so different handler
+   * threads will see different CurCall objects.
+   *
+   * Also, this only applies to RPC calls; other protocols are not supported.
+   * @return the QOP of the current connection.
+   */
+  public static String getEstablishedQOP() {
+    Call call = CurCall.get();
+    if (call == null || !(call instanceof RpcCall)) {
+      return null;
+    }
+    RpcCall rpcCall = (RpcCall)call;
+    return rpcCall.connection.getEstablishedQOP();
+  }
+
+  /**
    * Returns the clientId from the current RPC request
    */
   public static byte[] getClientId() {
@@ -452,6 +470,10 @@ public abstract class Server {
   // maintains the set of client connections and handles idle timeouts
   private ConnectionManager connectionManager;
   private Listener listener = null;
+  // Auxiliary listeners are maintained in a map to allow an
+  // arbitrary number of auxiliary listeners: a map from
+  // the port to the listener bound to it.
+  private Map<Integer, Listener> auxiliaryListenerMap;
   private Responder responder = null;
   private Handler[] handlers = null;
 
@@ -1142,11 +1164,12 @@ public abstract class Server {
     private Reader[] readers = null;
     private int currentReader = 0;
     private InetSocketAddress address; //the address we bind at
+    private int listenPort; //the port we bind at
     private int backlogLength = conf.getInt(
         CommonConfigurationKeysPublic.IPC_SERVER_LISTEN_QUEUE_SIZE_KEY,
         CommonConfigurationKeysPublic.IPC_SERVER_LISTEN_QUEUE_SIZE_DEFAULT);
     
-    public Listener() throws IOException {
+    Listener(int port) throws IOException {
       address = new InetSocketAddress(bindAddress, port);
       // Create a new server socket and set to non blocking mode
       acceptChannel = ServerSocketChannel.open();
@@ -1154,7 +1177,10 @@ public abstract class Server {
 
       // Bind the server socket to the local host and port
       bind(acceptChannel.socket(), address, backlogLength, conf, portRangeConfig);
-      port = acceptChannel.socket().getLocalPort(); //Could be an ephemeral port
+      //Could be an ephemeral port
+      this.listenPort = acceptChannel.socket().getLocalPort();
+      Thread.currentThread().setName("Listener at " +
+          bindAddress + "/" + this.listenPort);
       // create a selector;
       selector= Selector.open();
       readers = new Reader[readThreads];
@@ -1337,7 +1363,7 @@ public abstract class Server {
         channel.socket().setKeepAlive(true);
         
         Reader reader = getReader();
-        Connection c = connectionManager.register(channel);
+        Connection c = connectionManager.register(channel, this.listenPort);
         // If the connectionManager can't take it, close the connection.
         if (c == null) {
           if (channel.isOpen()) {
@@ -1759,6 +1785,7 @@ public abstract class Server {
     private ByteBuffer unwrappedDataLengthBuffer;
     private int serviceClass;
     private boolean shouldClose = false;
+    private int ingressPort;
 
     UserGroupInformation user = null;
     public UserGroupInformation attemptingUser = null; // user name before auth
@@ -1770,7 +1797,8 @@ public abstract class Server {
     private boolean sentNegotiate = false;
     private boolean useWrap = false;
     
-    public Connection(SocketChannel channel, long lastContact) {
+    public Connection(SocketChannel channel, long lastContact,
+        int ingressPort) {
       this.channel = channel;
       this.lastContact = lastContact;
       this.data = null;
@@ -1779,6 +1807,7 @@ public abstract class Server {
       this.unwrappedDataLengthBuffer = ByteBuffer.allocate(4);
       this.socket = channel.socket();
       this.addr = socket.getInetAddress();
+      this.ingressPort = ingressPort;
       if (addr == null) {
         this.hostAddress = "*Unknown*";
       } else {
@@ -1813,9 +1842,24 @@ public abstract class Server {
       return hostAddress;
     }
 
+    public int getIngressPort() {
+      return ingressPort;
+    }
+
     public InetAddress getHostInetAddress() {
       return addr;
     }
+
+    public String getEstablishedQOP() {
+      // In practice, saslServer should not be null when this is
+      // called. If it is null, it must be due to either a
+      // configuration mistake or a call from a unit test.
+      if (saslServer == null) {
+        LOG.warn("SASL server should not be null!");
+        return null;
+      }
+      return (String)saslServer.getNegotiatedProperty(Sasl.QOP);
+    }
     
     public void setLastContact(long lastContact) {
       this.lastContact = lastContact;
@@ -2223,7 +2267,7 @@ public abstract class Server {
     private SaslServer createSaslServer(AuthMethod authMethod)
         throws IOException, InterruptedException {
       final Map<String,?> saslProps =
-                  saslPropsResolver.getServerProperties(addr);
+                  saslPropsResolver.getServerProperties(addr, ingressPort);
       return new SaslRpcServer(authMethod).create(this, saslProps, secretManager);
     }
     
@@ -2739,7 +2783,8 @@ public abstract class Server {
   private class Handler extends Thread {
     public Handler(int instanceNumber) {
       this.setDaemon(true);
-      this.setName("IPC Server handler "+ instanceNumber + " on " + port);
+      this.setName("IPC Server handler "+ instanceNumber +
+          " on default port " + port);
     }
 
     @Override
@@ -2900,6 +2945,7 @@ public abstract class Server {
     this.rpcRequestClass = rpcRequestClass; 
     this.handlerCount = handlerCount;
     this.socketSendBufferSize = 0;
+    this.auxiliaryListenerMap = null;
     this.maxDataLength = conf.getInt(CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH,
         CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH_DEFAULT);
     if (queueSizePerHandler != -1) {
@@ -2939,8 +2985,9 @@ public abstract class Server {
     this.negotiateResponse = buildNegotiateResponse(enabledAuthMethods);
     
     // Start the listener here and let it bind to the port
-    listener = new Listener();
-    this.port = listener.getAddress().getPort();    
+    listener = new Listener(port);
+    // set the server port to the default listener port.
+    this.port = listener.getAddress().getPort();
     connectionManager = new ConnectionManager();
     this.rpcMetrics = RpcMetrics.create(this, conf);
     this.rpcDetailedMetrics = RpcDetailedMetrics.create(this.port);
@@ -2962,7 +3009,23 @@ public abstract class Server {
     
     this.exceptionsHandler.addTerseLoggingExceptions(StandbyException.class);
   }
-  
+
+  public synchronized void addAuxiliaryListener(int auxiliaryPort)
+      throws IOException {
+    if (auxiliaryListenerMap == null) {
+      auxiliaryListenerMap = new HashMap<>();
+    }
+    if (auxiliaryListenerMap.containsKey(auxiliaryPort) && auxiliaryPort != 0) {
+      throw new IOException(
+          "There is already a listener binding to: " + auxiliaryPort);
+    }
+    Listener newListener = new Listener(auxiliaryPort);
+    // in the case of port = 0, the listener would be on a != 0 port.
+    LOG.info("Adding a server listener on port " +
+        newListener.getAddress().getPort());
+    auxiliaryListenerMap.put(newListener.getAddress().getPort(), newListener);
+  }
+
   private RpcSaslProto buildNegotiateResponse(List<AuthMethod> authMethods)
       throws IOException {
     RpcSaslProto.Builder negotiateBuilder = RpcSaslProto.newBuilder();
@@ -3199,6 +3262,12 @@ public abstract class Server {
   public synchronized void start() {
     responder.start();
     listener.start();
+    if (auxiliaryListenerMap != null && auxiliaryListenerMap.size() > 0) {
+      for (Listener newListener : auxiliaryListenerMap.values()) {
+        newListener.start();
+      }
+    }
+
     handlers = new Handler[handlerCount];
     
     for (int i = 0; i < handlerCount; i++) {
@@ -3220,6 +3289,12 @@ public abstract class Server {
     }
     listener.interrupt();
     listener.doStop();
+    if (auxiliaryListenerMap != null && auxiliaryListenerMap.size() > 0) {
+      for (Listener newListener : auxiliaryListenerMap.values()) {
+        newListener.interrupt();
+        newListener.doStop();
+      }
+    }
     responder.interrupt();
     notifyAll();
     this.rpcMetrics.shutdown();
@@ -3243,6 +3318,23 @@ public abstract class Server {
   public synchronized InetSocketAddress getListenerAddress() {
     return listener.getAddress();
   }
+
+  /**
+   * Return the set of all the configured auxiliary socket addresses this
+   * RPC server is listening on. If none are configured, an
+   * empty set is returned.
+   * @return the set of all the auxiliary addresses on which the
+   *         RPC server is listening.
+   */
+  public synchronized Set<InetSocketAddress> getAuxiliaryListenerAddresses() {
+    Set<InetSocketAddress> allAddrs = new HashSet<>();
+    if (auxiliaryListenerMap != null && auxiliaryListenerMap.size() > 0) {
+      for (Listener auxListener : auxiliaryListenerMap.values()) {
+        allAddrs.add(auxListener.getAddress());
+      }
+    }
+    return allAddrs;
+  }
   
   /** 
    * Called for each call. 
@@ -3547,11 +3639,11 @@ public abstract class Server {
       return connections.toArray(new Connection[0]);
     }
 
-    Connection register(SocketChannel channel) {
+    Connection register(SocketChannel channel, int ingressPort) {
       if (isFull()) {
         return null;
       }
-      Connection connection = new Connection(channel, Time.now());
+      Connection connection = new Connection(channel, Time.now(), ingressPort);
       add(connection);
       if (LOG.isDebugEnabled()) {
         LOG.debug("Server connection from " + connection +
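
The Server.getEstablishedQOP() method added above exposes the SASL QOP negotiated for the connection behind the call currently being handled. A minimal sketch of how server-side code could consult it, assuming the patched hadoop-common is on the classpath; the class and the policy it encodes are illustrative, not part of this change:

    import org.apache.hadoop.ipc.Server;

    // Hypothetical helper: must run inside an RPC handler thread, since
    // Server.CurCall is thread-local and only set while a call is processed.
    public final class QopCheck {
      // Standard SASL QOP values are "auth", "auth-int" and "auth-conf";
      // getEstablishedQOP() returns null for non-RPC calls or when SASL
      // negotiation has not completed.
      public static boolean isPrivacyNegotiated() {
        return "auth-conf".equals(Server.getEstablishedQOP());
      }
    }
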
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SaslPropertiesResolver.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SaslPropertiesResolver.java
index 64b86e3..dd6c42e 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SaslPropertiesResolver.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SaslPropertiesResolver.java
@@ -102,7 +102,7 @@ public class SaslPropertiesResolver implements Configurable{
    */
   public Map<String, String> getServerProperties(InetAddress clientAddress,
       int ingressPort){
-    return properties;
+    return getServerProperties(clientAddress);
   }
 
   /**
@@ -122,7 +122,7 @@ public class SaslPropertiesResolver implements Configurable{
    */
   public Map<String, String> getClientProperties(InetAddress serverAddress,
       int ingressPort) {
-    return properties;
+    return getClientProperties(serverAddress);
   }
 
   /**
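
The two-argument overloads above simply fall back to the existing address-only lookups, so current resolvers keep their behaviour; a port-aware resolver (such as the IngressPortBasedResolver added elsewhere in this push) can override them to hand out different SASL properties per ingress port. A rough sketch of that pattern; the class name, port number and policy are made up for illustration:

    import java.net.InetAddress;
    import java.util.HashMap;
    import java.util.Map;
    import javax.security.sasl.Sasl;
    import org.apache.hadoop.security.SaslPropertiesResolver;

    // Illustrative only: enforce privacy on one hypothetical ingress port and
    // fall back to the configured defaults everywhere else.
    public class PortQopResolver extends SaslPropertiesResolver {
      @Override
      public Map<String, String> getServerProperties(InetAddress clientAddress,
          int ingressPort) {
        Map<String, String> props =
            new HashMap<>(getServerProperties(clientAddress));
        if (ingressPort == 9001) {          // hypothetical auxiliary port
          props.put(Sasl.QOP, "auth-conf"); // privacy for this port only
        }
        return props;
      }
    }
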
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java
index 95e76f7..9e42690 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java
@@ -49,8 +49,10 @@ import java.net.SocketException;
 import java.net.SocketTimeoutException;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Random;
+import java.util.Set;
 import java.util.concurrent.BrokenBarrierException;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.CyclicBarrier;
@@ -171,6 +173,11 @@ public class TestIPC {
       this(handlerCount, sleep, LongWritable.class, null);
     }
 
+    public TestServer(int port, int handlerCount, boolean sleep)
+        throws IOException {
+      this(port, handlerCount, sleep, LongWritable.class, null);
+    }
+
     public TestServer(int handlerCount, boolean sleep, Configuration conf)
         throws IOException {
       this(handlerCount, sleep, LongWritable.class, null, conf);
@@ -182,11 +189,24 @@ public class TestIPC {
       this(handlerCount, sleep, paramClass, responseClass, conf);
     }
 
+    public TestServer(int port, int handlerCount, boolean sleep,
+        Class<? extends Writable> paramClass,
+        Class<? extends Writable> responseClass) throws IOException {
+      this(port, handlerCount, sleep, paramClass, responseClass, conf);
+    }
+
     public TestServer(int handlerCount, boolean sleep,
         Class<? extends Writable> paramClass,
         Class<? extends Writable> responseClass, Configuration conf)
         throws IOException {
-      super(ADDRESS, 0, paramClass, handlerCount, conf);
+      this(0, handlerCount, sleep, paramClass, responseClass, conf);
+    }
+
+    public TestServer(int port, int handlerCount, boolean sleep,
+        Class<? extends Writable> paramClass,
+        Class<? extends Writable> responseClass, Configuration conf)
+        throws IOException {
+      super(ADDRESS, port, paramClass, handlerCount, conf);
       this.sleep = sleep;
       this.responseClass = responseClass;
     }
@@ -338,6 +358,37 @@ public class TestIPC {
     }
     server.stop();
   }
+
+  @Test
+  public void testAuxiliaryPorts() throws IOException, InterruptedException {
+    int defaultPort = 9000;
+    int[] auxiliaryPorts = {9001, 9002, 9003};
+    final int handlerCount = 5;
+    final boolean handlerSleep = false;
+    Server server = new TestServer(defaultPort, handlerCount, handlerSleep);
+    for (int port : auxiliaryPorts) {
+      server.addAuxiliaryListener(port);
+    }
+    Set<InetSocketAddress> listenerAddrs =
+        server.getAuxiliaryListenerAddresses();
+    Set<InetSocketAddress> addrs = new HashSet<>();
+    for (InetSocketAddress addr : listenerAddrs) {
+      addrs.add(NetUtils.getConnectAddress(addr));
+    }
+    server.start();
+
+    Client client = new Client(LongWritable.class, conf);
+    Set<SerialCaller> calls = new HashSet<>();
+    for (InetSocketAddress addr : addrs) {
+      calls.add(new SerialCaller(client, addr, 100));
+    }
+    for (SerialCaller caller : calls) {
+      caller.join();
+      assertFalse(caller.failed);
+    }
+    client.stop();
+    server.stop();
+  }
 	
   @Test(timeout=60000)
   public void testStandAloneClient() throws IOException {
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java
index 15c9b84..4be4c82 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java
@@ -17,9 +17,6 @@
  */
 package org.apache.hadoop.hdfs;
 
-import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX;
-import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_NAMESERVICES;
-
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.io.UnsupportedEncodingException;
@@ -31,6 +28,7 @@ import java.net.URISyntaxException;
 import java.nio.channels.SocketChannel;
 import java.nio.charset.StandardCharsets;
 import java.text.SimpleDateFormat;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Date;
@@ -41,6 +39,9 @@ import java.util.Map;
 
 import javax.net.SocketFactory;
 
+import com.google.common.base.Joiner;
+import com.google.common.collect.Maps;
+import com.google.common.primitives.SignedBytes;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
@@ -72,9 +73,8 @@ import org.apache.hadoop.util.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Joiner;
-import com.google.common.collect.Maps;
-import com.google.common.primitives.SignedBytes;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.*;
+
 
 public class DFSUtilClient {
   public static final byte[] EMPTY_BYTES = {};
@@ -162,7 +162,7 @@ public class DFSUtilClient {
   public static Map<String, Map<String, InetSocketAddress>> getHaNnRpcAddresses(
       Configuration conf) {
     return DFSUtilClient.getAddresses(conf, null,
-      HdfsClientConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY);
+      DFS_NAMENODE_RPC_ADDRESS_KEY);
   }
 
   /**
@@ -365,7 +365,7 @@ public class DFSUtilClient {
     Map<String, InetSocketAddress> ret = Maps.newLinkedHashMap();
     for (String nnId : emptyAsSingletonNull(nnIds)) {
       String suffix = concatSuffixes(nsId, nnId);
-      String address = getConfValue(defaultValue, suffix, conf, keys);
+      String address = checkKeysAndProcess(defaultValue, suffix, conf, keys);
       if (address != null) {
         InetSocketAddress isa = NetUtils.createSocketAddr(address);
         if (isa.isUnresolved()) {
@@ -380,6 +380,86 @@ public class DFSUtilClient {
   }
 
   /**
+   * Return an address from the configuration, taking a list of keys in
+   * order of preference. If the returned address comes from
+   * DFS_NAMENODE_RPC_ADDRESS_KEY, check whether auxiliary ports are enabled
+   * and, if so, replace the address port with an auxiliary port. If the
+   * address comes from any other key, return it unchanged. If no address
+   * is found for any key, return the given default value.
+   *
+   * @param defaultValue the default value if no values found for given keys
+   * @param suffix suffix to append to keys
+   * @param conf the configuration
+   * @param keys a list of keys, ordered by preference
+   * @return the resolved address, possibly rewritten to use an auxiliary port
+   */
+  private static String checkKeysAndProcess(String defaultValue, String suffix,
+      Configuration conf, String... keys) {
+    String succeededKey = null;
+    String address = null;
+    for (String key : keys) {
+      address = getConfValue(null, suffix, conf, key);
+      if (address != null) {
+        succeededKey = key;
+        break;
+      }
+    }
+    String ret;
+    if (address == null) {
+      ret = defaultValue;
+    } else if(DFS_NAMENODE_RPC_ADDRESS_KEY.equals(succeededKey))  {
+      ret = checkRpcAuxiliary(conf, suffix, address);
+    } else {
+      ret = address;
+    }
+    return ret;
+  }
+
+  /**
+   * Check whether auxiliary ports are enabled and, if so, whether the given
+   * address should have its port replaced by an auxiliary port. If the given
+   * address does not contain a port, append the auxiliary port to enforce it.
+   *
+   * @param conf configuration.
+   * @param address the address to check and modify (if needed).
+   * @return the new modified address containing auxiliary port, or original
+   * address if auxiliary port not enabled.
+   */
+  private static String checkRpcAuxiliary(Configuration conf, String suffix,
+      String address) {
+    String key = DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY;
+    key = addSuffix(key, suffix);
+    int[] ports = conf.getInts(key);
+    if (ports == null || ports.length == 0) {
+      return address;
+    }
+    LOG.info("Using server auxiliary ports " + Arrays.toString(ports));
+    URI uri;
+    try {
+      uri = new URI(address);
+    } catch (URISyntaxException e) {
+      // return the original address untouched if it is not a valid URI. This
+      // happens in unit tests, as MiniDFSCluster sets the value to
+      // 127.0.0.1:0 without a scheme (i.e. "hdfs://"), which in practice
+      // should not be the case. So log a warning message here.
+      LOG.warn("NameNode address is not a valid uri:" + address);
+      return address;
+    }
+    // Ignore the port; only take the scheme (e.g. hdfs) and host (e.g.
+    // localhost), then append the auxiliary port
+    // TODO : revisit if there is a better way
+    StringBuilder sb = new StringBuilder();
+    sb.append(uri.getScheme());
+    sb.append("://");
+    sb.append(uri.getHost());
+    sb.append(":");
+    // TODO : currently, only the very first auxiliary port is being used.
+    // But actually NN supports running multiple auxiliary ports.
+    sb.append(ports[0]);
+    return sb.toString();
+  }
+
+  /**
    * Given a list of keys in the order of preference, returns a value
    * for the key in the given order from the configuration.
    * @param defaultValue default value to return, when key was not found
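
To illustrate the client-side effect of checkRpcAuxiliary above: when the resolved address comes from dfs.namenode.rpc-address and parses as a URI, its port is swapped for the first configured auxiliary port (values without a scheme are returned untouched, as the comment in the code notes). A sketch with a made-up host and ports:

    import org.apache.hadoop.conf.Configuration;

    // Illustration only: host name and ports are invented for this example.
    public class AuxiliaryPortClientConfigExample {
      public static Configuration exampleConf() {
        Configuration conf = new Configuration();
        // The value carries a scheme, so checkRpcAuxiliary can parse it as a URI.
        conf.set("dfs.namenode.rpc-address", "hdfs://nn.example.com:8020");
        // The first auxiliary port (9001) replaces the port above, so the client
        // connects to hdfs://nn.example.com:9001; 9002 is currently ignored, as
        // the TODO in checkRpcAuxiliary notes.
        conf.set("dfs.namenode.rpc-address.auxiliary-ports", "9001,9002");
        return conf;
      }
    }
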
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
index 4bc9b3e..c74ece2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
@@ -69,6 +69,11 @@ public interface HdfsClientConfigKeys {
   String  DFS_NAMESERVICES = "dfs.nameservices";
   String DFS_NAMENODE_RPC_ADDRESS_KEY = "dfs.namenode.rpc-address";
   int     DFS_NAMENODE_HTTP_PORT_DEFAULT = 50070;
+
+  String DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_SUFFIX = "auxiliary-ports";
+  String DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY = DFS_NAMENODE_RPC_ADDRESS_KEY
+      + "." + DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_SUFFIX;
+
   String  DFS_NAMENODE_HTTP_ADDRESS_KEY = "dfs.namenode.http-address";
   int     DFS_NAMENODE_HTTPS_PORT_DEFAULT = 50470;
   String  DFS_NAMENODE_HTTPS_ADDRESS_KEY = "dfs.namenode.https-address";
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 41cc8e6..e973bd3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -1054,6 +1054,9 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final Class<DFSNetworkTopology> DFS_NET_TOPOLOGY_IMPL_DEFAULT =
       DFSNetworkTopology.class;
 
+  public static final String DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY =
+      HdfsClientConfigKeys.DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY;
+
   // dfs.client.retry confs are moved to HdfsClientConfigKeys.Retry 
   @Deprecated
   public static final String  DFS_CLIENT_RETRY_POLICY_ENABLED_KEY
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
index efff782..b44d104 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
@@ -23,6 +23,7 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 
+import java.util.Set;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.HadoopIllegalArgumentException;
@@ -1062,6 +1063,14 @@ public class NameNode extends ReconfigurableBase implements
   }
 
   /**
+   * @return The auxiliary nameNode RPC addresses, or empty set if there
+   * is none.
+   */
+  public Set<InetSocketAddress> getAuxiliaryNameNodeAddresses() {
+    return rpcServer.getAuxiliaryRpcAddresses();
+  }
+
+  /**
    * @return NameNode RPC address in "host:port" string form
    */
   public String getNameNodeAddressHostPortString() {
@@ -1069,6 +1078,27 @@ public class NameNode extends ReconfigurableBase implements
   }
 
   /**
+   * Return a host:port format string corresponding to an auxiliary
+   * port configured on NameNode. If there are multiple auxiliary ports,
+   * an arbitrary one is returned. If there is no auxiliary listener, returns
+   * null.
+   *
+   * @return a string of format host:port that points to an auxiliary NameNode
+   *         address, or null if there is no such address.
+   */
+  @VisibleForTesting
+  public String getNNAuxiliaryRpcAddress() {
+    Set<InetSocketAddress> auxiliaryAddrs = getAuxiliaryNameNodeAddresses();
+    if (auxiliaryAddrs.isEmpty()) {
+      return null;
+    }
+    // since the set has no particular order, returning the first element
+    // from the iterator is effectively arbitrary.
+    InetSocketAddress addr = auxiliaryAddrs.iterator().next();
+    return NetUtils.getHostPortString(addr);
+  }
+
+  /**
    * @return NameNode service RPC address if configured, the
    *    NameNode RPC address otherwise
    */
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
index 0ef1343..aea811d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
@@ -26,6 +26,7 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIFELINE_HANDLER
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIFELINE_HANDLER_RATIO_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY;
 import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.MAX_PATH_DEPTH;
 import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.MAX_PATH_LENGTH;
 
@@ -519,6 +520,13 @@ public class NameNodeRpcServer implements NamenodeProtocols {
     if (lifelineRpcServer != null) {
       lifelineRpcServer.setTracer(nn.tracer);
     }
+    int[] auxiliaryPorts =
+        conf.getInts(DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY);
+    if (auxiliaryPorts != null && auxiliaryPorts.length != 0) {
+      for (int auxiliaryPort : auxiliaryPorts) {
+        this.clientRpcServer.addAuxiliaryListener(auxiliaryPort);
+      }
+    }
   }
 
   /** Allow access to the lifeline RPC server for testing */
@@ -588,10 +596,16 @@ public class NameNodeRpcServer implements NamenodeProtocols {
     return serviceRPCAddress;
   }
 
-  InetSocketAddress getRpcAddress() {
+  @VisibleForTesting
+  public InetSocketAddress getRpcAddress() {
     return clientRpcAddress;
   }
 
+  @VisibleForTesting
+  public Set<InetSocketAddress> getAuxiliaryRpcAddresses() {
+    return clientRpcServer.getAuxiliaryListenerAddresses();
+  }
+
   private static UserGroupInformation getRemoteUser() throws IOException {
     return NameNode.getRemoteUser();
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index aa97ec5..0a3b2d4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -4545,4 +4545,15 @@
       ensure that other waiters on the lock can get in.
     </description>
   </property>
+
+  <property>
+    <name>dfs.namenode.rpc-address.auxiliary-ports</name>
+    <value></value>
+    <description>
+      A comma-separated list of auxiliary ports for the NameNode to listen on.
+      This allows exposing multiple NN addresses to clients.
+      In particular, it is used to enforce different SASL levels on different ports.
+      An empty list indicates that auxiliary ports are disabled.
+    </description>
+  </property>
 </configuration>
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
index 2ca8abc..ca28874 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
@@ -1359,6 +1359,21 @@ public class MiniDFSCluster implements AutoCloseable {
     }
     return uri;
   }
+
+  URI getURIForAuxiliaryPort(int nnIndex) {
+    String hostPort =
+        getNN(nnIndex).nameNode.getNNAuxiliaryRpcAddress();
+    if (hostPort == null) {
+      throw new RuntimeException("No auxiliary port found");
+    }
+    URI uri = null;
+    try {
+      uri = new URI("hdfs://" + hostPort);
+    } catch (URISyntaxException e) {
+      NameNode.LOG.warn("unexpected URISyntaxException", e);
+    }
+    return uri;
+  }
   
   public int getInstanceId() {
     return instanceId;
@@ -1913,6 +1928,14 @@ public class MiniDFSCluster implements AutoCloseable {
     checkSingleNameNode();
     return getNameNodePort(0);
   }
+
+  /**
+   * Get the auxiliary port of the NameNode when the cluster has a single NameNode.
+   */
+  public int getNameNodeAuxiliaryPort() {
+    checkSingleNameNode();
+    return getNameNodeAuxiliaryPort(0);
+  }
     
   /**
    * Gets the rpc port used by the NameNode at the given index, because the
@@ -1923,6 +1946,22 @@ public class MiniDFSCluster implements AutoCloseable {
   }
 
   /**
+   * Gets an auxiliary rpc port used by the NameNode at the given index. If
+   * the NameNode has multiple auxiliary ports configured, an arbitrary
+   * one is returned.
+   */
+  public int getNameNodeAuxiliaryPort(int nnIndex) {
+    Set<InetSocketAddress> allAuxiliaryAddresses =
+        getNN(nnIndex).nameNode.getAuxiliaryNameNodeAddresses();
+    if (allAuxiliaryAddresses.isEmpty()) {
+      return -1;
+    } else {
+      InetSocketAddress addr = allAuxiliaryAddresses.iterator().next();
+      return addr.getPort();
+    }
+  }
+
+  /**
    * @return the service rpc port used by the NameNode at the given index.
    */     
   public int getNameNodeServicePort(int nnIndex) {
@@ -2502,6 +2541,12 @@ public class MiniDFSCluster implements AutoCloseable {
     return getFileSystem(0);
   }
 
+  public DistributedFileSystem getFileSystemFromAuxiliaryPort()
+      throws IOException {
+    checkSingleNameNode();
+    return getFileSystemFromAuxiliaryPort(0);
+  }
+
   /**
    * Get a client handle to the DFS cluster for the namenode at given index.
    */
@@ -2510,6 +2555,12 @@ public class MiniDFSCluster implements AutoCloseable {
         getNN(nnIndex).conf));
   }
 
+  public DistributedFileSystem getFileSystemFromAuxiliaryPort(int nnIndex)
+      throws IOException {
+    return (DistributedFileSystem) addFileSystem(FileSystem.get(
+        getURIForAuxiliaryPort(nnIndex), getNN(nnIndex).conf));
+  }
+
   /**
    * Get another FileSystem instance that is different from FileSystem.get(conf).
    * This simulating different threads working on different FileSystem instances.
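
The MiniDFSCluster helpers above let tests obtain an auxiliary port and a FileSystem bound to it. A rough test-style sketch, assuming the auxiliary port is set to 0 so the NameNode picks a free ephemeral port; the class below is illustrative, not part of this change:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hdfs.DistributedFileSystem;
    import org.apache.hadoop.hdfs.MiniDFSCluster;

    public class AuxiliaryPortUsageSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Port 0 asks the NameNode to bind an ephemeral auxiliary port.
        conf.set("dfs.namenode.rpc-address.auxiliary-ports", "0");
        MiniDFSCluster cluster =
            new MiniDFSCluster.Builder(conf).numDataNodes(0).build();
        try {
          cluster.waitActive();
          int auxPort = cluster.getNameNodeAuxiliaryPort();
          DistributedFileSystem fs = cluster.getFileSystemFromAuxiliaryPort();
          System.out.println("Auxiliary port " + auxPort
              + ", root exists: " + fs.exists(new Path("/")));
        } finally {
          cluster.shutdown();
        }
      }
    }
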
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestHAAuxiliaryPort.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestHAAuxiliaryPort.java
new file mode 100644
index 0000000..867fbac
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestHAAuxiliaryPort.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.util.Set;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer;
+import org.junit.Test;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_NAMESERVICES;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+
+/**
+ * Test NN auxiliary port with HA.
+ */
+public class TestHAAuxiliaryPort {
+  @Test
+  public void testTest() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set(DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY, "0,0");
+    conf.set(DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY + ".ha-nn-uri-0.nn1",
+        "9000,9001");
+    conf.set(DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY + ".ha-nn-uri-0.nn2",
+        "9000,9001");
+    conf.set(DFS_NAMESERVICES, "ha-nn-uri-0");
+    conf.set(DFS_HA_NAMENODES_KEY_PREFIX + ".ha-nn-uri-0", "nn1,nn2");
+    conf.setBoolean("fs.hdfs.impl.disable.cache", true);
+
+    MiniDFSNNTopology topology = new MiniDFSNNTopology()
+        .addNameservice(new MiniDFSNNTopology.NSConf("ha-nn-uri-0")
+            .addNN(new MiniDFSNNTopology.NNConf("nn1"))
+            .addNN(new MiniDFSNNTopology.NNConf("nn2")));
+
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .nnTopology(topology)
+        .numDataNodes(0)
+        .build();
+    cluster.transitionToActive(0);
+    cluster.waitActive();
+
+    NameNode nn0 = cluster.getNameNode(0);
+    NameNode nn1 = cluster.getNameNode(1);
+
+    // all the addresses below are valid nn0 addresses
+    NameNodeRpcServer rpcServer0 = (NameNodeRpcServer)nn0.getRpcServer();
+    InetSocketAddress server0RpcAddress = rpcServer0.getRpcAddress();
+    Set<InetSocketAddress> auxAddrServer0 =
+        rpcServer0.getAuxiliaryRpcAddresses();
+    assertEquals(2, auxAddrServer0.size());
+
+    // all the addresses below are valid nn1 addresses
+    NameNodeRpcServer rpcServer1 = (NameNodeRpcServer)nn1.getRpcServer();
+    InetSocketAddress server1RpcAddress = rpcServer1.getRpcAddress();
+    Set<InetSocketAddress> auxAddrServer1 =
+        rpcServer1.getAuxiliaryRpcAddresses();
+    assertEquals(2, auxAddrServer1.size());
+
+    // mkdir on nn0 uri 0
+    URI nn0URI = new URI("hdfs://localhost:" +
+        server0RpcAddress.getPort());
+    try (DFSClient client0 = new DFSClient(nn0URI, conf)){
+      client0.mkdirs("/test", null, true);
+      // should be available on other ports also
+      for (InetSocketAddress auxAddr : auxAddrServer0) {
+        nn0URI = new URI("hdfs://localhost:" + auxAddr.getPort());
+        try (DFSClient clientTmp = new DFSClient(nn0URI, conf)) {
+          assertTrue(clientTmp.exists("/test"));
+        }
+      }
+    }
+
+    // now perform a failover
+    cluster.shutdownNameNode(0);
+    cluster.transitionToActive(1);
+
+    // then try to read the file from the nn1
+    URI nn1URI = new URI("hdfs://localhost:" +
+        server1RpcAddress.getPort());
+    try (DFSClient client1 = new DFSClient(nn1URI, conf)) {
+      assertTrue(client1.exists("/test"));
+      // should be available on other ports also
+      for (InetSocketAddress auxAddr : auxAddrServer1) {
+        nn1URI = new URI("hdfs://localhost:" + auxAddr.getPort());
+        try (DFSClient clientTmp = new DFSClient(nn1URI, conf)) {
+          assertTrue(client1.exists("/test"));
+        }
+      }
+    }
+  }
+}




[hadoop] 04/05: HDFS-13699. Add DFSClient sending handshake token to DataNode, and allow DataNode overwrite downstream QOP. Contributed by Chen Liang.

Posted by cl...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

cliang pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit 163cb4d5aebd57c8d047903b218924a34a5a9165
Author: Chen Liang <cl...@apache.org>
AuthorDate: Fri Apr 12 17:37:51 2019 -0700

    HDFS-13699. Add DFSClient sending handshake token to DataNode, and allow DataNode overwrite downstream QOP. Contributed by Chen Liang.
---
 .../hadoop/security/token/SecretManager.java       |   2 +-
 .../hadoop/hdfs/client/HdfsClientConfigKeys.java   |   3 +
 .../datatransfer/sasl/DataTransferSaslUtil.java    |  72 +++++++
 .../datatransfer/sasl/SaslDataTransferClient.java  | 117 +++++++++--
 .../src/main/proto/datatransfer.proto              |   6 +
 .../java/org/apache/hadoop/hdfs/DFSConfigKeys.java |   4 +
 .../datatransfer/sasl/SaslDataTransferServer.java  |  53 ++++-
 .../apache/hadoop/hdfs/server/datanode/DNConf.java |   6 +
 .../hadoop/hdfs/server/datanode/DataNode.java      |  10 +
 .../hadoop/hdfs/server/datanode/DataXceiver.java   |  16 +-
 .../src/main/resources/hdfs-default.xml            |  22 +++
 .../apache/hadoop/hdfs/TestHAAuxiliaryPort.java    |   2 +-
 .../apache/hadoop/hdfs/TestMultipleNNPortQOP.java  | 219 +++++++++++++++++++++
 13 files changed, 508 insertions(+), 24 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/SecretManager.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/SecretManager.java
index 798c8c9..514806d 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/SecretManager.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/SecretManager.java
@@ -167,7 +167,7 @@ public abstract class SecretManager<T extends TokenIdentifier> {
    * @param key the secret key
    * @return the bytes of the generated password
    */
-  protected static byte[] createPassword(byte[] identifier, 
+  public static byte[] createPassword(byte[] identifier,
                                          SecretKey key) {
     Mac mac = threadLocalMac.get();
     try {
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
index c74ece2..44662d5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
@@ -171,6 +171,9 @@ public interface HdfsClientConfigKeys {
   String DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY =
       "dfs.encrypt.data.transfer.cipher.suites";
 
+  String DFS_ENCRYPT_DATA_OVERWRITE_DOWNSTREAM_NEW_QOP_KEY =
+      "dfs.encrypt.data.overwrite.downstream.new.qop";
+
   String DFS_DATA_TRANSFER_PROTECTION_KEY = "dfs.data.transfer.protection";
   String DFS_DATA_TRANSFER_PROTECTION_DEFAULT = "";
   String DFS_DATA_TRANSFER_SASL_PROPS_RESOLVER_CLASS_KEY =
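
The new key above lets a DataNode renegotiate a different QOP when forwarding data to a downstream DataNode; as the getSaslStreams() change later in this patch shows, the value is applied verbatim as the SASL Sasl.QOP property, i.e. SASL-level names such as "auth", "auth-int" or "auth-conf". A hedged configuration sketch, not taken from this patch:

    import org.apache.hadoop.conf.Configuration;

    // Illustration only: clients keep whatever QOP the resolver dictates,
    // while DataNode-to-DataNode (downstream) transfers drop to plain "auth".
    public class DownstreamQopConfigSketch {
      public static Configuration exampleConf() {
        Configuration conf = new Configuration();
        conf.set("dfs.encrypt.data.overwrite.downstream.new.qop", "auth");
        return conf;
      }
    }
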
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/DataTransferSaslUtil.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/DataTransferSaslUtil.java
index f4651eb..666a29f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/DataTransferSaslUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/DataTransferSaslUtil.java
@@ -49,6 +49,7 @@ import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
 import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto.DataTransferEncryptorStatus;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.HandshakeSecretProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.CipherOptionProto;
 import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
 import org.apache.hadoop.security.SaslPropertiesResolver;
@@ -249,6 +250,51 @@ public final class DataTransferSaslUtil {
     }
   }
 
+  static class SaslMessageWithHandshake {
+    private final byte[] payload;
+    private final byte[] secret;
+    private final String bpid;
+
+    SaslMessageWithHandshake(byte[] payload, byte[] secret, String bpid) {
+      this.payload = payload;
+      this.secret = secret;
+      this.bpid = bpid;
+    }
+
+    byte[] getPayload() {
+      return payload;
+    }
+
+    byte[] getSecret() {
+      return secret;
+    }
+
+    String getBpid() {
+      return bpid;
+    }
+  }
+
+  public static SaslMessageWithHandshake readSaslMessageWithHandshakeSecret(
+      InputStream in) throws IOException {
+    DataTransferEncryptorMessageProto proto =
+        DataTransferEncryptorMessageProto.parseFrom(vintPrefixed(in));
+    if (proto.getStatus() == DataTransferEncryptorStatus.ERROR_UNKNOWN_KEY) {
+      throw new InvalidEncryptionKeyException(proto.getMessage());
+    } else if (proto.getStatus() == DataTransferEncryptorStatus.ERROR) {
+      throw new IOException(proto.getMessage());
+    } else {
+      byte[] payload = proto.getPayload().toByteArray();
+      byte[] secret = null;
+      String bpid = null;
+      if (proto.hasHandshakeSecret()) {
+        HandshakeSecretProto handshakeSecret = proto.getHandshakeSecret();
+        secret = handshakeSecret.getSecret().toByteArray();
+        bpid = handshakeSecret.getBpid();
+      }
+      return new SaslMessageWithHandshake(payload, secret, bpid);
+    }
+  }
+
   /**
    * Negotiate a cipher option which server supports.
    *
@@ -375,6 +421,12 @@ public final class DataTransferSaslUtil {
     sendSaslMessage(out, DataTransferEncryptorStatus.SUCCESS, payload, null);
   }
 
+  public static void sendSaslMessageHandshakeSecret(OutputStream out,
+      byte[] payload, byte[] secret, String bpid) throws IOException {
+    sendSaslMessageHandshakeSecret(out, DataTransferEncryptorStatus.SUCCESS,
+        payload, null, secret, bpid);
+  }
+
   /**
    * Send a SASL negotiation message and negotiation cipher options to server.
    *
@@ -497,6 +549,13 @@ public final class DataTransferSaslUtil {
   public static void sendSaslMessage(OutputStream out,
       DataTransferEncryptorStatus status, byte[] payload, String message)
       throws IOException {
+    sendSaslMessage(out, status, payload, message, null);
+  }
+
+  public static void sendSaslMessage(OutputStream out,
+      DataTransferEncryptorStatus status, byte[] payload, String message,
+      HandshakeSecretProto handshakeSecret)
+      throws IOException {
     DataTransferEncryptorMessageProto.Builder builder =
         DataTransferEncryptorMessageProto.newBuilder();
 
@@ -507,12 +566,25 @@ public final class DataTransferSaslUtil {
     if (message != null) {
       builder.setMessage(message);
     }
+    if (handshakeSecret != null) {
+      builder.setHandshakeSecret(handshakeSecret);
+    }
 
     DataTransferEncryptorMessageProto proto = builder.build();
     proto.writeDelimitedTo(out);
     out.flush();
   }
 
+  public static void sendSaslMessageHandshakeSecret(OutputStream out,
+      DataTransferEncryptorStatus status, byte[] payload, String message,
+      byte[] secret, String bpid) throws IOException {
+    HandshakeSecretProto.Builder builder =
+        HandshakeSecretProto.newBuilder();
+    builder.setSecret(ByteString.copyFrom(secret));
+    builder.setBpid(bpid);
+    sendSaslMessage(out, status, payload, message, builder.build());
+  }
+
   /**
    * There is no reason to instantiate this class.
    */
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferClient.java
index 7804bec..8d1c7f6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferClient.java
@@ -18,8 +18,11 @@
 package org.apache.hadoop.hdfs.protocol.datatransfer.sasl;
 
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_ENCRYPT_DATA_OVERWRITE_DOWNSTREAM_NEW_QOP_KEY;
 import static org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil.*;
 
+import com.google.common.annotations.VisibleForTesting;
+import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
@@ -31,6 +34,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import javax.crypto.SecretKey;
 import javax.security.auth.callback.Callback;
 import javax.security.auth.callback.CallbackHandler;
 import javax.security.auth.callback.NameCallback;
@@ -39,6 +43,7 @@ import javax.security.auth.callback.UnsupportedCallbackException;
 import javax.security.sasl.RealmCallback;
 import javax.security.sasl.RealmChoiceCallback;
 
+import javax.security.sasl.Sasl;
 import org.apache.commons.codec.binary.Base64;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
@@ -54,6 +59,7 @@ import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
 import org.apache.hadoop.security.SaslPropertiesResolver;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.SecretManager;
 import org.apache.hadoop.security.token.Token;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -83,6 +89,10 @@ public class SaslDataTransferClient {
   private final SaslPropertiesResolver saslPropsResolver;
   private final TrustedChannelResolver trustedChannelResolver;
 
+  // Store the most recent successfully negotiated QOP,
+  // for testing purposes only
+  private String targetQOP;
+
   /**
    * Creates a new SaslDataTransferClient.  This constructor is used in cases
    * where it is not relevant to track if a secure client did a fallback to
@@ -140,7 +150,7 @@ public class SaslDataTransferClient {
     DataEncryptionKey encryptionKey = !trustedChannelResolver.isTrusted() ?
         encryptionKeyFactory.newDataEncryptionKey() : null;
     IOStreamPair ios = send(socket.getInetAddress(), underlyingOut,
-        underlyingIn, encryptionKey, accessToken, datanodeId);
+        underlyingIn, encryptionKey, accessToken, datanodeId, null);
     return ios != null ? ios : new IOStreamPair(underlyingIn, underlyingOut);
   }
 
@@ -180,8 +190,19 @@ public class SaslDataTransferClient {
       InputStream underlyingIn, DataEncryptionKeyFactory encryptionKeyFactory,
       Token<BlockTokenIdentifier> accessToken, DatanodeID datanodeId)
       throws IOException {
+    return socketSend(socket, underlyingOut, underlyingIn, encryptionKeyFactory,
+        accessToken, datanodeId, null);
+  }
+
+  public IOStreamPair socketSend(
+      Socket socket, OutputStream underlyingOut, InputStream underlyingIn,
+      DataEncryptionKeyFactory encryptionKeyFactory,
+      Token<BlockTokenIdentifier> accessToken, DatanodeID datanodeId,
+      SecretKey secretKey)
+      throws IOException {
     IOStreamPair ios = checkTrustAndSend(socket.getInetAddress(), underlyingOut,
-        underlyingIn, encryptionKeyFactory, accessToken, datanodeId);
+        underlyingIn, encryptionKeyFactory, accessToken, datanodeId,
+        secretKey);
     return ios != null ? ios : new IOStreamPair(underlyingIn, underlyingOut);
   }
 
@@ -203,13 +224,26 @@ public class SaslDataTransferClient {
       DataEncryptionKeyFactory encryptionKeyFactory,
       Token<BlockTokenIdentifier> accessToken, DatanodeID datanodeId)
       throws IOException {
-    if (!trustedChannelResolver.isTrusted() &&
-        !trustedChannelResolver.isTrusted(addr)) {
+    return checkTrustAndSend(addr, underlyingOut, underlyingIn,
+        encryptionKeyFactory, accessToken, datanodeId, null);
+  }
+
+  private IOStreamPair checkTrustAndSend(
+      InetAddress addr, OutputStream underlyingOut, InputStream underlyingIn,
+      DataEncryptionKeyFactory encryptionKeyFactory,
+      Token<BlockTokenIdentifier> accessToken, DatanodeID datanodeId,
+      SecretKey secretKey)
+      throws IOException {
+    boolean localTrusted = trustedChannelResolver.isTrusted();
+    boolean remoteTrusted = trustedChannelResolver.isTrusted(addr);
+    LOG.info("SASL encryption trust check: localHostTrusted = {}, "
+        + "remoteHostTrusted = {}", localTrusted, remoteTrusted);
+    if (!localTrusted || !remoteTrusted) {
       // The encryption key factory only returns a key if encryption is enabled.
       DataEncryptionKey encryptionKey =
           encryptionKeyFactory.newDataEncryptionKey();
       return send(addr, underlyingOut, underlyingIn, encryptionKey, accessToken,
-          datanodeId);
+          datanodeId, secretKey);
     } else {
       LOG.debug(
           "SASL client skipping handshake on trusted connection for addr = {}, "
@@ -233,13 +267,14 @@ public class SaslDataTransferClient {
    */
   private IOStreamPair send(InetAddress addr, OutputStream underlyingOut,
       InputStream underlyingIn, DataEncryptionKey encryptionKey,
-      Token<BlockTokenIdentifier> accessToken, DatanodeID datanodeId)
+      Token<BlockTokenIdentifier> accessToken, DatanodeID datanodeId,
+      SecretKey secretKey)
       throws IOException {
     if (encryptionKey != null) {
       LOG.debug("SASL client doing encrypted handshake for addr = {}, "
           + "datanodeId = {}", addr, datanodeId);
       return getEncryptedStreams(addr, underlyingOut, underlyingIn,
-          encryptionKey);
+          encryptionKey, accessToken, secretKey);
     } else if (!UserGroupInformation.isSecurityEnabled()) {
       LOG.debug("SASL client skipping handshake in unsecured configuration for "
           + "addr = {}, datanodeId = {}", addr, datanodeId);
@@ -260,7 +295,8 @@ public class SaslDataTransferClient {
       LOG.debug(
           "SASL client doing general handshake for addr = {}, datanodeId = {}",
           addr, datanodeId);
-      return getSaslStreams(addr, underlyingOut, underlyingIn, accessToken);
+      return getSaslStreams(addr, underlyingOut, underlyingIn,
+          accessToken, secretKey);
     } else {
       // It's a secured cluster using non-privileged ports, but no SASL.  The
       // only way this can happen is if the DataNode has
@@ -283,11 +319,20 @@ public class SaslDataTransferClient {
    * @throws IOException for any error
    */
   private IOStreamPair getEncryptedStreams(InetAddress addr,
-      OutputStream underlyingOut,
-      InputStream underlyingIn, DataEncryptionKey encryptionKey)
+      OutputStream underlyingOut, InputStream underlyingIn,
+      DataEncryptionKey encryptionKey,
+      Token<BlockTokenIdentifier> accessToken,
+      SecretKey secretKey)
       throws IOException {
     Map<String, String> saslProps = createSaslPropertiesForEncryption(
         encryptionKey.encryptionAlgorithm);
+    if (secretKey != null) {
+      LOG.debug("DataNode overwriting downstream QOP" +
+          saslProps.get(Sasl.QOP));
+      byte[] newSecret =  SecretManager.createPassword(saslProps.get(Sasl.QOP)
+          .getBytes(Charsets.UTF_8), secretKey);
+      accessToken.setDNHandshakeSecret(newSecret);
+    }
 
     LOG.debug("Client using encryption algorithm {}",
         encryptionKey.encryptionAlgorithm);
@@ -297,7 +342,7 @@ public class SaslDataTransferClient {
     CallbackHandler callbackHandler = new SaslClientCallbackHandler(userName,
         password);
     return doSaslHandshake(addr, underlyingOut, underlyingIn, userName,
-        saslProps, callbackHandler);
+        saslProps, callbackHandler, accessToken);
   }
 
   /**
@@ -366,6 +411,11 @@ public class SaslDataTransferClient {
     }
   }
 
+  @VisibleForTesting
+  public String getTargetQOP() {
+    return targetQOP;
+  }
+
   /**
    * Sends client SASL negotiation for general-purpose handshake.
    *
@@ -378,16 +428,36 @@ public class SaslDataTransferClient {
    */
   private IOStreamPair getSaslStreams(InetAddress addr,
       OutputStream underlyingOut, InputStream underlyingIn,
-      Token<BlockTokenIdentifier> accessToken)
+      Token<BlockTokenIdentifier> accessToken,
+      SecretKey secretKey)
       throws IOException {
     Map<String, String> saslProps = saslPropsResolver.getClientProperties(addr);
 
+    // secretKey != null only happens when this is called by a DN sending to
+    // a downstream DN in the write pipeline. When called from a client it is
+    // null, because the client has no key with which to generate the MAC.
+    // If a different QOP is desired for inter-DN communication, the check
+    // below switches to the new QOP and creates a secret that encodes that
+    // QOP.
+    if (secretKey != null) {
+      String newQOP = conf
+          .get(DFS_ENCRYPT_DATA_OVERWRITE_DOWNSTREAM_NEW_QOP_KEY);
+      if (newQOP != null) {
+        saslProps.put(Sasl.QOP, newQOP);
+      }
+      LOG.debug("DataNode overwriting downstream QOP " +
+          saslProps.get(Sasl.QOP));
+      byte[] newSecret = SecretManager.createPassword(
+          saslProps.get(Sasl.QOP).getBytes(Charsets.UTF_8), secretKey);
+      accessToken.setDNHandshakeSecret(newSecret);
+    }
+    targetQOP = saslProps.get(Sasl.QOP);
     String userName = buildUserName(accessToken);
     char[] password = buildClientPassword(accessToken);
     CallbackHandler callbackHandler = new SaslClientCallbackHandler(userName,
         password);
     return doSaslHandshake(addr, underlyingOut, underlyingIn, userName,
-        saslProps, callbackHandler);
+        saslProps, callbackHandler, accessToken);
   }
 
   /**
@@ -431,8 +501,8 @@ public class SaslDataTransferClient {
    */
   private IOStreamPair doSaslHandshake(InetAddress addr,
       OutputStream underlyingOut, InputStream underlyingIn, String userName,
-      Map<String, String> saslProps,
-      CallbackHandler callbackHandler) throws IOException {
+      Map<String, String> saslProps, CallbackHandler callbackHandler,
+      Token<BlockTokenIdentifier> accessToken) throws IOException {
 
     DataOutputStream out = new DataOutputStream(underlyingOut);
     DataInputStream in = new DataInputStream(underlyingIn);
@@ -445,7 +515,22 @@ public class SaslDataTransferClient {
 
     try {
       // Start of handshake - "initial response" in SASL terminology.
-      sendSaslMessage(out, new byte[0]);
+      // The handshake secret can be null, this happens when client is running
+      // a new version but the cluster does not have this feature. In which case
+      // there will be no encrypted secret sent from NN.
+      byte[] handshakeSecret = accessToken.getDnHandshakeSecret();
+      if (handshakeSecret == null || handshakeSecret.length == 0) {
+        LOG.debug("Handshake secret is null, sending without "
+            + "handshake secret.");
+        sendSaslMessage(out, new byte[0]);
+      } else {
+        LOG.debug("Sending handshake secret.");
+        BlockTokenIdentifier identifier = new BlockTokenIdentifier();
+        identifier.readFields(new DataInputStream(
+            new ByteArrayInputStream(accessToken.getIdentifier())));
+        String bpid = identifier.getBlockPoolId();
+        sendSaslMessageHandshakeSecret(out, new byte[0], handshakeSecret, bpid);
+      }
 
       // step 1
       byte[] remoteResponse = readSaslMessage(in);
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/datatransfer.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/datatransfer.proto
index a091d41..135bab1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/datatransfer.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/datatransfer.proto
@@ -43,6 +43,12 @@ message DataTransferEncryptorMessageProto {
   optional bytes payload = 2;
   optional string message = 3;
   repeated CipherOptionProto cipherOption = 4;
+  optional HandshakeSecretProto handshakeSecret = 5;
+}
+
+message HandshakeSecretProto {
+  required bytes secret = 1;
+  required string bpid = 2;
 }
 
 message BaseHeaderProto {
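
Illustration (not part of the patch): with the new HandshakeSecretProto above,
the client's initial SASL message can carry the encrypted secret and the block
pool id. A hedged sketch of how the generated builders might be used, assuming
the DataTransferProtos classes generated from this proto and the usual
com.google.protobuf imports; "payload", "secret" and "bpid" are hypothetical
parameters:

  static void sendWithHandshakeSecret(OutputStream out, byte[] payload,
      byte[] secret, String bpid) throws IOException {
    DataTransferEncryptorMessageProto.Builder builder =
        DataTransferEncryptorMessageProto.newBuilder()
            .setStatus(DataTransferEncryptorStatus.SUCCESS)
            .setPayload(ByteString.copyFrom(payload));
    if (secret != null && secret.length > 0) {
      // The secret is the QOP string encrypted with the current block key;
      // bpid tells the receiving DataNode which block pool's key to use.
      builder.setHandshakeSecret(HandshakeSecretProto.newBuilder()
          .setSecret(ByteString.copyFrom(secret))
          .setBpid(bpid)
          .build());
    }
    builder.build().writeDelimitedTo(out);
  }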
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index c3eec2b..23198ac 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -835,6 +835,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
 
   // Security-related configs
   public static final String DFS_ENCRYPT_DATA_TRANSFER_KEY = "dfs.encrypt.data.transfer";
+  public static final String DFS_ENCRYPT_DATA_OVERWRITE_DOWNSTREAM_DERIVED_QOP_KEY =
+      "dfs.encrypt.data.overwrite.downstream.derived.qop";
+  public static final boolean DFS_ENCRYPT_DATA_OVERWRITE_DOWNSTREAM_DERIVED_QOP_DEFAULT =
+      false;
   public static final boolean DFS_ENCRYPT_DATA_TRANSFER_DEFAULT = false;
   public static final String DFS_XFRAME_OPTION_ENABLED = "dfs.xframe.enabled";
   public static final boolean DFS_XFRAME_OPTION_ENABLED_DEFAULT = true;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferServer.java
index e67d873..eb26e99 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferServer.java
@@ -21,15 +21,18 @@ import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DATA_TRANSF
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY;
 import static org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil.*;
 
+import com.google.common.annotations.VisibleForTesting;
 import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 
+import javax.crypto.SecretKey;
 import javax.security.auth.callback.Callback;
 import javax.security.auth.callback.CallbackHandler;
 import javax.security.auth.callback.NameCallback;
@@ -37,6 +40,7 @@ import javax.security.auth.callback.PasswordCallback;
 import javax.security.auth.callback.UnsupportedCallbackException;
 import javax.security.sasl.AuthorizeCallback;
 import javax.security.sasl.RealmCallback;
+import javax.security.sasl.Sasl;
 import javax.security.sasl.SaslException;
 
 import org.apache.commons.codec.binary.Base64;
@@ -48,12 +52,15 @@ import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
 import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto.DataTransferEncryptorStatus;
+import org.apache.hadoop.hdfs.security.token.block.BlockKey;
 import org.apache.hadoop.hdfs.security.token.block.BlockPoolTokenSecretManager;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.server.datanode.DNConf;
 import org.apache.hadoop.security.SaslPropertiesResolver;
+import org.apache.hadoop.security.SaslRpcServer;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.SecretManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -78,6 +85,10 @@ public class SaslDataTransferServer {
   private final BlockPoolTokenSecretManager blockPoolTokenSecretManager;
   private final DNConf dnConf;
 
+  // Store the most recent successfully negotiated QOP,
+  // for testing purposes only
+  private String negotiatedQOP;
+
   /**
    * Creates a new SaslDataTransferServer.
    *
@@ -337,6 +348,26 @@ public class SaslDataTransferServer {
     return identifier;
   }
 
+  private String examineSecret(byte[] secret, String bpid) {
+    BlockKey blockKey = blockPoolTokenSecretManager.get(bpid).getCurrentKey();
+    SecretKey secretKey = blockKey.getKey();
+    for (SaslRpcServer.QualityOfProtection qop :
+        SaslRpcServer.QualityOfProtection.values()) {
+      String qopString = qop.getSaslQop();
+      byte[] data = qopString.getBytes(Charsets.UTF_8);
+      byte[] encryptedData = SecretManager.createPassword(data, secretKey);
+      if (Arrays.equals(encryptedData, secret)) {
+        return qopString;
+      }
+    }
+    return null;
+  }
+
+  @VisibleForTesting
+  public String getNegotiatedQOP() {
+    return negotiatedQOP;
+  }
+
   /**
    * This method actually executes the server-side SASL handshake.
    *
@@ -355,9 +386,6 @@ public class SaslDataTransferServer {
     DataInputStream in = new DataInputStream(underlyingIn);
     DataOutputStream out = new DataOutputStream(underlyingOut);
 
-    SaslParticipant sasl = SaslParticipant.createServerSaslParticipant(saslProps,
-      callbackHandler);
-
     int magicNumber = in.readInt();
     if (magicNumber != SASL_TRANSFER_MAGIC_NUMBER) {
       throw new InvalidMagicNumberException(magicNumber, 
@@ -365,7 +393,23 @@ public class SaslDataTransferServer {
     }
     try {
       // step 1
-      byte[] remoteResponse = readSaslMessage(in);
+      SaslMessageWithHandshake message = readSaslMessageWithHandshakeSecret(in);
+      byte[] secret = message.getSecret();
+      String bpid = message.getBpid();
+      if (secret != null || bpid != null) {
+        // sanity check: if either is non-null, both must be non-null
+        assert(secret != null && bpid != null);
+        String qop = examineSecret(secret, bpid);
+        if (qop != null) {
+          saslProps.put(Sasl.QOP, qop);
+        } else {
+          LOG.error("Unable to match secret to a QOP!");
+        }
+      }
+      SaslParticipant sasl = SaslParticipant.createServerSaslParticipant(
+          saslProps, callbackHandler);
+
+      byte[] remoteResponse = message.getPayload();
       byte[] localResponse = sasl.evaluateChallengeOrResponse(remoteResponse);
       sendSaslMessage(out, localResponse);
 
@@ -379,6 +423,7 @@ public class SaslDataTransferServer {
       checkSaslComplete(sasl, saslProps);
 
       CipherOption cipherOption = null;
+      negotiatedQOP = sasl.getNegotiatedQop();
       if (sasl.isNegotiatedQopPrivacy()) {
         // Negotiate a cipher option
         Configuration conf = dnConf.getConf();
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
index 113cdf8..c4fc4c0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
@@ -32,6 +32,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_NON_LOCAL_LAZY_P
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_NON_LOCAL_LAZY_PERSIST_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_OUTLIERS_REPORT_INTERVAL_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_OUTLIERS_REPORT_INTERVAL_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_OVERWRITE_DOWNSTREAM_DERIVED_QOP_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_OVERWRITE_DOWNSTREAM_DERIVED_QOP_KEY;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY;
@@ -88,6 +90,7 @@ public class DNConf {
   final boolean syncOnClose;
   final boolean encryptDataTransfer;
   final boolean connectToDnViaHostname;
+  final boolean overwriteDownstreamDerivedQOP;
 
   final long readaheadLength;
   final long heartBeatInterval;
@@ -233,6 +236,9 @@ public class DNConf {
     this.encryptDataTransfer = getConf().getBoolean(
         DFS_ENCRYPT_DATA_TRANSFER_KEY,
         DFS_ENCRYPT_DATA_TRANSFER_DEFAULT);
+    this.overwriteDownstreamDerivedQOP = getConf().getBoolean(
+        DFS_ENCRYPT_DATA_OVERWRITE_DOWNSTREAM_DERIVED_QOP_KEY,
+        DFS_ENCRYPT_DATA_OVERWRITE_DOWNSTREAM_DERIVED_QOP_DEFAULT);
     this.encryptionAlgorithm = getConf().get(DFS_DATA_ENCRYPTION_ALGORITHM_KEY);
     this.trustedChannelResolver = TrustedChannelResolver.getInstance(getConf());
     this.saslPropsResolver = DataTransferSaslUtil.getSaslPropertiesResolver(
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index 1fab84a..f70e49b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -1472,6 +1472,11 @@ public class DataNode extends ReconfigurableBase
     return UUID.randomUUID().toString();
   }
 
+  @VisibleForTesting
+  public SaslDataTransferClient getSaslClient() {
+    return saslClient;
+  }
+
   /**
    * Verify that the DatanodeUuid has been initialized. If this is a new
    * datanode then we generate a new Datanode Uuid and persist it to disk.
@@ -1691,6 +1696,11 @@ public class DataNode extends ReconfigurableBase
     return streamingAddr.getPort();
   }
 
+  @VisibleForTesting
+  public SaslDataTransferServer getSaslServer() {
+    return saslServer;
+  }
+  
   /**
    * @return name useful for logging
    */
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
index 1d8db52..43b9500 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
@@ -48,6 +48,9 @@ import java.util.Arrays;
 import java.util.concurrent.TimeUnit;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.protobuf.ByteString;
+import javax.crypto.SecretKey;
 import org.apache.commons.logging.Log;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.DFSUtilClient;
@@ -72,6 +75,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReleaseShortCirc
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitShmResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
 import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
+import org.apache.hadoop.hdfs.security.token.block.BlockKey;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.server.datanode.DataNode.ShortCircuitFdsUnsupportedException;
 import org.apache.hadoop.hdfs.server.datanode.DataNode.ShortCircuitFdsVersionException;
@@ -777,8 +781,16 @@ class DataXceiver extends Receiver implements Runnable {
           InputStream unbufMirrorIn = NetUtils.getInputStream(mirrorSock);
           DataEncryptionKeyFactory keyFactory =
             datanode.getDataEncryptionKeyFactoryForBlock(block);
-          IOStreamPair saslStreams = datanode.saslClient.socketSend(mirrorSock,
-            unbufMirrorOut, unbufMirrorIn, keyFactory, blockToken, targets[0]);
+          SecretKey secretKey = null;
+          if (dnConf.overwriteDownstreamDerivedQOP) {
+            String bpid = block.getBlockPoolId();
+            BlockKey blockKey = datanode.blockPoolTokenSecretManager
+                .get(bpid).getCurrentKey();
+            secretKey = blockKey.getKey();
+          }
+          IOStreamPair saslStreams = datanode.saslClient.socketSend(
+              mirrorSock, unbufMirrorOut, unbufMirrorIn, keyFactory,
+              blockToken, targets[0], secretKey);
           unbufMirrorOut = saslStreams.out;
           unbufMirrorIn = saslStreams.in;
           mirrorOut = new DataOutputStream(new BufferedOutputStream(unbufMirrorOut,
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index fa4ba35..fae39ed 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -4567,4 +4567,26 @@
       will use exactly the same QOP NameNode and client has already agreed on.
     </description>
   </property>
+
+  <property>
+    <name>dfs.encrypt.data.overwrite.downstream.derived.qop</name>
+    <value>false</value>
+    <description>
+      A boolean that specifies whether the DataNode should overwrite the
+      downstream QOP in a write pipeline. This is used when the client talks
+      to the first DataNode with one QOP, but inter-DataNode communication
+      needs to use a different QOP. If set to false (the default),
+      inter-DataNode communication uses the same QOP as the client-DataNode
+      connection.
+    </description>
+  </property>
+
+  <property>
+    <name>dfs.encrypt.data.overwrite.downstream.new.qop</name>
+    <value></value>
+    <description>
+      When dfs.encrypt.data.overwrite.downstream.derived.qop is set to true,
+      this configuration specifies the new QOP to be used for
+      inter-DataNode communication.
+    </description>
+  </property>
 </configuration>
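
Illustration (not part of the patch): the two new keys are meant to be used
together. A hedged sketch of a configuration where clients negotiate their own
QOP with the first DataNode while inter-DataNode traffic in the pipeline is
downgraded to plain authentication, mirroring what TestMultipleNNPortQOP below
sets up:

  Configuration conf = new HdfsConfiguration();
  // Let the DataNode derive and overwrite the QOP used toward downstream
  // DataNodes in the write pipeline.
  conf.setBoolean("dfs.encrypt.data.overwrite.downstream.derived.qop", true);
  // Inter-DataNode connections then use SASL "auth" regardless of the QOP
  // the client negotiated with the first DataNode.
  conf.set("dfs.encrypt.data.overwrite.downstream.new.qop", "auth");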
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestHAAuxiliaryPort.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestHAAuxiliaryPort.java
index 867fbac..45ccefa 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestHAAuxiliaryPort.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestHAAuxiliaryPort.java
@@ -37,7 +37,7 @@ import static org.junit.Assert.assertTrue;
  */
 public class TestHAAuxiliaryPort {
   @Test
-  public void testTest() throws Exception {
+  public void testHAAuxiliaryPort() throws Exception {
     Configuration conf = new Configuration();
     conf.set(DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY, "0,0");
     conf.set(DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY + ".ha-nn-uri-0.nn1",
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestMultipleNNPortQOP.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestMultipleNNPortQOP.java
new file mode 100644
index 0000000..ca84557
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestMultipleNNPortQOP.java
@@ -0,0 +1,219 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.net.URI;
+import java.util.ArrayList;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileSystemTestHelper;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient;
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferServer;
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferTestCase;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_RPC_PROTECTION;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_OVERWRITE_DOWNSTREAM_DERIVED_QOP_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SEND_QOP_ENABLED;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_ENCRYPT_DATA_OVERWRITE_DOWNSTREAM_NEW_QOP_KEY;
+import static org.junit.Assert.*;
+
+
+/**
+ * Tests accessing the NameNode on different ports, each configured
+ * with a different QOP.
+ */
+public class TestMultipleNNPortQOP extends SaslDataTransferTestCase {
+
+  private static final Path PATH1  = new Path("/file1");
+  private static final Path PATH2  = new Path("/file2");
+  private static final Path PATH3  = new Path("/file3");
+  private static final int BLOCK_SIZE = 4096;
+  private static final int NUM_BLOCKS = 3;
+
+  private static HdfsConfiguration clusterConf;
+
+  @Before
+  public void setup() throws Exception {
+    clusterConf = createSecureConfig(
+        "authentication,integrity,privacy");
+    clusterConf.set(DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY,
+        "12000,12100,12200");
+    // Explicitly set the service RPC address for DataNodes. This is because
+    // DFSUtil.getNNServiceRpcAddressesForCluster looks up the client-facing
+    // port and the service port at the same time; if no service RPC address
+    // is configured it falls back to the client port, which in this case
+    // would be an auxiliary port, and that is not what auxiliary ports are
+    // for. Setting the service RPC port explicitly avoids this.
+    clusterConf.set(DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, "localhost:9020");
+    clusterConf.set(
+        CommonConfigurationKeys.HADOOP_SECURITY_SASL_PROPS_RESOLVER_CLASS,
+        "org.apache.hadoop.security.IngressPortBasedResolver");
+    clusterConf.set("ingress.port.sasl.configured.ports", "12000,12100,12200");
+    clusterConf.set("ingress.port.sasl.prop.12000", "authentication");
+    clusterConf.set("ingress.port.sasl.prop.12100", "integrity");
+    clusterConf.set("ingress.port.sasl.prop.12200", "privacy");
+    clusterConf.setBoolean(DFS_NAMENODE_SEND_QOP_ENABLED, true);
+  }
+
+  /**
+   * Test accessing NameNode from three different ports.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testMultipleNNPort() throws Exception {
+    MiniDFSCluster cluster = null;
+    try {
+      cluster = new MiniDFSCluster.Builder(clusterConf)
+          .numDataNodes(3).build();
+
+      cluster.waitActive();
+      HdfsConfiguration clientConf = new HdfsConfiguration(clusterConf);
+      clientConf.unset(
+          CommonConfigurationKeys.HADOOP_SECURITY_SASL_PROPS_RESOLVER_CLASS);
+      ArrayList<DataNode> dataNodes = cluster.getDataNodes();
+
+      URI currentURI = cluster.getURI();
+      URI uriAuthPort = new URI(currentURI.getScheme() +
+          "://" + currentURI.getHost() + ":12000");
+      URI uriIntegrityPort = new URI(currentURI.getScheme() +
+          "://" + currentURI.getHost() + ":12100");
+      URI uriPrivacyPort = new URI(currentURI.getScheme() +
+          "://" + currentURI.getHost() + ":12200");
+
+      clientConf.set(HADOOP_RPC_PROTECTION, "privacy");
+      FileSystem fsPrivacy = FileSystem.get(uriPrivacyPort, clientConf);
+      doTest(fsPrivacy, PATH1);
+      for (DataNode dn : dataNodes) {
+        SaslDataTransferServer saslServer = dn.getSaslServer();
+        assertEquals("auth-conf", saslServer.getNegotiatedQOP());
+      }
+
+      clientConf.set(HADOOP_RPC_PROTECTION, "integrity");
+      FileSystem fsIntegrity = FileSystem.get(uriIntegrityPort, clientConf);
+      doTest(fsIntegrity, PATH2);
+      for (DataNode dn : dataNodes) {
+        SaslDataTransferServer saslServer = dn.getSaslServer();
+        assertEquals("auth-int", saslServer.getNegotiatedQOP());
+      }
+
+      clientConf.set(HADOOP_RPC_PROTECTION, "authentication");
+      FileSystem fsAuth = FileSystem.get(uriAuthPort, clientConf);
+      doTest(fsAuth, PATH3);
+      for (DataNode dn : dataNodes) {
+        SaslDataTransferServer saslServer = dn.getSaslServer();
+        assertEquals("auth", saslServer.getNegotiatedQOP());
+      }
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+
+  /**
+   * Test accessing NameNode from three different ports, and test
+   * overwriting the QOP used for downstream DNs in the pipeline.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testMultipleNNPortOverwriteDownStream() throws Exception {
+    clusterConf.set(DFS_ENCRYPT_DATA_OVERWRITE_DOWNSTREAM_NEW_QOP_KEY, "auth");
+    clusterConf.setBoolean(
+        DFS_ENCRYPT_DATA_OVERWRITE_DOWNSTREAM_DERIVED_QOP_KEY, true);
+    MiniDFSCluster cluster = null;
+    try {
+      cluster =
+          new MiniDFSCluster.Builder(clusterConf).numDataNodes(3).build();
+      cluster.waitActive();
+      HdfsConfiguration clientConf = new HdfsConfiguration(clusterConf);
+      clientConf.unset(
+          CommonConfigurationKeys.HADOOP_SECURITY_SASL_PROPS_RESOLVER_CLASS);
+      ArrayList<DataNode> dataNodes = cluster.getDataNodes();
+
+      URI currentURI = cluster.getURI();
+      URI uriAuthPort =
+          new URI(currentURI.getScheme() + "://" +
+              currentURI.getHost() + ":12000");
+      URI uriIntegrityPort =
+          new URI(currentURI.getScheme() + "://" +
+              currentURI.getHost() + ":12100");
+      URI uriPrivacyPort =
+          new URI(currentURI.getScheme() + "://" +
+              currentURI.getHost() + ":12200");
+
+      clientConf.set(HADOOP_RPC_PROTECTION, "privacy");
+      FileSystem fsPrivacy = FileSystem.get(uriPrivacyPort, clientConf);
+      doTest(fsPrivacy, PATH1);
+      // wait briefly so that the data has reached not only the first DN
+      // but also the downstream DNs in the pipeline
+      Thread.sleep(100);
+      for (int i = 0; i < 2; i++) {
+        DataNode dn = dataNodes.get(i);
+        SaslDataTransferClient saslClient = dn.getSaslClient();
+        assertEquals("auth", saslClient.getTargetQOP());
+      }
+
+      clientConf.set(HADOOP_RPC_PROTECTION, "integrity");
+      FileSystem fsIntegrity = FileSystem.get(uriIntegrityPort, clientConf);
+      doTest(fsIntegrity, PATH2);
+      Thread.sleep(100);
+      for (int i = 0; i < 2; i++) {
+        DataNode dn = dataNodes.get(i);
+        SaslDataTransferClient saslClient = dn.getSaslClient();
+        assertEquals("auth", saslClient.getTargetQOP());
+      }
+
+      clientConf.set(HADOOP_RPC_PROTECTION, "authentication");
+      FileSystem fsAuth = FileSystem.get(uriAuthPort, clientConf);
+      doTest(fsAuth, PATH3);
+      Thread.sleep(100);
+      for (int i = 0; i < 3; i++) {
+        DataNode dn = dataNodes.get(i);
+        SaslDataTransferServer saslServer = dn.getSaslServer();
+        assertEquals("auth", saslServer.getNegotiatedQOP());
+      }
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+
+  private void doTest(FileSystem fs, Path path) throws Exception {
+    FileSystemTestHelper.createFile(fs, path, NUM_BLOCKS, BLOCK_SIZE);
+    assertArrayEquals(FileSystemTestHelper.getFileData(NUM_BLOCKS, BLOCK_SIZE),
+        DFSTestUtil.readFile(fs, path).getBytes("UTF-8"));
+    BlockLocation[] blockLocations = fs.getFileBlockLocations(path, 0,
+        Long.MAX_VALUE);
+    assertNotNull(blockLocations);
+    assertEquals(NUM_BLOCKS, blockLocations.length);
+    for (BlockLocation blockLocation: blockLocations) {
+      assertNotNull(blockLocation.getHosts());
+      assertEquals(3, blockLocation.getHosts().length);
+    }
+  }
+}




[hadoop] 05/05: HDFS-14611. Move handshake secret field from Token to BlockAccessToken. Contributed by Chen Liang

Posted by cl...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

cliang pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit 1ff68d0e475ff7420220f45ccd5c408df06b2050
Author: Chen Liang <cl...@apache.org>
AuthorDate: Tue Sep 3 17:18:41 2019 -0700

    HDFS-14611. Move handshake secret field from Token to BlockAccessToken. Contributed by Chen Liang
---
 .../org/apache/hadoop/security/token/Token.java    | 29 +++--------
 .../hadoop-common/src/main/proto/Security.proto    |  1 -
 .../datatransfer/sasl/SaslDataTransferClient.java  | 57 ++++++++++++++--------
 .../hadoop/hdfs/protocolPB/PBHelperClient.java     | 16 ++----
 .../security/token/block/BlockTokenIdentifier.java | 25 ++++++++++
 .../datatransfer/sasl/SaslDataTransferServer.java  | 28 +----------
 .../token/block/BlockTokenSecretManager.java       | 42 +++++++++-------
 .../hdfs/server/blockmanagement/BlockManager.java  | 11 +++--
 .../hadoop/hdfs/server/datanode/DataNode.java      |  2 +-
 .../hadoop/hdfs/server/datanode/DataXceiver.java   |  2 -
 .../hdfs/server/namenode/NameNodeRpcServer.java    | 47 +-----------------
 .../hadoop/hdfs/TestBlockTokenWrappingQOP.java     | 50 +++++--------------
 12 files changed, 122 insertions(+), 188 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/Token.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/Token.java
index 572b6f3..b79a018 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/Token.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/Token.java
@@ -52,8 +52,6 @@ public class Token<T extends TokenIdentifier> implements Writable {
   private Text service;
   private TokenRenewer renewer;
 
-  private byte[] dnHandshakeSecret;
-
   /**
    * Construct a token given a token identifier and a secret manager for the
    * type of the token identifier.
@@ -65,7 +63,6 @@ public class Token<T extends TokenIdentifier> implements Writable {
     identifier = id.getBytes();
     kind = id.getKind();
     service = new Text();
-    dnHandshakeSecret = new byte[0];
   }
  
   /**
@@ -80,7 +77,6 @@ public class Token<T extends TokenIdentifier> implements Writable {
     this.password = (password == null)? new byte[0] : password;
     this.kind = (kind == null)? new Text() : kind;
     this.service = (service == null)? new Text() : service;
-    this.dnHandshakeSecret = new byte[0];
   }
 
   /**
@@ -91,7 +87,6 @@ public class Token<T extends TokenIdentifier> implements Writable {
     password = new byte[0];
     kind = new Text();
     service = new Text();
-    dnHandshakeSecret = new byte[0];
   }
 
   /**
@@ -103,7 +98,6 @@ public class Token<T extends TokenIdentifier> implements Writable {
     this.password = other.password.clone();
     this.kind = new Text(other.kind);
     this.service = new Text(other.service);
-    this.dnHandshakeSecret = other.dnHandshakeSecret.clone();
   }
 
   /**
@@ -114,14 +108,6 @@ public class Token<T extends TokenIdentifier> implements Writable {
     return identifier;
   }
 
-  public byte[] getDnHandshakeSecret() {
-    return dnHandshakeSecret;
-  }
-
-  public void setDNHandshakeSecret(byte[] secret) {
-    this.dnHandshakeSecret = secret;
-  }
-
   private static Class<? extends TokenIdentifier>
       getClassForIdentifier(Text kind) {
     Class<? extends TokenIdentifier> cls = null;
@@ -204,6 +190,14 @@ public class Token<T extends TokenIdentifier> implements Writable {
     service = newService;
   }
 
+  public void setID(byte[] bytes) {
+    identifier = bytes;
+  }
+
+  public void setPassword(byte[] newPassword) {
+    password = newPassword;
+  }
+
   /**
    * Whether this is a private token.
    * @return false always for non-private tokens
@@ -304,11 +298,6 @@ public class Token<T extends TokenIdentifier> implements Writable {
     in.readFully(password);
     kind.readFields(in);
     service.readFields(in);
-    len = WritableUtils.readVInt(in);
-    if (dnHandshakeSecret == null || dnHandshakeSecret.length != len) {
-      dnHandshakeSecret = new byte[len];
-    }
-    in.readFully(dnHandshakeSecret);
   }
 
   @Override
@@ -319,8 +308,6 @@ public class Token<T extends TokenIdentifier> implements Writable {
     out.write(password);
     kind.write(out);
     service.write(out);
-    WritableUtils.writeVInt(out, dnHandshakeSecret.length);
-    out.write(dnHandshakeSecret);
   }
 
   /**
diff --git a/hadoop-common-project/hadoop-common/src/main/proto/Security.proto b/hadoop-common-project/hadoop-common/src/main/proto/Security.proto
index c5844da..5ff571d 100644
--- a/hadoop-common-project/hadoop-common/src/main/proto/Security.proto
+++ b/hadoop-common-project/hadoop-common/src/main/proto/Security.proto
@@ -36,7 +36,6 @@ message TokenProto {
   required bytes password = 2;
   required string kind = 3;
   required string service = 4;
-  optional bytes handshakeSecret = 5;
 }
 
 message GetDelegationTokenRequestProto {
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferClient.java
index 8d1c7f6..50bb39e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferClient.java
@@ -236,7 +236,7 @@ public class SaslDataTransferClient {
       throws IOException {
     boolean localTrusted = trustedChannelResolver.isTrusted();
     boolean remoteTrusted = trustedChannelResolver.isTrusted(addr);
-    LOG.info("SASL encryption trust check: localHostTrusted = {}, "
+    LOG.debug("SASL encryption trust check: localHostTrusted = {}, "
         + "remoteHostTrusted = {}", localTrusted, remoteTrusted);
     if (!localTrusted || !remoteTrusted) {
       // The encryption key factory only returns a key if encryption is enabled.
@@ -329,9 +329,7 @@ public class SaslDataTransferClient {
     if (secretKey != null) {
       LOG.debug("DataNode overwriting downstream QOP" +
           saslProps.get(Sasl.QOP));
-      byte[] newSecret =  SecretManager.createPassword(saslProps.get(Sasl.QOP)
-          .getBytes(Charsets.UTF_8), secretKey);
-      accessToken.setDNHandshakeSecret(newSecret);
+      updateToken(accessToken, secretKey, saslProps);
     }
 
     LOG.debug("Client using encryption algorithm {}",
@@ -447,9 +445,7 @@ public class SaslDataTransferClient {
       }
       LOG.debug("DataNode overwriting downstream QOP " +
           saslProps.get(Sasl.QOP));
-      byte[] newSecret = SecretManager.createPassword(
-          saslProps.get(Sasl.QOP).getBytes(Charsets.UTF_8), secretKey);
-      accessToken.setDNHandshakeSecret(newSecret);
+      updateToken(accessToken, secretKey, saslProps);
     }
     targetQOP = saslProps.get(Sasl.QOP);
     String userName = buildUserName(accessToken);
@@ -460,6 +456,18 @@ public class SaslDataTransferClient {
         saslProps, callbackHandler, accessToken);
   }
 
+  private void updateToken(Token<BlockTokenIdentifier> accessToken,
+      SecretKey secretKey, Map<String, String> saslProps)
+      throws IOException {
+    byte[] newSecret = saslProps.get(Sasl.QOP).getBytes(Charsets.UTF_8);
+    BlockTokenIdentifier bkid = accessToken.decodeIdentifier();
+    bkid.setHandshakeMsg(newSecret);
+    byte[] bkidBytes = bkid.getBytes();
+    accessToken.setPassword(
+        SecretManager.createPassword(bkidBytes, secretKey));
+    accessToken.setID(bkidBytes);
+  }
+
   /**
    * Builds the client's user name for the general-purpose handshake, consisting
    * of the base64-encoded serialized block access token identifier.  Note that
@@ -516,20 +524,29 @@ public class SaslDataTransferClient {
     try {
       // Start of handshake - "initial response" in SASL terminology.
       // The handshake secret can be null, this happens when client is running
-      // a new version but the cluster does not have this feature. In which case
-      // there will be no encrypted secret sent from NN.
-      byte[] handshakeSecret = accessToken.getDnHandshakeSecret();
-      if (handshakeSecret == null || handshakeSecret.length == 0) {
-        LOG.debug("Handshake secret is null, sending without "
-            + "handshake secret.");
-        sendSaslMessage(out, new byte[0]);
+      // a new version but the cluster does not have this feature, in which
+      // case there will be no encrypted secret sent from the NN.
+      BlockTokenIdentifier blockTokenIdentifier =
+          accessToken.decodeIdentifier();
+      if (blockTokenIdentifier != null) {
+        byte[] handshakeSecret =
+            accessToken.decodeIdentifier().getHandshakeMsg();
+        if (handshakeSecret == null || handshakeSecret.length == 0) {
+          LOG.debug("Handshake secret is null, "
+              + "sending without handshake secret.");
+          sendSaslMessage(out, new byte[0]);
+        } else {
+          LOG.debug("Sending handshake secret.");
+          BlockTokenIdentifier identifier = new BlockTokenIdentifier();
+          identifier.readFields(new DataInputStream(
+              new ByteArrayInputStream(accessToken.getIdentifier())));
+          String bpid = identifier.getBlockPoolId();
+          sendSaslMessageHandshakeSecret(out, new byte[0],
+              handshakeSecret, bpid);
+        }
       } else {
-        LOG.debug("Sending handshake secret.");
-        BlockTokenIdentifier identifier = new BlockTokenIdentifier();
-        identifier.readFields(new DataInputStream(
-            new ByteArrayInputStream(accessToken.getIdentifier())));
-        String bpid = identifier.getBlockPoolId();
-        sendSaslMessageHandshakeSecret(out, new byte[0], handshakeSecret, bpid);
+        LOG.debug("Block token id is null, sending without handshake secret.");
+        sendSaslMessage(out, new byte[0]);
       }
 
       // step 1
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java
index a3c3bed..7694387 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java
@@ -271,16 +271,11 @@ public class PBHelperClient {
   }
 
   public static TokenProto convert(Token<?> tok) {
-    TokenProto.Builder builder = TokenProto.newBuilder().
+    return TokenProto.newBuilder().
         setIdentifier(getByteString(tok.getIdentifier())).
         setPassword(getByteString(tok.getPassword())).
         setKindBytes(getFixedByteString(tok.getKind())).
-        setServiceBytes(getFixedByteString(tok.getService()));
-    if (tok.getDnHandshakeSecret() != null) {
-      builder.setHandshakeSecret(
-          ByteString.copyFrom(tok.getDnHandshakeSecret()));
-    }
-    return builder.build();
+        setServiceBytes(getFixedByteString(tok.getService())).build();
   }
 
   public static ShortCircuitShmIdProto convert(ShmId shmId) {
@@ -652,14 +647,9 @@ public class PBHelperClient {
 
   public static Token<BlockTokenIdentifier> convert(
       TokenProto blockToken) {
-    Token<BlockTokenIdentifier> token =
-        new Token<>(blockToken.getIdentifier()
+    return new Token<>(blockToken.getIdentifier()
         .toByteArray(), blockToken.getPassword().toByteArray(), new Text(
         blockToken.getKind()), new Text(blockToken.getService()));
-    if (blockToken.hasHandshakeSecret()) {
-      token.setDNHandshakeSecret(blockToken.getHandshakeSecret().toByteArray());
-    }
-    return token;
   }
 
   // DatanodeId
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenIdentifier.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenIdentifier.java
index 3f2c9ca..87c831a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenIdentifier.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenIdentifier.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.security.token.block;
 
 import java.io.DataInput;
 import java.io.DataOutput;
+import java.io.EOFException;
 import java.io.IOException;
 import java.util.EnumSet;
 
@@ -44,6 +45,7 @@ public class BlockTokenIdentifier extends TokenIdentifier {
   private String blockPoolId;
   private long blockId;
   private final EnumSet<AccessMode> modes;
+  private byte[] handshakeMsg;
 
   private byte [] cache;
 
@@ -58,6 +60,7 @@ public class BlockTokenIdentifier extends TokenIdentifier {
     this.blockPoolId = bpid;
     this.blockId = blockId;
     this.modes = modes == null ? EnumSet.noneOf(AccessMode.class) : modes;
+    this.handshakeMsg = new byte[0];
   }
 
   @Override
@@ -108,6 +111,15 @@ public class BlockTokenIdentifier extends TokenIdentifier {
     return modes;
   }
 
+  public byte[] getHandshakeMsg() {
+    return handshakeMsg;
+  }
+
+  public void setHandshakeMsg(byte[] bytes) {
+    handshakeMsg = bytes;
+  }
+
+
   @Override
   public String toString() {
     return "block_token_identifier (expiryDate=" + this.getExpiryDate()
@@ -157,6 +169,15 @@ public class BlockTokenIdentifier extends TokenIdentifier {
     for (int i = 0; i < length; i++) {
       modes.add(WritableUtils.readEnum(in, AccessMode.class));
     }
+    try {
+      int handshakeMsgLen = WritableUtils.readVInt(in);
+      if (handshakeMsgLen != 0) {
+        handshakeMsg = new byte[handshakeMsgLen];
+        in.readFully(handshakeMsg);
+      }
+    } catch (EOFException eof) {
+      // Identifiers written by older releases end here; handshakeMsg stays empty.
+    }
   }
 
   @Override
@@ -170,6 +191,10 @@ public class BlockTokenIdentifier extends TokenIdentifier {
     for (AccessMode aMode : modes) {
       WritableUtils.writeEnum(out, aMode);
     }
+    if (handshakeMsg != null && handshakeMsg.length > 0) {
+      WritableUtils.writeVInt(out, handshakeMsg.length);
+      out.write(handshakeMsg);
+    }
   }
 
   @Override
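
Illustration (not part of the patch): the read/write changes above keep the
serialized identifier backward compatible: the handshake message is written
only when non-empty, and readers tolerate hitting end-of-stream on identifiers
produced by older releases. A hedged round-trip sketch, assuming
org.apache.hadoop.io.DataOutputBuffer and Guava's Charsets are available:

  BlockTokenIdentifier id = new BlockTokenIdentifier("user", "bp-1", 1001L,
      EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE));
  id.setHandshakeMsg("auth-conf".getBytes(Charsets.UTF_8));

  DataOutputBuffer buf = new DataOutputBuffer();
  id.write(buf);

  BlockTokenIdentifier copy = new BlockTokenIdentifier();
  copy.readFields(new DataInputStream(
      new ByteArrayInputStream(buf.getData(), 0, buf.getLength())));
  // copy.getHandshakeMsg() now holds "auth-conf"; an identifier written by an
  // older release simply hits EOF at that point and leaves the field empty.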
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferServer.java
index eb26e99..8854968 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferServer.java
@@ -28,11 +28,9 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
-import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 
-import javax.crypto.SecretKey;
 import javax.security.auth.callback.Callback;
 import javax.security.auth.callback.CallbackHandler;
 import javax.security.auth.callback.NameCallback;
@@ -52,15 +50,12 @@ import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
 import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto.DataTransferEncryptorStatus;
-import org.apache.hadoop.hdfs.security.token.block.BlockKey;
 import org.apache.hadoop.hdfs.security.token.block.BlockPoolTokenSecretManager;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.server.datanode.DNConf;
 import org.apache.hadoop.security.SaslPropertiesResolver;
-import org.apache.hadoop.security.SaslRpcServer;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.SecretManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -348,21 +343,6 @@ public class SaslDataTransferServer {
     return identifier;
   }
 
-  private String examineSecret(byte[] secret, String bpid) {
-    BlockKey blockKey = blockPoolTokenSecretManager.get(bpid).getCurrentKey();
-    SecretKey secretKey = blockKey.getKey();
-    for (SaslRpcServer.QualityOfProtection qop :
-        SaslRpcServer.QualityOfProtection.values()) {
-      String qopString = qop.getSaslQop();
-      byte[] data = qopString.getBytes(Charsets.UTF_8);
-      byte[] encryptedData = SecretManager.createPassword(data, secretKey);
-      if (Arrays.equals(encryptedData, secret)) {
-        return qopString;
-      }
-    }
-    return null;
-  }
-
   @VisibleForTesting
   public String getNegotiatedQOP() {
     return negotiatedQOP;
@@ -399,12 +379,8 @@ public class SaslDataTransferServer {
       if (secret != null || bpid != null) {
         // sanity check: if either is non-null, both must be non-null
         assert(secret != null && bpid != null);
-        String qop = examineSecret(secret, bpid);
-        if (qop != null) {
-          saslProps.put(Sasl.QOP, qop);
-        } else {
-          LOG.error("Unable to match secret to a QOP!");
-        }
+        String qop = new String(secret, Charsets.UTF_8);
+        saslProps.put(Sasl.QOP, qop);
       }
       SaslParticipant sasl = SaslParticipant.createServerSaslParticipant(
           saslProps, callbackHandler);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java
index 5b92776..a934232 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hdfs.security.token.block;
 
+import com.google.common.base.Charsets;
 import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
 import java.io.IOException;
@@ -34,6 +35,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
 import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.SecretManager;
 import org.apache.hadoop.security.token.Token;
@@ -75,6 +77,7 @@ public class BlockTokenSecretManager extends
 
   private final int intRange;
   private final int nnRangeStart;
+  private final boolean shouldWrapQOP;
 
   private final SecureRandom nonceGenerator = new SecureRandom();
 
@@ -92,7 +95,7 @@ public class BlockTokenSecretManager extends
   public BlockTokenSecretManager(long keyUpdateInterval,
       long tokenLifetime, String blockPoolId, String encryptionAlgorithm) {
     this(false, keyUpdateInterval, tokenLifetime, blockPoolId,
-        encryptionAlgorithm, 0, 1);
+        encryptionAlgorithm, 0, 1, false);
   }
   
   /**
@@ -104,19 +107,29 @@ public class BlockTokenSecretManager extends
    * @param blockPoolId block pool ID
    * @param encryptionAlgorithm encryption algorithm to use
    * @param numNNs number of namenodes possible
+   * @param shouldWrapQOP should wrap QOP in the block access token
    */
   public BlockTokenSecretManager(long keyUpdateInterval,
       long tokenLifetime, int nnIndex, int numNNs,  String blockPoolId,
-      String encryptionAlgorithm) {
-    this(true, keyUpdateInterval, tokenLifetime, blockPoolId, encryptionAlgorithm, nnIndex, numNNs);
+      String encryptionAlgorithm, boolean shouldWrapQOP) {
+    this(true, keyUpdateInterval, tokenLifetime, blockPoolId,
+        encryptionAlgorithm, nnIndex, numNNs, shouldWrapQOP);
     Preconditions.checkArgument(nnIndex >= 0);
     Preconditions.checkArgument(numNNs > 0);
     setSerialNo(new SecureRandom().nextInt());
     generateKeys();
   }
-  
+
+  public BlockTokenSecretManager(long keyUpdateInterval,
+      long tokenLifetime, int nnIndex, int numNNs,  String blockPoolId,
+      String encryptionAlgorithm) {
+    this(keyUpdateInterval, tokenLifetime, nnIndex, numNNs, blockPoolId,
+        encryptionAlgorithm, false);
+  }
+
   private BlockTokenSecretManager(boolean isMaster, long keyUpdateInterval,
-      long tokenLifetime, String blockPoolId, String encryptionAlgorithm, int nnIndex, int numNNs) {
+      long tokenLifetime, String blockPoolId, String encryptionAlgorithm,
+      int nnIndex, int numNNs, boolean shouldWrapQOP) {
     this.intRange = Integer.MAX_VALUE / numNNs;
     this.nnRangeStart = intRange * nnIndex;
     this.isMaster = isMaster;
@@ -125,6 +138,7 @@ public class BlockTokenSecretManager extends
     this.allKeys = new HashMap<Integer, BlockKey>();
     this.blockPoolId = blockPoolId;
     this.encryptionAlgorithm = encryptionAlgorithm;
+    this.shouldWrapQOP = shouldWrapQOP;
     this.timer = new Timer();
     generateKeys();
   }
@@ -253,6 +267,12 @@ public class BlockTokenSecretManager extends
       ExtendedBlock block, EnumSet<BlockTokenIdentifier.AccessMode> modes) throws IOException {
     BlockTokenIdentifier id = new BlockTokenIdentifier(userId, block
         .getBlockPoolId(), block.getBlockId(), modes);
+    if (shouldWrapQOP) {
+      String qop = Server.getEstablishedQOP();
+      if (qop != null) {
+        id.setHandshakeMsg(qop.getBytes(Charsets.UTF_8));
+      }
+    }
     return new Token<BlockTokenIdentifier>(id, this);
   }
 
@@ -431,18 +451,6 @@ public class BlockTokenSecretManager extends
     return createPassword(nonce, key.getKey());
   }
 
-  /**
-   * Encrypt the given message with the current block key, using the current
-   * block key.
-   *
-   * @param message the message to be encrypted.
-   * @return the secret created by encrypting the given message.
-   */
-  public byte[] secretGen(byte[] message) {
-    return createPassword(message, currentKey.getKey());
-  }
-
-  @VisibleForTesting
   public BlockKey getCurrentKey() {
     return currentKey;
   }
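
Illustration (not part of the patch): with shouldWrapQOP enabled
(DFS_NAMENODE_SEND_QOP_ENABLED), generateToken() above embeds the QOP that was
established on the NameNode RPC connection into the block token identifier. A
hedged sketch of how a receiver could read that wrapped QOP back out of a
block token, assuming Guava's Charsets:

  static String wrappedQop(Token<BlockTokenIdentifier> blockToken)
      throws IOException {
    BlockTokenIdentifier id = blockToken.decodeIdentifier();
    byte[] msg = (id == null) ? null : id.getHandshakeMsg();
    // Returns e.g. "auth-conf", "auth-int" or "auth"; null when the NameNode
    // did not wrap a QOP into the token.
    return (msg == null || msg.length == 0)
        ? null : new String(msg, Charsets.UTF_8);
  }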
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 8dd0e1d..1f87972 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -17,7 +17,7 @@
  */
 package org.apache.hadoop.hdfs.server.blockmanagement;
 
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
 import static org.apache.hadoop.util.ExitUtil.terminate;
 import static org.apache.hadoop.util.Time.now;
 
@@ -509,6 +509,9 @@ public class BlockManager implements BlockStatsMXBean {
     String nsId = DFSUtil.getNamenodeNameServiceId(conf);
     boolean isHaEnabled = HAUtil.isHAEnabled(conf, nsId);
 
+    boolean shouldWrapQOP = conf.getBoolean(
+        DFS_NAMENODE_SEND_QOP_ENABLED, DFS_NAMENODE_SEND_QOP_ENABLED_DEFAULT);
+
     if (isHaEnabled) {
       // figure out which index we are of the nns
       Collection<String> nnIds = DFSUtilClient.getNameNodeIds(conf, nsId);
@@ -521,10 +524,12 @@ public class BlockManager implements BlockStatsMXBean {
         nnIndex++;
       }
       return new BlockTokenSecretManager(updateMin * 60 * 1000L,
-          lifetimeMin * 60 * 1000L, nnIndex, nnIds.size(), null, encryptionAlgorithm);
+          lifetimeMin * 60 * 1000L, nnIndex, nnIds.size(),
+          null, encryptionAlgorithm, shouldWrapQOP);
     } else {
       return new BlockTokenSecretManager(updateMin*60*1000L,
-          lifetimeMin*60*1000L, 0, 1, null, encryptionAlgorithm);
+          lifetimeMin*60*1000L, 0, 1,
+          null, encryptionAlgorithm, shouldWrapQOP);
     }
   }
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index f70e49b..b60ed53 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -1700,7 +1700,7 @@ public class DataNode extends ReconfigurableBase
   public SaslDataTransferServer getSaslServer() {
     return saslServer;
   }
-  
+
   /**
    * @return name useful for logging
    */
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
index 43b9500..272dded 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
@@ -92,8 +92,6 @@ import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.StopWatch;
 
-import com.google.common.base.Preconditions;
-import com.google.protobuf.ByteString;
 import org.apache.hadoop.util.Time;
 import org.slf4j.Logger;
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
index 584b849..2f1273c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
@@ -27,14 +27,11 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIFELINE_HANDLER
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SEND_QOP_ENABLED;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SEND_QOP_ENABLED_DEFAULT;
 import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.MAX_PATH_DEPTH;
 import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.MAX_PATH_LENGTH;
 
 import static org.apache.hadoop.util.Time.now;
 
-import com.google.common.base.Charsets;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.net.InetSocketAddress;
@@ -135,8 +132,6 @@ import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolPB;
 import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolServerSideTranslatorPB;
 import org.apache.hadoop.hdfs.protocolPB.ReconfigurationProtocolPB;
 import org.apache.hadoop.hdfs.protocolPB.ReconfigurationProtocolServerSideTranslatorPB;
-import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
-import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
 import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
 import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
@@ -255,8 +250,6 @@ public class NameNodeRpcServer implements NamenodeProtocols {
   
   private final String minimumDataNodeVersion;
 
-  private final boolean shouldSendQOP;
-
   public NameNodeRpcServer(Configuration conf, NameNode nn)
       throws IOException {
     this.nn = nn;
@@ -534,8 +527,6 @@ public class NameNodeRpcServer implements NamenodeProtocols {
         this.clientRpcServer.addAuxiliaryListener(auxiliaryPort);
       }
     }
-    this.shouldSendQOP = conf.getBoolean(
-        DFS_NAMENODE_SEND_QOP_ENABLED, DFS_NAMENODE_SEND_QOP_ENABLED_DEFAULT);
   }
 
   /** Allow access to the lifeline RPC server for testing */
@@ -738,11 +729,6 @@ public class NameNodeRpcServer implements NamenodeProtocols {
     metrics.incrGetBlockLocations();
     LocatedBlocks locatedBlocks =
         namesystem.getBlockLocations(getClientMachine(), src, offset, length);
-    if (shouldSendQOP) {
-      for (LocatedBlock lb : locatedBlocks.getLocatedBlocks()) {
-        wrapEstablishedQOP(lb, getEstablishedClientQOP());
-      }
-    }
     return locatedBlocks;
   }
   
@@ -816,9 +802,6 @@ public class NameNodeRpcServer implements NamenodeProtocols {
       RetryCache.setState(cacheEntry, success, info);
     }
     metrics.incrFilesAppended();
-    if (shouldSendQOP) {
-      wrapEstablishedQOP(info.getLastBlock(), getEstablishedClientQOP());
-    }
     return info;
   }
 
@@ -887,9 +870,6 @@ public class NameNodeRpcServer implements NamenodeProtocols {
     if (locatedBlock != null) {
       metrics.incrAddBlockOps();
     }
-    if (shouldSendQOP) {
-      wrapEstablishedQOP(locatedBlock, getEstablishedClientQOP());
-    }
     return locatedBlock;
   }
 
@@ -923,9 +903,6 @@ public class NameNodeRpcServer implements NamenodeProtocols {
     LocatedBlock locatedBlock = namesystem.getAdditionalDatanode(src, fileId,
         blk, existings, existingStorageIDs, excludeSet, numAdditionalNodes,
         clientName);
-    if (shouldSendQOP) {
-      wrapEstablishedQOP(locatedBlock, getEstablishedClientQOP());
-    }
     return locatedBlock;
   }
   /**
@@ -1794,7 +1771,7 @@ public class NameNodeRpcServer implements NamenodeProtocols {
    *
    * @return the established QOP of this client.
    */
-  private static String getEstablishedClientQOP() {
+  public static String getEstablishedClientQOP() {
     return Server.getEstablishedQOP();
   }
 
@@ -2344,26 +2321,4 @@ public class NameNodeRpcServer implements NamenodeProtocols {
     namesystem.checkSuperuserPrivilege();
     return Lists.newArrayList(nn.getReconfigurableProperties());
   }
-
-
-  /**
-   * Wraps the QOP information into the LocatedBlock instance.
-   * The wrapped QOP will be used by the DataNode, i.e. the DataNode will simply
-   * use this QOP to accept client calls, because this QOP is viewed
-   * as the QOP that the NameNode has accepted.
-   *
-   * @param locatedBlock the LocatedBlock instance
-   * @param qop the QOP to wrap in
-   * @throws RuntimeException
-   */
-  private void wrapEstablishedQOP(LocatedBlock locatedBlock, String qop) {
-    if (qop == null || locatedBlock == null) {
-      return;
-    }
-    BlockTokenSecretManager btsm = namesystem.getBlockManager()
-        .getBlockTokenSecretManager();
-    Token<BlockTokenIdentifier> token = locatedBlock.getBlockToken();
-    byte[] secret = btsm.secretGen(qop.getBytes(Charsets.UTF_8));
-    token.setDNHandshakeSecret(secret);
-  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockTokenWrappingQOP.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockTokenWrappingQOP.java
index ea7ab97..1a292b0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockTokenWrappingQOP.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockTokenWrappingQOP.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.hdfs;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.EnumSet;
-import javax.crypto.Mac;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.CreateFlag;
@@ -32,7 +31,6 @@ import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferTestCase;
-import org.apache.hadoop.hdfs.security.token.block.BlockKey;
 import org.apache.hadoop.io.EnumSetWritable;
 import org.apache.hadoop.security.TestPermission;
 import org.junit.After;
@@ -41,8 +39,11 @@ import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
-import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
-import static org.junit.Assert.*;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_RPC_PROTECTION;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SEND_QOP_ENABLED;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 
 /**
@@ -55,7 +56,6 @@ public class TestBlockTokenWrappingQOP extends SaslDataTransferTestCase {
 
   private HdfsConfiguration conf;
   private MiniDFSCluster cluster;
-  private String encryptionAlgorithm;
   private DistributedFileSystem dfs;
 
   private String configKey;
@@ -84,7 +84,6 @@ public class TestBlockTokenWrappingQOP extends SaslDataTransferTestCase {
     conf.setBoolean(DFS_NAMENODE_SEND_QOP_ENABLED, true);
     conf.set(HADOOP_RPC_PROTECTION, this.configKey);
     cluster = null;
-    encryptionAlgorithm = "HmacSHA1";
     cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
     cluster.waitActive();
   }
@@ -109,12 +108,8 @@ public class TestBlockTokenWrappingQOP extends SaslDataTransferTestCase {
 
     LocatedBlock lb = client.namenode.addBlock(src, clientName, null, null,
         HdfsConstants.GRANDFATHER_INODE_ID, null, null);
-    byte[] secret = lb.getBlockToken().getDnHandshakeSecret();
-    BlockKey currentKey = cluster.getNamesystem().getBlockManager()
-        .getBlockTokenSecretManager().getCurrentKey();
-    String decrypted = decryptMessage(secret, currentKey,
-        encryptionAlgorithm);
-    assertEquals(this.qopValue, decrypted);
+    byte[] secret = lb.getBlockToken().decodeIdentifier().getHandshakeMsg();
+    assertEquals(this.qopValue, new String(secret));
   }
 
   @Test
@@ -137,12 +132,8 @@ public class TestBlockTokenWrappingQOP extends SaslDataTransferTestCase {
         new EnumSetWritable<>(EnumSet.of(CreateFlag.APPEND)));
 
     byte[] secret = lastBlock.getLastBlock().getBlockToken()
-        .getDnHandshakeSecret();
-    BlockKey currentKey = cluster.getNamesystem().getBlockManager()
-        .getBlockTokenSecretManager().getCurrentKey();
-    String decrypted = decryptMessage(secret, currentKey,
-        encryptionAlgorithm);
-    assertEquals(this.qopValue, decrypted);
+        .decodeIdentifier().getHandshakeMsg();
+    assertEquals(this.qopValue, new String(secret));
   }
 
   @Test
@@ -164,27 +155,10 @@ public class TestBlockTokenWrappingQOP extends SaslDataTransferTestCase {
 
     assertTrue(lbs.getLocatedBlocks().size() > 0);
 
-    BlockKey currentKey = cluster.getNamesystem().getBlockManager()
-        .getBlockTokenSecretManager().getCurrentKey();
     for (LocatedBlock lb : lbs.getLocatedBlocks()) {
-      byte[] secret = lb.getBlockToken().getDnHandshakeSecret();
-      String decrypted = decryptMessage(secret, currentKey,
-          encryptionAlgorithm);
-      assertEquals(this.qopValue, decrypted);
+      byte[] secret = lb.getBlockToken()
+          .decodeIdentifier().getHandshakeMsg();
+      assertEquals(this.qopValue, new String(secret));
     }
   }
-
-  private String decryptMessage(byte[] secret, BlockKey key,
-      String algorithm) throws Exception {
-    String[] qops = {"auth", "auth-conf", "auth-int"};
-    Mac mac = Mac.getInstance(algorithm);
-    mac.init(key.getKey());
-    for (String qop : qops) {
-      byte[] encrypted = mac.doFinal(qop.getBytes());
-      if (Arrays.equals(encrypted, secret)) {
-        return qop;
-      }
-    }
-    return null;
-  }
 }
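
For reference, a minimal sketch (not part of the patch) of how a caller could read the wrapped QOP after this change. It assumes a LocatedBlock lb returned by a NameNode running with dfs.namenode.send.qop.enabled=true, and mirrors what the updated TestBlockTokenWrappingQOP asserts:

    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import org.apache.hadoop.hdfs.protocol.LocatedBlock;
    import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;

    final class WrappedQopReader {
      /** Returns the QOP the NameNode wrapped into the block token, or null. */
      static String wrappedQop(LocatedBlock lb) throws IOException {
        // After HDFS-14611 the handshake message lives in the BlockTokenIdentifier,
        // not in the Token itself, so decode the identifier first.
        BlockTokenIdentifier id = lb.getBlockToken().decodeIdentifier();
        byte[] msg = (id == null) ? null : id.getHandshakeMsg();
        return (msg == null || msg.length == 0)
            ? null                                      // QOP wrapping disabled
            : new String(msg, StandardCharsets.UTF_8);  // e.g. "auth-conf"
      }
    }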




[hadoop] 01/05: HDFS-13547. Add ingress port based sasl resolver. Contributed by Chen Liang.

Posted by cl...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

cliang pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit 94d71ba3b26d1904c5016261e710b6b8d8abce6d
Author: Chen Liang <cl...@apache.org>
AuthorDate: Tue Jun 5 11:51:29 2018 -0700

    HDFS-13547. Add ingress port based sasl resolver. Contributed by Chen Liang.
---
 .../hadoop/security/IngressPortBasedResolver.java  | 100 +++++++++++++++++++++
 .../hadoop/security/SaslPropertiesResolver.java    |  47 +++++++++-
 .../hadoop/security/WhitelistBasedResolver.java    |  20 +----
 .../security/TestIngressPortBasedResolver.java     |  59 ++++++++++++
 4 files changed, 207 insertions(+), 19 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/IngressPortBasedResolver.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/IngressPortBasedResolver.java
new file mode 100644
index 0000000..a30e4a8
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/IngressPortBasedResolver.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.security;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.net.InetAddress;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An implementation of SaslPropertiesResolver. Used on the server side, it
+ * returns SASL properties based on the port the client is connecting
+ * to. It should be used together with enabling multiple listener ports on
+ * the server side. TODO: when the NN multiple-listener feature is enabled,
+ * use this resolver automatically without having to set it in config.
+ *
+ * For example, suppose the server runs on two ports, 9000 and 9001, and we
+ * want 9000 to use auth-conf and 9001 to use auth.
+ *
+ * Then we need to set the following configuration properties:
+ * ingress.port.sasl.configured.ports=9000,9001
+ * ingress.port.sasl.prop.9000=privacy
+ * ingress.port.sasl.prop.9001=authentication
+ *
+ * Note that if a port, say 9002, is listed in
+ * ingress.port.sasl.configured.ports but its SASL prop is not set,
+ * a default QOP of privacy (auth-conf) will be used. In addition,
+ * if a port is not listed in ingress.port.sasl.configured.ports at all but
+ * is queried in getServerProperties(), the default SASL properties will
+ * be returned. Both of these cases are considered misconfiguration.
+ */
+public class IngressPortBasedResolver extends SaslPropertiesResolver {
+
+  public static final Logger LOG =
+      LoggerFactory.getLogger(IngressPortBasedResolver.class.getName());
+
+  static final String INGRESS_PORT_SASL_PROP_PREFIX = "ingress.port.sasl.prop";
+
+  static final String INGRESS_PORT_SASL_CONFIGURED_PORTS =
+      "ingress.port.sasl.configured.ports";
+
+  // No need for a concurrent map, because after setConf() it never changes;
+  // it is only read.
+  private HashMap<Integer, Map<String, String>> portPropMapping;
+
+  @Override
+  public void setConf(Configuration conf) {
+    super.setConf(conf);
+    portPropMapping = new HashMap<>();
+    Collection<String> portStrings =
+        conf.getTrimmedStringCollection(INGRESS_PORT_SASL_CONFIGURED_PORTS);
+    for (String portString : portStrings) {
+      int port = Integer.parseInt(portString);
+      String configKey = INGRESS_PORT_SASL_PROP_PREFIX + "." + portString;
+      Map<String, String> props = getSaslProperties(conf, configKey,
+          SaslRpcServer.QualityOfProtection.PRIVACY);
+      portPropMapping.put(port, props);
+    }
+    LOG.debug("Configured with port to QOP mapping as:" + portPropMapping);
+  }
+
+  /**
+   * Identify the Sasl Properties to be used for a connection with a client.
+   * @param clientAddress client's address
+   * @param ingressPort the port that the client is connecting to
+   * @return the sasl properties to be used for the connection.
+   */
+  @Override
+  @VisibleForTesting
+  public Map<String, String> getServerProperties(InetAddress clientAddress,
+      int ingressPort) {
+    LOG.debug("Resolving SASL properties for " + clientAddress + " "
+        + ingressPort);
+    if (!portPropMapping.containsKey(ingressPort)) {
+      LOG.warn("An un-configured port is being requested " + ingressPort
+          + " using default");
+      return getDefaultProperties();
+    }
+    return portPropMapping.get(ingressPort);
+  }
+}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SaslPropertiesResolver.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SaslPropertiesResolver.java
index 305443c..64b86e3 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SaslPropertiesResolver.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SaslPropertiesResolver.java
@@ -18,7 +18,6 @@
 package org.apache.hadoop.security;
 
 import java.net.InetAddress;
-import java.util.Locale;
 import java.util.Map;
 import java.util.TreeMap;
 
@@ -96,6 +95,17 @@ public class SaslPropertiesResolver implements Configurable{
   }
 
   /**
+   * Identify the Sasl Properties to be used for a connection with a client.
+   * @param clientAddress client's address
+   * @param ingressPort the port that the client is connecting to
+   * @return the sasl properties to be used for the connection.
+   */
+  public Map<String, String> getServerProperties(InetAddress clientAddress,
+      int ingressPort){
+    return properties;
+  }
+
+  /**
    * Identify the Sasl Properties to be used for a connection with a server.
    * @param serverAddress server's address
    * @return the sasl properties to be used for the connection.
@@ -103,4 +113,39 @@ public class SaslPropertiesResolver implements Configurable{
   public Map<String, String> getClientProperties(InetAddress serverAddress){
     return properties;
   }
+
+  /**
+   * Identify the Sasl Properties to be used for a connection with a server.
+   * @param serverAddress server's address
+   * @param ingressPort the port that is used to connect to the server
+   * @return the sasl properties to be used for the connection.
+   */
+  public Map<String, String> getClientProperties(InetAddress serverAddress,
+      int ingressPort) {
+    return properties;
+  }
+
+  /**
+   * A utility function to retrieve additional SASL properties from config.
+   * Used by subclasses to read the SASL properties they need.
+   * @param conf the configuration
+   * @param configKey the config key to look for
+   * @param defaultQOP the default QOP to use if the key is missing
+   * @return the SASL properties associated with the given key
+   */
+  static Map<String, String> getSaslProperties(Configuration conf,
+      String configKey, QualityOfProtection defaultQOP) {
+    Map<String, String> saslProps = new TreeMap<>();
+    String[] qop = conf.getStrings(configKey, defaultQOP.toString());
+
+    for (int i=0; i < qop.length; i++) {
+      qop[i] = QualityOfProtection.valueOf(
+          StringUtils.toUpperCase(qop[i])).getSaslQop();
+    }
+
+    saslProps.put(Sasl.QOP, StringUtils.join(",", qop));
+    saslProps.put(Sasl.SERVER_AUTH, "true");
+
+    return saslProps;
+  }
 }
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/WhitelistBasedResolver.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/WhitelistBasedResolver.java
index a64c4de..5964886 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/WhitelistBasedResolver.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/WhitelistBasedResolver.java
@@ -20,15 +20,10 @@ package org.apache.hadoop.security;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.Map;
-import java.util.TreeMap;
-
-import javax.security.sasl.Sasl;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.security.SaslPropertiesResolver;
 import org.apache.hadoop.security.SaslRpcServer.QualityOfProtection;
 import org.apache.hadoop.util.CombinedIPWhiteList;
-import org.apache.hadoop.util.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -134,18 +129,7 @@ public class WhitelistBasedResolver extends SaslPropertiesResolver {
   }
 
   static Map<String, String> getSaslProperties(Configuration conf) {
-    Map<String, String> saslProps =new TreeMap<String, String>();
-    String[] qop = conf.getStrings(HADOOP_RPC_PROTECTION_NON_WHITELIST,
-        QualityOfProtection.PRIVACY.toString());
-
-    for (int i=0; i < qop.length; i++) {
-      qop[i] = QualityOfProtection.valueOf(
-          StringUtils.toUpperCase(qop[i])).getSaslQop();
-    }
-
-    saslProps.put(Sasl.QOP, StringUtils.join(",", qop));
-    saslProps.put(Sasl.SERVER_AUTH, "true");
-
-    return saslProps;
+    return getSaslProperties(conf, HADOOP_RPC_PROTECTION_NON_WHITELIST,
+        QualityOfProtection.PRIVACY);
   }
 }
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/TestIngressPortBasedResolver.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/TestIngressPortBasedResolver.java
new file mode 100644
index 0000000..96c80af
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/TestIngressPortBasedResolver.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.security;
+
+import javax.security.sasl.Sasl;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+
+/**
+ * Test class for IngressPortBasedResolver.
+ */
+public class TestIngressPortBasedResolver {
+
+  /**
+   * A simple test verifying that, for the configured ports, the resolver
+   * returns the expected SASL properties.
+   */
+  @Test
+  public void testResolver() {
+    Configuration conf = new Configuration();
+    conf.set("ingress.port.sasl.configured.ports", "444,555,666,777");
+    conf.set("ingress.port.sasl.prop.444", "authentication");
+    conf.set("ingress.port.sasl.prop.555", "authentication,privacy");
+    conf.set("ingress.port.sasl.prop.666", "privacy");
+
+    IngressPortBasedResolver resolver = new IngressPortBasedResolver();
+    resolver.setConf(conf);
+
+    // the client address does not matter, give it a null
+    assertEquals("auth",
+        resolver.getServerProperties(null, 444).get(Sasl.QOP));
+    assertEquals("auth,auth-conf",
+        resolver.getServerProperties(null, 555).get(Sasl.QOP));
+    assertEquals("auth-conf",
+        resolver.getServerProperties(null, 666).get(Sasl.QOP));
+    assertEquals("auth-conf",
+        resolver.getServerProperties(null, 777).get(Sasl.QOP));
+    assertEquals("auth",
+        resolver.getServerProperties(null, 888).get(Sasl.QOP));
+  }
+}
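
For reference, a sketch (not part of the patch) of what the server-side configuration could look like. It assumes the resolver is selected through hadoop.security.saslproperties.resolver.class in core-site.xml, the same key used for the existing WhitelistBasedResolver, and reuses the port/QOP mapping from the class javadoc:

    <property>
      <name>hadoop.security.saslproperties.resolver.class</name>
      <value>org.apache.hadoop.security.IngressPortBasedResolver</value>
    </property>
    <property>
      <name>ingress.port.sasl.configured.ports</name>
      <value>9000,9001</value>
    </property>
    <property>
      <name>ingress.port.sasl.prop.9000</name>
      <value>privacy</value>
    </property>
    <property>
      <name>ingress.port.sasl.prop.9001</name>
      <value>authentication</value>
    </property>

With this mapping, clients connecting to port 9000 have to negotiate auth-conf, while clients on port 9001 only need auth; ports outside the configured list fall back to the default properties, as getServerProperties() above shows.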




[hadoop] 03/05: HDFS-13617. Allow wrapping NN QOP into token in encrypted message. Contributed by Chen Liang

Posted by cl...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

cliang pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit e63619a6dd5b2ff1eef1613cdaede6f21cc697b4
Author: Chen Liang <cl...@apache.org>
AuthorDate: Wed Feb 13 12:40:31 2019 -0800

    HDFS-13617. Allow wrapping NN QOP into token in encrypted message. Contributed by Chen Liang
---
 .../main/java/org/apache/hadoop/ipc/Server.java    |  11 +-
 .../org/apache/hadoop/security/token/Token.java    |  25 ++-
 .../hadoop-common/src/main/proto/Security.proto    |   1 +
 .../hadoop/hdfs/protocolPB/PBHelperClient.java     |  16 +-
 .../java/org/apache/hadoop/hdfs/DFSConfigKeys.java |   4 +
 .../token/block/BlockTokenSecretManager.java       |  18 +-
 .../hdfs/server/namenode/NameNodeRpcServer.java    |  67 +++++++-
 .../src/main/resources/hdfs-default.xml            |  11 ++
 .../hadoop/hdfs/TestBlockTokenWrappingQOP.java     | 190 +++++++++++++++++++++
 9 files changed, 325 insertions(+), 18 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
index c23390f..f598c08 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
@@ -1777,6 +1777,7 @@ public abstract class Server {
     IpcConnectionContextProto connectionContext;
     String protocolName;
     SaslServer saslServer;
+    private String establishedQOP;
     private AuthMethod authMethod;
     private AuthProtocol authProtocol;
     private boolean saslContextEstablished;
@@ -1851,14 +1852,7 @@ public abstract class Server {
     }
 
     public String getEstablishedQOP() {
-      // In practice, saslServer should not be null when this is
-      // called. If it is null, it must be either some
-      // configuration mistake or it is called from unit test.
-      if (saslServer == null) {
-        LOG.warn("SASL server should not be null!");
-        return null;
-      }
-      return (String)saslServer.getNegotiatedProperty(Sasl.QOP);
+      return establishedQOP;
     }
     
     public void setLastContact(long lastContact) {
@@ -1998,6 +1992,7 @@ public abstract class Server {
       // do NOT enable wrapping until the last auth response is sent
       if (saslContextEstablished) {
         String qop = (String) saslServer.getNegotiatedProperty(Sasl.QOP);
+        establishedQOP = qop;
         // SASL wrapping is only used if the connection has a QOP, and
         // the value is not auth.  ex. auth-int & auth-priv
         useWrap = (qop != null && !"auth".equalsIgnoreCase(qop));
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/Token.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/Token.java
index 5a15b94..572b6f3 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/Token.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/Token.java
@@ -51,7 +51,9 @@ public class Token<T extends TokenIdentifier> implements Writable {
   private Text kind;
   private Text service;
   private TokenRenewer renewer;
-  
+
+  private byte[] dnHandshakeSecret;
+
   /**
    * Construct a token given a token identifier and a secret manager for the
    * type of the token identifier.
@@ -63,6 +65,7 @@ public class Token<T extends TokenIdentifier> implements Writable {
     identifier = id.getBytes();
     kind = id.getKind();
     service = new Text();
+    dnHandshakeSecret = new byte[0];
   }
  
   /**
@@ -77,6 +80,7 @@ public class Token<T extends TokenIdentifier> implements Writable {
     this.password = (password == null)? new byte[0] : password;
     this.kind = (kind == null)? new Text() : kind;
     this.service = (service == null)? new Text() : service;
+    this.dnHandshakeSecret = new byte[0];
   }
 
   /**
@@ -87,6 +91,7 @@ public class Token<T extends TokenIdentifier> implements Writable {
     password = new byte[0];
     kind = new Text();
     service = new Text();
+    dnHandshakeSecret = new byte[0];
   }
 
   /**
@@ -98,6 +103,7 @@ public class Token<T extends TokenIdentifier> implements Writable {
     this.password = other.password.clone();
     this.kind = new Text(other.kind);
     this.service = new Text(other.service);
+    this.dnHandshakeSecret = other.dnHandshakeSecret.clone();
   }
 
   /**
@@ -107,7 +113,15 @@ public class Token<T extends TokenIdentifier> implements Writable {
   public byte[] getIdentifier() {
     return identifier;
   }
-  
+
+  public byte[] getDnHandshakeSecret() {
+    return dnHandshakeSecret;
+  }
+
+  public void setDNHandshakeSecret(byte[] secret) {
+    this.dnHandshakeSecret = secret;
+  }
+
   private static Class<? extends TokenIdentifier>
       getClassForIdentifier(Text kind) {
     Class<? extends TokenIdentifier> cls = null;
@@ -290,6 +304,11 @@ public class Token<T extends TokenIdentifier> implements Writable {
     in.readFully(password);
     kind.readFields(in);
     service.readFields(in);
+    len = WritableUtils.readVInt(in);
+    if (dnHandshakeSecret == null || dnHandshakeSecret.length != len) {
+      dnHandshakeSecret = new byte[len];
+    }
+    in.readFully(dnHandshakeSecret);
   }
 
   @Override
@@ -300,6 +319,8 @@ public class Token<T extends TokenIdentifier> implements Writable {
     out.write(password);
     kind.write(out);
     service.write(out);
+    WritableUtils.writeVInt(out, dnHandshakeSecret.length);
+    out.write(dnHandshakeSecret);
   }
 
   /**
diff --git a/hadoop-common-project/hadoop-common/src/main/proto/Security.proto b/hadoop-common-project/hadoop-common/src/main/proto/Security.proto
index 5ff571d..c5844da 100644
--- a/hadoop-common-project/hadoop-common/src/main/proto/Security.proto
+++ b/hadoop-common-project/hadoop-common/src/main/proto/Security.proto
@@ -36,6 +36,7 @@ message TokenProto {
   required bytes password = 2;
   required string kind = 3;
   required string service = 4;
+  optional bytes handshakeSecret = 5;
 }
 
 message GetDelegationTokenRequestProto {
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java
index 7694387..a3c3bed 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java
@@ -271,11 +271,16 @@ public class PBHelperClient {
   }
 
   public static TokenProto convert(Token<?> tok) {
-    return TokenProto.newBuilder().
+    TokenProto.Builder builder = TokenProto.newBuilder().
         setIdentifier(getByteString(tok.getIdentifier())).
         setPassword(getByteString(tok.getPassword())).
         setKindBytes(getFixedByteString(tok.getKind())).
-        setServiceBytes(getFixedByteString(tok.getService())).build();
+        setServiceBytes(getFixedByteString(tok.getService()));
+    if (tok.getDnHandshakeSecret() != null) {
+      builder.setHandshakeSecret(
+          ByteString.copyFrom(tok.getDnHandshakeSecret()));
+    }
+    return builder.build();
   }
 
   public static ShortCircuitShmIdProto convert(ShmId shmId) {
@@ -647,9 +652,14 @@ public class PBHelperClient {
 
   public static Token<BlockTokenIdentifier> convert(
       TokenProto blockToken) {
-    return new Token<>(blockToken.getIdentifier()
+    Token<BlockTokenIdentifier> token =
+        new Token<>(blockToken.getIdentifier()
         .toByteArray(), blockToken.getPassword().toByteArray(), new Text(
         blockToken.getKind()), new Text(blockToken.getService()));
+    if (blockToken.hasHandshakeSecret()) {
+      token.setDNHandshakeSecret(blockToken.getHandshakeSecret().toByteArray());
+    }
+    return token;
   }
 
   // DatanodeId
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index e973bd3..c3eec2b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -1057,6 +1057,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final String DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY =
       HdfsClientConfigKeys.DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY;
 
+  public static final String DFS_NAMENODE_SEND_QOP_ENABLED =
+      "dfs.namenode.send.qop.enabled";
+  public static final boolean DFS_NAMENODE_SEND_QOP_ENABLED_DEFAULT = false;
+
   // dfs.client.retry confs are moved to HdfsClientConfigKeys.Retry 
   @Deprecated
   public static final String  DFS_CLIENT_RETRY_POLICY_ENABLED_KEY
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java
index 2ad8d08..5b92776 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java
@@ -430,7 +430,23 @@ public class BlockTokenSecretManager extends
     }
     return createPassword(nonce, key.getKey());
   }
-  
+
+  /**
+   * Encrypt the given message with the current block key (an HMAC of the
+   * message keyed by the current block key).
+   *
+   * @param message the message to be encrypted.
+   * @return the secret created by encrypting the given message.
+   */
+  public byte[] secretGen(byte[] message) {
+    return createPassword(message, currentKey.getKey());
+  }
+
+  @VisibleForTesting
+  public BlockKey getCurrentKey() {
+    return currentKey;
+  }
+
   @VisibleForTesting
   public synchronized void setKeyUpdateIntervalForTesting(long millis) {
     this.keyUpdateInterval = millis;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
index aea811d..584b849 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
@@ -27,11 +27,14 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIFELINE_HANDLER
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SEND_QOP_ENABLED;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SEND_QOP_ENABLED_DEFAULT;
 import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.MAX_PATH_DEPTH;
 import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.MAX_PATH_LENGTH;
 
 import static org.apache.hadoop.util.Time.now;
 
+import com.google.common.base.Charsets;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.net.InetSocketAddress;
@@ -132,6 +135,8 @@ import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolPB;
 import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolServerSideTranslatorPB;
 import org.apache.hadoop.hdfs.protocolPB.ReconfigurationProtocolPB;
 import org.apache.hadoop.hdfs.protocolPB.ReconfigurationProtocolServerSideTranslatorPB;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
 import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
 import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
@@ -250,6 +255,8 @@ public class NameNodeRpcServer implements NamenodeProtocols {
   
   private final String minimumDataNodeVersion;
 
+  private final boolean shouldSendQOP;
+
   public NameNodeRpcServer(Configuration conf, NameNode nn)
       throws IOException {
     this.nn = nn;
@@ -527,6 +534,8 @@ public class NameNodeRpcServer implements NamenodeProtocols {
         this.clientRpcServer.addAuxiliaryListener(auxiliaryPort);
       }
     }
+    this.shouldSendQOP = conf.getBoolean(
+        DFS_NAMENODE_SEND_QOP_ENABLED, DFS_NAMENODE_SEND_QOP_ENABLED_DEFAULT);
   }
 
   /** Allow access to the lifeline RPC server for testing */
@@ -727,8 +736,14 @@ public class NameNodeRpcServer implements NamenodeProtocols {
       throws IOException {
     checkNNStartup();
     metrics.incrGetBlockLocations();
-    return namesystem.getBlockLocations(getClientMachine(), 
-                                        src, offset, length);
+    LocatedBlocks locatedBlocks =
+        namesystem.getBlockLocations(getClientMachine(), src, offset, length);
+    if (shouldSendQOP) {
+      for (LocatedBlock lb : locatedBlocks.getLocatedBlocks()) {
+        wrapEstablishedQOP(lb, getEstablishedClientQOP());
+      }
+    }
+    return locatedBlocks;
   }
   
   @Override // ClientProtocol
@@ -801,6 +816,9 @@ public class NameNodeRpcServer implements NamenodeProtocols {
       RetryCache.setState(cacheEntry, success, info);
     }
     metrics.incrFilesAppended();
+    if (shouldSendQOP) {
+      wrapEstablishedQOP(info.getLastBlock(), getEstablishedClientQOP());
+    }
     return info;
   }
 
@@ -869,6 +887,9 @@ public class NameNodeRpcServer implements NamenodeProtocols {
     if (locatedBlock != null) {
       metrics.incrAddBlockOps();
     }
+    if (shouldSendQOP) {
+      wrapEstablishedQOP(locatedBlock, getEstablishedClientQOP());
+    }
     return locatedBlock;
   }
 
@@ -899,8 +920,13 @@ public class NameNodeRpcServer implements NamenodeProtocols {
         excludeSet.add(node);
       }
     }
-    return namesystem.getAdditionalDatanode(src, fileId, blk, existings,
-        existingStorageIDs, excludeSet, numAdditionalNodes, clientName);
+    LocatedBlock locatedBlock = namesystem.getAdditionalDatanode(src, fileId,
+        blk, existings, existingStorageIDs, excludeSet, numAdditionalNodes,
+        clientName);
+    if (shouldSendQOP) {
+      wrapEstablishedQOP(locatedBlock, getEstablishedClientQOP());
+    }
+    return locatedBlock;
   }
   /**
    * The client needs to give up on the block.
@@ -1761,6 +1787,17 @@ public class NameNodeRpcServer implements NamenodeProtocols {
     return clientMachine;
   }
 
+  /**
+   * Return the QOP of the client that the current handler thread
+   * is handling. This assumes the negotiation is done at this point;
+   * otherwise it returns null.
+   *
+   * @return the established QOP of this client.
+   */
+  private static String getEstablishedClientQOP() {
+    return Server.getEstablishedQOP();
+  }
+
   @Override
   public DataEncryptionKey getDataEncryptionKey() throws IOException {
     checkNNStartup();
@@ -2307,4 +2344,26 @@ public class NameNodeRpcServer implements NamenodeProtocols {
     namesystem.checkSuperuserPrivilege();
     return Lists.newArrayList(nn.getReconfigurableProperties());
   }
+
+
+  /**
+   * Wraps the QOP information into the LocatedBlock instance.
+   * The wrapped QOP will be used by the DataNode, i.e. the DataNode will simply
+   * use this QOP to accept client calls, because this QOP is viewed
+   * as the QOP that the NameNode has accepted.
+   *
+   * @param locatedBlock the LocatedBlock instance
+   * @param qop the QOP to wrap in
+   * @throws RuntimeException
+   */
+  private void wrapEstablishedQOP(LocatedBlock locatedBlock, String qop) {
+    if (qop == null || locatedBlock == null) {
+      return;
+    }
+    BlockTokenSecretManager btsm = namesystem.getBlockManager()
+        .getBlockTokenSecretManager();
+    Token<BlockTokenIdentifier> token = locatedBlock.getBlockToken();
+    byte[] secret = btsm.secretGen(qop.getBytes(Charsets.UTF_8));
+    token.setDNHandshakeSecret(secret);
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 0a3b2d4..fa4ba35 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -4556,4 +4556,15 @@
       Empty list indicates that auxiliary ports are disabled.
     </description>
   </property>
+
+  <property>
+    <name>dfs.namenode.send.qop.enabled</name>
+    <value>false</value>
+    <description>
+      A boolean that specifies whether the NameNode should encrypt the established
+      QOP and include it in the block token. The encrypted QOP will be used by the
+      DataNode as the target QOP, overriding its own configuration. This ensures
+      the DataNode uses exactly the QOP the NameNode and client have agreed on.
+    </description>
+  </property>
 </configuration>
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockTokenWrappingQOP.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockTokenWrappingQOP.java
new file mode 100644
index 0000000..ea7ab97
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockTokenWrappingQOP.java
@@ -0,0 +1,190 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.EnumSet;
+import javax.crypto.Mac;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferTestCase;
+import org.apache.hadoop.hdfs.security.token.block.BlockKey;
+import org.apache.hadoop.io.EnumSetWritable;
+import org.apache.hadoop.security.TestPermission;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
+import static org.junit.Assert.*;
+
+
+/**
+ * This tests enabling the NN to send the established QOP back to the client
+ * in an encrypted message, using the block access token key.
+ */
+@RunWith(Parameterized.class)
+public class TestBlockTokenWrappingQOP extends SaslDataTransferTestCase {
+  public static final Log LOG = LogFactory.getLog(TestPermission.class);
+
+  private HdfsConfiguration conf;
+  private MiniDFSCluster cluster;
+  private String encryptionAlgorithm;
+  private DistributedFileSystem dfs;
+
+  private String configKey;
+  private String qopValue;
+
+  @Parameterized.Parameters
+  public static Collection<Object[]> qopSettings() {
+    // If configured with privacy, the negotiated QOP should be auth-conf;
+    // similarly for the other two.
+    return Arrays.asList(new Object[][] {
+        {"privacy", "auth-conf"},
+        {"integrity", "auth-int"},
+        {"authentication", "auth"}
+    });
+  }
+
+  public TestBlockTokenWrappingQOP(String configKey, String qopValue) {
+    this.configKey = configKey;
+    this.qopValue = qopValue;
+  }
+
+  @Before
+  public void setup() throws Exception {
+    conf = createSecureConfig(this.configKey);
+    conf.setBoolean(DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true);
+    conf.setBoolean(DFS_NAMENODE_SEND_QOP_ENABLED, true);
+    conf.set(HADOOP_RPC_PROTECTION, this.configKey);
+    cluster = null;
+    encryptionAlgorithm = "HmacSHA1";
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
+    cluster.waitActive();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  @Test
+  public void testAddBlockWrappingQOP() throws Exception {
+    final String src = "/testAddBlockWrappingQOP";
+    final Path path = new Path(src);
+
+    dfs = cluster.getFileSystem();
+    dfs.create(path);
+
+    DFSClient client = dfs.getClient();
+    String clientName = client.getClientName();
+
+    LocatedBlock lb = client.namenode.addBlock(src, clientName, null, null,
+        HdfsConstants.GRANDFATHER_INODE_ID, null, null);
+    byte[] secret = lb.getBlockToken().getDnHandshakeSecret();
+    BlockKey currentKey = cluster.getNamesystem().getBlockManager()
+        .getBlockTokenSecretManager().getCurrentKey();
+    String decrypted = decryptMessage(secret, currentKey,
+        encryptionAlgorithm);
+    assertEquals(this.qopValue, decrypted);
+  }
+
+  @Test
+  public void testAppendWrappingQOP() throws Exception {
+    final String src = "/testAppendWrappingQOP";
+    final Path path = new Path(src);
+
+    dfs = cluster.getFileSystem();
+    FSDataOutputStream out = dfs.create(path);
+    // The NameNode append call returns a last block instance. If there is
+    // nothing it returns null, so write something first so that lastBlock
+    // is non-null.
+    out.write(0);
+    out.close();
+
+    DFSClient client = dfs.getClient();
+    String clientName = client.getClientName();
+
+    LastBlockWithStatus lastBlock = client.namenode.append(src, clientName,
+        new EnumSetWritable<>(EnumSet.of(CreateFlag.APPEND)));
+
+    byte[] secret = lastBlock.getLastBlock().getBlockToken()
+        .getDnHandshakeSecret();
+    BlockKey currentKey = cluster.getNamesystem().getBlockManager()
+        .getBlockTokenSecretManager().getCurrentKey();
+    String decrypted = decryptMessage(secret, currentKey,
+        encryptionAlgorithm);
+    assertEquals(this.qopValue, decrypted);
+  }
+
+  @Test
+  public void testGetBlockLocationWrappingQOP() throws Exception {
+    final String src = "/testGetBlockLocationWrappingQOP";
+    final Path path = new Path(src);
+
+    dfs = cluster.getFileSystem();
+    FSDataOutputStream out = dfs.create(path);
+    // if the file is empty, there will be no blocks returned. Write something
+    // so that getBlockLocations actually returns some block.
+    out.write(0);
+    out.close();
+
+    FileStatus status = dfs.getFileStatus(path);
+    DFSClient client = dfs.getClient();
+    LocatedBlocks lbs = client.namenode.getBlockLocations(
+        src, 0, status.getLen());
+
+    assertTrue(lbs.getLocatedBlocks().size() > 0);
+
+    BlockKey currentKey = cluster.getNamesystem().getBlockManager()
+        .getBlockTokenSecretManager().getCurrentKey();
+    for (LocatedBlock lb : lbs.getLocatedBlocks()) {
+      byte[] secret = lb.getBlockToken().getDnHandshakeSecret();
+      String decrypted = decryptMessage(secret, currentKey,
+          encryptionAlgorithm);
+      assertEquals(this.qopValue, decrypted);
+    }
+  }
+
+  private String decryptMessage(byte[] secret, BlockKey key,
+      String algorithm) throws Exception {
+    String[] qops = {"auth", "auth-conf", "auth-int"};
+    Mac mac = Mac.getInstance(algorithm);
+    mac.init(key.getKey());
+    for (String qop : qops) {
+      byte[] encrypted = mac.doFinal(qop.getBytes());
+      if (Arrays.equals(encrypted, secret)) {
+        return qop;
+      }
+    }
+    return null;
+  }
+}
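
A note on the mechanism: secretGen() builds the secret with createPassword(), i.e. an HMAC of the QOP string keyed by the current block key, so a party that shares the block key recovers the QOP by recomputing the HMAC for each candidate value rather than by decrypting. A minimal sketch (not part of the patch), mirroring the test's decryptMessage() and assuming HmacSHA1 (the SecretManager default) and the block key's SecretKey:

    import java.nio.charset.StandardCharsets;
    import java.util.Arrays;
    import javax.crypto.Mac;
    import javax.crypto.SecretKey;

    final class WrappedQopVerifier {
      /** Returns the QOP whose HMAC matches the secret, or null if none does. */
      static String recoverQop(byte[] secret, SecretKey blockKey) throws Exception {
        Mac mac = Mac.getInstance("HmacSHA1");
        mac.init(blockKey);
        for (String qop : new String[] {"auth", "auth-int", "auth-conf"}) {
          // doFinal() resets the Mac, so it can be reused for each candidate.
          byte[] candidate = mac.doFinal(qop.getBytes(StandardCharsets.UTF_8));
          if (Arrays.equals(candidate, secret)) {
            return qop;                        // the QOP the NameNode wrapped
          }
        }
        return null;                           // unknown key or tampered secret
      }
    }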

