You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by in...@apache.org on 2020/01/16 18:39:29 UTC

[hadoop] branch trunk updated: HDFS-15112. RBF: Do not return FileNotFoundException when a subcluster is unavailable.

This is an automated email from the ASF dual-hosted git repository.

inigoiri pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 263413e  HDFS-15112. RBF: Do not return FileNotFoundException when a subcluster is unavailable.
263413e is described below

commit 263413e83840c7795a988e3939cd292d020c8d5f
Author: Inigo Goiri <in...@apache.org>
AuthorDate: Thu Jan 16 10:39:14 2020 -0800

    HDFS-15112. RBF: Do not return FileNotFoundException when a subcluster is unavailable.
---
 .../server/federation/router/RouterRpcClient.java  | 62 ++++++++++++-------
 .../server/federation/router/RouterRpcServer.java  |  6 ++
 .../federation/router/TestRouterFaultTolerant.java | 69 +++++++++++++++++++++-
 3 files changed, 114 insertions(+), 23 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java
index 1c17c29..7003f96 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hdfs.server.federation.router;
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY;
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_TIMEOUT_KEY;
 
+import java.io.EOFException;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.lang.reflect.Constructor;
@@ -436,8 +437,7 @@ public class RouterRpcClient {
             this.rpcMonitor.proxyOpFailureStandby();
           }
           failover = true;
