Posted to common-issues@hadoop.apache.org by GitBox <gi...@apache.org> on 2022/09/20 16:46:31 UTC

[GitHub] [hadoop] goiri commented on a diff in pull request #4904: YARN-11238. Optimizing FederationClientInterceptor Call with Parallelism.

goiri commented on code in PR #4904:
URL: https://github.com/apache/hadoop/pull/4904#discussion_r975598672


##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java:
##########
@@ -773,24 +761,19 @@ public GetClusterNodesResponse getClusterNodes(GetClusterNodesRequest request)
       RouterServerUtil.logAndThrowException("Missing getClusterNodes request.", null);
     }
     long startTime = clock.getTime();
-    Map<SubClusterId, SubClusterInfo> subClusters =
-        federationFacade.getSubClusters(true);
-    Map<SubClusterId, GetClusterNodesResponse> clusterNodes = Maps.newHashMap();
-    for (SubClusterId subClusterId : subClusters.keySet()) {
-      ApplicationClientProtocol client;
-      try {
-        client = getClientRMProxyForSubCluster(subClusterId);
-        GetClusterNodesResponse response = client.getClusterNodes(request);
-        clusterNodes.put(subClusterId, response);
-      } catch (Exception ex) {
-        routerMetrics.incrClusterNodesFailedRetrieved();
-        RouterServerUtil.logAndThrowException("Unable to get cluster nodes due to exception.", ex);
-      }
+    ClientMethod remoteMethod = new ClientMethod("getClusterNodes",
+        new Class[]{GetClusterNodesRequest.class}, new Object[]{request});
+    Collection<GetClusterNodesResponse> clusterNodes = null;
+    try {
+      clusterNodes = invokeConcurrent(remoteMethod, GetClusterNodesResponse.class);

Review Comment:
   Can we define and return inside the try?
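A minimal sketch of what the reviewer appears to be asking for. invokeConcurrent and merge below are hypothetical stand-ins, not the Router's real FederationClientInterceptor or RouterYarnClientUtils methods. The result variable is declared where it is first assigned, inside the try, and returned from there, so the "= null" placeholder goes away.

```java
import java.util.Arrays;
import java.util.Collection;

public class DefineInsideTrySketch {

  // Hypothetical stand-in for invokeConcurrent(...): one response per sub-cluster,
  // or a failure for the whole call.
  static Collection<String> invokeConcurrent(boolean fail) throws Exception {
    if (fail) {
      throw new Exception("sub-cluster unreachable");
    }
    return Arrays.asList("nodes-sc1", "nodes-sc2");
  }

  // Hypothetical stand-in for merging the per-sub-cluster responses into one.
  static String merge(Collection<String> responses) {
    return String.join(",", responses);
  }

  static String getClusterNodes(boolean fail) {
    try {
      // Declared at first assignment and returned inside the try:
      // no "Collection<...> clusterNodes = null;" is needed beforehand.
      Collection<String> clusterNodes = invokeConcurrent(fail);
      return merge(clusterNodes);
    } catch (Exception ex) {
      // Stand-in for the "increment failure metric, log and rethrow" path.
      throw new IllegalStateException("Unable to get cluster nodes due to exception.", ex);
    }
  }

  public static void main(String[] args) {
    System.out.println(getClusterNodes(false));
  }
}
```

One thing to check in the real code: if the catch block calls a helper that always throws but is declared void (as a logAndThrowException-style utility would be), the compiler still sees a path that completes normally, so a trailing return or throw after the try/catch may still be required.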



##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java:
##########
@@ -728,6 +729,18 @@ public FederationStateStore getStateStore() {
   }
 
   /**
+   * Get Active's SubClusterIds{@link SubClusterId}.
+   *
+   * @return SubClusterId Collection.
+   * @throws YarnException if the call to get active subClusterIds is unsuccessful
+   */
+  public Collection<SubClusterId> getActiveSubClusterIds() throws YarnException {
+    Map<SubClusterId, SubClusterInfo> activeSubClusters =
+        getSubClusters(true);

Review Comment:
   Can we put the true in a named variable so it is clear what the flag represents?
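A minimal sketch of the suggested change, assuming the boolean passed to getSubClusters means "filter out inactive sub-clusters" (the name filterInactiveSubClusters is used here as an assumption). The registry, its "RUNNING"/"LOST" states, and the String ids are hypothetical stand-ins for the federation state store, SubClusterState, and SubClusterId.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
import java.util.TreeMap;

public class NamedFlagSketch {

  // Hypothetical registry: sub-cluster id -> state.
  static final Map<String, String> REGISTRY = new TreeMap<>();

  static {
    REGISTRY.put("sc1", "RUNNING");
    REGISTRY.put("sc2", "LOST");
    REGISTRY.put("sc3", "RUNNING");
  }

  // Hypothetical stand-in for getSubClusters(boolean): when the flag is set,
  // only sub-clusters in the RUNNING state are returned.
  static Map<String, String> getSubClusters(boolean filterInactiveSubClusters) {
    Map<String, String> result = new TreeMap<>();
    for (Map.Entry<String, String> e : REGISTRY.entrySet()) {
      if (!filterInactiveSubClusters || "RUNNING".equals(e.getValue())) {
        result.put(e.getKey(), e.getValue());
      }
    }
    return result;
  }

  static Collection<String> getActiveSubClusterIds() {
    // The named local documents what the bare "true" meant at the call site.
    boolean filterInactiveSubClusters = true;
    Map<String, String> activeSubClusters = getSubClusters(filterInactiveSubClusters);
    return new ArrayList<>(activeSubClusters.keySet());
  }
}
```

A private static final constant would work equally well if the same flag is passed from several call sites.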



##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java:
##########
@@ -702,67 +698,59 @@ public GetClusterMetricsResponse getClusterMetrics(
     return RouterYarnClientUtils.merge(clusterMetrics);
   }
 
-  <R> Map<SubClusterId, R> invokeConcurrent(ArrayList<SubClusterId> clusterIds,
-      ClientMethod request, Class<R> clazz) throws YarnException, IOException {
-    List<Callable<Object>> callables = new ArrayList<>();
-    List<Future<Object>> futures = new ArrayList<>();
-    Map<SubClusterId, IOException> exceptions = new TreeMap<>();
-    for (SubClusterId subClusterId : clusterIds) {
-      callables.add(new Callable<Object>() {
-        @Override
-        public Object call() throws Exception {
-          ApplicationClientProtocol protocol =
-              getClientRMProxyForSubCluster(subClusterId);
-          Method method = ApplicationClientProtocol.class
-              .getMethod(request.getMethodName(), request.getTypes());
-          return method.invoke(protocol, request.getParams());
-        }
+  <R> Collection<R> invokeConcurrent(ClientMethod request, Class<R> clazz)
+      throws YarnException {
+
+    Collection<SubClusterId> subClusterIds = federationFacade.getActiveSubClusterIds();
+
+    List<Callable<Pair<SubClusterId, Object>>> callables = new ArrayList<>();
+    List<Future<Pair<SubClusterId, Object>>> futures = new ArrayList<>();
+    Map<SubClusterId, Exception> exceptions = new TreeMap<>();
+
+    // Generate parallel Callable tasks
+    for (SubClusterId subClusterId : subClusterIds) {
+      callables.add(() -> {
+        ApplicationClientProtocol protocol =

Review Comment:
   Single line.
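The new invokeConcurrent builds one Callable per active sub-cluster, tags each result with the SubClusterId it came from, and collects failures into a map keyed by sub-cluster. Below is a self-contained sketch of that fan-out/fan-in pattern, with the task body on a single line as suggested. AbstractMap.SimpleEntry stands in for the Pair type in the diff, String ids stand in for SubClusterId, and callSubCluster and the thread-pool sizing are hypothetical.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

public class FanOutSketch {

  // Calls callSubCluster once per sub-cluster id in parallel. Successful results are
  // returned ordered by id; each failure is recorded in 'exceptions' under its id.
  static <R> Collection<R> invokeConcurrent(Collection<String> subClusterIds,
      Function<String, R> callSubCluster, Map<String, Exception> exceptions) {
    List<String> ids = new ArrayList<>(subClusterIds);
    List<Callable<SimpleEntry<String, R>>> callables = new ArrayList<>();
    for (String id : ids) {
      // One task per sub-cluster; the result is tagged with the id it came from.
      callables.add(() -> new SimpleEntry<>(id, callSubCluster.apply(id)));
    }
    ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, ids.size()));
    try {
      List<Future<SimpleEntry<String, R>>> futures = pool.invokeAll(callables);
      Map<String, R> results = new TreeMap<>();
      for (int i = 0; i < futures.size(); i++) {
        try {
          SimpleEntry<String, R> pair = futures.get(i).get();
          results.put(pair.getKey(), pair.getValue());
        } catch (ExecutionException e) {
          // invokeAll returns futures in task order, so index i names the failing sub-cluster.
          Throwable cause = e.getCause();
          exceptions.put(ids.get(i), cause instanceof Exception ? (Exception) cause : e);
        }
      }
      return results.values();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("Interrupted while waiting for sub-clusters", e);
    } finally {
      pool.shutdown();
    }
  }

  public static void main(String[] args) {
    Map<String, Exception> exceptions = new TreeMap<>();
    Collection<String> results = invokeConcurrent(Arrays.asList("sc1", "sc2", "sc3"),
        id -> {
          if (id.equals("sc2")) {
            throw new IllegalStateException("sub-cluster down");
          }
          return "metrics-" + id;
        }, exceptions);
    System.out.println(results + " failed=" + exceptions.keySet());
  }
}
```

Using invokeAll is a choice made for this sketch: it blocks until every sub-cluster has answered or failed, so one slow sub-cluster delays the whole call unless the overload taking a timeout is used.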



##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java:
##########
@@ -728,6 +729,18 @@ public FederationStateStore getStateStore() {
   }
 
   /**
+   * Get Active's SubClusterIds{@link SubClusterId}.
+   *
+   * @return SubClusterId Collection.
+   * @throws YarnException if the call to get active subClusterIds is unsuccessful
+   */
+  public Collection<SubClusterId> getActiveSubClusterIds() throws YarnException {
+    Map<SubClusterId, SubClusterInfo> activeSubClusters =

Review Comment:
   Single line.



##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java:
##########
@@ -702,67 +698,59 @@ public GetClusterMetricsResponse getClusterMetrics(
     return RouterYarnClientUtils.merge(clusterMetrics);
   }
 
-  <R> Map<SubClusterId, R> invokeConcurrent(ArrayList<SubClusterId> clusterIds,
-      ClientMethod request, Class<R> clazz) throws YarnException, IOException {
-    List<Callable<Object>> callables = new ArrayList<>();
-    List<Future<Object>> futures = new ArrayList<>();
-    Map<SubClusterId, IOException> exceptions = new TreeMap<>();
-    for (SubClusterId subClusterId : clusterIds) {
-      callables.add(new Callable<Object>() {
-        @Override
-        public Object call() throws Exception {
-          ApplicationClientProtocol protocol =
-              getClientRMProxyForSubCluster(subClusterId);
-          Method method = ApplicationClientProtocol.class
-              .getMethod(request.getMethodName(), request.getTypes());
-          return method.invoke(protocol, request.getParams());
-        }
+  <R> Collection<R> invokeConcurrent(ClientMethod request, Class<R> clazz)
+      throws YarnException {
+
+    Collection<SubClusterId> subClusterIds = federationFacade.getActiveSubClusterIds();
+
+    List<Callable<Pair<SubClusterId, Object>>> callables = new ArrayList<>();
+    List<Future<Pair<SubClusterId, Object>>> futures = new ArrayList<>();
+    Map<SubClusterId, Exception> exceptions = new TreeMap<>();
+
+    // Generate parallel Callable tasks
+    for (SubClusterId subClusterId : subClusterIds) {
+      callables.add(() -> {
+        ApplicationClientProtocol protocol =
+            getClientRMProxyForSubCluster(subClusterId);
+        Method method = ApplicationClientProtocol.class

Review Comment:
   Single line.
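Each task invokes the RPC reflectively. ClientMethod carries a method name, parameter types and arguments; ApplicationClientProtocol.class.getMethod(...) resolves the method, and Method.invoke(...) runs it against the sub-cluster's proxy. The sketch below reproduces that mechanism with a hypothetical ClusterProtocol interface in place of ApplicationClientProtocol and a simplified ClientMethod holder (the real class's fields and accessors may differ), writing the lookup and the call each on a single line.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

public class ReflectiveCallSketch {

  // Hypothetical stand-in for ApplicationClientProtocol.
  public interface ClusterProtocol {
    String getClusterNodes(String request);
  }

  // Simplified stand-in for the Router's ClientMethod: which method to call, and with what.
  static final class ClientMethod {
    final String methodName;
    final Class<?>[] types;
    final Object[] params;

    ClientMethod(String methodName, Class<?>[] types, Object[] params) {
      this.methodName = methodName;
      this.types = types;
      this.params = params;
    }
  }

  static Object invoke(ClusterProtocol protocol, ClientMethod request) throws Exception {
    // Resolve the interface method by name and parameter types (NoSuchMethodException if absent).
    Method method = ClusterProtocol.class.getMethod(request.methodName, request.types);
    try {
      return method.invoke(protocol, request.params);
    } catch (InvocationTargetException e) {
      // Unwrap so callers see the call's own exception rather than the reflection wrapper.
      Throwable cause = e.getCause();
      throw cause instanceof Exception ? (Exception) cause : e;
    }
  }
}
```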



##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java:
##########
@@ -582,9 +583,9 @@ public void testGetClusterMetricsRequest() throws Exception {
     ClientMethod remoteMethod = new ClientMethod("getClusterMetrics",
         new Class[] {GetClusterMetricsRequest.class},
         new Object[] {GetClusterMetricsRequest.newInstance()});
-    Map<SubClusterId, GetClusterMetricsResponse> clusterMetrics = interceptor.
-        invokeConcurrent(new ArrayList<>(), remoteMethod, GetClusterMetricsResponse.class);
-    Assert.assertTrue(clusterMetrics.isEmpty());
+    Collection<GetClusterMetricsResponse> clusterMetrics = interceptor.
+        invokeConcurrent(remoteMethod, GetClusterMetricsResponse.class);
+    Assert.assertTrue(!clusterMetrics.isEmpty());

Review Comment:
   Now the expectation is the opposite? What has changed?
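Judging only from the diff, the flip follows from the signature change. The old invokeConcurrent fanned out over whatever ids the caller passed, and the test passed new ArrayList<>(), so an empty result was the only possible outcome. The new one looks up the ids itself through federationFacade.getActiveSubClusterIds(), so if the test setup registers active sub-clusters, the result is non-empty. A minimal sketch of the two shapes, run sequentially for brevity, with a hypothetical Supplier standing in for the facade and a trivial per-cluster call:

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.function.Supplier;

public class SignatureChangeSketch {

  // Old shape: the caller chooses the sub-clusters, so an empty id list yields an empty result.
  static List<String> fanOutOld(List<String> clusterIds) {
    List<String> results = new ArrayList<>();
    for (String id : clusterIds) {
      results.add("metrics-" + id);
    }
    return results;
  }

  // New shape: the method asks the (hypothetical) facade for the active sub-clusters itself.
  static List<String> fanOutNew(Supplier<Collection<String>> activeSubClusterIds) {
    List<String> results = new ArrayList<>();
    for (String id : activeSubClusterIds.get()) {
      results.add("metrics-" + id);
    }
    return results;
  }
}
```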



##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java:
##########
@@ -582,9 +583,9 @@ public void testGetClusterMetricsRequest() throws Exception {
     ClientMethod remoteMethod = new ClientMethod("getClusterMetrics",
         new Class[] {GetClusterMetricsRequest.class},
         new Object[] {GetClusterMetricsRequest.newInstance()});
-    Map<SubClusterId, GetClusterMetricsResponse> clusterMetrics = interceptor.
-        invokeConcurrent(new ArrayList<>(), remoteMethod, GetClusterMetricsResponse.class);
-    Assert.assertTrue(clusterMetrics.isEmpty());
+    Collection<GetClusterMetricsResponse> clusterMetrics = interceptor.
+        invokeConcurrent(remoteMethod, GetClusterMetricsResponse.class);
+    Assert.assertTrue(!clusterMetrics.isEmpty());

Review Comment:
   Use assertFalse(clusterMetrics.isEmpty()) instead.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: common-issues-unsubscribe@hadoop.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
For additional commands, e-mail: common-issues-help@hadoop.apache.org