You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cl...@apache.org on 2019/06/28 23:06:51 UTC
[hadoop] 48/50: HDFS-14211. [SBN Read]. Add a configurable flag to
enable always-msync mode to ObserverReadProxyProvider. Contributed by Erik
Krogen.
This is an automated email from the ASF dual-hosted git repository.
cliang pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/hadoop.git
commit 3c6b74a6a4eade993cb4e9efd7eb58a9394bec35
Author: Erik Krogen <xk...@apache.org>
AuthorDate: Fri Mar 8 10:35:31 2019 -0800
HDFS-14211. [SBN Read]. Add a configurable flag to enable always-msync mode to ObserverReadProxyProvider. Contributed by Erik Krogen.
---
.../namenode/ha/ObserverReadProxyProvider.java | 64 ++++++++++++++++++++++
.../src/site/markdown/ObserverNameNode.md | 48 +++++++++++++++-
.../namenode/ha/TestConsistentReadsObserver.java | 56 +++++++++++++++----
3 files changed, 156 insertions(+), 12 deletions(-)
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java
index fe867c5..31c2ddf 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java
@@ -23,6 +23,7 @@ import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.URI;
+import java.util.concurrent.TimeUnit;
import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience;
@@ -43,6 +44,7 @@ import org.apache.hadoop.ipc.ObserverRetryOnActiveException;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.RpcInvocationHandler;
+import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -68,6 +70,12 @@ public class ObserverReadProxyProvider<T extends ClientProtocol>
private static final Logger LOG = LoggerFactory.getLogger(
ObserverReadProxyProvider.class);
+ /**
+ * Configuration key prefix for {@link #autoMsyncPeriodMs}. The full key is
+ * this prefix followed by "." and the nameservice ID.
+ */
+ static final String AUTO_MSYNC_PERIOD_KEY_PREFIX =
+ HdfsClientConfigKeys.Failover.PREFIX + "observer.auto-msync-period";
+ /** Auto-msync disabled by default. */
+ static final long AUTO_MSYNC_PERIOD_DEFAULT = -1;
+
/** Client-side context for syncing with the NameNode server side. */
private final AlignmentContext alignmentContext;
@@ -88,6 +96,24 @@ public class ObserverReadProxyProvider<T extends ClientProtocol>
private boolean observerReadEnabled;
/**
+ * This adjusts how frequently this proxy provider should auto-msync to the
+ * Active NameNode, automatically performing an msync() call to the active
+ * to fetch the current transaction ID before submitting read requests to
+ * observer nodes. See HDFS-14211 for more description of this feature.
+ * If this is below 0, never auto-msync. If this is 0, perform an msync on
+ * every read operation. If this is above 0, perform an msync after this many
+ * ms have elapsed since the last msync.
+ */
+ private final long autoMsyncPeriodMs;
+
+ /**
+ * The time, in milliseconds as reported by {@link Time#monotonicNow()} (a
+ * monotonic clock, not the wall-clock epoch), that the last msync operation
+ * was performed. This includes any implicit msync (any operation which is
+ * serviced by the Active NameNode).
+ */
+ private volatile long lastMsyncTimeMs = -1;
+
+ /**
* A client using an ObserverReadProxyProvider should first sync with the
* active NameNode on startup. This ensures that the client reads data which
* is consistent with the state of the world as of the time of its
@@ -154,6 +180,12 @@ public class ObserverReadProxyProvider<T extends ClientProtocol>
ObserverReadInvocationHandler.class.getClassLoader(),
new Class<?>[] {xface}, new ObserverReadInvocationHandler());
combinedProxy = new ProxyInfo<>(wrappedProxy, combinedInfo.toString());
+
+ autoMsyncPeriodMs = conf.getTimeDuration(
+ // The host of the URI is the nameservice ID
+ AUTO_MSYNC_PERIOD_KEY_PREFIX + "." + uri.getHost(),
+ AUTO_MSYNC_PERIOD_DEFAULT, TimeUnit.MILLISECONDS);
+
// TODO : make this configurable or remove this variable
this.observerReadEnabled = true;
}
@@ -247,6 +279,35 @@ public class ObserverReadProxyProvider<T extends ClientProtocol>
}
failoverProxy.getProxy().proxy.msync();
msynced = true;
+ lastMsyncTimeMs = Time.monotonicNow();
+ }
+
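+ /**
+ * This will call {@link ClientProtocol#msync()} on the active NameNode
+ * (via the {@link #failoverProxy}) to update the state of this client when
+ * auto-msync is due. If {@link #autoMsyncPeriodMs} is 0, an msync is
+ * performed on every call. If it is positive, an msync is performed only
+ * when more than {@link #autoMsyncPeriodMs} ms have elapsed since the last
+ * time an msync was performed. If it is negative, this method does nothing.
+ *
+ * @see #autoMsyncPeriodMs
+ */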
+ private void autoMsyncIfNecessary() throws IOException {
+ if (autoMsyncPeriodMs == 0) {
+ // A period of 0 means always msync; the elapsed-time check is skipped.
+ failoverProxy.getProxy().proxy.msync();
+ } else if (autoMsyncPeriodMs > 0) { // a negative period falls through: no-op
+ if (Time.monotonicNow() - lastMsyncTimeMs > autoMsyncPeriodMs) {
+ synchronized (this) {
+ // Use a synchronized block so that only one thread will msync
+ // if many operations are submitted around the same time.
+ // Re-check the entry criterion since another thread may have updated
+ // lastMsyncTimeMs while this one was waiting for the lock.
+ if (Time.monotonicNow() - lastMsyncTimeMs > autoMsyncPeriodMs) {
+ failoverProxy.getProxy().proxy.msync();
+ lastMsyncTimeMs = Time.monotonicNow();
+ }
+ }
+ }
+ }
}
/**
@@ -273,6 +334,8 @@ public class ObserverReadProxyProvider<T extends ClientProtocol>
// An msync() must first be performed to ensure that this client is
// up-to-date with the active's state. This will only be done once.
initializeMsync();
+ } else {
+ autoMsyncIfNecessary();
}
int failedObserverCount = 0;
@@ -349,6 +412,7 @@ public class ObserverReadProxyProvider<T extends ClientProtocol>
// If this was reached, the request reached the active, so the
// state is up-to-date with active and no further msync is needed.
msynced = true;
+ lastMsyncTimeMs = Time.monotonicNow();
lastProxy = activeProxy;
return retVal;
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/ObserverNameNode.md b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/ObserverNameNode.md
index d93256c..b212f00 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/ObserverNameNode.md
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/ObserverNameNode.md
@@ -61,9 +61,12 @@ ID, which is implemented using transaction ID within NameNode, is
introduced in RPC headers. When a client performs write through Active
NameNode, it updates its state ID using the latest transaction ID from
the NameNode. When performing a subsequent read, the client passes this
-state ID to Observe NameNode, which will then check against its own
+state ID to Observer NameNode, which will then check against its own
transaction ID, and will ensure its own transaction ID has caught up
-with the request's state ID, before serving the read request.
+with the request's state ID, before serving the read request. This ensures
+"read your own writes" semantics from a single client. Maintaining
+consistency between multiple clients in the face of out-of-band communication
+is discussed in the "Maintaining Client Consistency" section below.
Edit log tailing is critical for Observer NameNode as it directly affects
the latency between when a transaction is applied in Active NameNode and
@@ -83,6 +86,32 @@ available in the cluster, and only fall back to Active NameNode if all
of the former failed. Similarly, ObserverReadProxyProviderWithIPFailover
is introduced to replace IPFailoverProxyProvider in an IP failover setup.
+### Maintaining Client Consistency
+
+As discussed above, a client 'foo' will update its state ID upon every request
+to the Active NameNode, which includes all write operations. Any request
+directed to an Observer NameNode will wait until the Observer has seen
+this transaction ID, ensuring that the client is able to read all of its own
+writes. However, if 'foo' sends an out-of-band (i.e., non-HDFS) message to
+client 'bar' telling it that a write has been performed, a subsequent read by
+'bar' may not see the recent write by 'foo'. To prevent this inconsistent
+behavior, a new `msync()`, or "metadata sync", command has been added. When
+`msync()` is called on a client, it will update its state ID against the
+Active NameNode -- a very lightweight operation -- so that subsequent reads
+are guaranteed to be consistent up to the point of the `msync()`. Thus as long
+as 'bar' calls `msync()` before performing its read, it is guaranteed to see
+the write made by 'foo'.
+
+To make use of `msync()`, an application does not necessarily have to make any
+code changes. Upon startup, a client will automatically call `msync()` before
+performing any reads against an Observer, so that any writes performed prior
+to the initialization of the client will be visible. In addition, there is
+a configurable "auto-msync" mode supported by ObserverReadProxyProvider which
+will automatically perform an `msync()` at some configurable interval, to
+prevent a client from ever seeing data that is more stale than a time bound.
+There is some overhead associated with this, as each refresh requires an RPC
+to the Active NameNode, so it is disabled by default.
+
Deployment
-----------
@@ -185,3 +214,18 @@ implementation, in the client-side **hdfs-site.xml** configuration file:
Clients who do not wish to use Observer NameNode can still use the
existing ConfiguredFailoverProxyProvider and should not see any behavior
change.
+
+Clients who wish to make use of the "auto-msync" functionality should adjust
+the configuration below. This will specify some time period after which,
+if the client's state ID has not been updated from the Active NameNode, an
+`msync()` will automatically be performed. If this is specified as 0, an
+`msync()` will be performed before _every_ read operation. If this is a
+positive time duration, an `msync()` will be performed every time a read
+operation is requested and the Active has not been contacted for longer than
+that period. If this is negative (the default), no automatic `msync()` will
+be performed.
+
+ <property>
+ <name>dfs.client.failover.observer.auto-msync-period.<nameservice></name>
+ <value>500ms</value>
+ </property>
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestConsistentReadsObserver.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestConsistentReadsObserver.java
index 2bed37c..1ec47ca 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestConsistentReadsObserver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestConsistentReadsObserver.java
@@ -22,6 +22,8 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
@@ -34,6 +36,7 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.ipc.RpcScheduler;
import org.apache.hadoop.ipc.Schedulable;
import org.apache.hadoop.test.GenericTestUtils;
@@ -57,7 +60,7 @@ public class TestConsistentReadsObserver {
private static Configuration conf;
private static MiniQJMHACluster qjmhaCluster;
private static MiniDFSCluster dfsCluster;
- private static DistributedFileSystem dfs;
+ private DistributedFileSystem dfs;
private final Path testPath= new Path("/TestConsistentReadsObserver");
@@ -74,7 +77,7 @@ public class TestConsistentReadsObserver {
@Before
public void setUp() throws Exception {
- setObserverRead(true);
+ dfs = setObserverRead(true);
}
@After
@@ -106,8 +109,7 @@ public class TestConsistentReadsObserver {
configuration.setBoolean(prefix
+ CommonConfigurationKeys.IPC_BACKOFF_ENABLE, true);
- dfsCluster.restartNameNode(observerIdx);
- dfsCluster.transitionToObserver(observerIdx);
+ NameNodeAdapter.getRpcServer(nn).refreshCallQueue(configuration);
dfs.create(testPath, (short)1).close();
assertSentTo(0);
@@ -151,18 +153,26 @@ public class TestConsistentReadsObserver {
assertEquals(1, readStatus.get());
}
- @Test
- public void testMsync() throws Exception {
+ private void testMsync(boolean autoMsync, long autoMsyncPeriodMs)
+ throws Exception {
// 0 == not completed, 1 == succeeded, -1 == failed
AtomicInteger readStatus = new AtomicInteger(0);
Configuration conf2 = new Configuration(conf);
// Disable FS cache so two different DFS clients will be used.
conf2.setBoolean("fs.hdfs.impl.disable.cache", true);
+ if (autoMsync) {
+ conf2.setTimeDuration(
+ ObserverReadProxyProvider.AUTO_MSYNC_PERIOD_KEY_PREFIX
+ + "." + dfs.getUri().getHost(),
+ autoMsyncPeriodMs, TimeUnit.MILLISECONDS);
+ }
DistributedFileSystem dfs2 = (DistributedFileSystem) FileSystem.get(conf2);
// Initialize the proxies for Observer Node.
dfs.getClient().getHAServiceState();
+ // This initialization will perform the msync-on-startup, so that another
+ // form of msync is required later
dfs2.getClient().getHAServiceState();
// Advance Observer's state ID so it is ahead of client's.
@@ -176,7 +186,12 @@ public class TestConsistentReadsObserver {
try {
// After msync, client should have the latest state ID from active.
// Therefore, the subsequent getFileStatus call should succeed.
- dfs2.getClient().msync();
+ if (!autoMsync) {
+ // If not testing auto-msync, perform an explicit one here
+ dfs2.getClient().msync();
+ } else if (autoMsyncPeriodMs > 0) {
+ Thread.sleep(autoMsyncPeriodMs);
+ }
dfs2.getFileStatus(testPath);
if (HATestUtil.isSentToAnyOfNameNodes(dfs2, dfsCluster, 2)) {
readStatus.set(1);
@@ -196,10 +211,31 @@ public class TestConsistentReadsObserver {
dfsCluster.rollEditLogAndTail(0);
- GenericTestUtils.waitFor(() -> readStatus.get() != 0, 100, 10000);
+ GenericTestUtils.waitFor(() -> readStatus.get() != 0, 100, 3000);
assertEquals(1, readStatus.get());
}
+ @Test
+ public void testExplicitMsync() throws Exception {
+ testMsync(false, -1); // autoMsync off: the helper issues an explicit msync()
+ }
+
+ @Test
+ public void testAutoMsyncPeriod0() throws Exception {
+ testMsync(true, 0); // period 0: auto-msync before every read, no sleep
+ }
+
+ @Test
+ public void testAutoMsyncPeriod5() throws Exception {
+ testMsync(true, 5); // 5 ms period; the helper sleeps that long before reading
+ }
+
+ @Test(expected = TimeoutException.class)
+ public void testAutoMsyncLongPeriod() throws Exception {
+ // Expected to time out: with a Long.MAX_VALUE period auto-msync never fires.
+ testMsync(true, Long.MAX_VALUE); // NOTE(review): helper also sleeps for this period before reading - confirm intent
+ }
+
// A new client should first contact the active, before using an observer,
// to ensure that it is up-to-date with the current state
@Test
@@ -313,8 +349,8 @@ public class TestConsistentReadsObserver {
HATestUtil.isSentToAnyOfNameNodes(dfs, dfsCluster, nnIdx));
}
- private static void setObserverRead(boolean flag) throws Exception {
- dfs = HATestUtil.configureObserverReadFs(
+ private DistributedFileSystem setObserverRead(boolean flag) throws Exception {
+ return HATestUtil.configureObserverReadFs(
dfsCluster, conf, ObserverReadProxyProvider.class, flag);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org