You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cl...@apache.org on 2019/06/28 23:06:40 UTC
[hadoop] 37/50: HDFS-13873. [SBN read] ObserverNode should reject
read requests when it is too far behind. Contributed by Konstantin
Shvachko.
This is an automated email from the ASF dual-hosted git repository.
cliang pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/hadoop.git
commit c1c061d767381023eed829ddbb0af4d32db2493b
Author: Erik Krogen <xk...@apache.org>
AuthorDate: Thu Dec 13 14:31:41 2018 -0800
HDFS-13873. [SBN read] ObserverNode should reject read requests when it is too far behind. Contributed by Konstantin Shvachko.
---
.../org/apache/hadoop/ipc/AlignmentContext.java | 12 ++++-
.../main/java/org/apache/hadoop/ipc/Server.java | 27 ++++++-----
.../org/apache/hadoop/hdfs/ClientGSIContext.java | 7 ++-
.../hdfs/server/namenode/GlobalStateIdContext.java | 52 +++++++++++++++++++---
.../hadoop/hdfs/server/namenode/ha/HATestUtil.java | 20 +++++++++
.../server/namenode/ha/TestMultiObserverNode.java | 14 ++++++
6 files changed, 111 insertions(+), 21 deletions(-)
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AlignmentContext.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AlignmentContext.java
index a435ff6..bcddfbf 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AlignmentContext.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AlignmentContext.java
@@ -18,6 +18,8 @@
package org.apache.hadoop.ipc;
+import java.io.IOException;
+
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto;
@@ -64,9 +66,15 @@ public interface AlignmentContext {
* client state info during RPC response header processing.
*
* @param header The RPC request header.
- * @return state id of in the request header.
+ * @param threshold a parameter to verify a condition when server
+ * should reject client request due to its state being too far
+ * misaligned with the client state.
+ * See implementation for more details.
+ * @return state id required for the server to execute the call.
+ * @throws IOException if the server rejects the client request.
*/
- long receiveRequestState(RpcRequestHeaderProto header);
+ long receiveRequestState(RpcRequestHeaderProto header, long threshold)
+ throws IOException;
/**
* Returns the last seen state id of the alignment context instance.
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
index 20fee61..4f11541 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
@@ -2536,6 +2536,7 @@ public abstract class Server {
// Save the priority level assignment by the scheduler
call.setPriorityLevel(callQueue.getPriorityLevel(call));
+ call.markCallCoordinated(false);
if(alignmentContext != null && call.rpcRequest != null &&
(call.rpcRequest instanceof ProtobufRpcEngine.RpcProtobufRequest)) {
// if call.rpcRequest is not RpcProtobufRequest, will skip the following
@@ -2544,23 +2545,21 @@ public abstract class Server {
// coordinated.
String methodName;
String protoName;
+ ProtobufRpcEngine.RpcProtobufRequest req =
+ (ProtobufRpcEngine.RpcProtobufRequest) call.rpcRequest;
try {
- ProtobufRpcEngine.RpcProtobufRequest req =
- (ProtobufRpcEngine.RpcProtobufRequest) call.rpcRequest;
methodName = req.getRequestHeader().getMethodName();
protoName = req.getRequestHeader().getDeclaringClassProtocolName();
+ if (alignmentContext.isCoordinatedCall(protoName, methodName)) {
+ call.markCallCoordinated(true);
+ long stateId;
+ stateId = alignmentContext.receiveRequestState(
+ header, getMaxIdleTime());
+ call.setClientStateId(stateId);
+ }
} catch (IOException ioe) {
- throw new RpcServerException("Rpc request header check fail", ioe);
- }
- if (!alignmentContext.isCoordinatedCall(protoName, methodName)) {
- call.markCallCoordinated(false);
- } else {
- call.markCallCoordinated(true);
- long stateId = alignmentContext.receiveRequestState(header);
- call.setClientStateId(stateId);
+ throw new RpcServerException("Processing RPC request caught ", ioe);
}
- } else {
- call.markCallCoordinated(false);
}
try {
@@ -3613,6 +3612,10 @@ public abstract class Server {
}
}
+ protected int getMaxIdleTime() {
+ return connectionManager.maxIdleTime;
+ }
+
public String getServerName() {
return serverName;
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientGSIContext.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientGSIContext.java
index 6d366a6..a7bdd14 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientGSIContext.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientGSIContext.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.ipc.AlignmentContext;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto;
+import java.io.IOException;
import java.util.concurrent.atomic.LongAccumulator;
/**
@@ -60,7 +61,8 @@ public class ClientGSIContext implements AlignmentContext {
}
/**
- * Client side implementation for receiving state alignment info in responses.
+ * Client side implementation for receiving state alignment info
+ * in responses.
*/
@Override
public void receiveResponseState(RpcResponseHeaderProto header) {
@@ -80,7 +82,8 @@ public class ClientGSIContext implements AlignmentContext {
* Client does not receive RPC requests therefore this does nothing.
*/
@Override
- public long receiveRequestState(RpcRequestHeaderProto header) {
+ public long receiveRequestState(RpcRequestHeaderProto header, long threshold)
+ throws IOException {
// Do nothing.
return 0;
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/GlobalStateIdContext.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/GlobalStateIdContext.java
index ecb9fd3..2e48654 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/GlobalStateIdContext.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/GlobalStateIdContext.java
@@ -20,12 +20,15 @@ package org.apache.hadoop.hdfs.server.namenode;
import java.lang.reflect.Method;
import java.util.HashSet;
+import java.util.concurrent.TimeUnit;
+
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.ha.HAServiceProtocol;
+import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.server.namenode.ha.ReadOnly;
import org.apache.hadoop.ipc.AlignmentContext;
+import org.apache.hadoop.ipc.RetriableException;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto;
@@ -36,8 +39,23 @@ import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto;
@InterfaceAudience.Private
@InterfaceStability.Stable
class GlobalStateIdContext implements AlignmentContext {
- private final FSNamesystem namesystem;
+ /**
+ * Estimated number of journal transactions a typical NameNode can execute
+ * per second. The number is used to estimate how long a client's
+ * RPC request will wait in the call queue before the Observer catches up
+ * with its state id.
+ */
+ private static final long ESTIMATED_TRANSACTIONS_PER_SECOND = 10000L;
+ /**
+ * The client wait time on an RPC request is composed of
+ * the server execution time plus the communication time.
+ * This is an expected fraction of the total wait time spent on
+ * server execution.
+ */
+ private static final float ESTIMATED_SERVER_TIME_MULTIPLIER = 0.8f;
+
+ private final FSNamesystem namesystem;
private final HashSet<String> coordinatedMethods;
/**
@@ -88,17 +106,41 @@ class GlobalStateIdContext implements AlignmentContext {
}
/**
- * Server side implementation for processing state alignment info in requests.
+ * Server-side implementation for processing state alignment info in
+ * requests.
+ * For Observer it compares the client and the server states and determines
+ * if it makes sense to wait until the server catches up with the client
+ * state. If not, the server throws RetriableException so that the client
+ * can retry the call according to the retry policy with another Observer
+ * or the Active NameNode.
+ *
+ * @param header The RPC request header.
+ * @param clientWaitTime time in milliseconds indicating how long client
+ * waits for the server response. It is used to verify if the client's
+ * state is too far ahead of the server's.
+ * @return client state id, capped at the server state id on the Active.
+ * @throws RetriableException if Observer is too far behind.
*/
@Override
- public long receiveRequestState(RpcRequestHeaderProto header) {
+ public long receiveRequestState(RpcRequestHeaderProto header,
+ long clientWaitTime) throws RetriableException {
long serverStateId =
namesystem.getFSImage().getCorrectLastAppliedOrWrittenTxId();
long clientStateId = header.getStateId();
if (clientStateId > serverStateId &&
- HAServiceProtocol.HAServiceState.ACTIVE.equals(namesystem.getState())) {
+ HAServiceState.ACTIVE.equals(namesystem.getState())) {
FSNamesystem.LOG.warn("A client sent stateId: " + clientStateId +
", but server state is: " + serverStateId);
+ return serverStateId;
+ }
+ if (HAServiceState.OBSERVER.equals(namesystem.getState()) &&
+ clientStateId - serverStateId >
+ ESTIMATED_TRANSACTIONS_PER_SECOND
+ * TimeUnit.MILLISECONDS.toSeconds(clientWaitTime)
+ * ESTIMATED_SERVER_TIME_MULTIPLIER) {
+ throw new RetriableException(
+ "Observer Node is too far behind: serverStateId = "
+ + serverStateId + " clientStateId = " + clientStateId);
}
return clientStateId;
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java
index d1095ad..9e83fc1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java
@@ -25,6 +25,7 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSUtil.createUri;
import java.io.IOException;
+import java.lang.reflect.Field;
import java.lang.reflect.Proxy;
import java.net.InetSocketAddress;
import java.net.URI;
@@ -34,6 +35,7 @@ import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.LongAccumulator;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
@@ -43,6 +45,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hdfs.ClientGSIContext;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
@@ -334,4 +337,21 @@ public abstract class HATestUtil {
}
}
}
+
+ /**
+ * Customize stateId of the client AlignmentContext for testing.
+ */
+ public static long setACStateId(DistributedFileSystem dfs,
+ long stateId) throws Exception {
+ ObserverReadProxyProvider<?> provider = (ObserverReadProxyProvider<?>)
+ ((RetryInvocationHandler<?>) Proxy.getInvocationHandler(
+ dfs.getClient().getNamenode())).getProxyProvider();
+ ClientGSIContext ac = (ClientGSIContext)(provider.getAlignmentContext());
+ Field f = ac.getClass().getDeclaredField("lastSeenStateId");
+ f.setAccessible(true);
+ LongAccumulator lastSeenStateId = (LongAccumulator)f.get(ac);
+ long currentStateId = lastSeenStateId.getThenReset();
+ lastSeenStateId.accumulate(stateId);
+ return currentStateId;
+ }
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestMultiObserverNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestMultiObserverNode.java
index 4aa3133..a8e1245 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestMultiObserverNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestMultiObserverNode.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.namenode.ha;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
@@ -138,6 +139,19 @@ public class TestMultiObserverNode {
dfsCluster.transitionToObserver(3);
}
+ @Test
+ public void testObserverFallBehind() throws Exception {
+ dfs.mkdir(testPath, FsPermission.getDefault());
+ assertSentTo(0);
+
+ // Set large state Id on the client
+ long realStateId = HATestUtil.setACStateId(dfs, 500000);
+ dfs.getFileStatus(testPath);
+ // Should end up on ANN
+ assertSentTo(0);
+ HATestUtil.setACStateId(dfs, realStateId);
+ }
+
private void assertSentTo(int... nnIndices) throws IOException {
assertTrue("Request was not sent to any of the expected namenodes.",
HATestUtil.isSentToAnyOfNameNodes(dfs, dfsCluster, nnIndices));
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org