You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by xk...@apache.org on 2019/10/10 16:22:41 UTC

[hadoop] branch branch-2 updated: HDFS-14245. [SBN read] Enable ObserverReadProxyProvider to work with non-ClientProtocol proxy types. Contributed by Erik Krogen.

This is an automated email from the ASF dual-hosted git repository.

xkrogen pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-2 by this push:
     new 200c52f  HDFS-14245. [SBN read] Enable ObserverReadProxyProvider to work with non-ClientProtocol proxy types. Contributed by Erik Krogen.
200c52f is described below

commit 200c52f78b3237e6aeb3bca66a3ac0afa00e03db
Author: Erik Krogen <xk...@apache.org>
AuthorDate: Wed Apr 17 14:38:24 2019 -0700

    HDFS-14245. [SBN read] Enable ObserverReadProxyProvider to work with non-ClientProtocol proxy types. Contributed by Erik Krogen.
    
    (cherry picked from 5847e0014343f60f853cb796781ca1fa03a72efd)
    (cherry picked from 6630c9b75d65deefb5550e355eef7783909a57bc)
    (cherry picked from 9fdb849e034573bb44abd593eefa1e13a3261376)
---
 .../ha/AbstractNNFailoverProxyProvider.java        |  3 +-
 .../namenode/ha/ObserverReadProxyProvider.java     | 54 ++++++++++++++++------
 .../namenode/ha/TestDelegationTokensWithHA.java    |  2 +-
 .../hdfs/server/namenode/ha/TestObserverNode.java  | 12 +++++
 .../namenode/ha/TestObserverReadProxyProvider.java | 29 ++++++++++++
 5 files changed, 83 insertions(+), 17 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/AbstractNNFailoverProxyProvider.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/AbstractNNFailoverProxyProvider.java
