You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by he...@apache.org on 2020/10/07 04:29:39 UTC
[hadoop] branch trunk updated: HDFS-15543. RBF: Write Should allow,
when a subcluster is unavailable for RANDOM mount points with fault
Tolerance enabled. Contributed by Hemanth Boyina.
This is an automated email from the ASF dual-hosted git repository.
hemanthboyina pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new 921ca1f HDFS-15543. RBF: Write Should allow, when a subcluster is unavailable for RANDOM mount points with fault Tolerance enabled. Contributed by Hemanth Boyina.
921ca1f is described below
commit 921ca1f554e128e3b187e4124beaf264e3d1c66a
Author: hemanthboyina <he...@apache.org>
AuthorDate: Wed Oct 7 09:58:53 2020 +0530
HDFS-15543. RBF: Write Should allow, when a subcluster is unavailable for RANDOM mount points with fault Tolerance enabled. Contributed by Hemanth Boyina.
---
.../federation/router/RouterClientProtocol.java | 2 +-
.../server/federation/router/RouterRpcServer.java | 55 ++++++++++++++++++++--
...erRPCMultipleDestinationMountTableResolver.java | 39 +++++++++++++++
3 files changed, 90 insertions(+), 6 deletions(-)
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java
index 2c7e90b..cfba7f1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java
@@ -306,7 +306,7 @@ public class RouterClientProtocol implements ClientProtocol {
* @return If caused by an unavailable subcluster. False if the should not be
* retried (e.g., NSQuotaExceededException).
*/
- private static boolean isUnavailableSubclusterException(
+ protected static boolean isUnavailableSubclusterException(
final IOException ioe) {
if (ioe instanceof ConnectException ||
ioe instanceof ConnectTimeoutException ||
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
index 97b146c..ce10bfd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
@@ -37,6 +37,7 @@ import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.EnumSet;
+import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
@@ -587,6 +588,7 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol,
/**
* Invokes the method at default namespace, if default namespace is not
* available then at the first available namespace.
+ * If the namespace is unavailable, retry once with other namespace.
* @param <T> expected return type.
* @param method the remote method.
* @return the response received after invoking method.
@@ -595,18 +597,61 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol,
<T> T invokeAtAvailableNs(RemoteMethod method, Class<T> clazz)
throws IOException {
String nsId = subclusterResolver.getDefaultNamespace();
- if (!nsId.isEmpty()) {
- return rpcClient.invokeSingle(nsId, method, clazz);
- }
// If default Ns is not present return result from first namespace.
Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
+ try {
+ if (!nsId.isEmpty()) {
+ return rpcClient.invokeSingle(nsId, method, clazz);
+ }
+ // No default namespace: use the first of nss, or throw this if nss is empty.
+ IOException io = new IOException("No namespace available.");
+ return invokeOnNs(method, clazz, io, nss);
+ } catch (IOException ioe) {
+ if (!clientProto.isUnavailableSubclusterException(ioe)) {
+ LOG.debug("{} exception cannot be retried",
+ ioe.getClass().getSimpleName());
+ throw ioe;
+ }
+ Set<FederationNamespaceInfo> nssWithoutFailed = getNameSpaceInfo(nsId);
+ return invokeOnNs(method, clazz, ioe, nssWithoutFailed);
+ }
+ }
+
+ /**
+ * Invoke the method on the first namespace of the given set;
+ * if the set is empty, throw the supplied exception instead.
+ * @param method the remote method.
+ * @param clazz Class for the return type.
+ * @param ioe IOException to throw when the set of namespaces is empty.
+ * @param nss Set of namespaces in the federation to pick from.
+ * @return the response received after invoking method.
+ * @throws IOException if nss is empty (ioe is thrown) or the invocation fails.
+ */
+ <T> T invokeOnNs(RemoteMethod method, Class<T> clazz, IOException ioe,
+ Set<FederationNamespaceInfo> nss) throws IOException {
if (nss.isEmpty()) {
- throw new IOException("No namespace available.");
+ throw ioe;
}
- nsId = nss.iterator().next().getNameserviceId();
+ String nsId = nss.iterator().next().getNameserviceId();
return rpcClient.invokeSingle(nsId, method, clazz);
}
+ /**
+  * Get the namespaces of the federation, leaving out the namespace that
+  * has already been invoked.
+  * @param nsId already invoked namespace id
+  * @return the namespaces reported by the namenode resolver whose
+  *         nameservice id differs from nsId.
+  * @throws IOException if the namespaces cannot be obtained.
+  */
+ private Set<FederationNamespaceInfo> getNameSpaceInfo(String nsId)
+     throws IOException {
+   Set<FederationNamespaceInfo> namespaceInfos = new HashSet<>();
+   // The candidates must come from the resolver: looping over the freshly
+   // created (empty) result set would never add anything, so the retry in
+   // invokeAtAvailableNs would never find another namespace to try.
+   for (FederationNamespaceInfo ns : namenodeResolver.getNamespaces()) {
+     if (!nsId.equals(ns.getNameserviceId())) {
+       namespaceInfos.add(ns);
+     }
+   }
+   return namespaceInfos;
+ }
+
@Override // ClientProtocol
public Token<DelegationTokenIdentifier> getDelegationToken(Text renewer)
throws IOException {
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRPCMultipleDestinationMountTableResolver.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRPCMultipleDestinationMountTableResolver.java
index ebb62d4..2887c08 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRPCMultipleDestinationMountTableResolver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRPCMultipleDestinationMountTableResolver.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Options.Rename;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.AclEntry;
@@ -45,6 +46,7 @@ import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.SnapshotStatus;
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.RouterContext;
import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
@@ -60,6 +62,7 @@ import org.apache.hadoop.hdfs.server.federation.store.protocol.GetDestinationRes
import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryRequest;
import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
import org.apache.hadoop.hdfs.tools.federation.RouterAdmin;
+import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.test.LambdaTestUtils;
import org.apache.hadoop.util.ToolRunner;
import org.junit.After;
@@ -641,6 +644,42 @@ public class TestRouterRPCMultipleDestinationMountTableResolver {
}
/**
+ * Test that a write on a fault-tolerant RANDOM mount point with multiple
+ * destinations succeeds while one destination's subcluster is unavailable.
+ */
+ @Test
+ public void testWriteWithUnavailableSubCluster() throws IOException {
+ // Create a mount point with two destinations, ns0 and ns1.
+ Path path = new Path("/testWriteWithUnavailableSubCluster");
+ Map<String, String> destMap = new HashMap<>();
+ destMap.put("ns0", "/testWriteWithUnavailableSubCluster");
+ destMap.put("ns1", "/testWriteWithUnavailableSubCluster");
+ nnFs0.mkdirs(path);
+ nnFs1.mkdirs(path);
+ MountTable addEntry =
+ MountTable.newInstance("/testWriteWithUnavailableSubCluster", destMap);
+ addEntry.setQuota(new RouterQuotaUsage.Builder().build());
+ addEntry.setDestOrder(DestinationOrder.RANDOM);
+ addEntry.setFaultTolerant(true);
+ assertTrue(addMountTable(addEntry));
+
+ // Shut down NameNode 0 so one subcluster is down, then write via routerFs.
+ MiniDFSCluster dfsCluster = cluster.getCluster();
+ dfsCluster.shutdownNameNode(0);
+ FSDataOutputStream out = null;
+ Path filePath = new Path(path, "aa");
+ try {
+ out = routerFs.create(filePath);
+ out.write("hello".getBytes());
+ out.hflush();
+ assertTrue(routerFs.exists(filePath));
+ } finally {
+ IOUtils.closeStream(out);
+ dfsCluster.restartNameNode(0);
+ }
+ }
+
+ /**
* Test to verify rename operation on directories in case of multiple
* destinations.
* @param order order to be followed by the mount entry.
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org