You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cl...@apache.org on 2020/02/04 20:13:10 UTC

[hadoop] branch trunk updated: HDFS-15148. dfs.namenode.send.qop.enabled should not apply to primary NN port. Contributed by Chen Liang.

This is an automated email from the ASF dual-hosted git repository.

cliang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ce7b8b5  HDFS-15148. dfs.namenode.send.qop.enabled should not apply to primary NN port. Contributed by Chen Liang.
ce7b8b5 is described below

commit ce7b8b5634ef84602019cac4ce52337fbe4f9d42
Author: Chen Liang <cl...@apache.org>
AuthorDate: Tue Feb 4 12:12:35 2020 -0800

    HDFS-15148. dfs.namenode.send.qop.enabled should not apply to primary NN port. Contributed by Chen Liang.
---
 .../main/java/org/apache/hadoop/ipc/Server.java    |  49 ++++++---
 .../token/block/BlockTokenSecretManager.java       |   2 +-
 .../hdfs/server/namenode/NameNodeRpcServer.java    |  11 --
 .../hadoop/hdfs/TestBlockTokenWrappingQOP.java     |  27 ++++-
 .../apache/hadoop/hdfs/TestMultipleNNPortQOP.java  | 120 +++++++++++++++++++--
 5 files changed, 174 insertions(+), 35 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