index 572cb1c..646b100 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/AbstractNNFailoverProxyProvider.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/AbstractNNFailoverProxyProvider.java
@@ -115,7 +115,8 @@ public abstract class AbstractNNFailoverProxyProvider<T> implements
     /**
      * The currently known state of the NameNode represented by this ProxyInfo.
      * This may be out of date if the NameNode has changed state since the last
-     * time the state was checked.
+     * time the state was checked. If the NameNode could not be contacted, this
+     * will store null to indicate an unknown state.
      */
     private HAServiceState cachedState;
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java
index 0d9d3e7..c30623b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java
@@ -66,7 +66,7 @@ import com.google.common.annotations.VisibleForTesting;
  */
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
-public class ObserverReadProxyProvider<T extends ClientProtocol>
+public class ObserverReadProxyProvider<T>
     extends AbstractNNFailoverProxyProvider<T> {
   @VisibleForTesting
   static final Logger LOG = LoggerFactory.getLogger(
@@ -189,7 +189,13 @@ public class ObserverReadProxyProvider<T extends ClientProtocol>
         AUTO_MSYNC_PERIOD_DEFAULT, TimeUnit.MILLISECONDS);
 
     // TODO : make this configurable or remove this variable
-    this.observerReadEnabled = true;
+    if (wrappedProxy instanceof ClientProtocol) {
+      this.observerReadEnabled = true;
+    } else {
+      LOG.info("Disabling observer reads for {} because the requested proxy "
+          + "class does not implement {}", uri, ClientProtocol.class.getName());
+      this.observerReadEnabled = false;
+    }
   }
 
   public AlignmentContext getAlignmentContext() {
@@ -267,7 +273,7 @@ public class ObserverReadProxyProvider<T extends ClientProtocol>
   private HAServiceState getHAServiceState(NNProxyInfo<T> proxyInfo) {
     IOException ioe;
     try {
-      return proxyInfo.proxy.getHAServiceState();
+      return getProxyAsClientProtocol(proxyInfo.proxy).getHAServiceState();
     } catch (RemoteException re) {
       // Though a Standby will allow a getHAServiceState call, it won't allow
       // delegation token lookup, so if DT is used it throws StandbyException
@@ -284,7 +290,19 @@ public class ObserverReadProxyProvider<T extends ClientProtocol>
       LOG.debug("Failed to connect to {} while fetching HAServiceState",
           proxyInfo.getAddress(), ioe);
     }
-    return HAServiceState.STANDBY;
+    return null;
+  }
+
+  /**
+   * Return the input proxy, cast as a {@link ClientProtocol}. This catches any
+   * {@link ClassCastException} and wraps it in a more helpful message. This
+   * should ONLY be called if the caller is certain that the proxy is, in fact,
+   * a {@link ClientProtocol}.
+   */
+  private ClientProtocol getProxyAsClientProtocol(T proxy) {
+    assert proxy instanceof ClientProtocol : "BUG: Attempted to use proxy "
+        + "of class " + proxy.getClass() + " as if it was a ClientProtocol.";
+    return (ClientProtocol) proxy;
   }
 
   /**
@@ -299,7 +317,7 @@ public class ObserverReadProxyProvider<T extends ClientProtocol>
     if (msynced) {
       return; // No need for an msync
     }
-    failoverProxy.getProxy().proxy.msync();
+    getProxyAsClientProtocol(failoverProxy.getProxy().proxy).msync();
     msynced = true;
     lastMsyncTimeMs = Time.monotonicNow();
   }
@@ -315,7 +333,7 @@ public class ObserverReadProxyProvider<T extends ClientProtocol>
   private void autoMsyncIfNecessary() throws IOException {
     if (autoMsyncPeriodMs == 0) {
       // Always msync
-      failoverProxy.getProxy().proxy.msync();
+      getProxyAsClientProtocol(failoverProxy.getProxy().proxy).msync();
     } else if (autoMsyncPeriodMs > 0) {
       if (Time.monotonicNow() - lastMsyncTimeMs > autoMsyncPeriodMs) {
         synchronized (this) {
@@ -324,7 +342,7 @@ public class ObserverReadProxyProvider<T extends ClientProtocol>
           // Re-check the entry criterion since the status may have changed
           // while waiting for the lock.
           if (Time.monotonicNow() - lastMsyncTimeMs > autoMsyncPeriodMs) {
-            failoverProxy.getProxy().proxy.msync();
+            getProxyAsClientProtocol(failoverProxy.getProxy().proxy).msync();
             lastMsyncTimeMs = Time.monotonicNow();
           }
         }
@@ -363,6 +381,7 @@ public class ObserverReadProxyProvider<T extends ClientProtocol>
         int failedObserverCount = 0;
         int activeCount = 0;
         int standbyCount = 0;
+        int unreachableCount = 0;
         for (int i = 0; i < nameNodeProxies.size(); i++) {
           NNProxyInfo<T> current = getCurrentProxy();
           HAServiceState currState = current.getCachedState();
@@ -371,9 +390,12 @@ public class ObserverReadProxyProvider<T extends ClientProtocol>
               activeCount++;
             } else if (currState == HAServiceState.STANDBY) {
               standbyCount++;
+            } else if (currState == null) {
+              unreachableCount++;
             }
             LOG.debug("Skipping proxy {} for {} because it is in state {}",
-                current.proxyInfo, method.getName(), currState);
+                current.proxyInfo, method.getName(),
+                currState == null ? "unreachable" : currState);
             changeProxy(current);
             continue;
           }
@@ -420,10 +442,10 @@ public class ObserverReadProxyProvider<T extends ClientProtocol>
         // be that there is simply no Observer node running at all.
         if (failedObserverCount > 0) {
           // If we get here, it means all observers have failed.
-          LOG.warn("{} observers have failed for read request {}; "
-                  + "also found {} standby, {} active. "
-                  + "Falling back to active.", failedObserverCount,
-              method.getName(), standbyCount, activeCount);
+          LOG.warn("{} observers have failed for read request {}; also found "
+                  + "{} standby, {} active, and {} unreachable. Falling back "
+                  + "to active.", failedObserverCount, method.getName(),
+              standbyCount, activeCount, unreachableCount);
         } else {
           if (LOG.isDebugEnabled()) {
             LOG.debug("Read falling back to active without observer read "
@@ -432,8 +454,9 @@ public class ObserverReadProxyProvider<T extends ClientProtocol>
         }
       }
 
-      // Either all observers have failed, or that it is a write request.
-      // In either case, we'll forward the request to active NameNode.
+      // Either all observers have failed, observer reads are disabled,
+      // or this is a write request. In any case, forward the request to
+      // the active NameNode.
       LOG.debug("Using failoverProxy to service {}", method.getName());
       ProxyInfo<T> activeProxy = failoverProxy.getProxy();
       try {
@@ -455,7 +478,8 @@ public class ObserverReadProxyProvider<T extends ClientProtocol>
 
     @Override
     public ConnectionId getConnectionId() {
-      return RPC.getConnectionIdForProxy(getCurrentProxy().proxy);
+      return RPC.getConnectionIdForProxy(observerReadEnabled
+          ? getCurrentProxy().proxy : failoverProxy.getProxy().proxy);
     }
   }
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDelegationTokensWithHA.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDelegationTokensWithHA.java
index 1dd270f..d7601d8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDelegationTokensWithHA.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDelegationTokensWithHA.java
@@ -156,7 +156,7 @@ public class TestDelegationTokensWithHA {
       cluster.shutdownNameNode(0);
       logCapture.clearOutput();
       dfs.access(new Path("/"), FsAction.READ);
-      assertTrue(logCapture.getOutput().contains("Assuming Standby state"));
+      assertTrue(logCapture.getOutput().contains("Failed to connect to"));
     } finally {
       logCapture.stopCapturing();
     }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java
index 61bc9a7..7449b13 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java
@@ -53,6 +53,7 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
 import org.apache.hadoop.hdfs.server.namenode.TestFsck;
+import org.apache.hadoop.hdfs.tools.GetGroups;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
@@ -119,6 +120,17 @@ public class TestObserverNode {
     fail("active cannot be transitioned to observer");
   }
 
+  /**
+   * Test that non-ClientProtocol proxies such as
+   * {@link org.apache.hadoop.tools.GetUserMappingsProtocol} still work
+   * when run in an environment with observers.
+   */
+  @Test
+  public void testGetGroups() throws Exception {
+    GetGroups getGroups = new GetGroups(conf);
+    assertEquals(0, getGroups.run(new String[0]));
+  }
+
   @Test
   public void testNoObserverToActive() throws Exception {
     try {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverReadProxyProvider.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverReadProxyProvider.java
index de9b3bd..b4237f2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverReadProxyProvider.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverReadProxyProvider.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.ipc.ObserverRetryOnActiveException;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.ipc.StandbyException;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.tools.GetUserMappingsProtocol;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mockito;
@@ -42,9 +43,12 @@ import org.mockito.stubbing.Answer;
 
 import static org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
 
+import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
 
 /**
  * Tests for {@link ObserverReadProxyProvider} under various configurations of
@@ -116,6 +120,31 @@ public class TestObserverReadProxyProvider {
   }
 
   @Test
+  public void testWithNonClientProxy() throws Exception {
+    setupProxyProvider(2); // This will initialize all of the instance fields
+    final String fakeUser = "fakeUser";
+    final String[] fakeGroups = {"fakeGroup"};
+    HAProxyFactory<GetUserMappingsProtocol> proxyFactory =
+        new NameNodeHAProxyFactory<GetUserMappingsProtocol>() {
+          @Override
+          public GetUserMappingsProtocol createProxy(Configuration config,
+              InetSocketAddress addr, Class<GetUserMappingsProtocol> xface,
+              UserGroupInformation ugi, boolean withRetries,
+              AtomicBoolean fallbackToSimpleAuth) throws IOException {
+            GetUserMappingsProtocol proxy =
+                mock(GetUserMappingsProtocol.class);
+            when(proxy.getGroupsForUser(fakeUser)).thenReturn(fakeGroups);
+            return proxy;
+          }
+        };
+    ObserverReadProxyProvider<GetUserMappingsProtocol> userProxyProvider =
+        new ObserverReadProxyProvider<>(conf, nnURI,
+            GetUserMappingsProtocol.class, proxyFactory);
+    assertArrayEquals(fakeGroups,
+        userProxyProvider.getProxy().proxy.getGroupsForUser(fakeUser));
+  }
+
+  @Test
   public void testReadOperationOnObserver() throws Exception {
     setupProxyProvider(3);
     namenodeAnswers[0].setActiveState();


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org