You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by su...@apache.org on 2017/05/27 00:11:57 UTC
[40/50] [abbrv] hadoop git commit: YARN-5411. Create a proxy chain
for ApplicationClientProtocol in the Router. (Giovanni Matteo Fumarola via
Subru).
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0125c1a8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/DefaultClientRequestInterceptor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/DefaultClientRequestInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/DefaultClientRequestInterceptor.java
new file mode 100644
index 0000000..12b933b
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/DefaultClientRequestInterceptor.java
@@ -0,0 +1,334 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.router.clientrm;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FailApplicationAttemptRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.FailApplicationAttemptResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainersRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainersResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewReservationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewReservationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationListRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationListResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsResponse;
+import org.apache.hadoop.yarn.client.ClientRMProxy;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Extends the AbstractRequestInterceptorClient class and provides an
+ * implementation that simply forwards the client requests to the cluster
+ * resource manager.
+ *
+ */
+public class DefaultClientRequestInterceptor
+ extends AbstractClientRequestInterceptor {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(DefaultClientRequestInterceptor.class);
+ private ApplicationClientProtocol clientRMProxy;
+ private UserGroupInformation user = null;
+
+ @Override
+ public void init(String userName) {
+ super.init(userName);
+ try {
+ // Do not create a proxy user if user name matches the user name on
+ // current UGI
+ if (userName.equalsIgnoreCase(
+ UserGroupInformation.getCurrentUser().getUserName())) {
+ user = UserGroupInformation.getCurrentUser();
+ } else {
+ user = UserGroupInformation.createProxyUser(userName,
+ UserGroupInformation.getCurrentUser());
+ }
+
+ final Configuration conf = this.getConf();
+
+ clientRMProxy =
+ user.doAs(new PrivilegedExceptionAction<ApplicationClientProtocol>() {
+ @Override
+ public ApplicationClientProtocol run() throws Exception {
+ return ClientRMProxy.createRMProxy(conf,
+ ApplicationClientProtocol.class);
+ }
+ });
+ } catch (IOException e) {
+ String message = "Error while creating Router ClientRM Service for user:";
+ if (user != null) {
+ message += ", user: " + user;
+ }
+
+ LOG.info(message);
+ throw new YarnRuntimeException(message, e);
+ } catch (Exception e) {
+ throw new YarnRuntimeException(e);
+ }
+ }
+
+ @Override
+ public void setNextInterceptor(ClientRequestInterceptor next) {
+ throw new YarnRuntimeException(
+ "setNextInterceptor is being called on DefaultRequestInterceptor,"
+ + "which should be the last one in the chain "
+ + "Check if the interceptor pipeline configuration is correct");
+ }
+
+ @Override
+ public GetNewApplicationResponse getNewApplication(
+ GetNewApplicationRequest request) throws YarnException, IOException {
+ return clientRMProxy.getNewApplication(request);
+ }
+
+ @Override
+ public SubmitApplicationResponse submitApplication(
+ SubmitApplicationRequest request) throws YarnException, IOException {
+ return clientRMProxy.submitApplication(request);
+ }
+
+ @Override
+ public KillApplicationResponse forceKillApplication(
+ KillApplicationRequest request) throws YarnException, IOException {
+ return clientRMProxy.forceKillApplication(request);
+ }
+
+ @Override
+ public GetClusterMetricsResponse getClusterMetrics(
+ GetClusterMetricsRequest request) throws YarnException, IOException {
+ return clientRMProxy.getClusterMetrics(request);
+ }
+
+ @Override
+ public GetClusterNodesResponse getClusterNodes(GetClusterNodesRequest request)
+ throws YarnException, IOException {
+ return clientRMProxy.getClusterNodes(request);
+ }
+
+ @Override
+ public GetQueueInfoResponse getQueueInfo(GetQueueInfoRequest request)
+ throws YarnException, IOException {
+ return clientRMProxy.getQueueInfo(request);
+ }
+
+ @Override
+ public GetQueueUserAclsInfoResponse getQueueUserAcls(
+ GetQueueUserAclsInfoRequest request) throws YarnException, IOException {
+ return clientRMProxy.getQueueUserAcls(request);
+ }
+
+ @Override
+ public MoveApplicationAcrossQueuesResponse moveApplicationAcrossQueues(
+ MoveApplicationAcrossQueuesRequest request)
+ throws YarnException, IOException {
+ return clientRMProxy.moveApplicationAcrossQueues(request);
+ }
+
+ @Override
+ public GetNewReservationResponse getNewReservation(
+ GetNewReservationRequest request) throws YarnException, IOException {
+ return clientRMProxy.getNewReservation(request);
+ }
+
+ @Override
+ public ReservationSubmissionResponse submitReservation(
+ ReservationSubmissionRequest request) throws YarnException, IOException {
+ return clientRMProxy.submitReservation(request);
+ }
+
+ @Override
+ public ReservationListResponse listReservations(
+ ReservationListRequest request) throws YarnException, IOException {
+ return clientRMProxy.listReservations(request);
+ }
+
+ @Override
+ public ReservationUpdateResponse updateReservation(
+ ReservationUpdateRequest request) throws YarnException, IOException {
+ return clientRMProxy.updateReservation(request);
+ }
+
+ @Override
+ public ReservationDeleteResponse deleteReservation(
+ ReservationDeleteRequest request) throws YarnException, IOException {
+ return clientRMProxy.deleteReservation(request);
+ }
+
+ @Override
+ public GetNodesToLabelsResponse getNodeToLabels(
+ GetNodesToLabelsRequest request) throws YarnException, IOException {
+ return clientRMProxy.getNodeToLabels(request);
+ }
+
+ @Override
+ public GetLabelsToNodesResponse getLabelsToNodes(
+ GetLabelsToNodesRequest request) throws YarnException, IOException {
+ return clientRMProxy.getLabelsToNodes(request);
+ }
+
+ @Override
+ public GetClusterNodeLabelsResponse getClusterNodeLabels(
+ GetClusterNodeLabelsRequest request) throws YarnException, IOException {
+ return clientRMProxy.getClusterNodeLabels(request);
+ }
+
+ @Override
+ public GetApplicationReportResponse getApplicationReport(
+ GetApplicationReportRequest request) throws YarnException, IOException {
+ return clientRMProxy.getApplicationReport(request);
+ }
+
+ @Override
+ public GetApplicationsResponse getApplications(GetApplicationsRequest request)
+ throws YarnException, IOException {
+ return clientRMProxy.getApplications(request);
+ }
+
+ @Override
+ public GetApplicationAttemptReportResponse getApplicationAttemptReport(
+ GetApplicationAttemptReportRequest request)
+ throws YarnException, IOException {
+ return clientRMProxy.getApplicationAttemptReport(request);
+ }
+
+ @Override
+ public GetApplicationAttemptsResponse getApplicationAttempts(
+ GetApplicationAttemptsRequest request) throws YarnException, IOException {
+ return clientRMProxy.getApplicationAttempts(request);
+ }
+
+ @Override
+ public GetContainerReportResponse getContainerReport(
+ GetContainerReportRequest request) throws YarnException, IOException {
+ return clientRMProxy.getContainerReport(request);
+ }
+
+ @Override
+ public GetContainersResponse getContainers(GetContainersRequest request)
+ throws YarnException, IOException {
+ return clientRMProxy.getContainers(request);
+ }
+
+ @Override
+ public GetDelegationTokenResponse getDelegationToken(
+ GetDelegationTokenRequest request) throws YarnException, IOException {
+ return clientRMProxy.getDelegationToken(request);
+ }
+
+ @Override
+ public RenewDelegationTokenResponse renewDelegationToken(
+ RenewDelegationTokenRequest request) throws YarnException, IOException {
+ return clientRMProxy.renewDelegationToken(request);
+ }
+
+ @Override
+ public CancelDelegationTokenResponse cancelDelegationToken(
+ CancelDelegationTokenRequest request) throws YarnException, IOException {
+ return clientRMProxy.cancelDelegationToken(request);
+ }
+
+ @Override
+ public FailApplicationAttemptResponse failApplicationAttempt(
+ FailApplicationAttemptRequest request) throws YarnException, IOException {
+ return clientRMProxy.failApplicationAttempt(request);
+ }
+
+ @Override
+ public UpdateApplicationPriorityResponse updateApplicationPriority(
+ UpdateApplicationPriorityRequest request)
+ throws YarnException, IOException {
+ return clientRMProxy.updateApplicationPriority(request);
+ }
+
+ @Override
+ public SignalContainerResponse signalToContainer(
+ SignalContainerRequest request) throws YarnException, IOException {
+ return clientRMProxy.signalToContainer(request);
+ }
+
+ @Override
+ public UpdateApplicationTimeoutsResponse updateApplicationTimeouts(
+ UpdateApplicationTimeoutsRequest request)
+ throws YarnException, IOException {
+ return clientRMProxy.updateApplicationTimeouts(request);
+ }
+
+ @VisibleForTesting
+ public void setRMClient(ApplicationClientProtocol clientRM) {
+ this.clientRMProxy = clientRM;
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0125c1a8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/RouterClientRMService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/RouterClientRMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/RouterClientRMService.java
new file mode 100644
index 0000000..00016dd
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/RouterClientRMService.java
@@ -0,0 +1,544 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.router.clientrm;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FailApplicationAttemptRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.FailApplicationAttemptResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainersRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainersResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewReservationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewReservationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationListRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationListResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsResponse;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.util.LRUCacheHashMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * RouterClientRMService is a service that runs on each router that can be used
+ * to intercept and inspect ApplicationClientProtocol messages from client to
+ * the cluster resource manager. It listens for ApplicationClientProtocol
+ * messages from the client and creates a request intercepting pipeline
+ * instance for each client. The pipeline is a chain of interceptor instances
+ * that can inspect and modify the request/response as needed. The main
+ * difference with AMRMProxyService is the protocol they implement.
+ */
+public class RouterClientRMService extends AbstractService
+ implements ApplicationClientProtocol {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(RouterClientRMService.class);
+
+ private Server server;
+ private InetSocketAddress listenerEndpoint;
+
+ // For each user we store an interceptor pipeline.
+ // For performance reasons we use an LRU cache to keep the most recently used
+ // pipelines in memory and evict the least recently used ones.
+ private Map<String, RequestInterceptorChainWrapper> userPipelineMap;
+
+ /** Creates the service, passing this class's name to the superclass. */
+ public RouterClientRMService() {
+ super(RouterClientRMService.class.getName());
+ }
+
+ @Override
+ protected void serviceStart() throws Exception {
+ LOG.info("Starting Router ClientRMService");
+ Configuration conf = getConfig();
+ YarnRPC rpc = YarnRPC.create(conf);
+ UserGroupInformation.setConfiguration(conf);
+
+ this.listenerEndpoint =
+ conf.getSocketAddr(YarnConfiguration.ROUTER_CLIENTRM_ADDRESS,
+ YarnConfiguration.DEFAULT_ROUTER_CLIENTRM_ADDRESS,
+ YarnConfiguration.DEFAULT_ROUTER_CLIENTRM_PORT);
+
+ int maxCacheSize =
+ conf.getInt(YarnConfiguration.ROUTER_CLIENTRM_PIPELINE_CACHE_MAX_SIZE,
+ YarnConfiguration.DEFAULT_ROUTER_CLIENTRM_PIPELINE_CACHE_MAX_SIZE);
+ this.userPipelineMap = Collections.synchronizedMap(
+ new LRUCacheHashMap<String, RequestInterceptorChainWrapper>(
+ maxCacheSize, true));
+
+ Configuration serverConf = new Configuration(conf);
+
+ int numWorkerThreads =
+ serverConf.getInt(YarnConfiguration.RM_CLIENT_THREAD_COUNT,
+ YarnConfiguration.DEFAULT_RM_CLIENT_THREAD_COUNT);
+
+ this.server = rpc.getServer(ApplicationClientProtocol.class, this,
+ listenerEndpoint, serverConf, null, numWorkerThreads);
+
+ this.server.start();
+ LOG.info("Router ClientRMService listening on address: "
+ + this.server.getListenerAddress());
+ super.serviceStart();
+ }
+
+  /**
+   * Stops the RPC server, if one was created, and drops all cached
+   * interceptor pipelines.
+   *
+   * @throws Exception if the superclass shutdown fails
+   */
+  @Override
+  protected void serviceStop() throws Exception {
+    LOG.info("Stopping Router ClientRMService");
+    if (this.server != null) {
+      this.server.stop();
+    }
+    // The map is assigned in serviceStart(); guard against a stop that
+    // follows a failed or skipped start.
+    if (this.userPipelineMap != null) {
+      this.userPipelineMap.clear();
+    }
+    super.serviceStop();
+  }
+
+  /**
+   * Reads the configured interceptor pipeline and returns its class names.
+   *
+   * @param conf configuration holding the comma separated pipeline value
+   * @return the trimmed interceptor class names, in the iteration order of
+   *         the parsed collection
+   */
+  private List<String> getInterceptorClassNames(Configuration conf) {
+    Collection<String> rawNames = StringUtils.getStringCollection(
+        conf.get(YarnConfiguration.ROUTER_CLIENTRM_INTERCEPTOR_CLASS_PIPELINE,
+            YarnConfiguration.DEFAULT_ROUTER_CLIENTRM_INTERCEPTOR_CLASS));
+
+    List<String> classNames = new ArrayList<String>();
+    for (String rawName : rawNames) {
+      classNames.add(rawName.trim());
+    }
+    return classNames;
+  }
+
+  /** Passes the request to the root interceptor of the resolved pipeline. */
+  @Override
+  public GetNewApplicationResponse getNewApplication(
+      GetNewApplicationRequest request) throws YarnException, IOException {
+    return getInterceptorChain().getRootInterceptor()
+        .getNewApplication(request);
+  }
+
+  /** Passes the request to the root interceptor of the resolved pipeline. */
+  @Override
+  public SubmitApplicationResponse submitApplication(
+      SubmitApplicationRequest request) throws YarnException, IOException {
+    return getInterceptorChain().getRootInterceptor()
+        .submitApplication(request);
+  }
+
+  /** Passes the request to the root interceptor of the resolved pipeline. */
+  @Override
+  public KillApplicationResponse forceKillApplication(
+      KillApplicationRequest request) throws YarnException, IOException {
+    return getInterceptorChain().getRootInterceptor()
+        .forceKillApplication(request);
+  }
+
+  /** Passes the request to the root interceptor of the resolved pipeline. */
+  @Override
+  public GetClusterMetricsResponse getClusterMetrics(
+      GetClusterMetricsRequest request) throws YarnException, IOException {
+    return getInterceptorChain().getRootInterceptor()
+        .getClusterMetrics(request);
+  }
+
+  /** Passes the request to the root interceptor of the resolved pipeline. */
+  @Override
+  public GetClusterNodesResponse getClusterNodes(GetClusterNodesRequest request)
+      throws YarnException, IOException {
+    return getInterceptorChain().getRootInterceptor()
+        .getClusterNodes(request);
+  }
+
+  /** Passes the request to the root interceptor of the resolved pipeline. */
+  @Override
+  public GetQueueInfoResponse getQueueInfo(GetQueueInfoRequest request)
+      throws YarnException, IOException {
+    return getInterceptorChain().getRootInterceptor()
+        .getQueueInfo(request);
+  }
+
+  /** Passes the request to the root interceptor of the resolved pipeline. */
+  @Override
+  public GetQueueUserAclsInfoResponse getQueueUserAcls(
+      GetQueueUserAclsInfoRequest request) throws YarnException, IOException {
+    return getInterceptorChain().getRootInterceptor()
+        .getQueueUserAcls(request);
+  }
+
+  /** Passes the request to the root interceptor of the resolved pipeline. */
+  @Override
+  public MoveApplicationAcrossQueuesResponse moveApplicationAcrossQueues(
+      MoveApplicationAcrossQueuesRequest request)
+      throws YarnException, IOException {
+    return getInterceptorChain().getRootInterceptor()
+        .moveApplicationAcrossQueues(request);
+  }
+
+  /** Passes the request to the root interceptor of the resolved pipeline. */
+  @Override
+  public GetNewReservationResponse getNewReservation(
+      GetNewReservationRequest request) throws YarnException, IOException {
+    return getInterceptorChain().getRootInterceptor()
+        .getNewReservation(request);
+  }
+
+  /** Passes the request to the root interceptor of the resolved pipeline. */
+  @Override
+  public ReservationSubmissionResponse submitReservation(
+      ReservationSubmissionRequest request) throws YarnException, IOException {
+    return getInterceptorChain().getRootInterceptor()
+        .submitReservation(request);
+  }
+
+  /** Passes the request to the root interceptor of the resolved pipeline. */
+  @Override
+  public ReservationListResponse listReservations(
+      ReservationListRequest request) throws YarnException, IOException {
+    return getInterceptorChain().getRootInterceptor()
+        .listReservations(request);
+  }
+
+  /** Passes the request to the root interceptor of the resolved pipeline. */
+  @Override
+  public ReservationUpdateResponse updateReservation(
+      ReservationUpdateRequest request) throws YarnException, IOException {
+    return getInterceptorChain().getRootInterceptor()
+        .updateReservation(request);
+  }
+
+  /** Passes the request to the root interceptor of the resolved pipeline. */
+  @Override
+  public ReservationDeleteResponse deleteReservation(
+      ReservationDeleteRequest request) throws YarnException, IOException {
+    return getInterceptorChain().getRootInterceptor()
+        .deleteReservation(request);
+  }
+
+  /** Passes the request to the root interceptor of the resolved pipeline. */
+  @Override
+  public GetNodesToLabelsResponse getNodeToLabels(
+      GetNodesToLabelsRequest request) throws YarnException, IOException {
+    return getInterceptorChain().getRootInterceptor()
+        .getNodeToLabels(request);
+  }
+
+  /** Passes the request to the root interceptor of the resolved pipeline. */
+  @Override
+  public GetLabelsToNodesResponse getLabelsToNodes(
+      GetLabelsToNodesRequest request) throws YarnException, IOException {
+    return getInterceptorChain().getRootInterceptor()
+        .getLabelsToNodes(request);
+  }
+
+  /** Passes the request to the root interceptor of the resolved pipeline. */
+  @Override
+  public GetClusterNodeLabelsResponse getClusterNodeLabels(
+      GetClusterNodeLabelsRequest request) throws YarnException, IOException {
+    return getInterceptorChain().getRootInterceptor()
+        .getClusterNodeLabels(request);
+  }
+
+  /** Passes the request to the root interceptor of the resolved pipeline. */
+  @Override
+  public GetApplicationReportResponse getApplicationReport(
+      GetApplicationReportRequest request) throws YarnException, IOException {
+    return getInterceptorChain().getRootInterceptor()
+        .getApplicationReport(request);
+  }
+
+  /** Passes the request to the root interceptor of the resolved pipeline. */
+  @Override
+  public GetApplicationsResponse getApplications(GetApplicationsRequest request)
+      throws YarnException, IOException {
+    return getInterceptorChain().getRootInterceptor()
+        .getApplications(request);
+  }
+
+  /** Passes the request to the root interceptor of the resolved pipeline. */
+  @Override
+  public GetApplicationAttemptReportResponse getApplicationAttemptReport(
+      GetApplicationAttemptReportRequest request)
+      throws YarnException, IOException {
+    return getInterceptorChain().getRootInterceptor()
+        .getApplicationAttemptReport(request);
+  }
+
+  /** Passes the request to the root interceptor of the resolved pipeline. */
+  @Override
+  public GetApplicationAttemptsResponse getApplicationAttempts(
+      GetApplicationAttemptsRequest request) throws YarnException, IOException {
+    return getInterceptorChain().getRootInterceptor()
+        .getApplicationAttempts(request);
+  }
+
+  /** Passes the request to the root interceptor of the resolved pipeline. */
+  @Override
+  public GetContainerReportResponse getContainerReport(
+      GetContainerReportRequest request) throws YarnException, IOException {
+    return getInterceptorChain().getRootInterceptor()
+        .getContainerReport(request);
+  }
+
+  /** Passes the request to the root interceptor of the resolved pipeline. */
+  @Override
+  public GetContainersResponse getContainers(GetContainersRequest request)
+      throws YarnException, IOException {
+    return getInterceptorChain().getRootInterceptor()
+        .getContainers(request);
+  }
+
+  /** Passes the request to the root interceptor of the resolved pipeline. */
+  @Override
+  public GetDelegationTokenResponse getDelegationToken(
+      GetDelegationTokenRequest request) throws YarnException, IOException {
+    return getInterceptorChain().getRootInterceptor()
+        .getDelegationToken(request);
+  }
+
+ /**
+  * Delegates the call to the root interceptor of the pipeline that belongs
+  * to the current user.
+  */
+ @Override
+ public RenewDelegationTokenResponse renewDelegationToken(
+     RenewDelegationTokenRequest request) throws YarnException, IOException {
+   return getInterceptorChain().getRootInterceptor()
+       .renewDelegationToken(request);
+ }
+
+ /**
+  * Delegates the call to the root interceptor of the pipeline that belongs
+  * to the current user.
+  */
+ @Override
+ public CancelDelegationTokenResponse cancelDelegationToken(
+     CancelDelegationTokenRequest request) throws YarnException, IOException {
+   return getInterceptorChain().getRootInterceptor()
+       .cancelDelegationToken(request);
+ }
+
+ /**
+  * Delegates the call to the root interceptor of the pipeline that belongs
+  * to the current user.
+  */
+ @Override
+ public FailApplicationAttemptResponse failApplicationAttempt(
+     FailApplicationAttemptRequest request) throws YarnException, IOException {
+   return getInterceptorChain().getRootInterceptor()
+       .failApplicationAttempt(request);
+ }
+
+ /**
+  * Delegates the call to the root interceptor of the pipeline that belongs
+  * to the current user.
+  */
+ @Override
+ public UpdateApplicationPriorityResponse updateApplicationPriority(
+     UpdateApplicationPriorityRequest request)
+     throws YarnException, IOException {
+   return getInterceptorChain().getRootInterceptor()
+       .updateApplicationPriority(request);
+ }
+
+ /**
+  * Delegates the call to the root interceptor of the pipeline that belongs
+  * to the current user.
+  */
+ @Override
+ public SignalContainerResponse signalToContainer(
+     SignalContainerRequest request) throws YarnException, IOException {
+   return getInterceptorChain().getRootInterceptor()
+       .signalToContainer(request);
+ }
+
+ /**
+  * Delegates the call to the root interceptor of the pipeline that belongs
+  * to the current user.
+  */
+ @Override
+ public UpdateApplicationTimeoutsResponse updateApplicationTimeouts(
+     UpdateApplicationTimeoutsRequest request)
+     throws YarnException, IOException {
+   return getInterceptorChain().getRootInterceptor()
+       .updateApplicationTimeouts(request);
+ }
+
+ private RequestInterceptorChainWrapper getInterceptorChain()
+ throws IOException {
+ String user = UserGroupInformation.getCurrentUser().getUserName();
+ if (!userPipelineMap.containsKey(user)) {
+ initializePipeline(user);
+ }
+ return userPipelineMap.get(user);
+ }
+
+ /**
+  * Exposes the map that holds the request interceptor chain of every user.
+  * The map itself is returned, not a copy.
+  *
+  * @return the per-user request interceptor chains
+  */
+ @VisibleForTesting
+ protected Map<String, RequestInterceptorChainWrapper> getPipelines() {
+   return userPipelineMap;
+ }
+
+ /**
+  * Builds a new chain of request interceptors from the class names returned
+  * by {@code getInterceptorClassNames(conf)}, linking the instances in that
+  * order, and returns the head of the chain.
+  *
+  * @return the first interceptor of the newly created chain
+  * @throws YarnRuntimeException if a class cannot be loaded, a class is not
+  *           a {@code ClientRequestInterceptor}, or no interceptor class is
+  *           configured
+  */
+ @VisibleForTesting
+ protected ClientRequestInterceptor createRequestInterceptorChain() {
+   Configuration conf = getConfig();
+
+   ClientRequestInterceptor head = null;
+   ClientRequestInterceptor tail = null;
+   for (String className : getInterceptorClassNames(conf)) {
+     Class<?> candidate;
+     try {
+       candidate = conf.getClassByName(className);
+     } catch (ClassNotFoundException e) {
+       throw new YarnRuntimeException(
+           "Could not instantiate ApplicationClientRequestInterceptor: "
+               + className,
+           e);
+     }
+
+     // Reject anything that cannot take part in the chain.
+     if (!ClientRequestInterceptor.class.isAssignableFrom(candidate)) {
+       throw new YarnRuntimeException(
+           "Class: " + className + " not instance of "
+               + ClientRequestInterceptor.class.getCanonicalName());
+     }
+
+     ClientRequestInterceptor next = (ClientRequestInterceptor) ReflectionUtils
+         .newInstance(candidate, conf);
+     if (head == null) {
+       // First interceptor: it becomes the root of the chain.
+       head = next;
+     } else {
+       tail.setNextInterceptor(next);
+     }
+     tail = next;
+   }
+
+   if (head == null) {
+     throw new YarnRuntimeException(
+         "RequestInterceptor pipeline is not configured in the system");
+   }
+   return head;
+ }
+
+ /**
+  * Initializes the request interceptor pipeline for the specified user and
+  * registers it in the pipeline map.
+  *
+  * <p>The chain is created and initialized while holding the lock on the
+  * pipeline map, and the wrapper is published only after its root
+  * interceptor has been set. Registering an empty wrapper first and
+  * initializing it afterwards let a concurrent first request of the same
+  * user fetch a wrapper whose root interceptor was still {@code null}. The
+  * price is that pipeline creation for different users is serialized.
+  *
+  * <p>If initialization fails, nothing is registered and the exception is
+  * propagated to the caller.
+  *
+  * @param user name of the user the pipeline is created for
+  */
+ private void initializePipeline(String user) {
+   synchronized (this.userPipelineMap) {
+     if (this.userPipelineMap.containsKey(user)) {
+       LOG.info("Request to start an already existing user: {}"
+           + " was received, so ignoring.", user);
+       return;
+     }
+
+     LOG.info("Initializing request processing pipeline for application "
+         + "for the user: {}", user);
+
+     ClientRequestInterceptor interceptorChain =
+         this.createRequestInterceptorChain();
+     interceptorChain.init(user);
+
+     RequestInterceptorChainWrapper chainWrapper =
+         new RequestInterceptorChainWrapper();
+     chainWrapper.init(interceptorChain);
+
+     // Publish only a fully initialized wrapper.
+     this.userPipelineMap.put(user, chainWrapper);
+   }
+ }
+
+ /**
+ * Private structure for encapsulating RequestInterceptor and user instances.
+ *
+ */
+ @Private
+ public static class RequestInterceptorChainWrapper {
+ private ClientRequestInterceptor rootInterceptor;
+
+ /**
+ * Initializes the wrapper with the specified parameters.
+ *
+ * @param interceptor the first interceptor in the pipeline
+ */
+ public synchronized void init(ClientRequestInterceptor interceptor) {
+ this.rootInterceptor = interceptor;
+ }
+
+ /**
+ * Gets the root request intercepter.
+ *
+ * @return the root request intercepter
+ */
+ public synchronized ClientRequestInterceptor getRootInterceptor() {
+ return rootInterceptor;
+ }
+
+ /**
+ * Shutdown the chain of interceptors when the object is destroyed.
+ */
+ @Override
+ protected void finalize() {
+ rootInterceptor.shutdown();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0125c1a8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/package-info.java
new file mode 100644
index 0000000..7d1dadd
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/** Router ClientRM Proxy Service package. **/
+package org.apache.hadoop.yarn.server.router.clientrm;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0125c1a8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouter.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouter.java
deleted file mode 100644
index a31d6b9..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouter.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.router;
-
-/**
- * Test class for YARN Router.
- */
-public class TestRouter {
-
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0125c1a8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/BaseRouterClientRMTest.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/BaseRouterClientRMTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/BaseRouterClientRMTest.java
new file mode 100644
index 0000000..a283a62
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/BaseRouterClientRMTest.java
@@ -0,0 +1,574 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.router.clientrm;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.Collections;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainersRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainersResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewReservationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewReservationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ReservationDefinition;
+import org.apache.hadoop.yarn.api.records.ReservationId;
+import org.apache.hadoop.yarn.api.records.ReservationRequest;
+import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
+import org.apache.hadoop.yarn.api.records.ReservationRequests;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.Token;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.UTCClock;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+
+/**
+ * Base class for all the RouterClientRMService test cases. It provides utility
+ * methods that can be used by the concrete test case classes.
+ *
+ */
+public abstract class BaseRouterClientRMTest {
+
+ /**
+ * The RouterClientRMService instance that will be used by all the test cases.
+ * It is created in setUp() and stopped and cleared in tearDown().
+ */
+ private MockRouterClientRMService clientrmService;
+ /**
+ * Thread pool used for asynchronous operations. It is static, so it is
+ * shared by all test instances; it is exposed through getThreadPool().
+ */
+ private static ExecutorService threadpool = Executors.newCachedThreadPool();
+ /**
+ * Configuration built in setUp(); it is used to initialize both the
+ * dispatcher and the service under test.
+ */
+ private Configuration conf;
+ /**
+ * Dispatcher that is initialized and started in setUp() and stopped in
+ * tearDown().
+ */
+ private AsyncDispatcher dispatcher;
+
+ /**
+ * Value that setUp() stores under
+ * YarnConfiguration.ROUTER_CLIENTRM_PIPELINE_CACHE_MAX_SIZE.
+ */
+ public final static int TEST_MAX_CACHE_SIZE = 10;
+
+ /**
+  * Returns the service under test, asserting that it has been created.
+  *
+  * @return the non-null RouterClientRMService mock
+  */
+ protected MockRouterClientRMService getRouterClientRMService() {
+   MockRouterClientRMService service = this.clientrmService;
+   Assert.assertNotNull(service);
+   return service;
+ }
+
+ /**
+  * Builds the test configuration, starts the dispatcher and creates and
+  * starts the service under test.
+  */
+ @Before
+ public void setUp() {
+   this.conf = new YarnConfiguration();
+
+   // Interceptor pipeline used for testing: three pass-through interceptors
+   // that simply forward to the next one, followed by the mock interceptor
+   // that calls the mock resource manager.
+   final String passThrough =
+       PassThroughClientRequestInterceptor.class.getName();
+   final String pipeline = passThrough + "," + passThrough + "," + passThrough
+       + "," + MockClientRequestInterceptor.class.getName();
+   this.conf.set(YarnConfiguration.ROUTER_CLIENTRM_INTERCEPTOR_CLASS_PIPELINE,
+       pipeline);
+   this.conf.setInt(YarnConfiguration.ROUTER_CLIENTRM_PIPELINE_CACHE_MAX_SIZE,
+       TEST_MAX_CACHE_SIZE);
+
+   AsyncDispatcher asyncDispatcher = new AsyncDispatcher();
+   this.dispatcher = asyncDispatcher;
+   asyncDispatcher.init(this.conf);
+   asyncDispatcher.start();
+
+   this.clientrmService = createAndStartRouterClientRMService();
+ }
+
+ /**
+  * Stops the service under test and the dispatcher. The dispatcher is
+  * stopped in a finally block so that it is not left running when stopping
+  * the service throws.
+  */
+ @After
+ public void tearDown() {
+   try {
+     if (clientrmService != null) {
+       clientrmService.stop();
+     }
+   } finally {
+     clientrmService = null;
+     if (this.dispatcher != null) {
+       this.dispatcher.stop();
+     }
+   }
+ }
+
+ /**
+  * Returns the thread pool used for asynchronous operations.
+  *
+  * @return the static executor service of this base class
+  */
+ protected ExecutorService getThreadPool() {
+   return BaseRouterClientRMTest.threadpool;
+ }
+
+ /**
+  * Creates a new mock service, initializes it with the test configuration
+  * and starts it.
+  *
+  * @return the started service
+  */
+ protected MockRouterClientRMService createAndStartRouterClientRMService() {
+   MockRouterClientRMService service = new MockRouterClientRMService();
+   service.init(this.conf);
+   service.start();
+   return service;
+ }
+
+ /**
+  * RouterClientRMService subclass used by the tests; it adds no behavior of
+  * its own.
+  */
+ protected static class MockRouterClientRMService
+     extends RouterClientRMService {
+   /** Creates the service through the superclass no-arg constructor. */
+   public MockRouterClientRMService() {
+   }
+ }
+
+ /**
+  * Invokes getNewApplication on the router service as the given remote user.
+  */
+ protected GetNewApplicationResponse getNewApplication(String user)
+     throws YarnException, IOException, InterruptedException {
+   UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user);
+   PrivilegedExceptionAction<GetNewApplicationResponse> action =
+       new PrivilegedExceptionAction<GetNewApplicationResponse>() {
+         @Override
+         public GetNewApplicationResponse run() throws Exception {
+           GetNewApplicationRequest request =
+               GetNewApplicationRequest.newInstance();
+           return getRouterClientRMService().getNewApplication(request);
+         }
+       };
+   return ugi.doAs(action);
+ }
+
+ /**
+  * Submits an application with the given id through the router service as
+  * the given remote user.
+  */
+ protected SubmitApplicationResponse submitApplication(
+     final ApplicationId appId, String user)
+     throws YarnException, IOException, InterruptedException {
+   UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user);
+   PrivilegedExceptionAction<SubmitApplicationResponse> action =
+       new PrivilegedExceptionAction<SubmitApplicationResponse>() {
+         @Override
+         public SubmitApplicationResponse run() throws Exception {
+           ApplicationSubmissionContext submissionContext =
+               ApplicationSubmissionContext.newInstance(appId, "", "", null,
+                   null, false, false, -1, null, null);
+           SubmitApplicationRequest request =
+               SubmitApplicationRequest.newInstance(submissionContext);
+           return getRouterClientRMService().submitApplication(request);
+         }
+       };
+   return ugi.doAs(action);
+ }
+
+ /**
+  * Invokes forceKillApplication for the given application on the router
+  * service as the given remote user.
+  */
+ protected KillApplicationResponse forceKillApplication(
+     final ApplicationId appId, String user)
+     throws YarnException, IOException, InterruptedException {
+   UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user);
+   PrivilegedExceptionAction<KillApplicationResponse> action =
+       new PrivilegedExceptionAction<KillApplicationResponse>() {
+         @Override
+         public KillApplicationResponse run() throws Exception {
+           KillApplicationRequest request =
+               KillApplicationRequest.newInstance(appId);
+           return getRouterClientRMService().forceKillApplication(request);
+         }
+       };
+   return ugi.doAs(action);
+ }
+
+ /**
+  * Invokes getClusterMetrics on the router service as the given remote user.
+  */
+ protected GetClusterMetricsResponse getClusterMetrics(String user)
+     throws YarnException, IOException, InterruptedException {
+   UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user);
+   PrivilegedExceptionAction<GetClusterMetricsResponse> action =
+       new PrivilegedExceptionAction<GetClusterMetricsResponse>() {
+         @Override
+         public GetClusterMetricsResponse run() throws Exception {
+           GetClusterMetricsRequest request =
+               GetClusterMetricsRequest.newInstance();
+           return getRouterClientRMService().getClusterMetrics(request);
+         }
+       };
+   return ugi.doAs(action);
+ }
+
+ /**
+  * Invokes getClusterNodes on the router service as the given remote user.
+  */
+ protected GetClusterNodesResponse getClusterNodes(String user)
+     throws YarnException, IOException, InterruptedException {
+   UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user);
+   PrivilegedExceptionAction<GetClusterNodesResponse> action =
+       new PrivilegedExceptionAction<GetClusterNodesResponse>() {
+         @Override
+         public GetClusterNodesResponse run() throws Exception {
+           GetClusterNodesRequest request =
+               GetClusterNodesRequest.newInstance();
+           return getRouterClientRMService().getClusterNodes(request);
+         }
+       };
+   return ugi.doAs(action);
+ }
+
+ /**
+  * Requests the info of the queue named "default" from the router service
+  * as the given remote user.
+  */
+ protected GetQueueInfoResponse getQueueInfo(String user)
+     throws YarnException, IOException, InterruptedException {
+   UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user);
+   PrivilegedExceptionAction<GetQueueInfoResponse> action =
+       new PrivilegedExceptionAction<GetQueueInfoResponse>() {
+         @Override
+         public GetQueueInfoResponse run() throws Exception {
+           GetQueueInfoRequest request =
+               GetQueueInfoRequest.newInstance("default", false, false, false);
+           return getRouterClientRMService().getQueueInfo(request);
+         }
+       };
+   return ugi.doAs(action);
+ }
+
+ /**
+  * Invokes getQueueUserAcls on the router service as the given remote user.
+  */
+ protected GetQueueUserAclsInfoResponse getQueueUserAcls(String user)
+     throws YarnException, IOException, InterruptedException {
+   UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user);
+   PrivilegedExceptionAction<GetQueueUserAclsInfoResponse> action =
+       new PrivilegedExceptionAction<GetQueueUserAclsInfoResponse>() {
+         @Override
+         public GetQueueUserAclsInfoResponse run() throws Exception {
+           GetQueueUserAclsInfoRequest request =
+               GetQueueUserAclsInfoRequest.newInstance();
+           return getRouterClientRMService().getQueueUserAcls(request);
+         }
+       };
+   return ugi.doAs(action);
+ }
+
+ /**
+  * Asks the router service, as the given remote user, to move the given
+  * application to the queue named "newQueue".
+  */
+ protected MoveApplicationAcrossQueuesResponse moveApplicationAcrossQueues(
+     String user, final ApplicationId appId)
+     throws YarnException, IOException, InterruptedException {
+   UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user);
+   PrivilegedExceptionAction<MoveApplicationAcrossQueuesResponse> action =
+       new PrivilegedExceptionAction<MoveApplicationAcrossQueuesResponse>() {
+         @Override
+         public MoveApplicationAcrossQueuesResponse run() throws Exception {
+           MoveApplicationAcrossQueuesRequest request =
+               MoveApplicationAcrossQueuesRequest.newInstance(appId,
+                   "newQueue");
+           return getRouterClientRMService()
+               .moveApplicationAcrossQueues(request);
+         }
+       };
+   return ugi.doAs(action);
+ }
+
+ /**
+  * Invokes getNewReservation on the router service as the given remote user.
+  */
+ public GetNewReservationResponse getNewReservation(String user)
+     throws YarnException, IOException, InterruptedException {
+   UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user);
+   PrivilegedExceptionAction<GetNewReservationResponse> action =
+       new PrivilegedExceptionAction<GetNewReservationResponse>() {
+         @Override
+         public GetNewReservationResponse run() throws Exception {
+           return getRouterClientRMService()
+               .getNewReservation(GetNewReservationRequest.newInstance());
+         }
+       };
+   return ugi.doAs(action);
+ }
+
+ /**
+  * Submits a simple reservation with the given id through the router
+  * service as the given remote user. The reservation starts at the current
+  * UTCClock time, lasts 60000 and has a deadline of arrival plus 1.05 times
+  * the duration.
+  */
+ protected ReservationSubmissionResponse submitReservation(String user,
+     final ReservationId reservationId)
+     throws YarnException, IOException, InterruptedException {
+   UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user);
+   PrivilegedExceptionAction<ReservationSubmissionResponse> action =
+       new PrivilegedExceptionAction<ReservationSubmissionResponse>() {
+         @Override
+         public ReservationSubmissionResponse run() throws Exception {
+           final long arrival = new UTCClock().getTime();
+           final long duration = 60000;
+           final long deadline = (long) (arrival + 1.05 * duration);
+
+           ReservationSubmissionRequest request =
+               createSimpleReservationRequest(1, arrival, deadline, duration,
+                   reservationId);
+           return getRouterClientRMService().submitReservation(request);
+         }
+       };
+   return ugi.doAs(action);
+ }
+
+ /**
+  * Updates the reservation with the given id through the router service as
+  * the given remote user, using the definition of a freshly built simple
+  * reservation request.
+  */
+ protected ReservationUpdateResponse updateReservation(String user,
+     final ReservationId reservationId)
+     throws YarnException, IOException, InterruptedException {
+   UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user);
+   PrivilegedExceptionAction<ReservationUpdateResponse> action =
+       new PrivilegedExceptionAction<ReservationUpdateResponse>() {
+         @Override
+         public ReservationUpdateResponse run() throws Exception {
+           final long arrival = new UTCClock().getTime();
+           final long duration = 60000;
+           final long deadline = (long) (arrival + 1.05 * duration);
+
+           ReservationSubmissionRequest submission =
+               createSimpleReservationRequest(1, arrival, deadline, duration,
+                   reservationId);
+           ReservationDefinition definition =
+               submission.getReservationDefinition();
+
+           ReservationUpdateRequest request =
+               ReservationUpdateRequest.newInstance(definition, reservationId);
+           return getRouterClientRMService().updateReservation(request);
+         }
+       };
+   return ugi.doAs(action);
+ }
+
+ /**
+  * Deletes the reservation with the given id through the router service as
+  * the given remote user.
+  */
+ protected ReservationDeleteResponse deleteReservation(String user,
+     final ReservationId reservationId)
+     throws YarnException, IOException, InterruptedException {
+   UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user);
+   PrivilegedExceptionAction<ReservationDeleteResponse> action =
+       new PrivilegedExceptionAction<ReservationDeleteResponse>() {
+         @Override
+         public ReservationDeleteResponse run() throws Exception {
+           ReservationDeleteRequest request =
+               ReservationDeleteRequest.newInstance(reservationId);
+           return getRouterClientRMService().deleteReservation(request);
+         }
+       };
+   return ugi.doAs(action);
+ }
+
+ /**
+  * Invokes getNodeToLabels on the router service as the given remote user.
+  */
+ protected GetNodesToLabelsResponse getNodeToLabels(String user)
+     throws YarnException, IOException, InterruptedException {
+   UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user);
+   PrivilegedExceptionAction<GetNodesToLabelsResponse> action =
+       new PrivilegedExceptionAction<GetNodesToLabelsResponse>() {
+         @Override
+         public GetNodesToLabelsResponse run() throws Exception {
+           GetNodesToLabelsRequest request =
+               GetNodesToLabelsRequest.newInstance();
+           return getRouterClientRMService().getNodeToLabels(request);
+         }
+       };
+   return ugi.doAs(action);
+ }
+
+ /**
+  * Invokes getLabelsToNodes on the router service as the given remote user.
+  */
+ protected GetLabelsToNodesResponse getLabelsToNodes(String user)
+     throws YarnException, IOException, InterruptedException {
+   UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user);
+   PrivilegedExceptionAction<GetLabelsToNodesResponse> action =
+       new PrivilegedExceptionAction<GetLabelsToNodesResponse>() {
+         @Override
+         public GetLabelsToNodesResponse run() throws Exception {
+           GetLabelsToNodesRequest request =
+               GetLabelsToNodesRequest.newInstance();
+           return getRouterClientRMService().getLabelsToNodes(request);
+         }
+       };
+   return ugi.doAs(action);
+ }
+
+ /**
+  * Invokes getClusterNodeLabels on the router service as the given remote
+  * user.
+  */
+ protected GetClusterNodeLabelsResponse getClusterNodeLabels(String user)
+     throws YarnException, IOException, InterruptedException {
+   UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user);
+   PrivilegedExceptionAction<GetClusterNodeLabelsResponse> action =
+       new PrivilegedExceptionAction<GetClusterNodeLabelsResponse>() {
+         @Override
+         public GetClusterNodeLabelsResponse run() throws Exception {
+           GetClusterNodeLabelsRequest request =
+               GetClusterNodeLabelsRequest.newInstance();
+           return getRouterClientRMService().getClusterNodeLabels(request);
+         }
+       };
+   return ugi.doAs(action);
+ }
+
+ /**
+  * Requests the report of the given application from the router service as
+  * the given remote user.
+  */
+ protected GetApplicationReportResponse getApplicationReport(String user,
+     final ApplicationId appId)
+     throws YarnException, IOException, InterruptedException {
+   UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user);
+   PrivilegedExceptionAction<GetApplicationReportResponse> action =
+       new PrivilegedExceptionAction<GetApplicationReportResponse>() {
+         @Override
+         public GetApplicationReportResponse run() throws Exception {
+           GetApplicationReportRequest request =
+               GetApplicationReportRequest.newInstance(appId);
+           return getRouterClientRMService().getApplicationReport(request);
+         }
+       };
+   return ugi.doAs(action);
+ }
+
+ /**
+  * Invokes getApplications on the router service as the given remote user.
+  */
+ protected GetApplicationsResponse getApplications(String user)
+     throws YarnException, IOException, InterruptedException {
+   UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user);
+   PrivilegedExceptionAction<GetApplicationsResponse> action =
+       new PrivilegedExceptionAction<GetApplicationsResponse>() {
+         @Override
+         public GetApplicationsResponse run() throws Exception {
+           GetApplicationsRequest request =
+               GetApplicationsRequest.newInstance();
+           return getRouterClientRMService().getApplications(request);
+         }
+       };
+   return ugi.doAs(action);
+ }
+
+ /**
+  * Requests the report of the given application attempt from the router
+  * service as the given remote user.
+  */
+ protected GetApplicationAttemptReportResponse getApplicationAttemptReport(
+     String user, final ApplicationAttemptId appAttemptId)
+     throws YarnException, IOException, InterruptedException {
+   UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user);
+   PrivilegedExceptionAction<GetApplicationAttemptReportResponse> action =
+       new PrivilegedExceptionAction<GetApplicationAttemptReportResponse>() {
+         @Override
+         public GetApplicationAttemptReportResponse run() throws Exception {
+           GetApplicationAttemptReportRequest request =
+               GetApplicationAttemptReportRequest.newInstance(appAttemptId);
+           return getRouterClientRMService()
+               .getApplicationAttemptReport(request);
+         }
+       };
+   return ugi.doAs(action);
+ }
+
+ /**
+  * Requests the attempts of the given application from the router service
+  * as the given remote user.
+  */
+ protected GetApplicationAttemptsResponse getApplicationAttempts(String user,
+     final ApplicationId applicationId)
+     throws YarnException, IOException, InterruptedException {
+   UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user);
+   PrivilegedExceptionAction<GetApplicationAttemptsResponse> action =
+       new PrivilegedExceptionAction<GetApplicationAttemptsResponse>() {
+         @Override
+         public GetApplicationAttemptsResponse run() throws Exception {
+           GetApplicationAttemptsRequest request =
+               GetApplicationAttemptsRequest.newInstance(applicationId);
+           return getRouterClientRMService().getApplicationAttempts(request);
+         }
+       };
+   return ugi.doAs(action);
+ }
+
+ /**
+  * Requests the report of the given container from the router service as
+  * the given remote user.
+  */
+ protected GetContainerReportResponse getContainerReport(String user,
+     final ContainerId containerId)
+     throws YarnException, IOException, InterruptedException {
+   UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user);
+   PrivilegedExceptionAction<GetContainerReportResponse> action =
+       new PrivilegedExceptionAction<GetContainerReportResponse>() {
+         @Override
+         public GetContainerReportResponse run() throws Exception {
+           GetContainerReportRequest request =
+               GetContainerReportRequest.newInstance(containerId);
+           return getRouterClientRMService().getContainerReport(request);
+         }
+       };
+   return ugi.doAs(action);
+ }
+
+ /**
+  * Requests the containers of the given application attempt from the router
+  * service as the given remote user.
+  */
+ protected GetContainersResponse getContainers(String user,
+     final ApplicationAttemptId appAttemptId)
+     throws YarnException, IOException, InterruptedException {
+   UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user);
+   PrivilegedExceptionAction<GetContainersResponse> action =
+       new PrivilegedExceptionAction<GetContainersResponse>() {
+         @Override
+         public GetContainersResponse run() throws Exception {
+           GetContainersRequest request =
+               GetContainersRequest.newInstance(appAttemptId);
+           return getRouterClientRMService().getContainers(request);
+         }
+       };
+   return ugi.doAs(action);
+ }
+
+ /**
+  * Requests a delegation token from the router service as the given remote
+  * user; the same user name is passed to the request factory.
+  */
+ protected GetDelegationTokenResponse getDelegationToken(final String user)
+     throws YarnException, IOException, InterruptedException {
+   UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user);
+   PrivilegedExceptionAction<GetDelegationTokenResponse> action =
+       new PrivilegedExceptionAction<GetDelegationTokenResponse>() {
+         @Override
+         public GetDelegationTokenResponse run() throws Exception {
+           GetDelegationTokenRequest request =
+               GetDelegationTokenRequest.newInstance(user);
+           return getRouterClientRMService().getDelegationToken(request);
+         }
+       };
+   return ugi.doAs(action);
+ }
+
+ /**
+  * Asks the router service to renew the given delegation token as the given
+  * remote user.
+  */
+ protected RenewDelegationTokenResponse renewDelegationToken(String user,
+     final Token token)
+     throws YarnException, IOException, InterruptedException {
+   UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user);
+   PrivilegedExceptionAction<RenewDelegationTokenResponse> action =
+       new PrivilegedExceptionAction<RenewDelegationTokenResponse>() {
+         @Override
+         public RenewDelegationTokenResponse run() throws Exception {
+           RenewDelegationTokenRequest request =
+               RenewDelegationTokenRequest.newInstance(token);
+           return getRouterClientRMService().renewDelegationToken(request);
+         }
+       };
+   return ugi.doAs(action);
+ }
+
+ /**
+  * Asks the router service to cancel the given delegation token as the
+  * given remote user.
+  */
+ protected CancelDelegationTokenResponse cancelDelegationToken(String user,
+     final Token token)
+     throws YarnException, IOException, InterruptedException {
+   UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user);
+   PrivilegedExceptionAction<CancelDelegationTokenResponse> action =
+       new PrivilegedExceptionAction<CancelDelegationTokenResponse>() {
+         @Override
+         public CancelDelegationTokenResponse run() throws Exception {
+           CancelDelegationTokenRequest request =
+               CancelDelegationTokenRequest.newInstance(token);
+           return getRouterClientRMService().cancelDelegationToken(request);
+         }
+       };
+   return ugi.doAs(action);
+ }
+
+ /**
+  * Builds a reservation submission request that contains a single atomic
+  * ask, targets the queue named "dedicated" and carries the given
+  * reservation id.
+  *
+  * @param numContainers number of containers passed to the single ask
+  * @param arrival arrival time of the reservation definition
+  * @param deadline deadline of the reservation definition
+  * @param duration duration passed to the single ask
+  * @param reservationId id attached to the submission request
+  * @return the assembled submission request
+  */
+ private ReservationSubmissionRequest createSimpleReservationRequest(
+     int numContainers, long arrival, long deadline, long duration,
+     ReservationId reservationId) {
+   Resource capability = Resource.newInstance(1024, 1);
+   ReservationRequest singleAsk = ReservationRequest.newInstance(capability,
+       numContainers, 1, duration);
+   ReservationRequests asks = ReservationRequests.newInstance(
+       Collections.singletonList(singleAsk),
+       ReservationRequestInterpreter.R_ALL);
+   ReservationDefinition definition = ReservationDefinition.newInstance(
+       arrival, deadline, asks, "testRouterClientRMService#reservation");
+   return ReservationSubmissionRequest.newInstance(definition, "dedicated",
+       reservationId);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0125c1a8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/MockClientRequestInterceptor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/MockClientRequestInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/MockClientRequestInterceptor.java
new file mode 100644
index 0000000..b38703f
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/MockClientRequestInterceptor.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.router.clientrm;
+
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.MockResourceManagerFacade;
+
+/**
+ * This class mocks the ClientRequestInterceptor.
+ */
+public class MockClientRequestInterceptor
+ extends DefaultClientRequestInterceptor {
+
+ public void init(String user) {
+ MockResourceManagerFacade mockRM = new MockResourceManagerFacade(
+ new YarnConfiguration(super.getConf()), 0);
+ super.setRMClient(mockRM);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0125c1a8/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/PassThroughClientRequestInterceptor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/PassThroughClientRequestInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/PassThroughClientRequestInterceptor.java
new file mode 100644
index 0000000..c403bd5
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/PassThroughClientRequestInterceptor.java
@@ -0,0 +1,267 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.router.clientrm;
+
+import java.io.IOException;
+
+import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FailApplicationAttemptRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.FailApplicationAttemptResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainersRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainersResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewReservationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewReservationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationListRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationListResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsResponse;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+
+/**
+ * Mock intercepter that does not do anything other than forwarding it to the
+ * next intercepter in the chain.
+ */
+public class PassThroughClientRequestInterceptor
+ extends AbstractClientRequestInterceptor {
+
+ @Override
+ public GetNewApplicationResponse getNewApplication(
+ GetNewApplicationRequest request) throws YarnException, IOException {
+ return getNextInterceptor().getNewApplication(request);
+ }
+
+ @Override
+ public SubmitApplicationResponse submitApplication(
+ SubmitApplicationRequest request) throws YarnException, IOException {
+ return getNextInterceptor().submitApplication(request);
+ }
+
+ @Override
+ public KillApplicationResponse forceKillApplication(
+ KillApplicationRequest request) throws YarnException, IOException {
+ return getNextInterceptor().forceKillApplication(request);
+ }
+
+ @Override
+ public GetClusterMetricsResponse getClusterMetrics(
+ GetClusterMetricsRequest request) throws YarnException, IOException {
+ return getNextInterceptor().getClusterMetrics(request);
+ }
+
+ @Override
+ public GetClusterNodesResponse getClusterNodes(GetClusterNodesRequest request)
+ throws YarnException, IOException {
+ return getNextInterceptor().getClusterNodes(request);
+ }
+
+ @Override
+ public GetQueueInfoResponse getQueueInfo(GetQueueInfoRequest request)
+ throws YarnException, IOException {
+ return getNextInterceptor().getQueueInfo(request);
+ }
+
+ @Override
+ public GetQueueUserAclsInfoResponse getQueueUserAcls(
+ GetQueueUserAclsInfoRequest request) throws YarnException, IOException {
+ return getNextInterceptor().getQueueUserAcls(request);
+ }
+
+ @Override
+ public MoveApplicationAcrossQueuesResponse moveApplicationAcrossQueues(
+ MoveApplicationAcrossQueuesRequest request)
+ throws YarnException, IOException {
+ return getNextInterceptor().moveApplicationAcrossQueues(request);
+ }
+
+ @Override
+ public GetNewReservationResponse getNewReservation(
+ GetNewReservationRequest request) throws YarnException, IOException {
+ return getNextInterceptor().getNewReservation(request);
+ }
+
+ @Override
+ public ReservationSubmissionResponse submitReservation(
+ ReservationSubmissionRequest request) throws YarnException, IOException {
+ return getNextInterceptor().submitReservation(request);
+ }
+
+ @Override
+ public ReservationListResponse listReservations(
+ ReservationListRequest request) throws YarnException, IOException {
+ return getNextInterceptor().listReservations(request);
+ }
+
+ @Override
+ public ReservationUpdateResponse updateReservation(
+ ReservationUpdateRequest request) throws YarnException, IOException {
+ return getNextInterceptor().updateReservation(request);
+ }
+
+ @Override
+ public ReservationDeleteResponse deleteReservation(
+ ReservationDeleteRequest request) throws YarnException, IOException {
+ return getNextInterceptor().deleteReservation(request);
+ }
+
+ @Override
+ public GetNodesToLabelsResponse getNodeToLabels(
+ GetNodesToLabelsRequest request) throws YarnException, IOException {
+ return getNextInterceptor().getNodeToLabels(request);
+ }
+
+ @Override
+ public GetLabelsToNodesResponse getLabelsToNodes(
+ GetLabelsToNodesRequest request) throws YarnException, IOException {
+ return getNextInterceptor().getLabelsToNodes(request);
+ }
+
+ @Override
+ public GetClusterNodeLabelsResponse getClusterNodeLabels(
+ GetClusterNodeLabelsRequest request) throws YarnException, IOException {
+ return getNextInterceptor().getClusterNodeLabels(request);
+ }
+
+ @Override
+ public GetApplicationReportResponse getApplicationReport(
+ GetApplicationReportRequest request) throws YarnException, IOException {
+ return getNextInterceptor().getApplicationReport(request);
+ }
+
+ @Override
+ public GetApplicationsResponse getApplications(GetApplicationsRequest request)
+ throws YarnException, IOException {
+ return getNextInterceptor().getApplications(request);
+ }
+
+ @Override
+ public GetApplicationAttemptReportResponse getApplicationAttemptReport(
+ GetApplicationAttemptReportRequest request)
+ throws YarnException, IOException {
+ return getNextInterceptor().getApplicationAttemptReport(request);
+ }
+
+ @Override
+ public GetApplicationAttemptsResponse getApplicationAttempts(
+ GetApplicationAttemptsRequest request) throws YarnException, IOException {
+ return getNextInterceptor().getApplicationAttempts(request);
+ }
+
+ @Override
+ public GetContainerReportResponse getContainerReport(
+ GetContainerReportRequest request) throws YarnException, IOException {
+ return getNextInterceptor().getContainerReport(request);
+ }
+
+ @Override
+ public GetContainersResponse getContainers(GetContainersRequest request)
+ throws YarnException, IOException {
+ return getNextInterceptor().getContainers(request);
+ }
+
+ @Override
+ public GetDelegationTokenResponse getDelegationToken(
+ GetDelegationTokenRequest request) throws YarnException, IOException {
+ return getNextInterceptor().getDelegationToken(request);
+ }
+
+ @Override
+ public RenewDelegationTokenResponse renewDelegationToken(
+ RenewDelegationTokenRequest request) throws YarnException, IOException {
+ return getNextInterceptor().renewDelegationToken(request);
+ }
+
+ @Override
+ public CancelDelegationTokenResponse cancelDelegationToken(
+ CancelDelegationTokenRequest request) throws YarnException, IOException {
+ return getNextInterceptor().cancelDelegationToken(request);
+ }
+
+ @Override
+ public FailApplicationAttemptResponse failApplicationAttempt(
+ FailApplicationAttemptRequest request) throws YarnException, IOException {
+ return getNextInterceptor().failApplicationAttempt(request);
+ }
+
+ @Override
+ public UpdateApplicationPriorityResponse updateApplicationPriority(
+ UpdateApplicationPriorityRequest request)
+ throws YarnException, IOException {
+ return getNextInterceptor().updateApplicationPriority(request);
+ }
+
+ @Override
+ public SignalContainerResponse signalToContainer(
+ SignalContainerRequest request) throws YarnException, IOException {
+ return getNextInterceptor().signalToContainer(request);
+ }
+
+ @Override
+ public UpdateApplicationTimeoutsResponse updateApplicationTimeouts(
+ UpdateApplicationTimeoutsRequest request)
+ throws YarnException, IOException {
+ return getNextInterceptor().updateApplicationTimeouts(request);
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org