You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cl...@apache.org on 2019/07/25 17:17:12 UTC

[hadoop] 03/50: HDFS-13331. [SBN read] Add lastSeenStateId to RpcRequestHeader. Contributed by Plamen Jeliazkov.

This is an automated email from the ASF dual-hosted git repository.

cliang pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit c2585f7e281b065ce7ce8f81b8bfb6f9c7fd8727
Author: Erik Krogen <xk...@apache.org>
AuthorDate: Wed Apr 4 15:42:39 2018 -0700

    HDFS-13331. [SBN read] Add lastSeenStateId to RpcRequestHeader. Contributed by Plamen Jeliazkov.
---
 .../org/apache/hadoop/ipc/AlignmentContext.java    | 14 ++++
 .../main/java/org/apache/hadoop/ipc/Client.java    |  2 +-
 .../main/java/org/apache/hadoop/ipc/Server.java    |  5 ++
 .../java/org/apache/hadoop/util/ProtoUtil.java     | 13 ++++
 .../hadoop-common/src/main/proto/RpcHeader.proto   |  1 +
 .../org/apache/hadoop/hdfs/ClientGCIContext.java   | 30 +++++---
 .../java/org/apache/hadoop/hdfs/DFSClient.java     | 10 ++-
 .../hdfs/server/namenode/GlobalStateIdContext.java | 26 ++++++-
 .../hadoop/hdfs/TestStateAlignmentContext.java     | 89 +++++++++++++++++++++-
 9 files changed, 173 insertions(+), 17 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AlignmentContext.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AlignmentContext.java
index f952325..66d6edc 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AlignmentContext.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AlignmentContext.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.ipc;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto;
 import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto;
 
 /**
@@ -48,4 +49,17 @@ public interface AlignmentContext {
    */
   void receiveResponseState(RpcResponseHeaderProto header);
 
+  /**
+   * This is the intended client method call to pull last seen state info
+   * into RPC request processing.
+   * @param header The RPC request header builder.
+   */
+  void updateRequestState(RpcRequestHeaderProto.Builder header);
+
+  /**
+   * This is the intended server method call to implement to receive
+   * client state info during RPC request header processing.
+   * @param header The RPC request header.
+   */
+  void receiveRequestState(RpcRequestHeaderProto header);
 }
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
index bb09799..b9cac6f 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
@@ -1112,7 +1112,7 @@ public class Client implements AutoCloseable {
       // Items '1' and '2' are prepared here. 
       RpcRequestHeaderProto header = ProtoUtil.makeRpcRequestHeader(
           call.rpcKind, OperationProto.RPC_FINAL_PACKET, call.id, call.retry,
-          clientId);
+          clientId, alignmentContext);
 
       final ResponseBuffer buf = new ResponseBuffer();
       header.writeDelimitedTo(buf);
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
index ada458e..f32a64b 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
@@ -2481,6 +2481,11 @@ public abstract class Server {
         }
       }
 
