You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by wa...@apache.org on 2015/09/11 19:20:31 UTC
[10/42] hadoop git commit: YARN-2884. Added a proxy service in NM to
proxy the communication between AM and RM. Contributed by Kishore
Chaliparambil
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f72f1e6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java
new file mode 100644
index 0000000..964379a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java
@@ -0,0 +1,677 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
+import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
+import org.apache.hadoop.yarn.server.nodemanager.Context;
+import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
+import org.apache.hadoop.yarn.server.nodemanager.NodeResourceMonitor;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
+import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
+import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
+import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
+import org.apache.hadoop.yarn.util.Records;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+
+/**
+ * Base class for all the AMRMProxyService test cases. It provides utility
+ * methods that can be used by the concrete test case classes.
+ */
+public abstract class BaseAMRMProxyTest {
+ private static final Log LOG = LogFactory
+ .getLog(BaseAMRMProxyTest.class);
+ /**
+ * The AMRMProxyService instance that will be used by all the test cases
+ */
+ private MockAMRMProxyService amrmProxyService;
+ /**
+ * Thread pool used for asynchronous operations
+ */
+ private static ExecutorService threadpool = Executors
+ .newCachedThreadPool();
+ private Configuration conf;
+ private AsyncDispatcher dispatcher;
+
+ protected MockAMRMProxyService getAMRMProxyService() {
+ Assert.assertNotNull(this.amrmProxyService);
+ return this.amrmProxyService;
+ }
+
+  /**
+   * Builds the configuration, starts the dispatcher and starts the proxy
+   * service before every test.
+   */
+  @Before
+  public void setUp() {
+    final String passThroughName =
+        PassThroughRequestInterceptor.class.getName();
+    final String terminalName = MockRequestInterceptor.class.getName();
+
+    this.conf = new YarnConfiguration();
+    this.conf.setBoolean(YarnConfiguration.AMRM_PROXY_ENABLED, true);
+
+    // Request interceptor pipeline used for testing: three pass-through
+    // interceptors that simply forward to the next one in the chain, followed
+    // by the interceptor that calls the mock resource manager.
+    final StringBuilder pipeline = new StringBuilder();
+    for (int i = 0; i < 3; i++) {
+      pipeline.append(passThroughName).append(",");
+    }
+    pipeline.append(terminalName);
+    this.conf.set(YarnConfiguration.AMRM_PROXY_INTERCEPTOR_CLASS_PIPELINE,
+        pipeline.toString());
+
+    final AsyncDispatcher asyncDispatcher = new AsyncDispatcher();
+    this.dispatcher = asyncDispatcher;
+    asyncDispatcher.init(this.conf);
+    asyncDispatcher.start();
+
+    this.amrmProxyService = createAndStartAMRMProxyService();
+  }
+
+  /**
+   * Stops the proxy service and the dispatcher after every test.
+   *
+   * <p>JUnit runs this method even when {@link #setUp()} failed, so either
+   * field may still be unset here. The dispatcher is stopped in a
+   * {@code finally} block so that a failure while stopping the proxy service
+   * does not leave the dispatcher running.
+   */
+  @After
+  public void tearDown() {
+    try {
+      if (this.amrmProxyService != null) {
+        this.amrmProxyService.stop();
+      }
+    } finally {
+      this.amrmProxyService = null;
+      if (this.dispatcher != null) {
+        this.dispatcher.stop();
+      }
+    }
+  }
+
+  /**
+   * Returns the thread pool shared by all test instances for asynchronous
+   * operations.
+   *
+   * @return the shared executor service
+   */
+  protected ExecutorService getThreadPool() {
+    return BaseAMRMProxyTest.threadpool;
+  }
+
+ protected MockAMRMProxyService createAndStartAMRMProxyService() {
+ MockAMRMProxyService svc =
+ new MockAMRMProxyService(new NullContext(), dispatcher);
+ svc.init(conf);
+ svc.start();
+ return svc;
+ }
+
+ /**
+ * This helper method will invoke the specified function in parallel for each
+ * end point in the specified list using a thread pool and return the
+ * responses received from the function. It implements the logic required for
+ * dispatching requests in parallel and waiting for the responses. If any of
+ * the function call fails or times out, it will ignore and proceed with the
+ * rest. So the responses returned can be less than the number of end points
+ * specified
+ *
+ * @param testContext
+ * @param func
+ * @return
+ */
+ protected <T, R> List<R> runInParallel(List<T> testContexts,
+ final Function<T, R> func) {
+ ExecutorCompletionService<R> completionService =
+ new ExecutorCompletionService<R>(this.getThreadPool());
+ LOG.info("Sending requests to endpoints asynchronously. Number of test contexts="
+ + testContexts.size());
+ for (int index = 0; index < testContexts.size(); index++) {
+ final T testContext = testContexts.get(index);
+
+ LOG.info("Adding request to threadpool for test context: "
+ + testContext.toString());
+
+ completionService.submit(new Callable<R>() {
+ @Override
+ public R call() throws Exception {
+ LOG.info("Sending request. Test context:"
+ + testContext.toString());
+
+ R response = null;
+ try {
+ response = func.invoke(testContext);
+ LOG.info("Successfully sent request for context: "
+ + testContext.toString());
+ } catch (Throwable ex) {
+ LOG.error("Failed to process request for context: "
+ + testContext);
+ response = null;
+ }
+
+ return response;
+ }
+ });
+ }
+
+ ArrayList<R> responseList = new ArrayList<R>();
+ LOG.info("Waiting for responses from endpoints. Number of contexts="
+ + testContexts.size());
+ for (int i = 0; i < testContexts.size(); ++i) {
+ try {
+ final Future<R> future = completionService.take();
+ final R response = future.get(3000, TimeUnit.MILLISECONDS);
+ responseList.add(response);
+ } catch (Throwable e) {
+ LOG.error("Failed to process request " + e.getMessage());
+ }
+ }
+
+ return responseList;
+ }
+
+ /**
+ * Helper method to register an application master using specified testAppId
+ * as the application identifier and return the response
+ *
+ * @param testAppId
+ * @return
+ * @throws Exception
+ * @throws YarnException
+ * @throws IOException
+ */
+ protected RegisterApplicationMasterResponse registerApplicationMaster(
+ final int testAppId) throws Exception, YarnException, IOException {
+ final ApplicationUserInfo ugi = getApplicationUserInfo(testAppId);
+
+ return ugi
+ .getUser()
+ .doAs(
+ new PrivilegedExceptionAction<RegisterApplicationMasterResponse>() {
+ @Override
+ public RegisterApplicationMasterResponse run()
+ throws Exception {
+ getAMRMProxyService().initApp(
+ ugi.getAppAttemptId(),
+ ugi.getUser().getUserName());
+
+ final RegisterApplicationMasterRequest req =
+ Records
+ .newRecord(RegisterApplicationMasterRequest.class);
+ req.setHost(Integer.toString(testAppId));
+ req.setRpcPort(testAppId);
+ req.setTrackingUrl("");
+
+ RegisterApplicationMasterResponse response =
+ getAMRMProxyService().registerApplicationMaster(req);
+ return response;
+ }
+ });
+ }
+
+  /**
+   * Registers one application master per test context, in parallel, and
+   * verifies that every registration produced a response.
+   *
+   * <p>The position of a context in {@code testContexts} is used as the test
+   * application id, so the contexts must be distinct according to
+   * {@code equals}.
+   *
+   * @param testContexts - used to identify the requests
+   * @return one response holder per test context
+   */
+  protected <T> List<RegisterApplicationMasterResponseInfo<T>> registerApplicationMastersInParallel(
+      final ArrayList<T> testContexts) {
+    List<RegisterApplicationMasterResponseInfo<T>> responses =
+        runInParallel(testContexts,
+            new Function<T, RegisterApplicationMasterResponseInfo<T>>() {
+              @Override
+              public RegisterApplicationMasterResponseInfo<T> invoke(
+                  T testContext) {
+                RegisterApplicationMasterResponseInfo<T> response = null;
+                try {
+                  int index = testContexts.indexOf(testContext);
+                  response =
+                      new RegisterApplicationMasterResponseInfo<T>(
+                          registerApplicationMaster(index), testContext);
+                  Assert.assertNotNull(response.getResponse());
+                  Assert.assertEquals(Integer.toString(index), response
+                      .getResponse().getQueue());
+
+                  LOG.info("Sucessfully registered application master with test context: "
+                      + testContext);
+                } catch (Throwable ex) {
+                  response = null;
+                  // Log the cause; the caller only observes a null entry.
+                  LOG.error("Failed to register application master with test context: "
+                      + testContext, ex);
+                }
+
+                return response;
+              }
+            });
+
+    Assert.assertEquals(
+        "Number of responses received does not match with request",
+        testContexts.size(), responses.size());
+
+    // A plain list is used so that contexts need not be Comparable.
+    List<T> contextResponses = new ArrayList<T>(responses.size());
+    for (RegisterApplicationMasterResponseInfo<T> item : responses) {
+      // A failed registration shows up as a null entry; fail with an
+      // assertion rather than a NullPointerException.
+      Assert.assertNotNull(item);
+      Assert.assertNotNull(item.getResponse());
+      contextResponses.add(item.getTestContext());
+    }
+
+    for (T ep : testContexts) {
+      Assert.assertTrue(contextResponses.contains(ep));
+    }
+
+    return responses;
+  }
+
+ /**
+ * Unregisters the application master for specified application id
+ *
+ * @param appId
+ * @param status
+ * @return
+ * @throws Exception
+ * @throws YarnException
+ * @throws IOException
+ */
+ protected FinishApplicationMasterResponse finishApplicationMaster(
+ final int appId, final FinalApplicationStatus status)
+ throws Exception, YarnException, IOException {
+
+ final ApplicationUserInfo ugi = getApplicationUserInfo(appId);
+
+ return ugi.getUser().doAs(
+ new PrivilegedExceptionAction<FinishApplicationMasterResponse>() {
+ @Override
+ public FinishApplicationMasterResponse run() throws Exception {
+ final FinishApplicationMasterRequest req =
+ Records.newRecord(FinishApplicationMasterRequest.class);
+ req.setDiagnostics("");
+ req.setTrackingUrl("");
+ req.setFinalApplicationStatus(status);
+
+ FinishApplicationMasterResponse response =
+ getAMRMProxyService().finishApplicationMaster(req);
+
+ getAMRMProxyService().stopApp(
+ ugi.getAppAttemptId().getApplicationId());
+
+ return response;
+ }
+ });
+ }
+
+  /**
+   * Finishes one application master per test context, in parallel, with the
+   * {@code SUCCEEDED} status and verifies that every call produced a
+   * response.
+   *
+   * <p>The position of a context in {@code testContexts} is used as the test
+   * application id, so the contexts must be distinct according to
+   * {@code equals}.
+   *
+   * @param testContexts - used to identify the requests
+   * @return one response holder per test context
+   */
+  protected <T> List<FinishApplicationMasterResponseInfo<T>> finishApplicationMastersInParallel(
+      final ArrayList<T> testContexts) {
+    List<FinishApplicationMasterResponseInfo<T>> responses =
+        runInParallel(testContexts,
+            new Function<T, FinishApplicationMasterResponseInfo<T>>() {
+              @Override
+              public FinishApplicationMasterResponseInfo<T> invoke(
+                  T testContext) {
+                FinishApplicationMasterResponseInfo<T> response = null;
+                try {
+                  response =
+                      new FinishApplicationMasterResponseInfo<T>(
+                          finishApplicationMaster(
+                              testContexts.indexOf(testContext),
+                              FinalApplicationStatus.SUCCEEDED),
+                          testContext);
+                  Assert.assertNotNull(response.getResponse());
+
+                  LOG.info("Sucessfully finished application master with test contexts: "
+                      + testContext);
+                } catch (Throwable ex) {
+                  response = null;
+                  // Log the cause; the caller only observes a null entry.
+                  LOG.error("Failed to finish application master with test context: "
+                      + testContext, ex);
+                }
+
+                return response;
+              }
+            });
+
+    Assert.assertEquals(
+        "Number of responses received does not match with request",
+        testContexts.size(), responses.size());
+
+    // A plain list is used so that contexts need not be Comparable.
+    List<T> contextResponses = new ArrayList<T>(responses.size());
+    for (FinishApplicationMasterResponseInfo<T> item : responses) {
+      Assert.assertNotNull(item);
+      Assert.assertNotNull(item.getResponse());
+      contextResponses.add(item.getTestContext());
+    }
+
+    for (T ep : testContexts) {
+      Assert.assertTrue(contextResponses.contains(ep));
+    }
+
+    return responses;
+  }
+
+  /**
+   * Sends an allocate request whose response id is set to the test
+   * application id.
+   *
+   * @param testAppId identifier of the test application
+   * @return the response returned by the proxy service
+   */
+  protected AllocateResponse allocate(final int testAppId)
+      throws Exception, YarnException, IOException {
+    final AllocateRequest allocateRequest =
+        Records.newRecord(AllocateRequest.class);
+    allocateRequest.setResponseId(testAppId);
+    return allocate(testAppId, allocateRequest);
+  }
+
+ protected AllocateResponse allocate(final int testAppId,
+ final AllocateRequest request) throws Exception, YarnException,
+ IOException {
+
+ final ApplicationUserInfo ugi = getApplicationUserInfo(testAppId);
+
+ return ugi.getUser().doAs(
+ new PrivilegedExceptionAction<AllocateResponse>() {
+ @Override
+ public AllocateResponse run() throws Exception {
+ AllocateResponse response =
+ getAMRMProxyService().allocate(request);
+ return response;
+ }
+ });
+ }
+
+  /**
+   * Creates a remote user named after the application attempt id of the given
+   * test application and attaches an AMRM token identifier for that attempt.
+   *
+   * @param testAppId identifier of the test application
+   * @return the user together with the application attempt id
+   */
+  protected ApplicationUserInfo getApplicationUserInfo(final int testAppId) {
+    final ApplicationAttemptId appAttemptId =
+        getApplicationAttemptId(testAppId);
+    final UserGroupInformation remoteUser =
+        UserGroupInformation.createRemoteUser(appAttemptId.toString());
+    remoteUser.addTokenIdentifier(new AMRMTokenIdentifier(appAttemptId, 1));
+    return new ApplicationUserInfo(remoteUser, appAttemptId);
+  }
+
+ protected List<ResourceRequest> createResourceRequests(String[] hosts,
+ int memory, int vCores, int priority, int containers)
+ throws Exception {
+ return createResourceRequests(hosts, memory, vCores, priority,
+ containers, null);
+ }
+
+ protected List<ResourceRequest> createResourceRequests(String[] hosts,
+ int memory, int vCores, int priority, int containers,
+ String labelExpression) throws Exception {
+ List<ResourceRequest> reqs = new ArrayList<ResourceRequest>();
+ for (String host : hosts) {
+ ResourceRequest hostReq =
+ createResourceRequest(host, memory, vCores, priority,
+ containers, labelExpression);
+ reqs.add(hostReq);
+ ResourceRequest rackReq =
+ createResourceRequest("/default-rack", memory, vCores, priority,
+ containers, labelExpression);
+ reqs.add(rackReq);
+ }
+
+ ResourceRequest offRackReq =
+ createResourceRequest(ResourceRequest.ANY, memory, vCores,
+ priority, containers, labelExpression);
+ reqs.add(offRackReq);
+ return reqs;
+ }
+
+ protected ResourceRequest createResourceRequest(String resource,
+ int memory, int vCores, int priority, int containers)
+ throws Exception {
+ return createResourceRequest(resource, memory, vCores, priority,
+ containers, null);
+ }
+
+ protected ResourceRequest createResourceRequest(String resource,
+ int memory, int vCores, int priority, int containers,
+ String labelExpression) throws Exception {
+ ResourceRequest req = Records.newRecord(ResourceRequest.class);
+ req.setResourceName(resource);
+ req.setNumContainers(containers);
+ Priority pri = Records.newRecord(Priority.class);
+ pri.setPriority(priority);
+ req.setPriority(pri);
+ Resource capability = Records.newRecord(Resource.class);
+ capability.setMemory(memory);
+ capability.setVirtualCores(vCores);
+ req.setCapability(capability);
+ if (labelExpression != null) {
+ req.setNodeLabelExpression(labelExpression);
+ }
+ return req;
+ }
+
+ /**
+ * Returns an ApplicationId with the specified identifier
+ *
+ * @param testAppId
+ * @return
+ */
+ protected ApplicationId getApplicationId(int testAppId) {
+ return ApplicationId.newInstance(123456, testAppId);
+ }
+
+ /**
+ * Return an instance of ApplicationAttemptId using specified identifier. This
+ * identifier will be used for the ApplicationId too.
+ *
+ * @param testAppId
+ * @return
+ */
+ protected ApplicationAttemptId getApplicationAttemptId(int testAppId) {
+ return ApplicationAttemptId.newInstance(getApplicationId(testAppId),
+ testAppId);
+ }
+
+ /**
+ * Return an instance of ApplicationAttemptId using specified identifier and
+ * application id
+ *
+ * @param testAppId
+ * @return
+ */
+ protected ApplicationAttemptId getApplicationAttemptId(int testAppId,
+ ApplicationId appId) {
+ return ApplicationAttemptId.newInstance(appId, testAppId);
+ }
+
+  /**
+   * Immutable pair of a registration response and the test context the
+   * registration was issued for.
+   *
+   * @param <T> type of the test context
+   */
+  protected static class RegisterApplicationMasterResponseInfo<T> {
+    // Both fields are assigned once in the constructor and never changed.
+    private final RegisterApplicationMasterResponse response;
+    private final T testContext;
+
+    RegisterApplicationMasterResponseInfo(
+        RegisterApplicationMasterResponse response, T testContext) {
+      this.response = response;
+      this.testContext = testContext;
+    }
+
+    /** Returns the registration response. */
+    public RegisterApplicationMasterResponse getResponse() {
+      return response;
+    }
+
+    /** Returns the test context the response belongs to. */
+    public T getTestContext() {
+      return testContext;
+    }
+  }
+
+  /**
+   * Immutable pair of a finish response and the test context the finish
+   * request was issued for.
+   *
+   * @param <T> type of the test context
+   */
+  protected static class FinishApplicationMasterResponseInfo<T> {
+    // Both fields are assigned once in the constructor and never changed.
+    private final FinishApplicationMasterResponse response;
+    private final T testContext;
+
+    FinishApplicationMasterResponseInfo(
+        FinishApplicationMasterResponse response, T testContext) {
+      this.response = response;
+      this.testContext = testContext;
+    }
+
+    /** Returns the finish response. */
+    public FinishApplicationMasterResponse getResponse() {
+      return response;
+    }
+
+    /** Returns the test context the response belongs to. */
+    public T getTestContext() {
+      return testContext;
+    }
+  }
+
+  /**
+   * Immutable pair of a user and the application attempt id that user was
+   * created for.
+   */
+  protected static class ApplicationUserInfo {
+    // Both fields are assigned once in the constructor and never changed.
+    private final UserGroupInformation user;
+    private final ApplicationAttemptId attemptId;
+
+    ApplicationUserInfo(UserGroupInformation user,
+        ApplicationAttemptId attemptId) {
+      this.user = user;
+      this.attemptId = attemptId;
+    }
+
+    /** Returns the user. */
+    public UserGroupInformation getUser() {
+      return this.user;
+    }
+
+    /** Returns the application attempt id. */
+    public ApplicationAttemptId getAppAttemptId() {
+      return this.attemptId;
+    }
+  }
+
+  /**
+   * AMRMProxyService subclass that lets the tests initialize the interceptor
+   * pipeline and stop an application directly.
+   */
+  protected static class MockAMRMProxyService extends AMRMProxyService {
+    public MockAMRMProxyService(Context nmContext,
+        AsyncDispatcher dispatcher) {
+      super(nmContext, dispatcher);
+    }
+
+    /**
+     * Initializes the pipeline on behalf of the test code. In the actual
+     * service, the initialization is called by the
+     * ContainerManagerImpl::StartContainers method.
+     *
+     * @param applicationId the application attempt to initialize the
+     *          pipeline for
+     * @param user name of the user the pipeline is initialized for
+     */
+    public void initApp(ApplicationAttemptId applicationId, String user) {
+      // The last two arguments are passed as null.
+      super.initializePipeline(applicationId, user, null, null);
+    }
+
+    /**
+     * Stops the given application in the proxy service.
+     *
+     * @param applicationId the application to stop
+     */
+    public void stopApp(ApplicationId applicationId) {
+      super.stopApplication(applicationId);
+    }
+  }
+
+  /**
+   * Callback abstraction used for handing an operation to a helper that
+   * invokes it later, possibly on another thread.
+   *
+   * @param <T> type of the input
+   * @param <R> type of the result
+   */
+  protected interface Function<T, R> {
+    /** Applies the operation to the given input and returns its result. */
+    R invoke(T input);
+  }
+
+  /**
+   * Context implementation made of stubs only: every object-returning getter
+   * returns {@code null}, the HTTP port is 0, the node is never reported as
+   * decommissioned and the setter does nothing.
+   */
+  protected class NullContext implements Context {
+
+    // ---- node identity and state ----
+
+    @Override
+    public NodeId getNodeId() {
+      return null;
+    }
+
+    @Override
+    public int getHttpPort() {
+      return 0;
+    }
+
+    @Override
+    public NodeHealthStatus getNodeHealthStatus() {
+      return null;
+    }
+
+    @Override
+    public boolean getDecommissioned() {
+      return false;
+    }
+
+    @Override
+    public void setDecommissioned(boolean isDecommissioned) {
+      // Intentionally empty: the stub keeps no state.
+    }
+
+    // ---- applications and containers ----
+
+    @Override
+    public ConcurrentMap<ApplicationId, Application> getApplications() {
+      return null;
+    }
+
+    @Override
+    public ConcurrentMap<ContainerId, Container> getContainers() {
+      return null;
+    }
+
+    @Override
+    public Map<ApplicationId, Credentials> getSystemCredentialsForApps() {
+      return null;
+    }
+
+    @Override
+    public ConcurrentLinkedQueue<LogAggregationReport> getLogAggregationStatusForApps() {
+      return null;
+    }
+
+    // ---- security ----
+
+    @Override
+    public NMContainerTokenSecretManager getContainerTokenSecretManager() {
+      return null;
+    }
+
+    @Override
+    public NMTokenSecretManagerInNM getNMTokenSecretManager() {
+      return null;
+    }
+
+    @Override
+    public ApplicationACLsManager getApplicationACLsManager() {
+      return null;
+    }
+
+    // ---- services ----
+
+    @Override
+    public ContainerManagementProtocol getContainerManager() {
+      return null;
+    }
+
+    @Override
+    public LocalDirsHandlerService getLocalDirsHandler() {
+      return null;
+    }
+
+    @Override
+    public NMStateStoreService getNMStateStore() {
+      return null;
+    }
+
+    @Override
+    public NodeResourceMonitor getNodeResourceMonitor() {
+      return null;
+    }
+
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f72f1e6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockRequestInterceptor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockRequestInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockRequestInterceptor.java
new file mode 100644
index 0000000..c962f97
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockRequestInterceptor.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
+
+import java.io.IOException;
+
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+
+/**
+ * Request interceptor for tests that answers every call from its own
+ * {@link MockResourceManagerFacade} instead of forwarding the request any
+ * further.
+ */
+public class MockRequestInterceptor extends AbstractRequestInterceptor {
+
+  /** Mock resource manager; created in {@link #init}. */
+  private MockResourceManagerFacade mockRM;
+
+  public MockRequestInterceptor() {
+  }
+
+  /**
+   * Initializes the superclass and creates the mock resource manager from a
+   * copy of this interceptor's configuration.
+   *
+   * @param appContext the application context passed on to the superclass
+   */
+  @Override
+  public void init(AMRMProxyApplicationContext appContext) {
+    super.init(appContext);
+    mockRM =
+        new MockResourceManagerFacade(new YarnConfiguration(
+            super.getConf()), 0);
+  }
+
+  /** Delegates the registration request to the mock resource manager. */
+  @Override
+  public RegisterApplicationMasterResponse registerApplicationMaster(
+      RegisterApplicationMasterRequest request) throws YarnException,
+      IOException {
+    return mockRM.registerApplicationMaster(request);
+  }
+
+  /** Delegates the finish request to the mock resource manager. */
+  @Override
+  public FinishApplicationMasterResponse finishApplicationMaster(
+      FinishApplicationMasterRequest request) throws YarnException,
+      IOException {
+    return mockRM.finishApplicationMaster(request);
+  }
+
+  /** Delegates the allocate request to the mock resource manager. */
+  @Override
+  public AllocateResponse allocate(AllocateRequest request)
+      throws YarnException, IOException {
+    return mockRM.allocate(request);
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f72f1e6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockResourceManagerFacade.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockResourceManagerFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockResourceManagerFacade.java
new file mode 100644
index 0000000..7573a7a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockResourceManagerFacade.java
@@ -0,0 +1,469 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.directory.api.util.Strings;
+import org.apache.directory.api.util.exception.NotImplementedException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
+import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainersRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainersResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse;
+import org.apache.hadoop.yarn.api.records.AMCommand;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerResourceDecrease;
+import org.apache.hadoop.yarn.api.records.ContainerResourceIncrease;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.NMToken;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.util.Records;
+import org.junit.Assert;
+import org.mortbay.log.Log;
+
+/**
+ * Mock Resource Manager facade implementation that exposes all the methods
+ * implemented by the YARN RM. The unit test cases depend on the behavior of
+ * this mock and on the values it returns, so please change the implementation
+ * with care.
+ */
+public class MockResourceManagerFacade implements
+ ApplicationMasterProtocol, ApplicationClientProtocol {
+
+ private HashMap<String, List<ContainerId>> applicationContainerIdMap =
+ new HashMap<String, List<ContainerId>>();
+ private HashMap<ContainerId, Container> allocatedContainerMap =
+ new HashMap<ContainerId, Container>();
+ private AtomicInteger containerIndex = new AtomicInteger(0);
+ private Configuration conf;
+
+ /**
+ * Creates the mock RM.
+ *
+ * @param conf configuration stored by this mock
+ * @param startContainerIndex initial value of the counter that allocate()
+ * increments to build container ids
+ */
+ public MockResourceManagerFacade(Configuration conf,
+ int startContainerIndex) {
+ this.conf = conf;
+ this.containerIndex.set(startContainerIndex);
+ }
+
+  /**
+   * Identifies the caller by the first {@link AMRMTokenIdentifier} found
+   * among the token identifiers of the current user.
+   *
+   * @return the application attempt id of that token as a string, or an
+   *         empty string when the current user carries no AMRM token
+   * @throws IOException if the current user cannot be determined
+   */
+  private static String getAppIdentifier() throws IOException {
+    UserGroupInformation caller = UserGroupInformation.getCurrentUser();
+    for (TokenIdentifier candidate : caller.getTokenIdentifiers()) {
+      if (candidate instanceof AMRMTokenIdentifier) {
+        AMRMTokenIdentifier amrmId = (AMRMTokenIdentifier) candidate;
+        return amrmId.getApplicationAttemptId().toString();
+      }
+    }
+    return "";
+  }
+
+ @Override
+ public RegisterApplicationMasterResponse registerApplicationMaster(
+ RegisterApplicationMasterRequest request) throws YarnException,
+ IOException {
+ String amrmToken = getAppIdentifier();
+ Log.info("Registering application attempt: " + amrmToken);
+
+ synchronized (applicationContainerIdMap) {
+ Assert.assertFalse("The application id is already registered: "
+ + amrmToken, applicationContainerIdMap.containsKey(amrmToken));
+ // Keep track of the containers that are returned to this application
+ applicationContainerIdMap.put(amrmToken,
+ new ArrayList<ContainerId>());
+ }
+
+ return RegisterApplicationMasterResponse.newInstance(null, null, null,
+ null, null, request.getHost(), null);
+ }
+
+ @Override
+ public FinishApplicationMasterResponse finishApplicationMaster(
+ FinishApplicationMasterRequest request) throws YarnException,
+ IOException {
+ String amrmToken = getAppIdentifier();
+ Log.info("Finishing application attempt: " + amrmToken);
+
+ synchronized (applicationContainerIdMap) {
+ // Remove the containers that were being tracked for this application
+ Assert.assertTrue("The application id is NOT registered: "
+ + amrmToken, applicationContainerIdMap.containsKey(amrmToken));
+ List<ContainerId> ids = applicationContainerIdMap.remove(amrmToken);
+ for (ContainerId c : ids) {
+ allocatedContainerMap.remove(c);
+ }
+ }
+
+ return FinishApplicationMasterResponse
+ .newInstance(request.getFinalApplicationStatus() == FinalApplicationStatus.SUCCEEDED ? true
+ : false);
+ }
+
+ /**
+ * Builds an application id with the fixed cluster timestamp 12345.
+ *
+ * @param id sequence number of the application
+ */
+ protected ApplicationId getApplicationId(int id) {
+ return ApplicationId.newInstance(12345, id);
+ }
+
+ /**
+ * Builds the id of attempt number 1 of the application returned by
+ * getApplicationId(id).
+ *
+ * @param id sequence number of the application
+ */
+ protected ApplicationAttemptId getApplicationAttemptId(int id) {
+ return ApplicationAttemptId.newInstance(getApplicationId(id), 1);
+ }
+
+  /**
+   * Serves one application master heartbeat against the mock RM.
+   * <p>
+   * For every {@link ResourceRequest} in the ask list one container is
+   * created per requested instance, recorded against the calling application
+   * attempt and returned. For every container id in the release list the
+   * container is removed from the bookkeeping and handed back in the
+   * response under a freshly generated id, so that tests can confirm the
+   * release request was received. Asks and releases must not be combined in
+   * the same call.
+   *
+   * @param request heartbeat carrying asks, releases, or neither
+   * @return a response whose container list holds the newly created
+   *         containers or the re-labelled released containers; the AM
+   *         command is always {@code AM_RESYNC}
+   * @throws AssertionError if asks and releases are both present, if
+   *         containers are requested or released by an attempt that is not
+   *         registered, or if a released id is not tracked for the attempt
+   */
+  @SuppressWarnings("deprecation")
+  @Override
+  public AllocateResponse allocate(AllocateRequest request)
+      throws YarnException, IOException {
+    List<ResourceRequest> askList = request.getAskList();
+    List<ContainerId> releaseList = request.getReleaseList();
+    boolean hasAsks = askList != null && !askList.isEmpty();
+    boolean hasReleases = releaseList != null && !releaseList.isEmpty();
+    if (hasAsks && hasReleases) {
+      Assert.fail("The mock RM implementation does not support receiving "
+          + "askList and releaseList in the same heartbeat");
+    }
+
+    String amrmToken = getAppIdentifier();
+
+    ArrayList<Container> containerList = new ArrayList<Container>();
+    if (askList != null) {
+      for (ResourceRequest rr : askList) {
+        for (int i = 0; i < rr.getNumContainers(); i++) {
+          ContainerId containerId =
+              ContainerId.newInstance(getApplicationAttemptId(1),
+                  containerIndex.incrementAndGet());
+          Container container = Records.newRecord(Container.class);
+          container.setId(containerId);
+          container.setPriority(rr.getPriority());
+
+          // We don't use the node for running containers in the test cases.
+          // So it is OK to hard code it to some dummy value
+          NodeId nodeId =
+              NodeId.newInstance(
+                  !Strings.isEmpty(rr.getResourceName()) ? rr
+                      .getResourceName() : "dummy", 1000);
+          container.setNodeId(nodeId);
+          container.setResource(rr.getCapability());
+          containerList.add(container);
+
+          synchronized (applicationContainerIdMap) {
+            // Keep track of the containers returned to this application. We
+            // will need it in future
+            Assert.assertTrue(
+                "The application id is Not registered before allocate(): "
+                    + amrmToken,
+                applicationContainerIdMap.containsKey(amrmToken));
+            List<ContainerId> ids =
+                applicationContainerIdMap.get(amrmToken);
+            ids.add(containerId);
+            this.allocatedContainerMap.put(containerId, container);
+          }
+        }
+      }
+    }
+
+    if (hasReleases) {
+      Log.info("Releasing containers: " + releaseList.size());
+      synchronized (applicationContainerIdMap) {
+        Assert.assertTrue(
+            "The application id is not registered before allocate(): "
+                + amrmToken,
+            applicationContainerIdMap.containsKey(amrmToken));
+        List<ContainerId> ids = applicationContainerIdMap.get(amrmToken);
+
+        for (ContainerId id : releaseList) {
+          // Report the attempt that actually made the call rather than a
+          // configuration key
+          Assert.assertTrue(
+              "ContainerId " + id
+                  + " being released is not valid for application: "
+                  + amrmToken, ids.contains(id));
+
+          ids.remove(id);
+
+          // Return the released container back to the AM with new fake Ids.
+          // The test case does not care about the IDs. The IDs are faked
+          // because otherwise the LRM will throw duplication identifier
+          // exception. This returning of fake containers is ONLY done for
+          // testing purpose - for the test code to get confirmation that the
+          // sub-cluster resource managers received the release request
+          ContainerId fakeContainerId =
+              ContainerId.newInstance(getApplicationAttemptId(1),
+                  containerIndex.incrementAndGet());
+          // remove(), not get(): the id is no longer tracked in ids, so
+          // finishApplicationMaster() could never clean this entry up
+          Container fakeContainer = allocatedContainerMap.remove(id);
+          fakeContainer.setId(fakeContainerId);
+          containerList.add(fakeContainer);
+        }
+      }
+    }
+
+    Log.info("Allocating containers: " + containerList.size()
+        + " for application attempt: " + amrmToken);
+    return AllocateResponse.newInstance(0,
+        new ArrayList<ContainerStatus>(), containerList,
+        new ArrayList<NodeReport>(), null, AMCommand.AM_RESYNC, 1, null,
+        new ArrayList<NMToken>(),
+        new ArrayList<ContainerResourceIncrease>(),
+        new ArrayList<ContainerResourceDecrease>());
+  }
+
+  /**
+   * Returns a canned report for the requested application: state
+   * {@code ACCEPTED}, with attempt number 1 as the current attempt.
+   *
+   * @param request request naming the application
+   * @return response wrapping the canned report
+   */
+  @Override
+  public GetApplicationReportResponse getApplicationReport(
+      GetApplicationReportRequest request) throws YarnException,
+      IOException {
+    ApplicationId appId = request.getApplicationId();
+
+    ApplicationReport appReport = Records.newRecord(ApplicationReport.class);
+    appReport.setYarnApplicationState(YarnApplicationState.ACCEPTED);
+    appReport.setApplicationId(appId);
+    appReport.setCurrentApplicationAttemptId(
+        ApplicationAttemptId.newInstance(appId, 1));
+
+    GetApplicationReportResponse result =
+        Records.newRecord(GetApplicationReportResponse.class);
+    result.setApplicationReport(appReport);
+    return result;
+  }
+
+  /**
+   * Returns a canned report for the requested application attempt with the
+   * attempt state set to {@code LAUNCHED}.
+   *
+   * @param request request naming the application attempt
+   * @return response wrapping the canned report
+   */
+  @Override
+  public GetApplicationAttemptReportResponse getApplicationAttemptReport(
+      GetApplicationAttemptReportRequest request) throws YarnException,
+      IOException {
+    ApplicationAttemptReport attemptReport =
+        Records.newRecord(ApplicationAttemptReport.class);
+    attemptReport.setApplicationAttemptId(request.getApplicationAttemptId());
+    attemptReport.setYarnApplicationAttemptState(
+        YarnApplicationAttemptState.LAUNCHED);
+
+    GetApplicationAttemptReportResponse result =
+        Records.newRecord(GetApplicationAttemptReportResponse.class);
+    result.setApplicationAttemptReport(attemptReport);
+    return result;
+  }
+
+ /**
+ * No-op stub: ignores the request and always returns {@code null}.
+ */
+ @Override
+ public GetNewApplicationResponse getNewApplication(
+ GetNewApplicationRequest request) throws YarnException, IOException {
+ return null;
+ }
+
+ /**
+ * No-op stub: ignores the request and always returns {@code null}.
+ */
+ @Override
+ public SubmitApplicationResponse submitApplication(
+ SubmitApplicationRequest request) throws YarnException, IOException {
+ return null;
+ }
+
+ /**
+ * Not supported by this mock.
+ *
+ * @throws NotImplementedException always
+ */
+ @Override
+ public KillApplicationResponse forceKillApplication(
+ KillApplicationRequest request) throws YarnException, IOException {
+ throw new NotImplementedException();
+ }
+
+ /**
+ * Not supported by this mock.
+ *
+ * @throws NotImplementedException always
+ */
+ @Override
+ public GetClusterMetricsResponse getClusterMetrics(
+ GetClusterMetricsRequest request) throws YarnException, IOException {
+ throw new NotImplementedException();
+ }
+
+ /**
+ * Not supported by this mock.
+ *
+ * @throws NotImplementedException always
+ */
+ @Override
+ public GetApplicationsResponse getApplications(
+ GetApplicationsRequest request) throws YarnException, IOException {
+ throw new NotImplementedException();
+ }
+
+ /**
+ * Not supported by this mock.
+ *
+ * @throws NotImplementedException always
+ */
+ @Override
+ public GetClusterNodesResponse getClusterNodes(
+ GetClusterNodesRequest request) throws YarnException, IOException {
+ throw new NotImplementedException();
+ }
+
+ /**
+ * Not supported by this mock.
+ *
+ * @throws NotImplementedException always
+ */
+ @Override
+ public GetQueueInfoResponse getQueueInfo(GetQueueInfoRequest request)
+ throws YarnException, IOException {
+ throw new NotImplementedException();
+ }
+
+ /**
+ * Not supported by this mock.
+ *
+ * @throws NotImplementedException always
+ */
+ @Override
+ public GetQueueUserAclsInfoResponse getQueueUserAcls(
+ GetQueueUserAclsInfoRequest request) throws YarnException,
+ IOException {
+ throw new NotImplementedException();
+ }
+
+ /**
+ * Not supported by this mock.
+ *
+ * @throws NotImplementedException always
+ */
+ @Override
+ public GetDelegationTokenResponse getDelegationToken(
+ GetDelegationTokenRequest request) throws YarnException, IOException {
+ throw new NotImplementedException();
+ }
+
+ /**
+ * Not supported by this mock.
+ *
+ * @throws NotImplementedException always
+ */
+ @Override
+ public RenewDelegationTokenResponse renewDelegationToken(
+ RenewDelegationTokenRequest request) throws YarnException,
+ IOException {
+ throw new NotImplementedException();
+ }
+
+ /**
+ * Not supported by this mock.
+ *
+ * @throws NotImplementedException always
+ */
+ @Override
+ public CancelDelegationTokenResponse cancelDelegationToken(
+ CancelDelegationTokenRequest request) throws YarnException,
+ IOException {
+ throw new NotImplementedException();
+ }
+
+ /**
+ * Not supported by this mock.
+ *
+ * @throws NotImplementedException always
+ */
+ @Override
+ public MoveApplicationAcrossQueuesResponse moveApplicationAcrossQueues(
+ MoveApplicationAcrossQueuesRequest request) throws YarnException,
+ IOException {
+ throw new NotImplementedException();
+ }
+
+ /**
+ * Not supported by this mock.
+ *
+ * @throws NotImplementedException always
+ */
+ @Override
+ public GetApplicationAttemptsResponse getApplicationAttempts(
+ GetApplicationAttemptsRequest request) throws YarnException,
+ IOException {
+ throw new NotImplementedException();
+ }
+
+ /**
+ * Not supported by this mock.
+ *
+ * @throws NotImplementedException always
+ */
+ @Override
+ public GetContainerReportResponse getContainerReport(
+ GetContainerReportRequest request) throws YarnException, IOException {
+ throw new NotImplementedException();
+ }
+
+ /**
+ * Not supported by this mock.
+ *
+ * @throws NotImplementedException always
+ */
+ @Override
+ public GetContainersResponse getContainers(GetContainersRequest request)
+ throws YarnException, IOException {
+ throw new NotImplementedException();
+ }
+
+ /**
+ * Not supported by this mock.
+ *
+ * @throws NotImplementedException always
+ */
+ @Override
+ public ReservationSubmissionResponse submitReservation(
+ ReservationSubmissionRequest request) throws YarnException,
+ IOException {
+ throw new NotImplementedException();
+ }
+
+ /**
+ * Not supported by this mock.
+ *
+ * @throws NotImplementedException always
+ */
+ @Override
+ public ReservationUpdateResponse updateReservation(
+ ReservationUpdateRequest request) throws YarnException, IOException {
+ throw new NotImplementedException();
+ }
+
+ /**
+ * Not supported by this mock.
+ *
+ * @throws NotImplementedException always
+ */
+ @Override
+ public ReservationDeleteResponse deleteReservation(
+ ReservationDeleteRequest request) throws YarnException, IOException {
+ throw new NotImplementedException();
+ }
+
+ /**
+ * Not supported by this mock.
+ *
+ * @throws NotImplementedException always
+ */
+ @Override
+ public GetNodesToLabelsResponse getNodeToLabels(
+ GetNodesToLabelsRequest request) throws YarnException, IOException {
+ throw new NotImplementedException();
+ }
+
+ /**
+ * Not supported by this mock.
+ *
+ * @throws NotImplementedException always
+ */
+ @Override
+ public GetClusterNodeLabelsResponse getClusterNodeLabels(
+ GetClusterNodeLabelsRequest request) throws YarnException,
+ IOException {
+ throw new NotImplementedException();
+ }
+
+ /**
+ * No-op stub: ignores the request and always returns {@code null}.
+ */
+ @Override
+ public GetLabelsToNodesResponse getLabelsToNodes(
+ GetLabelsToNodesRequest request) throws YarnException, IOException {
+ return null;
+ }
+
+ /**
+ * No-op stub: ignores the request and always returns {@code null}.
+ */
+ @Override
+ public UpdateApplicationPriorityResponse updateApplicationPriority(
+ UpdateApplicationPriorityRequest request) throws YarnException,
+ IOException {
+ return null;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f72f1e6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/PassThroughRequestInterceptor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/PassThroughRequestInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/PassThroughRequestInterceptor.java
new file mode 100644
index 0000000..97a844e
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/PassThroughRequestInterceptor.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
+
+import java.io.IOException;
+
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+
+/**
+ * Mock interceptor that does nothing other than forward each request to the
+ * next interceptor in the chain.
+ */
+public class PassThroughRequestInterceptor extends
+ AbstractRequestInterceptor {
+
+ /** Forwards the registration request unchanged to the next interceptor. */
+ @Override
+ public RegisterApplicationMasterResponse registerApplicationMaster(
+ RegisterApplicationMasterRequest request) throws YarnException,
+ IOException {
+ return getNextInterceptor().registerApplicationMaster(request);
+ }
+
+ /** Forwards the finish request unchanged to the next interceptor. */
+ @Override
+ public FinishApplicationMasterResponse finishApplicationMaster(
+ FinishApplicationMasterRequest request) throws YarnException,
+ IOException {
+ return getNextInterceptor().finishApplicationMaster(request);
+ }
+
+ /** Forwards the allocate request unchanged to the next interceptor. */
+ @Override
+ public AllocateResponse allocate(AllocateRequest request)
+ throws YarnException, IOException {
+ return getNextInterceptor().allocate(request);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f72f1e6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyService.java
new file mode 100644
index 0000000..69b913a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestAMRMProxyService.java
@@ -0,0 +1,484 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.util.Records;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestAMRMProxyService extends BaseAMRMProxyTest {
+
+ private static final Log LOG = LogFactory
+ .getLog(TestAMRMProxyService.class);
+
+  /**
+   * Tests that the interceptor pipeline is created properly: three
+   * {@link PassThroughRequestInterceptor}s followed by one
+   * {@link MockRequestInterceptor}, four interceptors in total.
+   */
+  @Test
+  public void testRequestInterceptorChainCreation() throws Exception {
+    RequestInterceptor root =
+        super.getAMRMProxyService().createRequestInterceptorChain();
+    int index = 0;
+    while (root != null) {
+      switch (index) {
+      case 0:
+      case 1:
+      case 2:
+        Assert.assertEquals(PassThroughRequestInterceptor.class,
+            root.getClass());
+        break;
+      case 3:
+        Assert.assertEquals(MockRequestInterceptor.class, root.getClass());
+        break;
+      default:
+        // Extra interceptors are reported by the length check below
+        break;
+      }
+
+      root = root.getNextInterceptor();
+      index++;
+    }
+
+    // Compare the counts as numbers; no need to stringify them first
+    Assert.assertEquals(
+        "The number of interceptors in chain does not match", 4, index);
+  }
+
+  /**
+   * Registers a single application master and checks the queue name that
+   * comes back.
+   */
+  @Test
+  public void testRegisterOneApplicationMaster() throws Exception {
+    // The mock resource manager echoes the host of the registration request
+    // back as the queue name; the test expects the queue to equal the test
+    // application id (presumably the registration helper sends the id as
+    // the host name).
+    final int appId = 1;
+    String expectedQueue = Integer.toString(appId);
+
+    RegisterApplicationMasterResponse registered =
+        registerApplicationMaster(appId);
+
+    Assert.assertNotNull(registered);
+    Assert.assertEquals(expectedQueue, registered.getQueue());
+  }
+
+ /**
+ * Tests the registration of multiple application master serially one at a
+ * time.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testRegisterMulitpleApplicationMasters() throws Exception {
+ for (int testAppId = 0; testAppId < 3; testAppId++) {
+ RegisterApplicationMasterResponse response =
+ registerApplicationMaster(testAppId);
+ Assert.assertNotNull(response);
+ Assert
+ .assertEquals(Integer.toString(testAppId), response.getQueue());
+ }
+ }
+
+  /**
+   * Registers five application masters through the base-class parallel
+   * registration helper.
+   */
+  @Test
+  public void testRegisterMulitpleApplicationMastersInParallel()
+      throws Exception {
+    final int amCount = 5;
+    super.registerApplicationMastersInParallel(
+        CreateTestRequestIdentifiers(amCount));
+  }
+
+  /**
+   * Builds the identifiers {@code test-endpoint-0} up to
+   * {@code test-endpoint-(numberOfRequests - 1)}, logging each one.
+   *
+   * @param numberOfRequests how many identifiers to create
+   * @return the identifiers in ascending order
+   */
+  private ArrayList<String> CreateTestRequestIdentifiers(
+      int numberOfRequests) {
+    LOG.info("Creating " + numberOfRequests + " contexts for testing");
+    ArrayList<String> identifiers = new ArrayList<String>();
+    int next = 0;
+    while (next < numberOfRequests) {
+      String identifier = "test-endpoint-" + Integer.toString(next);
+      identifiers.add(identifier);
+      LOG.info("Created test context: " + identifier);
+      next++;
+    }
+    return identifiers;
+  }
+
+  /**
+   * Registers one application master, finishes it with status
+   * {@code SUCCEEDED} and expects it to be reported as unregistered.
+   */
+  @Test
+  public void testFinishOneApplicationMasterWithSuccess() throws Exception {
+    final int appId = 1;
+
+    RegisterApplicationMasterResponse registered =
+        registerApplicationMaster(appId);
+    Assert.assertNotNull(registered);
+    Assert.assertEquals(Integer.toString(appId), registered.getQueue());
+
+    FinishApplicationMasterResponse finished =
+        finishApplicationMaster(appId, FinalApplicationStatus.SUCCEEDED);
+    Assert.assertNotNull(finished);
+    Assert.assertEquals(true, finished.getIsUnregistered());
+  }
+
+  /**
+   * Registers one application master and finishes it with status
+   * {@code FAILED}, which must not report it as unregistered; a second
+   * finish request for the same application master must then fail.
+   */
+  @Test
+  public void testFinishOneApplicationMasterWithFailure() throws Exception {
+    int testAppId = 1;
+    RegisterApplicationMasterResponse registerResponse =
+        registerApplicationMaster(testAppId);
+    Assert.assertNotNull(registerResponse);
+    Assert.assertEquals(Integer.toString(testAppId),
+        registerResponse.getQueue());
+
+    FinishApplicationMasterResponse finshResponse =
+        finishApplicationMaster(testAppId, FinalApplicationStatus.FAILED);
+
+    Assert.assertNotNull(finshResponse);
+    Assert.assertEquals(false, finshResponse.getIsUnregistered());
+
+    // The outcome is recorded in a flag and asserted after the try block.
+    // An Assert.fail() inside the try would throw an AssertionError that the
+    // catch (Throwable) below swallows, so the test could never fail.
+    // Throwable is still caught because the mock RM reports errors through
+    // JUnit assertions.
+    boolean requestFailed = false;
+    try {
+      // Try to finish an application master that is already finished.
+      finishApplicationMaster(testAppId, FinalApplicationStatus.SUCCEEDED);
+    } catch (Throwable ex) {
+      // This is expected.
+      requestFailed = true;
+      LOG.info("Finish registration failed as expected because it was not registered");
+    }
+    Assert.assertTrue(
+        "The request to finish application master should have failed",
+        requestFailed);
+  }
+
+ @Test
+ public void testFinishInvalidApplicationMaster() throws Exception {
+ try {
+ // Try to finish an application master that was not registered.
+ finishApplicationMaster(4, FinalApplicationStatus.SUCCEEDED);
+ Assert
+ .fail("The request to finish application master should have failed");
+ } catch (Throwable ex) {
+ // This is expected. So nothing required here.
+ LOG.info("Finish registration failed as expected because it was not registered");
+ }
+ }
+
+  /**
+   * Registers three application masters, finishes them in reverse order
+   * while checking that the number of pipelines shrinks with each finish,
+   * and then verifies that finishing an already finished or a never
+   * registered application master fails.
+   */
+  @Test
+  public void testFinishMulitpleApplicationMasters() throws Exception {
+    int numberOfRequests = 3;
+    for (int index = 0; index < numberOfRequests; index++) {
+      RegisterApplicationMasterResponse registerResponse =
+          registerApplicationMaster(index);
+      Assert.assertNotNull(registerResponse);
+      Assert.assertEquals(Integer.toString(index),
+          registerResponse.getQueue());
+    }
+
+    // Finish in reverse sequence
+    for (int index = numberOfRequests - 1; index >= 0; index--) {
+      FinishApplicationMasterResponse finshResponse =
+          finishApplicationMaster(index, FinalApplicationStatus.SUCCEEDED);
+
+      Assert.assertNotNull(finshResponse);
+      Assert.assertEquals(true, finshResponse.getIsUnregistered());
+
+      // Assert that the application has been removed from the collection
+      Assert.assertEquals(index, this.getAMRMProxyService()
+          .getPipelines().size());
+    }
+
+    // The outcomes below are recorded in flags and asserted after each try
+    // block. An Assert.fail() inside the try would throw an AssertionError
+    // that the catch (Throwable) swallows, so the test could never fail.
+    boolean finishedTwiceFailed = false;
+    try {
+      // Try to finish an application master that is already finished.
+      finishApplicationMaster(1, FinalApplicationStatus.SUCCEEDED);
+    } catch (Throwable ex) {
+      // This is expected.
+      finishedTwiceFailed = true;
+      LOG.info("Finish registration failed as expected because it was not registered");
+    }
+    Assert.assertTrue(
+        "The request to finish application master should have failed",
+        finishedTwiceFailed);
+
+    boolean unregisteredFailed = false;
+    try {
+      // Try to finish an application master that was not registered.
+      finishApplicationMaster(4, FinalApplicationStatus.SUCCEEDED);
+    } catch (Throwable ex) {
+      // This is expected.
+      unregisteredFailed = true;
+      LOG.info("Finish registration failed as expected because it was not registered");
+    }
+    Assert.assertTrue(
+        "The request to finish application master should have failed",
+        unregisteredFailed);
+  }
+
+  /**
+   * Registers five application masters one by one and then finishes all of
+   * them through the base-class parallel finish helper.
+   */
+  @Test
+  public void testFinishMulitpleApplicationMastersInParallel()
+      throws Exception {
+    final int amCount = 5;
+    LOG.info("Creating " + amCount + " contexts for testing");
+    ArrayList<String> contexts = new ArrayList<String>();
+
+    int appId = 0;
+    while (appId < amCount) {
+      String context = "test-endpoint-" + Integer.toString(appId);
+      contexts.add(context);
+      LOG.info("Created test context: " + context);
+
+      RegisterApplicationMasterResponse registered =
+          registerApplicationMaster(appId);
+      Assert.assertNotNull(registered);
+      Assert.assertEquals(Integer.toString(appId), registered.getQueue());
+      appId++;
+    }
+
+    finishApplicationMastersInParallel(contexts);
+  }
+
+  /**
+   * Runs register, allocate and finish for one application master using the
+   * single-argument allocate helper, and expects a non-null response from
+   * every step.
+   */
+  @Test
+  public void testAllocateRequestWithNullValues() throws Exception {
+    final int appId = 1;
+
+    RegisterApplicationMasterResponse registered =
+        registerApplicationMaster(appId);
+    Assert.assertNotNull(registered);
+    Assert.assertEquals(Integer.toString(appId), registered.getQueue());
+
+    AllocateResponse allocated = allocate(appId);
+    Assert.assertNotNull(allocated);
+
+    FinishApplicationMasterResponse finished =
+        finishApplicationMaster(appId, FinalApplicationStatus.SUCCEEDED);
+    Assert.assertNotNull(finished);
+    Assert.assertEquals(true, finished.getIsUnregistered());
+  }
+
+ /**
+  * Verifies that an allocate call made for an application master that was
+  * never registered is rejected with an exception.
+  *
+  * @throws Exception declared for consistency with the other tests; any
+  *           exception raised by allocate() is handled inside the test
+  */
+ @Test
+ public void testAllocateRequestWithoutRegistering() throws Exception {
+
+   try {
+     // Try to allocate an application master without registering.
+     allocate(1);
+     Assert
+         .fail("The request to allocate application master should have failed");
+   } catch (Exception ex) {
+     // Only Exception is caught here. Assert.fail() reports the failure by
+     // throwing an AssertionError; catching Throwable would swallow it and
+     // the test would pass even when allocate() unexpectedly succeeded.
+     LOG.info("AllocateRequest failed as expected because AM was not registered");
+   }
+ }
+
+ /**
+  * Registers one application master, requests containers with a single
+  * resource request and then finishes the application master.
+  */
+ @Test
+ public void testAllocateWithOneResourceRequest() throws Exception {
+   final int appId = 1;
+   final int resourceRequestCount = 1;
+
+   Assert.assertNotNull(registerApplicationMaster(appId));
+   getContainersAndAssert(appId, resourceRequestCount);
+   finishApplicationMaster(appId, FinalApplicationStatus.SUCCEEDED);
+ }
+
+ /**
+  * Registers one application master, requests containers with ten resource
+  * requests and then finishes the application master.
+  */
+ @Test
+ public void testAllocateWithMultipleResourceRequest() throws Exception {
+   final int appId = 1;
+   final int resourceRequestCount = 10;
+
+   Assert.assertNotNull(registerApplicationMaster(appId));
+   getContainersAndAssert(appId, resourceRequestCount);
+   finishApplicationMaster(appId, FinalApplicationStatus.SUCCEEDED);
+ }
+
+ /**
+  * Registers one application master, obtains containers for ten resource
+  * requests, releases every container that was received and then finishes
+  * the application master.
+  */
+ @Test
+ public void testAllocateAndReleaseContainers() throws Exception {
+   final int appId = 1;
+
+   Assert.assertNotNull(registerApplicationMaster(appId));
+
+   List<Container> allocated = getContainersAndAssert(appId, 10);
+   releaseContainersAndAssert(appId, allocated);
+
+   finishApplicationMaster(appId, FinalApplicationStatus.SUCCEEDED);
+ }
+
+ /**
+  * Runs the register / allocate / release sequence for five application
+  * masters one after another and finishes all of them afterwards.
+  */
+ @Test
+ public void testAllocateAndReleaseContainersForMultipleAM()
+     throws Exception {
+   final int appCount = 5;
+
+   // First pass: register each app, then allocate and release its
+   // containers.
+   int appId = 0;
+   while (appId < appCount) {
+     Assert.assertNotNull(registerApplicationMaster(appId));
+     List<Container> allocated = getContainersAndAssert(appId, 10);
+     releaseContainersAndAssert(appId, allocated);
+     appId++;
+   }
+
+   // Second pass: finish every app only after all of them were exercised.
+   for (int finishedId = 0; finishedId != appCount; finishedId++) {
+     finishApplicationMaster(finishedId, FinalApplicationStatus.SUCCEEDED);
+   }
+ }
+
+ /**
+  * Runs the register / allocate / release sequence for six application
+  * masters through runInParallel and then finishes every one of them
+  * sequentially.
+  *
+  * @throws Exception if runInParallel or finishing an application master
+  *           fails
+  */
+ @Test
+ public void testAllocateAndReleaseContainersForMultipleAMInParallel()
+     throws Exception {
+   int numberOfApps = 6;
+   ArrayList<Integer> appIds = new ArrayList<Integer>(numberOfApps);
+   for (int i = 0; i < numberOfApps; i++) {
+     // Integer.valueOf replaces the deprecated Integer(int) constructor.
+     appIds.add(Integer.valueOf(i));
+   }
+
+   List<Integer> responses =
+       runInParallel(appIds, new Function<Integer, Integer>() {
+         @Override
+         public Integer invoke(Integer testAppId) {
+           try {
+             RegisterApplicationMasterResponse registerResponse =
+                 registerApplicationMaster(testAppId);
+             Assert.assertNotNull("response is null", registerResponse);
+             List<Container> containers =
+                 getContainersAndAssert(testAppId, 10);
+             releaseContainersAndAssert(testAppId, containers);
+
+             LOG.info("Sucessfully registered application master with appId: "
+                 + testAppId);
+             return testAppId;
+           } catch (Throwable ex) {
+             // Throwable is caught deliberately so that assertion errors
+             // raised inside this callback are logged; the failure is
+             // reported through the null return value, which the
+             // assertNotNull check on the responses rejects.
+             LOG.error(
+                 "Failed to register application master with appId: "
+                     + testAppId, ex);
+             return null;
+           }
+         }
+       });
+
+   Assert.assertEquals(
+       "Number of responses received does not match with request",
+       appIds.size(), responses.size());
+
+   for (Integer testAppId : responses) {
+     Assert.assertNotNull(testAppId);
+     finishApplicationMaster(testAppId.intValue(),
+         FinalApplicationStatus.SUCCEEDED);
+   }
+ }
+
+ /**
+  * Sends an allocate request carrying the given number of resource requests
+  * and keeps heartbeating (at most ten more times) until at least that many
+  * containers have been received.
+  *
+  * @param appId id of the test application issuing the requests
+  * @param numberOfResourceRequests number of resource requests to put into
+  *          the ask list
+  * @return every container received across all allocate responses
+  * @throws Exception if an allocate call fails
+  */
+ private List<Container> getContainersAndAssert(int appId,
+     int numberOfResourceRequests) throws Exception {
+   AllocateRequest initialRequest =
+       Records.newRecord(AllocateRequest.class);
+   initialRequest.setResponseId(1);
+
+   List<Container> received =
+       new ArrayList<Container>(numberOfResourceRequests);
+   List<ResourceRequest> asks =
+       new ArrayList<ResourceRequest>(numberOfResourceRequests);
+   for (int requestIdx = 0; requestIdx < numberOfResourceRequests;
+       requestIdx++) {
+     String nodeName = "test-node-" + requestIdx;
+     asks.add(createResourceRequest(nodeName, 6000, 2, requestIdx % 5, 1));
+   }
+   initialRequest.setAskList(asks);
+
+   AllocateResponse response = allocate(appId, initialRequest);
+   Assert.assertNotNull("allocate() returned null response", response);
+   received.addAll(response.getAllocatedContainers());
+
+   // Heartbeat with empty requests, ten times at most, until enough
+   // containers have arrived; the final assertion fails the test otherwise.
+   for (int heartbeat = 0;
+       received.size() < asks.size() && heartbeat < 10; heartbeat++) {
+     response = allocate(appId, Records.newRecord(AllocateRequest.class));
+     Assert.assertNotNull("allocate() returned null response", response);
+
+     List<Container> batch = response.getAllocatedContainers();
+     received.addAll(batch);
+
+     LOG.info("Number of allocated containers in this request: "
+         + Integer.toString(batch.size()));
+     LOG.info("Total number of allocated containers: "
+         + Integer.toString(received.size()));
+     Thread.sleep(10);
+   }
+
+   // The request is broadcast, so more containers than were asked for may
+   // come back; only a shortfall is an error.
+   Assert.assertTrue("The asklist count is not same as response",
+       received.size() >= asks.size());
+   return received;
+ }
+
+ /**
+  * Releases the given containers through an allocate request and keeps
+  * heartbeating (at most ten more times) until the responses have returned
+  * as many containers as were released.
+  *
+  * @param appId id of the test application releasing the containers
+  * @param containers containers to release; must not be empty
+  * @throws Exception if an allocate call fails
+  */
+ private void releaseContainersAndAssert(int appId,
+     List<Container> containers) throws Exception {
+   Assert.assertTrue(!containers.isEmpty());
+   AllocateRequest releaseRequest =
+       Records.newRecord(AllocateRequest.class);
+   releaseRequest.setResponseId(1);
+
+   List<ContainerId> releasedIds =
+       new ArrayList<ContainerId>(containers.size());
+   for (Container toRelease : containers) {
+     releasedIds.add(toRelease.getId());
+   }
+   releaseRequest.setReleaseList(releasedIds);
+
+   AllocateResponse response = allocate(appId, releaseRequest);
+   Assert.assertNotNull(response);
+
+   // The mock resource manager is set up to return the released containers
+   // in its response. Because the UAMs run asynchronously, this lets the
+   // test check whether all the resource managers received the release
+   // request: the containers sent back by the mock resource managers are
+   // aggregated and returned here, so we can assert that every release list
+   // reached the sub-clusters.
+   List<Container> echoed = new ArrayList<Container>();
+   echoed.addAll(response.getAllocatedContainers());
+
+   // Heartbeat with empty requests, ten times at most, to collect all the
+   // containers; the final assertion fails the test otherwise.
+   for (int heartbeat = 0;
+       echoed.size() < releasedIds.size() && heartbeat < 10; heartbeat++) {
+     response = allocate(appId, Records.newRecord(AllocateRequest.class));
+     Assert.assertNotNull(response);
+
+     List<Container> batch = response.getAllocatedContainers();
+     echoed.addAll(batch);
+
+     LOG.info("Number of containers received in this request: "
+         + Integer.toString(batch.size()));
+     LOG.info("Total number of containers received: "
+         + Integer.toString(echoed.size()));
+     Thread.sleep(10);
+   }
+
+   Assert.assertEquals(releasedIds.size(), echoed.size());
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f72f1e6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
index c8b985d..14142de 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
@@ -40,9 +40,7 @@ import org.apache.hadoop.security.SaslRpcServer;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.PolicyProvider;
import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.service.AbstractService;
-import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
@@ -76,7 +74,6 @@ import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
@@ -95,6 +92,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider;
import org.apache.hadoop.yarn.server.security.MasterKeyData;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils;
import org.apache.hadoop.yarn.util.resource.Resources;
import com.google.common.annotations.VisibleForTesting;
@@ -175,69 +173,13 @@ public class ApplicationMasterService extends AbstractService implements
return this.masterServiceAddress;
}
- // Obtain the needed AMRMTokenIdentifier from the remote-UGI. RPC layer
- // currently sets only the required id, but iterate through anyways just to be
- // sure.
- private AMRMTokenIdentifier selectAMRMTokenIdentifier(
- UserGroupInformation remoteUgi) throws IOException {
- AMRMTokenIdentifier result = null;
- Set<TokenIdentifier> tokenIds = remoteUgi.getTokenIdentifiers();
- for (TokenIdentifier tokenId : tokenIds) {
- if (tokenId instanceof AMRMTokenIdentifier) {
- result = (AMRMTokenIdentifier) tokenId;
- break;
- }
- }
-
- return result;
- }
-
- private AMRMTokenIdentifier authorizeRequest()
- throws YarnException {
-
- UserGroupInformation remoteUgi;
- try {
- remoteUgi = UserGroupInformation.getCurrentUser();
- } catch (IOException e) {
- String msg =
- "Cannot obtain the user-name for authorizing ApplicationMaster. "
- + "Got exception: " + StringUtils.stringifyException(e);
- LOG.warn(msg);
- throw RPCUtil.getRemoteException(msg);
- }
-
- boolean tokenFound = false;
- String message = "";
- AMRMTokenIdentifier appTokenIdentifier = null;
- try {
- appTokenIdentifier = selectAMRMTokenIdentifier(remoteUgi);
- if (appTokenIdentifier == null) {
- tokenFound = false;
- message = "No AMRMToken found for user " + remoteUgi.getUserName();
- } else {
- tokenFound = true;
- }
- } catch (IOException e) {
- tokenFound = false;
- message =
- "Got exception while looking for AMRMToken for user "
- + remoteUgi.getUserName();
- }
-
- if (!tokenFound) {
- LOG.warn(message);
- throw RPCUtil.getRemoteException(message);
- }
-
- return appTokenIdentifier;
- }
-
@Override
public RegisterApplicationMasterResponse registerApplicationMaster(
RegisterApplicationMasterRequest request) throws YarnException,
IOException {
- AMRMTokenIdentifier amrmTokenIdentifier = authorizeRequest();
+ AMRMTokenIdentifier amrmTokenIdentifier =
+ YarnServerSecurityUtils.authorizeRequest();
ApplicationAttemptId applicationAttemptId =
amrmTokenIdentifier.getApplicationAttemptId();
@@ -346,7 +288,7 @@ public class ApplicationMasterService extends AbstractService implements
IOException {
ApplicationAttemptId applicationAttemptId =
- authorizeRequest().getApplicationAttemptId();
+ YarnServerSecurityUtils.authorizeRequest().getApplicationAttemptId();
ApplicationId appId = applicationAttemptId.getApplicationId();
RMApp rmApp =
@@ -430,7 +372,8 @@ public class ApplicationMasterService extends AbstractService implements
public AllocateResponse allocate(AllocateRequest request)
throws YarnException, IOException {
- AMRMTokenIdentifier amrmTokenIdentifier = authorizeRequest();
+ AMRMTokenIdentifier amrmTokenIdentifier =
+ YarnServerSecurityUtils.authorizeRequest();
ApplicationAttemptId appAttemptId =
amrmTokenIdentifier.getApplicationAttemptId();