You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-issues@hadoop.apache.org by "ASF GitHub Bot (Jira)" <ji...@apache.org> on 2022/12/26 09:08:00 UTC
[jira] [Commented] (YARN-8900) [Router] Federation: routing getContainers REST invocations transparently to multiple RMs
[ https://issues.apache.org/jira/browse/YARN-8900?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17651976#comment-17651976 ]
ASF GitHub Bot commented on YARN-8900:
--------------------------------------
freeeshi commented on code in PR #4543:
URL: https://github.com/apache/hadoop/pull/4543#discussion_r1057149227
##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java:
##########
@@ -1366,4 +1393,44 @@ public void shutdown() {
threadpool.shutdown();
}
}
+
+ private <R> Map<SubClusterInfo, R> invokeConcurrent(Collection<SubClusterInfo> clusterIds,
+ ClientMethod request, Class<R> clazz) {
+
+ Map<SubClusterInfo, R> results = new HashMap<>();
+
+ // Send the requests in parallel
+ CompletionService<R> compSvc = new ExecutorCompletionService<>(this.threadpool);
+
+ for (final SubClusterInfo info : clusterIds) {
+ compSvc.submit(() -> {
+ DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster(
+ info.getSubClusterId(), info.getRMWebServiceAddress());
+ try {
+ Method method = DefaultRequestInterceptorREST.class.
+ getMethod(request.getMethodName(), request.getTypes());
+ Object retObj = method.invoke(interceptor, request.getParams());
+ R ret = clazz.cast(retObj);
+ return ret;
+ } catch (Exception e) {
+ LOG.error("SubCluster {} failed to call {} method.", info.getSubClusterId(),
+ request.getMethodName(), e);
+ return null;
+ }
+ });
+ }
+
+ clusterIds.stream().forEach(clusterId -> {
+ try {
+ Future<R> future = compSvc.take();
+ R response = future.get();
+ if (response != null) {
+ results.put(clusterId, response);
Review Comment:
hi,@slfan1989,I have a question.
- Operation `compSvc.take()` is not guaranteed to be in the same order as operation `compSvc.submit()`, in addition, clusterIds type might be Set. So, `clusterId->response` maybe mistake?
- Can we do this like `FederationClientInterceptor#invokeConcurrent` ?
> [Router] Federation: routing getContainers REST invocations transparently to multiple RMs
> -----------------------------------------------------------------------------------------
>
> Key: YARN-8900
> URL: https://issues.apache.org/jira/browse/YARN-8900
> Project: Hadoop YARN
> Issue Type: Sub-task
> Components: federation, router
> Reporter: Giovanni Matteo Fumarola
> Assignee: Shilun Fan
> Priority: Major
> Labels: pull-request-available
> Fix For: 3.4.0
>
> Attachments: YARN-8900.v1.patch, YARN-8900.v2.patch
>
> Time Spent: 5h 50m
> Remaining Estimate: 0h
>
> This JIRA tracks the design/implementation of the layer for routing RMWebServicesProtocol requests to the appropriate RM(s) in a federated YARN cluster.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: yarn-issues-unsubscribe@hadoop.apache.org
For additional commands, e-mail: yarn-issues-help@hadoop.apache.org