You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2023/02/27 17:56:33 UTC
[hadoop] branch trunk updated: HDFS-16890: RBF: Ensures router periodically refreshes its record of a namespace's state. (#5298)
This is an automated email from the ASF dual-hosted git repository.
omalley pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new 61f369c43e2 HDFS-16890: RBF: Ensures router periodically refreshes its record of a namespace's state. (#5298)
61f369c43e2 is described below
commit 61f369c43e254796f997ec034a35ca764d723e38
Author: Simbarashe Dzinamarira <sd...@linkedin.com>
AuthorDate: Mon Feb 27 09:56:24 2023 -0800
HDFS-16890: RBF: Ensures router periodically refreshes its record of a namespace's state. (#5298)
---
.../server/federation/router/RBFConfigKeys.java | 4 ++
.../server/federation/router/RouterRpcClient.java | 58 +++++++++++++++++++---
.../src/main/resources/hdfs-rbf-default.xml | 10 ++++
.../federation/router/TestObserverWithRouter.java | 45 ++++++++++++++++-
4 files changed, 110 insertions(+), 7 deletions(-)
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java
index 7e07d7b6549..c0ee9504597 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java
@@ -201,6 +201,10 @@ public class RBFConfigKeys extends CommonConfigurationKeysPublic {
FEDERATION_ROUTER_PREFIX + "observer.federated.state.propagation.maxsize";
public static final int DFS_ROUTER_OBSERVER_FEDERATED_STATE_PROPAGATION_MAXSIZE_DEFAULT = 5;
+ public static final String DFS_ROUTER_OBSERVER_STATE_ID_REFRESH_PERIOD_KEY =
+ FEDERATION_ROUTER_PREFIX + "observer.state.id.refresh.period";
+ public static final String DFS_ROUTER_OBSERVER_STATE_ID_REFRESH_PERIOD_DEFAULT = "15s";
+
public static final String FEDERATION_STORE_SERIALIZER_CLASS =
FEDERATION_STORE_PREFIX + "serializer";
public static final Class<StateStoreSerializerPBImpl>
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java
index 06e64439011..92f1fc06a81 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java
@@ -57,6 +57,7 @@ import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.LongAccumulator;
import java.util.concurrent.atomic.LongAdder;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -86,6 +87,7 @@ import org.apache.hadoop.net.ConnectTimeoutException;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Time;
import org.eclipse.jetty.util.ajax.JSON;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -136,6 +138,14 @@ public class RouterRpcClient {
private final boolean observerReadEnabledDefault;
/** Nameservice specific overrides of the default setting for enabling observer reads. */
private HashSet<String> observerReadEnabledOverrides = new HashSet<>();
+ /**
+ * Period to refresh namespace stateID using active namenode.
+ * This ensures the namespace stateID is fresh even when an
+ * observer is trailing behind.
+ */
+ private long activeNNStateIdRefreshPeriodMs;
+ /** Last msync times for each namespace. */
+ private final ConcurrentHashMap<String, LongAccumulator> lastActiveNNRefreshTimes;
/** Pattern to parse a stack trace line. */
private static final Pattern STACK_TRACE_PATTERN =
@@ -211,13 +221,25 @@ public class RouterRpcClient {
this.observerReadEnabledDefault = conf.getBoolean(
RBFConfigKeys.DFS_ROUTER_OBSERVER_READ_DEFAULT_KEY,
RBFConfigKeys.DFS_ROUTER_OBSERVER_READ_DEFAULT_VALUE);
- String[] observerReadOverrides = conf.getStrings(RBFConfigKeys.DFS_ROUTER_OBSERVER_READ_OVERRIDES);
+ String[] observerReadOverrides =
+ conf.getStrings(RBFConfigKeys.DFS_ROUTER_OBSERVER_READ_OVERRIDES);
if (observerReadOverrides != null) {
observerReadEnabledOverrides.addAll(Arrays.asList(observerReadOverrides));
}
if (this.observerReadEnabledDefault) {
LOG.info("Observer read is enabled for router.");
}
+ this.activeNNStateIdRefreshPeriodMs = conf.getTimeDuration(
+ RBFConfigKeys.DFS_ROUTER_OBSERVER_STATE_ID_REFRESH_PERIOD_KEY,
+ RBFConfigKeys.DFS_ROUTER_OBSERVER_STATE_ID_REFRESH_PERIOD_DEFAULT,
+ TimeUnit.SECONDS, TimeUnit.MILLISECONDS);
+ if (activeNNStateIdRefreshPeriodMs < 0) {
+ LOG.info("Periodic stateId freshness check is disabled"
+ + " since '{}' is {}ms, which is less than 0.",
+ RBFConfigKeys.DFS_ROUTER_OBSERVER_STATE_ID_REFRESH_PERIOD_KEY,
+ activeNNStateIdRefreshPeriodMs);
+ }
+ this.lastActiveNNRefreshTimes = new ConcurrentHashMap<>();
}
/**
@@ -1707,10 +1729,13 @@ public class RouterRpcClient {
boolean isObserverRead) throws IOException {
final List<? extends FederationNamenodeContext> namenodes;
- if (RouterStateIdContext.getClientStateIdFromCurrentCall(nsId) > Long.MIN_VALUE) {
- namenodes = namenodeResolver.getNamenodesForNameserviceId(nsId, isObserverRead);
- } else {
- namenodes = namenodeResolver.getNamenodesForNameserviceId(nsId, false);
+ boolean listObserverNamenodesFirst = isObserverRead
+ && isNamespaceStateIdFresh(nsId)
+ && (RouterStateIdContext.getClientStateIdFromCurrentCall(nsId) > Long.MIN_VALUE);
+ namenodes = namenodeResolver.getNamenodesForNameserviceId(nsId, listObserverNamenodesFirst);
+ if (!listObserverNamenodesFirst) {
+ // Refresh time of last call to active NameNode.
+ getTimeOfLastCallToActive(nsId).accumulate(Time.monotonicNow());
}
if (namenodes == null || namenodes.isEmpty()) {
@@ -1721,7 +1746,8 @@ public class RouterRpcClient {
}
private boolean isObserverReadEligible(String nsId, Method method) {
- boolean isReadEnabledForNamespace = observerReadEnabledDefault != observerReadEnabledOverrides.contains(nsId);
+ boolean isReadEnabledForNamespace =
+ observerReadEnabledDefault != observerReadEnabledOverrides.contains(nsId);
return isReadEnabledForNamespace && isReadCall(method);
}
@@ -1735,4 +1761,24 @@ public class RouterRpcClient {
}
return !method.getAnnotationsByType(ReadOnly.class)[0].activeOnly();
}
+
+ /**
+ * Checks and sets last refresh time for a namespace's stateId.
+ * Returns true if refresh time is newer than threshold.
+ * Otherwise, return false and call should be handled by active namenode.
+ * @param nsId namespaceID
+ */
+ @VisibleForTesting
+ boolean isNamespaceStateIdFresh(String nsId) {
+ if (activeNNStateIdRefreshPeriodMs < 0) {
+ return true;
+ }
+ long timeSinceRefreshMs = Time.monotonicNow() - getTimeOfLastCallToActive(nsId).get();
+ return (timeSinceRefreshMs <= activeNNStateIdRefreshPeriodMs);
+ }
+
+ private LongAccumulator getTimeOfLastCallToActive(String namespaceId) {
+ return lastActiveNNRefreshTimes
+ .computeIfAbsent(namespaceId, key -> new LongAccumulator(Math::max, 0));
+ }
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml
index b5096cd253d..79a16cc2022 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml
@@ -884,4 +884,14 @@
of namespaces in use and the latency of the msync requests.
</description>
</property>
+
+ <property>
+ <name>dfs.federation.router.observer.state.id.refresh.period</name>
+ <value>15s</value>
+ <description>
+ Period to refresh namespace stateID using active namenode. This ensures the
+ namespace stateID is refresh even when an observer is trailing behind.
+ If this is below 0, the auto-refresh is disabled.
+ </description>
+ </property>
</configuration>
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestObserverWithRouter.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestObserverWithRouter.java
index 45001b461ba..72e8f8f66d5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestObserverWithRouter.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestObserverWithRouter.java
@@ -34,9 +34,11 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAccumulator;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.ClientGSIContext;
+import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.RouterFederatedStateProto;
@@ -50,6 +52,7 @@ import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServi
import org.apache.hadoop.hdfs.server.federation.resolver.MembershipNamenodeResolver;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos;
+import org.apache.hadoop.test.GenericTestUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.AfterEach;
@@ -95,7 +98,9 @@ public class TestObserverWithRouter {
conf.set(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, "0ms");
conf.setBoolean(DFS_NAMENODE_STATE_CONTEXT_ENABLED_KEY, true);
if (confOverrides != null) {
- conf.addResource(confOverrides);
+ confOverrides
+ .iterator()
+ .forEachRemaining(entry -> conf.set(entry.getKey(), entry.getValue()));
}
cluster = new MiniRouterDFSCluster(true, 2, numberOfNamenode);
cluster.addNamenodeOverrides(conf);
@@ -639,4 +644,42 @@ public class TestObserverWithRouter {
assertEquals("ns0", namespace1.get(0));
assertTrue(namespace2.isEmpty());
}
+
+ @Test
+ @Tag(SKIP_BEFORE_EACH_CLUSTER_STARTUP)
+ public void testPeriodicStateRefreshUsingActiveNamenode() throws Exception {
+ Path rootPath = new Path("/");
+
+ Configuration confOverride = new Configuration(false);
+ confOverride.set(RBFConfigKeys.DFS_ROUTER_OBSERVER_STATE_ID_REFRESH_PERIOD_KEY, "500ms");
+ confOverride.set(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, "3s");
+ startUpCluster(1, confOverride);
+
+ fileSystem = routerContext.getFileSystem(getConfToEnableObserverReads());
+ fileSystem.listStatus(rootPath);
+ int initialLengthOfRootListing = fileSystem.listStatus(rootPath).length;
+
+ DFSClient activeClient = cluster.getNamenodes("ns0")
+ .stream()
+ .filter(nnContext -> nnContext.getNamenode().isActiveState())
+ .findFirst().orElseThrow(() -> new IllegalStateException("No active namenode."))
+ .getClient();
+
+ for (int i = 0; i < 10; i++) {
+ activeClient.mkdirs("/dir" + i, null, false);
+ }
+ activeClient.close();
+
+ // Wait long enough for state in router to be considered stale.
+ GenericTestUtils.waitFor(
+ () -> !routerContext
+ .getRouterRpcClient()
+ .isNamespaceStateIdFresh("ns0"),
+ 100,
+ 10000,
+ "Timeout: Namespace state was never considered stale.");
+ FileStatus[] rootFolderAfterMkdir = fileSystem.listStatus(rootPath);
+ assertEquals("List-status should show newly created directories.",
+ initialLengthOfRootListing + 10, rootFolderAfterMkdir.length);
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org