Posted to common-commits@hadoop.apache.org by he...@apache.org on 2020/10/07 04:29:39 UTC

[hadoop] branch trunk updated: HDFS-15543. RBF: Write Should allow, when a subcluster is unavailable for RANDOM mount points with fault Tolerance enabled. Contributed by Hemanth Boyina.

This is an automated email from the ASF dual-hosted git repository.

hemanthboyina pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 921ca1f  HDFS-15543. RBF: Write Should allow, when a subcluster is unavailable for RANDOM mount points with fault Tolerance enabled. Contributed by Hemanth Boyina.
921ca1f is described below

commit 921ca1f554e128e3b187e4124beaf264e3d1c66a
Author: hemanthboyina <he...@apache.org>
AuthorDate: Wed Oct 7 09:58:53 2020 +0530

    HDFS-15543. RBF: Write Should allow, when a subcluster is unavailable for RANDOM mount points with fault Tolerance enabled. Contributed by Hemanth Boyina.
---
 .../federation/router/RouterClientProtocol.java    |  2 +-
 .../server/federation/router/RouterRpcServer.java  | 55 ++++++++++++++++++++--
 ...erRPCMultipleDestinationMountTableResolver.java | 39 +++++++++++++++
 3 files changed, 90 insertions(+), 6 deletions(-)
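
In effect, invokeAtAvailableNs() used to fail the whole call when the namespace it picked was unavailable; with this change it retries once on one of the remaining registered namespaces, provided the failure looks like subcluster unavailability. The feature targets fault-tolerant mount points; a sketch of such an entry, mirroring the setup in the new test further down (the /data source and destination paths are illustrative only):

    // A mount point with two destinations, RANDOM order, and fault
    // tolerance enabled (mirrors the setup in the new test below).
    Map<String, String> destMap = new HashMap<>();
    destMap.put("ns0", "/data");
    destMap.put("ns1", "/data");
    MountTable entry = MountTable.newInstance("/data", destMap);
    entry.setDestOrder(DestinationOrder.RANDOM);
    entry.setFaultTolerant(true);

The same kind of entry can also be created with the router admin tool, e.g. hdfs dfsrouteradmin -add /data ns0,ns1 /data -order RANDOM -faulttolerant (see the RBF documentation for the exact flags).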

diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java
index 2c7e90b..cfba7f1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java
@@ -306,7 +306,7 @@ public class RouterClientProtocol implements ClientProtocol {
   * @return True if caused by an unavailable subcluster. False if it should
   *         not be retried (e.g., NSQuotaExceededException).
    */
-  private static boolean isUnavailableSubclusterException(
+  protected static boolean isUnavailableSubclusterException(
       final IOException ioe) {
     if (ioe instanceof ConnectException ||
         ioe instanceof ConnectTimeoutException ||
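
The RouterClientProtocol change is visibility only: isUnavailableSubclusterException() goes from private to protected so that RouterRpcServer, which lives in the same package, can reuse the retriability check. A minimal sketch of the intended use, assuming a static call through the class name (the actual code in the next file reaches it through the clientProto field):

    // Decide whether an IOException from a subcluster is worth one retry.
    if (!RouterClientProtocol.isUnavailableSubclusterException(ioe)) {
      throw ioe; // e.g. NSQuotaExceededException: fail fast, do not retry
    }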
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
index 97b146c..ce10bfd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
@@ -37,6 +37,7 @@ import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.EnumSet;
+import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
 import java.util.List;
@@ -587,6 +588,7 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol,
   /**
    * Invokes the method at default namespace, if default namespace is not
    * available then at the first available namespace.
+   * If that namespace is unavailable, retry once on another namespace.
    * @param <T> expected return type.
    * @param method the remote method.
    * @return the response received after invoking method.
@@ -595,18 +597,61 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol,
   <T> T invokeAtAvailableNs(RemoteMethod method, Class<T> clazz)
       throws IOException {
     String nsId = subclusterResolver.getDefaultNamespace();
-    if (!nsId.isEmpty()) {
-      return rpcClient.invokeSingle(nsId, method, clazz);
-    }
     // If the default NS is not present, use the first available namespace.
     Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
+    try {
+      if (!nsId.isEmpty()) {
+        return rpcClient.invokeSingle(nsId, method, clazz);
+      }
+      // Exception for invokeOnNs to throw if no namespace is available.
+      IOException io = new IOException("No namespace available.");
+      return invokeOnNs(method, clazz, io, nss);
+    } catch (IOException ioe) {
+      if (!clientProto.isUnavailableSubclusterException(ioe)) {
+        LOG.debug("{} exception cannot be retried",
+            ioe.getClass().getSimpleName());
+        throw ioe;
+      }
+      Set<FederationNamespaceInfo> nssWithoutFailed = getNameSpaceInfo(nsId);
+      return invokeOnNs(method, clazz, ioe, nssWithoutFailed);
+    }
+  }
+
+  /**
+   * Invoke the method on the first available namespace; if no namespace is
+   * available, throw the supplied exception.
+   * @param method the remote method.
+   * @param clazz  Class for the return type.
+   * @param ioe    IOException to throw if no namespace is available.
+   * @param nss    Namespaces in the federation to try.
+   * @return the response received after invoking the method.
+   * @throws IOException if the invocation fails on the chosen namespace.
+   */
+  <T> T invokeOnNs(RemoteMethod method, Class<T> clazz, IOException ioe,
+      Set<FederationNamespaceInfo> nss) throws IOException {
     if (nss.isEmpty()) {
-      throw new IOException("No namespace available.");
+      throw ioe;
     }
-    nsId = nss.iterator().next().getNameserviceId();
+    String nsId = nss.iterator().next().getNameserviceId();
     return rpcClient.invokeSingle(nsId, method, clazz);
   }
 
+  /**
+   * Get the set of namespaces in the federation, excluding the namespace
+   * that has already been invoked.
+   * @param nsId namespace id that was already invoked.
+   * @return Namespaces in the federation without the already invoked one.
+   * @throws IOException if the namespaces cannot be retrieved.
+   */
+  private Set<FederationNamespaceInfo> getNameSpaceInfo(String nsId)
+      throws IOException {
+    Set<FederationNamespaceInfo> namespaceInfos = new HashSet<>();
+    // Collect every registered namespace except the one that already failed.
+    for (FederationNamespaceInfo ns : namenodeResolver.getNamespaces()) {
+      if (!nsId.equals(ns.getNameserviceId())) {
+        namespaceInfos.add(ns);
+      }
+    }
+    return namespaceInfos;
+  }
+
   @Override // ClientProtocol
   public Token<DelegationTokenIdentifier> getDelegationToken(Text renewer)
       throws IOException {
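
invokeAtAvailableNs() backs the router's path-independent client-protocol calls, i.e. those that any namespace can answer. As an illustration of a typical caller, assuming RouterClientProtocol delegates getStoragePolicies() this way (the exact caller list is not part of this diff):

    // A path-independent call that now survives one unavailable namespace.
    RemoteMethod method = new RemoteMethod("getStoragePolicies");
    BlockStoragePolicy[] policies =
        rpcServer.invokeAtAvailableNs(method, BlockStoragePolicy[].class);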
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRPCMultipleDestinationMountTableResolver.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRPCMultipleDestinationMountTableResolver.java
index ebb62d4..2887c08 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRPCMultipleDestinationMountTableResolver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRPCMultipleDestinationMountTableResolver.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Options.Rename;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.AclEntry;
@@ -45,6 +46,7 @@ import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.SnapshotStatus;
 import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.RouterContext;
 import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
@@ -60,6 +62,7 @@ import org.apache.hadoop.hdfs.server.federation.store.protocol.GetDestinationRes
 import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryRequest;
 import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
 import org.apache.hadoop.hdfs.tools.federation.RouterAdmin;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.test.LambdaTestUtils;
 import org.apache.hadoop.util.ToolRunner;
 import org.junit.After;
@@ -641,6 +644,42 @@ public class TestRouterRPCMultipleDestinationMountTableResolver {
   }
 
   /**
+   * Test writing to a mount point with multiple destinations
+   * while one of the destinations' subclusters is unavailable.
+   */
+  @Test
+  public void testWriteWithUnavailableSubCluster() throws IOException {
+    // Create a mount point with multiple destinations.
+    Path path = new Path("/testWriteWithUnavailableSubCluster");
+    Map<String, String> destMap = new HashMap<>();
+    destMap.put("ns0", "/testWriteWithUnavailableSubCluster");
+    destMap.put("ns1", "/testWriteWithUnavailableSubCluster");
+    nnFs0.mkdirs(path);
+    nnFs1.mkdirs(path);
+    MountTable addEntry =
+        MountTable.newInstance("/testWriteWithUnavailableSubCluster", destMap);
+    addEntry.setQuota(new RouterQuotaUsage.Builder().build());
+    addEntry.setDestOrder(DestinationOrder.RANDOM);
+    addEntry.setFaultTolerant(true);
+    assertTrue(addMountTable(addEntry));
+
+    // Make one subcluster unavailable and write through the mount point.
+    MiniDFSCluster dfsCluster = cluster.getCluster();
+    dfsCluster.shutdownNameNode(0);
+    FSDataOutputStream out = null;
+    Path filePath = new Path(path, "aa");
+    try {
+      out = routerFs.create(filePath);
+      out.write("hello".getBytes());
+      out.hflush();
+      assertTrue(routerFs.exists(filePath));
+    } finally {
+      IOUtils.closeStream(out);
+      dfsCluster.restartNameNode(0);
+    }
+  }
+
+  /**
    * Test to verify rename operation on directories in case of multiple
    * destinations.
    * @param order order to be followed by the mount entry.


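With the namenode of ns0 shut down, the create() issued through the router is expected to succeed on ns1: the entry is fault tolerant, and with this patch the router's path-independent calls retry once on a namespace other than the failed one instead of failing the write. The new case can be run on its own from hadoop-hdfs-project/hadoop-hdfs-rbf with mvn test -Dtest=TestRouterRPCMultipleDestinationMountTableResolver (a standard surefire invocation; no patch-specific flags assumed).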