You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by in...@apache.org on 2019/03/03 18:36:48 UTC

[hadoop] 09/45: HDFS-14082. RBF: Add option to fail operations when a subcluster is unavailable. Contributed by Inigo Goiri.

This is an automated email from the ASF dual-hosted git repository.

inigoiri pushed a commit to branch HDFS-13891
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit afe9ac91b27b2f50d7f3e5f06b13c14f9031b434
Author: Yiqun Lin <yq...@apache.org>
AuthorDate: Wed Nov 21 10:40:26 2018 +0800

    HDFS-14082. RBF: Add option to fail operations when a subcluster is unavailable. Contributed by Inigo Goiri.
---
 .../server/federation/router/RBFConfigKeys.java    |  4 ++
 .../federation/router/RouterClientProtocol.java    | 15 ++++--
 .../server/federation/router/RouterRpcServer.java  |  9 ++++
 .../src/main/resources/hdfs-rbf-default.xml        | 10 ++++
 .../router/TestRouterRpcMultiDestination.java      | 59 ++++++++++++++++++++++
 5 files changed, 93 insertions(+), 4 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java
index dd72e36..10018fe 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java
@@ -125,6 +125,10 @@ public class RBFConfigKeys extends CommonConfigurationKeysPublic {
   public static final String DFS_ROUTER_CLIENT_REJECT_OVERLOAD =
       FEDERATION_ROUTER_PREFIX + "client.reject.overload";
   public static final boolean DFS_ROUTER_CLIENT_REJECT_OVERLOAD_DEFAULT = false;
+  public static final String DFS_ROUTER_ALLOW_PARTIAL_LIST =
+      FEDERATION_ROUTER_PREFIX + "client.allow-partial-listing";
+  public static final boolean DFS_ROUTER_ALLOW_PARTIAL_LIST_DEFAULT = true;
+
 
   // HDFS Router State Store connection
   public static final String FEDERATION_FILE_RESOLVER_CLIENT_CLASS =
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java
index 9e2979b..6c44362 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java
@@ -112,6 +112,9 @@ public class RouterClientProtocol implements ClientProtocol {
   private final FileSubclusterResolver subclusterResolver;
   private final ActiveNamenodeResolver namenodeResolver;
 
+  /** If it requires response from all subclusters. */
+  private final boolean allowPartialList;
+
   /** Identifier for the super user. */
   private final String superUser;
   /** Identifier for the super group. */
@@ -125,6 +128,10 @@ public class RouterClientProtocol implements ClientProtocol {
     this.subclusterResolver = rpcServer.getSubclusterResolver();
     this.namenodeResolver = rpcServer.getNamenodeResolver();
 
+    this.allowPartialList = conf.getBoolean(
+        RBFConfigKeys.DFS_ROUTER_ALLOW_PARTIAL_LIST,
+        RBFConfigKeys.DFS_ROUTER_ALLOW_PARTIAL_LIST_DEFAULT);
+
     // User and group for reporting
     this.superUser = System.getProperty("user.name");
     this.superGroup = conf.get(
@@ -608,8 +615,8 @@ public class RouterClientProtocol implements ClientProtocol {
         new Class<?>[] {String.class, startAfter.getClass(), boolean.class},
         new RemoteParam(), startAfter, needLocation);
     Map<RemoteLocation, DirectoryListing> listings =
-        rpcClient.invokeConcurrent(
-            locations, method, false, false, DirectoryListing.class);
+        rpcClient.invokeConcurrent(locations, method,
+            !this.allowPartialList, false, DirectoryListing.class);
 
     Map<String, HdfsFileStatus> nnListing = new TreeMap<>();
     int totalRemainingEntries = 0;
@@ -998,8 +1005,8 @@ public class RouterClientProtocol implements ClientProtocol {
       RemoteMethod method = new RemoteMethod("getContentSummary",
           new Class<?>[] {String.class}, new RemoteParam());
       Map<RemoteLocation, ContentSummary> results =
-          rpcClient.invokeConcurrent(
-              locations, method, false, false, ContentSummary.class);
+          rpcClient.invokeConcurrent(locations, method,
+              !this.allowPartialList, false, ContentSummary.class);
       summaries.addAll(results.values());
     } catch (FileNotFoundException e) {
       notFoundException = e;
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
index fcb35f4..ad5980b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
@@ -1484,6 +1484,15 @@ public class RouterRpcServer extends AbstractService
   }
 
   /**
+   * Get ClientProtocol module implementation.
+   * @return ClientProtocol implementation
+   */
+  @VisibleForTesting
+  public RouterClientProtocol getClientProtocolModule() {
+    return this.clientProto;
+  }
+
+  /**
    * Get RPC metrics info.
    * @return The instance of FederationRPCMetrics.
    */
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml
index 53bf53a..09050bb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml
@@ -483,6 +483,16 @@
   </property>
 
   <property>
+    <name>dfs.federation.router.client.allow-partial-listing</name>
+    <value>true</value>
+    <description>
+      If the Router can return a partial list of files in a multi-destination mount point when one of the subclusters is unavailable.
+      True may return a partial list of files if a subcluster is down.
+      False will fail the request if one is unavailable.
+    </description>
+  </property>
+
+  <property>
     <name>dfs.federation.router.keytab.file</name>
     <value></value>
     <description>
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpcMultiDestination.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpcMultiDestination.java
index 94b712f..3101748 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpcMultiDestination.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpcMultiDestination.java
@@ -20,6 +20,13 @@ package org.apache.hadoop.hdfs.server.federation.router;
 import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.createFile;
 import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.verifyFileExists;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.internal.util.reflection.Whitebox.getInternalState;
+import static org.mockito.internal.util.reflection.Whitebox.setInternalState;
 
 import java.io.IOException;
 import java.lang.reflect.Method;
@@ -44,6 +51,13 @@ import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.RouterConte
 import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
 import org.apache.hadoop.hdfs.server.federation.resolver.PathLocation;
 import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.namenode.ha.HAContext;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.ipc.StandbyException;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.Test;
 
 /**
  * The the RPC interface of the {@link getRouter()} implemented by
@@ -214,4 +228,49 @@ public class TestRouterRpcMultiDestination extends TestRouterRpc {
     testRename(getRouterContext(), filename1, renamedFile, false);
     testRename2(getRouterContext(), filename1, renamedFile, false);
   }
+
+  @Test
+  public void testSubclusterDown() throws Exception {
+    final int totalFiles = 6;
+
+    List<RouterContext> routers = getCluster().getRouters();
+
+    // Test the behavior when everything is fine
+    FileSystem fs = getRouterFileSystem();
+    FileStatus[] files = fs.listStatus(new Path("/"));
+    assertEquals(totalFiles, files.length);
+
+    // Simulate one of the subclusters is in standby
+    NameNode nn0 = getCluster().getNamenode("ns0", null).getNamenode();
+    FSNamesystem ns0 = nn0.getNamesystem();
+    HAContext nn0haCtx = (HAContext)getInternalState(ns0, "haContext");
+    HAContext mockCtx = mock(HAContext.class);
+    doThrow(new StandbyException("Mock")).when(mockCtx).checkOperation(any());
+    setInternalState(ns0, "haContext", mockCtx);
+
+    // router0 should throw an exception
+    RouterContext router0 = routers.get(0);
+    RouterRpcServer router0RPCServer = router0.getRouter().getRpcServer();
+    RouterClientProtocol router0ClientProtocol =
+        router0RPCServer.getClientProtocolModule();
+    setInternalState(router0ClientProtocol, "allowPartialList", false);
+    try {
+      router0.getFileSystem().listStatus(new Path("/"));
+      fail("I should throw an exception");
+    } catch (RemoteException re) {
+      GenericTestUtils.assertExceptionContains(
+          "No namenode available to invoke getListing", re);
+    }
+
+    // router1 should report partial results
+    RouterContext router1 = routers.get(1);
+    files = router1.getFileSystem().listStatus(new Path("/"));
+    assertTrue("Found " + files.length + " items, we should have less",
+        files.length < totalFiles);
+
+
+    // Restore the HA context and the Router
+    setInternalState(ns0, "haContext", nn0haCtx);
+    setInternalState(router0ClientProtocol, "allowPartialList", true);
+  }
 }
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org