You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by xk...@apache.org on 2018/08/31 16:11:13 UTC

[47/47] hadoop git commit: HDFS-13779. [SBN read] Implement proper failover and observer failure handling logic for ObserverReadProxyProvider. Contributed by Erik Krogen.

HDFS-13779. [SBN read] Implement proper failover and observer failure handling logic for ObserverReadProxyProvider. Contributed by Erik Krogen.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/039c158d
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/039c158d
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/039c158d

Branch: refs/heads/HDFS-12943
Commit: 039c158d2c8a45906e6ea5f9661391bc541ab0cb
Parents: 5320173
Author: Erik Krogen <xk...@apache.org>
Authored: Fri Aug 24 05:04:27 2018 -0700
Committer: Erik Krogen <xk...@apache.org>
Committed: Fri Aug 31 09:09:59 2018 -0700

----------------------------------------------------------------------
 .../ha/AbstractNNFailoverProxyProvider.java     |  16 +
 .../namenode/ha/ObserverReadProxyProvider.java  | 255 ++++++++------
 .../server/namenode/ha/TestObserverNode.java    |  27 +-
 .../ha/TestObserverReadProxyProvider.java       | 335 +++++++++++++++++++
 4 files changed, 532 insertions(+), 101 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/039c158d/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/AbstractNNFailoverProxyProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/AbstractNNFailoverProxyProvider.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/AbstractNNFailoverProxyProvider.java
index 252b70d..32edb36 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/AbstractNNFailoverProxyProvider.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/AbstractNNFailoverProxyProvider.java
@@ -30,6 +30,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
 import org.apache.hadoop.hdfs.DFSUtilClient;
 import org.apache.hadoop.hdfs.HAUtilClient;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
