Posted to common-commits@hadoop.apache.org by in...@apache.org on 2020/01/16 18:39:29 UTC
[hadoop] branch trunk updated: HDFS-15112. RBF: Do not return FileNotFoundException when a subcluster is unavailable.
This is an automated email from the ASF dual-hosted git repository.
inigoiri pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new 263413e HDFS-15112. RBF: Do not return FileNotFoundException when a subcluster is unavailable.
263413e is described below
commit 263413e83840c7795a988e3939cd292d020c8d5f
Author: Inigo Goiri <in...@apache.org>
AuthorDate: Thu Jan 16 10:39:14 2020 -0800
HDFS-15112. RBF: Do not return FileNotFoundException when a subcluster is unavailable.
---
.../server/federation/router/RouterRpcClient.java | 62 ++++++++++++-------
.../server/federation/router/RouterRpcServer.java | 6 ++
.../federation/router/TestRouterFaultTolerant.java | 69 +++++++++++++++++++++-
3 files changed, 114 insertions(+), 23 deletions(-)
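In short, the patch adds a classifier for exceptions that signal an unavailable subcluster (ConnectException, ConnectTimeoutException, EOFException, StandbyException, and a RetriableException caused by NoNamenodesAvailableException) and surfaces those in preference to exceptions such as FileNotFoundException returned by the subclusters that did answer. A minimal sketch of exercising the new classifier, assuming the patched hadoop-hdfs-rbf jar is on the classpath (UnavailableCheckSketch is an illustrative name, not part of the patch):

    import java.io.EOFException;
    import java.net.ConnectException;
    import org.apache.hadoop.hdfs.server.federation.router.RouterRpcClient;

    public class UnavailableCheckSketch {
      public static void main(String[] args) {
        // Both should print true once this commit is applied.
        System.out.println(RouterRpcClient.isUnavailableException(
            new ConnectException("ns1 refused connection")));
        System.out.println(RouterRpcClient.isUnavailableException(
            new EOFException()));
      }
    }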
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java
index 1c17c29..7003f96 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hdfs.server.federation.router;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_TIMEOUT_KEY;
+import java.io.EOFException;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.reflect.Constructor;
@@ -436,8 +437,7 @@ public class RouterRpcClient {
this.rpcMonitor.proxyOpFailureStandby();
}
failover = true;
- } else if (ioe instanceof ConnectException ||
- ioe instanceof ConnectTimeoutException) {
+ } else if (isUnavailableException(ioe)) {
if (this.rpcMonitor != null) {
this.rpcMonitor.proxyOpFailureCommunicate();
}
@@ -503,8 +503,7 @@ public class RouterRpcClient {
if (ioe instanceof StandbyException) {
LOG.error("{} at {} is in Standby: {}",
nnKey, addr, ioe.getMessage());
- } else if (ioe instanceof ConnectException ||
- ioe instanceof ConnectTimeoutException) {
+ } else if (isUnavailableException(ioe)) {
exConnect++;
LOG.error("{} at {} cannot be reached: {}",
nnKey, addr, ioe.getMessage());
@@ -563,8 +562,7 @@ public class RouterRpcClient {
// failover, invoker looks for standby exceptions for failover.
if (ioe instanceof StandbyException) {
throw ioe;
- } else if (ioe instanceof ConnectException ||
- ioe instanceof ConnectTimeoutException) {
+ } else if (isUnavailableException(ioe)) {
throw ioe;
} else {
throw new StandbyException(ioe.getMessage());
@@ -579,6 +577,27 @@ public class RouterRpcClient {
}
/**
+ * Check if the exception comes from an unavailable subcluster.
+ * @param ioe IOException to check.
+ * @return If the exception comes from an unavailable subcluster.
+ */
+ public static boolean isUnavailableException(IOException ioe) {
+ if (ioe instanceof ConnectException ||
+ ioe instanceof ConnectTimeoutException ||
+ ioe instanceof EOFException ||
+ ioe instanceof StandbyException) {
+ return true;
+ }
+ if (ioe instanceof RetriableException) {
+ Throwable cause = ioe.getCause();
+ if (cause instanceof NoNamenodesAvailableException) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
* Check if the cluster of given nameservice id is available.
* @param nsId nameservice ID.
* @return
@@ -833,8 +852,7 @@ public class RouterRpcClient {
final UserGroupInformation ugi = RouterRpcServer.getRemoteUser();
final Method m = remoteMethod.getMethod();
- IOException firstThrownException = null;
- IOException lastThrownException = null;
+ List<IOException> thrownExceptions = new ArrayList<>();
Object firstResult = null;
// Invoke in priority order
for (final RemoteLocationContext loc : locations) {
@@ -862,29 +880,33 @@ public class RouterRpcClient {
ioe = processException(ioe, loc);
// Record it and move on
- lastThrownException = ioe;
- if (firstThrownException == null) {
- firstThrownException = lastThrownException;
- }
+ thrownExceptions.add(ioe);
} catch (Exception e) {
// Unusual error, ClientProtocol calls always use IOException (or
// RemoteException). Re-wrap in IOException for compatibility with
// ClientProtocol.
LOG.error("Unexpected exception {} proxying {} to {}",
e.getClass(), m.getName(), ns, e);
- lastThrownException = new IOException(
+ IOException ioe = new IOException(
"Unexpected exception proxying API " + e.getMessage(), e);
- if (firstThrownException == null) {
- firstThrownException = lastThrownException;
- }
+ thrownExceptions.add(ioe);
}
}
- if (firstThrownException != null) {
- // re-throw the last exception thrown for compatibility
- throw firstThrownException;
+ if (!thrownExceptions.isEmpty()) {
+ // An unavailable subcluster may be the actual cause
+ // We cannot surface other exceptions (e.g., FileNotFoundException)
+ for (int i = 0; i < thrownExceptions.size(); i++) {
+ IOException ioe = thrownExceptions.get(i);
+ if (isUnavailableException(ioe)) {
+ throw ioe;
+ }
+ }
+
+ // re-throw the first exception thrown for compatibility
+ throw thrownExceptions.get(0);
}
- // Return the last result, whether it is the value we are looking for or a
+ // Return the first result, whether it is the value or not
@SuppressWarnings("unchecked")
T ret = (T)firstResult;
return ret;
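The key change above is in how invoking across locations reports failure: rather than remembering only the first thrown exception, the client now collects one exception per failed location and rethrows an unavailable-subcluster exception ahead of anything else. A standalone sketch of that selection step, where pickException is an illustrative name (the patch inlines this loop rather than extracting a method):

    import java.io.FileNotFoundException;
    import java.io.IOException;
    import java.net.ConnectException;
    import java.util.Arrays;
    import java.util.List;

    public class ExceptionPrioritySketch {
      // Simplified stand-in for RouterRpcClient#isUnavailableException.
      static boolean isUnavailable(IOException ioe) {
        return ioe instanceof ConnectException;
      }

      // Prefer an unavailable-subcluster exception over any other.
      static IOException pickException(List<IOException> thrown) {
        for (IOException ioe : thrown) {
          if (isUnavailable(ioe)) {
            return ioe; // a dead subcluster is the likelier root cause
          }
        }
        return thrown.get(0); // otherwise keep the first, as before
      }

      public static void main(String[] args) {
        List<IOException> thrown = Arrays.asList(
            new FileNotFoundException("/mnt/dir/file is missing on ns0"),
            new ConnectException("ns1 refused connection"));
        // Prints the ConnectException, not the FileNotFoundException.
        System.out.println(pickException(thrown));
      }
    }

The compatibility fallback matters: when no subcluster is down, callers still see the first exception, so existing error handling against the Router is unchanged.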
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
index 1f2298d..14cd6e7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
@@ -653,6 +653,12 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol,
}
} catch (FileNotFoundException fne) {
// Ignore if the file is not found
+ } catch (IOException ioe) {
+ if (RouterRpcClient.isUnavailableException(ioe)) {
+ LOG.debug("Ignore unavailable exception: {}", ioe);
+ } else {
+ throw ioe;
+ }
}
}
return createLocation;
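This hunk applies the same classification while the Router probes candidate subclusters to pick a create location: a probe that fails because its subcluster is down is logged at debug and skipped, exactly like the FileNotFoundException case above it, instead of failing the whole create. A hedged sketch of that control flow, where Probe and resolve are hypothetical stand-ins for the surrounding getCreateLocation logic:

    import java.io.FileNotFoundException;
    import java.io.IOException;
    import java.net.ConnectException;
    import java.util.List;

    public class CreateLocationSketch {
      interface Probe {
        void check(String location) throws IOException;
      }

      // Simplified stand-in for RouterRpcClient#isUnavailableException.
      static boolean isUnavailable(IOException ioe) {
        return ioe instanceof ConnectException;
      }

      // Return the first candidate whose probe succeeds; treat an
      // unavailable subcluster like "not found" rather than a hard failure.
      static String resolve(List<String> candidates, Probe probe)
          throws IOException {
        for (String loc : candidates) {
          try {
            probe.check(loc);
            return loc;
          } catch (FileNotFoundException fnfe) {
            // Ignore if the file is not found in this subcluster
          } catch (IOException ioe) {
            if (!isUnavailable(ioe)) {
              throw ioe; // real errors still propagate to the client
            }
            // Unavailable subcluster: skip it and keep probing
          }
        }
        return candidates.isEmpty() ? null : candidates.get(0);
      }
    }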
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterFaultTolerant.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterFaultTolerant.java
index 39d9561..8907ce5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterFaultTolerant.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterFaultTolerant.java
@@ -25,6 +25,7 @@ import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.refre
import static org.apache.hadoop.hdfs.server.federation.MockNamenode.registerSubclusters;
import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.getStateStoreConfiguration;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -40,6 +41,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.Callable;
@@ -51,6 +53,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -72,6 +75,7 @@ import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableE
import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.test.LambdaTestUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -153,9 +157,6 @@ public class TestRouterFaultTolerant {
registerSubclusters(
routers, namenodes.values(), Collections.singleton("ns1"));
- LOG.info("Stop ns1 to simulate an unavailable subcluster");
- namenodes.get("ns1").stop();
-
service = Executors.newFixedThreadPool(10);
}
@@ -209,6 +210,9 @@ public class TestRouterFaultTolerant {
@Test
public void testWriteWithFailedSubcluster() throws Exception {
+ LOG.info("Stop ns1 to simulate an unavailable subcluster");
+ namenodes.get("ns1").stop();
+
// Run the actual tests with each approach
final List<Callable<Boolean>> tasks = new ArrayList<>();
final List<DestinationOrder> orders = asList(
@@ -609,4 +613,63 @@ public class TestRouterFaultTolerant {
return userUgi.doAs(
(PrivilegedExceptionAction<FileSystem>) () -> getFileSystem(router));
}
+
+ @Test
+ public void testReadWithFailedSubcluster() throws Exception {
+
+ DestinationOrder order = DestinationOrder.HASH_ALL;
+ final String mountPoint = "/" + order + "-testread";
+ final Path mountPath = new Path(mountPoint);
+ LOG.info("Setup {} with order {}", mountPoint, order);
+ createMountTableEntry(
+ routers, mountPoint, order, namenodes.keySet());
+
+ FileSystem fs = getRandomRouterFileSystem();
+
+ // Create a file (we don't write because we have no mock Datanodes)
+ final Path fileexisting = new Path(mountPath, "fileexisting");
+ final Path filenotexisting = new Path(mountPath, "filenotexisting");
+ FSDataOutputStream os = fs.create(fileexisting);
+ assertNotNull(os);
+ os.close();
+
+ // We should be able to read existing files
+ FSDataInputStream fsdis = fs.open(fileexisting);
+ assertNotNull("We should be able to read the file", fsdis);
+ // We shouldn't be able to read non-existing files
+ LambdaTestUtils.intercept(FileNotFoundException.class,
+ () -> fs.open(filenotexisting));
+
+ // Check the subcluster where the file got created
+ String nsIdWithFile = null;
+ for (Entry<String, MockNamenode> entry : namenodes.entrySet()) {
+ String nsId = entry.getKey();
+ MockNamenode nn = entry.getValue();
+ int rpc = nn.getRPCPort();
+ FileSystem nnfs = getFileSystem(rpc);
+
+ try {
+ FileStatus fileStatus = nnfs.getFileStatus(fileexisting);
+ assertNotNull(fileStatus);
+ assertNull("The file cannot be in two subclusters", nsIdWithFile);
+ nsIdWithFile = nsId;
+ } catch (FileNotFoundException fnfe) {
+ LOG.debug("File not found in {}", nsId);
+ }
+ }
+ assertNotNull("The file has to be in one subcluster", nsIdWithFile);
+
+ LOG.info("Stop {} to simulate an unavailable subcluster", nsIdWithFile);
+ namenodes.get(nsIdWithFile).stop();
+
+ // We should not get FileNotFoundException anymore
+ try {
+ fs.open(fileexisting);
+ fail("It should throw an unavailable cluster exception");
+ } catch (RemoteException re) {
+ IOException ioe = re.unwrapRemoteException();
+ assertTrue("Expected an unavailable exception for:" + ioe.getClass(),
+ RouterRpcClient.isUnavailableException(ioe));
+ }
+ }
}
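One note on the final assertion: since the test already imports LambdaTestUtils, the try/fail/catch block could equally be written with intercept, which returns the caught exception for inspection. A hedged alternative using the same fs and fileexisting as in the test above:

    RemoteException re = LambdaTestUtils.intercept(
        RemoteException.class, () -> fs.open(fileexisting));
    IOException unwrapped = re.unwrapRemoteException();
    assertTrue("Expected an unavailable exception for: " + unwrapped.getClass(),
        RouterRpcClient.isUnavailableException(unwrapped));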
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org