Posted to common-commits@hadoop.apache.org by aa...@apache.org on 2019/03/04 08:00:16 UTC

[hadoop] 01/05: HDFS-14272. [SBN read] Make ObserverReadProxyProvider initialize its state ID against the active NN on startup. Contributed by Erik Krogen.

This is an automated email from the ASF dual-hosted git repository.

aajisaka pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit fc17ba172bde2aeea98a84f9a8cd104a2fada673
Author: Erik Krogen <xk...@apache.org>
AuthorDate: Fri Mar 1 12:58:55 2019 -0800

    HDFS-14272. [SBN read] Make ObserverReadProxyProvider initialize its state ID against the active NN on startup. Contributed by Erik Krogen.
---
 .../namenode/ha/ObserverReadProxyProvider.java     | 34 ++++++++++
 .../hdfs/server/namenode/NameNodeRpcServer.java    |  3 +-
 .../namenode/ha/TestConsistentReadsObserver.java   | 73 +++++++++++++++++++++-
 3 files changed, 107 insertions(+), 3 deletions(-)
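
The change below makes ObserverReadProxyProvider call msync() against the active NameNode before its first observer read, so a freshly started client never reads state older than the active's state at the time the client came up. The following sketch is not part of the commit; the nameservice name "mycluster", the path, and the class name FirstReadExample are placeholders, and it assumes the cluster is already configured for observer reads (see the configuration sketch at the end of this message). It shows what the change means from a client's point of view:

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class FirstReadExample {
  public static void main(String[] args) throws Exception {
    // Assumes hdfs-site.xml configures the "mycluster" nameservice with
    // ObserverReadProxyProvider as its failover proxy provider.
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(URI.create("hdfs://mycluster"), conf);

    // Before this patch, the first read from a new client could be served by
    // an observer that lagged behind the active, so data written just before
    // the client started might be invisible. With this patch, the proxy
    // provider performs a one-time msync() against the active first, so the
    // read below reflects at least the active's state at client startup.
    FileStatus status = fs.getFileStatus(new Path("/some/existing/path"));
    System.out.println(status);
  }
}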

diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java
index 3cf14cb..a17c640 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java
@@ -88,6 +88,15 @@ public class ObserverReadProxyProvider<T extends ClientProtocol>
   private boolean observerReadEnabled;
 
   /**
+   * A client using an ObserverReadProxyProvider should first sync with the
+   * active NameNode on startup. This ensures that the client reads data which
+   * is consistent with the state of the world as of the time of its
+   * instantiation. This variable will be true after this initial sync has
+   * been performed.
+   */
+  private volatile boolean msynced = false;
+
+  /**
    * The index into the nameNodeProxies list currently being used. Should only
    * be accessed in synchronized methods.
    */
@@ -225,6 +234,22 @@ public class ObserverReadProxyProvider<T extends ClientProtocol>
   }
 
   /**
+   * This will call {@link ClientProtocol#msync()} on the active NameNode
+   * (via the {@link #failoverProxy}) to initialize the state of this client.
+   * Calling it multiple times is a no-op; only the first will perform an
+   * msync.
+   *
+   * @see #msynced
+   */
+  private synchronized void initializeMsync() throws IOException {
+    if (msynced) {
+      return; // No need for an msync
+    }
+    failoverProxy.getProxy().proxy.msync();
+    msynced = true;
+  }
+
+  /**
    * An InvocationHandler to handle incoming requests. This class's invoke
    * method contains the primary logic for redirecting to observers.
    *
@@ -244,6 +269,12 @@ public class ObserverReadProxyProvider<T extends ClientProtocol>
       Object retVal;
 
       if (observerReadEnabled && isRead(method)) {
+        if (!msynced) {
+          // An msync() must first be performed to ensure that this client is
+          // up-to-date with the active's state. This will only be done once.
+          initializeMsync();
+        }
+
         int failedObserverCount = 0;
         int activeCount = 0;
         int standbyCount = 0;
@@ -315,6 +346,9 @@ public class ObserverReadProxyProvider<T extends ClientProtocol>
         // This exception will be handled by higher layers
         throw e.getCause();
       }
+      // If this was reached, the request reached the active, so the
+      // state is up-to-date with active and no further msync is needed.
+      msynced = true;
       lastProxy = activeProxy;
       return retVal;
     }
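
The msynced flag above follows the usual volatile-flag-plus-synchronized-initializer idiom: the volatile read on the hot path avoids taking a lock once the initial sync has happened, while the synchronized method guarantees the msync RPC is issued at most once. A standalone sketch of that idiom (the class and method names OneTimeSync, ensureSynced, and doExpensiveSync are illustrative, not from the patch):

public class OneTimeSync {
  private volatile boolean synced = false;

  /** Fast path: a single volatile read once the initial sync is done. */
  public void ensureSynced() throws Exception {
    if (!synced) {
      initializeSync();
    }
  }

  /** Slow path: at most one caller performs the expensive work. */
  private synchronized void initializeSync() throws Exception {
    if (synced) {
      return; // another thread already synced while we waited for the lock
    }
    doExpensiveSync(); // stands in for failoverProxy.getProxy().proxy.msync()
    synced = true;     // the volatile write publishes the result to readers
  }

  private void doExpensiveSync() throws Exception {
    // placeholder for the real one-time work
  }
}
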
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
index f50648d..525d9c8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
@@ -1409,7 +1409,8 @@ public class NameNodeRpcServer implements NamenodeProtocols {
 
   @Override // ClientProtocol
   public void msync() throws IOException {
-    // TODO : need to be filled up if needed. May be a no-op here.
+    // Check for write access to ensure that msync only happens on active
+    namesystem.checkOperation(OperationCategory.WRITE);
   }
 
   @Override // ClientProtocol
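
On the server side, the new checkOperation(OperationCategory.WRITE) call is what turns msync() into an active-only RPC: on a standby or observer it throws StandbyException, which the client-side failover machinery treats as a cue to retry against another NameNode until the active answers. A hedged, standalone sketch of that gating pattern (HypotheticalService and its State enum are illustrative; the real check lives in the namesystem's checkOperation):

import org.apache.hadoop.ipc.StandbyException;

public class HypotheticalService {
  enum State { ACTIVE, STANDBY, OBSERVER }

  private volatile State state = State.STANDBY;

  /** Mirrors the effect of namesystem.checkOperation(OperationCategory.WRITE). */
  private void checkWriteAccess() throws StandbyException {
    if (state != State.ACTIVE) {
      // The client's failover/retry machinery interprets this exception as
      // "wrong NameNode, fail over and try another one".
      throw new StandbyException("Operation category WRITE is not supported "
          + "in state " + state);
    }
  }

  /** An msync-style call: a no-op whose only job is to reach the active. */
  public void msync() throws StandbyException {
    checkWriteAccess();
    // Nothing else to do here: being answered by the active is enough for the
    // caller to pick up the active's latest state ID from the RPC response.
  }
}
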
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestConsistentReadsObserver.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestConsistentReadsObserver.java
index 2845670..2bed37c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestConsistentReadsObserver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestConsistentReadsObserver.java
@@ -178,8 +178,12 @@ public class TestConsistentReadsObserver {
         // Therefore, the subsequent getFileStatus call should succeed.
         dfs2.getClient().msync();
         dfs2.getFileStatus(testPath);
-        readStatus.set(1);
-      } catch (IOException e) {
+        if (HATestUtil.isSentToAnyOfNameNodes(dfs2, dfsCluster, 2)) {
+          readStatus.set(1);
+        } else {
+          readStatus.set(-1);
+        }
+      } catch (Exception e) {
         e.printStackTrace();
         readStatus.set(-1);
       }
@@ -196,6 +200,71 @@ public class TestConsistentReadsObserver {
     assertEquals(1, readStatus.get());
   }
 
+  // A new client should first contact the active, before using an observer,
+  // to ensure that it is up-to-date with the current state
+  @Test
+  public void testCallFromNewClient() throws Exception {
+    // Set the order of nodes: Observer, Standby, Active
+    // This is to ensure that test doesn't pass trivially because the active is
+    // the first node contacted
+    dfsCluster.transitionToStandby(0);
+    dfsCluster.transitionToObserver(0);
+    dfsCluster.transitionToStandby(2);
+    dfsCluster.transitionToActive(2);
+    try {
+      // 0 == not completed, 1 == succeeded, -1 == failed
+      AtomicInteger readStatus = new AtomicInteger(0);
+
+      // Initialize the proxies for Observer Node.
+      dfs.getClient().getHAServiceState();
+
+      // Advance Observer's state ID so it is ahead of client's.
+      dfs.mkdir(new Path("/test"), FsPermission.getDefault());
+      dfsCluster.getNameNode(2).getRpcServer().rollEditLog();
+      dfsCluster.getNameNode(0)
+          .getNamesystem().getEditLogTailer().doTailEdits();
+
+      dfs.mkdir(testPath, FsPermission.getDefault());
+      assertSentTo(2);
+
+      Configuration conf2 = new Configuration(conf);
+
+      // Disable FS cache so two different DFS clients will be used.
+      conf2.setBoolean("fs.hdfs.impl.disable.cache", true);
+      DistributedFileSystem dfs2 =
+          (DistributedFileSystem) FileSystem.get(conf2);
+      dfs2.getClient().getHAServiceState();
+
+      Thread reader = new Thread(() -> {
+        try {
+          dfs2.getFileStatus(testPath);
+          readStatus.set(1);
+        } catch (Exception e) {
+          e.printStackTrace();
+          readStatus.set(-1);
+        }
+      });
+
+      reader.start();
+
+      Thread.sleep(100);
+      assertEquals(0, readStatus.get());
+
+      dfsCluster.getNameNode(2).getRpcServer().rollEditLog();
+      dfsCluster.getNameNode(0)
+          .getNamesystem().getEditLogTailer().doTailEdits();
+
+      GenericTestUtils.waitFor(() -> readStatus.get() != 0, 100, 10000);
+      assertEquals(1, readStatus.get());
+    } finally {
+      // Put the cluster back the way it was when the test started
+      dfsCluster.transitionToStandby(2);
+      dfsCluster.transitionToObserver(2);
+      dfsCluster.transitionToStandby(0);
+      dfsCluster.transitionToActive(0);
+    }
+  }
+
   @Test
   public void testUncoordinatedCall() throws Exception {
     // make a write call so that client will be ahead of
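
For context, none of this code path is reachable unless the client is configured to use ObserverReadProxyProvider in the first place. A hedged configuration sketch follows; the nameservice name "mycluster", the NameNode IDs, and the host:port values are placeholders, and in practice these settings normally live in hdfs-site.xml rather than being set programmatically:

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

public class ObserverReadClientConfig {
  public static Configuration observerReadConf() {
    Configuration conf = new Configuration();
    conf.set("dfs.nameservices", "mycluster");
    conf.set("dfs.ha.namenodes.mycluster", "nn0,nn1,nn2");
    conf.set("dfs.namenode.rpc-address.mycluster.nn0", "host0:8020");
    conf.set("dfs.namenode.rpc-address.mycluster.nn1", "host1:8020");
    conf.set("dfs.namenode.rpc-address.mycluster.nn2", "host2:8020");
    // This is the switch that sends reads to observers; with this patch, the
    // provider also msyncs against the active before its first read.
    conf.set("dfs.client.failover.proxy.provider.mycluster",
        "org.apache.hadoop.hdfs.server.namenode.ha.ObserverReadProxyProvider");
    return conf;
  }

  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.get(URI.create("hdfs://mycluster"),
        observerReadConf());
    System.out.println(fs.getUri());
  }
}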

