You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2021/05/30 11:16:03 UTC

[hadoop] branch trunk updated: HDFS-15973. RBF: Add permission check before doing router federation rename. Contributed by Jinglun.

This is an automated email from the ASF dual-hosted git repository.

jinglun pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b7f6946  HDFS-15973. RBF: Add permission check before doing router federation rename. Contributed by Jinglun.
b7f6946 is described below

commit b7f69467c19f6114b931ca2377b0ed7a8ada609e
Author: 李经纶 <li...@xiaomi.com>
AuthorDate: Sun May 30 18:50:03 2021 +0800

    HDFS-15973. RBF: Add permission check before doing router federation rename. Contributed by Jinglun.
    
    Reviewed-by: Inigo Goiri <in...@apache.org>
---
 .../federation/router/RouterFederationRename.java  |  72 +++++-
 .../src/site/markdown/HDFSRouterFederation.md      |   5 +-
 .../router/TestRouterFederationRename.java         | 183 ++++-----------
 .../router/TestRouterFederationRenameBase.java     | 203 +++++++++++++++++
 .../TestRouterFederationRenameInKerberosEnv.java   |  49 +++-
 .../TestRouterFederationRenamePermission.java      | 246 +++++++++++++++++++++
 6 files changed, 606 insertions(+), 152 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterFederationRename.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterFederationRename.java
index d2bf989..aafb685 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterFederationRename.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterFederationRename.java
@@ -21,7 +21,11 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.tools.fedbalance.DistCpProcedure;
 import org.apache.hadoop.tools.fedbalance.FedBalanceConfigs;
@@ -55,7 +59,11 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Rename across router based federation namespaces.
+ * Rename across router federation namespaces based on federation balance. The
+ * src and the dst, which come from different namespaces, must each have only
+ * one destination. Snapshot paths are not allowed.
+ * Users need write permission on both the src parent and the dst parent to do
+ * a router federation rename.
  */
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
@@ -99,6 +107,8 @@ public class RouterFederationRename {
     }
     RemoteLocation srcLoc = srcLocations.get(0);
     RemoteLocation dstLoc = dstLocations.get(0);
+    checkSnapshotPath(srcLoc, dstLoc);
+    checkPermission(srcLoc, dstLoc);
 
     UserGroupInformation routerUser = UserGroupInformation.getLoginUser();
 