+      if (alignmentContext != null) {
+        // Check incoming RPC request's state.
+        alignmentContext.receiveRequestState(header);
+      }
+
       CallerContext callerContext = null;
       if (header.hasCallerContext()) {
         callerContext =
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ProtoUtil.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ProtoUtil.java
index 1a5acba..9a0b05c 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ProtoUtil.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ProtoUtil.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.util;
 import java.io.DataInput;
 import java.io.IOException;
 
+import org.apache.hadoop.ipc.AlignmentContext;
 import org.apache.hadoop.ipc.CallerContext;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.protobuf.IpcConnectionContextProtos.IpcConnectionContextProto;
@@ -165,6 +166,13 @@ public abstract class ProtoUtil {
   public static RpcRequestHeaderProto makeRpcRequestHeader(RPC.RpcKind rpcKind,
       RpcRequestHeaderProto.OperationProto operation, int callId,
       int retryCount, byte[] uuid) {
+    return makeRpcRequestHeader(rpcKind, operation, callId, retryCount, uuid,
+        null);
+  }
+
+  public static RpcRequestHeaderProto makeRpcRequestHeader(RPC.RpcKind rpcKind,
+      RpcRequestHeaderProto.OperationProto operation, int callId,
+      int retryCount, byte[] uuid, AlignmentContext alignmentContext) {
     RpcRequestHeaderProto.Builder result = RpcRequestHeaderProto.newBuilder();
     result.setRpcKind(convert(rpcKind)).setRpcOp(operation).setCallId(callId)
         .setRetryCount(retryCount).setClientId(ByteString.copyFrom(uuid));
@@ -190,6 +198,11 @@ public abstract class ProtoUtil {
       result.setCallerContext(contextBuilder);
     }
 
+    // Add alignment context if it is not null
+    if (alignmentContext != null) {
+      alignmentContext.updateRequestState(result);
+    }
+
     return result.build();
   }
 }
diff --git a/hadoop-common-project/hadoop-common/src/main/proto/RpcHeader.proto b/hadoop-common-project/hadoop-common/src/main/proto/RpcHeader.proto
index bfe1301..e8d8cbb 100644
--- a/hadoop-common-project/hadoop-common/src/main/proto/RpcHeader.proto
+++ b/hadoop-common-project/hadoop-common/src/main/proto/RpcHeader.proto
@@ -90,6 +90,7 @@ message RpcRequestHeaderProto { // the header for the RpcRequest
   optional sint32 retryCount = 5 [default = -1];
   optional RPCTraceInfoProto traceInfo = 6; // tracing info
   optional RPCCallerContextProto callerContext = 7; // call context
+  optional int64 stateId = 8; // The last seen Global State ID
 }
 
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientGCIContext.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientGCIContext.java
index 3d722f8..0d0bd25 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientGCIContext.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientGCIContext.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hdfs;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.ipc.AlignmentContext;
+import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto;
 import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto;
 
 import java.util.concurrent.atomic.LongAccumulator;
@@ -33,16 +34,11 @@ import java.util.concurrent.atomic.LongAccumulator;
 @InterfaceStability.Stable
 class ClientGCIContext implements AlignmentContext {
 
-  private final DFSClient dfsClient;
   private final LongAccumulator lastSeenStateId =
       new LongAccumulator(Math::max, Long.MIN_VALUE);
 
-  /**
-   * Client side constructor.
-   * @param dfsClient client side state receiver
-   */
-  ClientGCIContext(DFSClient dfsClient) {
-    this.dfsClient = dfsClient;
+  long getLastSeenStateId() {
+    return lastSeenStateId.get();
   }
 
   /**
@@ -55,11 +51,27 @@ class ClientGCIContext implements AlignmentContext {
   }
 
   /**
-   * Client side implementation for receiving state alignment info.
+   * Client side implementation for receiving state alignment info in responses.
    */
   @Override
   public void receiveResponseState(RpcResponseHeaderProto header) {
     lastSeenStateId.accumulate(header.getStateId());
-    dfsClient.lastSeenStateId = lastSeenStateId.get();
+  }
+
+  /**
+   * Client side implementation for providing state alignment info in requests.
+   */
+  @Override
+  public void updateRequestState(RpcRequestHeaderProto.Builder header) {
+    header.setStateId(lastSeenStateId.longValue());
+  }
+
+  /**
+   * Client side implementation only provides state alignment info in requests.
+   * Client does not receive RPC requests therefore this does nothing.
+   */
+  @Override
+  public void receiveRequestState(RpcRequestHeaderProto header) {
+    // Do nothing.
   }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
index 08f0238..bd03e42 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
@@ -214,7 +214,6 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
   final UserGroupInformation ugi;
   volatile boolean clientRunning = true;
   volatile long lastLeaseRenewal;
-  volatile long lastSeenStateId;
   private volatile FsServerDefaults serverDefaults;
   private volatile long serverDefaultsLastUpdate;
   final String clientName;
@@ -237,6 +236,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
   private static volatile ThreadPoolExecutor STRIPED_READ_THREAD_POOL;
   private final int smallBufferSize;
   private final long serverDefaultsValidityPeriod;
+  private final ClientGCIContext alignmentContext;
 
   public DfsClientConf getConf() {
     return dfsClientConf;
@@ -392,7 +392,8 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
     this.saslClient = new SaslDataTransferClient(
         conf, DataTransferSaslUtil.getSaslPropertiesResolver(conf),
         TrustedChannelResolver.getInstance(conf), nnFallbackToSimpleAuth);
-    Client.setAlignmentContext(new ClientGCIContext(this));
+    this.alignmentContext = new ClientGCIContext();
+    Client.setAlignmentContext(alignmentContext);
   }
 
   /**
@@ -541,6 +542,11 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
     return clientRunning;
   }
 
+  @VisibleForTesting
+  ClientGCIContext getAlignmentContext() {
+    return alignmentContext;
+  }
+
   long getLastLeaseRenewal() {
     return lastLeaseRenewal;
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/GlobalStateIdContext.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/GlobalStateIdContext.java
index 2d7d94e..f0ebf98 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/GlobalStateIdContext.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/GlobalStateIdContext.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hdfs.server.namenode;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.ipc.AlignmentContext;
+import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto;
 import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto;
 
 /**
@@ -41,7 +42,7 @@ class GlobalStateIdContext implements AlignmentContext {
   }
 
   /**
-   * Server side implementation for providing state alignment info.
+   * Server side implementation for providing state alignment info in responses.
    */
   @Override
   public void updateResponseState(RpcResponseHeaderProto.Builder header) {
@@ -56,4 +57,27 @@ class GlobalStateIdContext implements AlignmentContext {
   public void receiveResponseState(RpcResponseHeaderProto header) {
     // Do nothing.
   }
+
+  /**
+   * Server side implementation only receives state alignment info.
+   * It does not build RPC requests therefore this does nothing.
+   */
+  @Override
+  public void updateRequestState(RpcRequestHeaderProto.Builder header) {
+    // Do nothing.
+  }
+
+  /**
+   * Server side implementation for processing state alignment info in requests.
+   */
+  @Override
+  public void receiveRequestState(RpcRequestHeaderProto header) {
+    long serverStateId = namesystem.getLastWrittenTransactionId();
+    long clientStateId = header.getStateId();
+    if (clientStateId > serverStateId) {
+      FSNamesystem.LOG.warn("A client sent stateId: " + clientStateId +
+          ", but server state is: " + serverStateId);
+    }
+  }
+
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestStateAlignmentContext.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestStateAlignmentContext.java
index 590f702..ce4639f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestStateAlignmentContext.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestStateAlignmentContext.java
@@ -18,20 +18,30 @@
 
 package org.apache.hadoop.hdfs;
 
+import static org.hamcrest.CoreMatchers.containsString;
 import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.not;
 import static org.junit.Assert.assertThat;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.ipc.AlignmentContext;
+import org.apache.hadoop.ipc.Client;
+import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.mockito.Mockito;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 
 /**
  * Class is used to test server sending state alignment information to clients
@@ -91,7 +101,7 @@ public class TestStateAlignmentContext {
   public void testStateTransferOnWrite() throws Exception {
     long preWriteState = cluster.getNamesystem().getLastWrittenTransactionId();
     DFSTestUtil.writeFile(dfs, new Path("/testFile1"), "abc");
-    long clientState = dfs.dfs.lastSeenStateId;
+    long clientState = dfs.dfs.getAlignmentContext().getLastSeenStateId();
     long postWriteState = cluster.getNamesystem().getLastWrittenTransactionId();
     // Write(s) should have increased state. Check for greater than.
     assertThat(clientState > preWriteState, is(true));
@@ -109,7 +119,8 @@ public class TestStateAlignmentContext {
     long lastWrittenId = cluster.getNamesystem().getLastWrittenTransactionId();
     DFSTestUtil.readFile(dfs, new Path("/testFile2"));
     // Read should catch client up to last written state.
-    assertThat(dfs.dfs.lastSeenStateId, is(lastWrittenId));
+    long clientState = dfs.dfs.getAlignmentContext().getLastSeenStateId();
+    assertThat(clientState, is(lastWrittenId));
   }
 
   /**
@@ -122,10 +133,80 @@ public class TestStateAlignmentContext {
     long lastWrittenId = cluster.getNamesystem().getLastWrittenTransactionId();
     try (DistributedFileSystem clearDfs =
              (DistributedFileSystem) FileSystem.get(CONF)) {
-      assertThat(clearDfs.dfs.lastSeenStateId, is(0L));
+      ClientGCIContext clientState = clearDfs.dfs.getAlignmentContext();
+      assertThat(clientState.getLastSeenStateId(), is(Long.MIN_VALUE));
       DFSTestUtil.readFile(clearDfs, new Path("/testFile3"));
-      assertThat(clearDfs.dfs.lastSeenStateId, is(lastWrittenId));
+      assertThat(clientState.getLastSeenStateId(), is(lastWrittenId));
     }
   }
 
+  /**
+   * This test spies on the client's AlignmentContext and ensures that
+   * DFSClient writes its lastSeenStateId into RPC requests.
+   */
+  @Test
+  public void testClientSendsState() throws Exception {
+    AlignmentContext alignmentContext = dfs.dfs.getAlignmentContext();
+    AlignmentContext spiedAlignContext = Mockito.spy(alignmentContext);
+    Client.setAlignmentContext(spiedAlignContext);
+
+    // Collect RpcRequestHeaders for verification later.
+    final List<RpcHeaderProtos.RpcRequestHeaderProto.Builder> collectedHeaders =
+        new ArrayList<>();
+    Mockito.doAnswer(a -> {
+      Object[] arguments = a.getArguments();
+      RpcHeaderProtos.RpcRequestHeaderProto.Builder header =
+          (RpcHeaderProtos.RpcRequestHeaderProto.Builder) arguments[0];
+      collectedHeaders.add(header);
+      return a.callRealMethod();
+    }).when(spiedAlignContext).updateRequestState(Mockito.any());
+
+    DFSTestUtil.writeFile(dfs, new Path("/testFile4"), "shv");
+
+    // Ensure the first and last headers carry different stateIds.
+    assertThat(collectedHeaders.size() > 1, is(true));
+    assertThat(collectedHeaders.get(0).getStateId(), is(not(
+        collectedHeaders.get(collectedHeaders.size() - 1).getStateId())));
+
+    // Ensure collected stateIds are in non-decreasing order.
+    long lastHeader = collectedHeaders.get(0).getStateId();
+    for(RpcHeaderProtos.RpcRequestHeaderProto.Builder header :
+        collectedHeaders.subList(1, collectedHeaders.size())) {
+      long currentHeader = header.getStateId();
+      assertThat(currentHeader >= lastHeader, is(true));
+      lastHeader = currentHeader;
+    }
+  }
+
+  /**
+   * This test mocks an AlignmentContext to send stateIds greater than
+   * server's stateId in RPC requests.
+   */
+  @Test
+  public void testClientSendsGreaterState() throws Exception {
+    AlignmentContext alignmentContext = dfs.dfs.getAlignmentContext();
+    AlignmentContext spiedAlignContext = Mockito.spy(alignmentContext);
+    Client.setAlignmentContext(spiedAlignContext);
+
+    // Make every client call have a stateId > server's stateId.
+    Mockito.doAnswer(a -> {
+      Object[] arguments = a.getArguments();
+      RpcHeaderProtos.RpcRequestHeaderProto.Builder header =
+          (RpcHeaderProtos.RpcRequestHeaderProto.Builder) arguments[0];
+      try {
+        return a.callRealMethod();
+      } finally {
+        header.setStateId(Long.MAX_VALUE);
+      }
+    }).when(spiedAlignContext).updateRequestState(Mockito.any());
+
+    GenericTestUtils.LogCapturer logCapturer =
+        GenericTestUtils.LogCapturer.captureLogs(FSNamesystem.LOG);
+    DFSTestUtil.writeFile(dfs, new Path("/testFile4"), "shv");
+    logCapturer.stopCapturing();
+
+    String output = logCapturer.getOutput();
+    assertThat(output, containsString("A client sent stateId: "));
+  }
+
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org