You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cl...@apache.org on 2018/08/13 17:30:22 UTC

hadoop git commit: HDFS-13767. Add msync server implementation. Contributed by Chen Liang.

Repository: hadoop
Updated Branches:
  refs/heads/HDFS-12943 71a358131 -> 5eeba62ed


HDFS-13767. Add msync server implementation. Contributed by Chen Liang.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/5eeba62e
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/5eeba62e
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/5eeba62e

Branch: refs/heads/HDFS-12943
Commit: 5eeba62ed35283c8bf7a418ddbb7e593b5762e39
Parents: 71a3581
Author: Chen Liang <cl...@apache.org>
Authored: Mon Aug 13 10:30:06 2018 -0700
Committer: Chen Liang <cl...@apache.org>
Committed: Mon Aug 13 10:30:06 2018 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/ipc/AlignmentContext.java |  9 ++-
 .../main/java/org/apache/hadoop/ipc/Server.java | 37 ++++++++++--
 .../apache/hadoop/hdfs/ClientGSIContext.java    |  6 +-
 .../ClientNamenodeProtocolTranslatorPB.java     |  1 -
 .../server/namenode/GlobalStateIdContext.java   | 20 +++++--
 .../hdfs/TestStateAlignmentContextWithHA.java   | 36 -----------
 .../server/namenode/ha/TestObserverNode.java    | 63 +++++++++++++++++---
 7 files changed, 114 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/5eeba62e/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AlignmentContext.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AlignmentContext.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AlignmentContext.java
