You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2017/08/02 00:25:05 UTC
[23/50] [abbrv] hadoop git commit: YARN-5411. Create a proxy chain
for ApplicationClientProtocol in the Router. (Giovanni Matteo Fumarola via
Subru).
YARN-5411. Create a proxy chain for ApplicationClientProtocol in the Router. (Giovanni Matteo Fumarola via Subru).
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/48460690
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/48460690
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/48460690
Branch: refs/heads/YARN-2915
Commit: 4846069061b6baa06da3b524b9e36567dd368388
Parents: be99c1f
Author: Subru Krishnan <su...@apache.org>
Authored: Wed May 3 18:26:15 2017 -0700
Committer: Carlo Curino <cu...@apache.org>
Committed: Tue Aug 1 17:22:11 2017 -0700
----------------------------------------------------------------------
hadoop-project/pom.xml | 7 +
.../hadoop/yarn/conf/YarnConfiguration.java | 21 +
.../hadoop/yarn/util/LRUCacheHashMap.java | 49 ++
.../src/main/resources/yarn-default.xml | 18 +
.../hadoop/yarn/util/TestLRUCacheHashMap.java | 74 +++
.../hadoop-yarn-server-common/pom.xml | 11 +
.../yarn/server/MockResourceManagerFacade.java | 511 +++++++++++++++++
.../hadoop-yarn-server-nodemanager/pom.xml | 7 +
.../amrmproxy/MockRequestInterceptor.java | 14 +-
.../amrmproxy/MockResourceManagerFacade.java | 514 -----------------
.../hadoop-yarn-server-router/pom.xml | 19 +
.../hadoop/yarn/server/router/Router.java | 98 +++-
.../AbstractClientRequestInterceptor.java | 89 +++
.../clientrm/ClientRequestInterceptor.java | 65 +++
.../DefaultClientRequestInterceptor.java | 334 +++++++++++
.../router/clientrm/RouterClientRMService.java | 544 ++++++++++++++++++
.../server/router/clientrm/package-info.java | 20 +
.../hadoop/yarn/server/router/TestRouter.java | 26 -
.../router/clientrm/BaseRouterClientRMTest.java | 574 +++++++++++++++++++
.../clientrm/MockClientRequestInterceptor.java | 36 ++
.../PassThroughClientRequestInterceptor.java | 267 +++++++++
.../clientrm/TestRouterClientRMService.java | 210 +++++++
22 files changed, 2960 insertions(+), 548 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/48460690/hadoop-project/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml
index 15bd1fa..7301e90 100755
--- a/hadoop-project/pom.xml
+++ b/hadoop-project/pom.xml
@@ -324,6 +324,13 @@
<dependency>
<groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-server-common</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-server-tests</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/48460690/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 7bcb123..cf9c237 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -2637,6 +2637,27 @@ public class YarnConfiguration extends Configuration {
public static final int DEFAULT_FEDERATION_STATESTORE_SQL_MAXCONNECTIONS = 1;
+ public static final String ROUTER_PREFIX = YARN_PREFIX + "router.";
+
+ public static final String ROUTER_CLIENTRM_PREFIX =
+ ROUTER_PREFIX + "clientrm.";
+
+ /**
+ * Configuration key for the Router ClientRM address. ROUTER_CLIENTRM_PREFIX
+ * already ends with a dot, so the suffix must not start with another one.
+ */
+ public static final String ROUTER_CLIENTRM_ADDRESS =
+ ROUTER_CLIENTRM_PREFIX + "address";
+ public static final int DEFAULT_ROUTER_CLIENTRM_PORT = 8050;
+ public static final String DEFAULT_ROUTER_CLIENTRM_ADDRESS =
+ "0.0.0.0:" + DEFAULT_ROUTER_CLIENTRM_PORT;
+
+ public static final String ROUTER_CLIENTRM_INTERCEPTOR_CLASS_PIPELINE =
+ ROUTER_CLIENTRM_PREFIX + "interceptor-class.pipeline";
+ public static final String DEFAULT_ROUTER_CLIENTRM_INTERCEPTOR_CLASS =
+ "org.apache.hadoop.yarn.server.router.clientrm."
+ + "DefaultClientRequestInterceptor";
+
+ public static final String ROUTER_CLIENTRM_PIPELINE_CACHE_MAX_SIZE =
+ ROUTER_CLIENTRM_PREFIX + "cache-max-size";
+ public static final int DEFAULT_ROUTER_CLIENTRM_PIPELINE_CACHE_MAX_SIZE = 25;
+
////////////////////////////////
// Other Configs
////////////////////////////////
http://git-wip-us.apache.org/repos/asf/hadoop/blob/48460690/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/LRUCacheHashMap.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/LRUCacheHashMap.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/LRUCacheHashMap.java
new file mode 100644
index 0000000..7cb4e1b
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/LRUCacheHashMap.java
@@ -0,0 +1,49 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.util;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+/**
+ * A {@link LinkedHashMap} bounded to a maximum number of entries. After each
+ * insertion that pushes the size above the maximum, the eldest entry is
+ * evicted: the least recently accessed one when access order is enabled, or
+ * the least recently inserted one otherwise.
+ *
+ * @param <K> type of the keys
+ * @param <V> type of the values
+ */
+public class LRUCacheHashMap<K, V> extends LinkedHashMap<K, V> {
+
+ private static final long serialVersionUID = 1L;
+
+ // Maximum number of entries retained; never changes after construction.
+ private final int maxSize;
+
+ /**
+ * Constructor.
+ *
+ * @param maxSize max size of the cache; also used as the initial capacity
+ * @param accessOrder true for access-order, false for insertion-order
+ */
+ public LRUCacheHashMap(int maxSize, boolean accessOrder) {
+ super(maxSize, 0.75f, accessOrder);
+ this.maxSize = maxSize;
+ }
+
+ /**
+ * Evicts the eldest entry once the map holds more than maxSize entries.
+ *
+ * @param eldest the entry that would be removed
+ * @return true if the map has grown beyond maxSize
+ */
+ @Override
+ protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
+ return size() > maxSize;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/48460690/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 6af7321..94dccd1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -3167,6 +3167,24 @@
<property>
<description>
+ The comma separated list of class names that implement the
+ ClientRequestInterceptor interface. This is used by the RouterClientRMService
+ to create the request processing pipeline for users.
+ </description>
+ <name>yarn.router.clientrm.interceptor-class.pipeline</name>
+ <value>org.apache.hadoop.yarn.server.router.clientrm.DefaultClientRequestInterceptor</value>
+ </property>
+
+ <property>
+ <description>
+ Maximum size of the LRU cache of request pipelines kept by the Router
+ ClientRM Service.
+ </description>
+ <name>yarn.router.clientrm.cache-max-size</name>
+ <value>25</value>
+ </property>
+
+ <property>
+ <description>
Comma-separated list of PlacementRules to determine how applications
submitted by certain users get mapped to certain queues. Default is
user-group, which corresponds to UserGroupMappingPlacementRule.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/48460690/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestLRUCacheHashMap.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestLRUCacheHashMap.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestLRUCacheHashMap.java
new file mode 100644
index 0000000..1cbb56c
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestLRUCacheHashMap.java
@@ -0,0 +1,74 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.hadoop.yarn.util;
+
+import java.io.IOException;
+
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test class to validate the correctness of the LRUCacheHashMap.
+ */
+public class TestLRUCacheHashMap {
+
+ /**
+ * Fills an access-ordered cache to capacity, then checks that further
+ * insertions evict the least recently used entries while a refreshed entry
+ * survives.
+ */
+ @Test
+ public void testLRUCache()
+ throws YarnException, IOException, InterruptedException {
+
+ int mapSize = 5;
+
+ LRUCacheHashMap<String, Integer> map =
+ new LRUCacheHashMap<String, Integer>(mapSize, true);
+
+ map.put("1", 1);
+ map.put("2", 2);
+ map.put("3", 3);
+ map.put("4", 4);
+ map.put("5", 5);
+
+ Assert.assertEquals(mapSize, map.size());
+
+ // Check if all the elements in the map are from 1 to 5 (inclusive)
+ for (int i = 1; i <= mapSize; i++) {
+ Assert.assertTrue(map.containsKey(Integer.toString(i)));
+ }
+
+ map.put("6", 6);
+ // Putting 3 again counts as an access, so it becomes the newest entry
+ map.put("3", 3);
+ map.put("7", 7);
+ map.put("8", 8);
+
+ Assert.assertEquals(mapSize, map.size());
+
+ // Check if all the elements in the map are from 5 to 8 and the 3
+ for (int i = 5; i <= 8; i++) {
+ Assert.assertTrue(map.containsKey(Integer.toString(i)));
+ }
+
+ Assert.assertTrue(map.containsKey("3"));
+
+ // The three least recently used entries must have been evicted
+ Assert.assertFalse(map.containsKey("1"));
+ Assert.assertFalse(map.containsKey("2"));
+ Assert.assertFalse(map.containsKey("4"));
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/48460690/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml
index c9f6d79..5f85097 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml
@@ -209,6 +209,17 @@
</excludes>
</configuration>
</plugin>
+ <plugin>
+ <artifactId>maven-jar-plugin</artifactId>
+ <executions>
+ <execution>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ <phase>test-compile</phase>
+ </execution>
+ </executions>
+ </plugin>
</plugins>
</build>
</project>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/48460690/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java
new file mode 100644
index 0000000..e302c70
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java
@@ -0,0 +1,511 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
+import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FailApplicationAttemptRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.FailApplicationAttemptResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainersRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainersResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewReservationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewReservationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationListRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationListResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SignalContainerResponsePBImpl;
+import org.apache.hadoop.yarn.api.records.AMCommand;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.NMToken;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.NodeLabel;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.ReservationAllocationState;
+import org.apache.hadoop.yarn.api.records.ReservationId;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.Token;
+import org.apache.hadoop.yarn.api.records.UpdatedContainer;
+import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.util.Records;
+import org.junit.Assert;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Strings;
+
+/**
+ * Mock Resource Manager facade implementation that exposes all the methods
+ * implemented by the YARN RM. The behavior and the values returned by this mock
+ * implementation are expected by the Router/AMRMProxy unit test cases. So please
+ * change the implementation with care.
+ */
+public class MockResourceManagerFacade
+ implements ApplicationClientProtocol, ApplicationMasterProtocol {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(MockResourceManagerFacade.class);
+
+ private HashMap<String, List<ContainerId>> applicationContainerIdMap =
+ new HashMap<String, List<ContainerId>>();
+ private HashMap<ContainerId, Container> allocatedContainerMap =
+ new HashMap<ContainerId, Container>();
+ private AtomicInteger containerIndex = new AtomicInteger(0);
+ private Configuration conf;
+
+ public MockResourceManagerFacade(Configuration conf,
+ int startContainerIndex) {
+ this.conf = conf;
+ this.containerIndex.set(startContainerIndex);
+ }
+
+ /**
+ * Derives the identifier of the calling application from the token
+ * identifiers of the current user.
+ *
+ * @return the string form of the application attempt id carried by the first
+ * AMRMTokenIdentifier found, or "" if the user has none
+ * @throws IOException propagated from UserGroupInformation.getCurrentUser()
+ */
+ private static String getAppIdentifier() throws IOException {
+ UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
+ for (TokenIdentifier candidate : currentUser.getTokenIdentifiers()) {
+ if (candidate instanceof AMRMTokenIdentifier) {
+ AMRMTokenIdentifier amrmTokenId = (AMRMTokenIdentifier) candidate;
+ return amrmTokenId.getApplicationAttemptId().toString();
+ }
+ }
+ // No AMRM token identifier among the current user's tokens
+ return "";
+ }
+
+ /**
+ * Records the calling application attempt as registered and starts tracking
+ * its containers. Fails the test if the attempt is already registered.
+ *
+ * @param request registration request; only its host is echoed back
+ * @return a response carrying the request host, all other fields null
+ */
+ @Override
+ public RegisterApplicationMasterResponse registerApplicationMaster(
+ RegisterApplicationMasterRequest request)
+ throws YarnException, IOException {
+ final String attemptKey = getAppIdentifier();
+ LOG.info("Registering application attempt: " + attemptKey);
+
+ synchronized (applicationContainerIdMap) {
+ boolean alreadyRegistered =
+ applicationContainerIdMap.containsKey(attemptKey);
+ Assert.assertFalse(
+ "The application id is already registered: " + attemptKey,
+ alreadyRegistered);
+ // Start with an empty list of containers handed to this application
+ applicationContainerIdMap.put(attemptKey, new ArrayList<ContainerId>());
+ }
+
+ return RegisterApplicationMasterResponse.newInstance(null, null, null, null,
+ null, request.getHost(), null);
+ }
+
+ /**
+ * Forgets the calling application attempt together with every container that
+ * was tracked for it. Fails the test if the attempt is not registered.
+ *
+ * @param request finish request; only its final application status is read
+ * @return a response built from whether that status is SUCCEEDED
+ */
+ @Override
+ public FinishApplicationMasterResponse finishApplicationMaster(
+ FinishApplicationMasterRequest request)
+ throws YarnException, IOException {
+ final String attemptKey = getAppIdentifier();
+ LOG.info("Finishing application attempt: " + attemptKey);
+
+ synchronized (applicationContainerIdMap) {
+ boolean registered = applicationContainerIdMap.containsKey(attemptKey);
+ Assert.assertTrue("The application id is NOT registered: " + attemptKey,
+ registered);
+ // Drop the attempt and every container that was tracked for it
+ List<ContainerId> trackedIds =
+ applicationContainerIdMap.remove(attemptKey);
+ for (ContainerId trackedId : trackedIds) {
+ allocatedContainerMap.remove(trackedId);
+ }
+ }
+
+ // The comparison already yields the boolean the response needs
+ boolean succeeded =
+ request.getFinalApplicationStatus() == FinalApplicationStatus.SUCCEEDED;
+ return FinishApplicationMasterResponse.newInstance(succeeded);
+ }
+
+ protected ApplicationId getApplicationId(int id) {
+ return ApplicationId.newInstance(12345, id);
+ }
+
+ protected ApplicationAttemptId getApplicationAttemptId(int id) {
+ return ApplicationAttemptId.newInstance(getApplicationId(id), 1);
+ }
+
+ /**
+ * Mock allocation: creates one new container per requested container and
+ * echoes every released container back under a new fake id.
+ *
+ * @param request heartbeat carrying either an ask list or a release list;
+ * the test fails if both are non-empty
+ * @return a response holding the new and the echoed containers, an
+ * AM_RESYNC command and a fresh, empty AMRM token
+ */
+ @SuppressWarnings("deprecation")
+ @Override
+ public AllocateResponse allocate(AllocateRequest request)
+ throws YarnException, IOException {
+ if (request.getAskList() != null && request.getAskList().size() > 0
+ && request.getReleaseList() != null
+ && request.getReleaseList().size() > 0) {
+ Assert.fail("The mock RM implementation does not support receiving "
+ + "askList and releaseList in the same heartbeat");
+ }
+
+ String amrmToken = getAppIdentifier();
+
+ ArrayList<Container> containerList = new ArrayList<Container>();
+ if (request.getAskList() != null) {
+ for (ResourceRequest rr : request.getAskList()) {
+ for (int i = 0; i < rr.getNumContainers(); i++) {
+ ContainerId containerId = ContainerId.newInstance(
+ getApplicationAttemptId(1), containerIndex.incrementAndGet());
+ Container container = Records.newRecord(Container.class);
+ container.setId(containerId);
+ container.setPriority(rr.getPriority());
+
+ // We don't use the node for running containers in the test cases. So
+ // it is OK to hard code it to some dummy value
+ NodeId nodeId =
+ NodeId.newInstance(!Strings.isNullOrEmpty(rr.getResourceName())
+ ? rr.getResourceName() : "dummy", 1000);
+ container.setNodeId(nodeId);
+ container.setResource(rr.getCapability());
+ containerList.add(container);
+
+ synchronized (applicationContainerIdMap) {
+ // Keep track of the containers returned to this application. We
+ // will need it in future
+ Assert.assertTrue(
+ "The application id is Not registered before allocate(): "
+ + amrmToken,
+ applicationContainerIdMap.containsKey(amrmToken));
+ List<ContainerId> ids = applicationContainerIdMap.get(amrmToken);
+ ids.add(containerId);
+ this.allocatedContainerMap.put(containerId, container);
+ }
+ }
+ }
+ }
+
+ if (request.getReleaseList() != null
+ && request.getReleaseList().size() > 0) {
+ LOG.info("Releasing containers: " + request.getReleaseList().size());
+ synchronized (applicationContainerIdMap) {
+ Assert.assertTrue(
+ "The application id is not registered before allocate(): "
+ + amrmToken,
+ applicationContainerIdMap.containsKey(amrmToken));
+ List<ContainerId> ids = applicationContainerIdMap.get(amrmToken);
+
+ for (ContainerId id : request.getReleaseList()) {
+ // List.remove(Object) reports whether the id was tracked for this
+ // application, and untracks it in the same step
+ boolean found = ids.remove(id);
+
+ // Report the identifier used for the lookup above, not an unrelated
+ // configuration value
+ Assert.assertTrue("ContainerId " + id
+ + " being released is not valid for application: "
+ + amrmToken, found);
+
+ // Return the released container back to the AM with new fake Ids. The
+ // test case does not care about the IDs. The IDs are faked because
+ // otherwise the LRM will throw duplication identifier exception. This
+ // returning of fake containers is ONLY done for testing purpose - for
+ // the test code to get confirmation that the sub-cluster resource
+ // managers received the release request
+ ContainerId fakeContainerId = ContainerId.newInstance(
+ getApplicationAttemptId(1), containerIndex.incrementAndGet());
+ // Remove the entry keyed by the released id: the id is no longer in
+ // the tracked list, so finishApplicationMaster() could never clean
+ // it up afterwards
+ Container fakeContainer = allocatedContainerMap.remove(id);
+ fakeContainer.setId(fakeContainerId);
+ containerList.add(fakeContainer);
+ }
+ }
+ }
+
+ LOG.info("Allocating containers: " + containerList.size()
+ + " for application attempt: " + amrmToken);
+
+ // Always issue a new AMRMToken as if RM rolled master key
+ Token newAMRMToken = Token.newInstance(new byte[0], "", new byte[0], "");
+
+ return AllocateResponse.newInstance(0, new ArrayList<ContainerStatus>(),
+ containerList, new ArrayList<NodeReport>(), null, AMCommand.AM_RESYNC,
+ 1, null, new ArrayList<NMToken>(), newAMRMToken,
+ new ArrayList<UpdatedContainer>());
+ }
+
+ @Override
+ public GetApplicationReportResponse getApplicationReport(
+ GetApplicationReportRequest request) throws YarnException, IOException {
+
+ GetApplicationReportResponse response =
+ Records.newRecord(GetApplicationReportResponse.class);
+ ApplicationReport report = Records.newRecord(ApplicationReport.class);
+ report.setYarnApplicationState(YarnApplicationState.ACCEPTED);
+ report.setApplicationId(request.getApplicationId());
+ report.setCurrentApplicationAttemptId(
+ ApplicationAttemptId.newInstance(request.getApplicationId(), 1));
+ response.setApplicationReport(report);
+ return response;
+ }
+
+ @Override
+ public GetApplicationAttemptReportResponse getApplicationAttemptReport(
+ GetApplicationAttemptReportRequest request)
+ throws YarnException, IOException {
+
+ GetApplicationAttemptReportResponse response =
+ Records.newRecord(GetApplicationAttemptReportResponse.class);
+ ApplicationAttemptReport report =
+ Records.newRecord(ApplicationAttemptReport.class);
+ report.setApplicationAttemptId(request.getApplicationAttemptId());
+ report.setYarnApplicationAttemptState(YarnApplicationAttemptState.LAUNCHED);
+ response.setApplicationAttemptReport(report);
+ return response;
+ }
+
+ @Override
+ public GetNewApplicationResponse getNewApplication(
+ GetNewApplicationRequest request) throws YarnException, IOException {
+ return GetNewApplicationResponse.newInstance(null, null, null);
+ }
+
+ @Override
+ public SubmitApplicationResponse submitApplication(
+ SubmitApplicationRequest request) throws YarnException, IOException {
+ return SubmitApplicationResponse.newInstance();
+ }
+
+ @Override
+ public KillApplicationResponse forceKillApplication(
+ KillApplicationRequest request) throws YarnException, IOException {
+ return KillApplicationResponse.newInstance(true);
+ }
+
+ @Override
+ public GetClusterMetricsResponse getClusterMetrics(
+ GetClusterMetricsRequest request) throws YarnException, IOException {
+ return GetClusterMetricsResponse.newInstance(null);
+ }
+
+ @Override
+ public GetApplicationsResponse getApplications(GetApplicationsRequest request)
+ throws YarnException, IOException {
+ return GetApplicationsResponse.newInstance(null);
+ }
+
+ @Override
+ public GetClusterNodesResponse getClusterNodes(GetClusterNodesRequest request)
+ throws YarnException, IOException {
+ return GetClusterNodesResponse.newInstance(null);
+ }
+
+ @Override
+ public GetQueueInfoResponse getQueueInfo(GetQueueInfoRequest request)
+ throws YarnException, IOException {
+ return GetQueueInfoResponse.newInstance(null);
+ }
+
+ @Override
+ public GetQueueUserAclsInfoResponse getQueueUserAcls(
+ GetQueueUserAclsInfoRequest request) throws YarnException, IOException {
+ return GetQueueUserAclsInfoResponse.newInstance(null);
+ }
+
+ @Override
+ public GetDelegationTokenResponse getDelegationToken(
+ GetDelegationTokenRequest request) throws YarnException, IOException {
+ return GetDelegationTokenResponse.newInstance(null);
+ }
+
+ @Override
+ public RenewDelegationTokenResponse renewDelegationToken(
+ RenewDelegationTokenRequest request) throws YarnException, IOException {
+ return RenewDelegationTokenResponse.newInstance(0);
+ }
+
+ @Override
+ public CancelDelegationTokenResponse cancelDelegationToken(
+ CancelDelegationTokenRequest request) throws YarnException, IOException {
+ return CancelDelegationTokenResponse.newInstance();
+ }
+
+ @Override
+ public MoveApplicationAcrossQueuesResponse moveApplicationAcrossQueues(
+ MoveApplicationAcrossQueuesRequest request)
+ throws YarnException, IOException {
+ return MoveApplicationAcrossQueuesResponse.newInstance();
+ }
+
+ @Override
+ public GetApplicationAttemptsResponse getApplicationAttempts(
+ GetApplicationAttemptsRequest request) throws YarnException, IOException {
+ return GetApplicationAttemptsResponse.newInstance(null);
+ }
+
+ @Override
+ public GetContainerReportResponse getContainerReport(
+ GetContainerReportRequest request) throws YarnException, IOException {
+ return GetContainerReportResponse.newInstance(null);
+ }
+
+ @Override
+ public GetContainersResponse getContainers(GetContainersRequest request)
+ throws YarnException, IOException {
+ return GetContainersResponse.newInstance(null);
+ }
+
+ @Override
+ public ReservationSubmissionResponse submitReservation(
+ ReservationSubmissionRequest request) throws YarnException, IOException {
+ return ReservationSubmissionResponse.newInstance();
+ }
+
+ @Override
+ public ReservationListResponse listReservations(
+ ReservationListRequest request) throws YarnException, IOException {
+ return ReservationListResponse
+ .newInstance(new ArrayList<ReservationAllocationState>());
+ }
+
+ @Override
+ public ReservationUpdateResponse updateReservation(
+ ReservationUpdateRequest request) throws YarnException, IOException {
+ return ReservationUpdateResponse.newInstance();
+ }
+
+ @Override
+ public ReservationDeleteResponse deleteReservation(
+ ReservationDeleteRequest request) throws YarnException, IOException {
+ return ReservationDeleteResponse.newInstance();
+ }
+
+ @Override
+ public GetNodesToLabelsResponse getNodeToLabels(
+ GetNodesToLabelsRequest request) throws YarnException, IOException {
+ return GetNodesToLabelsResponse
+ .newInstance(new HashMap<NodeId, Set<String>>());
+ }
+
+ @Override
+ public GetClusterNodeLabelsResponse getClusterNodeLabels(
+ GetClusterNodeLabelsRequest request) throws YarnException, IOException {
+ return GetClusterNodeLabelsResponse.newInstance(new ArrayList<NodeLabel>());
+ }
+
+ @Override
+ public GetLabelsToNodesResponse getLabelsToNodes(
+ GetLabelsToNodesRequest request) throws YarnException, IOException {
+ return GetLabelsToNodesResponse.newInstance(null);
+ }
+
+ @Override
+ public GetNewReservationResponse getNewReservation(
+ GetNewReservationRequest request) throws YarnException, IOException {
+ return GetNewReservationResponse
+ .newInstance(ReservationId.newInstance(0, 0));
+ }
+
+ @Override
+ public FailApplicationAttemptResponse failApplicationAttempt(
+ FailApplicationAttemptRequest request) throws YarnException, IOException {
+ return FailApplicationAttemptResponse.newInstance();
+ }
+
+ @Override
+ public UpdateApplicationPriorityResponse updateApplicationPriority(
+ UpdateApplicationPriorityRequest request)
+ throws YarnException, IOException {
+ return UpdateApplicationPriorityResponse.newInstance(null);
+ }
+
+ @Override
+ public SignalContainerResponse signalToContainer(
+ SignalContainerRequest request) throws YarnException, IOException {
+ return new SignalContainerResponsePBImpl();
+ }
+
+ @Override
+ public UpdateApplicationTimeoutsResponse updateApplicationTimeouts(
+ UpdateApplicationTimeoutsRequest request)
+ throws YarnException, IOException {
+ return UpdateApplicationTimeoutsResponse.newInstance();
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/48460690/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/pom.xml
index 094519a..28ee0d9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/pom.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/pom.xml
@@ -172,6 +172,13 @@
<groupId>org.fusesource.leveldbjni</groupId>
<artifactId>leveldbjni-all</artifactId>
</dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-server-common</artifactId>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<profiles>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/48460690/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockRequestInterceptor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockRequestInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockRequestInterceptor.java
index c962f97..1cbb237 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockRequestInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockRequestInterceptor.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterReque
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.MockResourceManagerFacade;
public class MockRequestInterceptor extends AbstractRequestInterceptor {
@@ -38,22 +39,21 @@ public class MockRequestInterceptor extends AbstractRequestInterceptor {
public void init(AMRMProxyApplicationContext appContext) {
super.init(appContext);
- mockRM =
- new MockResourceManagerFacade(new YarnConfiguration(
- super.getConf()), 0);
+ mockRM = new MockResourceManagerFacade(
+ new YarnConfiguration(super.getConf()), 0);
}
@Override
public RegisterApplicationMasterResponse registerApplicationMaster(
- RegisterApplicationMasterRequest request) throws YarnException,
- IOException {
+ RegisterApplicationMasterRequest request)
+ throws YarnException, IOException {
return mockRM.registerApplicationMaster(request);
}
@Override
public FinishApplicationMasterResponse finishApplicationMaster(
- FinishApplicationMasterRequest request) throws YarnException,
- IOException {
+ FinishApplicationMasterRequest request)
+ throws YarnException, IOException {
return mockRM.finishApplicationMaster(request);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/48460690/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockResourceManagerFacade.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockResourceManagerFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockResourceManagerFacade.java
deleted file mode 100644
index f584c94..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockResourceManagerFacade.java
+++ /dev/null
@@ -1,514 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicInteger;
-import com.google.common.base.Strings;
-import org.apache.commons.lang.NotImplementedException;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.FailApplicationAttemptRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.FailApplicationAttemptResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.GetContainersRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.GetContainersResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.GetNewReservationRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.GetNewReservationResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.ReservationListRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.ReservationListResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsResponse;
-import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
-import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
-import org.apache.hadoop.yarn.api.records.AMCommand;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ApplicationReport;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.ContainerStatus;
-import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
-import org.apache.hadoop.yarn.api.records.NMToken;
-import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.api.records.NodeReport;
-import org.apache.hadoop.yarn.api.records.ResourceRequest;
-import org.apache.hadoop.yarn.api.records.Token;
-import org.apache.hadoop.yarn.api.records.UpdatedContainer;
-import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
-import org.apache.hadoop.yarn.api.records.YarnApplicationState;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
-import org.apache.hadoop.yarn.util.Records;
-import org.junit.Assert;
-import org.eclipse.jetty.util.log.Log;
-
-/**
- * Mock Resource Manager facade implementation that exposes all the methods
- * implemented by the YARN RM. The behavior and the values returned by this mock
- * implementation is expected by the unit test cases. So please change the
- * implementation with care.
- */
-public class MockResourceManagerFacade implements
- ApplicationMasterProtocol, ApplicationClientProtocol {
-
- private HashMap<String, List<ContainerId>> applicationContainerIdMap =
- new HashMap<String, List<ContainerId>>();
- private HashMap<ContainerId, Container> allocatedContainerMap =
- new HashMap<ContainerId, Container>();
- private AtomicInteger containerIndex = new AtomicInteger(0);
- private Configuration conf;
-
- public MockResourceManagerFacade(Configuration conf,
- int startContainerIndex) {
- this.conf = conf;
- this.containerIndex.set(startContainerIndex);
- }
-
- private static String getAppIdentifier() throws IOException {
- AMRMTokenIdentifier result = null;
- UserGroupInformation remoteUgi = UserGroupInformation.getCurrentUser();
- Set<TokenIdentifier> tokenIds = remoteUgi.getTokenIdentifiers();
- for (TokenIdentifier tokenId : tokenIds) {
- if (tokenId instanceof AMRMTokenIdentifier) {
- result = (AMRMTokenIdentifier) tokenId;
- break;
- }
- }
- return result != null ? result.getApplicationAttemptId().toString()
- : "";
- }
-
- @Override
- public RegisterApplicationMasterResponse registerApplicationMaster(
- RegisterApplicationMasterRequest request) throws YarnException,
- IOException {
- String amrmToken = getAppIdentifier();
- Log.getLog().info("Registering application attempt: " + amrmToken);
-
- synchronized (applicationContainerIdMap) {
- Assert.assertFalse("The application id is already registered: "
- + amrmToken, applicationContainerIdMap.containsKey(amrmToken));
- // Keep track of the containers that are returned to this application
- applicationContainerIdMap.put(amrmToken,
- new ArrayList<ContainerId>());
- }
-
- return RegisterApplicationMasterResponse.newInstance(null, null, null,
- null, null, request.getHost(), null);
- }
-
- @Override
- public FinishApplicationMasterResponse finishApplicationMaster(
- FinishApplicationMasterRequest request) throws YarnException,
- IOException {
- String amrmToken = getAppIdentifier();
- Log.getLog().info("Finishing application attempt: " + amrmToken);
-
- synchronized (applicationContainerIdMap) {
- // Remove the containers that were being tracked for this application
- Assert.assertTrue("The application id is NOT registered: "
- + amrmToken, applicationContainerIdMap.containsKey(amrmToken));
- List<ContainerId> ids = applicationContainerIdMap.remove(amrmToken);
- for (ContainerId c : ids) {
- allocatedContainerMap.remove(c);
- }
- }
-
- return FinishApplicationMasterResponse
- .newInstance(request.getFinalApplicationStatus() == FinalApplicationStatus.SUCCEEDED ? true
- : false);
- }
-
- protected ApplicationId getApplicationId(int id) {
- return ApplicationId.newInstance(12345, id);
- }
-
- protected ApplicationAttemptId getApplicationAttemptId(int id) {
- return ApplicationAttemptId.newInstance(getApplicationId(id), 1);
- }
-
- @SuppressWarnings("deprecation")
- @Override
- public AllocateResponse allocate(AllocateRequest request)
- throws YarnException, IOException {
- if (request.getAskList() != null && request.getAskList().size() > 0
- && request.getReleaseList() != null
- && request.getReleaseList().size() > 0) {
- Assert.fail("The mock RM implementation does not support receiving "
- + "askList and releaseList in the same heartbeat");
- }
-
- String amrmToken = getAppIdentifier();
-
- ArrayList<Container> containerList = new ArrayList<Container>();
- if (request.getAskList() != null) {
- for (ResourceRequest rr : request.getAskList()) {
- for (int i = 0; i < rr.getNumContainers(); i++) {
- ContainerId containerId =
- ContainerId.newInstance(getApplicationAttemptId(1),
- containerIndex.incrementAndGet());
- Container container = Records.newRecord(Container.class);
- container.setId(containerId);
- container.setPriority(rr.getPriority());
-
- // We don't use the node for running containers in the test cases. So
- // it is OK to hard code it to some dummy value
- NodeId nodeId =
- NodeId.newInstance(
- !Strings.isNullOrEmpty(rr.getResourceName()) ? rr
- .getResourceName() : "dummy", 1000);
- container.setNodeId(nodeId);
- container.setResource(rr.getCapability());
- containerList.add(container);
-
- synchronized (applicationContainerIdMap) {
- // Keep track of the containers returned to this application. We
- // will need it in future
- Assert.assertTrue(
- "The application id is Not registered before allocate(): "
- + amrmToken,
- applicationContainerIdMap.containsKey(amrmToken));
- List<ContainerId> ids =
- applicationContainerIdMap.get(amrmToken);
- ids.add(containerId);
- this.allocatedContainerMap.put(containerId, container);
- }
- }
- }
- }
-
- if (request.getReleaseList() != null
- && request.getReleaseList().size() > 0) {
- Log.getLog().info("Releasing containers: "
- + request.getReleaseList().size());
- synchronized (applicationContainerIdMap) {
- Assert.assertTrue(
- "The application id is not registered before allocate(): "
- + amrmToken,
- applicationContainerIdMap.containsKey(amrmToken));
- List<ContainerId> ids = applicationContainerIdMap.get(amrmToken);
-
- for (ContainerId id : request.getReleaseList()) {
- boolean found = false;
- for (ContainerId c : ids) {
- if (c.equals(id)) {
- found = true;
- break;
- }
- }
-
- Assert.assertTrue(
- "ContainerId " + id
- + " being released is not valid for application: "
- + conf.get("AMRMTOKEN"), found);
-
- ids.remove(id);
-
- // Return the released container back to the AM with new fake Ids. The
- // test case does not care about the IDs. The IDs are faked because
- // otherwise the LRM will throw duplication identifier exception. This
- // returning of fake containers is ONLY done for testing purpose - for
- // the test code to get confirmation that the sub-cluster resource
- // managers received the release request
- ContainerId fakeContainerId =
- ContainerId.newInstance(getApplicationAttemptId(1),
- containerIndex.incrementAndGet());
- Container fakeContainer = allocatedContainerMap.get(id);
- fakeContainer.setId(fakeContainerId);
- containerList.add(fakeContainer);
- }
- }
- }
-
- Log.getLog().info("Allocating containers: " + containerList.size()
- + " for application attempt: " + conf.get("AMRMTOKEN"));
-
- // Always issue a new AMRMToken as if RM rolled master key
- Token newAMRMToken = Token.newInstance(new byte[0], "", new byte[0], "");
-
- return AllocateResponse.newInstance(0,
- new ArrayList<ContainerStatus>(), containerList,
- new ArrayList<NodeReport>(), null, AMCommand.AM_RESYNC, 1, null,
- new ArrayList<NMToken>(), newAMRMToken,
- new ArrayList<UpdatedContainer>());
- }
-
- @Override
- public GetApplicationReportResponse getApplicationReport(
- GetApplicationReportRequest request) throws YarnException,
- IOException {
-
- GetApplicationReportResponse response =
- Records.newRecord(GetApplicationReportResponse.class);
- ApplicationReport report = Records.newRecord(ApplicationReport.class);
- report.setYarnApplicationState(YarnApplicationState.ACCEPTED);
- report.setApplicationId(request.getApplicationId());
- report.setCurrentApplicationAttemptId(ApplicationAttemptId
- .newInstance(request.getApplicationId(), 1));
- response.setApplicationReport(report);
- return response;
- }
-
- @Override
- public GetApplicationAttemptReportResponse getApplicationAttemptReport(
- GetApplicationAttemptReportRequest request) throws YarnException,
- IOException {
- GetApplicationAttemptReportResponse response =
- Records.newRecord(GetApplicationAttemptReportResponse.class);
- ApplicationAttemptReport report =
- Records.newRecord(ApplicationAttemptReport.class);
- report.setApplicationAttemptId(request.getApplicationAttemptId());
- report
- .setYarnApplicationAttemptState(YarnApplicationAttemptState.LAUNCHED);
- response.setApplicationAttemptReport(report);
- return response;
- }
-
- @Override
- public GetNewApplicationResponse getNewApplication(
- GetNewApplicationRequest request) throws YarnException, IOException {
- return null;
- }
-
- @Override
- public SubmitApplicationResponse submitApplication(
- SubmitApplicationRequest request) throws YarnException, IOException {
- return null;
- }
-
- @Override
- public KillApplicationResponse forceKillApplication(
- KillApplicationRequest request) throws YarnException, IOException {
- throw new NotImplementedException();
- }
-
- @Override
- public GetClusterMetricsResponse getClusterMetrics(
- GetClusterMetricsRequest request) throws YarnException, IOException {
- throw new NotImplementedException();
- }
-
- @Override
- public GetApplicationsResponse getApplications(
- GetApplicationsRequest request) throws YarnException, IOException {
- throw new NotImplementedException();
- }
-
- @Override
- public GetClusterNodesResponse getClusterNodes(
- GetClusterNodesRequest request) throws YarnException, IOException {
- throw new NotImplementedException();
- }
-
- @Override
- public GetQueueInfoResponse getQueueInfo(GetQueueInfoRequest request)
- throws YarnException, IOException {
- throw new NotImplementedException();
- }
-
- @Override
- public GetQueueUserAclsInfoResponse getQueueUserAcls(
- GetQueueUserAclsInfoRequest request) throws YarnException,
- IOException {
- throw new NotImplementedException();
- }
-
- @Override
- public GetDelegationTokenResponse getDelegationToken(
- GetDelegationTokenRequest request) throws YarnException, IOException {
- throw new NotImplementedException();
- }
-
- @Override
- public RenewDelegationTokenResponse renewDelegationToken(
- RenewDelegationTokenRequest request) throws YarnException,
- IOException {
- throw new NotImplementedException();
- }
-
- @Override
- public CancelDelegationTokenResponse cancelDelegationToken(
- CancelDelegationTokenRequest request) throws YarnException,
- IOException {
- throw new NotImplementedException();
- }
-
- @Override
- public MoveApplicationAcrossQueuesResponse moveApplicationAcrossQueues(
- MoveApplicationAcrossQueuesRequest request) throws YarnException,
- IOException {
- throw new NotImplementedException();
- }
-
- @Override
- public GetApplicationAttemptsResponse getApplicationAttempts(
- GetApplicationAttemptsRequest request) throws YarnException,
- IOException {
- throw new NotImplementedException();
- }
-
- @Override
- public GetContainerReportResponse getContainerReport(
- GetContainerReportRequest request) throws YarnException, IOException {
- throw new NotImplementedException();
- }
-
- @Override
- public GetContainersResponse getContainers(GetContainersRequest request)
- throws YarnException, IOException {
- throw new NotImplementedException();
- }
-
- @Override
- public GetNewReservationResponse getNewReservation(
- GetNewReservationRequest request) throws YarnException, IOException {
- throw new NotImplementedException();
- }
-
- @Override
- public ReservationSubmissionResponse submitReservation(
- ReservationSubmissionRequest request) throws YarnException,
- IOException {
- throw new NotImplementedException();
- }
-
- @Override
- public ReservationListResponse listReservations(
- ReservationListRequest request) throws YarnException,
- IOException {
- throw new NotImplementedException();
- }
-
- @Override
- public ReservationUpdateResponse updateReservation(
- ReservationUpdateRequest request) throws YarnException, IOException {
- throw new NotImplementedException();
- }
-
- @Override
- public ReservationDeleteResponse deleteReservation(
- ReservationDeleteRequest request) throws YarnException, IOException {
- throw new NotImplementedException();
- }
-
- @Override
- public GetNodesToLabelsResponse getNodeToLabels(
- GetNodesToLabelsRequest request) throws YarnException, IOException {
- throw new NotImplementedException();
- }
-
- @Override
- public GetClusterNodeLabelsResponse getClusterNodeLabels(
- GetClusterNodeLabelsRequest request) throws YarnException,
- IOException {
- throw new NotImplementedException();
- }
-
- @Override
- public GetLabelsToNodesResponse getLabelsToNodes(
- GetLabelsToNodesRequest request) throws YarnException, IOException {
- return null;
- }
-
- @Override
- public UpdateApplicationPriorityResponse updateApplicationPriority(
- UpdateApplicationPriorityRequest request) throws YarnException,
- IOException {
- return null;
- }
-
- @Override
- public SignalContainerResponse signalToContainer(
- SignalContainerRequest request) throws IOException {
-return null;
-}
-
- @Override
- public FailApplicationAttemptResponse failApplicationAttempt(
- FailApplicationAttemptRequest request) throws YarnException, IOException {
- throw new NotImplementedException();
- }
-
- @Override
- public UpdateApplicationTimeoutsResponse updateApplicationTimeouts(
- UpdateApplicationTimeoutsRequest request)
- throws YarnException, IOException {
- throw new NotImplementedException();
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/48460690/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/pom.xml
index 25afa5c..89813de 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/pom.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/pom.xml
@@ -50,12 +50,31 @@
<dependency>
<groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-server-common</artifactId>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
+ <plugin>
+ <groupId>org.apache.rat</groupId>
+ <artifactId>apache-rat-plugin</artifactId>
+ </plugin>
</plugins>
</build>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/48460690/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/Router.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/Router.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/Router.java
index 7be8a59..7cfabf5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/Router.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/Router.java
@@ -18,6 +18,20 @@
package org.apache.hadoop.yarn.server.router;
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.util.ShutdownHookManager;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.server.router.clientrm.RouterClientRMService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
/**
* The router is a stateless YARN component which is the entry point to the
* cluster. It can be deployed on multiple nodes behind a Virtual IP (VIP) with
@@ -33,6 +47,88 @@ package org.apache.hadoop.yarn.server.router;
* This provides a placeholder for throttling mis-behaving clients (YARN-1546)
* and masks the access to multiple RMs (YARN-3659).
*/
-public class Router{
+public class Router extends CompositeService {
+
+ private static final Logger LOG = LoggerFactory.getLogger(Router.class);
+ private static CompositeServiceShutdownHook routerShutdownHook;
+ private Configuration conf;
+ private AtomicBoolean isStopping = new AtomicBoolean(false);
+ private RouterClientRMService clientRMProxyService;
+
+ /**
+ * Priority of the Router shutdown hook.
+ */
+ public static final int SHUTDOWN_HOOK_PRIORITY = 30;
+
+ public Router() {
+ super(Router.class.getName());
+ }
+
+ protected void doSecureLogin() throws IOException {
+ // TODO YARN-6539 Create SecureLogin inside Router
+ }
+
+ @Override
+ protected void serviceInit(Configuration config) throws Exception {
+ this.conf = config;
+ clientRMProxyService = createClientRMProxyService();
+ addService(clientRMProxyService);
+ super.serviceInit(conf);
+ }
+
+ @Override
+ protected void serviceStart() throws Exception {
+ try {
+ doSecureLogin();
+ } catch (IOException e) {
+ throw new YarnRuntimeException("Failed Router login", e);
+ }
+ super.serviceStart();
+ }
+
+ @Override
+ protected void serviceStop() throws Exception {
+ if (isStopping.getAndSet(true)) {
+ return;
+ }
+ super.serviceStop();
+ }
+
+ protected void shutDown() {
+ new Thread() {
+ @Override
+ public void run() {
+ Router.this.stop();
+ }
+ }.start();
+ }
+
+ protected RouterClientRMService createClientRMProxyService() {
+ return new RouterClientRMService();
+ }
+
+ public static void main(String[] argv) {
+ Configuration conf = new YarnConfiguration();
+ Thread
+ .setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
+ StringUtils.startupShutdownMessage(Router.class, argv, LOG);
+ Router router = new Router();
+ try {
+
+ // Remove the old hook if we are rebooting.
+ if (null != routerShutdownHook) {
+ ShutdownHookManager.get().removeShutdownHook(routerShutdownHook);
+ }
+
+ routerShutdownHook = new CompositeServiceShutdownHook(router);
+ ShutdownHookManager.get().addShutdownHook(routerShutdownHook,
+ SHUTDOWN_HOOK_PRIORITY);
+ router.init(conf);
+ router.start();
+ } catch (Throwable t) {
+ LOG.error("Error starting Router", t);
+ System.exit(-1);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/48460690/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/AbstractClientRequestInterceptor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/AbstractClientRequestInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/AbstractClientRequestInterceptor.java
new file mode 100644
index 0000000..fc6a118
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/AbstractClientRequestInterceptor.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.router.clientrm;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Implements the {@link ClientRequestInterceptor} interface and provides
+ * common functionality which can be used and/or extended by other concrete
+ * interceptor classes.
+ */
+public abstract class AbstractClientRequestInterceptor
+ implements ClientRequestInterceptor {
+
+ /** Configuration most recently supplied via {@link #setConf(Configuration)}. */
+ private Configuration conf;
+
+ /** Successor in the chain; stays {@code null} until one is set. */
+ private ClientRequestInterceptor nextInterceptor;
+
+ /**
+ * Records the {@link ClientRequestInterceptor} that follows this one in the
+ * chain.
+ *
+ * @param nextInterceptor the interceptor to place after this one
+ */
+ @Override
+ public void setNextInterceptor(ClientRequestInterceptor nextInterceptor) {
+ this.nextInterceptor = nextInterceptor;
+ }
+
+ /**
+ * Returns the {@link ClientRequestInterceptor} that follows this one.
+ *
+ * @return the next interceptor, or {@code null} if none has been set
+ */
+ @Override
+ public ClientRequestInterceptor getNextInterceptor() {
+ return this.nextInterceptor;
+ }
+
+ /**
+ * Stores the {@link Configuration} and, when a next interceptor has already
+ * been set, hands the same configuration on to it.
+ *
+ * @param conf the configuration to store and propagate
+ */
+ @Override
+ public void setConf(Configuration conf) {
+ this.conf = conf;
+ final ClientRequestInterceptor successor = this.nextInterceptor;
+ if (successor == null) {
+ return;
+ }
+ successor.setConf(conf);
+ }
+
+ /**
+ * Returns the stored {@link Configuration}.
+ *
+ * @return the configuration last passed to {@link #setConf(Configuration)}
+ */
+ @Override
+ public Configuration getConf() {
+ return this.conf;
+ }
+
+ /**
+ * Initializes this {@code ClientRequestInterceptor}. The base implementation
+ * only forwards the call to the next interceptor, if there is one.
+ *
+ * @param user the user name forwarded down the chain
+ */
+ @Override
+ public void init(String user) {
+ final ClientRequestInterceptor successor = this.nextInterceptor;
+ if (successor == null) {
+ return;
+ }
+ successor.init(user);
+ }
+
+ /**
+ * Disposes this {@code ClientRequestInterceptor}. The base implementation
+ * only forwards the call to the next interceptor, if there is one.
+ */
+ @Override
+ public void shutdown() {
+ final ClientRequestInterceptor successor = this.nextInterceptor;
+ if (successor == null) {
+ return;
+ }
+ successor.shutdown();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/48460690/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/ClientRequestInterceptor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/ClientRequestInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/ClientRequestInterceptor.java
new file mode 100644
index 0000000..2f8fb93
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/ClientRequestInterceptor.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.router.clientrm;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
+
+/**
+ * Defines the contract to be implemented by the request interceptor classes,
+ * which can be used to intercept and inspect messages sent from the client to
+ * the resource manager.
+ */
+public interface ClientRequestInterceptor
+ extends ApplicationClientProtocol, Configurable {
+ /**
+ * Initializes the interceptor. This is guaranteed to be called only once in
+ * the lifetime of this instance.
+ *
+ * @param user the name of the client
+ */
+ void init(String user);
+
+ /**
+ * Releases the resources held by the interceptor. This will be called when
+ * the application pipeline is being destroyed. Concrete implementations
+ * should dispose of their resources and forward the call to the next
+ * interceptor, if any.
+ */
+ void shutdown();
+
+ /**
+ * Sets the next interceptor in the pipeline. A concrete implementation of
+ * this interface should always pass the request to the next interceptor
+ * after inspecting the message. The last interceptor in the chain is
+ * responsible for sending the messages to the resource manager service, so
+ * this method is not called on the last interceptor.
+ *
+ * @param nextInterceptor the ClientRequestInterceptor to set in the pipeline
+ */
+ void setNextInterceptor(ClientRequestInterceptor nextInterceptor);
+
+ /**
+ * Returns the next interceptor in the chain.
+ *
+ * @return the next interceptor in the chain
+ */
+ ClientRequestInterceptor getNextInterceptor();
+
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org