You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cl...@apache.org on 2019/07/25 17:17:12 UTC
[hadoop] 03/50: HDFS-13331. [SBN read] Add lastSeenStateId to
RpcRequestHeader. Contributed by Plamen Jeliazkov.
This is an automated email from the ASF dual-hosted git repository.
cliang pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/hadoop.git
commit c2585f7e281b065ce7ce8f81b8bfb6f9c7fd8727
Author: Erik Krogen <xk...@apache.org>
AuthorDate: Wed Apr 4 15:42:39 2018 -0700
HDFS-13331. [SBN read] Add lastSeenStateId to RpcRequestHeader. Contributed by Plamen Jeliazkov.
---
.../org/apache/hadoop/ipc/AlignmentContext.java | 14 ++++
.../main/java/org/apache/hadoop/ipc/Client.java | 2 +-
.../main/java/org/apache/hadoop/ipc/Server.java | 5 ++
.../java/org/apache/hadoop/util/ProtoUtil.java | 13 ++++
.../hadoop-common/src/main/proto/RpcHeader.proto | 1 +
.../org/apache/hadoop/hdfs/ClientGCIContext.java | 30 +++++---
.../java/org/apache/hadoop/hdfs/DFSClient.java | 10 ++-
.../hdfs/server/namenode/GlobalStateIdContext.java | 26 ++++++-
.../hadoop/hdfs/TestStateAlignmentContext.java | 89 +++++++++++++++++++++-
9 files changed, 173 insertions(+), 17 deletions(-)
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AlignmentContext.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AlignmentContext.java
index f952325..66d6edc 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AlignmentContext.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AlignmentContext.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.ipc;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto;
/**
@@ -48,4 +49,17 @@ public interface AlignmentContext {
*/
void receiveResponseState(RpcResponseHeaderProto header);
+ /**
+ * This is the intended client method call to pull last seen state info
+ * into RPC request processing.
+ * @param header The RPC request header builder.
+ */
+ void updateRequestState(RpcRequestHeaderProto.Builder header);
+
+ /**
+ * This is the intended server method call to implement to receive
+ * client state info during RPC request header processing.
+ * @param header The RPC request header.
+ */
+ void receiveRequestState(RpcRequestHeaderProto header);
}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
index bb09799..b9cac6f 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
@@ -1112,7 +1112,7 @@ public class Client implements AutoCloseable {
// Items '1' and '2' are prepared here.
RpcRequestHeaderProto header = ProtoUtil.makeRpcRequestHeader(
call.rpcKind, OperationProto.RPC_FINAL_PACKET, call.id, call.retry,
- clientId);
+ clientId, alignmentContext);
final ResponseBuffer buf = new ResponseBuffer();
header.writeDelimitedTo(buf);
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
index ada458e..f32a64b 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
@@ -2481,6 +2481,11 @@ public abstract class Server {
}
}
+ if (alignmentContext != null) {
+ // Check incoming RPC request's state.
+ alignmentContext.receiveRequestState(header);
+ }
+
CallerContext callerContext = null;
if (header.hasCallerContext()) {
callerContext =
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ProtoUtil.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ProtoUtil.java
index 1a5acba..9a0b05c 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ProtoUtil.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ProtoUtil.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.util;
import java.io.DataInput;
import java.io.IOException;
+import org.apache.hadoop.ipc.AlignmentContext;
import org.apache.hadoop.ipc.CallerContext;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.protobuf.IpcConnectionContextProtos.IpcConnectionContextProto;
@@ -165,6 +166,13 @@ public abstract class ProtoUtil {
public static RpcRequestHeaderProto makeRpcRequestHeader(RPC.RpcKind rpcKind,
RpcRequestHeaderProto.OperationProto operation, int callId,
int retryCount, byte[] uuid) {
+ return makeRpcRequestHeader(rpcKind, operation, callId, retryCount, uuid,
+ null);
+ }
+
+ public static RpcRequestHeaderProto makeRpcRequestHeader(RPC.RpcKind rpcKind,
+ RpcRequestHeaderProto.OperationProto operation, int callId,
+ int retryCount, byte[] uuid, AlignmentContext alignmentContext) {
RpcRequestHeaderProto.Builder result = RpcRequestHeaderProto.newBuilder();
result.setRpcKind(convert(rpcKind)).setRpcOp(operation).setCallId(callId)
.setRetryCount(retryCount).setClientId(ByteString.copyFrom(uuid));
@@ -190,6 +198,11 @@ public abstract class ProtoUtil {
result.setCallerContext(contextBuilder);
}
+ // Add alignment context if it is not null
+ if (alignmentContext != null) {
+ alignmentContext.updateRequestState(result);
+ }
+
return result.build();
}
}
diff --git a/hadoop-common-project/hadoop-common/src/main/proto/RpcHeader.proto b/hadoop-common-project/hadoop-common/src/main/proto/RpcHeader.proto
index bfe1301..e8d8cbb 100644
--- a/hadoop-common-project/hadoop-common/src/main/proto/RpcHeader.proto
+++ b/hadoop-common-project/hadoop-common/src/main/proto/RpcHeader.proto
@@ -90,6 +90,7 @@ message RpcRequestHeaderProto { // the header for the RpcRequest
optional sint32 retryCount = 5 [default = -1];
optional RPCTraceInfoProto traceInfo = 6; // tracing info
optional RPCCallerContextProto callerContext = 7; // call context
+ optional int64 stateId = 8; // The last seen Global State ID
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientGCIContext.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientGCIContext.java
index 3d722f8..0d0bd25 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientGCIContext.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientGCIContext.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hdfs;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.ipc.AlignmentContext;
+import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto;
import java.util.concurrent.atomic.LongAccumulator;
@@ -33,16 +34,11 @@ import java.util.concurrent.atomic.LongAccumulator;
@InterfaceStability.Stable
class ClientGCIContext implements AlignmentContext {
- private final DFSClient dfsClient;
private final LongAccumulator lastSeenStateId =
new LongAccumulator(Math::max, Long.MIN_VALUE);
- /**
- * Client side constructor.
- * @param dfsClient client side state receiver
- */
- ClientGCIContext(DFSClient dfsClient) {
- this.dfsClient = dfsClient;
+ long getLastSeenStateId() {
+ return lastSeenStateId.get();
}
/**
@@ -55,11 +51,27 @@ class ClientGCIContext implements AlignmentContext {
}
/**
- * Client side implementation for receiving state alignment info.
+ * Client side implementation for receiving state alignment info in responses.
*/
@Override
public void receiveResponseState(RpcResponseHeaderProto header) {
lastSeenStateId.accumulate(header.getStateId());
- dfsClient.lastSeenStateId = lastSeenStateId.get();
+ }
+
+ /**
+ * Client side implementation for providing state alignment info in requests.
+ */
+ @Override
+ public void updateRequestState(RpcRequestHeaderProto.Builder header) {
+ header.setStateId(lastSeenStateId.longValue());
+ }
+
+ /**
+ * Client side implementation only provides state alignment info in requests.
+ * Client does not receive RPC requests therefore this does nothing.
+ */
+ @Override
+ public void receiveRequestState(RpcRequestHeaderProto header) {
+ // Do nothing.
}
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
index 08f0238..bd03e42 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
@@ -214,7 +214,6 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
final UserGroupInformation ugi;
volatile boolean clientRunning = true;
volatile long lastLeaseRenewal;
- volatile long lastSeenStateId;
private volatile FsServerDefaults serverDefaults;
private volatile long serverDefaultsLastUpdate;
final String clientName;
@@ -237,6 +236,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
private static volatile ThreadPoolExecutor STRIPED_READ_THREAD_POOL;
private final int smallBufferSize;
private final long serverDefaultsValidityPeriod;
+ private final ClientGCIContext alignmentContext;
public DfsClientConf getConf() {
return dfsClientConf;
@@ -392,7 +392,8 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
this.saslClient = new SaslDataTransferClient(
conf, DataTransferSaslUtil.getSaslPropertiesResolver(conf),
TrustedChannelResolver.getInstance(conf), nnFallbackToSimpleAuth);
- Client.setAlignmentContext(new ClientGCIContext(this));
+ this.alignmentContext = new ClientGCIContext();
+ Client.setAlignmentContext(alignmentContext);
}
/**
@@ -541,6 +542,11 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
return clientRunning;
}
+ @VisibleForTesting
+ ClientGCIContext getAlignmentContext() {
+ return alignmentContext;
+ }
+
long getLastLeaseRenewal() {
return lastLeaseRenewal;
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/GlobalStateIdContext.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/GlobalStateIdContext.java
index 2d7d94e..f0ebf98 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/GlobalStateIdContext.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/GlobalStateIdContext.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hdfs.server.namenode;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.ipc.AlignmentContext;
+import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto;
/**
@@ -41,7 +42,7 @@ class GlobalStateIdContext implements AlignmentContext {
}
/**
- * Server side implementation for providing state alignment info.
+ * Server side implementation for providing state alignment info in responses.
*/
@Override
public void updateResponseState(RpcResponseHeaderProto.Builder header) {
@@ -56,4 +57,27 @@ class GlobalStateIdContext implements AlignmentContext {
public void receiveResponseState(RpcResponseHeaderProto header) {
// Do nothing.
}
+
+ /**
+ * Server side implementation only receives state alignment info.
+ * It does not build RPC requests therefore this does nothing.
+ */
+ @Override
+ public void updateRequestState(RpcRequestHeaderProto.Builder header) {
+ // Do nothing.
+ }
+
+ /**
+ * Server side implementation for processing state alignment info in requests.
+ */
+ @Override
+ public void receiveRequestState(RpcRequestHeaderProto header) {
+ long serverStateId = namesystem.getLastWrittenTransactionId();
+ long clientStateId = header.getStateId();
+ if (clientStateId > serverStateId) {
+ FSNamesystem.LOG.warn("A client sent stateId: " + clientStateId +
+ ", but server state is: " + serverStateId);
+ }
+ }
+
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestStateAlignmentContext.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestStateAlignmentContext.java
index 590f702..ce4639f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestStateAlignmentContext.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestStateAlignmentContext.java
@@ -18,20 +18,30 @@
package org.apache.hadoop.hdfs;
+import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.not;
import static org.junit.Assert.assertThat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.ipc.AlignmentContext;
+import org.apache.hadoop.ipc.Client;
+import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos;
+import org.apache.hadoop.test.GenericTestUtils;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
+import org.mockito.Mockito;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
/**
* Class is used to test server sending state alignment information to clients
@@ -91,7 +101,7 @@ public class TestStateAlignmentContext {
public void testStateTransferOnWrite() throws Exception {
long preWriteState = cluster.getNamesystem().getLastWrittenTransactionId();
DFSTestUtil.writeFile(dfs, new Path("/testFile1"), "abc");
- long clientState = dfs.dfs.lastSeenStateId;
+ long clientState = dfs.dfs.getAlignmentContext().getLastSeenStateId();
long postWriteState = cluster.getNamesystem().getLastWrittenTransactionId();
// Write(s) should have increased state. Check for greater than.
assertThat(clientState > preWriteState, is(true));
@@ -109,7 +119,8 @@ public class TestStateAlignmentContext {
long lastWrittenId = cluster.getNamesystem().getLastWrittenTransactionId();
DFSTestUtil.readFile(dfs, new Path("/testFile2"));
// Read should catch client up to last written state.
- assertThat(dfs.dfs.lastSeenStateId, is(lastWrittenId));
+ long clientState = dfs.dfs.getAlignmentContext().getLastSeenStateId();
+ assertThat(clientState, is(lastWrittenId));
}
/**
@@ -122,10 +133,80 @@ public class TestStateAlignmentContext {
long lastWrittenId = cluster.getNamesystem().getLastWrittenTransactionId();
try (DistributedFileSystem clearDfs =
(DistributedFileSystem) FileSystem.get(CONF)) {
- assertThat(clearDfs.dfs.lastSeenStateId, is(0L));
+ ClientGCIContext clientState = clearDfs.dfs.getAlignmentContext();
+ assertThat(clientState.getLastSeenStateId(), is(Long.MIN_VALUE));
DFSTestUtil.readFile(clearDfs, new Path("/testFile3"));
- assertThat(clearDfs.dfs.lastSeenStateId, is(lastWrittenId));
+ assertThat(clientState.getLastSeenStateId(), is(lastWrittenId));
}
}
+ /**
+ * This test spies on the client's AlignmentContext and ensures that
+ * DFSClient writes its lastSeenStateId into RPC requests.
+ */
+ @Test
+ public void testClientSendsState() throws Exception {
+ AlignmentContext alignmentContext = dfs.dfs.getAlignmentContext();
+ AlignmentContext spiedAlignContext = Mockito.spy(alignmentContext);
+ Client.setAlignmentContext(spiedAlignContext);
+
+ // Collect RpcRequestHeaders for verification later.
+ final List<RpcHeaderProtos.RpcRequestHeaderProto.Builder> collectedHeaders =
+ new ArrayList<>();
+ Mockito.doAnswer(a -> {
+ Object[] arguments = a.getArguments();
+ RpcHeaderProtos.RpcRequestHeaderProto.Builder header =
+ (RpcHeaderProtos.RpcRequestHeaderProto.Builder) arguments[0];
+ collectedHeaders.add(header);
+ return a.callRealMethod();
+ }).when(spiedAlignContext).updateRequestState(Mockito.any());
+
+ DFSTestUtil.writeFile(dfs, new Path("/testFile4"), "shv");
+
+ // Ensure first and last header carry different stateIds (long vs. long).
+ assertThat(collectedHeaders.size() > 1, is(true));
+ assertThat(collectedHeaders.get(0).getStateId(),
+ is(not(collectedHeaders.get(collectedHeaders.size() - 1).getStateId())));
+
+ // Ensure collected RpcRequestHeaders are in non-decreasing order.
+ long lastHeader = collectedHeaders.get(0).getStateId();
+ for(RpcHeaderProtos.RpcRequestHeaderProto.Builder header :
+ collectedHeaders.subList(1, collectedHeaders.size())) {
+ long currentHeader = header.getStateId();
+ assertThat(currentHeader >= lastHeader, is(true));
+ lastHeader = currentHeader;
+ }
+ }
+
+ /**
+ * This test spies on the client's AlignmentContext to send stateIds
+ * greater than server's stateId in RPC requests.
+ */
+ @Test
+ public void testClientSendsGreaterState() throws Exception {
+ AlignmentContext alignmentContext = dfs.dfs.getAlignmentContext();
+ AlignmentContext spiedAlignContext = Mockito.spy(alignmentContext);
+ Client.setAlignmentContext(spiedAlignContext);
+
+ // Make every client call have a stateId > server's stateId.
+ Mockito.doAnswer(a -> {
+ Object[] arguments = a.getArguments();
+ RpcHeaderProtos.RpcRequestHeaderProto.Builder header =
+ (RpcHeaderProtos.RpcRequestHeaderProto.Builder) arguments[0];
+ try {
+ return a.callRealMethod();
+ } finally {
+ header.setStateId(Long.MAX_VALUE);
+ }
+ }).when(spiedAlignContext).updateRequestState(Mockito.any());
+
+ GenericTestUtils.LogCapturer logCapturer =
+ GenericTestUtils.LogCapturer.captureLogs(FSNamesystem.LOG);
+ DFSTestUtil.writeFile(dfs, new Path("/testFile4"), "shv");
+ logCapturer.stopCapturing();
+
+ String output = logCapturer.getOutput();
+ assertThat(output, containsString("A client sent stateId: "));
+ }
+
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org