You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cl...@apache.org on 2019/06/28 23:06:51 UTC

[hadoop] 48/50: HDFS-14211. [SBN Read]. Add a configurable flag to enable always-msync mode to ObserverReadProxyProvider. Contributed by Erik Krogen.

This is an automated email from the ASF dual-hosted git repository.

cliang pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit 3c6b74a6a4eade993cb4e9efd7eb58a9394bec35
Author: Erik Krogen <xk...@apache.org>
AuthorDate: Fri Mar 8 10:35:31 2019 -0800

    HDFS-14211. [SBN Read]. Add a configurable flag to enable always-msync mode to ObserverReadProxyProvider. Contributed by Erik Krogen.
---
 .../namenode/ha/ObserverReadProxyProvider.java     | 64 ++++++++++++++++++++++
 .../src/site/markdown/ObserverNameNode.md          | 48 +++++++++++++++-
 .../namenode/ha/TestConsistentReadsObserver.java   | 56 +++++++++++++++----
 3 files changed, 156 insertions(+), 12 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java
index fe867c5..31c2ddf 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java
@@ -23,6 +23,7 @@ import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.lang.reflect.Proxy;
 import java.net.URI;
+import java.util.concurrent.TimeUnit;
 import java.util.List;
 
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -43,6 +44,7 @@ import org.apache.hadoop.ipc.ObserverRetryOnActiveException;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.ipc.RpcInvocationHandler;
+import org.apache.hadoop.util.Time;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -68,6 +70,12 @@ public class ObserverReadProxyProvider<T extends ClientProtocol>
   private static final Logger LOG = LoggerFactory.getLogger(
       ObserverReadProxyProvider.class);
 
+  /** Configuration key prefix for {@link #autoMsyncPeriodMs}. */
+  static final String AUTO_MSYNC_PERIOD_KEY_PREFIX =
+      HdfsClientConfigKeys.Failover.PREFIX + "observer.auto-msync-period";
+  /** Auto-msync disabled by default. */
+  static final long AUTO_MSYNC_PERIOD_DEFAULT = -1;
+
   /** Client-side context for syncing with the NameNode server side. */
   private final AlignmentContext alignmentContext;
 
