Posted to common-commits@hadoop.apache.org by xk...@apache.org on 2019/04/22 20:30:19 UTC

[hadoop] branch trunk updated: HDFS-14435. [SBN Read] Enable ObserverReadProxyProvider to gracefully handle StandbyException when fetching HAServiceState. Contributed by Erik Krogen.

This is an automated email from the ASF dual-hosted git repository.

xkrogen pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 174b7d3  HDFS-14435. [SBN Read] Enable ObserverReadProxyProvider to gracefully handle StandbyException when fetching HAServiceState. Contributed by Erik Krogen.
174b7d3 is described below

commit 174b7d3126e215c519b1c4a74892c7020712f9df
Author: Erik Krogen <xk...@apache.org>
AuthorDate: Wed Apr 17 12:41:48 2019 -0700

    HDFS-14435. [SBN Read] Enable ObserverReadProxyProvider to gracefully handle StandbyException when fetching HAServiceState. Contributed by Erik Krogen.
---
 .../namenode/ha/ObserverReadProxyProvider.java     | 38 +++++++++++++-----
 .../hadoop/hdfs/server/namenode/ha/HATestUtil.java | 20 +++++++---
 .../namenode/ha/TestDelegationTokensWithHA.java    | 46 ++++++++++++++++++++++
 3 files changed, 90 insertions(+), 14 deletions(-)
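
For context: ObserverReadProxyProvider (ORPP) is the client-side failover proxy provider that allows reads to be served by Observer NameNodes. It is selected per nameservice through the standard failover proxy provider key. A minimal client-side sketch, assuming a hypothetical nameservice named "mycluster" (the nameservice name, URI, and surrounding HA settings are placeholders, not part of this commit):

    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hdfs.server.namenode.ha.ObserverReadProxyProvider;

    public class OrppClientSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Select ORPP for the (hypothetical) nameservice "mycluster".
        // dfs.nameservices, dfs.ha.namenodes.*, and the NameNode RPC
        // addresses must already be configured for this URI to resolve.
        conf.set("dfs.client.failover.proxy.provider.mycluster",
            ObserverReadProxyProvider.class.getName());
        FileSystem fs = FileSystem.get(URI.create("hdfs://mycluster"), conf);
        // Read calls made through fs may now be served by an Observer.
        System.out.println(fs.getFileStatus(new Path("/")));
      }
    }

The commit hardens the step where ORPP probes each NameNode's HA state. A Standby normally answers getHAServiceState(), but when the client authenticates with a delegation token the Standby rejects the token lookup with a StandbyException; previously ORPP logged that at INFO with a full stack trace, and now it recognizes the case and logs at DEBUG.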

diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java
index b4130d7..0df5e1e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java
@@ -44,6 +44,7 @@ import org.apache.hadoop.ipc.ObserverRetryOnActiveException;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.ipc.RpcInvocationHandler;
+import org.apache.hadoop.ipc.StandbyException;
 import org.apache.hadoop.util.Time;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -67,7 +68,8 @@ import com.google.common.annotations.VisibleForTesting;
 @InterfaceStability.Evolving
 public class ObserverReadProxyProvider<T extends ClientProtocol>
     extends AbstractNNFailoverProxyProvider<T> {
-  private static final Logger LOG = LoggerFactory.getLogger(
+  @VisibleForTesting
+  static final Logger LOG = LoggerFactory.getLogger(
       ObserverReadProxyProvider.class);
 
   /** Configuration key for {@link #autoMsyncPeriodMs}. */
@@ -251,14 +253,7 @@ public class ObserverReadProxyProvider<T extends ClientProtocol>
     }
     currentIndex = (currentIndex + 1) % nameNodeProxies.size();
     currentProxy = createProxyIfNeeded(nameNodeProxies.get(currentIndex));
-    try {
-      HAServiceState state = currentProxy.proxy.getHAServiceState();
-      currentProxy.setCachedState(state);
-    } catch (IOException e) {
-      LOG.info("Failed to connect to {}. Setting cached state to Standby",
-          currentProxy.getAddress(), e);
-      currentProxy.setCachedState(HAServiceState.STANDBY);
-    }
+    currentProxy.setCachedState(getHAServiceState(currentProxy));
     LOG.debug("Changed current proxy from {} to {}",
         initial == null ? "none" : initial.proxyInfo,
         currentProxy.proxyInfo);
@@ -266,6 +261,31 @@ public class ObserverReadProxyProvider<T extends ClientProtocol>
   }
 
   /**
+   * Fetch the service state from a proxy. If it is unable to be fetched,
+   * assume it is in standby state, but log the exception.
+   */
+  private HAServiceState getHAServiceState(NNProxyInfo<T> proxyInfo) {
+    IOException ioe;
+    try {
+      return proxyInfo.proxy.getHAServiceState();
+    } catch (RemoteException re) {
+      // Though a Standby will allow a getHAServiceState call, it won't allow
+      // delegation token lookup, so if DT is used it throws StandbyException
+      if (re.unwrapRemoteException() instanceof StandbyException) {
+        LOG.debug("NameNode {} threw StandbyException when fetching HAState",
+            proxyInfo.getAddress());
+        return HAServiceState.STANDBY;
+      }
+      ioe = re;
+    } catch (IOException e) {
+      ioe = e;
+    }
+    LOG.info("Failed to connect to {}. Assuming Standby state",
+        proxyInfo.getAddress(), ioe);
+    return HAServiceState.STANDBY;
+  }
+
+  /**
    * This will call {@link ClientProtocol#msync()} on the active NameNode
    * (via the {@link #failoverProxy}) to initialize the state of this client.
    * Calling it multiple times is a no-op; only the first will perform an
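
The unwrap-and-check idiom in getHAServiceState() above is how Hadoop RPC clients branch on a specific server-side exception: the server's exception travels to the client wrapped in a RemoteException, and unwrapRemoteException() reconstructs the original type when its class is known locally. A stripped-down sketch of the same pattern (the helper name isStandby is a hypothetical stand-in, not part of this commit):

    import java.io.IOException;
    import org.apache.hadoop.ipc.RemoteException;
    import org.apache.hadoop.ipc.StandbyException;

    /** Returns true if the RPC failed because the server is a Standby. */
    static boolean isStandby(RemoteException re) {
      // unwrapRemoteException() reconstructs the original server-side
      // exception when its class is available locally; otherwise it
      // returns the RemoteException itself.
      IOException unwrapped = re.unwrapRemoteException();
      return unwrapped instanceof StandbyException;
    }
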
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java
index f4843ac..261bf8c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java
@@ -276,24 +276,34 @@ public abstract class HATestUtil {
   /** Sets the required configurations for performing failover.  */
   public static void setFailoverConfigurations(MiniDFSCluster cluster,
       Configuration conf, String logicalName, int nsIndex) {
+    setFailoverConfigurations(cluster, conf, logicalName, nsIndex,
+        ConfiguredFailoverProxyProvider.class);
+  }
+
+  /** Sets the required configurations for performing failover.  */
+  public static <P extends FailoverProxyProvider<?>> void
+      setFailoverConfigurations(MiniDFSCluster cluster, Configuration conf,
+      String logicalName, int nsIndex, Class<P> classFPP) {
     MiniDFSCluster.NameNodeInfo[] nns = cluster.getNameNodeInfos(nsIndex);
     List<InetSocketAddress> nnAddresses = new ArrayList<InetSocketAddress>(3);
     for (MiniDFSCluster.NameNodeInfo nn : nns) {
       nnAddresses.add(nn.nameNode.getNameNodeAddress());
     }
-    setFailoverConfigurations(conf, logicalName, nnAddresses);
+    setFailoverConfigurations(conf, logicalName, nnAddresses, classFPP);
   }
 
   public static void setFailoverConfigurations(Configuration conf, String logicalName,
       InetSocketAddress ... nnAddresses){
-    setFailoverConfigurations(conf, logicalName, Arrays.asList(nnAddresses));
+    setFailoverConfigurations(conf, logicalName, Arrays.asList(nnAddresses),
+        ConfiguredFailoverProxyProvider.class);
   }
 
   /**
    * Sets the required configurations for performing failover
    */
-  public static void setFailoverConfigurations(Configuration conf,
-      String logicalName, List<InetSocketAddress> nnAddresses) {
+  public static <P extends FailoverProxyProvider<?>> void
+      setFailoverConfigurations(Configuration conf, String logicalName,
+      List<InetSocketAddress> nnAddresses, Class<P> classFPP) {
     setFailoverConfigurations(conf, logicalName,
         Iterables.transform(nnAddresses, new Function<InetSocketAddress, String>() {
 
@@ -302,7 +312,7 @@ public abstract class HATestUtil {
           public String apply(InetSocketAddress addr) {
             return "hdfs://" + addr.getHostName() + ":" + addr.getPort();
           }
-        }), ConfiguredFailoverProxyProvider.class);
+        }), classFPP);
   }
 
   public static <P extends FailoverProxyProvider<?>>
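
Net effect of the HATestUtil changes: callers can now choose which FailoverProxyProvider class to wire up, where previously ConfiguredFailoverProxyProvider was hard-coded. The existing entry points keep their old behavior by delegating with ConfiguredFailoverProxyProvider.class. The new test below uses the overload like this:

    // From the test added in this commit: point the client config at
    // nameservice index 0 of the MiniDFSCluster, using ORPP.
    HATestUtil.setFailoverConfigurations(cluster, conf,
        HATestUtil.getLogicalHostname(cluster), 0,
        ObserverReadProxyProvider.class);
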
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDelegationTokensWithHA.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDelegationTokensWithHA.java
index 7076ec6..718d13f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDelegationTokensWithHA.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDelegationTokensWithHA.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.fs.AbstractFileSystem;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
 import org.apache.hadoop.hdfs.*;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
@@ -47,6 +48,7 @@ import org.apache.hadoop.test.Whitebox;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+import org.slf4j.event.Level;
 
 import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
@@ -112,6 +114,50 @@ public class TestDelegationTokensWithHA {
     }
   }
 
+  /**
+   * Test that, when using ObserverReadProxyProvider with DT authentication,
+   * the ORPP gracefully handles when the Standby NN throws a StandbyException.
+   */
+  @Test(timeout = 300000)
+  public void testObserverReadProxyProviderWithDT() throws Exception {
+    // Make the first node standby, so that the ORPP will try it first
+    // instead of just using and succeeding on the active
+    cluster.transitionToStandby(0);
+    cluster.transitionToActive(1);
+
+    HATestUtil.setFailoverConfigurations(cluster, conf,
+        HATestUtil.getLogicalHostname(cluster), 0,
+        ObserverReadProxyProvider.class);
+    conf.setBoolean("fs.hdfs.impl.disable.cache", true);
+
+    dfs = (DistributedFileSystem) FileSystem.get(conf);
+    final UserGroupInformation ugi = UserGroupInformation
+        .createRemoteUser("JobTracker");
+    final Token<DelegationTokenIdentifier> token =
+        getDelegationToken(dfs, ugi.getShortUserName());
+    ugi.addToken(token);
+    // Recreate the DFS, this time authenticating using a DT
+    dfs = ugi.doAs((PrivilegedExceptionAction<DistributedFileSystem>)
+        () -> (DistributedFileSystem) FileSystem.get(conf));
+
+    GenericTestUtils.setLogLevel(ObserverReadProxyProvider.LOG, Level.DEBUG);
+    GenericTestUtils.LogCapturer logCapture = GenericTestUtils.LogCapturer
+        .captureLogs(ObserverReadProxyProvider.LOG);
+    try {
+      dfs.access(new Path("/"), FsAction.READ);
+      assertTrue(logCapture.getOutput()
+          .contains("threw StandbyException when fetching HAState"));
+      HATestUtil.isSentToAnyOfNameNodes(dfs, cluster, 1);
+
+      cluster.shutdownNameNode(0);
+      logCapture.clearOutput();
+      dfs.access(new Path("/"), FsAction.READ);
+      assertTrue(logCapture.getOutput().contains("Assuming Standby state"));
+    } finally {
+      logCapture.stopCapturing();
+    }
+  }
+
   @Test(timeout = 300000)
   public void testDelegationTokenDFSApi() throws Exception {
     final Token<DelegationTokenIdentifier> token =

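One test detail worth noting: the assertions read the provider's log output through GenericTestUtils.LogCapturer, which is why the commit relaxes ORPP's LOG field from private to package-private with @VisibleForTesting. The capture idiom, reduced to its core (everything here appears in the test above):

    GenericTestUtils.setLogLevel(ObserverReadProxyProvider.LOG, Level.DEBUG);
    GenericTestUtils.LogCapturer logCapture =
        GenericTestUtils.LogCapturer.captureLogs(ObserverReadProxyProvider.LOG);
    try {
      // ... perform client calls that make ORPP probe a Standby ...
      assertTrue(logCapture.getOutput()
          .contains("threw StandbyException when fetching HAState"));
    } finally {
      logCapture.stopCapturing();
    }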