@@ -132,6 +142,66 @@ public class RouterFederationRename {
   }
 
   /**
+   * Check that the caller may write to the parents of both src and dst.
+   */
+  private void checkPermission(RemoteLocation src, RemoteLocation dst)
+      throws IOException {
+    try {
+      if (UserGroupInformation.isSecurityEnabled()) {
+        // Security on: run the check as the remote user, proxied by the
+        // router's login user.
+        String remoteUserName = NameNode.getRemoteUser().getShortUserName();
+        UserGroupInformation proxyUser = UserGroupInformation
+            .createProxyUser(remoteUserName,
+                UserGroupInformation.getLoginUser());
+        proxyUser.doAs((PrivilegedExceptionAction<Object>) () -> {
+          checkRenamePermission(src, dst);
+          return null;
+        });
+      } else {
+        // Security off: no proxy user; invoke the check directly.
+        checkRenamePermission(src, dst);
+      }
+    } catch (AccessControlException e) { // rewrap; keeps only e's message
+      throw new AccessControlException(
+          "Permission denied rename " + src.getSrc() + "(" + src + ") to " + dst
+              .getSrc() + "(" + dst + ") Reason=" + e.getMessage());
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new InterruptedIOException(
+          "Router Federation Rename is interrupted while checking permission.");
+    }
+  }
+
+  private void checkRenamePermission(RemoteLocation srcLoc,
+      RemoteLocation dstLoc) throws IOException {
+    // Require WRITE on the parent of src, checked in src's nameservice.
+    Path srcPath =
+        new Path("hdfs://" + srcLoc.getNameserviceId() + srcLoc.getDest());
+    srcPath.getFileSystem(conf).access(srcPath.getParent(), FsAction.WRITE);
+    // Require WRITE on the parent of dst, checked in dst's nameservice.
+    Path dstPath =
+        new Path("hdfs://" + dstLoc.getNameserviceId() + dstLoc.getDest());
+    dstPath.getFileSystem(conf).access(dstPath.getParent(), FsAction.WRITE);
+  }
+
+  static void checkSnapshotPath(RemoteLocation src, RemoteLocation dst)
+      throws AccessControlException { // if src or dst is a snapshot path
+    if (src.getDest()
+        .contains(HdfsConstants.SEPARATOR_DOT_SNAPSHOT_DIR + Path.SEPARATOR)) {
+      throw new AccessControlException(
+          "Router federation rename can't rename snapshot path. src=" + src
+              .getSrc() + "(" + src + ")");
+    }
+    if (dst.getDest()
+        .contains(HdfsConstants.SEPARATOR_DOT_SNAPSHOT_DIR + Path.SEPARATOR)) {
+      throw new AccessControlException(
+          "Router federation rename can't rename snapshot path. dst=" + dst
+              .getSrc() + "(" + dst + ")");
+    }
+  }
+
+  /**
    * Build router federation rename job moving data from src to dst.
    * @param srcNs the source namespace id.
    * @param dstNs the dst namespace id.
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/site/markdown/HDFSRouterFederation.md b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/site/markdown/HDFSRouterFederation.md
index d7838c7..151289f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/site/markdown/HDFSRouterFederation.md
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/site/markdown/HDFSRouterFederation.md
@@ -514,7 +514,10 @@ More metrics info can see [RBF Metrics](../../hadoop-project-dist/hadoop-common/
 Router Federation Rename
 -------
 
-Enable Router to rename across namespaces. Currently the router federation rename is implemented by distcp. We must set the rpc timeout high enough so it won't timeout.
+Enable Router to rename across namespaces. Currently it is implemented based on [HDFS Federation Balance](../../../hadoop-federation-balance/HDFSFederationBalance.md) and has some limitations compared with a normal rename.
+1. It is much slower than a normal rename, so it needs a longer RPC timeout configuration. See `ipc.client.rpc-timeout.ms` and its description for more information about RPC timeout.
+2. It doesn't support snapshot paths.
+3. It doesn't support renaming a path with multiple destinations.
 
 | Property | Default | Description|
 |:---- |:---- |:---- |
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterFederationRename.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterFederationRename.java
index a9a17b3..3f73d95 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterFederationRename.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterFederationRename.java
@@ -19,38 +19,32 @@ package org.apache.hadoop.hdfs.server.federation.router;
 
 import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.createFile;
 import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.verifyFileExists;
-import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FEDERATION_RENAME_BANDWIDTH;
-import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FEDERATION_RENAME_MAP;
 import static org.apache.hadoop.test.GenericTestUtils.getMethodName;
-import static org.apache.hadoop.tools.fedbalance.FedBalanceConfigs.SCHEDULER_JOURNAL_URI;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertEquals;
 import static org.mockito.Mockito.verify;
 
 import java.io.IOException;
-import java.util.Arrays;
 import java.util.List;
-import java.util.Random;
+import java.util.Arrays;
+import java.util.Set;
 import java.util.concurrent.Callable;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DFSClient;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster;
-import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.NamenodeContext;
 import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.RouterContext;
-import org.apache.hadoop.hdfs.server.federation.MockResolver;
-import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
 import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
 import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.security.GroupMappingServiceProvider;
 import org.apache.hadoop.test.LambdaTestUtils;
-import org.apache.hadoop.tools.fedbalance.DistCpProcedure;
+import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableSet;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
@@ -60,137 +54,53 @@ import org.mockito.Mockito;
 /**
  * Basic tests of router federation rename. Rename across namespaces.
  */
