You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-issues@hadoop.apache.org by GitBox <gi...@apache.org> on 2022/04/18 04:12:16 UTC

[GitHub] [hadoop] ferhui commented on a diff in pull request #4168: HDFS-16539. RBF: Support refreshing/changing router fairness policy controller without rebooting router

ferhui commented on code in PR #4168:
URL: https://github.com/apache/hadoop/pull/4168#discussion_r851862476


##########
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java:
##########
@@ -1585,21 +1591,20 @@ private void acquirePermit(
   /**
    * Release permit for specific nsId after processing against downstream
    * nsId is completed.
-   *
-   * @param nsId Identifier of the block pool.
+   *  @param nsId Identifier of the block pool.
    * @param ugi UserGroupIdentifier associated with the user.
    * @param m Remote method that needs to be invoked.
+   * @param controller fairness policy controller to release permit from
    */
-  private void releasePermit(
-      final String nsId, final UserGroupInformation ugi, final RemoteMethod m) {
-    if (routerRpcFairnessPolicyController != null) {
-      routerRpcFairnessPolicyController.releasePermit(nsId);
+  private void releasePermit(final String nsId, final UserGroupInformation ugi,
+      final RemoteMethod m, RouterRpcFairnessPolicyController controller) {
+    if (controller != null) {
+      controller.releasePermit(nsId);
       LOG.trace("Permit released for ugi: {} for method: {}", ugi,
           m.getMethodName());
     }
   }
 
-  @VisibleForTesting
   public RouterRpcFairnessPolicyController
       getRouterRpcFairnessPolicyController() {
     return routerRpcFairnessPolicyController;

Review Comment:
   Should routerRpcFairnessPolicyController be declared volatile, now that it can be reassigned at runtime while other threads are reading it?



##########
hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java:
##########
@@ -1622,4 +1627,9 @@ public Long getAcceptedPermitForNs(String ns) {
     return acceptedPermitsPerNs.containsKey(ns) ?
         acceptedPermitsPerNs.get(ns).longValue() : 0L;
   }
+
+  public String refreshFairnessPolicyController(Configuration conf) {
+    routerRpcFairnessPolicyController = FederationUtil.newFairnessPolicyController(conf);

Review Comment:
   newInstance can return null.
   Maybe we should check the result and assign it to routerRpcFairnessPolicyController only if it is not null.



##########
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/fairness/TestRouterRefreshFairnessPolicyController.java:
##########
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.federation.fairness;
+
+import java.io.IOException;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster;
+import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
+import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster;
+import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
+import org.apache.hadoop.hdfs.server.federation.router.RemoteMethod;
+import org.apache.hadoop.hdfs.server.federation.router.RouterRpcClient;
+
+import static org.apache.hadoop.hdfs.server.federation.fairness.RouterRpcFairnessConstants.CONCURRENT_NS;
+import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class TestRouterRefreshFairnessPolicyController {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestRouterRefreshFairnessPolicyController.class);
+
+  private StateStoreDFSCluster cluster;
+
+  @After
+  public void cleanup() {
+    if (cluster != null) {
+      cluster.shutdown();
+      cluster = null;
+    }
+  }
+
+  @Before
+  public void setupCluster() throws Exception {
+    cluster = new StateStoreDFSCluster(false, 1);
+    Configuration conf = new RouterConfigBuilder().stateStore().rpc().build();
+
+    // Handlers concurrent:ns0 = 3:3
+    conf.setClass(RBFConfigKeys.DFS_ROUTER_FAIRNESS_POLICY_CONTROLLER_CLASS,
+        StaticRouterRpcFairnessPolicyController.class, RouterRpcFairnessPolicyController.class);
+    conf.setInt(RBFConfigKeys.DFS_ROUTER_HANDLER_COUNT_KEY, 6);
+
+    // Datanodes not needed for this test.
+    cluster.setNumDatanodesPerNameservice(0);
+
+    cluster.addRouterOverrides(conf);
+    cluster.startCluster();
+    cluster.startRouters();
+    cluster.waitClusterUp();
+  }
+
+  @Test
+  public void testRefreshStaticChangeHandlers() throws Exception {
+    MiniRouterDFSCluster.RouterContext routerContext = cluster.getRandomRouter();
+    RemoteMethod dummyMethod = Mockito.mock(RemoteMethod.class);
+    RouterRpcClient client = Mockito.spy(routerContext.getRouterRpcClient());
+    final long sleepTime = 3000;
+    Mockito.doAnswer(invocationOnMock -> {
+      Thread.sleep(sleepTime);
+      return null;
+    }).when(client)
+        .invokeMethod(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any());
+
+    final int nThreads = 3;
+    Thread[] threadAcquirePermits = new Thread[nThreads];
+    for (int i = 0; i < nThreads; i++) {
+      Thread threadAcquirePermit = new Thread(() -> {
+        try {
+          client.invokeSingle("ns0", dummyMethod);
+        } catch (IOException e) {
+          e.printStackTrace();
+        }
+      });
+      threadAcquirePermits[i] = threadAcquirePermit;
+      threadAcquirePermits[i].start();
+    }
+
+    Thread.sleep(1000);
+
+    Configuration conf = routerContext.getConf();
+    final int newNs0Permits = 1; // Set to smaller than current handler count (3)
+    conf.setInt(DFS_ROUTER_FAIR_HANDLER_COUNT_KEY_PREFIX + "ns0", newNs0Permits);
+    Thread threadRefreshController = new Thread(() -> {
+      client.refreshFairnessPolicyController(routerContext.getConf());
+    });
+    threadRefreshController.start();
+    threadRefreshController.join();
+
+    StaticRouterRpcFairnessPolicyController controller =
+        (StaticRouterRpcFairnessPolicyController) client.getRouterRpcFairnessPolicyController();
+    for (int i = 0; i < nThreads; i++) {
+      threadAcquirePermits[i].join();
+    }
+
+    // Controller should now have 5:1 handlers for concurrent:ns0
+    for (int i = 0; i < 5; i++) {
+      assertTrue(controller.acquirePermit(CONCURRENT_NS));
+    }
+    // Invocations before refresh should not interfere with invocations after
+    assertTrue(controller.acquirePermit("ns0"));

Review Comment:
   It would be better to use metrics to show that it works.
   Can we add a case to show that requests can still be handled by the original handler after we change it to a new one?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: common-issues-unsubscribe@hadoop.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: common-issues-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-issues-help@hadoop.apache.org