You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2022/03/14 17:42:47 UTC
[hadoop] 03/08: HDFS-15630. RBF: Fix wrong client IP info in CallerContext when requests mount points with multi-destinations. Contributed by Chengwei Wang
This is an automated email from the ASF dual-hosted git repository.
omalley pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git
commit 2479d4ab6ce0567d9476b778c5401a03b5970638
Author: Hui Fei <fe...@apache.org>
AuthorDate: Fri Oct 23 13:41:43 2020 +0800
HDFS-15630. RBF: Fix wrong client IP info in CallerContext when requests mount points with multi-destinations. Contributed by Chengwei Wang
Cherry-picked from 264c948e by Owen O'Malley
---
.../server/federation/router/RouterRpcClient.java | 60 +++++++++++++++++-----
.../router/TestRouterRpcMultiDestination.java | 43 ++++++++++++++++
2 files changed, 91 insertions(+), 12 deletions(-)
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java
index cf73638..97d3fba 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java
@@ -72,6 +72,7 @@ import org.apache.hadoop.ipc.CallerContext;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.RetriableException;
import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.ipc.Server.Call;
import org.apache.hadoop.ipc.StandbyException;
import org.apache.hadoop.net.ConnectTimeoutException;
import org.apache.hadoop.security.UserGroupInformation;
@@ -414,7 +415,8 @@ public class RouterRpcClient {
" with params " + Arrays.deepToString(params) + " from "
+ router.getRouterId());
}
- appendClientIpToCallerContext();
+
+ appendClientIpToCallerContextIfAbsent();
Object ret = null;
if (rpcMonitor != null) {
@@ -531,17 +533,26 @@ public class RouterRpcClient {
}
/**
- * For Tracking which is the actual client address.
- * It adds key/value (clientIp/"ip") pair to the caller context.
+ * Used for tracking the actual client address.
+ * Adds trace info "clientIp:ip" to the caller context if it is absent.
*/
- private void appendClientIpToCallerContext() {
+ private void appendClientIpToCallerContextIfAbsent() {
+ String clientIpInfo = CLIENT_IP_STR + ":" + Server.getRemoteAddress();
final CallerContext ctx = CallerContext.getCurrent();
- String origContext = ctx == null ? null : ctx.getContext();
- byte[] origSignature = ctx == null ? null : ctx.getSignature();
- CallerContext.setCurrent(
- new CallerContext.Builder(origContext, contextFieldSeparator)
- .append(CLIENT_IP_STR, Server.getRemoteAddress())
- .setSignature(origSignature).build());
+ if (isClientIpInfoAbsent(clientIpInfo, ctx)) {
+ String origContext = ctx == null ? null : ctx.getContext();
+ byte[] origSignature = ctx == null ? null : ctx.getSignature();
+ CallerContext.setCurrent(
+ new CallerContext.Builder(origContext, contextFieldSeparator)
+ .append(clientIpInfo)
+ .setSignature(origSignature)
+ .build());
+ }
+ }
+
+ private boolean isClientIpInfoAbsent(String clientIpInfo, CallerContext ctx){
+ return ctx == null || ctx.getContext() == null
+ || !ctx.getContext().contains(clientIpInfo);
}
/**
@@ -1257,6 +1268,9 @@ public class RouterRpcClient {
List<T> orderedLocations = new ArrayList<>();
List<Callable<Object>> callables = new ArrayList<>();
+ // transfer originCall & callerContext to worker threads of executor.
+ final Call originCall = Server.getCurCall().get();
+ final CallerContext originContext = CallerContext.getCurrent();
for (final T location : locations) {
String nsId = location.getNameserviceId();
final List<? extends FederationNamenodeContext> namenodes =
@@ -1274,12 +1288,20 @@ public class RouterRpcClient {
nnLocation = (T)new RemoteLocation(nsId, nnId, location.getDest());
}
orderedLocations.add(nnLocation);
- callables.add(() -> invokeMethod(ugi, nnList, proto, m, paramList));
+ callables.add(
+ () -> {
+ transferThreadLocalContext(originCall, originContext);
+ return invokeMethod(ugi, nnList, proto, m, paramList);
+ });
}
} else {
// Call the objectGetter in order of nameservices in the NS list
orderedLocations.add(location);
- callables.add(() -> invokeMethod(ugi, namenodes, proto, m, paramList));
+ callables.add(
+ () -> {
+ transferThreadLocalContext(originCall, originContext);
+ return invokeMethod(ugi, namenodes, proto, m, paramList);
+ });
}
}
@@ -1347,6 +1369,20 @@ public class RouterRpcClient {
}
/**
+ * Transfer the origin thread-local context that the current worker thread
+ * needs when a method is invoked concurrently by the executor service.
+ *
+ * @param originCall origin Call required for getting remote client ip.
+ * @param originContext origin CallerContext which should be transferred
+ * to server side.
+ */
+ private void transferThreadLocalContext(
+ final Call originCall, final CallerContext originContext) {
+ Server.getCurCall().set(originCall);
+ CallerContext.setCurrent(originContext);
+ }
+
+ /**
* Get a prioritized list of NNs that share the same nameservice ID (in the
* same namespace). NNs that are reported as ACTIVE will be first in the list.
*
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpcMultiDestination.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpcMultiDestination.java
index bab1fd2..e50464c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpcMultiDestination.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpcMultiDestination.java
@@ -32,6 +32,7 @@ import static org.apache.hadoop.test.Whitebox.setInternalState;
import java.io.IOException;
import java.lang.reflect.Method;
+import java.net.InetAddress;
import java.net.URISyntaxException;
import java.util.Arrays;
import java.util.EnumSet;
@@ -67,6 +68,7 @@ import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.ha.HAContext;
import org.apache.hadoop.io.EnumSetWritable;
import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.ipc.CallerContext;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.StandbyException;
import org.apache.hadoop.test.GenericTestUtils;
@@ -434,4 +436,45 @@ public class TestRouterRpcMultiDestination extends TestRouterRpc {
setInternalState(ns0, "haContext", nn0haCtx);
setInternalState(router0ClientProtocol, "allowPartialList", true);
}
+
+ @Test
+ public void testCallerContextWithMultiDestinations() throws IOException {
+ GenericTestUtils.LogCapturer auditLog =
+ GenericTestUtils.LogCapturer.captureLogs(FSNamesystem.auditLog);
+
+ // set client context
+ CallerContext.setCurrent(
+ new CallerContext.Builder("clientContext").build());
+ // assert the initial caller context as expected
+ assertEquals("clientContext", CallerContext.getCurrent().getContext());
+
+ DistributedFileSystem routerFs =
+ (DistributedFileSystem) getRouterFileSystem();
+ // create a directory via the router
+ Path dirPath = new Path("/test_caller_context_with_multi_destinations");
+ routerFs.mkdirs(dirPath);
+ // invoke concurrently in RouterRpcClient
+ routerFs.listStatus(dirPath);
+ // invoke sequentially in RouterRpcClient
+ routerFs.getFileStatus(dirPath);
+
+ String auditFlag = "src=" + dirPath.toString();
+ String clientIpInfo = "clientIp:"
+ + InetAddress.getLocalHost().getHostAddress();
+ for (String line : auditLog.getOutput().split("\n")) {
+ if (line.contains(auditFlag)) {
+ // assert that the origin caller context exists in the audit log
+ assertTrue(line.contains("callerContext=clientContext"));
+ String callerContext = line.substring(
+ line.indexOf("callerContext=clientContext"));
+ // assert that the client ip info exists in the caller context
+ assertTrue(callerContext.contains(clientIpInfo));
+ // assert client ip info appears only once in caller context
+ assertEquals(callerContext.indexOf(clientIpInfo),
+ callerContext.lastIndexOf(clientIpInfo));
+ }
+ }
+ // clear client context
+ CallerContext.setCurrent(null);
+ }
}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org