Posted to common-issues@hadoop.apache.org by GitBox <gi...@apache.org> on 2022/10/28 17:40:18 UTC

[GitHub] [hadoop] goiri commented on a diff in pull request #5057: YARN-11359. [Federation] Routing admin invocations transparently to multiple RMs.

goiri commented on code in PR #5057:
URL: https://github.com/apache/hadoop/pull/5057#discussion_r1008299199


##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterServerUtil.java:
##########
@@ -491,4 +514,24 @@ public static SubClusterId getRandomActiveSubCluster(
     // Randomly choose a SubCluster
     return subClusterIds.get(rand.nextInt(subClusterIds.size()));
   }
+
+  public static UserGroupInformation setupUser(String userName) {
+    UserGroupInformation user = null;
+    try {
+      // Do not create a proxy user if user name matches the user name on
+      // current UGI
+      if (UserGroupInformation.isSecurityEnabled()) {
+        user = UserGroupInformation.createProxyUser(userName, UserGroupInformation.getLoginUser());
+      } else if (userName.equalsIgnoreCase(UserGroupInformation.getCurrentUser().getUserName())) {

Review Comment:
   userName could be potentially null
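   A minimal, hypothetical sketch of how the null case could be guarded; the guard itself, the createRemoteUser fallback for the insecure mismatch case, and the wrapper class are illustrative assumptions, not the PR's code:

```java
import java.io.IOException;

import org.apache.hadoop.security.UserGroupInformation;

public final class SetupUserSketch {

  private SetupUserSketch() {
  }

  public static UserGroupInformation setupUser(final String userName) {
    // Guard against a null or empty user name before touching UGI.
    if (userName == null || userName.isEmpty()) {
      throw new IllegalArgumentException("userName cannot be null or empty.");
    }
    try {
      UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
      if (UserGroupInformation.isSecurityEnabled()) {
        // In secure mode, proxy the call as userName on top of the login user.
        return UserGroupInformation.createProxyUser(
            userName, UserGroupInformation.getLoginUser());
      } else if (userName.equalsIgnoreCase(currentUser.getUserName())) {
        // Do not create a proxy user if userName matches the current UGI user.
        return currentUser;
      } else {
        // Insecure mode with a different user name: a plain remote user suffices.
        return UserGroupInformation.createRemoteUser(userName);
      }
    } catch (IOException e) {
      throw new RuntimeException("Unable to set up user " + userName, e);
    }
  }
}
```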



##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/FederationRMAdminInterceptor.java:
##########
@@ -0,0 +1,329 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.router.rmadmin;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.NotImplementedException;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.ipc.StandbyException;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshQueuesRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshQueuesResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsConfigurationRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsConfigurationResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshServiceAclsRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshServiceAclsResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResourcesRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResourcesResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLabelsRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLabelsResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshClusterMaxPriorityRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshClusterMaxPriorityResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NodesToAttributesMappingRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NodesToAttributesMappingResponse;
+import org.apache.hadoop.yarn.server.federation.failover.FederationProxyProviderUtil;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
+import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
+import org.apache.hadoop.yarn.server.router.RouterMetrics;
+import org.apache.hadoop.yarn.server.router.RouterServerUtil;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.MonotonicClock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.util.*;

Review Comment:
   Expand the wildcard import (`java.util.*`) into explicit imports.
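   For reference, a hypothetical expansion of the wildcard; the exact set depends on which java.util classes the final version of the file actually references:

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
```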



##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/FederationRMAdminInterceptor.java:
##########
@@ -0,0 +1,329 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.router.rmadmin;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.NotImplementedException;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.ipc.StandbyException;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshQueuesRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshQueuesResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsConfigurationRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsConfigurationResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshServiceAclsRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshServiceAclsResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResourcesRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResourcesResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLabelsRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLabelsResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshClusterMaxPriorityRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshClusterMaxPriorityResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NodesToAttributesMappingRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NodesToAttributesMappingResponse;
+import org.apache.hadoop.yarn.server.federation.failover.FederationProxyProviderUtil;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
+import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
+import org.apache.hadoop.yarn.server.router.RouterMetrics;
+import org.apache.hadoop.yarn.server.router.RouterServerUtil;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.MonotonicClock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.util.*;
+import java.util.concurrent.*;
+
+public class FederationRMAdminInterceptor extends AbstractRMAdminRequestInterceptor {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(FederationRMAdminInterceptor.class);
+
+  private Map<SubClusterId, ResourceManagerAdministrationProtocol> adminRMProxies;
+  private FederationStateStoreFacade federationFacade;
+  private final Clock clock = new MonotonicClock();
+  private RouterMetrics routerMetrics;
+  private ThreadPoolExecutor executorService;
+  private Configuration conf;
+
+  @Override
+  public void init(String userName) {
+    super.init(userName);
+
+    int numThreads = getConf().getInt(
+        YarnConfiguration.ROUTER_USER_CLIENT_THREADS_SIZE,
+        YarnConfiguration.DEFAULT_ROUTER_USER_CLIENT_THREADS_SIZE);
+    ThreadFactory threadFactory = new ThreadFactoryBuilder()
+        .setNameFormat("RPC Router RMAdminClient-" + userName + "-%d ").build();
+
+    BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<Runnable>();
+    this.executorService = new ThreadPoolExecutor(numThreads, numThreads,
+        0L, TimeUnit.MILLISECONDS, workQueue, threadFactory);
+
+    federationFacade = FederationStateStoreFacade.getInstance();
+    this.conf = this.getConf();
+    this.adminRMProxies = new ConcurrentHashMap<>();
+    routerMetrics = RouterMetrics.getMetrics();
+  }
+
+  @VisibleForTesting
+  protected ResourceManagerAdministrationProtocol getAdminRMProxyForSubCluster(
+      SubClusterId subClusterId) throws YarnException {
+
+    if (adminRMProxies.containsKey(subClusterId)) {
+      return adminRMProxies.get(subClusterId);
+    }
+
+    ResourceManagerAdministrationProtocol adminRMProxy = null;
+    try {
+      boolean serviceAuthEnabled = this.conf.getBoolean(
+          CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION, false);
+      UserGroupInformation realUser = user;
+      if (serviceAuthEnabled) {
+        realUser = UserGroupInformation.createProxyUser(
+            user.getShortUserName(), UserGroupInformation.getLoginUser());
+      }
+      adminRMProxy = FederationProxyProviderUtil.createRMProxy(getConf(),
+          ResourceManagerAdministrationProtocol.class, subClusterId, realUser);
+    } catch (Exception e) {
+      RouterServerUtil.logAndThrowException(e,
+          "Unable to create the interface to reach the SubCluster %s", subClusterId);
+    }
+    adminRMProxies.put(subClusterId, adminRMProxy);
+    return adminRMProxy;
+  }
+
+  @Override
+  public void setNextInterceptor(RMAdminRequestInterceptor next) {
+    throw new YarnRuntimeException("setNextInterceptor is being called on "
+       + "FederationRMAdminRequestInterceptor, which should be the last one "
+       + "in the chain. Check if the interceptor pipeline configuration "
+       + "is correct");
+  }
+
+  @Override
+  public RefreshQueuesResponse refreshQueues(RefreshQueuesRequest request)
+      throws StandbyException, YarnException, IOException {
+
+    // parameter verification.
+    if (request == null) {
+      routerMetrics.incrRefreshQueuesFailedRetrieved();
+      RouterServerUtil.logAndThrowException("Missing RefreshQueues request.", null);
+    }
+
+    // call refreshQueues of activeSubClusters.
+    try {
+      long startTime = clock.getTime();
+      AdminMethod remoteMethod = new AdminMethod("refreshQueues",
+          new Class[] {RefreshQueuesRequest.class}, new Object[] {request});
+      Collection<RefreshQueuesResponse> refreshQueueResps =
+          invokeConcurrent(remoteMethod, RefreshQueuesResponse.class);
+
+      // If we get results back in refreshQueueResps,
+      // the calls have been successful on the active SubClusters,
+      // and a RefreshQueuesResponse can be constructed and returned.
+      if (CollectionUtils.isNotEmpty(refreshQueueResps)) {
+        long stopTime = clock.getTime();
+        routerMetrics.succeededRefreshQueuesRetrieved(stopTime - startTime);
+        return RefreshQueuesResponse.newInstance();
+      }
+    } catch (Exception e) {
+      routerMetrics.incrRefreshQueuesFailedRetrieved();
+      RouterServerUtil.logAndThrowException("Unable to refreshQueue to exception.", e);
+    }
+
+    routerMetrics.incrRefreshQueuesFailedRetrieved();
+    throw new YarnException("Unable to refreshQueue.");
+  }
+
+  @Override
+  public RefreshNodesResponse refreshNodes(RefreshNodesRequest request)
+      throws StandbyException, YarnException, IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public RefreshSuperUserGroupsConfigurationResponse refreshSuperUserGroupsConfiguration(
+      RefreshSuperUserGroupsConfigurationRequest request)
+      throws StandbyException, YarnException, IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public RefreshUserToGroupsMappingsResponse refreshUserToGroupsMappings(
+      RefreshUserToGroupsMappingsRequest request)
+      throws StandbyException, YarnException, IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public RefreshAdminAclsResponse refreshAdminAcls(RefreshAdminAclsRequest request)
+      throws YarnException, IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public RefreshServiceAclsResponse refreshServiceAcls(RefreshServiceAclsRequest request)
+      throws YarnException, IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public UpdateNodeResourceResponse updateNodeResource(UpdateNodeResourceRequest request)
+      throws YarnException, IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public RefreshNodesResourcesResponse refreshNodesResources(RefreshNodesResourcesRequest request)
+      throws YarnException, IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public AddToClusterNodeLabelsResponse addToClusterNodeLabels(
+      AddToClusterNodeLabelsRequest request)
+      throws YarnException, IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public RemoveFromClusterNodeLabelsResponse removeFromClusterNodeLabels(
+      RemoveFromClusterNodeLabelsRequest request)
+      throws YarnException, IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public ReplaceLabelsOnNodeResponse replaceLabelsOnNode(ReplaceLabelsOnNodeRequest request)
+      throws YarnException, IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public CheckForDecommissioningNodesResponse checkForDecommissioningNodes(
+      CheckForDecommissioningNodesRequest checkForDecommissioningNodesRequest)
+      throws YarnException, IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public RefreshClusterMaxPriorityResponse refreshClusterMaxPriority(
+      RefreshClusterMaxPriorityRequest request)
+      throws YarnException, IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public NodesToAttributesMappingResponse mapAttributesToNodes(
+      NodesToAttributesMappingRequest request)
+      throws YarnException, IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public String[] getGroupsForUser(String user) throws IOException {
+    return new String[0];
+  }
+
+  <R> Collection<R> invokeConcurrent(AdminMethod request, Class<R> clazz)

Review Comment:
   Don't we have very similar methods in other places?
   Can we refactor?
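   A rough, hypothetical sketch of what a shared concurrent-invocation helper could look like (the class name, location and signature are assumptions for illustration, not what the PR implements; the client-side interceptor already has invokeConcurrent-style methods that could be generalized along these lines):

```java
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;

public final class FederationInvokeUtil {

  private FederationInvokeUtil() {
  }

  /**
   * Invokes the named method on every SubCluster proxy in parallel and
   * collects the non-null results.
   */
  public static <P, R> Collection<R> invokeConcurrent(
      Map<SubClusterId, P> proxies, ExecutorService executor, String methodName,
      Class<?>[] types, Object[] params, Class<R> clazz) throws YarnException {

    List<Callable<Object>> callables = new ArrayList<>();
    for (P proxy : proxies.values()) {
      callables.add(() -> {
        // Resolve and invoke the protocol method reflectively on this proxy.
        Method method = proxy.getClass().getMethod(methodName, types);
        return method.invoke(proxy, params);
      });
    }

    List<R> results = new ArrayList<>();
    try {
      List<Future<Object>> futures = executor.invokeAll(callables);
      for (Future<Object> future : futures) {
        Object result = future.get();
        if (result != null) {
          results.add(clazz.cast(result));
        }
      }
    } catch (Exception e) {
      throw new YarnException("Unable to invoke " + methodName + " concurrently.", e);
    }
    return results;
  }
}
```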



##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestableFederationClientInterceptor.java:
##########
@@ -200,7 +200,6 @@ public void shutdown() {
         MockNM mockNM = mockNMs.getOrDefault(subClusterId, null);
         try {
           mockNM.unRegisterNode();
-          mockNM = null;

Review Comment:
   no need?



##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterServerUtil.java:
##########
@@ -491,4 +514,24 @@ public static SubClusterId getRandomActiveSubCluster(
     // Randomly choose a SubCluster
     return subClusterIds.get(rand.nextInt(subClusterIds.size()));
   }
+
+  public static UserGroupInformation setupUser(String userName) {

Review Comment:
   final String userName



##########
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/AdminMethod.java:
##########
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.router.rmadmin;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+
+/**
+ * Class to define admin method,params and arguments.
+ */
+public class AdminMethod {

Review Comment:
   Is there anything really admin about this class?
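   Purely as an illustration of the point: the class only carries a method name, parameter types and arguments, nothing admin-specific, so a router-generic shape along these lines (the name RemoteMethod is a hypothetical suggestion) would work equally well:

```java
import java.util.Arrays;

public class RemoteMethod {

  private final String methodName;
  private final Class<?>[] types;
  private final Object[] params;

  public RemoteMethod(String methodName, Class<?>[] types, Object[] params) {
    this.methodName = methodName;
    this.types = types;
    this.params = params;
  }

  public String getMethodName() {
    return methodName;
  }

  public Class<?>[] getTypes() {
    return types;
  }

  public Object[] getParams() {
    return params;
  }

  @Override
  public String toString() {
    return "RemoteMethod{" + methodName + ", params=" + Arrays.toString(params) + "}";
  }
}
```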



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: common-issues-unsubscribe@hadoop.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: common-issues-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-issues-help@hadoop.apache.org