You are viewing a plain-text version of this content; the link to its canonical version was not preserved in this copy.
Posted to common-commits@hadoop.apache.org by xk...@apache.org on 2019/03/01 20:59:14 UTC
[hadoop] branch trunk updated: HDFS-14272. [SBN read] Make
ObserverReadProxyProvider initialize its state ID against the active NN on
startup. Contributed by Erik Krogen.
This is an automated email from the ASF dual-hosted git repository.
xkrogen pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new 5a15f7b HDFS-14272. [SBN read] Make ObserverReadProxyProvider initialize its state ID against the active NN on startup. Contributed by Erik Krogen.
5a15f7b is described below
commit 5a15f7b3f47e61905bf41b40cf5243ab96bd3448
Author: Erik Krogen <xk...@apache.org>
AuthorDate: Fri Mar 1 12:58:55 2019 -0800
HDFS-14272. [SBN read] Make ObserverReadProxyProvider initialize its state ID against the active NN on startup. Contributed by Erik Krogen.
---
.../namenode/ha/ObserverReadProxyProvider.java | 34 ++++++++++
.../hdfs/server/namenode/NameNodeRpcServer.java | 3 +-
.../namenode/ha/TestConsistentReadsObserver.java | 73 +++++++++++++++++++++-
3 files changed, 107 insertions(+), 3 deletions(-)
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java
index 3cf14cb..a17c640 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ObserverReadProxyProvider.java
@@ -88,6 +88,15 @@ public class ObserverReadProxyProvider<T extends ClientProtocol>
private boolean observerReadEnabled;
/**
+ * A client using an ObserverReadProxyProvider should first sync with the
+ * active NameNode on startup so that it reads data consistent with the state
+ * of the world as of its instantiation. This becomes true after that initial
+ * msync or after any call served by the active. Read calls check it before
+ * calling the synchronized {@link #initializeMsync()}, which re-checks it.
+ */
+ private volatile boolean msynced = false;
+
+ /**
* The index into the nameNodeProxies list currently being used. Should only
* be accessed in synchronized methods.
*/
@@ -225,6 +234,22 @@ public class ObserverReadProxyProvider<T extends ClientProtocol>
}
/**
+ * This will call {@link ClientProtocol#msync()} on the active NameNode
+ * (via the {@link #failoverProxy}) to initialize the state of this client.
+ * Calling it multiple times is a no-op; only the first will perform an
+ * msync.
+ *
+ * @see #msynced
+ */
+ private synchronized void initializeMsync() throws IOException {
+ if (msynced) {
+ return; // No need for an msync
+ }
+ failoverProxy.getProxy().proxy.msync();
+ msynced = true;
+ }
+
+ /**
* An InvocationHandler to handle incoming requests. This class's invoke
* method contains the primary logic for redirecting to observers.
*
@@ -244,6 +269,12 @@ public class ObserverReadProxyProvider<T extends ClientProtocol>
Object retVal;
if (observerReadEnabled && isRead(method)) {
+ if (!msynced) {
+ // An msync() must first be performed to ensure that this client is
+ // up-to-date with the active's state. This will only be done once.
+ initializeMsync();
+ }
+
int failedObserverCount = 0;
int activeCount = 0;
int standbyCount = 0;
@@ -315,6 +346,9 @@ public class ObserverReadProxyProvider<T extends ClientProtocol>
// This exception will be handled by higher layers
throw e.getCause();
}
+ // If this was reached, the request reached the active, so the
+ // state is up-to-date with active and no further msync is needed.
+ msynced = true;
lastProxy = activeProxy;
return retVal;
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
index f50648d..525d9c8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
@@ -1409,7 +1409,8 @@ public class NameNodeRpcServer implements NamenodeProtocols {
@Override // ClientProtocol
public void msync() throws IOException {
- // TODO : need to be filled up if needed. May be a no-op here.
+ // Permit msync only where WRITE operations are allowed, i.e. on the active
+ namesystem.checkOperation(OperationCategory.WRITE);
}
@Override // ClientProtocol
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestConsistentReadsObserver.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestConsistentReadsObserver.java
index 2845670..2bed37c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestConsistentReadsObserver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestConsistentReadsObserver.java
@@ -178,8 +178,12 @@ public class TestConsistentReadsObserver {
// Therefore, the subsequent getFileStatus call should succeed.
dfs2.getClient().msync();
dfs2.getFileStatus(testPath);
- readStatus.set(1);
- } catch (IOException e) {
+ if (HATestUtil.isSentToAnyOfNameNodes(dfs2, dfsCluster, 2)) {
+ readStatus.set(1);
+ } else {
+ readStatus.set(-1);
+ }
+ } catch (Exception e) {
e.printStackTrace();
readStatus.set(-1);
}
@@ -196,6 +200,71 @@ public class TestConsistentReadsObserver {
assertEquals(1, readStatus.get());
}
+ // A new client should first contact the active, before using an observer,
+ // to ensure that it is up-to-date with the current state
+ @Test
+ public void testCallFromNewClient() throws Exception {
+ // Set the order of nodes: Observer, Standby, Active
+ // This ensures that the test doesn't pass trivially because the active is
+ // the first node contacted
+ dfsCluster.transitionToStandby(0);
+ dfsCluster.transitionToObserver(0);
+ dfsCluster.transitionToStandby(2);
+ dfsCluster.transitionToActive(2);
+ try {
+ // 0 == not completed, 1 == succeeded, -1 == failed
+ AtomicInteger readStatus = new AtomicInteger(0);
+ // Initialize the proxies for Observer Node.
+ dfs.getClient().getHAServiceState();
+
+ // Advance Observer's state ID so it is ahead of client's.
+ dfs.mkdir(new Path("/test"), FsPermission.getDefault());
+ dfsCluster.getNameNode(2).getRpcServer().rollEditLog();
+ dfsCluster.getNameNode(0)
+ .getNamesystem().getEditLogTailer().doTailEdits();
+
+ // Create testPath on the active; the observer has not tailed it yet.
+ dfs.mkdir(testPath, FsPermission.getDefault());
+ assertSentTo(2);
+
+ Configuration conf2 = new Configuration(conf);
+ // Disable FS cache so two different DFS clients will be used.
+ conf2.setBoolean("fs.hdfs.impl.disable.cache", true);
+ // The uncached dfs2 is owned by this test; try-with-resources closes it.
+ try (DistributedFileSystem dfs2 =
+ (DistributedFileSystem) FileSystem.get(conf2)) {
+ dfs2.getClient().getHAServiceState();
+ Thread reader = new Thread(() -> {
+ try {
+ dfs2.getFileStatus(testPath);
+ readStatus.set(1);
+ } catch (Exception e) {
+ e.printStackTrace();
+ readStatus.set(-1);
+ }
+ });
+ reader.start();
+ // dfs2 msyncs with the active before reading, so its state ID is ahead
+ // of the observer's and the read must still be blocked.
+ Thread.sleep(100);
+ assertEquals(0, readStatus.get());
+
+ // Let the observer catch up; the blocked read should then succeed.
+ dfsCluster.getNameNode(2).getRpcServer().rollEditLog();
+ dfsCluster.getNameNode(0)
+ .getNamesystem().getEditLogTailer().doTailEdits();
+ GenericTestUtils.waitFor(() -> readStatus.get() != 0, 100, 10000);
+ assertEquals(1, readStatus.get());
+ }
+ } finally {
+ // Put the cluster back the way it was when the test started
+ dfsCluster.transitionToStandby(2);
+ dfsCluster.transitionToObserver(2);
+ dfsCluster.transitionToStandby(0);
+ dfsCluster.transitionToActive(0);
+ }
+ }
+
@Test
public void testUncoordinatedCall() throws Exception {
// make a write call so that client will be ahead of
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org