Posted to common-commits@hadoop.apache.org by cl...@apache.org on 2020/02/04 20:41:19 UTC
[hadoop] branch branch-3.1 updated: HDFS-15148. dfs.namenode.send.qop.enabled should not apply to primary NN port. Contributed by Chen Liang.
This is an automated email from the ASF dual-hosted git repository.
cliang pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new 466f77a HDFS-15148. dfs.namenode.send.qop.enabled should not apply to primary NN port. Contributed by Chen Liang.
466f77a is described below
commit 466f77a5cfa2bbb53ed3250a50941436c6248bcb
Author: Chen Liang <cl...@apache.org>
AuthorDate: Tue Feb 4 12:34:19 2020 -0800
HDFS-15148. dfs.namenode.send.qop.enabled should not apply to primary NN port. Contributed by Chen Liang.
(cherry picked from commit 226cd3977bfc5948afff6a2d38a41e341089d5fe)
---
.../main/java/org/apache/hadoop/ipc/Server.java | 49 ++++++---
.../token/block/BlockTokenSecretManager.java | 2 +-
.../hdfs/server/namenode/NameNodeRpcServer.java | 11 --
.../hadoop/hdfs/TestBlockTokenWrappingQOP.java | 27 ++++-
.../apache/hadoop/hdfs/TestMultipleNNPortQOP.java | 120 +++++++++++++++++++--
5 files changed, 174 insertions(+), 35 deletions(-)
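The net effect of the patch: the NameNode now wraps its negotiated QOP into block tokens only for clients that connect over an auxiliary RPC port; on the primary port, dfs.namenode.send.qop.enabled no longer has any effect. A minimal sketch of the server-side configuration the new tests exercise, assuming the usual key strings behind DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY and CommonConfigurationKeys.HADOOP_SECURITY_SASL_PROPS_RESOLVER_CLASS (the port numbers are illustrative, not required values):

    import org.apache.hadoop.conf.Configuration;

    Configuration conf = new Configuration();
    // Open extra NameNode RPC ports besides the primary one.
    conf.set("dfs.namenode.rpc-address.auxiliary-ports", "12000,12100,12200");
    // Opt in to sending the negotiated QOP back inside block tokens.
    conf.setBoolean("dfs.namenode.send.qop.enabled", true);
    // Pick SASL properties (and hence QOP) per ingress port.
    conf.set("hadoop.security.saslproperties.resolver.class",
        "org.apache.hadoop.security.IngressPortBasedResolver");
    conf.set("ingress.port.sasl.configured.ports", "12000,12100,12200");
    conf.set("ingress.port.sasl.prop.12000", "authentication");
    conf.set("ingress.port.sasl.prop.12100", "integrity");
    conf.set("ingress.port.sasl.prop.12200", "privacy");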
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
index 4ab108b..4cacc59 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
@@ -381,20 +381,28 @@ public abstract class Server {
/**
* Returns the SASL qop for the current call, if the current call is
- * set, and the SASL negotiation is done. Otherwise return null. Note
- * that CurCall is thread local object. So in fact, different handler
- * threads will process different CurCall object.
+ * set, and the SASL negotiation is done. Otherwise returns null.
+ * Note that this only returns the established QOP for an auxiliary
+ * port; for the primary (non-auxiliary) port it returns null.
+ *
+ * Also note that CurCall is a thread-local object, so different
+ * handler threads will in fact see different CurCall objects.
*
* Also, only return for RPC calls, not supported for other protocols.
* @return the QOP of the current connection.
*/
- public static String getEstablishedQOP() {
+ public static String getAuxiliaryPortEstablishedQOP() {
Call call = CurCall.get();
- if (call == null || !(call instanceof RpcCall)) {
+ if (!(call instanceof RpcCall)) {
return null;
}
RpcCall rpcCall = (RpcCall)call;
- return rpcCall.connection.getEstablishedQOP();
+ if (rpcCall.connection.isOnAuxiliaryPort()) {
+ return rpcCall.connection.getEstablishedQOP();
+ } else {
+ // Not sending back QOP for primary port
+ return null;
+ }
}
/**
@@ -1181,7 +1189,8 @@ public abstract class Server {
private int backlogLength = conf.getInt(
CommonConfigurationKeysPublic.IPC_SERVER_LISTEN_QUEUE_SIZE_KEY,
CommonConfigurationKeysPublic.IPC_SERVER_LISTEN_QUEUE_SIZE_DEFAULT);
-
+ private boolean isOnAuxiliaryPort;
+
Listener(int port) throws IOException {
address = new InetSocketAddress(bindAddress, port);
// Create a new server socket and set to non blocking mode
@@ -1208,6 +1217,11 @@ public abstract class Server {
acceptChannel.register(selector, SelectionKey.OP_ACCEPT);
this.setName("IPC Server listener on " + port);
this.setDaemon(true);
+ this.isOnAuxiliaryPort = false;
+ }
+
+ void setIsAuxiliary() {
+ this.isOnAuxiliaryPort = true;
}
private class Reader extends Thread {
@@ -1376,7 +1390,8 @@ public abstract class Server {
channel.socket().setKeepAlive(true);
Reader reader = getReader();
- Connection c = connectionManager.register(channel, this.listenPort);
+ Connection c = connectionManager.register(channel,
+ this.listenPort, this.isOnAuxiliaryPort);
// If the connectionManager can't take it, close the connection.
if (c == null) {
if (channel.isOpen()) {
@@ -1800,6 +1815,7 @@ public abstract class Server {
private int serviceClass;
private boolean shouldClose = false;
private int ingressPort;
+ private boolean isOnAuxiliaryPort;
UserGroupInformation user = null;
public UserGroupInformation attemptingUser = null; // user name before auth
@@ -1812,7 +1828,7 @@ public abstract class Server {
private boolean useWrap = false;
public Connection(SocketChannel channel, long lastContact,
- int ingressPort) {
+ int ingressPort, boolean isOnAuxiliaryPort) {
this.channel = channel;
this.lastContact = lastContact;
this.data = null;
@@ -1825,6 +1841,7 @@ public abstract class Server {
this.socket = channel.socket();
this.addr = socket.getInetAddress();
this.ingressPort = ingressPort;
+ this.isOnAuxiliaryPort = isOnAuxiliaryPort;
if (addr == null) {
this.hostAddress = "*Unknown*";
} else {
@@ -1870,7 +1887,11 @@ public abstract class Server {
public String getEstablishedQOP() {
return establishedQOP;
}
-
+
+ public boolean isOnAuxiliaryPort() {
+ return isOnAuxiliaryPort;
+ }
+
public void setLastContact(long lastContact) {
this.lastContact = lastContact;
}
@@ -3106,6 +3127,8 @@ public abstract class Server {
"There is already a listener binding to: " + auxiliaryPort);
}
Listener newListener = new Listener(auxiliaryPort);
+ newListener.setIsAuxiliary();
+
// in the case of port = 0, the listener would be on a != 0 port.
LOG.info("Adding a server listener on port " +
newListener.getAddress().getPort());
@@ -3725,11 +3748,13 @@ public abstract class Server {
return connections.toArray(new Connection[0]);
}
- Connection register(SocketChannel channel, int ingressPort) {
+ Connection register(SocketChannel channel, int ingressPort,
+ boolean isOnAuxiliaryPort) {
if (isFull()) {
return null;
}
- Connection connection = new Connection(channel, Time.now(), ingressPort);
+ Connection connection = new Connection(channel, Time.now(),
+ ingressPort, isOnAuxiliaryPort);
add(connection);
if (LOG.isDebugEnabled()) {
LOG.debug("Server connection from " + connection +
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java
index 1c5c19b..ca20709 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenSecretManager.java
@@ -289,7 +289,7 @@ public class BlockTokenSecretManager extends
.getBlockPoolId(), block.getBlockId(), modes, storageTypes,
storageIds, useProto);
if (shouldWrapQOP) {
- String qop = Server.getEstablishedQOP();
+ String qop = Server.getAuxiliaryPortEstablishedQOP();
if (qop != null) {
id.setHandshakeMsg(qop.getBytes(Charsets.UTF_8));
}
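The secret being wrapped is just the QOP string, carried as the handshake message of the BlockTokenIdentifier. A round-trip sketch of that encoding, assuming write()/readFields() preserve the handshake message the same way the decode path in the tests below does:

    import java.nio.charset.StandardCharsets;
    import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
    import org.apache.hadoop.io.DataInputBuffer;
    import org.apache.hadoop.io.DataOutputBuffer;

    public class HandshakeMsgRoundTrip {
      public static void main(String[] args) throws Exception {
        BlockTokenIdentifier id = new BlockTokenIdentifier();
        id.setHandshakeMsg("auth".getBytes(StandardCharsets.UTF_8));

        // Serialize the identifier as it would be embedded in a block token.
        DataOutputBuffer out = new DataOutputBuffer();
        id.write(out);

        // Decode it again and recover the wrapped QOP.
        DataInputBuffer in = new DataInputBuffer();
        in.reset(out.getData(), out.getLength());
        BlockTokenIdentifier decoded = new BlockTokenIdentifier();
        decoded.readFields(in);
        System.out.println(new String(
            decoded.getHandshakeMsg(), StandardCharsets.UTF_8)); // "auth"
      }
    }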
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
index a51fde6..ef9f5c1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
@@ -1850,17 +1850,6 @@ public class NameNodeRpcServer implements NamenodeProtocols {
return clientMachine;
}
- /**
- * Return the QOP of the client that the current handler thread
- * is handling. Assuming the negotiation is done at this point,
- * otherwise returns null.
- *
- * @return the established QOP of this client.
- */
- public static String getEstablishedClientQOP() {
- return Server.getEstablishedQOP();
- }
-
@Override
public DataEncryptionKey getDataEncryptionKey() throws IOException {
checkNNStartup();
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockTokenWrappingQOP.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockTokenWrappingQOP.java
index 94b80e6..c224c49 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockTokenWrappingQOP.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockTokenWrappingQOP.java
@@ -17,14 +17,17 @@
*/
package org.apache.hadoop.hdfs;
+import java.net.URI;
import java.util.Arrays;
import java.util.Collection;
import java.util.EnumSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus;
@@ -77,12 +80,33 @@ public class TestBlockTokenWrappingQOP extends SaslDataTransferTestCase {
@Before
public void setup() throws Exception {
conf = createSecureConfig(this.configKey);
+ conf.set(DFS_NAMENODE_RPC_ADDRESS_AUXILIARY_KEY, "12000");
+ // Explicitly set the service RPC address for the datanode. This is
+ // because DFSUtil.getNNServiceRpcAddressesForCluster looks up the
+ // client-facing port and the service port at the same time; if no
+ // service RPC address is configured, it falls back to the client
+ // port, which here would be the auxiliary port. That is not what the
+ // auxiliary port is for, so set the service RPC port explicitly.
+ conf.set(DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, "localhost:9020");
+ conf.set(
+ CommonConfigurationKeys.HADOOP_SECURITY_SASL_PROPS_RESOLVER_CLASS,
+ "org.apache.hadoop.security.IngressPortBasedResolver");
+ conf.set("ingress.port.sasl.configured.ports", "12000");
+ conf.set("ingress.port.sasl.prop.12000", this.configKey);
conf.setBoolean(DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true);
conf.setBoolean(DFS_NAMENODE_SEND_QOP_ENABLED, true);
conf.set(HADOOP_RPC_PROTECTION, this.configKey);
cluster = null;
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
cluster.waitActive();
+
+ HdfsConfiguration clientConf = new HdfsConfiguration(conf);
+ clientConf.unset(
+ CommonConfigurationKeys.HADOOP_SECURITY_SASL_PROPS_RESOLVER_CLASS);
+ URI currentURI = cluster.getURI();
+ URI uriAuxiliary = new URI(currentURI.getScheme() +
+ "://" + currentURI.getHost() + ":12000");
+ dfs = (DistributedFileSystem) FileSystem.get(uriAuxiliary, conf);
}
@After
@@ -97,7 +121,6 @@ public class TestBlockTokenWrappingQOP extends SaslDataTransferTestCase {
final String src = "/testAddBlockWrappingQOP";
final Path path = new Path(src);
- dfs = cluster.getFileSystem();
dfs.create(path);
DFSClient client = dfs.getClient();
@@ -114,7 +137,6 @@ public class TestBlockTokenWrappingQOP extends SaslDataTransferTestCase {
final String src = "/testAppendWrappingQOP";
final Path path = new Path(src);
- dfs = cluster.getFileSystem();
FSDataOutputStream out = dfs.create(path);
// NameNode append call returns a last block instance. If there is nothing
// it returns as a null. So write something, so that lastBlock has
@@ -138,7 +160,6 @@ public class TestBlockTokenWrappingQOP extends SaslDataTransferTestCase {
final String src = "/testGetBlockLocationWrappingQOP";
final Path path = new Path(src);
- dfs = cluster.getFileSystem();
FSDataOutputStream out = dfs.create(path);
// if the file is empty, there will be no blocks returned. Write something
// so that getBlockLocations actually returns some block.
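Both test classes use the same client-side pattern: clear the ingress-port resolver (a server-side setting), pick one QOP via hadoop.rpc.protection, and point the FileSystem at the auxiliary port. Condensed into a helper sketch (the helper itself is hypothetical; the individual calls mirror the test code):

    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.CommonConfigurationKeys;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.hdfs.HdfsConfiguration;

    static FileSystem connectToAuxPort(URI clusterUri, Configuration serverConf,
        int auxPort, String qop) throws Exception {
      HdfsConfiguration clientConf = new HdfsConfiguration(serverConf);
      // The per-port resolver only makes sense on the server side.
      clientConf.unset(
          CommonConfigurationKeys.HADOOP_SECURITY_SASL_PROPS_RESOLVER_CLASS);
      clientConf.set("hadoop.rpc.protection", qop); // e.g. "privacy"
      URI auxUri = new URI(clusterUri.getScheme() + "://" +
          clusterUri.getHost() + ":" + auxPort);
      return FileSystem.get(auxUri, clientConf);
    }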
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestMultipleNNPortQOP.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestMultipleNNPortQOP.java
index ca84557..db42dcc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestMultipleNNPortQOP.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestMultipleNNPortQOP.java
@@ -21,13 +21,17 @@ import java.net.URI;
import java.util.ArrayList;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileSystemTestHelper;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient;
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferServer;
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferTestCase;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.security.token.Token;
import org.junit.Before;
import org.junit.Test;
@@ -78,6 +82,85 @@ public class TestMultipleNNPortQOP extends SaslDataTransferTestCase {
}
/**
+ * Test that when the NameNode returns its established QOP,
+ * it does so only for auxiliary port(s), never the primary port.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testAuxiliaryPortSendingQOP() throws Exception {
+ MiniDFSCluster cluster = null;
+
+ final String pathPrefix = "/filetestAuxiliaryPortSendingQOP";
+ try {
+ cluster = new MiniDFSCluster.Builder(clusterConf)
+ .numDataNodes(3).build();
+
+ cluster.waitActive();
+ HdfsConfiguration clientConf = new HdfsConfiguration(clusterConf);
+ clientConf.unset(
+ CommonConfigurationKeys.HADOOP_SECURITY_SASL_PROPS_RESOLVER_CLASS);
+
+ URI currentURI = cluster.getURI();
+ URI uriAuthPort = new URI(currentURI.getScheme() + "://" +
+ currentURI.getHost() + ":12000");
+ URI uriIntegrityPort = new URI(currentURI.getScheme() + "://" +
+ currentURI.getHost() + ":12100");
+ URI uriPrivacyPort = new URI(currentURI.getScheme() +
+ "://" + currentURI.getHost() + ":12200");
+
+ // If connecting to primary port, block token should not include
+ // handshake secret
+ byte[] secretOnPrimary = getHandshakeSecret(currentURI, clientConf,
+ new Path(pathPrefix + "Primary"));
+ assertTrue(secretOnPrimary == null || secretOnPrimary.length == 0);
+
+ // If connecting to auxiliary port, block token should include
+ // handshake secret
+ clientConf.set(HADOOP_RPC_PROTECTION, "privacy");
+ byte[] secretPrivacy = getHandshakeSecret(uriPrivacyPort, clientConf,
+ new Path(pathPrefix + "Privacy"));
+ assertTrue(secretPrivacy.length > 0);
+
+ clientConf.set(HADOOP_RPC_PROTECTION, "integrity");
+ byte[] secretIntegrity = getHandshakeSecret(uriIntegrityPort, clientConf,
+ new Path(pathPrefix + "Integrity"));
+ assertTrue(secretIntegrity.length > 0);
+
+ clientConf.set(HADOOP_RPC_PROTECTION, "authentication");
+ byte[] secretAuthentication = getHandshakeSecret(uriAuthPort,
+ clientConf, new Path(pathPrefix + "Authentication"));
+ assertTrue(secretAuthentication.length > 0);
+
+ } finally {
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+ }
+
+ private byte[] getHandshakeSecret(URI uri, HdfsConfiguration conf,
+ Path path) throws Exception {
+ FileSystem fs = FileSystem.get(uri, conf);
+ FSDataOutputStream out = fs.create(
+ path, false, 4096, (short)1, BLOCK_SIZE);
+ try {
+ out.write(0);
+ out.hflush();
+ Token<BlockTokenIdentifier> token = DFSTestUtil.getBlockToken(out);
+ final byte[] tokenBytes = token.getIdentifier();
+ DataInputBuffer dib = new DataInputBuffer();
+
+ dib.reset(tokenBytes, tokenBytes.length);
+ BlockTokenIdentifier blockToken = new BlockTokenIdentifier();
+ blockToken.readFields(dib);
+ return blockToken.getHandshakeMsg();
+ } finally {
+ out.close();
+ }
+ }
+
+ /**
* Test accessing NameNode from three different ports.
*
* @throws Exception
@@ -168,33 +251,54 @@ public class TestMultipleNNPortQOP extends SaslDataTransferTestCase {
clientConf.set(HADOOP_RPC_PROTECTION, "privacy");
FileSystem fsPrivacy = FileSystem.get(uriPrivacyPort, clientConf);
doTest(fsPrivacy, PATH1);
- // add a wait so that data has reached not only first DN,
- // but also the rest
- Thread.sleep(100);
for (int i = 0; i < 2; i++) {
DataNode dn = dataNodes.get(i);
SaslDataTransferClient saslClient = dn.getSaslClient();
- assertEquals("auth", saslClient.getTargetQOP());
+ String qop = null;
+ // It may take some time for the qop to propagate
+ // to all DNs, so check in a loop.
+ for (int trial = 0; trial < 10; trial++) {
+ qop = saslClient.getTargetQOP();
+ if (qop != null) {
+ break;
+ }
+ Thread.sleep(100);
+ }
+ assertEquals("auth", qop);
}
clientConf.set(HADOOP_RPC_PROTECTION, "integrity");
FileSystem fsIntegrity = FileSystem.get(uriIntegrityPort, clientConf);
doTest(fsIntegrity, PATH2);
- Thread.sleep(100);
for (int i = 0; i < 2; i++) {
DataNode dn = dataNodes.get(i);
SaslDataTransferClient saslClient = dn.getSaslClient();
- assertEquals("auth", saslClient.getTargetQOP());
+ String qop = null;
+ for (int trial = 0; trial < 10; trial++) {
+ qop = saslClient.getTargetQOP();
+ if (qop != null) {
+ break;
+ }
+ Thread.sleep(100);
+ }
+ assertEquals("auth", qop);
}
clientConf.set(HADOOP_RPC_PROTECTION, "authentication");
FileSystem fsAuth = FileSystem.get(uriAuthPort, clientConf);
doTest(fsAuth, PATH3);
- Thread.sleep(100);
for (int i = 0; i < 3; i++) {
DataNode dn = dataNodes.get(i);
SaslDataTransferServer saslServer = dn.getSaslServer();
- assertEquals("auth", saslServer.getNegotiatedQOP());
+ String qop = null;
+ for (int trial = 0; trial < 10; trial++) {
+ qop = saslServer.getNegotiatedQOP();
+ if (qop != null) {
+ break;
+ }
+ Thread.sleep(100);
+ }
+ assertEquals("auth", qop);
}
} finally {
if (cluster != null) {
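The fixed Thread.sleep(100) waits above become bounded polling loops. The same idea can also be written with GenericTestUtils.waitFor; a sketch, not what the patch itself does (note the Supplier flavor waitFor expects, Guava or java.util.function, varies by branch):

    import org.apache.hadoop.test.GenericTestUtils;

    // Poll every 100 ms, for at most 10 seconds, until the QOP has
    // propagated to the DataNode's SASL client.
    GenericTestUtils.waitFor(() -> "auth".equals(saslClient.getTargetQOP()),
        100, 10000);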
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org