You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-issues@hadoop.apache.org by "ASF GitHub Bot (Jira)" <ji...@apache.org> on 2022/12/26 09:08:00 UTC

[jira] [Commented] (YARN-8900) [Router] Federation: routing getContainers REST invocations transparently to multiple RMs

    [ https://issues.apache.org/jira/browse/YARN-8900?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17651976#comment-17651976 ] 

ASF GitHub Bot commented on YARN-8900:
--------------------------------------

freeeshi commented on code in PR #4543:
URL: https://github.com/apache/hadoop/pull/4543#discussion_r1057149227


##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java:
##########
@@ -1366,4 +1393,44 @@ public void shutdown() {
       threadpool.shutdown();
     }
   }
+
+  private <R> Map<SubClusterInfo, R> invokeConcurrent(Collection<SubClusterInfo> clusterIds,
+      ClientMethod request, Class<R> clazz) {
+
+    Map<SubClusterInfo, R> results = new HashMap<>();
+
+    // Send the requests in parallel
+    CompletionService<R> compSvc = new ExecutorCompletionService<>(this.threadpool);
+
+    for (final SubClusterInfo info : clusterIds) {
+      compSvc.submit(() -> {
+        DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster(
+            info.getSubClusterId(), info.getRMWebServiceAddress());
+        try {
+          Method method = DefaultRequestInterceptorREST.class.
+              getMethod(request.getMethodName(), request.getTypes());
+          Object retObj = method.invoke(interceptor, request.getParams());
+          R ret = clazz.cast(retObj);
+          return ret;
+        } catch (Exception e) {
+          LOG.error("SubCluster {} failed to call {} method.", info.getSubClusterId(),
+              request.getMethodName(), e);
+          return null;
+        }
+      });
+    }
+
+    clusterIds.stream().forEach(clusterId -> {
+      try {
+        Future<R> future = compSvc.take();
+        R response = future.get();
+        if (response != null) {
+          results.put(clusterId, response);

Review Comment:
   hi,@slfan1989,I have a question.
   - Operation `compSvc.take()` is not guaranteed to be in the same order as operation `compSvc.submit()`, in addition, clusterIds type might be Set. So, `clusterId->response` maybe mistake?
   - Can we do this like `FederationClientInterceptor#invokeConcurrent` ?





> [Router] Federation: routing getContainers REST invocations transparently to multiple RMs
> -----------------------------------------------------------------------------------------
>
>                 Key: YARN-8900
>                 URL: https://issues.apache.org/jira/browse/YARN-8900
>             Project: Hadoop YARN
>          Issue Type: Sub-task
>          Components: federation, router
>            Reporter: Giovanni Matteo Fumarola
>            Assignee: Shilun Fan
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 3.4.0
>
>         Attachments: YARN-8900.v1.patch, YARN-8900.v2.patch
>
>          Time Spent: 5h 50m
>  Remaining Estimate: 0h
>
> This JIRA tracks the design/implementation of the layer for routing RMWebServicesProtocol requests to the appropriate RM(s) in a federated YARN cluster.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: yarn-issues-unsubscribe@hadoop.apache.org
For additional commands, e-mail: yarn-issues-help@hadoop.apache.org