@@ -111,6 +112,12 @@ public abstract class AbstractNNFailoverProxyProvider<T> implements
    */
   public static class NNProxyInfo<T> extends ProxyInfo<T> {
     private InetSocketAddress address;
+    /**
+     * The currently known state of the NameNode represented by this ProxyInfo.
+     * This may be out of date if the NameNode has changed state since the last
+     * time the state was checked.
+     */
+    private HAServiceState cachedState;
 
     public NNProxyInfo(InetSocketAddress address) {
       super(null, address.toString());
@@ -120,6 +127,15 @@ public abstract class AbstractNNFailoverProxyProvider<T> implements
     public InetSocketAddress getAddress() {
       return address;
     }
+
+    public void setCachedState(HAServiceState state) {
+      cachedState = state;
+    }
+
+    public HAServiceState getCachedState() {
+      return cachedState;
+    }
+
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/039c158d/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java
index dcae2db..e819282 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java
@@ -20,18 +20,24 @@ package org.apache.hadoop.hdfs.server.namenode.ha;
 import java.io.Closeable;
 import java.io.IOException;
 import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.lang.reflect.Proxy;
 import java.net.URI;
-import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
 import org.apache.hadoop.hdfs.ClientGSIContext;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.io.retry.AtMostOnce;
+import org.apache.hadoop.io.retry.Idempotent;
+import org.apache.hadoop.io.retry.RetryPolicies;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.io.retry.RetryPolicy.RetryAction;
 import org.apache.hadoop.ipc.AlignmentContext;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
@@ -59,16 +65,18 @@ public class ObserverReadProxyProvider<T extends ClientProtocol>
   private static final Logger LOG = LoggerFactory.getLogger(
       ObserverReadProxyProvider.class);
 
-  /** Client-side context for syncing with the NameNode server side */
-  private AlignmentContext alignmentContext;
+  /** Client-side context for syncing with the NameNode server side. */
+  private final AlignmentContext alignmentContext;
 
-  private AbstractNNFailoverProxyProvider<T> failoverProxy;
-  /** All NameNdoe proxies */
-  private List<NNProxyInfo<T>> nameNodeProxies =
-      new ArrayList<NNProxyInfo<T>>();
-  /** Proxies for the observer namenodes */
-  private final List<NNProxyInfo<T>> observerProxies =
-      new ArrayList<NNProxyInfo<T>>();
+  /** The inner proxy provider used for active/standby failover. */
+  private final AbstractNNFailoverProxyProvider<T> failoverProxy;
+  /** List of all NameNode proxies. */
+  private final List<NNProxyInfo<T>> nameNodeProxies;
+
+  /** The policy used to determine if an exception is fatal or retriable. */
+  private final RetryPolicy observerRetryPolicy;
+  /** The combined proxy which redirects to other proxies as necessary. */
+  private final ProxyInfo<T> combinedProxy;
 
   /**
    * Whether reading from observer is enabled. If this is false, all read
@@ -77,12 +85,19 @@ public class ObserverReadProxyProvider<T extends ClientProtocol>
   private boolean observerReadEnabled;
 
   /**
-   * Thread-local index to record the current index in the observer list.
+   * The index into the nameNodeProxies list currently being used. Should only
+   * be accessed in synchronized methods.
+   */
+  private int currentIndex = -1;
+  /**
+   * The proxy being used currently; this will match with currentIndex above.
+   * This field is volatile to allow reads without synchronization; updates
+   * should still be performed synchronously to maintain consistency between
+   * currentIndex and this field.
    */
-  private static final ThreadLocal<Integer> currentIndex =
-      ThreadLocal.withInitial(() -> 0);
+  private volatile NNProxyInfo<T> currentProxy;
 
-  /** The last proxy that has been used. Only used for testing */
+  /** The last proxy that has been used. Only used for testing. */
   private volatile ProxyInfo<T> lastProxy = null;
 
   /**
@@ -90,63 +105,53 @@ public class ObserverReadProxyProvider<T extends ClientProtocol>
    * {@link ConfiguredFailoverProxyProvider} for failover.
    */
   public ObserverReadProxyProvider(
-      Configuration conf, URI uri, Class<T> xface, HAProxyFactory<T> factory)
-      throws IOException {
+      Configuration conf, URI uri, Class<T> xface, HAProxyFactory<T> factory) {
     this(conf, uri, xface, factory,
-        new ConfiguredFailoverProxyProvider<T>(conf, uri, xface,factory));
+        new ConfiguredFailoverProxyProvider<>(conf, uri, xface,factory));
   }
 
+  @SuppressWarnings("unchecked")
   public ObserverReadProxyProvider(
       Configuration conf, URI uri, Class<T> xface, HAProxyFactory<T> factory,
-      AbstractNNFailoverProxyProvider<T> failoverProxy)
-      throws IOException {
+      AbstractNNFailoverProxyProvider<T> failoverProxy) {
     super(conf, uri, xface, factory);
     this.failoverProxy = failoverProxy;
     this.alignmentContext = new ClientGSIContext();
     ((ClientHAProxyFactory<T>) factory).setAlignmentContext(alignmentContext);
 
+    // Don't bother configuring the number of retries and such on the retry
+    // policy since it is mainly only used for determining whether or not an
+    // exception is retriable or fatal
+    observerRetryPolicy = RetryPolicies.failoverOnNetworkException(
+        RetryPolicies.TRY_ONCE_THEN_FAIL, 1);
+
     // Get all NameNode proxies
     nameNodeProxies = getProxyAddresses(uri,
         HdfsClientConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY);
-    // Find out all the observer proxies
-    for (NNProxyInfo<T> pi : nameNodeProxies) {
-      createProxyIfNeeded(pi);
-      if (isObserverState(pi)) {
-        observerProxies.add(pi);
-      }
-    }
-
-    // TODO: No observers is not an error
-    // Just direct all reads go to the active NameNode
-    if (observerProxies.isEmpty()) {
-      throw new RuntimeException("Couldn't find any namenode proxy in " +
-          "OBSERVER state");
-    }
-  }
-
-  public synchronized AlignmentContext getAlignmentContext() {
-    return alignmentContext;
-  }
 
-  @SuppressWarnings("unchecked")
-  @Override
-  public synchronized ProxyInfo<T> getProxy() {
-    // We just create a wrapped proxy containing all the proxies
+    // Create a wrapped proxy containing all the proxies. Since this combined
+    // proxy is just redirecting to other proxies, all invocations can share it.
     StringBuilder combinedInfo = new StringBuilder("[");
-
-    for (int i = 0; i < this.observerProxies.size(); i++) {
+    for (int i = 0; i < nameNodeProxies.size(); i++) {
       if (i > 0) {
         combinedInfo.append(",");
       }
-      combinedInfo.append(observerProxies.get(i).proxyInfo);
+      combinedInfo.append(nameNodeProxies.get(i).proxyInfo);
     }
-
     combinedInfo.append(']');
     T wrappedProxy = (T) Proxy.newProxyInstance(
         ObserverReadInvocationHandler.class.getClassLoader(),
-        new Class<?>[]{xface},
-        new ObserverReadInvocationHandler(observerProxies));
-    return new ProxyInfo<>(wrappedProxy, combinedInfo.toString());
+        new Class<?>[] { xface }, new ObserverReadInvocationHandler());
+    combinedProxy = new ProxyInfo<>(wrappedProxy, combinedInfo.toString());
+  }
+
+  public AlignmentContext getAlignmentContext() {
+    return alignmentContext;
+  }
+
+  @Override
+  public ProxyInfo<T> getProxy() {
+    return combinedProxy;
   }
 
   @Override
@@ -159,8 +164,11 @@ public class ObserverReadProxyProvider<T extends ClientProtocol>
    *
    * @return whether the 'method' is a read-only operation.
    */
-  private boolean isRead(Method method) {
-    return method.isAnnotationPresent(ReadOnly.class);
+  private static boolean isRead(Method method) {
+    if (!method.isAnnotationPresent(ReadOnly.class)) {
+      return false;
+    }
+    return !method.getAnnotationsByType(ReadOnly.class)[0].activeOnly();
   }
 
   @VisibleForTesting
@@ -168,21 +176,13 @@ public class ObserverReadProxyProvider<T extends ClientProtocol>
     this.observerReadEnabled = flag;
   }
 
-  /**
-   * After getting exception 'ex', whether we should retry the current request
-   * on a different observer.
-   */
-  private boolean shouldRetry(Exception ex) throws Exception {
-    // TODO: implement retry policy
-    return true;
-  }
-
   @VisibleForTesting
   ProxyInfo<T> getLastProxy() {
     return lastProxy;
   }
 
-  boolean isObserverState(NNProxyInfo<T> pi) {
+  private static <T extends ClientProtocol> HAServiceState getServiceState(
+      NNProxyInfo<T> pi) {
     // TODO: should introduce new ClientProtocol method to verify the
     // underlying service state, which does not require superuser access
     // The is a workaround
@@ -190,7 +190,7 @@ public class ObserverReadProxyProvider<T extends ClientProtocol>
     try {
       // Verify write access first
       pi.proxy.reportBadBlocks(new LocatedBlock[0]);
-      return false; // Only active NameNode allows write
+      return HAServiceState.ACTIVE; // Only active NameNode allows write
     } catch (RemoteException re) {
       IOException sbe = re.unwrapRemoteException(StandbyException.class);
       if (!(sbe instanceof StandbyException)) {
@@ -200,15 +200,16 @@ public class ObserverReadProxyProvider<T extends ClientProtocol>
       ioe = e;
     }
     if (ioe != null) {
-      LOG.error("Failed to connect to {}", pi.getAddress(), ioe);
-      return false;
+      LOG.warn("Failed to connect to {}", pi.getAddress(), ioe);
+      return HAServiceState.STANDBY; // Just assume standby in this case
+                                     // Anything besides observer is fine
     }
     // Verify read access
     // For now we assume only Observer nodes allow reads
     // Stale reads on StandbyNode should be turned off
     try {
       pi.proxy.checkAccess("/", FsAction.READ);
-      return true;
+      return HAServiceState.OBSERVER;
     } catch (RemoteException re) {
       IOException sbe = re.unwrapRemoteException(StandbyException.class);
       if (!(sbe instanceof StandbyException)) {
@@ -218,29 +219,60 @@ public class ObserverReadProxyProvider<T extends ClientProtocol>
       ioe = e;
     }
     if (ioe != null) {
-      LOG.error("Failed to connect to {}", pi.getAddress(), ioe);
+      LOG.warn("Failed to connect to {}", pi.getAddress(), ioe);
     }
-    return false;
+    return HAServiceState.STANDBY;
   }
 
+  /**
+   * Return the currently used proxy. If there is none, first calls
+   * {@link #changeProxy(NNProxyInfo)} to initialize one.
+   */
+  private NNProxyInfo<T> getCurrentProxy() {
+    if (currentProxy == null) {
+      changeProxy(null);
+    }
+    return currentProxy;
+  }
 
-  class ObserverReadInvocationHandler implements InvocationHandler {
-    final List<NNProxyInfo<T>> observerProxies;
-    final ProxyInfo<T> activeProxy;
-
-    ObserverReadInvocationHandler(List<NNProxyInfo<T>> observerProxies) {
-      this.observerProxies = observerProxies;
-      this.activeProxy = failoverProxy.getProxy();
+  /**
+   * Move to the next proxy in the proxy list. If the NNProxyInfo supplied by
+   * the caller does not match the current proxy, the call is ignored; this is
+   * to handle concurrent calls (to avoid changing the proxy multiple times).
+   * The service state of the newly selected proxy will be updated before
+   * returning.
+   *
+   * @param initial The expected current proxy
+   */
+  private synchronized void changeProxy(NNProxyInfo<T> initial) {
+    if (currentProxy != initial) {
+      // Must have been a concurrent modification; ignore the move request
+      return;
     }
+    // Attempt to force concurrent callers of getCurrentProxy to wait for the
+    // new proxy; best-effort by setting currentProxy to null
+    currentProxy = null;
+    currentIndex = (currentIndex + 1) % nameNodeProxies.size();
+    currentProxy = createProxyIfNeeded(nameNodeProxies.get(currentIndex));
+    currentProxy.setCachedState(getServiceState(currentProxy));
+    LOG.debug("Changed current proxy from {} to {}",
+        initial == null ? "none" : initial.proxyInfo,
+        currentProxy.proxyInfo);
+  }
+
+  /**
+   * An InvocationHandler to handle incoming requests. This class's invoke
+   * method contains the primary logic for redirecting to observers.
+   * 
+   * If observer reads are enabled, attempt to send read operations to the
+   * current proxy. If it is not an observer, or the observer fails, adjust
+   * the current proxy and retry on the next one. If all proxies are tried
+   * without success, the request is forwarded to the active.
+   *
+   * Write requests are always forwarded to the active.
+   */
+  private class ObserverReadInvocationHandler implements InvocationHandler {
 
-    /**
-     * Sends read operations to the observer (if enabled) specified by the
-     * current index, and send write operations to the active. If a observer
-     * fails, we increment the index and retry the next one. If all observers
-     * fail, the request is forwarded to the active.
-     *
-     * Write requests are always forwarded to the active.
-     */
     @Override
     public Object invoke(Object proxy, final Method method, final Object[] args)
         throws Throwable {
@@ -248,33 +280,65 @@ public class ObserverReadProxyProvider<T extends ClientProtocol>
       Object retVal;
 
       if (observerReadEnabled && isRead(method)) {
-        // Loop through all the proxies, starting from the current index.
-        for (int i = 0; i < observerProxies.size(); i++) {
-          NNProxyInfo<T> current = observerProxies.get(currentIndex.get());
+        int failedObserverCount = 0;
+        int activeCount = 0;
+        int standbyCount = 0;
+        for (int i = 0; i < nameNodeProxies.size(); i++) {
+          NNProxyInfo<T> current = getCurrentProxy();
+          HAServiceState currState = current.getCachedState();
+          if (currState != HAServiceState.OBSERVER) {
+            if (currState == HAServiceState.ACTIVE) {
+              activeCount++;
+            } else if (currState == HAServiceState.STANDBY) {
+              standbyCount++;
+            }
+            LOG.debug("Skipping proxy {} for {} because it is in state {}",
+                current.proxyInfo, method.getName(), currState);
+            changeProxy(current);
+            continue;
+          }
+          LOG.debug("Attempting to service {} using proxy {}",
+              method.getName(), current.proxyInfo);
           try {
             retVal = method.invoke(current.proxy, args);
             lastProxy = current;
+            LOG.debug("Invocation of {} using {} was successful",
+                method.getName(), current.proxyInfo);
             return retVal;
-          } catch (Exception e) {
-            if (!shouldRetry(e)) {
+          } catch (InvocationTargetException ite) {
+            if (!(ite.getCause() instanceof Exception)) {
+              throw ite.getCause();
+            }
+            Exception e = (Exception) ite.getCause();
+            RetryAction retryInfo = observerRetryPolicy.shouldRetry(e, 0, 0,
+                method.isAnnotationPresent(Idempotent.class)
+                    || method.isAnnotationPresent(AtMostOnce.class));
+            if (retryInfo.action == RetryAction.RetryDecision.FAIL) {
               throw e;
+            } else {
+              failedObserverCount++;
+              LOG.warn(
+                  "Invocation returned exception on [{}]; {} failure(s) so far",
+                  current.proxyInfo, failedObserverCount, e);
+              changeProxy(current);
             }
-            currentIndex.set((currentIndex.get() + 1) % observerProxies.size());
-            LOG.warn("Invocation returned exception on [{}]",
-                current.proxyInfo, e.getCause());
           }
         }
 
         // If we get here, it means all observers have failed.
-        LOG.warn("All observers have failed for read request {}. " +
-            "Fall back on active: {}", method.getName(), activeProxy);
+        LOG.warn("{} observers have failed for read request {}; also found " +
+            "{} standby and {} active. Falling back to active.",
+            failedObserverCount, standbyCount, activeCount, method.getName());
       }
 
       // Either all observers have failed, or that it is a write request.
       // In either case, we'll forward the request to active NameNode.
+      LOG.debug("Using failoverProxy to service {}", method.getName());
+      ProxyInfo<T> activeProxy = failoverProxy.getProxy();
       try {
         retVal = method.invoke(activeProxy.proxy, args);
-      } catch (Exception e) {
+      } catch (InvocationTargetException e) {
+        // This exception will be handled by higher layers
         throw e.getCause();
       }
       lastProxy = activeProxy;
@@ -284,7 +348,6 @@ public class ObserverReadProxyProvider<T extends ClientProtocol>
 
   @Override
   public synchronized void close() throws IOException {
-    failoverProxy.close();
     for (ProxyInfo<T> pi : nameNodeProxies) {
       if (pi.proxy != null) {
         if (pi.proxy instanceof Closeable) {
@@ -292,8 +355,12 @@ public class ObserverReadProxyProvider<T extends ClientProtocol>
         } else {
           RPC.stopProxy(pi.proxy);
         }
+        // Set to null to avoid the failoverProxy having to re-do the close
+        // if it is sharing a proxy instance
+        pi.proxy = null;
       }
     }
+    failoverProxy.close();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/039c158d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java
index de34454..16371b1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java
@@ -27,22 +27,23 @@ import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.io.retry.FailoverProxyProvider;
 import org.apache.hadoop.io.retry.RetryInvocationHandler;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.After;
 import org.junit.Before;
+import org.junit.Ignore;
 import org.junit.Test;
 
 import java.io.File;
 import java.io.IOException;
 import java.lang.reflect.Proxy;
 import java.net.URI;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_LOGROLL_PERIOD_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -200,6 +201,9 @@ public class TestObserverNode {
     // Start the observer again - requests should go to observer
     dfsCluster.restartNameNode(2);
     dfsCluster.transitionToObserver(2);
+    // The first request goes to the active because it has not refreshed yet;
+    // the second will properly go to the observer
+    dfs.getFileStatus(testPath);
     dfs.getFileStatus(testPath);
     assertSentTo(2);
   }
@@ -231,6 +235,9 @@ public class TestObserverNode {
 
     dfsCluster.transitionToObserver(2);
     dfs.getFileStatus(testPath);
+    // The first request goes to the active because it has not refreshed yet;
+    // the second will properly go to the observer
+    dfs.getFileStatus(testPath);
     assertSentTo(2);
   }
 
@@ -291,6 +298,10 @@ public class TestObserverNode {
     assertEquals(0, rc);
   }
 
+  // TODO this does not currently work because fetching the service state from
+  // e.g. the StandbyNameNode also waits for the transaction ID to catch up.
+  // This is disabled pending HDFS-13872 and HDFS-13749.
+  @Ignore("Disabled until HDFS-13872 and HDFS-13749 are committed")
   @Test
   public void testMsyncSimple() throws Exception {
     // disable fast path here because this test's assertions are based on the
@@ -304,7 +315,8 @@ public class TestObserverNode {
     setUpCluster(1);
     setObserverRead(true);
 
-    AtomicBoolean readSucceed = new AtomicBoolean(false);
+    // 0 == not completed, 1 == succeeded, -1 == failed
+    AtomicInteger readStatus = new AtomicInteger(0);
 
     dfs.mkdir(testPath, FsPermission.getDefault());
     assertSentTo(0);
@@ -313,20 +325,21 @@ public class TestObserverNode {
       try {
         // this read will block until roll and tail edits happen.
         dfs.getFileStatus(testPath);
-        readSucceed.set(true);
+        readStatus.set(1);
       } catch (IOException e) {
         e.printStackTrace();
+        readStatus.set(-1);
       }
     });
 
     reader.start();
     // the reader is still blocking, not succeeded yet.
-    assertFalse(readSucceed.get());
+    assertEquals(0, readStatus.get());
     rollEditLogAndTail(0);
     // wait a while for all the change to be done
-    Thread.sleep(100);
+    GenericTestUtils.waitFor(() -> readStatus.get() != 0, 100, 10000);
     // the reader should have succeed.
-    assertTrue(readSucceed.get());
+    assertEquals(1, readStatus.get());
   }
 
   private void setUpCluster(int numObservers) throws Exception {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/039c158d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverReadProxyProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverReadProxyProvider.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverReadProxyProvider.java
new file mode 100644
index 0000000..4d5bc13
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverReadProxyProvider.java
@@ -0,0 +1,335 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode.ha;
+
+import com.google.common.base.Joiner;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.ipc.StandbyException;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+
+
+/**
+ * Tests for {@link ObserverReadProxyProvider} under various configurations of
+ * NameNode states. Mainly testing that the proxy provider picks the correct
+ * NameNode to communicate with.
+ */
+public class TestObserverReadProxyProvider {
+
+  private static final LocatedBlock[] EMPTY_BLOCKS = new LocatedBlock[0];
+  private String ns;
+  private URI nnURI;
+  private Configuration conf;
+
+  private ObserverReadProxyProvider<ClientProtocol> proxyProvider;
+  private ClientProtocolAnswer[] namenodeAnswers;
+  private String[] namenodeAddrs;
+
+  @Before
+  public void setup() throws Exception {
+    ns = "testcluster";
+    nnURI = URI.create("hdfs://" + ns);
+    conf = new Configuration();
+    conf.set(HdfsClientConfigKeys.DFS_NAMESERVICES, ns);
+  }
+
+  private void setupProxyProvider(int namenodeCount) throws Exception {
+    String[] namenodeIDs = new String[namenodeCount];
+    namenodeAddrs = new String[namenodeCount];
+    namenodeAnswers = new ClientProtocolAnswer[namenodeCount];
+    ClientProtocol[] proxies = new ClientProtocol[namenodeCount];
+    Map<String, ClientProtocol> proxyMap = new HashMap<>();
+    for (int i  = 0; i < namenodeCount; i++) {
+      namenodeIDs[i] = "nn" + i;
+      namenodeAddrs[i] = "namenode" + i + ".test:8020";
+      conf.set(HdfsClientConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY + "." + ns +
+          "." + namenodeIDs[i], namenodeAddrs[i]);
+      namenodeAnswers[i] = new ClientProtocolAnswer();
+      proxies[i] = mock(ClientProtocol.class);
+      doWrite(Mockito.doAnswer(namenodeAnswers[i]).when(proxies[i]));
+      doRead(Mockito.doAnswer(namenodeAnswers[i]).when(proxies[i]));
+      proxyMap.put(namenodeAddrs[i], proxies[i]);
+    }
+    conf.set(HdfsClientConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX + "." + ns,
+        Joiner.on(",").join(namenodeIDs));
+    proxyProvider = new ObserverReadProxyProvider<>(conf, nnURI,
+        ClientProtocol.class, new ClientHAProxyFactory<ClientProtocol>() {
+      @Override
+      public ClientProtocol createProxy(Configuration conf,
+          InetSocketAddress nnAddr, Class<ClientProtocol> xface,
+          UserGroupInformation ugi, boolean withRetries,
+          AtomicBoolean fallbackToSimpleAuth) {
+        return proxyMap.get(nnAddr.toString());
+      }
+    });
+    proxyProvider.setObserverReadEnabled(true);
+  }
+
+  @Test
+  public void testReadOperationOnObserver() throws Exception {
+    setupProxyProvider(3);
+    namenodeAnswers[0].setActiveState();
+    namenodeAnswers[2].setObserverState();
+
+    doRead();
+    assertHandledBy(2);
+  }
+
+  @Test
+  public void testWriteOperationOnActive() throws Exception {
+    setupProxyProvider(3);
+    namenodeAnswers[0].setActiveState();
+    namenodeAnswers[2].setObserverState();
+
+    doWrite();
+    assertHandledBy(0);
+  }
+
+  @Test
+  public void testUnreachableObserverWithNoBackup() throws Exception {
+    setupProxyProvider(2);
+    namenodeAnswers[0].setActiveState();
+    namenodeAnswers[1].setObserverState();
+
+    namenodeAnswers[1].setUnreachable(true);
+    // Confirm that read still succeeds even though observer is not available
+    doRead();
+    assertHandledBy(0);
+  }
+
+  @Test
+  public void testUnreachableObserverWithMultiple() throws Exception {
+    setupProxyProvider(4);
+    namenodeAnswers[0].setActiveState();
+    namenodeAnswers[2].setObserverState();
+    namenodeAnswers[3].setObserverState();
+
+    doRead();
+    assertHandledBy(2);
+
+    namenodeAnswers[2].setUnreachable(true);
+    doRead();
+    // Fall back to the second observer node
+    assertHandledBy(3);
+
+    namenodeAnswers[2].setUnreachable(false);
+    doRead();
+    // Current index has changed, so although the first observer is back,
+    // it should continue requesting from the second observer
+    assertHandledBy(3);
+
+    namenodeAnswers[3].setUnreachable(true);
+    doRead();
+    // Now that second is unavailable, go back to using the first observer
+    assertHandledBy(2);
+
+    namenodeAnswers[2].setUnreachable(true);
+    doRead();
+    // Both observers are now unavailable, so it should fall back to active
+    assertHandledBy(0);
+  }
+
+  @Test
+  public void testObserverToActive() throws Exception {
+    setupProxyProvider(3);
+    namenodeAnswers[0].setActiveState();
+    namenodeAnswers[1].setObserverState();
+    namenodeAnswers[2].setObserverState();
+
+    doWrite();
+    assertHandledBy(0);
+
+    // Transition an observer to active
+    namenodeAnswers[0].setStandbyState();
+    namenodeAnswers[1].setActiveState();
+    try {
+      doWrite();
+      fail("Write should fail; failover required");
+    } catch (RemoteException re) {
+      assertEquals(re.getClassName(),
+          StandbyException.class.getCanonicalName());
+    }
+    proxyProvider.performFailover(proxyProvider.getProxy().proxy);
+    doWrite();
+    // After failover, previous observer is now active
+    assertHandledBy(1);
+    doRead();
+    assertHandledBy(2);
+
+    // Transition back to original state but second observer not available
+    namenodeAnswers[0].setActiveState();
+    namenodeAnswers[1].setObserverState();
+    namenodeAnswers[2].setUnreachable(true);
+    for (int i = 0; i < 2; i++) {
+      try {
+        doWrite();
+        fail("Should have failed");
+      } catch (IOException ioe) {
+        proxyProvider.performFailover(proxyProvider.getProxy().proxy);
+      }
+    }
+    doWrite();
+    assertHandledBy(0);
+
+    doRead();
+    assertHandledBy(1);
+  }
+
+  @Test
+  public void testObserverToStandby() throws Exception {
+    setupProxyProvider(3);
+    namenodeAnswers[0].setActiveState();
+    namenodeAnswers[1].setObserverState();
+    namenodeAnswers[2].setObserverState();
+
+    doRead();
+    assertHandledBy(1);
+
+    namenodeAnswers[1].setStandbyState();
+    doRead();
+    assertHandledBy(2);
+
+    namenodeAnswers[2].setStandbyState();
+    doRead();
+    assertHandledBy(0);
+
+    namenodeAnswers[1].setObserverState();
+    doRead();
+    assertHandledBy(1);
+  }
+
+  @Test
+  public void testSingleObserverToStandby() throws Exception {
+    setupProxyProvider(2);
+    namenodeAnswers[0].setActiveState();
+    namenodeAnswers[1].setObserverState();
+
+    doRead();
+    assertHandledBy(1);
+
+    namenodeAnswers[1].setStandbyState();
+    doRead();
+    assertHandledBy(0);
+
+    namenodeAnswers[1].setObserverState();
+    // The proxy provider still thinks the second NN is in observer state,
+    // so it will take a second call for it to notice the new observer
+    doRead();
+    doRead();
+    assertHandledBy(1);
+  }
+
+  private void doRead() throws Exception {
+    doRead(proxyProvider.getProxy().proxy);
+  }
+
+  private void doWrite() throws Exception {
+    doWrite(proxyProvider.getProxy().proxy);
+  }
+
+  private void assertHandledBy(int namenodeIdx) {
+    assertEquals(namenodeAddrs[namenodeIdx],
+        proxyProvider.getLastProxy().proxyInfo);
+  }
+
+  private static void doWrite(ClientProtocol client) throws Exception {
+    client.reportBadBlocks(EMPTY_BLOCKS);
+  }
+
+  private static void doRead(ClientProtocol client) throws Exception {
+    client.checkAccess("/", FsAction.READ);
+  }
+
+  /**
+   * An {@link Answer} used for mocking of a {@link ClientProtocol}. Setting
+   * the state or unreachability of this Answer will make the linked
+   * ClientProtocol respond as if it was communicating with a NameNode of
+   * the corresponding state. It is in Standby state by default.
+   */
+  private static class ClientProtocolAnswer implements Answer<Void> {
+
+    private volatile boolean unreachable = false;
+    // Standby state by default
+    private volatile boolean allowWrites = false;
+    private volatile boolean allowReads = false;
+
+    @Override
+    public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
+      if (unreachable) {
+        throw new IOException("Unavailable");
+      }
+      switch (invocationOnMock.getMethod().getName()) {
+        case "reportBadBlocks":
+          if (!allowWrites) {
+            throw new RemoteException(StandbyException.class.getCanonicalName(),
+                "No writes!");
+          }
+          return null;
+        case "checkAccess":
+          if (!allowReads) {
+            throw new RemoteException(StandbyException.class.getCanonicalName(),
+                "No reads!");
+          }
+          return null;
+        default:
+          throw new IllegalArgumentException(
+              "Only reportBadBlocks and checkAccess supported!");
+      }
+    }
+
+    void setUnreachable(boolean unreachable) {
+      this.unreachable = unreachable;
+    }
+
+    void setActiveState() {
+      allowReads = true;
+      allowWrites = true;
+    }
+
+    void setStandbyState() {
+      allowReads = false;
+      allowWrites = false;
+    }
+
+    void setObserverState() {
+      allowReads = true;
+      allowWrites = false;
+    }
+
+  }
+
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org