@@ -88,6 +96,24 @@ public class ObserverReadProxyProvider<T extends ClientProtocol>
   private boolean observerReadEnabled;
 
   /**
+   * This adjusts how frequently this proxy provider should auto-msync to the
+   * Active NameNode, automatically performing an msync() call to the active
+   * to fetch the current transaction ID before submitting read requests to
+   * observer nodes. See HDFS-14211 for more description of this feature.
+   * If this is below 0, never auto-msync. If this is 0, perform an msync on
+   * every read operation. If this is above 0, perform an msync after this many
+   * ms have elapsed since the last msync.
+   */
+  private final long autoMsyncPeriodMs;
+
+  /**
+   * The time, in ms from {@link Time#monotonicNow()} (not epoch time), that
+   * the last msync operation was performed. This includes any implicit msync
+   * (any operation which is serviced by the Active NameNode).
+   */
+  private volatile long lastMsyncTimeMs = -1;
+
+  /**
    * A client using an ObserverReadProxyProvider should first sync with the
    * active NameNode on startup. This ensures that the client reads data which
    * is consistent with the state of the world as of the time of its
@@ -154,6 +180,12 @@ public class ObserverReadProxyProvider<T extends ClientProtocol>
         ObserverReadInvocationHandler.class.getClassLoader(),
         new Class<?>[] {xface}, new ObserverReadInvocationHandler());
     combinedProxy = new ProxyInfo<>(wrappedProxy, combinedInfo.toString());
+
+    autoMsyncPeriodMs = conf.getTimeDuration(
+        // The host of the URI is the nameservice ID
+        AUTO_MSYNC_PERIOD_KEY_PREFIX + "." + uri.getHost(),
+        AUTO_MSYNC_PERIOD_DEFAULT, TimeUnit.MILLISECONDS);
+
     // TODO : make this configurable or remove this variable
     this.observerReadEnabled = true;
   }
@@ -247,6 +279,35 @@ public class ObserverReadProxyProvider<T extends ClientProtocol>
     }
     failoverProxy.getProxy().proxy.msync();
     msynced = true;
+    lastMsyncTimeMs = Time.monotonicNow();
+  }
+
+  /**
+   * Calls {@link ClientProtocol#msync()} on the active NameNode (via the
+   * {@link #failoverProxy}) to update this client's state: always if
+   * {@link #autoMsyncPeriodMs} is 0, or, if it is positive, only when more
+   * than that many ms have elapsed since the last msync. Otherwise a no-op.
+   *
+   * @see #autoMsyncPeriodMs
+   */
+  private void autoMsyncIfNecessary() throws IOException {
+    if (autoMsyncPeriodMs == 0) {
+      // Always msync
+      failoverProxy.getProxy().proxy.msync();
+    } else if (autoMsyncPeriodMs > 0) {
+      if (Time.monotonicNow() - lastMsyncTimeMs > autoMsyncPeriodMs) {
+        synchronized (this) {
+          // Use a synchronized block so that only one thread will msync
+          // if many operations are submitted around the same time.
+          // Re-check the entry criterion since the status may have changed
+          // while waiting for the lock.
+          if (Time.monotonicNow() - lastMsyncTimeMs > autoMsyncPeriodMs) {
+            failoverProxy.getProxy().proxy.msync();
+            lastMsyncTimeMs = Time.monotonicNow();
+          }
+        }
+      }
+    }
   }
 
   /**
@@ -273,6 +334,8 @@ public class ObserverReadProxyProvider<T extends ClientProtocol>
           // An msync() must first be performed to ensure that this client is
           // up-to-date with the active's state. This will only be done once.
           initializeMsync();
+        } else {
+          autoMsyncIfNecessary();
         }
 
         int failedObserverCount = 0;
@@ -349,6 +412,7 @@ public class ObserverReadProxyProvider<T extends ClientProtocol>
       // If this was reached, the request reached the active, so the
       // state is up-to-date with active and no further msync is needed.
       msynced = true;
+      lastMsyncTimeMs = Time.monotonicNow();
       lastProxy = activeProxy;
       return retVal;
     }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/ObserverNameNode.md b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/ObserverNameNode.md
index d93256c..b212f00 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/ObserverNameNode.md
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/ObserverNameNode.md
@@ -61,9 +61,12 @@ ID, which is implemented using transaction ID within NameNode, is
 introduced in RPC headers. When a client performs write through Active
 NameNode, it updates its state ID using the latest transaction ID from
 the NameNode. When performing a subsequent read, the client passes this
-state ID to Observe NameNode, which will then check against its own
+state ID to Observer NameNode, which will then check against its own
 transaction ID, and will ensure its own transaction ID has caught up
-with the request's state ID, before serving the read request.
+with the request's state ID, before serving the read request. This ensures
+"read your own writes" semantics from a single client. Maintaining
+consistency between multiple clients in the face of out-of-band communication
+is discussed in the "Maintaining Client Consistency" section below.
 
 Edit log tailing is critical for Observer NameNode as it directly affects
 the latency between when a transaction is applied in Active NameNode and
@@ -83,6 +86,32 @@ available in the cluster, and only fall back to Active NameNode if all
 of the former failed. Similarly, ObserverReadProxyProviderWithIPFailover
 is introduced to replace IPFailoverProxyProvider in a IP failover setup.
 
+### Maintaining Client Consistency
+
+As discussed above, a client 'foo' will update its state ID upon every request
+to the Active NameNode, which includes all write operations. Any request
+directed to an Observer NameNode will wait until the Observer has seen
+this transaction ID, ensuring that the client is able to read all of its own
+writes. However, if 'foo' sends an out-of-band (i.e., non-HDFS) message to
+client 'bar' telling it that a write has been performed, a subsequent read by
+'bar' may not see the recent write by 'foo'. To prevent this inconsistent
+behavior, a new `msync()`, or "metadata sync", command has been added. When
+`msync()` is called on a client, it will update its state ID against the
+Active NameNode -- a very lightweight operation -- so that subsequent reads
+are guaranteed to be consistent up to the point of the `msync()`. Thus as long
+as 'bar' calls `msync()` before performing its read, it is guaranteed to see
+the write made by 'foo'.
+
+To make use of `msync()`, an application does not necessarily have to make any
+code changes. Upon startup, a client will automatically call `msync()` before
+performing any reads against an Observer, so that any writes performed prior
+to the initialization of the client will be visible. In addition, there is
+a configurable "auto-msync" mode supported by ObserverReadProxyProvider which
+will automatically perform an `msync()` at some configurable interval, to
+prevent a client from ever seeing data that is more stale than a time bound.
+There is some overhead associated with this, as each refresh requires an RPC
+to the Active NameNode, so it is disabled by default.
+
 Deployment
 -----------
 
@@ -185,3 +214,18 @@ implementation, in the client-side **hdfs-site.xml** configuration file:
 Clients who do not wish to use Observer NameNode can still use the
 existing ConfiguredFailoverProxyProvider and should not see any behavior
 change.
+
+Clients who wish to make use of the "auto-msync" functionality should adjust
+the configuration below. This will specify some time period after which,
+if the client's state ID has not been updated from the Active NameNode, an
+`msync()` will automatically be performed. If this is specified as 0, an
+`msync()` will be performed before _every_ read operation. If this is a
+positive time duration, an `msync()` will be performed every time a read
+operation is requested and the Active has not been contacted for longer than
+that period. If this is negative (the default), no automatic `msync()` will
+be performed.
+
+    <property>
+        <name>dfs.client.failover.observer.auto-msync-period.<nameservice></name>
+        <value>500ms</value>
+    </property>
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestConsistentReadsObserver.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestConsistentReadsObserver.java
index 2bed37c..1ec47ca 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestConsistentReadsObserver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestConsistentReadsObserver.java
@@ -22,6 +22,8 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.hadoop.conf.Configuration;
@@ -34,6 +36,7 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
 import org.apache.hadoop.ipc.RpcScheduler;
 import org.apache.hadoop.ipc.Schedulable;
 import org.apache.hadoop.test.GenericTestUtils;
@@ -57,7 +60,7 @@ public class TestConsistentReadsObserver {
   private static Configuration conf;
   private static MiniQJMHACluster qjmhaCluster;
   private static MiniDFSCluster dfsCluster;
-  private static DistributedFileSystem dfs;
+  private DistributedFileSystem dfs;
 
   private final Path testPath= new Path("/TestConsistentReadsObserver");
 
@@ -74,7 +77,7 @@ public class TestConsistentReadsObserver {
 
   @Before
   public void setUp() throws Exception {
-    setObserverRead(true);
+    dfs = setObserverRead(true);
   }
 
   @After
@@ -106,8 +109,7 @@ public class TestConsistentReadsObserver {
     configuration.setBoolean(prefix
         + CommonConfigurationKeys.IPC_BACKOFF_ENABLE, true);
 
-    dfsCluster.restartNameNode(observerIdx);
-    dfsCluster.transitionToObserver(observerIdx);
+    NameNodeAdapter.getRpcServer(nn).refreshCallQueue(configuration);
 
     dfs.create(testPath, (short)1).close();
     assertSentTo(0);
@@ -151,18 +153,26 @@ public class TestConsistentReadsObserver {
     assertEquals(1, readStatus.get());
   }
 
-  @Test
-  public void testMsync() throws Exception {
+  private void testMsync(boolean autoMsync, long autoMsyncPeriodMs)
+      throws Exception {
     // 0 == not completed, 1 == succeeded, -1 == failed
     AtomicInteger readStatus = new AtomicInteger(0);
     Configuration conf2 = new Configuration(conf);
 
     // Disable FS cache so two different DFS clients will be used.
     conf2.setBoolean("fs.hdfs.impl.disable.cache", true);
+    if (autoMsync) {
+      conf2.setTimeDuration(
+          ObserverReadProxyProvider.AUTO_MSYNC_PERIOD_KEY_PREFIX
+              + "." + dfs.getUri().getHost(),
+          autoMsyncPeriodMs, TimeUnit.MILLISECONDS);
+    }
     DistributedFileSystem dfs2 = (DistributedFileSystem) FileSystem.get(conf2);
 
     // Initialize the proxies for Observer Node.
     dfs.getClient().getHAServiceState();
+    // This initialization will perform the msync-on-startup, so that another
+    // form of msync is required later
     dfs2.getClient().getHAServiceState();
 
     // Advance Observer's state ID so it is ahead of client's.
@@ -176,7 +186,12 @@ public class TestConsistentReadsObserver {
       try {
         // After msync, client should have the latest state ID from active.
         // Therefore, the subsequent getFileStatus call should succeed.
-        dfs2.getClient().msync();
+        if (!autoMsync) {
+          // If not testing auto-msync, perform an explicit one here
+          dfs2.getClient().msync();
+        } else if (autoMsyncPeriodMs > 0) {
+          Thread.sleep(autoMsyncPeriodMs);
+        }
         dfs2.getFileStatus(testPath);
         if (HATestUtil.isSentToAnyOfNameNodes(dfs2, dfsCluster, 2)) {
           readStatus.set(1);
@@ -196,10 +211,31 @@ public class TestConsistentReadsObserver {
 
     dfsCluster.rollEditLogAndTail(0);
 
-    GenericTestUtils.waitFor(() -> readStatus.get() != 0, 100, 10000);
+    GenericTestUtils.waitFor(() -> readStatus.get() != 0, 100, 3000);
     assertEquals(1, readStatus.get());
   }
 
+  @Test
+  public void testExplicitMsync() throws Exception {
+    testMsync(false, -1);
+  }
+
+  @Test
+  public void testAutoMsyncPeriod0() throws Exception {
+    testMsync(true, 0);
+  }
+
+  @Test
+  public void testAutoMsyncPeriod5() throws Exception {
+    testMsync(true, 5);
+  }
+
+  @Test(expected = TimeoutException.class)
+  public void testAutoMsyncLongPeriod() throws Exception {
+    // This should fail since the auto-msync is never activated
+    testMsync(true, Long.MAX_VALUE);
+  }
+
   // A new client should first contact the active, before using an observer,
   // to ensure that it is up-to-date with the current state
   @Test
@@ -313,8 +349,8 @@ public class TestConsistentReadsObserver {
         HATestUtil.isSentToAnyOfNameNodes(dfs, dfsCluster, nnIdx));
   }
 
-  private static void setObserverRead(boolean flag) throws Exception {
-    dfs = HATestUtil.configureObserverReadFs(
+  private DistributedFileSystem setObserverRead(boolean flag) throws Exception {
+    return HATestUtil.configureObserverReadFs(
         dfsCluster, conf, ObserverReadProxyProvider.class, flag);
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org