Posted to common-commits@hadoop.apache.org by om...@apache.org on 2022/03/21 17:37:34 UTC

[hadoop] branch branch-3.3 updated: HDFS-13248: NameNode needs to use the actual client IP when going through the RBF proxy. There is a new configuration knob dfs.namenode.ip-proxy-users that configures the list of users that can set their client IP address using the caller context.

This is an automated email from the ASF dual-hosted git repository.

omalley pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new 1175acb  HDFS-13248: NameNode needs to use the actual client IP when going through the RBF proxy. There is a new configuration knob dfs.namenode.ip-proxy-users that configures the list of users that can set their client IP address using the caller context.
1175acb is described below

commit 1175acbb75919ccc51659948aae875526e7676f4
Author: Owen O'Malley <oo...@linkedin.com>
AuthorDate: Thu Mar 17 14:06:55 2022 -0700

    HDFS-13248: NameNode needs to use the actual client IP when going through the
    RBF proxy. There is a new configuration knob dfs.namenode.ip-proxy-users that
    configures the list of users that can set their client IP address using the
    caller context.
    
    Fixes #4081
---
 .../java/org/apache/hadoop/ipc/CallerContext.java  |  5 ++
 .../hadoop/security/UserGroupInformation.java      | 14 +++-
 .../server/federation/router/RouterRpcClient.java  |  4 +-
 .../src/site/markdown/HDFSRouterFederation.md      | 14 ++++
 .../java/org/apache/hadoop/hdfs/DFSConfigKeys.java |  2 +
 .../hadoop/hdfs/server/namenode/FSNamesystem.java  |  5 +-
 .../hdfs/server/namenode/NameNodeRpcServer.java    | 31 ++++++-
 .../src/main/resources/hdfs-default.xml            | 11 +++
 .../server/namenode/TestNameNodeRpcServer.java     | 96 +++++++++++++++++++++-
 9 files changed, 174 insertions(+), 8 deletions(-)
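
For illustration, a minimal sketch (not part of this commit) of how a process
running as a user listed in dfs.namenode.ip-proxy-users could supply the real
client address through the caller context. The class name, the "myApp" context,
and the IP address are placeholders; the Builder(String, Configuration)
constructor and the CLIENT_IP_STR constant are the ones used in the patch below:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.ipc.CallerContext;

    public class ClientIpProxyExample {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Attach the clientIp field that the NameNode will look for when the
        // calling user is listed in dfs.namenode.ip-proxy-users.
        CallerContext.setCurrent(
            new CallerContext.Builder("myApp", conf)
                .append(CallerContext.CLIENT_IP_STR, "1.2.3.4")
                .build());
        // With the default separators, subsequent RPCs from this thread carry
        // the context "myApp,clientIp:1.2.3.4" to the NameNode.
      }
    }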

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallerContext.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallerContext.java
index c8b4135..dbd9184 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallerContext.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallerContext.java
@@ -43,6 +43,11 @@ import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_CALLER_C
 @InterfaceStability.Evolving
 public final class CallerContext {
   public static final Charset SIGNATURE_ENCODING = StandardCharsets.UTF_8;
+
+  // field names
+  public static final String CLIENT_IP_STR = "clientIp";
+  public static final String CLIENT_PORT_STR = "clientPort";
+
   /** The caller context.
    *
    * It will be truncated if it exceeds the maximum allowed length in
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/UserGroupInformation.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/UserGroupInformation.java
index 67d1518..973e6a2 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/UserGroupInformation.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/UserGroupInformation.java
@@ -1509,7 +1509,19 @@ public class UserGroupInformation {
     return null;
   }
 
-
+  /**
+   * If this is a proxy user, get the real user. Otherwise, return
+   * this user.
+   * @param user the user to check
+   * @return the real user or self
+   */
+  public static UserGroupInformation getRealUserOrSelf(UserGroupInformation user) {
+    if (user == null) {
+      return null;
+    }
+    UserGroupInformation real = user.getRealUser();
+    return real != null ? real : user;
+  }
   
   /**
    * This class is used for storing the groups for testing. It stores a local
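As a quick illustration (again, not part of the commit), the new
getRealUserOrSelf helper unwraps a proxy user to its real user and is the
identity otherwise; the "router" and "alice" names below are placeholders:

    import org.apache.hadoop.security.UserGroupInformation;

    public class RealUserOrSelfExample {
      public static void main(String[] args) throws Exception {
        UserGroupInformation real =
            UserGroupInformation.createRemoteUser("router");
        UserGroupInformation proxy =
            UserGroupInformation.createProxyUser("alice", real);
        // Proxy user: unwraps to the real user, so this prints "router".
        System.out.println(
            UserGroupInformation.getRealUserOrSelf(proxy).getShortUserName());
        // Plain user: returns the user itself, so this also prints "router".
        System.out.println(
            UserGroupInformation.getRealUserOrSelf(real).getShortUserName());
      }
    }
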
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java
index 16cd7a5..ef84f30 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java
@@ -545,8 +545,8 @@ public class RouterRpcClient {
     byte[] origSignature = ctx == null ? null : ctx.getSignature();
     CallerContext.Builder builder =
         new CallerContext.Builder("", contextFieldSeparator)
-            .append(CLIENT_IP_STR, Server.getRemoteAddress())
-            .append(CLIENT_PORT_STR,
+            .append(CallerContext.CLIENT_IP_STR, Server.getRemoteAddress())
+            .append(CallerContext.CLIENT_PORT_STR,
                 Integer.toString(Server.getRemotePort()))
             .setSignature(origSignature);
     // Append the original caller context
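
With the default "," field separator and ":" key-value separator, the context
the router forwards to the NameNode looks like the following (addresses are
placeholders), with the client's original caller context appended at the end:

    clientIp:203.0.113.7,clientPort:45678,<original caller context>
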
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/site/markdown/HDFSRouterFederation.md b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/site/markdown/HDFSRouterFederation.md
index de45645..dce2c65 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/site/markdown/HDFSRouterFederation.md
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/site/markdown/HDFSRouterFederation.md
@@ -362,6 +362,20 @@ With this setting a user can interact with `ns-fed` as a regular namespace:
 This federated namespace can also be set as the default one at **core-site.xml** using `fs.defaultFS`.
 
 
+NameNode configuration
+--------------------
+
+To preserve data locality, you must configure your NameNodes to trust the routers to supply the real client IP address. `dfs.namenode.ip-proxy-users` defines a comma-separated list of users that are allowed to provide the client IP address via the caller context.
+
+```xml
+<configuration>
+  <property>
+    <name>dfs.namenode.ip-proxy-users</name>
+    <value>hdfs</value>
+  </property>
+</configuration>
+```
+
 Router configuration
 --------------------
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index f47ad6c..7df1c5c 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -938,6 +938,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
       "dfs.namenode.lifeline.handler.count";
   public static final String  DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY = "dfs.namenode.service.handler.count";
   public static final int     DFS_NAMENODE_SERVICE_HANDLER_COUNT_DEFAULT = 10;
+  // List of users that can override their client IP
+  public static final String  DFS_NAMENODE_IP_PROXY_USERS = "dfs.namenode.ip-proxy-users";
   public static final String  DFS_HTTP_POLICY_KEY = "dfs.http.policy";
   public static final String  DFS_HTTP_POLICY_DEFAULT =  HttpConfig.Policy.HTTP_ONLY.name();
   public static final String  DFS_DATANODE_HTTPSERVER_FILTER_HANDLERS = "dfs.datanode.httpserver.filter.handlers";
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index 318442d..3ce8a80 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -387,7 +387,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
   @Metric final MutableRatesWithAggregation detailedLockHoldTimeMetrics =
       registry.newRatesWithAggregation("detailedLockHoldTimeMetrics");
 
-  private static final String CLIENT_PORT_STR = "clientPort";
   private final String contextFieldSeparator;
 
   boolean isAuditEnabled() {
@@ -453,7 +452,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       byte[] origSignature = ctx == null ? null : ctx.getSignature();
       CallerContext.setCurrent(
           new CallerContext.Builder(origContext, contextFieldSeparator)
-              .append(CLIENT_PORT_STR, String.valueOf(Server.getRemotePort()))
+              .append(CallerContext.CLIENT_PORT_STR, String.valueOf(Server.getRemotePort()))
               .setSignature(origSignature)
               .build());
     }
@@ -461,7 +460,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
 
   private boolean isClientPortInfoAbsent(CallerContext ctx){
     return ctx == null || ctx.getContext() == null
-        || !ctx.getContext().contains(CLIENT_PORT_STR);
+        || !ctx.getContext().contains(CallerContext.CLIENT_PORT_STR);
   }
 
   /**
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
index e5d8aa9..e6610e0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
@@ -21,6 +21,7 @@ import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENG
 import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HANDLER_COUNT_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HANDLER_COUNT_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_IP_PROXY_USERS;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIFELINE_HANDLER_COUNT_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIFELINE_HANDLER_RATIO_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIFELINE_HANDLER_RATIO_KEY;
@@ -45,7 +46,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.commons.lang3.ArrayUtils;
 import org.apache.hadoop.thirdparty.com.google.common.collect.Lists;
+import org.apache.hadoop.ipc.CallerContext;
 
 import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -267,6 +270,9 @@ public class NameNodeRpcServer implements NamenodeProtocols {
 
   private final String defaultECPolicyName;
 
+  // Users who can override the client IP
+  private final String[] ipProxyUsers;
+
   public NameNodeRpcServer(Configuration conf, NameNode nn)
       throws IOException {
     this.nn = nn;
@@ -277,6 +283,7 @@ public class NameNodeRpcServer implements NamenodeProtocols {
     int handlerCount = 
       conf.getInt(DFS_NAMENODE_HANDLER_COUNT_KEY, 
                   DFS_NAMENODE_HANDLER_COUNT_DEFAULT);
+    ipProxyUsers = conf.getStrings(DFS_NAMENODE_IP_PROXY_USERS);
 
     RPC.setProtocolEngine(conf, ClientNamenodeProtocolPB.class,
         ProtobufRpcEngine2.class);
@@ -1890,7 +1897,29 @@ public class NameNodeRpcServer implements NamenodeProtocols {
     }
   }
 
-  private static String getClientMachine() {
+  private String getClientMachine() {
+    if (ipProxyUsers != null) {
+      // Get the real user (or effective if it isn't a proxy user)
+      UserGroupInformation user =
+          UserGroupInformation.getRealUserOrSelf(Server.getRemoteUser());
+      if (user != null &&
+          ArrayUtils.contains(ipProxyUsers, user.getShortUserName())) {
+        CallerContext context = CallerContext.getCurrent();
+        if (context != null && context.isContextValid()) {
+          String cc = context.getContext();
+          // if the RPC has a caller context of "clientIp:1.2.3.4,CLI",
+          // return "1.2.3.4" as the client machine.
+          String key = CallerContext.CLIENT_IP_STR +
+              CallerContext.Builder.KEY_VALUE_SEPARATOR;
+          int posn = cc.indexOf(key);
+          if (posn != -1) {
+            posn += key.length();
+            int end = cc.indexOf(",", posn);
+            return end == -1 ? cc.substring(posn) : cc.substring(posn, end);
+          }
+        }
+      }
+    }
     String clientMachine = Server.getRemoteAddress();
     if (clientMachine == null) { //not a RPC client
       clientMachine = "";
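
To make the extraction concrete, here is a standalone sketch of the same parse
over a sample context string (the string and class name are placeholders for
illustration):

    public class ClientIpParseExample {
      public static void main(String[] args) {
        String cc = "myApp,clientIp:1.2.3.4,clientPort:45678";
        String key = "clientIp" + ":";  // CLIENT_IP_STR + KEY_VALUE_SEPARATOR
        int posn = cc.indexOf(key);
        if (posn != -1) {
          posn += key.length();
          int end = cc.indexOf(",", posn);
          // Everything between the key and the next field separator is the
          // client address; this prints "1.2.3.4".
          System.out.println(
              end == -1 ? cc.substring(posn) : cc.substring(posn, end));
        }
      }
    }
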
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index c24e288..925d42e 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -545,6 +545,17 @@
 </property>
 
 <property>
+   <name>dfs.namenode.ip-proxy-users</name>
+   <value></value>
+   <description>A comma-separated list of user names that are allowed by the
+     NameNode to specify a different client IP address in the caller context.
+     This is used by Router-Based Federation (RBF) to provide the actual client's
+     IP address to the NameNode, which is critical to preserve data locality when
+     using RBF. If you are using RBF, add the user that runs the routers.
+   </description>
+</property>
+
+<property>
   <name>dfs.namenode.acls.enabled</name>
   <value>true</value>
   <description>
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeRpcServer.java
index ada93e8..74d85bc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeRpcServer.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -24,14 +24,25 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_IP_PROXY_USERS;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_BIND_HOST_KEY;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
 
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.ipc.CallerContext;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.junit.Test;
 
 public class TestNameNodeRpcServer {
@@ -59,5 +70,88 @@ public class TestNameNodeRpcServer {
       conf.unset(DFS_NAMENODE_RPC_BIND_HOST_KEY);
     }
   }
+
+  /**
+   * Get the preferred DataNode location for the first block of the
+   * given file.
+   * @param fs The file system to use
+   * @param p The path to use
+   * @return the preferred host to get the data
+   */
+  private static String getPreferredLocation(DistributedFileSystem fs,
+                                             Path p) throws IOException{
+    // Use getLocatedBlocks because it is the basis for HDFS open,
+    // but provides visibility into which host will be used.
+    LocatedBlocks blocks = fs.getClient()
+        .getLocatedBlocks(p.toUri().getPath(), 0);
+    return blocks.get(0).getLocations()[0].getHostName();
+  }
+
+  // Because the NN assigns DataNodes randomly, we run multiple trials;
+  // (1/3)^20 is about 3e-10, so that should be good enough.
+  static final int ITERATIONS_TO_USE = 20;
+
+  /**
+   * A test to make sure that if an authorized user adds "clientIp:" to their
+   * caller context, it will be used to make locality decisions on the NN.
+   */
+  @Test
+  public void testNamenodeRpcClientIpProxy() throws IOException {
+    Configuration conf = new HdfsConfiguration();
+
+    conf.set(DFS_NAMENODE_IP_PROXY_USERS, "fake_joe");
+    // Make 3 nodes & racks so that we have a decent shot of detecting when
+    // our change overrides the random choice of datanode.
+    final String[] racks = new String[]{"/rack1", "/rack2", "/rack3"};
+    final String[] hosts = new String[]{"node1", "node2", "node3"};
+    MiniDFSCluster cluster = null;
+    final CallerContext original = CallerContext.getCurrent();
+
+    try {
+      cluster = new MiniDFSCluster.Builder(conf)
+          .racks(racks).hosts(hosts).numDataNodes(hosts.length)
+          .build();
+      cluster.waitActive();
+      DistributedFileSystem fs = cluster.getFileSystem();
+      // Write a sample file
+      final Path fooName = fs.makeQualified(new Path("/foo"));
+      FSDataOutputStream stream = fs.create(fooName);
+      stream.write("Hello world!\n".getBytes(StandardCharsets.UTF_8));
+      stream.close();
+      // Set the caller context to set the ip address
+      CallerContext.setCurrent(
+          new CallerContext.Builder("test", conf)
+              .append(CallerContext.CLIENT_IP_STR, hosts[0])
+              .build());
+      // Should get a random mix of DataNodes since we aren't joe.
+      for (int trial = 0; trial < ITERATIONS_TO_USE; ++trial) {
+        String host = getPreferredLocation(fs, fooName);
+        if (!hosts[0].equals(host)) {
+          // found some other host, so things are good
+          break;
+        } else if (trial == ITERATIONS_TO_USE - 1) {
+          assertNotEquals("Failed to get non-node1", hosts[0], host);
+        }
+      }
+      // Run as fake joe to authorize the test
+      UserGroupInformation joe =
+          UserGroupInformation.createUserForTesting("fake_joe",
+              new String[]{"fake_group"});
+      DistributedFileSystem joeFs =
+          (DistributedFileSystem) DFSTestUtil.getFileSystemAs(joe, conf);
+      // As joe, we should get all node1.
+      for (int trial = 0; trial < ITERATIONS_TO_USE; ++trial) {
+        String host = getPreferredLocation(joeFs, fooName);
+        assertEquals("Trial " + trial + " failed", hosts[0], host);
+      }
+    } finally {
+      CallerContext.setCurrent(original);
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+      // Reset the config
+      conf.unset(DFS_NAMENODE_IP_PROXY_USERS);
+    }
+  }
 }
 

---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org