You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2023/02/27 17:56:33 UTC

[hadoop] branch trunk updated: HDFS-16890: RBF: Ensures router periodically refreshes its record of a namespace's state. (#5298)

This is an automated email from the ASF dual-hosted git repository.

omalley pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 61f369c43e2 HDFS-16890: RBF: Ensures router periodically refreshes its record of a namespace's state. (#5298)
61f369c43e2 is described below

commit 61f369c43e254796f997ec034a35ca764d723e38
Author: Simbarashe Dzinamarira <sd...@linkedin.com>
AuthorDate: Mon Feb 27 09:56:24 2023 -0800

    HDFS-16890: RBF: Ensures router periodically refreshes its record of a namespace's state. (#5298)
---
 .../server/federation/router/RBFConfigKeys.java    |  4 ++
 .../server/federation/router/RouterRpcClient.java  | 58 +++++++++++++++++++---
 .../src/main/resources/hdfs-rbf-default.xml        | 10 ++++
 .../federation/router/TestObserverWithRouter.java  | 45 ++++++++++++++++-
 4 files changed, 110 insertions(+), 7 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java
index 7e07d7b6549..c0ee9504597 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java
@@ -201,6 +201,10 @@ public class RBFConfigKeys extends CommonConfigurationKeysPublic {
       FEDERATION_ROUTER_PREFIX + "observer.federated.state.propagation.maxsize";
   public static final int DFS_ROUTER_OBSERVER_FEDERATED_STATE_PROPAGATION_MAXSIZE_DEFAULT = 5;
 
+  public static final String DFS_ROUTER_OBSERVER_STATE_ID_REFRESH_PERIOD_KEY =
+      FEDERATION_ROUTER_PREFIX + "observer.state.id.refresh.period";
+  public static final String DFS_ROUTER_OBSERVER_STATE_ID_REFRESH_PERIOD_DEFAULT = "15s";
+
   public static final String FEDERATION_STORE_SERIALIZER_CLASS =
       FEDERATION_STORE_PREFIX + "serializer";
   public static final Class<StateStoreSerializerPBImpl>
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java
index 06e64439011..92f1fc06a81 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java
@@ -57,6 +57,7 @@ import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.LongAccumulator;
 import java.util.concurrent.atomic.LongAdder;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -86,6 +87,7 @@ import org.apache.hadoop.net.ConnectTimeoutException;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Time;
 import org.eclipse.jetty.util.ajax.JSON;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -136,6 +138,14 @@ public class RouterRpcClient {
   private final boolean observerReadEnabledDefault;
   /** Nameservice specific overrides of the default setting for enabling observer reads. */
   private HashSet<String> observerReadEnabledOverrides = new HashSet<>();
+  /**
+   * Period to refresh namespace stateID using active namenode.
+   * This ensures the namespace stateID is fresh even when an
+   * observer is trailing behind.
+   */
+  private long activeNNStateIdRefreshPeriodMs;
+  /** Monotonic time of the last call sent to the active namenode, per namespace. */
+  private final ConcurrentHashMap<String, LongAccumulator> lastActiveNNRefreshTimes;
 
   /** Pattern to parse a stack trace line. */
   private static final Pattern STACK_TRACE_PATTERN =
@@ -211,13 +221,25 @@ public class RouterRpcClient {
     this.observerReadEnabledDefault = conf.getBoolean(
         RBFConfigKeys.DFS_ROUTER_OBSERVER_READ_DEFAULT_KEY,
         RBFConfigKeys.DFS_ROUTER_OBSERVER_READ_DEFAULT_VALUE);
-    String[] observerReadOverrides = conf.getStrings(RBFConfigKeys.DFS_ROUTER_OBSERVER_READ_OVERRIDES);
+    String[] observerReadOverrides =
+        conf.getStrings(RBFConfigKeys.DFS_ROUTER_OBSERVER_READ_OVERRIDES);
     if (observerReadOverrides != null) {
       observerReadEnabledOverrides.addAll(Arrays.asList(observerReadOverrides));
     }
     if (this.observerReadEnabledDefault) {
       LOG.info("Observer read is enabled for router.");
     }
+    this.activeNNStateIdRefreshPeriodMs = conf.getTimeDuration(
+        RBFConfigKeys.DFS_ROUTER_OBSERVER_STATE_ID_REFRESH_PERIOD_KEY,
+        RBFConfigKeys.DFS_ROUTER_OBSERVER_STATE_ID_REFRESH_PERIOD_DEFAULT,
+        TimeUnit.SECONDS, TimeUnit.MILLISECONDS);
+    if (activeNNStateIdRefreshPeriodMs < 0) {
+      LOG.info("Periodic stateId freshness check is disabled"
+              + " since '{}' is {}ms, which is less than 0.",
+          RBFConfigKeys.DFS_ROUTER_OBSERVER_STATE_ID_REFRESH_PERIOD_KEY,
+          activeNNStateIdRefreshPeriodMs);
+    }
+    this.lastActiveNNRefreshTimes = new ConcurrentHashMap<>();
   }
 
   /**
@@ -1707,10 +1729,13 @@ public class RouterRpcClient {
       boolean isObserverRead) throws IOException {
     final List<? extends FederationNamenodeContext> namenodes;
 
-    if (RouterStateIdContext.getClientStateIdFromCurrentCall(nsId) > Long.MIN_VALUE) {
-      namenodes = namenodeResolver.getNamenodesForNameserviceId(nsId, isObserverRead);
-    } else {
-      namenodes = namenodeResolver.getNamenodesForNameserviceId(nsId, false);
+    boolean listObserverNamenodesFirst = isObserverRead
+        && isNamespaceStateIdFresh(nsId)
+        && (RouterStateIdContext.getClientStateIdFromCurrentCall(nsId) > Long.MIN_VALUE);
+    namenodes = namenodeResolver.getNamenodesForNameserviceId(nsId, listObserverNamenodesFirst);
+    if (!listObserverNamenodesFirst) {
+      // Refresh time of last call to active NameNode.
+      getTimeOfLastCallToActive(nsId).accumulate(Time.monotonicNow());
     }
 
     if (namenodes == null || namenodes.isEmpty()) {
@@ -1721,7 +1746,8 @@ public class RouterRpcClient {
   }
 
   private boolean isObserverReadEligible(String nsId, Method method) {
-    boolean isReadEnabledForNamespace = observerReadEnabledDefault != observerReadEnabledOverrides.contains(nsId);
+    boolean isReadEnabledForNamespace =
+        observerReadEnabledDefault != observerReadEnabledOverrides.contains(nsId);
     return isReadEnabledForNamespace && isReadCall(method);
   }
 
@@ -1735,4 +1761,24 @@ public class RouterRpcClient {
     }
     return !method.getAnnotationsByType(ReadOnly.class)[0].activeOnly();
   }
+
+  /**
+   * Checks whether a namespace's stateId was refreshed recently enough.
+   * Returns true if the last call to the active namenode is within the refresh
+   * period or the check is disabled; otherwise the active namenode should serve the call.
+   * @param nsId namespaceID
+   */
+  @VisibleForTesting
+  boolean isNamespaceStateIdFresh(String nsId) {
+    if (activeNNStateIdRefreshPeriodMs < 0) {
+      return true;
+    }
+    long timeSinceRefreshMs = Time.monotonicNow() - getTimeOfLastCallToActive(nsId).get();
+    return (timeSinceRefreshMs <= activeNNStateIdRefreshPeriodMs);
+  }
+
+  private LongAccumulator getTimeOfLastCallToActive(String namespaceId) {
+    return lastActiveNNRefreshTimes
+        .computeIfAbsent(namespaceId, key -> new LongAccumulator(Math::max, 0));
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml
index b5096cd253d..79a16cc2022 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml
@@ -884,4 +884,14 @@
       of namespaces in use and the latency of the msync requests.
     </description>
   </property>
+
+  <property>
+    <name>dfs.federation.router.observer.state.id.refresh.period</name>
+    <value>15s</value>
+    <description>
+      Period to refresh namespace stateID using active namenode. This ensures the
+      namespace stateID is fresh even when an observer is trailing behind.
+      If this is below 0, the auto-refresh is disabled.
+    </description>
+  </property>
 </configuration>
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestObserverWithRouter.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestObserverWithRouter.java
index 45001b461ba..72e8f8f66d5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestObserverWithRouter.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestObserverWithRouter.java
@@ -34,9 +34,11 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.LongAccumulator;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.ClientGSIContext;
+import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.RouterFederatedStateProto;
@@ -50,6 +52,7 @@ import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServi
 import org.apache.hadoop.hdfs.server.federation.resolver.MembershipNamenodeResolver;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.AfterEach;
@@ -95,7 +98,9 @@ public class TestObserverWithRouter {
     conf.set(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, "0ms");
     conf.setBoolean(DFS_NAMENODE_STATE_CONTEXT_ENABLED_KEY, true);
     if (confOverrides != null) {
-      conf.addResource(confOverrides);
+      confOverrides
+          .iterator()
+          .forEachRemaining(entry -> conf.set(entry.getKey(), entry.getValue()));
     }
     cluster = new MiniRouterDFSCluster(true, 2, numberOfNamenode);
     cluster.addNamenodeOverrides(conf);
@@ -639,4 +644,42 @@ public class TestObserverWithRouter {
     assertEquals("ns0", namespace1.get(0));
     assertTrue(namespace2.isEmpty());
   }
+
+  @Test
+  @Tag(SKIP_BEFORE_EACH_CLUSTER_STARTUP)
+  public void testPeriodicStateRefreshUsingActiveNamenode() throws Exception {
+    Path rootPath = new Path("/");
+
+    Configuration confOverride = new Configuration(false);
+    confOverride.set(RBFConfigKeys.DFS_ROUTER_OBSERVER_STATE_ID_REFRESH_PERIOD_KEY, "500ms");
+    confOverride.set(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, "3s");
+    startUpCluster(1, confOverride);
+
+    fileSystem  = routerContext.getFileSystem(getConfToEnableObserverReads());
+    fileSystem.listStatus(rootPath);
+    int initialLengthOfRootListing = fileSystem.listStatus(rootPath).length;
+
+    DFSClient activeClient = cluster.getNamenodes("ns0")
+        .stream()
+        .filter(nnContext -> nnContext.getNamenode().isActiveState())
+        .findFirst().orElseThrow(() -> new IllegalStateException("No active namenode."))
+        .getClient();
+
+    for (int i = 0; i < 10; i++) {
+      activeClient.mkdirs("/dir" + i, null, false);
+    }
+    activeClient.close();
+
+    // Wait long enough for state in router to be considered stale.
+    GenericTestUtils.waitFor(
+        () -> !routerContext
+            .getRouterRpcClient()
+            .isNamespaceStateIdFresh("ns0"),
+        100,
+        10000,
+        "Timeout: Namespace state was never considered stale.");
+    FileStatus[] rootFolderAfterMkdir = fileSystem.listStatus(rootPath);
+    assertEquals("List-status should show newly created directories.",
+        initialLengthOfRootListing + 10, rootFolderAfterMkdir.length);
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org