index 66d6edc..0e8b960 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AlignmentContext.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AlignmentContext.java
@@ -60,6 +60,13 @@ public interface AlignmentContext {
    * This is the intended server method call to implement to receive
    * client state info during RPC response header processing.
    * @param header The RPC request header.
+   * @return state id in the request header.
    */
-  void receiveRequestState(RpcRequestHeaderProto header);
+  long receiveRequestState(RpcRequestHeaderProto header);
+
+  /**
+   * Returns the last seen state id of the alignment context instance.
+   * @return the value of the last seen state id.
+   */
+  long getLastSeenStateId();
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5eeba62e/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
index 7df11cc..afc05f5 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
@@ -704,6 +704,7 @@ public abstract class Server {
     private boolean deferredResponse = false;
     private int priorityLevel;
     // the priority level assigned by scheduler, 0 by default
+    private long clientStateId;
 
     Call() {
       this(RpcConstants.INVALID_CALL_ID, RpcConstants.INVALID_RETRY_COUNT,
@@ -734,6 +735,7 @@ public abstract class Server {
       this.clientId = clientId;
       this.traceScope = traceScope;
       this.callerContext = callerContext;
+      this.clientStateId = Long.MIN_VALUE;
     }
 
     @Override
@@ -811,6 +813,14 @@ public abstract class Server {
       this.priorityLevel = priorityLevel;
     }
 
+    public long getClientStateId() {
+      return this.clientStateId;
+    }
+
+    public void setClientStateId(long stateId) {
+      this.clientStateId = stateId;
+    }
+
     @InterfaceStability.Unstable
     public void deferResponse() {
       this.deferredResponse = true;
@@ -2493,11 +2503,6 @@ public abstract class Server {
         }
       }
 
-      if (alignmentContext != null) {
-        // Check incoming RPC request's state.
-        alignmentContext.receiveRequestState(header);
-      }
-
       CallerContext callerContext = null;
       if (header.hasCallerContext()) {
         callerContext =
@@ -2514,6 +2519,10 @@ public abstract class Server {
 
       // Save the priority level assignment by the scheduler
       call.setPriorityLevel(callQueue.getPriorityLevel(call));
+      if(alignmentContext != null) {
+        long stateId = alignmentContext.receiveRequestState(header);
+        call.setClientStateId(stateId);
+      }
 
       try {
         internalQueueCall(call);
@@ -2695,6 +2704,24 @@ public abstract class Server {
         TraceScope traceScope = null;
         try {
           final Call call = callQueue.take(); // pop the queue; maybe blocked here
+          if (alignmentContext != null && call.getClientStateId() >
+              alignmentContext.getLastSeenStateId()) {
+            /*
+             * The call processing should be postponed until the server state
+             * id has caught up with (>=) the client call's state id.
+
+             * NOTE:
+             * Inserting the call back to the queue can change the order of call
+             * execution compared to their original placement into the queue.
+             * This is not a problem, because Hadoop RPC does not have any
+             * constraints on ordering the incoming rpc requests.
+             * In case of Observer, it handles only reads, which are
+             * commutative.
+             */
+            //Re-queue the call and continue
+            internalQueueCall(call);
+            continue;
+          }
           if (LOG.isDebugEnabled()) {
             LOG.debug(Thread.currentThread().getName() + ": " + call + " for RpcKind " + call.rpcKind);
           }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5eeba62e/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientGSIContext.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientGSIContext.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientGSIContext.java
index 241ec05..10fa0e1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientGSIContext.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientGSIContext.java
@@ -39,7 +39,8 @@ public class ClientGSIContext implements AlignmentContext {
   private final LongAccumulator lastSeenStateId =
       new LongAccumulator(Math::max, Long.MIN_VALUE);
 
-  long getLastSeenStateId() {
+  @Override
+  public long getLastSeenStateId() {
     return lastSeenStateId.get();
   }
 
@@ -73,7 +74,8 @@ public class ClientGSIContext implements AlignmentContext {
    * Client does not receive RPC requests therefore this does nothing.
    */
   @Override
-  public void receiveRequestState(RpcRequestHeaderProto header) {
+  public long receiveRequestState(RpcRequestHeaderProto header) {
     // Do nothing.
+    return 0;
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5eeba62e/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
index 442a59f..088a9aa 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java
@@ -159,7 +159,6 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.Mkdirs
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ModifyCacheDirectiveRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ModifyCachePoolRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.MsyncRequestProto;
-import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.MsyncResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.OpenFilesBatchResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RecoverLeaseRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RefreshNodesRequestProto;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5eeba62e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/GlobalStateIdContext.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/GlobalStateIdContext.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/GlobalStateIdContext.java
index f0ebf98..0016692 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/GlobalStateIdContext.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/GlobalStateIdContext.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.namenode;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.ha.HAServiceProtocol;
 import org.apache.hadoop.ipc.AlignmentContext;
 import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto;
 import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto;
@@ -46,7 +47,11 @@ class GlobalStateIdContext implements AlignmentContext {
    */
   @Override
   public void updateResponseState(RpcResponseHeaderProto.Builder header) {
-    header.setStateId(namesystem.getLastWrittenTransactionId());
+    // Using getCorrectLastAppliedOrWrittenTxId will acquire the lock on
+    // FSEditLog. This is needed so that ANN will return the correct state id
+    // it currently has. But this may not be necessary for Observer; may want
+    // to revisit for optimization. Same goes for receiveRequestState.
+    header.setStateId(getLastSeenStateId());
   }
 
   /**
@@ -71,13 +76,20 @@ class GlobalStateIdContext implements AlignmentContext {
    * Server side implementation for processing state alignment info in requests.
    */
   @Override
-  public void receiveRequestState(RpcRequestHeaderProto header) {
-    long serverStateId = namesystem.getLastWrittenTransactionId();
+  public long receiveRequestState(RpcRequestHeaderProto header) {
+    long serverStateId =
+        namesystem.getFSImage().getCorrectLastAppliedOrWrittenTxId();
     long clientStateId = header.getStateId();
-    if (clientStateId > serverStateId) {
+    if (clientStateId > serverStateId &&
+        HAServiceProtocol.HAServiceState.ACTIVE.equals(namesystem.getState())) {
       FSNamesystem.LOG.warn("A client sent stateId: " + clientStateId +
           ", but server state is: " + serverStateId);
     }
+    return clientStateId;
   }
 
+  @Override
+  public long getLastSeenStateId() {
+    return namesystem.getFSImage().getCorrectLastAppliedOrWrittenTxId();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5eeba62e/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestStateAlignmentContextWithHA.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestStateAlignmentContextWithHA.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestStateAlignmentContextWithHA.java
index 4fcfd8c..ae82881 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestStateAlignmentContextWithHA.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestStateAlignmentContextWithHA.java
@@ -281,42 +281,6 @@ public class TestStateAlignmentContextWithHA {
   }
 
   /**
-   * This test mocks an AlignmentContext to send stateIds greater than
-   * server's stateId in RPC requests.
-   */
-  @Test
-  public void testClientSendsGreaterState() throws Exception {
-    ClientGSIContext alignmentContext = new ClientGSIContext();
-    ClientGSIContext spiedAlignContext = Mockito.spy(alignmentContext);
-    spy = spiedAlignContext;
-
-    try (DistributedFileSystem clearDfs =
-             (DistributedFileSystem) FileSystem.get(CONF)) {
-
-      // Make every client call have a stateId > server's stateId.
-      Mockito.doAnswer(a -> {
-        Object[] arguments = a.getArguments();
-        RpcHeaderProtos.RpcRequestHeaderProto.Builder header =
-            (RpcHeaderProtos.RpcRequestHeaderProto.Builder) arguments[0];
-        try {
-          return a.callRealMethod();
-        } finally {
-          header.setStateId(Long.MAX_VALUE);
-        }
-      }).when(spiedAlignContext).updateRequestState(Mockito.any());
-
-      GenericTestUtils.LogCapturer logCapturer =
-          GenericTestUtils.LogCapturer.captureLogs(FSNamesystem.LOG);
-
-      DFSTestUtil.writeFile(clearDfs, new Path("/testFile4"), "shv");
-      logCapturer.stopCapturing();
-
-      String output = logCapturer.getOutput();
-      assertThat(output, containsString("A client sent stateId: "));
-    }
-  }
-
-  /**
    * This test checks if after a client writes we can see the state id in
    * updated via the response.
    */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5eeba62e/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java
index 98ffefd..de34454 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java
@@ -32,15 +32,21 @@ import org.junit.Before;
 import org.junit.Test;
 
 import java.io.File;
-import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.lang.reflect.Proxy;
 import java.net.URI;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_LOGROLL_PERIOD_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+
 // Main unit tests for ObserverNode
 public class TestObserverNode {
   private Configuration conf;
@@ -58,7 +64,9 @@ public class TestObserverNode {
   @Before
   public void setUp() throws Exception {
     conf = new Configuration();
-    setUpCluster(1);
+    conf.setBoolean(DFS_HA_TAILEDITS_INPROGRESS_KEY, true);
+    conf.setTimeDuration(
+        DFS_HA_TAILEDITS_PERIOD_KEY, 100, TimeUnit.MILLISECONDS);
 
     testPath = new Path("/test");
     testPath2 = new Path("/test2");
@@ -74,18 +82,12 @@ public class TestObserverNode {
 
   @Test
   public void testSimpleRead() throws Exception {
+    setUpCluster(1);
     setObserverRead(true);
 
     dfs.mkdir(testPath, FsPermission.getDefault());
     assertSentTo(0);
 
-    try {
-      dfs.getFileStatus(testPath);
-      fail("Should throw FileNotFoundException");
-    } catch (FileNotFoundException e) {
-      // Pass
-    }
-
     rollEditLogAndTail(0);
     dfs.getFileStatus(testPath);
     assertSentTo(2);
@@ -96,6 +98,7 @@ public class TestObserverNode {
 
   @Test
   public void testFailover() throws Exception {
+    setUpCluster(1);
     setObserverRead(false);
 
     dfs.mkdir(testPath, FsPermission.getDefault());
@@ -115,6 +118,7 @@ public class TestObserverNode {
 
   @Test
   public void testDoubleFailover() throws Exception {
+    setUpCluster(1);
     setObserverRead(true);
 
     dfs.mkdir(testPath, FsPermission.getDefault());
@@ -180,6 +184,7 @@ public class TestObserverNode {
 
   @Test
   public void testObserverShutdown() throws Exception {
+    setUpCluster(1);
     setObserverRead(true);
 
     dfs.mkdir(testPath, FsPermission.getDefault());
@@ -201,6 +206,7 @@ public class TestObserverNode {
 
   @Test
   public void testObserverFailOverAndShutdown() throws Exception {
+    setUpCluster(1);
     // Test the case when there is a failover before ONN shutdown
     setObserverRead(true);
 
@@ -273,6 +279,7 @@ public class TestObserverNode {
 
   @Test
   public void testBootstrap() throws Exception {
+    setUpCluster(1);
     for (URI u : dfsCluster.getNameDirs(2)) {
       File dir = new File(u.getPath());
       assertTrue(FileUtil.fullyDelete(dir));
@@ -284,6 +291,44 @@ public class TestObserverNode {
     assertEquals(0, rc);
   }
 
+  @Test
+  public void testMsyncSimple() throws Exception {
+    // disable fast path here because this test's assertions are based on the
+    // timing of explicitly called rollEditLogAndTail, although this means this
+    // test takes some time to run.
+    // TODO: revisit if there is a better way.
+    conf.setBoolean(DFS_HA_TAILEDITS_INPROGRESS_KEY, false);
+    conf.setTimeDuration(DFS_HA_LOGROLL_PERIOD_KEY, 60, TimeUnit.SECONDS);
+    conf.setTimeDuration(
+        DFS_HA_TAILEDITS_PERIOD_KEY, 30, TimeUnit.SECONDS);
+    setUpCluster(1);
+    setObserverRead(true);
+
+    AtomicBoolean readSucceed = new AtomicBoolean(false);
+
+    dfs.mkdir(testPath, FsPermission.getDefault());
+    assertSentTo(0);
+
+    Thread reader = new Thread(() -> {
+      try {
+        // this read will block until roll and tail edits happen.
+        dfs.getFileStatus(testPath);
+        readSucceed.set(true);
+      } catch (IOException e) {
+        e.printStackTrace();
+      }
+    });
+
+    reader.start();
+    // the reader is still blocking, not succeeded yet.
+    assertFalse(readSucceed.get());
+    rollEditLogAndTail(0);
+    // wait a while for all the changes to be done
+    Thread.sleep(100);
+    // the reader should have succeeded.
+    assertTrue(readSucceed.get());
+  }
+
   private void setUpCluster(int numObservers) throws Exception {
     qjmhaCluster = new MiniQJMHACluster.Builder(conf)
         .setNumNameNodes(2 + numObservers)


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org