You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2021/05/08 11:52:04 UTC

[hadoop] branch trunk updated: HDFS-15923. RBF: Authentication failed when rename accross sub clusters (#2819). Contributed by zhuobin zheng.

This is an automated email from the ASF dual-hosted git repository.

jinglun pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 1b69942  HDFS-15923. RBF: Authentication failed when rename accross sub clusters (#2819). Contributed by zhuobin zheng.
1b69942 is described below

commit 1b6994287a86a9a96907e641664ce75e522d0be2
Author: Jinglun <ji...@apache.org>
AuthorDate: Sat May 8 19:50:17 2021 +0800

    HDFS-15923. RBF: Authentication failed when rename accross sub clusters (#2819). Contributed by zhuobin zheng.
    
    Reviewed-by: Jinglun <ji...@apache.org>
    Reviewed-by: Inigo Goiri <in...@apache.org>
---
 hadoop-hdfs-project/hadoop-hdfs-rbf/pom.xml        |   6 +
 .../federation/router/RouterFederationRename.java  |  47 ++--
 .../router/TestRouterFederationRename.java         |   2 +-
 .../TestRouterFederationRenameInKerberosEnv.java   | 260 +++++++++++++++++++++
 4 files changed, 298 insertions(+), 17 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/pom.xml b/hadoop-hdfs-project/hadoop-hdfs-rbf/pom.xml
index 8ba016a..e17602d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/pom.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/pom.xml
@@ -102,6 +102,12 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
       <scope>test</scope>
     </dependency>
     <dependency>
+      <groupId>org.apache.zookeeper</groupId>
+      <artifactId>zookeeper</artifactId>
+      <scope>test</scope>
+      <type>test-jar</type>
+    </dependency>
+    <dependency>
       <groupId>com.fasterxml.jackson.core</groupId>
       <artifactId>jackson-annotations</artifactId>
     </dependency>
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterFederationRename.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterFederationRename.java
index 8074fdd..d2bf989 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterFederationRename.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterFederationRename.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.tools.fedbalance.DistCpProcedure;
 import org.apache.hadoop.tools.fedbalance.FedBalanceConfigs;
 import org.apache.hadoop.tools.fedbalance.FedBalanceContext;
@@ -31,6 +32,8 @@ import org.apache.hadoop.tools.fedbalance.procedure.BalanceProcedure;
 import org.apache.hadoop.tools.fedbalance.procedure.BalanceProcedureScheduler;
 
 import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.security.PrivilegedExceptionAction;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -96,23 +99,35 @@ public class RouterFederationRename {
     }
     RemoteLocation srcLoc = srcLocations.get(0);
     RemoteLocation dstLoc = dstLocations.get(0);
-    // Build and submit router federation rename job.
-    BalanceJob job = buildRouterRenameJob(srcLoc.getNameserviceId(),
-        dstLoc.getNameserviceId(), srcLoc.getDest(), dstLoc.getDest());
-    BalanceProcedureScheduler scheduler = rpcServer.getFedRenameScheduler();
-    countIncrement();
+
+    UserGroupInformation routerUser = UserGroupInformation.getLoginUser();
+
     try {
-      scheduler.submit(job);
-      LOG.info("Rename {} to {} from namespace {} to {}. JobId={}.", src, dst,
-          srcLoc.getNameserviceId(), dstLoc.getNameserviceId(), job.getId());
-      scheduler.waitUntilDone(job);
-      if (job.getError() != null) {
-        throw new IOException("Rename of " + src + " to " + dst + " failed.",
-            job.getError());
-      }
-      return true;
-    } finally {
-      countDecrement();
+      // as router user with saveJournal and task submission privileges
+      return routerUser.doAs((PrivilegedExceptionAction<Boolean>) () -> {
+        // Build and submit router federation rename job.
+        BalanceJob job = buildRouterRenameJob(srcLoc.getNameserviceId(),
+            dstLoc.getNameserviceId(), srcLoc.getDest(), dstLoc.getDest());
+        BalanceProcedureScheduler scheduler = rpcServer.getFedRenameScheduler();
+        countIncrement();
+        try {
+          scheduler.submit(job);
+          LOG.info("Rename {} to {} from namespace {} to {}. JobId={}.", src,
+              dst, srcLoc.getNameserviceId(), dstLoc.getNameserviceId(),
+              job.getId());
+          scheduler.waitUntilDone(job);
+          if (job.getError() != null) {
+            throw new IOException("Rename of " + src + " to " + dst +
+                " failed.", job.getError());
+          }
+          return true;
+        } finally {
+          countDecrement();
+        }
+      });
+    } catch (InterruptedException e) {
+      LOG.warn("Fed balance job is interrupted.", e);
+      throw new InterruptedIOException(e.getMessage());
     }
   }
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterFederationRename.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterFederationRename.java
index c47098f..a9a17b3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterFederationRename.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterFederationRename.java
@@ -409,7 +409,7 @@ public class TestRouterFederationRename {
     getRouterFileSystem().delete(new Path(renamedDir), true);
   }
 
-  @Test(timeout = 10000)
+  @Test(timeout = 20000)
   public void testCounter() throws Exception {
     final RouterRpcServer rpcServer = router.getRouter().getRpcServer();
     List<String> nss = cluster.getNameservices();
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterFederationRenameInKerberosEnv.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterFederationRenameInKerberosEnv.java
new file mode 100644
index 0000000..369508f
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterFederationRenameInKerberosEnv.java
@@ -0,0 +1,260 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.ha.ClientBaseWithFixes;
+import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster;
+import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.RouterContext;
+import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
+import org.apache.hadoop.minikdc.MiniKdc;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.tools.fedbalance.DistCpProcedure;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.IGNORE_SECURE_PORTS_FOR_TESTING_KEY;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DATA_TRANSFER_PROTECTION_DEFAULT;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DATA_TRANSFER_PROTECTION_KEY;
+import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.createFile;
+import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.verifyFileExists;
+import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FEDERATION_RENAME_BANDWIDTH;
+import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FEDERATION_RENAME_MAP;
+import static org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod.KERBEROS;
+import static org.apache.hadoop.security.token.delegation.ZKDelegationTokenSecretManager.ZK_DTSM_ZK_AUTH_TYPE;
+import static org.apache.hadoop.security.token.delegation.ZKDelegationTokenSecretManager.ZK_DTSM_ZK_CONNECTION_STRING;
+import static org.apache.hadoop.test.GenericTestUtils.getMethodName;
+import static org.apache.hadoop.tools.fedbalance.FedBalanceConfigs.SCHEDULER_JOURNAL_URI;
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.RM_PRINCIPAL;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Basic tests of router federation rename. Rename across namespaces.
+ */
+public class TestRouterFederationRenameInKerberosEnv
+    extends ClientBaseWithFixes {
+
+  private static final int NUM_SUBCLUSTERS = 2;
+  private static final int NUM_DNS = 6;
+
+  private static String clientPrincipal = "client@EXAMPLE.COM";
+  private static String serverPrincipal =  System.getenv().get("USERNAME")
+      + "/localhost@EXAMPLE.COM";
+  private static String keytab = new File(
+      System.getProperty("test.dir", "target"),
+      UUID.randomUUID().toString())
+      .getAbsolutePath();
+
+  private static Configuration baseConf = new Configuration(false);
+
+  private static MiniKdc kdc;
+
+  /** Federated HDFS cluster. */
+  private MiniRouterDFSCluster cluster;
+
+  /** Random Router for this federated cluster. */
+  private RouterContext router;
+
+  @BeforeClass
+  public static void globalSetUp() throws Exception {
+    // init KDC
+    File workDir = new File(System.getProperty("test.dir", "target"));
+    kdc = new MiniKdc(MiniKdc.createConf(), workDir);
+    kdc.start();
+    kdc.createPrincipal(new File(keytab), clientPrincipal, serverPrincipal);
+
+
+    baseConf.setBoolean(DFSConfigKeys.HADOOP_CALLER_CONTEXT_ENABLED_KEY,
+        true);
+
+    SecurityUtil.setAuthenticationMethod(KERBEROS, baseConf);
+    baseConf.set(RBFConfigKeys.DFS_ROUTER_KERBEROS_PRINCIPAL_KEY,
+        serverPrincipal);
+    baseConf.set(RBFConfigKeys.DFS_ROUTER_KEYTAB_FILE_KEY, keytab);
+    baseConf.set(DFSConfigKeys.DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY,
+        serverPrincipal);
+    baseConf.set(DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY, keytab);
+    baseConf.set(DFSConfigKeys.DFS_DATANODE_KERBEROS_PRINCIPAL_KEY,
+        serverPrincipal);
+    baseConf.set(DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY, keytab);
+    baseConf.setBoolean(DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY,
+        true);
+
+    baseConf.set(DFS_DATA_TRANSFER_PROTECTION_KEY,
+        DFS_DATA_TRANSFER_PROTECTION_DEFAULT);
+    baseConf.setBoolean(IGNORE_SECURE_PORTS_FOR_TESTING_KEY, true);
+
+    DistCpProcedure.enableForTest();
+  }
+
+  @AfterClass
+  public static void globalTearDown() {
+    kdc.stop();
+    DistCpProcedure.disableForTest();
+  }
+
+  @After
+  @Override
+  public void tearDown() throws Exception {
+    super.tearDown();
+    cluster.shutdown();
+  }
+
+  @Before
+  @Override
+  public void setUp() throws Exception {
+    super.setUp();
+
+    cluster = new MiniRouterDFSCluster(false, NUM_SUBCLUSTERS);
+    cluster.setNumDatanodesPerNameservice(NUM_DNS);
+    cluster.addNamenodeOverrides(baseConf);
+    cluster.setIndependentDNs();
+    // Start NNs and DNs and wait until ready.
+    cluster.startCluster();
+
+    // Start routers, enable router federation rename.
+    String journal = "hdfs://" + cluster.getCluster().getNameNode(1)
+        .getClientNamenodeAddress() + "/journal";
+    Configuration routerConf = new RouterConfigBuilder()
+        .metrics()
+        .rpc()
+        .routerRenameOption()
+        .set(SCHEDULER_JOURNAL_URI, journal)
+        .set(DFS_ROUTER_FEDERATION_RENAME_MAP, "1")
+        .set(DFS_ROUTER_FEDERATION_RENAME_BANDWIDTH, "1")
+        .set(ZK_DTSM_ZK_CONNECTION_STRING, hostPort)
+        .set(ZK_DTSM_ZK_AUTH_TYPE, "none")
+        .set(RM_PRINCIPAL, serverPrincipal)
+        .build();
+    // We decrease the DN cache times to make the test faster.
+    routerConf.setTimeDuration(
+        RBFConfigKeys.DN_REPORT_CACHE_EXPIRE, 1, TimeUnit.SECONDS);
+    cluster.addRouterOverrides(baseConf);
+    cluster.addRouterOverrides(routerConf);
+    cluster.startRouters();
+
+    // Register and verify all NNs with all routers
+    cluster.registerNamenodes();
+    cluster.waitNamenodeRegistration();
+
+    // We decrease the DN heartbeat expire interval to make them dead faster
+    cluster.getCluster().getNamesystem(0).getBlockManager()
+        .getDatanodeManager().setHeartbeatInterval(1);
+    cluster.getCluster().getNamesystem(1).getBlockManager()
+        .getDatanodeManager().setHeartbeatInterval(1);
+    cluster.getCluster().getNamesystem(0).getBlockManager()
+        .getDatanodeManager().setHeartbeatExpireInterval(3000);
+    cluster.getCluster().getNamesystem(1).getBlockManager()
+        .getDatanodeManager().setHeartbeatExpireInterval(3000);
+
+    // Create mock locations
+    cluster.installMockLocations();
+
+    // Create test fixtures on NN
+    cluster.createTestDirectoriesNamenode();
+
+    // Random router for this test
+    RouterContext rndRouter = cluster.getRandomRouter();
+    setRouter(rndRouter);
+  }
+
+  protected void createDir(FileSystem fs, String dir) throws IOException {
+    fs.mkdirs(new Path(dir));
+    String file = dir + "/file";
+    createFile(fs, file, 32);
+    verifyFileExists(fs, dir);
+    verifyFileExists(fs, file);
+  }
+
+  protected void testRenameDir(RouterContext testRouter, String path,
+      String renamedPath, boolean exceptionExpected, Callable<Object> call)
+      throws IOException {
+    createDir(testRouter.getFileSystem(), path);
+    // rename
+    boolean exceptionThrown = false;
+    try {
+      call.call();
+      assertFalse(verifyFileExists(testRouter.getFileSystem(), path));
+      assertTrue(verifyFileExists(testRouter.getFileSystem(),
+          renamedPath + "/file"));
+    } catch (Exception ex) {
+      exceptionThrown = true;
+      assertTrue(verifyFileExists(testRouter.getFileSystem(),
+          path + "/file"));
+      assertFalse(verifyFileExists(testRouter.getFileSystem(), renamedPath));
+    } finally {
+      FileContext fileContext = testRouter.getFileContext();
+      fileContext.delete(new Path(path), true);
+      fileContext.delete(new Path(renamedPath), true);
+    }
+    if (exceptionExpected) {
+      // Error was expected.
+      assertTrue(exceptionThrown);
+    } else {
+      // No error was expected.
+      assertFalse(exceptionThrown);
+    }
+  }
+
+  protected void setRouter(RouterContext r) throws IOException {
+    this.router = r;
+  }
+
+  @Test
+  public void testClientRename() throws IOException {
+    String ns0 = cluster.getNameservices().get(0);
+    String ns1 = cluster.getNameservices().get(1);
+    // Test successfully rename a dir to a destination that is in a different
+    // namespace.
+    String dir =
+        cluster.getFederatedTestDirectoryForNS(ns0) + "/" + getMethodName();
+    String renamedDir =
+        cluster.getFederatedTestDirectoryForNS(ns1) + "/" + getMethodName();
+    testRenameDir(router, dir, renamedDir, false, () -> {
+      UserGroupInformation ugi = UserGroupInformation
+          .loginUserFromKeytabAndReturnUGI(clientPrincipal, keytab);
+      ugi.doAs((PrivilegedExceptionAction<Boolean>) () -> {
+        DFSClient client = router.getClient();
+        ClientProtocol clientProtocol = client.getNamenode();
+        clientProtocol.rename(dir, renamedDir);
+        return null;
+      });
+      return null;
+    });
+
+  }
+
+}
\ No newline at end of file

---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org