You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2022/03/14 17:42:47 UTC

[hadoop] 03/08: HDFS-15630. RBF: Fix wrong client IP info in CallerContext when requests mount points with multi-destinations. Contributed by Chengwei Wang

This is an automated email from the ASF dual-hosted git repository.

omalley pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit 2479d4ab6ce0567d9476b778c5401a03b5970638
Author: Hui Fei <fe...@apache.org>
AuthorDate: Fri Oct 23 13:41:43 2020 +0800

    HDFS-15630. RBF: Fix wrong client IP info in CallerContext when requests mount points with multi-destinations. Contributed by Chengwei Wang
    
    Cherry-picked from 264c948e by Owen O'Malley
---
 .../server/federation/router/RouterRpcClient.java  | 60 +++++++++++++++++-----
 .../router/TestRouterRpcMultiDestination.java      | 43 ++++++++++++++++
 2 files changed, 91 insertions(+), 12 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java
index cf73638..97d3fba 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java
@@ -72,6 +72,7 @@ import org.apache.hadoop.ipc.CallerContext;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.ipc.RetriableException;
 import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.ipc.Server.Call;
 import org.apache.hadoop.ipc.StandbyException;
 import org.apache.hadoop.net.ConnectTimeoutException;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -414,7 +415,8 @@ public class RouterRpcClient {
           " with params " + Arrays.deepToString(params) + " from "
           + router.getRouterId());
     }
