You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by fe...@apache.org on 2021/09/10 02:31:15 UTC

[hadoop] branch branch-3.3 updated: HDFS-16210. RBF: Add the option of refreshCallQueue to RouterAdmin (#3379)

This is an automated email from the ASF dual-hosted git repository.

ferhui pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new 8affaa6  HDFS-16210. RBF: Add the option of refreshCallQueue to RouterAdmin (#3379)
8affaa6 is described below

commit 8affaa6312300a89ed1eb12bcae3374f15a30e19
Author: Symious <yi...@foxmail.com>
AuthorDate: Thu Sep 9 09:57:27 2021 +0800

    HDFS-16210. RBF: Add the option of refreshCallQueue to RouterAdmin (#3379)
    
    (cherry picked from commit c0890e6d04dda6f2716d07427816721fbdf9c3b4)
---
 .../federation/router/RouterAdminServer.java       | 22 ++++++++++++-
 .../hadoop/hdfs/tools/federation/RouterAdmin.java  | 37 ++++++++++++++++++++++
 .../federation/router/TestRouterAdminCLI.java      |  9 ++++++
 3 files changed, 67 insertions(+), 1 deletion(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterAdminServer.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterAdminServer.java
index 159b103..4dd0693 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterAdminServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterAdminServer.java
@@ -78,11 +78,15 @@ import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.ipc.ProtobufRpcEngine2;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RPC.Server;
+import org.apache.hadoop.ipc.RefreshCallQueueProtocol;
 import org.apache.hadoop.ipc.RefreshRegistry;
 import org.apache.hadoop.ipc.RefreshResponse;
 import org.apache.hadoop.ipc.proto.GenericRefreshProtocolProtos;
+import org.apache.hadoop.ipc.proto.RefreshCallQueueProtocolProtos;
 import org.apache.hadoop.ipc.protocolPB.GenericRefreshProtocolPB;
 import org.apache.hadoop.ipc.protocolPB.GenericRefreshProtocolServerSideTranslatorPB;
+import org.apache.hadoop.ipc.protocolPB.RefreshCallQueueProtocolPB;
+import org.apache.hadoop.ipc.protocolPB.RefreshCallQueueProtocolServerSideTranslatorPB;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.ProxyUsers;
@@ -98,7 +102,7 @@ import org.apache.hadoop.thirdparty.protobuf.BlockingService;
  * router. It is created, started, and stopped by {@link Router}.
  */
 public class RouterAdminServer extends AbstractService
-    implements RouterAdminProtocol {
+    implements RouterAdminProtocol, RefreshCallQueueProtocol {
 
   private static final Logger LOG =
       LoggerFactory.getLogger(RouterAdminServer.class);
@@ -184,8 +188,16 @@ public class RouterAdminServer extends AbstractService
         GenericRefreshProtocolProtos.GenericRefreshProtocolService.
         newReflectiveBlockingService(genericRefreshXlator);
 
+    RefreshCallQueueProtocolServerSideTranslatorPB refreshCallQueueXlator =
+        new RefreshCallQueueProtocolServerSideTranslatorPB(this);
+    BlockingService refreshCallQueueService =
+        RefreshCallQueueProtocolProtos.RefreshCallQueueProtocolService.
+        newReflectiveBlockingService(refreshCallQueueXlator);
+
     DFSUtil.addPBProtocol(conf, GenericRefreshProtocolPB.class,
         genericRefreshService, adminServer);
+    DFSUtil.addPBProtocol(conf, RefreshCallQueueProtocolPB.class,
+        refreshCallQueueService, adminServer);
   }
 
   /**
@@ -643,4 +655,12 @@ public class RouterAdminServer extends AbstractService
     ProxyUsers.refreshSuperUserGroupsConfiguration();
     return true;
   }
+
+  @Override // RefreshCallQueueProtocol
+  public void refreshCallQueue() throws IOException {
+    LOG.info("Refreshing call queue.");
+
+    Configuration configuration = new Configuration();
+    router.getRpcServer().getServer().refreshCallQueue(configuration);
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/tools/federation/RouterAdmin.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/tools/federation/RouterAdmin.java
index 7422989..deadf3d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/tools/federation/RouterAdmin.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/tools/federation/RouterAdmin.java
@@ -77,6 +77,8 @@ import org.apache.hadoop.ipc.RefreshResponse;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.ipc.protocolPB.GenericRefreshProtocolClientSideTranslatorPB;
 import org.apache.hadoop.ipc.protocolPB.GenericRefreshProtocolPB;
+import org.apache.hadoop.ipc.protocolPB.RefreshCallQueueProtocolClientSideTranslatorPB;
+import org.apache.hadoop.ipc.protocolPB.RefreshCallQueueProtocolPB;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.StringUtils;
@@ -388,6 +390,8 @@ public class RouterAdmin extends Configured implements Tool {
         exitCode = genericRefresh(argv, i);
       } else if ("-refreshSuperUserGroupsConfiguration".equals(cmd)) {
         exitCode = refreshSuperUserGroupsConfiguration();
+      } else if ("-refreshCallQueue".equals(cmd)) {
+        exitCode = refreshCallQueue();
       } else {
         throw new IllegalArgumentException("Unknown Command: " + cmd);
       }
@@ -1259,6 +1263,39 @@ public class RouterAdmin extends Configured implements Tool {
   }
 
   /**
+   * Refresh Router's call Queue.
+   *
+   * @throws IOException if the operation was not successful.
+   */
+  private int refreshCallQueue() throws IOException {
+    Configuration conf = getConf();
+    String hostport =  getConf().getTrimmed(
+        RBFConfigKeys.DFS_ROUTER_ADMIN_ADDRESS_KEY,
+        RBFConfigKeys.DFS_ROUTER_ADMIN_ADDRESS_DEFAULT);
+
+    // Create the client
+    Class<?> xface = RefreshCallQueueProtocolPB.class;
+    InetSocketAddress address = NetUtils.createSocketAddr(hostport);
+    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+
+    RPC.setProtocolEngine(conf, xface, ProtobufRpcEngine2.class);
+    RefreshCallQueueProtocolPB proxy = (RefreshCallQueueProtocolPB)RPC.getProxy(
+        xface, RPC.getProtocolVersion(xface), address, ugi, conf,
+        NetUtils.getDefaultSocketFactory(conf), 0);
+
+    int returnCode = -1;
+    try (RefreshCallQueueProtocolClientSideTranslatorPB xlator =
+        new RefreshCallQueueProtocolClientSideTranslatorPB(proxy)) {
+      xlator.refreshCallQueue();
+      System.out.println("Refresh call queue successfully for " + hostport);
+      returnCode = 0;
+    } catch (IOException ioe){
+      System.out.println("Refresh call queue unsuccessfully for " + hostport);
+    }
+    return returnCode;
+  }
+
+  /**
    * Normalize a path for that filesystem.
    *
    * @param str Path to normalize. The path doesn't have scheme or authority.
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAdminCLI.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAdminCLI.java
index 837607c..14a5965 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAdminCLI.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterAdminCLI.java
@@ -1559,6 +1559,15 @@ public class TestRouterAdminCLI {
     assertEquals(0, ToolRunner.run(admin, argv));
   }
 
+  @Test
+  public void testRefreshCallQueue() throws Exception {
+
+    System.setOut(new PrintStream(out));
+    String[] argv = new String[]{"-refreshCallQueue"};
+    assertEquals(0, ToolRunner.run(admin, argv));
+    assertTrue(out.toString().contains("Refresh call queue successfully"));
+  }
+
   private void addMountTable(String src, String nsId, String dst)
       throws Exception {
     String[] argv = new String[] {"-add", src, nsId, dst};

---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org