You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ay...@apache.org on 2020/09/21 17:54:37 UTC

[hadoop] branch trunk updated: HDFS-15554. RBF: force router check file existence in destinations before adding/updating mount points (#2266). Contributed by Fengnan Li.

This is an automated email from the ASF dual-hosted git repository.

ayushsaxena pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 3e8b1e7  HDFS-15554. RBF: force router check file existence in destinations before adding/updating mount points (#2266). Contributed by Fengnan Li.
3e8b1e7 is described below

commit 3e8b1e74268994d244eeeb6400c6144edbbb486f
Author: lfengnan <lf...@uber.com>
AuthorDate: Mon Sep 21 10:54:13 2020 -0700

    HDFS-15554. RBF: force router check file existence in destinations before adding/updating mount points (#2266). Contributed by Fengnan Li.
---
 .../server/federation/router/RBFConfigKeys.java    |  4 ++
 .../federation/router/RouterAdminServer.java       | 75 ++++++++++++++++++++--
 .../src/main/resources/hdfs-rbf-default.xml        | 10 +++
 .../src/site/markdown/HDFSRouterFederation.md      |  3 +-
 .../server/federation/router/TestRouterAdmin.java  | 73 ++++++++++++++++++++-
 5 files changed, 156 insertions(+), 9 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java
index 7b06ca4..efd4a3a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java
@@ -277,6 +277,10 @@ public class RBFConfigKeys extends CommonConfigurationKeysPublic {
   public static final String DFS_ROUTER_ADMIN_MAX_COMPONENT_LENGTH_KEY =
       FEDERATION_ROUTER_PREFIX + "fs-limits.max-component-length";
   public static final int DFS_ROUTER_ADMIN_MAX_COMPONENT_LENGTH_DEFAULT = 0;
+  public static final String DFS_ROUTER_ADMIN_MOUNT_CHECK_ENABLE =
+      FEDERATION_ROUTER_PREFIX + "admin.mount.check.enable";
+  public static final boolean DFS_ROUTER_ADMIN_MOUNT_CHECK_ENABLE_DEFAULT =
+      false;
 
   // HDFS Router-based federation web
   public static final String DFS_ROUTER_HTTP_ENABLE =
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterAdminServer.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterAdminServer.java
index 50094b0..c7525f6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterAdminServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterAdminServer.java
@@ -25,6 +25,7 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -89,6 +90,7 @@ import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.ProxyUsers;
 import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.util.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -126,6 +128,7 @@ public class RouterAdminServer extends AbstractService
   private static boolean isPermissionEnabled;
   private boolean iStateStoreCache;
   private final long maxComponentLength;
+  private boolean mountTableCheckDestination;
 
   public RouterAdminServer(Configuration conf, Router router)
       throws IOException {
@@ -184,6 +187,9 @@ public class RouterAdminServer extends AbstractService
     this.maxComponentLength = (int) conf.getLongBytes(
         RBFConfigKeys.DFS_ROUTER_ADMIN_MAX_COMPONENT_LENGTH_KEY,
         RBFConfigKeys.DFS_ROUTER_ADMIN_MAX_COMPONENT_LENGTH_DEFAULT);
+    this.mountTableCheckDestination = conf.getBoolean(
+        RBFConfigKeys.DFS_ROUTER_ADMIN_MOUNT_CHECK_ENABLE,
+        RBFConfigKeys.DFS_ROUTER_ADMIN_MOUNT_CHECK_ENABLE_DEFAULT);
 
     GenericRefreshProtocolServerSideTranslatorPB genericRefreshXlator =
         new GenericRefreshProtocolServerSideTranslatorPB(this);
@@ -326,6 +332,13 @@ public class RouterAdminServer extends AbstractService
     // Checks max component length limit.
     MountTable mountTable = request.getEntry();
     verifyMaxComponentLength(mountTable);
+    if (this.mountTableCheckDestination) {
+      List<String> nsIds = verifyFileInDestinations(mountTable);
+      if (!nsIds.isEmpty()) {
+        throw new IllegalArgumentException("File not found in downstream " +
+            "nameservices: " + StringUtils.join(",", nsIds));
+      }
+    }
     return getMountTableStore().addMountTableEntry(request);
   }
 
@@ -336,6 +349,13 @@ public class RouterAdminServer extends AbstractService
     MountTable oldEntry = null;
     // Checks max component length limit.
     verifyMaxComponentLength(updateEntry);
+    if (this.mountTableCheckDestination) {
+      List<String> nsIds = verifyFileInDestinations(updateEntry);
+      if (!nsIds.isEmpty()) {
+        throw new IllegalArgumentException("File not found in downstream " +
+            "nameservices: " + StringUtils.join(",", nsIds));
+      }
+    }
     if (this.router.getSubclusterResolver() instanceof MountTableResolver) {
       MountTableResolver mResolver =
           (MountTableResolver) this.router.getSubclusterResolver();
@@ -542,10 +562,31 @@ public class RouterAdminServer extends AbstractService
   @Override
   public GetDestinationResponse getDestination(
       GetDestinationRequest request) throws IOException {
+    RouterRpcServer rpcServer = this.router.getRpcServer();
+    List<RemoteLocation> locations =
+        rpcServer.getLocationsForPath(request.getSrcPath(), false);
+    List<String> nsIds = getDestinationNameServices(request, locations);
+    if (nsIds.isEmpty() && !locations.isEmpty()) {
+      String nsId = locations.get(0).getNameserviceId();
+      nsIds.add(nsId);
+    }
+    return GetDestinationResponse.newInstance(nsIds);
+  }
+
+  /**
+   * Get destination nameservices where the file in request exists.
+   *
+   * @param request request with src info.
+   * @param locations remote locations to check against.
+   * @return list of nameservices where the dest file was found
+   * @throws IOException
+   */
+  private List<String> getDestinationNameServices(
+      GetDestinationRequest request, List<RemoteLocation> locations)
+      throws IOException {
     final String src = request.getSrcPath();
     final List<String> nsIds = new ArrayList<>();
     RouterRpcServer rpcServer = this.router.getRpcServer();
-    List<RemoteLocation> locations = rpcServer.getLocationsForPath(src, false);
     RouterRpcClient rpcClient = rpcServer.getRPCClient();
     RemoteMethod method = new RemoteMethod("getFileInfo",
         new Class<?>[] {String.class}, new RemoteParam());
@@ -562,11 +603,35 @@ public class RouterAdminServer extends AbstractService
       LOG.error("Cannot get location for {}: {}",
           src, ioe.getMessage());
     }
-    if (nsIds.isEmpty() && !locations.isEmpty()) {
-      String nsId = locations.get(0).getNameserviceId();
-      nsIds.add(nsId);
+    return nsIds;
+  }
+
+  /**
+   * Verify the file exists in destination nameservices to avoid dangling
+   * mount points.
+   *
+   * @param entry the new mount points added, could be from add or update.
+   * @return destination nameservices where the file doesn't exist.
+   * @throws IOException unable to verify the file in destinations
+   */
+  public List<String> verifyFileInDestinations(MountTable entry)
+      throws IOException {
+    GetDestinationRequest request =
+        GetDestinationRequest.newInstance(entry.getSourcePath());
+    List<RemoteLocation> locations = entry.getDestinations();
+    List<String> nsId =
+        getDestinationNameServices(request, locations);
+
+    // get nameservices where no target file exists
+    Set<String> destNs = new HashSet<>(nsId);
+    List<String> nsWithoutFile = new ArrayList<>();
+    for (RemoteLocation location : locations) {
+      String ns = location.getNameserviceId();
+      if (!destNs.contains(ns)) {
+        nsWithoutFile.add(ns);
+      }
     }
-    return GetDestinationResponse.newInstance(nsIds);
+    return nsWithoutFile;
   }
 
   /**
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml
index 4bd2ac3..aab90e4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml
@@ -216,6 +216,16 @@
   </property>
 
   <property>
+    <name>dfs.federation.router.admin.mount.check.enable</name>
+    <value>false</value>
+    <description>
+      If true, add/update mount table will include a destination check to make
+      sure the file exists in downstream namenodes, and changes to mount table
+      will fail if the file is missing from any of the destination namenodes.
+    </description>
+  </property>
+
+  <property>
     <name>dfs.federation.router.http-address</name>
     <value>0.0.0.0:50071</value>
     <description>
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/site/markdown/HDFSRouterFederation.md b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/site/markdown/HDFSRouterFederation.md
index de45645..66f039a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/site/markdown/HDFSRouterFederation.md
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/site/markdown/HDFSRouterFederation.md
@@ -201,7 +201,8 @@ And to stop it:
 
 ### Mount table management
 
-The mount table entries are pretty much the same as in [ViewFs](../hadoop-hdfs/ViewFs.html).
+The mount table entries are pretty much the same as in [ViewFs](../hadoop-hdfs/ViewFs.html). Please make sure the downstream namespace path
+exists before creating a mount table entry pointing to it.
 A good practice for simplifying the management is to name the federated namespace with the same names as the destination namespaces.
 For example, if we to mount `/data/app1` in the federated namespace, it is recommended to have that same name as in the destination namespace.
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAdmin.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAdmin.java
index 8a57224..0cb229f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAdmin.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAdmin.java
@@ -27,11 +27,14 @@ import static org.junit.Assert.assertTrue;
 import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.RouterContext;
 import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
 import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster;
@@ -57,7 +60,6 @@ import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableE
 import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryRequest;
 import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.test.Whitebox;
 import org.apache.hadoop.test.LambdaTestUtils;
 import org.apache.hadoop.util.Time;
 import org.junit.AfterClass;
@@ -65,6 +67,9 @@ import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.mockito.Mockito;
+import org.mockito.internal.util.reflection.FieldSetter;
+
+import com.google.common.collect.Lists;
 
 /**
  * The administrator interface of the {@link Router} implemented by
@@ -78,6 +83,7 @@ public class TestRouterAdmin {
       "Hadoop:service=Router,name=FederationRPC";
   private static List<MountTable> mockMountTable;
   private static StateStoreService stateStore;
+  private static RouterRpcClient mockRpcClient;
 
   @BeforeClass
   public static void globalSetUp() throws Exception {
@@ -88,6 +94,7 @@ public class TestRouterAdmin {
         .admin()
         .rpc()
         .build();
+    conf.setBoolean(RBFConfigKeys.DFS_ROUTER_ADMIN_MOUNT_CHECK_ENABLE, true);
     cluster.addRouterOverrides(conf);
     cluster.startRouters();
     routerContext = cluster.getRandomRouter();
@@ -103,11 +110,51 @@ public class TestRouterAdmin {
         createNamenodeReport("ns1", "nn1", HAServiceState.ACTIVE));
     stateStore.refreshCaches(true);
 
+    setUpMocks();
+  }
+
+  /**
+   * Group all mocks together.
+   *
+   * @throws IOException
+   * @throws NoSuchFieldException if a reflected field is not declared
+   */
+  private static void setUpMocks() throws IOException, NoSuchFieldException {
     RouterRpcServer spyRpcServer =
         Mockito.spy(routerContext.getRouter().createRpcServer());
-    Whitebox
-        .setInternalState(routerContext.getRouter(), "rpcServer", spyRpcServer);
+    FieldSetter.setField(routerContext.getRouter(),
+        Router.class.getDeclaredField("rpcServer"), spyRpcServer);
     Mockito.doReturn(null).when(spyRpcServer).getFileInfo(Mockito.anyString());
+
+    // mock rpc client for destination check when editing mount tables.
+    mockRpcClient = Mockito.spy(spyRpcServer.getRPCClient());
+    FieldSetter.setField(spyRpcServer,
+        RouterRpcServer.class.getDeclaredField("rpcClient"),
+        mockRpcClient);
+    RemoteLocation remoteLocation0 =
+        new RemoteLocation("ns0", "/testdir", null);
+    RemoteLocation remoteLocation1 =
+        new RemoteLocation("ns1", "/", null);
+    final Map<RemoteLocation, HdfsFileStatus> mockResponse0 = new HashMap<>();
+    final Map<RemoteLocation, HdfsFileStatus> mockResponse1 = new HashMap<>();
+    mockResponse0.put(remoteLocation0,
+        new HdfsFileStatus.Builder().build());
+    Mockito.doReturn(mockResponse0).when(mockRpcClient).invokeConcurrent(
+        Mockito.eq(Lists.newArrayList(remoteLocation0)),
+        Mockito.any(RemoteMethod.class),
+        Mockito.eq(false),
+        Mockito.eq(false),
+        Mockito.eq(HdfsFileStatus.class)
+    );
+    mockResponse1.put(remoteLocation1,
+        new HdfsFileStatus.Builder().build());
+    Mockito.doReturn(mockResponse1).when(mockRpcClient).invokeConcurrent(
+        Mockito.eq(Lists.newArrayList(remoteLocation1)),
+        Mockito.any(RemoteMethod.class),
+        Mockito.eq(false),
+        Mockito.eq(false),
+        Mockito.eq(HdfsFileStatus.class)
+    );
   }
 
   @AfterClass
@@ -332,6 +379,26 @@ public class TestRouterAdmin {
     assertEquals(entry.getSourcePath(), "/ns0");
   }
 
+  @Test
+  public void testVerifyFileInDestinations() throws IOException {
+    // this entry has been created in the mock setup
+    MountTable newEntry = MountTable.newInstance(
+        "/testpath", Collections.singletonMap("ns0", "/testdir"),
+        Time.now(), Time.now());
+    RouterAdminServer adminServer =
+        this.routerContext.getRouter().getAdminServer();
+    List<String> result = adminServer.verifyFileInDestinations(newEntry);
+    assertEquals(0, result.size());
+
+    // this entry was not created in the mock
+    newEntry = MountTable.newInstance(
+        "/testpath", Collections.singletonMap("ns0", "/testdir1"),
+        Time.now(), Time.now());
+    result = adminServer.verifyFileInDestinations(newEntry);
+    assertEquals(1, result.size());
+    assertEquals("ns0", result.get(0));
+  }
+
   /**
    * Gets an existing mount table record in the state store.
    *


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org