-    appendClientIpToCallerContext();
+
+    appendClientIpToCallerContextIfAbsent();
 
     Object ret = null;
     if (rpcMonitor != null) {
@@ -531,17 +533,26 @@ public class RouterRpcClient {
   }
 
   /**
-   * For Tracking which is the actual client address.
-   * It adds key/value (clientIp/"ip") pair to the caller context.
+   * For tracking which is the actual client address.
+   * It adds trace info "clientIp:ip" to caller context if it's absent.
    */
-  private void appendClientIpToCallerContext() {
+  private void appendClientIpToCallerContextIfAbsent() {
+    String clientIpInfo = CLIENT_IP_STR + ":" + Server.getRemoteAddress();
     final CallerContext ctx = CallerContext.getCurrent();
-    String origContext = ctx == null ? null : ctx.getContext();
-    byte[] origSignature = ctx == null ? null : ctx.getSignature();
-    CallerContext.setCurrent(
-        new CallerContext.Builder(origContext, contextFieldSeparator)
-            .append(CLIENT_IP_STR, Server.getRemoteAddress())
-            .setSignature(origSignature).build());
+    if (isClientIpInfoAbsent(clientIpInfo, ctx)) {
+      String origContext = ctx == null ? null : ctx.getContext();
+      byte[] origSignature = ctx == null ? null : ctx.getSignature();
+      CallerContext.setCurrent(
+          new CallerContext.Builder(origContext, contextFieldSeparator)
+              .append(clientIpInfo)
+              .setSignature(origSignature)
+              .build());
+    }
+  }
+
+  private boolean isClientIpInfoAbsent(String clientIpInfo, CallerContext ctx){
+    return ctx == null || ctx.getContext() == null
+        || !ctx.getContext().contains(clientIpInfo);
   }
 
   /**
@@ -1257,6 +1268,9 @@ public class RouterRpcClient {
 
     List<T> orderedLocations = new ArrayList<>();
     List<Callable<Object>> callables = new ArrayList<>();
+    // transfer originCall & callerContext to worker threads of executor.
+    final Call originCall = Server.getCurCall().get();
+    final CallerContext originContext = CallerContext.getCurrent();
     for (final T location : locations) {
       String nsId = location.getNameserviceId();
       final List<? extends FederationNamenodeContext> namenodes =
@@ -1274,12 +1288,20 @@ public class RouterRpcClient {
             nnLocation = (T)new RemoteLocation(nsId, nnId, location.getDest());
           }
           orderedLocations.add(nnLocation);
-          callables.add(() -> invokeMethod(ugi, nnList, proto, m, paramList));
+          callables.add(
+              () -> {
+                transferThreadLocalContext(originCall, originContext);
+                return invokeMethod(ugi, nnList, proto, m, paramList);
+              });
         }
       } else {
         // Call the objectGetter in order of nameservices in the NS list
         orderedLocations.add(location);
-        callables.add(() ->  invokeMethod(ugi, namenodes, proto, m, paramList));
+        callables.add(
+            () -> {
+              transferThreadLocalContext(originCall, originContext);
+              return invokeMethod(ugi, namenodes, proto, m, paramList);
+            });
       }
     }
 
@@ -1347,6 +1369,20 @@ public class RouterRpcClient {
   }
 
   /**
+   * Transfer origin thread local context which is necessary to current
+   * worker thread when invoking method concurrently by executor service.
+   *
+   * @param originCall origin Call required for getting remote client ip.
+   * @param originContext origin CallerContext which should be transferred
+   *                      to server side.
+   */
+  private void transferThreadLocalContext(
+      final Call originCall, final CallerContext originContext) {
+    Server.getCurCall().set(originCall);
+    CallerContext.setCurrent(originContext);
+  }
+
+  /**
    * Get a prioritized list of NNs that share the same nameservice ID (in the
    * same namespace). NNs that are reported as ACTIVE will be first in the list.
    *
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpcMultiDestination.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpcMultiDestination.java
index bab1fd2..e50464c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpcMultiDestination.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpcMultiDestination.java
@@ -32,6 +32,7 @@ import static org.apache.hadoop.test.Whitebox.setInternalState;
 
 import java.io.IOException;
 import java.lang.reflect.Method;
+import java.net.InetAddress;
 import java.net.URISyntaxException;
 import java.util.Arrays;
 import java.util.EnumSet;
@@ -67,6 +68,7 @@ import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.ha.HAContext;
 import org.apache.hadoop.io.EnumSetWritable;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.ipc.CallerContext;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.ipc.StandbyException;
 import org.apache.hadoop.test.GenericTestUtils;
@@ -434,4 +436,45 @@ public class TestRouterRpcMultiDestination extends TestRouterRpc {
     setInternalState(ns0, "haContext", nn0haCtx);
     setInternalState(router0ClientProtocol, "allowPartialList", true);
   }
+
+  @Test
+  public void testCallerContextWithMultiDestinations() throws IOException {
+    GenericTestUtils.LogCapturer auditLog =
+        GenericTestUtils.LogCapturer.captureLogs(FSNamesystem.auditLog);
+
+    // set client context
+    CallerContext.setCurrent(
+        new CallerContext.Builder("clientContext").build());
+    // assert the initial caller context as expected
+    assertEquals("clientContext", CallerContext.getCurrent().getContext());
+
+    DistributedFileSystem routerFs =
+        (DistributedFileSystem) getRouterFileSystem();
+    // create a directory via the router
+    Path dirPath = new Path("/test_caller_context_with_multi_destinations");
+    routerFs.mkdirs(dirPath);
+    // invoke concurrently in RouterRpcClient
+    routerFs.listStatus(dirPath);
+    // invoke sequentially in RouterRpcClient
+    routerFs.getFileStatus(dirPath);
+
+    String auditFlag = "src=" + dirPath.toString();
+    String clientIpInfo = "clientIp:"
+        + InetAddress.getLocalHost().getHostAddress();
+    for (String line : auditLog.getOutput().split("\n")) {
+      if (line.contains(auditFlag)) {
+        // assert origin caller context exist in audit log
+        assertTrue(line.contains("callerContext=clientContext"));
+        String callerContext = line.substring(
+            line.indexOf("callerContext=clientContext"));
+        // assert client ip info exist in caller context
+        assertTrue(callerContext.contains(clientIpInfo));
+        // assert client ip info appears only once in caller context
+        assertEquals(callerContext.indexOf(clientIpInfo),
+            callerContext.lastIndexOf(clientIpInfo));
+      }
+    }
+    // clear client context
+    CallerContext.setCurrent(null);
+  }
 }
\ No newline at end of file

---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org