You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2022/03/14 17:42:50 UTC
[hadoop] 06/08: HDFS-16310. RBF: Add client port to CallerContext for Router (#3635)
This is an automated email from the ASF dual-hosted git repository.
omalley pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git
commit 496657c63fc504dfeef47a3da56214800575367b
Author: litao <to...@gmail.com>
AuthorDate: Thu Nov 18 12:46:35 2021 +0800
HDFS-16310. RBF: Add client port to CallerContext for Router (#3635)
Cherry-picked from 5b05068f by Owen O'Malley
---
.../server/federation/router/RouterRpcClient.java | 34 +++++++--------
.../server/federation/router/TestRouterRpc.java | 48 ++++++++++++++++++++++
.../hadoop/hdfs/server/namenode/FSNamesystem.java | 7 ++--
3 files changed, 66 insertions(+), 23 deletions(-)
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java
index 97d3fba..351926b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java
@@ -128,6 +128,7 @@ public class RouterRpcClient {
Pattern.compile("\\tat (.*)\\.(.*)\\((.*):(\\d*)\\)");
private static final String CLIENT_IP_STR = "clientIp";
+ private static final String CLIENT_PORT_STR = "clientPort";
/**
* Create a router RPC client to manage remote procedure calls to NNs.
@@ -416,7 +417,7 @@ public class RouterRpcClient {
+ router.getRouterId());
}
- appendClientIpToCallerContextIfAbsent();
+ appendClientIpPortToCallerContextIfAbsent();
Object ret = null;
if (rpcMonitor != null) {
@@ -534,25 +535,20 @@ public class RouterRpcClient {
/**
* For tracking which is the actual client address.
- * It adds trace info "clientIp:ip" to caller context if it's absent.
+ * It adds trace info "clientIp:ip" and "clientPort:port"
+ * to caller context if they are absent.
*/
- private void appendClientIpToCallerContextIfAbsent() {
- String clientIpInfo = CLIENT_IP_STR + ":" + Server.getRemoteAddress();
- final CallerContext ctx = CallerContext.getCurrent();
- if (isClientIpInfoAbsent(clientIpInfo, ctx)) {
- String origContext = ctx == null ? null : ctx.getContext();
- byte[] origSignature = ctx == null ? null : ctx.getSignature();
- CallerContext.setCurrent(
- new CallerContext.Builder(origContext, contextFieldSeparator)
- .append(clientIpInfo)
- .setSignature(origSignature)
- .build());
- }
- }
-
- private boolean isClientIpInfoAbsent(String clientIpInfo, CallerContext ctx){
- return ctx == null || ctx.getContext() == null
- || !ctx.getContext().contains(clientIpInfo);
+ private void appendClientIpPortToCallerContextIfAbsent() {
+ CallerContext ctx = CallerContext.getCurrent();
+ String origContext = ctx == null ? null : ctx.getContext();
+ byte[] origSignature = ctx == null ? null : ctx.getSignature();
+ CallerContext.setCurrent(
+ new CallerContext.Builder(origContext, contextFieldSeparator)
+ .appendIfAbsent(CLIENT_IP_STR, Server.getRemoteAddress())
+ .appendIfAbsent(CLIENT_PORT_STR,
+ Integer.toString(Server.getRemotePort()))
+ .setSignature(origSignature)
+ .build());
}
/**
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java
index ceb291d..f2b86dc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java
@@ -97,6 +97,7 @@ import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster;
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.NamenodeContext;
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.RouterContext;
@@ -1867,4 +1868,51 @@ public class TestRouterRpc {
.contains("callerContext=clientContext,clientIp:"));
assertTrue(verifyFileExists(routerFS, dirPath));
}
+
+ @Test
+ public void testSetBalancerBandwidth() throws Exception {
+ long defaultBandwidth =
+ DFSConfigKeys.DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_DEFAULT;
+ long newBandwidth = defaultBandwidth * 2;
+ routerProtocol.setBalancerBandwidth(newBandwidth);
+ ArrayList<DataNode> datanodes = cluster.getCluster().getDataNodes();
+ GenericTestUtils.waitFor(() -> {
+ return datanodes.get(0).getBalancerBandwidth() == newBandwidth;
+ }, 100, 60 * 1000);
+ }
+
+ @Test
+ public void testAddClientIpPortToCallerContext() throws IOException {
+ GenericTestUtils.LogCapturer auditLog =
+ GenericTestUtils.LogCapturer.captureLogs(FSNamesystem.auditLog);
+
+ // 1. ClientIp and ClientPort are not set on the client.
+ // Set client context.
+ CallerContext.setCurrent(
+ new CallerContext.Builder("clientContext").build());
+
+ // Create a directory via the router.
+ String dirPath = "/test";
+ routerProtocol.mkdirs(dirPath, new FsPermission("755"), false);
+
+ // The audit log should contain "clientIp:" and "clientPort:".
+ assertTrue(auditLog.getOutput().contains("clientIp:"));
+ assertTrue(auditLog.getOutput().contains("clientPort:"));
+ assertTrue(verifyFileExists(routerFS, dirPath));
+ auditLog.clearOutput();
+
+ // 2. ClientIp and ClientPort are set on the client.
+ // Reset client context.
+ CallerContext.setCurrent(
+ new CallerContext.Builder(
+ "clientContext,clientIp:1.1.1.1,clientPort:1234").build());
+
+ // Fetch the file info of the directory via the router.
+ routerProtocol.getFileInfo(dirPath);
+
+ // The audit log should contain the original clientIp and clientPort
+ // set by client.
+ assertTrue(auditLog.getOutput().contains("clientIp:1.1.1.1"));
+ assertTrue(auditLog.getOutput().contains("clientPort:1234"));
+ }
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index e890322..964879b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -448,8 +448,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
private void appendClientPortToCallerContextIfAbsent() {
final CallerContext ctx = CallerContext.getCurrent();
- if (isClientPortInfoAbsent(CLIENT_PORT_STR + ":" + Server.getRemotePort(),
- ctx)) {
+ if (isClientPortInfoAbsent(ctx)) {
String origContext = ctx == null ? null : ctx.getContext();
byte[] origSignature = ctx == null ? null : ctx.getSignature();
CallerContext.setCurrent(
@@ -460,9 +459,9 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
}
}
- private boolean isClientPortInfoAbsent(String clientPortInfo, CallerContext ctx){
+ private boolean isClientPortInfoAbsent(CallerContext ctx){
return ctx == null || ctx.getContext() == null
- || !ctx.getContext().contains(clientPortInfo);
+ || !ctx.getContext().contains(CLIENT_PORT_STR);
}
/**
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org