index 7cd3027..f4a124a 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
@@ -382,20 +382,28 @@ public abstract class Server {
 
   /**
    * Returns the SASL qop for the current call, if the current call is
-   * set, and the SASL negotiation is done. Otherwise return null. Note
-   * that CurCall is thread local object. So in fact, different handler
-   * threads will process different CurCall object.
+   * set, and the SASL negotiation is done. Otherwise return null
+   * Note this only returns established QOP for auxiliary port, and
+   * returns null for primary (non-auxiliary) port.
+   *
+   * Also note that CurCall is thread local object. So in fact, different
+   * handler threads will process different CurCall object.
    *
    * Also, only return for RPC calls, not supported for other protocols.
    * @return the QOP of the current connection.
    */
-  public static String getEstablishedQOP() {
+  public static String getAuxiliaryPortEstablishedQOP() {
     Call call = CurCall.get();
-    if (call == null || !(call instanceof RpcCall)) {
+    if (!(call instanceof RpcCall)) {
       return null;
     }
     RpcCall rpcCall = (RpcCall)call;
-    return rpcCall.connection.getEstablishedQOP();
+    if (rpcCall.connection.isOnAuxiliaryPort()) {
+      return rpcCall.connection.getEstablishedQOP();
+    } else {
+      // Not sending back QOP for primary port
+      return null;
+    }
   }
 
   /**
@@ -1185,7 +1193,8 @@ public abstract class Server {
     private boolean reuseAddr = conf.getBoolean(
         CommonConfigurationKeysPublic.IPC_SERVER_REUSEADDR_KEY,
         CommonConfigurationKeysPublic.IPC_SERVER_REUSEADDR_DEFAULT);
-    
+    private boolean isOnAuxiliaryPort;
+
     Listener(int port) throws IOException {
       address = new InetSocketAddress(bindAddress, port);
       // Create a new server socket and set to non blocking mode
@@ -1213,6 +1222,11 @@ public abstract class Server {
       acceptChannel.register(selector, SelectionKey.OP_ACCEPT);
       this.setName("IPC Server listener on " + port);
       this.setDaemon(true);
+      this.isOnAuxiliaryPort = false;
+    }
+
+    void setIsAuxiliary() {
+      this.isOnAuxiliaryPort = true;
     }
     
     private class Reader extends Thread {
@@ -1381,7 +1395,8 @@ public abstract class Server {
         channel.socket().setKeepAlive(true);
         
         Reader reader = getReader();
-        Connection c = connectionManager.register(channel, this.listenPort);
+        Connection c = connectionManager.register(channel,
+            this.listenPort, this.isOnAuxiliaryPort);
         // If the connectionManager can't take it, close the connection.
         if (c == null) {
           if (channel.isOpen()) {
@@ -1805,6 +1820,7 @@ public abstract class Server {
     private int serviceClass;
     private boolean shouldClose = false;
     private int ingressPort;
+    private boolean isOnAuxiliaryPort;
 
     UserGroupInformation user = null;
     public UserGroupInformation attemptingUser = null; // user name before auth
@@ -1817,7 +1833,7 @@ public abstract class Server {
     private boolean useWrap = false;
     
     public Connection(SocketChannel channel, long lastContact,
-        int ingressPort) {
+        int ingressPort, boolean isOnAuxiliaryPort) {
       this.channel = channel;
       this.lastContact = lastContact;
       this.data = null;
@@ -1830,6 +1846,7 @@ public abstract class Server {
       this.socket = channel.socket();
       this.addr = socket.getInetAddress();
       this.ingressPort = ingressPort;
+      this.isOnAuxiliaryPort = isOnAuxiliaryPort;
       if (addr == null) {
         this.hostAddress = "*Unknown*";
       } else {
@@ -1875,7 +1892,11 @@ public abstract class Server {
     public String getEstablishedQOP() {
       return establishedQOP;
     }
-    
+
+    public boolean isOnAuxiliaryPort() {
+      return isOnAuxiliaryPort;
+    }
+
     public void setLastContact(long lastContact) {
       this.lastContact = lastContact;
     }
@@ -3113,6 +3134,8 @@ public abstract class Server {
           "There is already a listener binding to: " + auxiliaryPort);
     }
     Listener newListener = new Listener(auxiliaryPort);
+    newListener.setIsAuxiliary();
+
     // in the case of port = 0, the listener would be on a != 0 port.
     LOG.info("Adding a server listener on port " +
         newListener.getAddress().getPort());
@@ -3732,11 +3755,13 @@ public abstract class Server {
       return connections.toArray(new Connection[0]);
     }
 
-    Connection register(SocketChannel channel, int ingressPort) {
+    Connection register(SocketChannel channel, int ingressPort,
+        boolean isOnAuxiliaryPort) {
       if (isFull()) {
         return null;
       }
-      Connection connection = new Connection(channel, Time.now(), ingressPort);
+      Connection connection = new Connection(channel, Time.now(),
+          ingressPort, isOnAuxiliaryPort);
       add(connection);
       if (LOG.isDebugEnabled()) {
         LOG.debug("Server connection from " + connection +
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java
index a56074a..c01ab56 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java
@@ -290,7 +290,7 @@ public class BlockTokenSecretManager extends
         .getBlockPoolId(), block.getBlockId(), modes, storageTypes,
         storageIds, useProto);
     if (shouldWrapQOP) {
-      String qop = Server.getEstablishedQOP();
+      String qop = Server.getAuxiliaryPortEstablishedQOP();
       if (qop != null) {
         id.setHandshakeMsg(qop.getBytes(Charsets.UTF_8));
       }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
index e8dace9..b403016 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
@@ -1900,17 +1900,6 @@ public class NameNodeRpcServer implements NamenodeProtocols {
     return clientMachine;
   }
 
-  /**
-   * Return the QOP of the client that the current handler thread
-   * is handling. Assuming the negotiation is done at this point,
-   * otherwise returns null.
-   *
-   * @return the established QOP of this client.
-   */
-  public static String getEstablishedClientQOP() {
-    return Server.getEstablishedQOP();
-  }
-
   @Override
   public DataEncryptionKey getDataEncryptionKey() throws IOException {
     checkNNStartup();
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockTokenWrappingQOP.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockTokenWrappingQOP.java
index 94b80e6..c224c49 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockTokenWrappingQOP.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockTokenWrappingQOP.java
@@ -17,14 +17,17 @@
  */
 package org.apache.hadoop.hdfs;
 
+import java.net.URI;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.EnumSet;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus;
@@ -77,12 +80,33 @@ public class TestBlockTokenWrappingQOP extends SaslDataTransferTestCase {
   @Before
   public void setup() throws Exception {
     conf = createSecureConfig(this.configKey);
+    conf.set(DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY, "12000");
+    // explicitly setting service rpc for datanode. This because
+    // DFSUtil.getNNServiceRpcAddressesForCluster looks up client facing port
+    // and service port at the same time, and if no setting for service
+    // rpc, it would return client port, in this case, it will be the
+    // auxiliary port for data node. Which is not what auxiliary is for.
+    // setting service rpc port to avoid this.
+    conf.set(DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, "localhost:9020");
+    conf.set(
+        CommonConfigurationKeys.HADOOP_SECURITY_SASL_PROPS_RESOLVER_CLASS,
+        "org.apache.hadoop.security.IngressPortBasedResolver");
+    conf.set("ingress.port.sasl.configured.ports", "12000");
+    conf.set("ingress.port.sasl.prop.12000", this.configKey);
     conf.setBoolean(DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true);
     conf.setBoolean(DFS_NAMENODE_SEND_QOP_ENABLED, true);
     conf.set(HADOOP_RPC_PROTECTION, this.configKey);
     cluster = null;
     cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
     cluster.waitActive();
+
+    HdfsConfiguration clientConf = new HdfsConfiguration(conf);
+    clientConf.unset(
+        CommonConfigurationKeys.HADOOP_SECURITY_SASL_PROPS_RESOLVER_CLASS);
+    URI currentURI = cluster.getURI();
+    URI uriAuxiliary = new URI(currentURI.getScheme() +
+        "://" + currentURI.getHost() + ":12000");
+    dfs = (DistributedFileSystem) FileSystem.get(uriAuxiliary, conf);
   }
 
   @After
@@ -97,7 +121,6 @@ public class TestBlockTokenWrappingQOP extends SaslDataTransferTestCase {
     final String src = "/testAddBlockWrappingQOP";
     final Path path = new Path(src);
 
-    dfs = cluster.getFileSystem();
     dfs.create(path);
 
     DFSClient client = dfs.getClient();
@@ -114,7 +137,6 @@ public class TestBlockTokenWrappingQOP extends SaslDataTransferTestCase {
     final String src = "/testAppendWrappingQOP";
     final Path path = new Path(src);
 
-    dfs = cluster.getFileSystem();
     FSDataOutputStream out = dfs.create(path);
     // NameNode append call returns a last block instance. If there is nothing
     // it returns as a null. So write something, so that lastBlock has
@@ -138,7 +160,6 @@ public class TestBlockTokenWrappingQOP extends SaslDataTransferTestCase {
     final String src = "/testGetBlockLocationWrappingQOP";
     final Path path = new Path(src);
 
-    dfs = cluster.getFileSystem();
     FSDataOutputStream out = dfs.create(path);
     // if the file is empty, there will be no blocks returned. Write something
     // so that getBlockLocations actually returns some block.
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestMultipleNNPortQOP.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestMultipleNNPortQOP.java
index ca84557..db42dcc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestMultipleNNPortQOP.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestMultipleNNPortQOP.java
@@ -21,13 +21,17 @@ import java.net.URI;
 import java.util.ArrayList;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileSystemTestHelper;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient;
 import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferServer;
 import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferTestCase;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.security.token.Token;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -78,6 +82,85 @@ public class TestMultipleNNPortQOP extends SaslDataTransferTestCase {
   }
 
   /**
+   * Test that when NameNode returns back its established QOP,
+   * it only does this for auxiliary port(s), not the primary port.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testAuxiliaryPortSendingQOP() throws Exception {
+    MiniDFSCluster cluster = null;
+
+    final String pathPrefix  = "/filetestAuxiliaryPortSendingQOP";
+    try {
+      cluster = new MiniDFSCluster.Builder(clusterConf)
+          .numDataNodes(3).build();
+
+      cluster.waitActive();
+      HdfsConfiguration clientConf = new HdfsConfiguration(clusterConf);
+      clientConf.unset(
+          CommonConfigurationKeys.HADOOP_SECURITY_SASL_PROPS_RESOLVER_CLASS);
+
+      URI currentURI = cluster.getURI();
+      URI uriAuthPort = new URI(currentURI.getScheme() + "://" +
+              currentURI.getHost() + ":12000");
+      URI uriIntegrityPort = new URI(currentURI.getScheme() + "://" +
+              currentURI.getHost() + ":12100");
+      URI uriPrivacyPort = new URI(currentURI.getScheme() +
+          "://" + currentURI.getHost() + ":12200");
+
+      // If connecting to primary port, block token should not include
+      // handshake secret
+      byte[] secretOnPrimary = getHandshakeSecret(currentURI, clientConf,
+          new Path(pathPrefix + "Primary"));
+      assertTrue(secretOnPrimary == null || secretOnPrimary.length == 0);
+
+      // If connecting to auxiliary port, block token should include
+      // handshake secret
+      clientConf.set(HADOOP_RPC_PROTECTION, "privacy");
+      byte[] secretPrivacy = getHandshakeSecret(uriPrivacyPort, clientConf,
+          new Path(pathPrefix + "Privacy"));
+      assertTrue(secretPrivacy.length > 0);
+
+      clientConf.set(HADOOP_RPC_PROTECTION, "integrity");
+      byte[] secretIntegrity = getHandshakeSecret(uriIntegrityPort, clientConf,
+          new Path(pathPrefix + "Integrity"));
+      assertTrue(secretIntegrity.length > 0);
+
+      clientConf.set(HADOOP_RPC_PROTECTION, "authentication");
+      byte[] secretAuthentication = getHandshakeSecret(uriAuthPort,
+          clientConf, new Path(pathPrefix + "Authentication"));
+      assertTrue(secretAuthentication.length > 0);
+
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+
+  private byte[] getHandshakeSecret(URI uri, HdfsConfiguration conf,
+      Path path) throws Exception {
+    FileSystem fs = FileSystem.get(uri, conf);
+    FSDataOutputStream out = fs.create(
+        path, false, 4096, (short)1, BLOCK_SIZE);
+    try {
+      out.write(0);
+      out.hflush();
+      Token<BlockTokenIdentifier> token = DFSTestUtil.getBlockToken(out);
+      final byte[] tokenBytes = token.getIdentifier();
+      DataInputBuffer dib = new DataInputBuffer();
+
+      dib.reset(tokenBytes, tokenBytes.length);
+      BlockTokenIdentifier blockToken = new BlockTokenIdentifier();
+      blockToken.readFields(dib);
+      return blockToken.getHandshakeMsg();
+    } finally {
+      out.close();
+    }
+  }
+
+  /**
    * Test accessing NameNode from three different ports.
    *
    * @throws Exception
@@ -168,33 +251,54 @@ public class TestMultipleNNPortQOP extends SaslDataTransferTestCase {
       clientConf.set(HADOOP_RPC_PROTECTION, "privacy");
       FileSystem fsPrivacy = FileSystem.get(uriPrivacyPort, clientConf);
       doTest(fsPrivacy, PATH1);
-      // add a wait so that data has reached not only first DN,
-      // but also the rest
-      Thread.sleep(100);
       for (int i = 0; i < 2; i++) {
         DataNode dn = dataNodes.get(i);
         SaslDataTransferClient saslClient = dn.getSaslClient();
-        assertEquals("auth", saslClient.getTargetQOP());
+        String qop = null;
+        // It may take some time for the qop to populate
+        // to all DNs, check in a loop.
+        for (int trial = 0; trial < 10; trial++) {
+          qop = saslClient.getTargetQOP();
+          if (qop != null) {
+            break;
+          }
+          Thread.sleep(100);
+        }
+        assertEquals("auth", qop);
       }
 
       clientConf.set(HADOOP_RPC_PROTECTION, "integrity");
       FileSystem fsIntegrity = FileSystem.get(uriIntegrityPort, clientConf);
       doTest(fsIntegrity, PATH2);
-      Thread.sleep(100);
       for (int i = 0; i < 2; i++) {
         DataNode dn = dataNodes.get(i);
         SaslDataTransferClient saslClient = dn.getSaslClient();
-        assertEquals("auth", saslClient.getTargetQOP());
+        String qop = null;
+        for (int trial = 0; trial < 10; trial++) {
+          qop = saslClient.getTargetQOP();
+          if (qop != null) {
+            break;
+          }
+          Thread.sleep(100);
+        }
+        assertEquals("auth", qop);
       }
 
       clientConf.set(HADOOP_RPC_PROTECTION, "authentication");
       FileSystem fsAuth = FileSystem.get(uriAuthPort, clientConf);
       doTest(fsAuth, PATH3);
-      Thread.sleep(100);
       for (int i = 0; i < 3; i++) {
         DataNode dn = dataNodes.get(i);
         SaslDataTransferServer saslServer = dn.getSaslServer();
-        assertEquals("auth", saslServer.getNegotiatedQOP());
+        String qop = null;
+        for (int trial = 0; trial < 10; trial++) {
+          qop = saslServer.getNegotiatedQOP();
+          if (qop != null) {
+            break;
+          }
+          Thread.sleep(100);
+        }
+        assertEquals("auth", qop);
       }
     } finally {
       if (cluster != null) {


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org