-public class TestRouterFederationRename {
-
-  private static final int NUM_SUBCLUSTERS = 2;
-  private static final int NUM_DNS = 6;
+public class TestRouterFederationRename extends TestRouterFederationRenameBase {
 
-  /** Federated HDFS cluster. */
-  private static MiniRouterDFSCluster cluster;
-
-  /** Random Router for this federated cluster. */
-  private RouterContext router;
+  public static class MockGroupsMapping implements
+      GroupMappingServiceProvider {
 
-  /** Random nameservice in the federated cluster.  */
-  private String ns;
-  /** Filesystem interface to the Router. */
-  private FileSystem routerFS;
-  /** Filesystem interface to the Namenode. */
-  private FileSystem nnFS;
-  /** File in the Namenode. */
-  private String nnFile;
+    @Override
+    public List<String> getGroups(String user) {
+      return Arrays.asList(user+"_group");
+    }
 
-  @BeforeClass
-  public static void globalSetUp() throws Exception {
-    Configuration namenodeConf = new Configuration();
-    namenodeConf.setBoolean(DFSConfigKeys.HADOOP_CALLER_CONTEXT_ENABLED_KEY,
-        true);
-    cluster = new MiniRouterDFSCluster(false, NUM_SUBCLUSTERS);
-    cluster.setNumDatanodesPerNameservice(NUM_DNS);
-    cluster.addNamenodeOverrides(namenodeConf);
-    cluster.setIndependentDNs();
+    @Override
+    public void cacheGroupsRefresh() {
+    }
 
-    Configuration conf = new Configuration();
-    conf.setInt(DFSConfigKeys.DFS_LIST_LIMIT, 5);
-    cluster.addNamenodeOverrides(conf);
-    // Start NNs and DNs and wait until ready.
-    cluster.startCluster();
+    @Override
+    public void cacheGroupsAdd(List<String> groups) {
+    }
 
-    // Start routers, enable router federation rename.
-    String journal = "hdfs://" + cluster.getCluster().getNameNode(1)
-        .getClientNamenodeAddress() + "/journal";
-    Configuration routerConf = new RouterConfigBuilder()
-        .metrics()
-        .rpc()
-        .routerRenameOption()
-        .set(SCHEDULER_JOURNAL_URI, journal)
-        .set(DFS_ROUTER_FEDERATION_RENAME_MAP, "1")
-        .set(DFS_ROUTER_FEDERATION_RENAME_BANDWIDTH, "1")
-        .build();
-    // We decrease the DN cache times to make the test faster.
-    routerConf.setTimeDuration(
-        RBFConfigKeys.DN_REPORT_CACHE_EXPIRE, 1, TimeUnit.SECONDS);
-    cluster.addRouterOverrides(routerConf);
-    cluster.startRouters();
+    @Override
+    public Set<String> getGroupsSet(String user) {
+      return ImmutableSet.of(user+"_group");
+    }
+  }
 
-    // Register and verify all NNs with all routers
-    cluster.registerNamenodes();
-    cluster.waitNamenodeRegistration();
+  private RouterContext router;
+  private FileSystem routerFS;
+  private MiniRouterDFSCluster cluster;
 
-    // We decrease the DN heartbeat expire interval to make them dead faster
-    cluster.getCluster().getNamesystem(0).getBlockManager()
-        .getDatanodeManager().setHeartbeatInterval(1);
-    cluster.getCluster().getNamesystem(1).getBlockManager()
-        .getDatanodeManager().setHeartbeatInterval(1);
-    cluster.getCluster().getNamesystem(0).getBlockManager()
-        .getDatanodeManager().setHeartbeatExpireInterval(3000);
-    cluster.getCluster().getNamesystem(1).getBlockManager()
-        .getDatanodeManager().setHeartbeatExpireInterval(3000);
-    DistCpProcedure.enableForTest();
+  @BeforeClass
+  public static void before() throws Exception {
+    globalSetUp();
   }
 
   @AfterClass
-  public static void tearDown() {
-    cluster.shutdown();
-    DistCpProcedure.disableForTest();
+  public static void after() {
+    tearDown();
   }
 
   @Before
   public void testSetup() throws Exception {
-
-    // Create mock locations
-    cluster.installMockLocations();
-
-    // Delete all files via the NNs and verify
-    cluster.deleteAllFiles();
-
-    // Create test fixtures on NN
-    cluster.createTestDirectoriesNamenode();
-
-    // Wait to ensure NN has fully created its test directories
-    Thread.sleep(100);
-
-    // Random router for this test
-    RouterContext rndRouter = cluster.getRandomRouter();
-    this.setRouter(rndRouter);
-
-    // Create a mount that points to 2 dirs in the same ns:
-    // /same
-    //   ns0 -> /
-    //   ns0 -> /target-ns0
-    for (RouterContext rc : cluster.getRouters()) {
-      Router r = rc.getRouter();
-      MockResolver resolver = (MockResolver) r.getSubclusterResolver();
-      List<String> nss = cluster.getNameservices();
-      String ns0 = nss.get(0);
-      resolver.addLocation("/same", ns0, "/");
-      resolver.addLocation("/same", ns0, cluster.getNamenodePathForNS(ns0));
-    }
-
-    // Pick a namenode for this test
-    String ns0 = cluster.getNameservices().get(0);
-    this.setNs(ns0);
-    this.setNamenode(cluster.getNamenode(ns0, null));
-
-    // Create a test file on the NN
-    Random rnd = new Random();
-    String randomFile = "testfile-" + rnd.nextInt();
-    this.nnFile =
-        cluster.getNamenodeTestDirectoryForNS(ns) + "/" + randomFile;
-
-    createFile(nnFS, nnFile, 32);
-    verifyFileExists(nnFS, nnFile);
-  }
-
-  protected void createDir(FileSystem fs, String dir) throws IOException {
-    fs.mkdirs(new Path(dir));
-    String file = dir + "/file";
-    createFile(fs, file, 32);
-    verifyFileExists(fs, dir);
-    verifyFileExists(fs, file);
+    setup();
+    router = getRouterContext();
+    routerFS = getRouterFileSystem();
+    cluster = getCluster();
   }
 
-  protected void testRenameDir(RouterContext testRouter, String path,
+  private void testRenameDir(RouterContext testRouter, String path,
       String renamedPath, boolean exceptionExpected, Callable<Object> call)
       throws IOException {
     createDir(testRouter.getFileSystem(), path);
@@ -219,23 +129,6 @@ public class TestRouterFederationRename {
     }
   }
 
-  protected void setRouter(RouterContext r) throws IOException {
-    this.router = r;
-    this.routerFS = r.getFileSystem();
-  }
-
-  protected void setNs(String nameservice) {
-    this.ns = nameservice;
-  }
-
-  protected void setNamenode(NamenodeContext nn) throws IOException {
-    this.nnFS = nn.getFileSystem();
-  }
-
-  protected FileSystem getRouterFileSystem() {
-    return this.routerFS;
-  }
-
   @Test
   public void testSuccessfulRbfRename() throws Exception {
     List<String> nss = cluster.getNameservices();
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterFederationRenameBase.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterFederationRenameBase.java
new file mode 100644
index 0000000..4046492
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterFederationRenameBase.java
@@ -0,0 +1,203 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY;
+import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.createFile;
+import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.verifyFileExists;
+import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FEDERATION_RENAME_BANDWIDTH;
+import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FEDERATION_RENAME_MAP;
+import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_ADMIN_ENABLE;
+import static org.apache.hadoop.tools.fedbalance.FedBalanceConfigs.SCHEDULER_JOURNAL_URI;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster;
+import org.apache.hadoop.hdfs.server.federation.MockResolver;
+import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
+import org.apache.hadoop.tools.fedbalance.DistCpProcedure;
+
+/**
+ * Shared mini-cluster fixture for the router federation rename tests.
+ */
+public class TestRouterFederationRenameBase {
+
+  static final int NUM_SUBCLUSTERS = 2;
+  static final int NUM_DNS = 6;
+
+  /** Random Router for this federated cluster. */
+  private MiniRouterDFSCluster.RouterContext router;
+
+  /** Nameservice used by the test; setup() picks the first one. */
+  private String ns;
+  /** Filesystem interface to the Router. */
+  private FileSystem routerFS;
+  /** Filesystem interface to the Namenode. */
+  private FileSystem nnFS;
+  /** File in the Namenode. */
+  private String nnFile;
+
+  /** Federated HDFS cluster. */
+  private static MiniRouterDFSCluster cluster;
+
+  public static void globalSetUp() throws Exception {
+    Configuration namenodeConf = new Configuration();
+    namenodeConf.setBoolean(DFSConfigKeys.HADOOP_CALLER_CONTEXT_ENABLED_KEY,
+        true);
+    namenodeConf.set(CommonConfigurationKeys.HADOOP_SECURITY_GROUP_MAPPING,
+        TestRouterFederationRename.MockGroupsMapping.class.getName());
+    cluster = new MiniRouterDFSCluster(false, NUM_SUBCLUSTERS);
+    cluster.setNumDatanodesPerNameservice(NUM_DNS);
+    cluster.addNamenodeOverrides(namenodeConf);
+    cluster.setIndependentDNs();
+
+    Configuration conf = new Configuration();
+    conf.setInt(DFSConfigKeys.DFS_LIST_LIMIT, 5);
+    cluster.addNamenodeOverrides(conf);
+    // Start NNs and DNs and wait until ready.
+    cluster.startCluster();
+
+    // Start routers, enable router federation rename.
+    String journal = "hdfs://" + cluster.getCluster().getNameNode(1)
+        .getClientNamenodeAddress() + "/journal";
+    Configuration routerConf = new RouterConfigBuilder()
+        .metrics()
+        .rpc()
+        .routerRenameOption()
+        .set(SCHEDULER_JOURNAL_URI, journal)
+        .set(DFS_ROUTER_FEDERATION_RENAME_MAP, "1")
+        .set(DFS_ROUTER_FEDERATION_RENAME_BANDWIDTH, "1")
+        .build();
+    // Expire the DN report cache after 1 second to make the test faster.
+    routerConf.setTimeDuration(
+        RBFConfigKeys.DN_REPORT_CACHE_EXPIRE, 1, TimeUnit.SECONDS);
+    routerConf.setBoolean(DFS_ROUTER_ADMIN_ENABLE, true);
+    routerConf.setBoolean(DFS_PERMISSIONS_ENABLED_KEY, true);
+    routerConf.set(CommonConfigurationKeys.HADOOP_SECURITY_GROUP_MAPPING,
+        TestRouterFederationRename.MockGroupsMapping.class.getName());
+    cluster.addRouterOverrides(routerConf);
+    cluster.startRouters();
+
+    // Register and verify all NNs with all routers
+    cluster.registerNamenodes();
+    cluster.waitNamenodeRegistration();
+
+    // Shorten the DN heartbeat and expire intervals so DNs go dead faster.
+    cluster.getCluster().getNamesystem(0).getBlockManager()
+        .getDatanodeManager().setHeartbeatInterval(1);
+    cluster.getCluster().getNamesystem(1).getBlockManager()
+        .getDatanodeManager().setHeartbeatInterval(1);
+    cluster.getCluster().getNamesystem(0).getBlockManager()
+        .getDatanodeManager().setHeartbeatExpireInterval(3000);
+    cluster.getCluster().getNamesystem(1).getBlockManager()
+        .getDatanodeManager().setHeartbeatExpireInterval(3000);
+    DistCpProcedure.enableForTest();
+  }
+
+  public static void tearDown() {
+    cluster.shutdown();
+    cluster = null;
+    DistCpProcedure.disableForTest();
+  }
+
+  protected void setup() throws IOException, InterruptedException {
+
+    // Create mock locations
+    cluster.installMockLocations();
+
+    // Delete all files via the NNs and verify
+    cluster.deleteAllFiles();
+
+    // Create test fixtures on NN
+    cluster.createTestDirectoriesNamenode();
+
+    // Random router for this test
+    MiniRouterDFSCluster.RouterContext rndRouter = cluster.getRandomRouter();
+    this.setRouter(rndRouter);
+
+    // Create a mount that points to 2 dirs in the same ns:
+    // /same
+    //   ns0 -> /
+    //   ns0 -> /target-ns0
+    for (MiniRouterDFSCluster.RouterContext rc : cluster.getRouters()) {
+      Router r = rc.getRouter();
+      MockResolver resolver = (MockResolver) r.getSubclusterResolver();
+      List<String> nss = cluster.getNameservices();
+      String ns0 = nss.get(0);
+      resolver.addLocation("/same", ns0, "/");
+      resolver.addLocation("/same", ns0, cluster.getNamenodePathForNS(ns0));
+    }
+
+    // Use the first nameservice and its namenode for this test
+    String ns0 = cluster.getNameservices().get(0);
+    this.setNs(ns0);
+    this.setNamenode(cluster.getNamenode(ns0, null));
+
+    // Create a randomly named test file on the NN
+    Random rnd = new Random();
+    String randomFile = "testfile-" + rnd.nextInt();
+    this.nnFile =
+        cluster.getNamenodeTestDirectoryForNS(ns) + "/" + randomFile;
+
+    createFile(nnFS, nnFile, 32);
+    verifyFileExists(nnFS, nnFile);
+  }
+
+  protected void setRouter(MiniRouterDFSCluster.RouterContext r) throws
+      IOException {
+    this.router = r;
+    this.routerFS = r.getFileSystem();
+  }
+
+  protected void setNs(String nameservice) {
+    this.ns = nameservice;
+  }
+
+  protected void setNamenode(MiniRouterDFSCluster.NamenodeContext nn)
+      throws IOException {
+    this.nnFS = nn.getFileSystem();
+  }
+
+  protected FileSystem getRouterFileSystem() {
+    return this.routerFS;
+  }
+
+  protected void createDir(FileSystem fs, String dir) throws IOException {
+    fs.mkdirs(new Path(dir));
+    String file = dir + "/file";
+    createFile(fs, file, 32);
+    verifyFileExists(fs, dir);
+    verifyFileExists(fs, file);
+  }
+
+  public MiniRouterDFSCluster getCluster() {
+    return cluster;
+  }
+
+  public MiniRouterDFSCluster.RouterContext getRouterContext() {
+    return router;
+  }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterFederationRenameInKerberosEnv.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterFederationRenameInKerberosEnv.java
index 369508f..2895692 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterFederationRenameInKerberosEnv.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterFederationRenameInKerberosEnv.java
@@ -18,9 +18,11 @@
 package org.apache.hadoop.hdfs.server.federation.router;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.ha.ClientBaseWithFixes;
 import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
@@ -31,6 +33,8 @@ import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
 import org.apache.hadoop.minikdc.MiniKdc;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authorize.AuthorizationException;
+import org.apache.hadoop.security.authorize.ImpersonationProvider;
 import org.apache.hadoop.tools.fedbalance.DistCpProcedure;
 import org.junit.After;
 import org.junit.AfterClass;
@@ -40,11 +44,13 @@ import org.junit.Test;
 
 import java.io.File;
 import java.io.IOException;
+import java.net.InetAddress;
 import java.security.PrivilegedExceptionAction;
 import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.TimeUnit;
 
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_IMPERSONATION_PROVIDER_CLASS;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.IGNORE_SECURE_PORTS_FOR_TESTING_KEY;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DATA_TRANSFER_PROTECTION_DEFAULT;
 import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DATA_TRANSFER_PROTECTION_KEY;
@@ -116,10 +122,35 @@ public class TestRouterFederationRenameInKerberosEnv
     baseConf.set(DFS_DATA_TRANSFER_PROTECTION_KEY,
         DFS_DATA_TRANSFER_PROTECTION_DEFAULT);
     baseConf.setBoolean(IGNORE_SECURE_PORTS_FOR_TESTING_KEY, true);
+    baseConf.setClass(HADOOP_SECURITY_IMPERSONATION_PROVIDER_CLASS,
+        AllowUserImpersonationProvider.class, ImpersonationProvider.class);
 
     DistCpProcedure.enableForTest();
   }
 
+  /**
+   * {@link ImpersonationProvider} that authorizes a proxy user only when its
+   * real user has the same short user name as the current user.
+   */
+  private static class AllowUserImpersonationProvider extends Configured
+      implements ImpersonationProvider {
+    public void init(String configurationPrefix) {
+      // Nothing to initialize.
+    }
+
+    public void authorize(UserGroupInformation user, InetAddress remoteAddress)
+        throws AuthorizationException {
+      try {
+        if (!user.getRealUser().getShortUserName()
+            .equals(UserGroupInformation.getCurrentUser().getShortUserName())) {
+          throw new AuthorizationException();
+        }
+      } catch (IOException ioe) {
+        throw new AuthorizationException(ioe);
+      }
+    }
+  }
+
   @AfterClass
   public static void globalTearDown() {
     kdc.stop();
@@ -191,18 +222,26 @@ public class TestRouterFederationRenameInKerberosEnv
     setRouter(rndRouter);
   }
 
-  protected void createDir(FileSystem fs, String dir) throws IOException {
-    fs.mkdirs(new Path(dir));
-    String file = dir + "/file";
+  protected void prepareEnv(FileSystem fs, Path path, Path renamedPath)
+      throws IOException {
+    // Set permission of parent to 777.
+    fs.setPermission(path.getParent(),
+        FsPermission.createImmutable((short)511));
+    fs.setPermission(renamedPath.getParent(),
+        FsPermission.createImmutable((short)511));
+    // Create src path and file.
+    fs.mkdirs(path);
+    String file = path.toString() + "/file";
     createFile(fs, file, 32);
-    verifyFileExists(fs, dir);
+    verifyFileExists(fs, path.toString());
     verifyFileExists(fs, file);
   }
 
   protected void testRenameDir(RouterContext testRouter, String path,
       String renamedPath, boolean exceptionExpected, Callable<Object> call)
       throws IOException {
-    createDir(testRouter.getFileSystem(), path);
+    prepareEnv(testRouter.getFileSystem(), new Path(path),
+        new Path(renamedPath));
     // rename
     boolean exceptionThrown = false;
     try {
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterFederationRenamePermission.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterFederationRenamePermission.java
new file mode 100644
index 0000000..cb828db
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterFederationRenamePermission.java
@@ -0,0 +1,246 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import static org.apache.hadoop.fs.permission.FsAction.ALL;
+import static org.apache.hadoop.fs.permission.FsAction.READ_EXECUTE;
+import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.verifyFileExists;
+import static org.apache.hadoop.test.GenericTestUtils.getMethodName;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.AclEntry;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.fs.permission.AclEntryScope;
+import org.apache.hadoop.fs.permission.AclEntryType;
+import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster;
+import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.test.LambdaTestUtils;
+import org.apache.hadoop.thirdparty.com.google.common.collect.Lists;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Test permission check of router federation rename.
+ */
+public class TestRouterFederationRenamePermission
+    extends TestRouterFederationRenameBase {
+
+  private String srcNs; // the source namespace.
+  private String dstNs; // the dst namespace.
+  // the source path.
+  private String srcStr;
+  private Path srcPath;
+  // the dst path.
+  private String dstStr;
+  private Path dstPath;
+  private UserGroupInformation foo;
+  private MiniRouterDFSCluster.RouterContext router;
+  private FileSystem routerFS;
+  private MiniRouterDFSCluster cluster;
+
+  @BeforeClass
+  public static void before() throws Exception {
+    globalSetUp();
+  }
+
+  @AfterClass
+  public static void after() {
+    tearDown();
+  }
+
+  @Before
+  public void testSetup() throws Exception {
+    setup();
+    cluster = getCluster();
+    List<String> nss = cluster.getNameservices();
+    srcNs = nss.get(0);
+    dstNs = nss.get(1);
+    srcStr = cluster.getFederatedTestDirectoryForNS(srcNs) + "/d0/"
+        + getMethodName();
+    dstStr = cluster.getFederatedTestDirectoryForNS(dstNs) + "/d0/"
+        + getMethodName();
+    srcPath = new Path(srcStr);
+    dstPath = new Path(dstStr);
+    foo = UserGroupInformation.createRemoteUser("foo");
+    router = getRouterContext();
+    routerFS = getRouterFileSystem();
+  }
+
+  @Test
+  public void testRenameSnapshotPath() throws Exception {
+    LambdaTestUtils.intercept(IOException.class,
+        "Router federation rename can't rename snapshot path",
+        "Expect IOException.", () -> RouterFederationRename.checkSnapshotPath(
+            new RemoteLocation(srcNs, "/foo/.snapshot/src", "/src"),
+            new RemoteLocation(dstNs, "/foo/dst", "/dst")));
+    LambdaTestUtils.intercept(IOException.class,
+        "Router federation rename can't rename snapshot path",
+        "Expect IOException.", () -> RouterFederationRename
+            .checkSnapshotPath(new RemoteLocation(srcNs, "/foo/src", "/src"),
+                new RemoteLocation(dstNs, "/foo/.snapshot/dst", "/dst")));
+  }
+
+  // Case1: the source path doesn't exist. Expect FileNotFoundException.
+  @Test
+  public void testPermission1() throws Exception {
+    LambdaTestUtils.intercept(RemoteException.class, "FileNotFoundException",
+        "Expect FileNotFoundException.", () -> {
+          DFSClient client = router.getClient(foo);
+          ClientProtocol clientProtocol = client.getNamenode();
+          clientProtocol.rename(srcStr, dstStr);
+        });
+  }
+
+  // Case2: the source path parent has no permission bits set (mode 0).
+  @Test
+  public void testPermission2() throws Exception {
+    createDir(routerFS, srcStr);
+    routerFS.setPermission(srcPath.getParent(),
+        FsPermission.createImmutable((short) 0));
+    LambdaTestUtils.intercept(RemoteException.class, "AccessControlException",
+        "Expect AccessControlException.", () -> {
+          DFSClient client = router.getClient(foo);
+          ClientProtocol clientProtocol = client.getNamenode();
+          clientProtocol.rename(srcStr, dstStr);
+        });
+  }
+
+  // Case3: the source path parent with rwxr-xr-x (0755) permission.
+  @Test
+  public void testPermission3() throws Exception {
+    createDir(routerFS, srcStr);
+    routerFS.setPermission(srcPath.getParent(),
+        FsPermission.createImmutable((short) 0755));
+    LambdaTestUtils.intercept(RemoteException.class, "AccessControlException",
+        "Expect AccessControlException.", () -> {
+          DFSClient client = router.getClient(foo);
+          ClientProtocol clientProtocol = client.getNamenode();
+          clientProtocol.rename(srcStr, dstStr);
+        });
+  }
+
+  // Case4: the source path parent with unrelated acl user:not-foo:rwx.
+  @Test
+  public void testPermission4() throws Exception {
+    createDir(routerFS, srcStr);
+    routerFS.setAcl(srcPath.getParent(), buildAcl("not-foo", ALL));
+    LambdaTestUtils.intercept(RemoteException.class, "AccessControlException",
+        "Expect AccessControlException.", () -> {
+          DFSClient client = router.getClient(foo);
+          ClientProtocol clientProtocol = client.getNamenode();
+          clientProtocol.rename(srcStr, dstStr);
+        });
+  }
+
+  // Case5: src parent has acl user:foo:rwx and the dst parent doesn't exist.
+  @Test
+  public void testPermission5() throws Exception {
+    createDir(routerFS, srcStr);
+    routerFS.setAcl(srcPath.getParent(), buildAcl("foo", ALL));
+    assertFalse(routerFS.exists(dstPath.getParent()));
+    LambdaTestUtils.intercept(RemoteException.class, "FileNotFoundException",
+        "Expect FileNotFoundException.", () -> {
+          DFSClient client = router.getClient(foo);
+          ClientProtocol clientProtocol = client.getNamenode();
+          clientProtocol.rename(srcStr, dstStr);
+        });
+  }
+
+  // Case6: the src parent has acl user:foo:rwx, but the dst parent is created
+  //        with no owner or acl change. Expect AccessControlException.
+  @Test
+  public void testPermission6() throws Exception {
+    createDir(routerFS, srcStr);
+    routerFS.setAcl(srcPath.getParent(), buildAcl("foo", ALL));
+    assertTrue(routerFS.mkdirs(dstPath.getParent()));
+    LambdaTestUtils.intercept(RemoteException.class, "AccessControlException",
+        "Expect AccessControlException.", () -> {
+          DFSClient client = router.getClient(foo);
+          ClientProtocol clientProtocol = client.getNamenode();
+          clientProtocol.rename(srcStr, dstStr);
+        });
+  }
+
+  // Case7: foo has rwx on the src parent and owns the dst parent. Success.
+  @Test
+  public void testPermission7() throws Exception {
+    createDir(routerFS, srcStr);
+    routerFS.setAcl(srcPath.getParent(), buildAcl("foo", ALL));
+    assertTrue(routerFS.mkdirs(dstPath.getParent()));
+    routerFS.setOwner(dstPath.getParent(), "foo", "foogroup");
+    DFSClient client = router.getClient(foo);
+    ClientProtocol clientProtocol = client.getNamenode();
+    clientProtocol.rename(srcStr, dstStr);
+    assertFalse(verifyFileExists(routerFS, srcStr));
+    assertTrue(
+        verifyFileExists(routerFS, dstStr + "/file"));
+  }
+
+  /**
+   * Build an ACCESS-scope acl list holding, in order:
+   * user:input_user:input_permission
+   * user::rwx
+   * group::rwx
+   * other::r-x
+   * @param user the name of the named-user entry.
+   * @param permission the fs action granted to the named-user entry.
+   * @return the list of the four acl entries.
+   */
+  private List<AclEntry> buildAcl(String user, FsAction permission) {
+    List<AclEntry> aclEntryList = Lists.newArrayList();
+    aclEntryList.add(
+        new AclEntry.Builder()
+            .setName(user)
+            .setPermission(permission)
+            .setScope(AclEntryScope.ACCESS)
+            .setType(AclEntryType.USER)
+            .build());
+    aclEntryList.add(
+        new AclEntry.Builder()
+            .setPermission(FsAction.ALL)
+            .setScope(AclEntryScope.ACCESS)
+            .setType(AclEntryType.USER)
+            .build());
+    aclEntryList.add(
+        new AclEntry.Builder()
+            .setPermission(FsAction.ALL)
+            .setScope(AclEntryScope.ACCESS)
+            .setType(AclEntryType.GROUP)
+            .build());
+    aclEntryList.add(
+        new AclEntry.Builder()
+            .setPermission(READ_EXECUTE)
+            .setScope(AclEntryScope.ACCESS)
+            .setType(AclEntryType.OTHER)
+            .build());
+    return aclEntryList;
+  }
+}

---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org