-        } else if (ioe instanceof ConnectException ||
-            ioe instanceof ConnectTimeoutException) {
+        } else if (isUnavailableException(ioe)) {
           if (this.rpcMonitor != null) {
             this.rpcMonitor.proxyOpFailureCommunicate();
           }
@@ -503,8 +503,7 @@ public class RouterRpcClient {
       if (ioe instanceof StandbyException) {
         LOG.error("{} at {} is in Standby: {}",
             nnKey, addr, ioe.getMessage());
-      } else if (ioe instanceof ConnectException ||
-          ioe instanceof ConnectTimeoutException) {
+      } else if (isUnavailableException(ioe)) {
         exConnect++;
         LOG.error("{} at {} cannot be reached: {}",
             nnKey, addr, ioe.getMessage());
@@ -563,8 +562,7 @@ public class RouterRpcClient {
           // failover, invoker looks for standby exceptions for failover.
           if (ioe instanceof StandbyException) {
             throw ioe;
-          } else if (ioe instanceof ConnectException ||
-              ioe instanceof ConnectTimeoutException) {
+          } else if (isUnavailableException(ioe)) {
             throw ioe;
           } else {
             throw new StandbyException(ioe.getMessage());
@@ -579,6 +577,27 @@ public class RouterRpcClient {
   }
 
   /**
+   * Check if the exception comes from an unavailable subcluster.
+   * @param ioe IOException to check; a null value returns false.
+   * @return True if the exception comes from an unavailable subcluster.
+   */
+  public static boolean isUnavailableException(IOException ioe) {
+    if (ioe instanceof ConnectException ||
+        ioe instanceof ConnectTimeoutException ||
+        ioe instanceof EOFException ||
+        ioe instanceof StandbyException) {
+      return true;
+    }
+    if (ioe instanceof RetriableException) {
+      Throwable cause = ioe.getCause();
+      if (cause instanceof NoNamenodesAvailableException) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
    * Check if the cluster of given nameservice id is available.
    * @param nsId nameservice ID.
    * @return
@@ -833,8 +852,7 @@ public class RouterRpcClient {
 
     final UserGroupInformation ugi = RouterRpcServer.getRemoteUser();
     final Method m = remoteMethod.getMethod();
-    IOException firstThrownException = null;
-    IOException lastThrownException = null;
+    List<IOException> thrownExceptions = new ArrayList<>();
     Object firstResult = null;
     // Invoke in priority order
     for (final RemoteLocationContext loc : locations) {
@@ -862,29 +880,33 @@ public class RouterRpcClient {
         ioe = processException(ioe, loc);
 
         // Record it and move on
-        lastThrownException =  ioe;
-        if (firstThrownException == null) {
-          firstThrownException = lastThrownException;
-        }
+        thrownExceptions.add(ioe);
       } catch (Exception e) {
         // Unusual error, ClientProtocol calls always use IOException (or
         // RemoteException). Re-wrap in IOException for compatibility with
         // ClientProtcol.
         LOG.error("Unexpected exception {} proxying {} to {}",
             e.getClass(), m.getName(), ns, e);
-        lastThrownException = new IOException(
+        IOException ioe = new IOException(
             "Unexpected exception proxying API " + e.getMessage(), e);
-        if (firstThrownException == null) {
-          firstThrownException = lastThrownException;
-        }
+        thrownExceptions.add(ioe);
       }
     }
 
-    if (firstThrownException != null) {
-      // re-throw the last exception thrown for compatibility
-      throw firstThrownException;
+    if (!thrownExceptions.isEmpty()) {
+      // An unavailable subcluster may be the actual cause
+      // We cannot surface other exceptions (e.g., FileNotFoundException)
+      for (int i = 0; i < thrownExceptions.size(); i++) {
+        IOException ioe = thrownExceptions.get(i);
+        if (isUnavailableException(ioe)) {
+          throw ioe;
+        }
+      }
+
+      // re-throw the first exception thrown for compatibility
+      throw thrownExceptions.get(0);
     }
-    // Return the last result, whether it is the value we are looking for or a
+    // Return the first result, whether it is the value or not
     @SuppressWarnings("unchecked")
     T ret = (T)firstResult;
     return ret;
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
index 1f2298d..14cd6e7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
@@ -653,6 +653,12 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol,
         }
       } catch (FileNotFoundException fne) {
         // Ignore if the file is not found
+      } catch (IOException ioe) {
+        if (RouterRpcClient.isUnavailableException(ioe)) {
+          LOG.debug("Ignore unavailable exception: {}", ioe);
+        } else {
+          throw ioe;
+        }
       }
     }
     return createLocation;
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterFaultTolerant.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterFaultTolerant.java
index 39d9561..8907ce5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterFaultTolerant.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterFaultTolerant.java
@@ -25,6 +25,7 @@ import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.refre
 import static org.apache.hadoop.hdfs.server.federation.MockNamenode.registerSubclusters;
 import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.getStateStoreConfiguration;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -40,6 +41,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Random;
 import java.util.UUID;
 import java.util.concurrent.Callable;
@@ -51,6 +53,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -72,6 +75,7 @@ import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableE
 import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.test.LambdaTestUtils;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -153,9 +157,6 @@ public class TestRouterFaultTolerant {
     registerSubclusters(
         routers, namenodes.values(), Collections.singleton("ns1"));
 
-    LOG.info("Stop ns1 to simulate an unavailable subcluster");
-    namenodes.get("ns1").stop();
-
     service = Executors.newFixedThreadPool(10);
   }
 
@@ -209,6 +210,9 @@ public class TestRouterFaultTolerant {
   @Test
   public void testWriteWithFailedSubcluster() throws Exception {
 
+    LOG.info("Stop ns1 to simulate an unavailable subcluster");
+    namenodes.get("ns1").stop();
+
     // Run the actual tests with each approach
     final List<Callable<Boolean>> tasks = new ArrayList<>();
     final List<DestinationOrder> orders = asList(
@@ -609,4 +613,63 @@ public class TestRouterFaultTolerant {
     return userUgi.doAs(
         (PrivilegedExceptionAction<FileSystem>) () -> getFileSystem(router));
   }
+
+  @Test
+  public void testReadWithFailedSubcluster() throws Exception {
+
+    DestinationOrder order = DestinationOrder.HASH_ALL;
+    final String mountPoint = "/" + order + "-testread";
+    final Path mountPath = new Path(mountPoint);
+    LOG.info("Setup {} with order {}", mountPoint, order);
+    createMountTableEntry(
+        routers, mountPoint, order, namenodes.keySet());
+
+    FileSystem fs = getRandomRouterFileSystem();
+
+    // Create an empty file (nothing is written: there are no mock Datanodes)
+    final Path fileexisting = new Path(mountPath, "fileexisting");
+    final Path filenotexisting = new Path(mountPath, "filenotexisting");
+    FSDataOutputStream os = fs.create(fileexisting);
+    assertNotNull(os);
+    os.close();
+
+    // Opening an existing file must succeed before any subcluster is stopped
+    FSDataInputStream fsdis = fs.open(fileexisting);
+    assertNotNull("We should be able to read the file", fsdis);
+    // Opening a missing file must fail with FileNotFoundException
+    LambdaTestUtils.intercept(FileNotFoundException.class,
+        () -> fs.open(filenotexisting));
+
+    // Find the single subcluster that holds the file
+    String nsIdWithFile = null;
+    for (Entry<String, MockNamenode> entry : namenodes.entrySet()) {
+      String nsId = entry.getKey();
+      MockNamenode nn = entry.getValue();
+      int rpc = nn.getRPCPort();
+      FileSystem nnfs = getFileSystem(rpc);
+
+      try {
+        FileStatus fileStatus = nnfs.getFileStatus(fileexisting);
+        assertNotNull(fileStatus);
+        assertNull("The file cannot be in two subclusters", nsIdWithFile);
+        nsIdWithFile = nsId;
+      } catch (FileNotFoundException fnfe) {
+        LOG.debug("File not found in {}", nsId);
+      }
+    }
+    assertNotNull("The file has to be in one subcluster", nsIdWithFile);
+
+    LOG.info("Stop {} to simulate an unavailable subcluster", nsIdWithFile);
+    namenodes.get(nsIdWithFile).stop();
+
+    // The open must now fail as unavailable, not as FileNotFoundException
+    try {
+      fs.open(fileexisting);
+      fail("It should throw an unavailable cluster exception");
+    } catch(RemoteException re) {
+      IOException ioe = re.unwrapRemoteException();
+      assertTrue("Expected an unavailable exception for:" + ioe.getClass(),
+          RouterRpcClient.isUnavailableException(ioe));
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org