You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2017/08/02 00:25:00 UTC

[18/50] [abbrv] hadoop git commit: YARN-5413. Create a proxy chain for ResourceManager Admin API in the Router. (Giovanni Matteo Fumarola via Subru).

YARN-5413. Create a proxy chain for ResourceManager Admin API in the Router. (Giovanni Matteo Fumarola via Subru).


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/67846a55
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/67846a55
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/67846a55

Branch: refs/heads/YARN-2915
Commit: 67846a5519b5905c2d925cf4c602f715b653e72c
Parents: 4846069
Author: Subru Krishnan <su...@apache.org>
Authored: Tue May 9 19:19:27 2017 -0700
Committer: Carlo Curino <cu...@apache.org>
Committed: Tue Aug 1 17:22:11 2017 -0700

----------------------------------------------------------------------
 .../hadoop/yarn/conf/YarnConfiguration.java     |  22 +-
 .../src/main/resources/yarn-default.xml         |  25 +-
 .../hadoop/yarn/util/TestLRUCacheHashMap.java   |   2 +-
 .../yarn/server/MockResourceManagerFacade.java  | 120 +++++-
 .../hadoop/yarn/server/router/Router.java       |  10 +
 .../AbstractClientRequestInterceptor.java       |  11 +-
 .../DefaultClientRequestInterceptor.java        |   2 +-
 .../router/clientrm/RouterClientRMService.java  |  16 +-
 .../AbstractRMAdminRequestInterceptor.java      |  90 ++++
 .../DefaultRMAdminRequestInterceptor.java       | 215 ++++++++++
 .../rmadmin/RMAdminRequestInterceptor.java      |  65 +++
 .../router/rmadmin/RouterRMAdminService.java    | 423 +++++++++++++++++++
 .../server/router/rmadmin/package-info.java     |  20 +
 .../router/clientrm/BaseRouterClientRMTest.java |   2 +-
 .../router/rmadmin/BaseRouterRMAdminTest.java   | 346 +++++++++++++++
 .../rmadmin/MockRMAdminRequestInterceptor.java  |  36 ++
 .../PassThroughRMAdminRequestInterceptor.java   | 148 +++++++
 .../rmadmin/TestRouterRMAdminService.java       | 219 ++++++++++
 18 files changed, 1750 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/67846a55/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index cf9c237..1432867 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -2639,6 +2639,8 @@ public class YarnConfiguration extends Configuration {
 
   public static final String ROUTER_PREFIX = YARN_PREFIX + "router.";
 
+  public static final String ROUTER_BIND_HOST = ROUTER_PREFIX + "bind-host";
+
   public static final String ROUTER_CLIENTRM_PREFIX =
       ROUTER_PREFIX + "clientrm.";
 
@@ -2654,9 +2656,23 @@ public class YarnConfiguration extends Configuration {
       "org.apache.hadoop.yarn.server.router.clientrm."
           + "DefaultClientRequestInterceptor";
 
-  public static final String ROUTER_CLIENTRM_PIPELINE_CACHE_MAX_SIZE =
-      ROUTER_CLIENTRM_PREFIX + "cache-max-size";
-  public static final int DEFAULT_ROUTER_CLIENTRM_PIPELINE_CACHE_MAX_SIZE = 25;
+  public static final String ROUTER_PIPELINE_CACHE_MAX_SIZE =
+      ROUTER_PREFIX + "pipeline.cache-max-size";
+  public static final int DEFAULT_ROUTER_PIPELINE_CACHE_MAX_SIZE = 25;
+
+  public static final String ROUTER_RMADMIN_PREFIX = ROUTER_PREFIX + "rmadmin.";
+
+  public static final String ROUTER_RMADMIN_ADDRESS =
+      ROUTER_RMADMIN_PREFIX + "address"; // prefix already ends with '.'
+  public static final int DEFAULT_ROUTER_RMADMIN_PORT = 8052;
+  public static final String DEFAULT_ROUTER_RMADMIN_ADDRESS =
+      "0.0.0.0:" + DEFAULT_ROUTER_RMADMIN_PORT;
+
+  public static final String ROUTER_RMADMIN_INTERCEPTOR_CLASS_PIPELINE =
+      ROUTER_RMADMIN_PREFIX + "interceptor-class.pipeline";
+  public static final String DEFAULT_ROUTER_RMADMIN_INTERCEPTOR_CLASS =
+      "org.apache.hadoop.yarn.server.router.rmadmin."
+          + "DefaultRMAdminRequestInterceptor";
 
   ////////////////////////////////
   // Other Configs

http://git-wip-us.apache.org/repos/asf/hadoop/blob/67846a55/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 94dccd1..8219325 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -3177,14 +3177,35 @@
 
   <property>
     <description>
-      Size of LRU cache for Router ClientRM Service.
+      Size of LRU cache for Router ClientRM Service and RMAdmin Service.
     </description>
-    <name>yarn.router.clientrm.cache-max-size</name>
+    <name>yarn.router.pipeline.cache-max-size</name>
     <value>25</value>
   </property>
 
   <property>
     <description>
+      The comma separated list of class names that implement the
+      RMAdminRequestInterceptor interface. This is used by the RouterRMAdminService
+      to create the request processing pipeline for users.
+    </description>
+    <name>yarn.router.rmadmin.interceptor-class.pipeline</name>
+    <value>org.apache.hadoop.yarn.server.router.rmadmin.DefaultRMAdminRequestInterceptor</value>
+  </property>
+
+  <property>
+    <description>
+      The actual address the server will bind to. If this optional address is
+      set, the RPC and webapp servers will bind to this address and the port specified in
+      yarn.router.address and yarn.router.webapp.address, respectively. This is
+      most useful for making Router listen to all interfaces by setting to 0.0.0.0.
+    </description>
+    <name>yarn.router.bind-host</name>
+    <value></value>
+  </property>
+
+  <property>
+    <description>
       Comma-separated list of PlacementRules to determine how applications
       submitted by certain users get mapped to certain queues. Default is
       user-group, which corresponds to UserGroupMappingPlacementRule.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/67846a55/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestLRUCacheHashMap.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestLRUCacheHashMap.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestLRUCacheHashMap.java
index 1cbb56c..9d3ec32 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestLRUCacheHashMap.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestLRUCacheHashMap.java
@@ -24,7 +24,7 @@ import org.junit.Assert;
 import org.junit.Test;
 
 /**
- * Test class to validate the correctness of the LRUCacheHashMap.
+ * Test class to validate the correctness of the {@code LRUCacheHashMap}.
  *
  */
 public class TestLRUCacheHashMap {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/67846a55/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java
index e302c70..696188b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java
@@ -26,6 +26,7 @@ import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.StandbyException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
@@ -117,6 +118,33 @@ import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol;
+import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshClusterMaxPriorityRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshClusterMaxPriorityResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResourcesRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResourcesResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshQueuesRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshQueuesResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshServiceAclsRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshServiceAclsResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsConfigurationRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsConfigurationResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLabelsRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLabelsResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceResponse;
 import org.apache.hadoop.yarn.util.Records;
 import org.junit.Assert;
 import org.slf4j.Logger;
@@ -130,8 +158,8 @@ import com.google.common.base.Strings;
  * implementation is expected by the Router/AMRMProxy unit test cases. So please
  * change the implementation with care.
  */
-public class MockResourceManagerFacade
-    implements ApplicationClientProtocol, ApplicationMasterProtocol {
+public class MockResourceManagerFacade implements ApplicationClientProtocol,
+    ApplicationMasterProtocol, ResourceManagerAdministrationProtocol {
 
   private static final Logger LOG =
       LoggerFactory.getLogger(MockResourceManagerFacade.class);
@@ -508,4 +536,92 @@ public class MockResourceManagerFacade
       throws YarnException, IOException {
     return UpdateApplicationTimeoutsResponse.newInstance();
   }
+
+  @Override
+  public RefreshQueuesResponse refreshQueues(RefreshQueuesRequest request)
+      throws StandbyException, YarnException, IOException {
+    return RefreshQueuesResponse.newInstance();
+  }
+
+  @Override
+  public RefreshNodesResponse refreshNodes(RefreshNodesRequest request)
+      throws StandbyException, YarnException, IOException {
+    return RefreshNodesResponse.newInstance();
+  }
+
+  @Override
+  public RefreshSuperUserGroupsConfigurationResponse refreshSuperUserGroupsConfiguration(
+      RefreshSuperUserGroupsConfigurationRequest request)
+      throws StandbyException, YarnException, IOException {
+    return RefreshSuperUserGroupsConfigurationResponse.newInstance();
+  }
+
+  @Override
+  public RefreshUserToGroupsMappingsResponse refreshUserToGroupsMappings(
+      RefreshUserToGroupsMappingsRequest request)
+      throws StandbyException, YarnException, IOException {
+    return RefreshUserToGroupsMappingsResponse.newInstance();
+  }
+
+  @Override
+  public RefreshAdminAclsResponse refreshAdminAcls(
+      RefreshAdminAclsRequest request) throws YarnException, IOException {
+    return RefreshAdminAclsResponse.newInstance();
+  }
+
+  @Override
+  public RefreshServiceAclsResponse refreshServiceAcls(
+      RefreshServiceAclsRequest request) throws YarnException, IOException {
+    return RefreshServiceAclsResponse.newInstance();
+  }
+
+  @Override
+  public UpdateNodeResourceResponse updateNodeResource(
+      UpdateNodeResourceRequest request) throws YarnException, IOException {
+    return UpdateNodeResourceResponse.newInstance();
+  }
+
+  @Override
+  public RefreshNodesResourcesResponse refreshNodesResources(
+      RefreshNodesResourcesRequest request) throws YarnException, IOException {
+    return RefreshNodesResourcesResponse.newInstance();
+  }
+
+  @Override
+  public AddToClusterNodeLabelsResponse addToClusterNodeLabels(
+      AddToClusterNodeLabelsRequest request) throws YarnException, IOException {
+    return AddToClusterNodeLabelsResponse.newInstance();
+  }
+
+  @Override
+  public RemoveFromClusterNodeLabelsResponse removeFromClusterNodeLabels(
+      RemoveFromClusterNodeLabelsRequest request)
+      throws YarnException, IOException {
+    return RemoveFromClusterNodeLabelsResponse.newInstance();
+  }
+
+  @Override
+  public ReplaceLabelsOnNodeResponse replaceLabelsOnNode(
+      ReplaceLabelsOnNodeRequest request) throws YarnException, IOException {
+    return ReplaceLabelsOnNodeResponse.newInstance();
+  }
+
+  @Override
+  public CheckForDecommissioningNodesResponse checkForDecommissioningNodes(
+      CheckForDecommissioningNodesRequest checkForDecommissioningNodesRequest)
+      throws YarnException, IOException {
+    return CheckForDecommissioningNodesResponse.newInstance(null);
+  }
+
+  @Override
+  public RefreshClusterMaxPriorityResponse refreshClusterMaxPriority(
+      RefreshClusterMaxPriorityRequest request)
+      throws YarnException, IOException {
+    return RefreshClusterMaxPriorityResponse.newInstance();
+  }
+
+  @Override
+  public String[] getGroupsForUser(String user) throws IOException {
+    return new String[0];
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/67846a55/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/Router.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/Router.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/Router.java
index 7cfabf5..d2eee5a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/Router.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/Router.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.server.router.clientrm.RouterClientRMService;
+import org.apache.hadoop.yarn.server.router.rmadmin.RouterRMAdminService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -54,6 +55,7 @@ public class Router extends CompositeService {
   private Configuration conf;
   private AtomicBoolean isStopping = new AtomicBoolean(false);
   private RouterClientRMService clientRMProxyService;
+  private RouterRMAdminService rmAdminProxyService;
 
   /**
    * Priority of the Router shutdown hook.
@@ -71,8 +73,12 @@ public class Router extends CompositeService {
   @Override
   protected void serviceInit(Configuration config) throws Exception {
     this.conf = config;
+    // ClientRM Proxy
     clientRMProxyService = createClientRMProxyService();
     addService(clientRMProxyService);
+    // RMAdmin Proxy
+    rmAdminProxyService = createRMAdminProxyService();
+    addService(rmAdminProxyService);
     super.serviceInit(conf);
   }
 
@@ -107,6 +113,10 @@ public class Router extends CompositeService {
     return new RouterClientRMService();
   }
 
+  protected RouterRMAdminService createRMAdminProxyService() {
+    return new RouterRMAdminService();
+  }
+
   public static void main(String[] argv) {
     Configuration conf = new YarnConfiguration();
     Thread

http://git-wip-us.apache.org/repos/asf/hadoop/blob/67846a55/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/AbstractClientRequestInterceptor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/AbstractClientRequestInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/AbstractClientRequestInterceptor.java
index fc6a118..5980b03 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/AbstractClientRequestInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/AbstractClientRequestInterceptor.java
@@ -21,8 +21,9 @@ package org.apache.hadoop.yarn.server.router.clientrm;
 import org.apache.hadoop.conf.Configuration;
 
 /**
- * Implements the RequestInterceptor interface and provides common functionality
- * which can can be used and/or extended by other concrete intercepter classes.
+ * Implements the {@link ClientRequestInterceptor} interface and provides common
+ * functionality which can be used and/or extended by other concrete
+ * interceptor classes.
  *
  */
 public abstract class AbstractClientRequestInterceptor
@@ -31,7 +32,7 @@ public abstract class AbstractClientRequestInterceptor
   private ClientRequestInterceptor nextInterceptor;
 
   /**
-   * Sets the {@code RequestInterceptor} in the chain.
+   * Sets the {@link ClientRequestInterceptor} in the chain.
    */
   @Override
   public void setNextInterceptor(ClientRequestInterceptor nextInterceptor) {
@@ -59,7 +60,7 @@ public abstract class AbstractClientRequestInterceptor
   }
 
   /**
-   * Initializes the {@code ClientRequestInterceptor}.
+   * Initializes the {@link ClientRequestInterceptor}.
    */
   @Override
   public void init(String user) {
@@ -69,7 +70,7 @@ public abstract class AbstractClientRequestInterceptor
   }
 
   /**
-   * Disposes the {@code ClientRequestInterceptor}.
+   * Disposes the {@link ClientRequestInterceptor}.
    */
   @Override
   public void shutdown() {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/67846a55/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/DefaultClientRequestInterceptor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/DefaultClientRequestInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/DefaultClientRequestInterceptor.java
index 12b933b..9e2bfed 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/DefaultClientRequestInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/DefaultClientRequestInterceptor.java
@@ -91,7 +91,7 @@ import org.slf4j.LoggerFactory;
 import com.google.common.annotations.VisibleForTesting;
 
 /**
- * Extends the AbstractRequestInterceptorClient class and provides an
+ * Extends the {@code AbstractClientRequestInterceptor} class and provides an
  * implementation that simply forwards the client requests to the cluster
  * resource manager.
  *

http://git-wip-us.apache.org/repos/asf/hadoop/blob/67846a55/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/RouterClientRMService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/RouterClientRMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/RouterClientRMService.java
index 00016dd..fd2c610 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/RouterClientRMService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/RouterClientRMService.java
@@ -104,10 +104,11 @@ import com.google.common.annotations.VisibleForTesting;
 
 /**
  * RouterClientRMService is a service that runs on each router that can be used
- * to intercept and inspect ApplicationClientProtocol messages from client to
- * the cluster resource manager. It listens ApplicationClientProtocol messages
- * from the client and creates a request intercepting pipeline instance for each
- * client. The pipeline is a chain of intercepter instances that can inspect and
+ * to intercept and inspect {@link ApplicationClientProtocol} messages from
+ * client to the cluster resource manager. It listens for
+ * {@link ApplicationClientProtocol} messages from the client and creates a
+ * request intercepting pipeline instance for each client. The pipeline is a
+ * chain of {@link ClientRequestInterceptor} instances that can inspect and
  * modify the request/response as needed. The main difference with
  * AMRMProxyService is the protocol they implement.
  */
@@ -137,13 +138,14 @@ public class RouterClientRMService extends AbstractService
     UserGroupInformation.setConfiguration(conf);
 
     this.listenerEndpoint =
-        conf.getSocketAddr(YarnConfiguration.ROUTER_CLIENTRM_ADDRESS,
+        conf.getSocketAddr(YarnConfiguration.ROUTER_BIND_HOST,
+            YarnConfiguration.ROUTER_CLIENTRM_ADDRESS,
             YarnConfiguration.DEFAULT_ROUTER_CLIENTRM_ADDRESS,
             YarnConfiguration.DEFAULT_ROUTER_CLIENTRM_PORT);
 
     int maxCacheSize =
-        conf.getInt(YarnConfiguration.ROUTER_CLIENTRM_PIPELINE_CACHE_MAX_SIZE,
-            YarnConfiguration.DEFAULT_ROUTER_CLIENTRM_PIPELINE_CACHE_MAX_SIZE);
+        conf.getInt(YarnConfiguration.ROUTER_PIPELINE_CACHE_MAX_SIZE,
+            YarnConfiguration.DEFAULT_ROUTER_PIPELINE_CACHE_MAX_SIZE);
     this.userPipelineMap = Collections.synchronizedMap(
         new LRUCacheHashMap<String, RequestInterceptorChainWrapper>(
             maxCacheSize, true));

http://git-wip-us.apache.org/repos/asf/hadoop/blob/67846a55/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/AbstractRMAdminRequestInterceptor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/AbstractRMAdminRequestInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/AbstractRMAdminRequestInterceptor.java
new file mode 100644
index 0000000..a4972fc
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/AbstractRMAdminRequestInterceptor.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.router.rmadmin;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Implements the {@link RMAdminRequestInterceptor} interface and provides
+ * common functionality which can be used and/or extended by other concrete
+ * interceptor classes.
+ * Lifecycle and configuration calls are forwarded to the next interceptor.
+ */
+public abstract class AbstractRMAdminRequestInterceptor
+    implements RMAdminRequestInterceptor {
+  private Configuration conf;
+  private RMAdminRequestInterceptor nextInterceptor;
+
+  /**
+   * Sets the next {@link RMAdminRequestInterceptor} in the chain.
+   */
+  @Override
+  public void setNextInterceptor(RMAdminRequestInterceptor nextInterceptor) {
+    this.nextInterceptor = nextInterceptor;
+  }
+
+  /**
+   * Sets the {@link Configuration} and passes it on to the next interceptor.
+   */
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+    if (this.nextInterceptor != null) {
+      this.nextInterceptor.setConf(conf);
+    }
+  }
+
+  /**
+   * Gets the {@link Configuration} last set on this interceptor.
+   */
+  @Override
+  public Configuration getConf() {
+    return this.conf;
+  }
+
+  /**
+   * Forwards initialization to the next interceptor in the chain, if any.
+   */
+  @Override
+  public void init(String user) {
+    if (this.nextInterceptor != null) {
+      this.nextInterceptor.init(user);
+    }
+  }
+
+  /**
+   * Forwards shutdown to the next interceptor in the chain, if any.
+   */
+  @Override
+  public void shutdown() {
+    if (this.nextInterceptor != null) {
+      this.nextInterceptor.shutdown();
+    }
+  }
+
+  /**
+   * Gets the next {@link RMAdminRequestInterceptor} in the chain.
+   */
+  @Override
+  public RMAdminRequestInterceptor getNextInterceptor() {
+    return this.nextInterceptor;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/67846a55/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/DefaultRMAdminRequestInterceptor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/DefaultRMAdminRequestInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/DefaultRMAdminRequestInterceptor.java
new file mode 100644
index 0000000..7e6a1ff
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/DefaultRMAdminRequestInterceptor.java
@@ -0,0 +1,215 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.router.rmadmin;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.StandbyException;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.client.ClientRMProxy;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol;
+import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshClusterMaxPriorityRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshClusterMaxPriorityResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResourcesRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResourcesResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshQueuesRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshQueuesResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshServiceAclsRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshServiceAclsResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsConfigurationRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsConfigurationResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLabelsRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLabelsResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Extends the {@link AbstractRMAdminRequestInterceptor} class and provides an
+ * implementation that simply forwards the client requests to the cluster
+ * resource manager.
+ *
+ */
+public class DefaultRMAdminRequestInterceptor
+    extends AbstractRMAdminRequestInterceptor {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(DefaultRMAdminRequestInterceptor.class);
+  private ResourceManagerAdministrationProtocol rmAdminProxy;
+  private UserGroupInformation user = null;
+
+  @Override
+  public void init(String userName) {
+    super.init(userName);
+    try {
+      // Do not create a proxy user if user name matches the user name on
+      // current UGI
+      if (userName.equalsIgnoreCase(
+          UserGroupInformation.getCurrentUser().getUserName())) {
+        user = UserGroupInformation.getCurrentUser();
+      } else {
+        user = UserGroupInformation.createProxyUser(userName,
+            UserGroupInformation.getCurrentUser());
+      }
+
+      final Configuration conf = this.getConf();
+
+      rmAdminProxy = user.doAs(
+          new PrivilegedExceptionAction<ResourceManagerAdministrationProtocol>() {
+            @Override
+            public ResourceManagerAdministrationProtocol run()
+                throws Exception {
+              return ClientRMProxy.createRMProxy(conf,
+                  ResourceManagerAdministrationProtocol.class);
+            }
+          });
+    } catch (IOException e) {
+      String message = "Error while creating Router RMAdmin Service for user:";
+      if (user != null) {
+        message += ", user: " + user;
+      }
+
+      LOG.info(message);
+      throw new YarnRuntimeException(message, e);
+    } catch (Exception e) {
+      throw new YarnRuntimeException(e);
+    }
+  }
+
+  @Override
+  public void setNextInterceptor(RMAdminRequestInterceptor next) {
+    throw new YarnRuntimeException("setNextInterceptor is being called on "
+        + "DefaultRMAdminRequestInterceptor, which should be the last one "
+        + "in the chain. Check if the interceptor pipeline configuration "
+        + "is correct");
+  }
+
+  @VisibleForTesting
+  public void setRMAdmin(ResourceManagerAdministrationProtocol rmAdmin) {
+    this.rmAdminProxy = rmAdmin;
+
+  }
+
+  @Override
+  public RefreshQueuesResponse refreshQueues(RefreshQueuesRequest request)
+      throws StandbyException, YarnException, IOException {
+    return rmAdminProxy.refreshQueues(request);
+  }
+
+  @Override
+  public RefreshNodesResponse refreshNodes(RefreshNodesRequest request)
+      throws StandbyException, YarnException, IOException {
+    return rmAdminProxy.refreshNodes(request);
+  }
+
+  @Override
+  public RefreshSuperUserGroupsConfigurationResponse refreshSuperUserGroupsConfiguration(
+      RefreshSuperUserGroupsConfigurationRequest request)
+      throws StandbyException, YarnException, IOException {
+    return rmAdminProxy.refreshSuperUserGroupsConfiguration(request);
+  }
+
+  @Override
+  public RefreshUserToGroupsMappingsResponse refreshUserToGroupsMappings(
+      RefreshUserToGroupsMappingsRequest request)
+      throws StandbyException, YarnException, IOException {
+    return rmAdminProxy.refreshUserToGroupsMappings(request);
+  }
+
+  @Override
+  public RefreshAdminAclsResponse refreshAdminAcls(
+      RefreshAdminAclsRequest request) throws YarnException, IOException {
+    return rmAdminProxy.refreshAdminAcls(request);
+  }
+
+  @Override
+  public RefreshServiceAclsResponse refreshServiceAcls(
+      RefreshServiceAclsRequest request) throws YarnException, IOException {
+    return rmAdminProxy.refreshServiceAcls(request);
+  }
+
+  @Override
+  public UpdateNodeResourceResponse updateNodeResource(
+      UpdateNodeResourceRequest request) throws YarnException, IOException {
+    return rmAdminProxy.updateNodeResource(request);
+  }
+
+  @Override
+  public RefreshNodesResourcesResponse refreshNodesResources(
+      RefreshNodesResourcesRequest request) throws YarnException, IOException {
+    return rmAdminProxy.refreshNodesResources(request);
+  }
+
+  @Override
+  public AddToClusterNodeLabelsResponse addToClusterNodeLabels(
+      AddToClusterNodeLabelsRequest request) throws YarnException, IOException {
+    return rmAdminProxy.addToClusterNodeLabels(request);
+  }
+
+  @Override
+  public RemoveFromClusterNodeLabelsResponse removeFromClusterNodeLabels(
+      RemoveFromClusterNodeLabelsRequest request)
+      throws YarnException, IOException {
+    return rmAdminProxy.removeFromClusterNodeLabels(request);
+  }
+
+  @Override
+  public ReplaceLabelsOnNodeResponse replaceLabelsOnNode(
+      ReplaceLabelsOnNodeRequest request) throws YarnException, IOException {
+    return rmAdminProxy.replaceLabelsOnNode(request);
+  }
+
+  @Override
+  public CheckForDecommissioningNodesResponse checkForDecommissioningNodes(
+      CheckForDecommissioningNodesRequest checkForDecommissioningNodesRequest)
+      throws YarnException, IOException {
+    return rmAdminProxy
+        .checkForDecommissioningNodes(checkForDecommissioningNodesRequest);
+  }
+
+  @Override
+  public RefreshClusterMaxPriorityResponse refreshClusterMaxPriority(
+      RefreshClusterMaxPriorityRequest request)
+      throws YarnException, IOException {
+    return rmAdminProxy.refreshClusterMaxPriority(request);
+  }
+
+  @Override
+  public String[] getGroupsForUser(String userName) throws IOException {
+    return rmAdminProxy.getGroupsForUser(userName);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/67846a55/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/RMAdminRequestInterceptor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/RMAdminRequestInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/RMAdminRequestInterceptor.java
new file mode 100644
index 0000000..dc4bda0
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/RMAdminRequestInterceptor.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.router.rmadmin;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol;
+
+/**
+ * Defines the contract to be implemented by the request interceptor classes,
+ * which can be used to intercept and inspect messages sent from the client to
+ * the resource manager.
+ */
+public interface RMAdminRequestInterceptor
+    extends ResourceManagerAdministrationProtocol, Configurable {
+  /**
+   * This method is called for initializing the interceptor. This is guaranteed
+   * to be called only once in the lifetime of this instance.
+   *
+   * @param user the name of the client
+   */
+  void init(String user);
+
+  /**
+   * This method is called to release the resources held by the interceptor.
+   * This will be called when the application pipeline is being destroyed. The
+   * concrete implementations should dispose of their resources and forward
+   * the request to the next interceptor, if any.
+   */
+  void shutdown();
+
+  /**
+   * Sets the next interceptor in the pipeline. The concrete implementation of
+   * this interface should always pass the request to the nextInterceptor after
+   * inspecting the message. The last interceptor in the chain is responsible
+   * for sending the messages to the resource manager service, and so the last
+   * interceptor will not receive this method call.
+   *
+   * @param nextInterceptor the RMAdminRequestInterceptor to set in the pipeline
+   */
+  void setNextInterceptor(RMAdminRequestInterceptor nextInterceptor);
+
+  /**
+   * Returns the next interceptor in the chain.
+   *
+   * @return the next interceptor in the chain
+   */
+  RMAdminRequestInterceptor getNextInterceptor();
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/67846a55/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/RouterRMAdminService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/RouterRMAdminService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/RouterRMAdminService.java
new file mode 100644
index 0000000..b8b7ad8
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/RouterRMAdminService.java
@@ -0,0 +1,423 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.router.rmadmin;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.ipc.StandbyException;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.server.api.ResourceManagerAdministrationProtocol;
+import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshClusterMaxPriorityRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshClusterMaxPriorityResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResourcesRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResourcesResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshQueuesRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshQueuesResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshServiceAclsRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshServiceAclsResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsConfigurationRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsConfigurationResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLabelsRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLabelsResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceResponse;
+import org.apache.hadoop.yarn.util.LRUCacheHashMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * RouterRMAdminService is a service that runs on each router that can be used
+ * to intercept and inspect {@code ResourceManagerAdministrationProtocol}
+ * messages from client to the cluster resource manager. It listens for
+ * {@code ResourceManagerAdministrationProtocol} messages from the client and
+ * creates a request intercepting pipeline instance for each client. The
+ * pipeline is a chain of interceptor instances that can inspect and modify the
+ * request/response as needed. The main difference with AMRMProxyService is the
+ * protocol they implement.
+ */
+public class RouterRMAdminService extends AbstractService
+    implements ResourceManagerAdministrationProtocol {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(RouterRMAdminService.class);
+
+  private Server server;
+  private InetSocketAddress listenerEndpoint;
+
+  // For each user we store an interceptors' pipeline.
+  // For performance reasons we use an LRU cache to keep in memory the newest
+  // ones and remove the oldest used ones.
+  private Map<String, RequestInterceptorChainWrapper> userPipelineMap;
+
+  public RouterRMAdminService() {
+    super(RouterRMAdminService.class.getName());
+  }
+
+  /**
+   * Creates the per-user pipeline cache and starts the RPC server that
+   * exposes {@link ResourceManagerAdministrationProtocol} on the configured
+   * Router RMAdmin address.
+   */
+  @Override
+  protected void serviceStart() throws Exception {
+    LOG.info("Starting Router RMAdmin Service");
+    Configuration conf = getConfig();
+    YarnRPC rpc = YarnRPC.create(conf);
+    UserGroupInformation.setConfiguration(conf);
+
+    this.listenerEndpoint =
+        conf.getSocketAddr(YarnConfiguration.ROUTER_BIND_HOST,
+            YarnConfiguration.ROUTER_RMADMIN_ADDRESS,
+            YarnConfiguration.DEFAULT_ROUTER_RMADMIN_ADDRESS,
+            YarnConfiguration.DEFAULT_ROUTER_RMADMIN_PORT);
+
+    int maxCacheSize =
+        conf.getInt(YarnConfiguration.ROUTER_PIPELINE_CACHE_MAX_SIZE,
+            YarnConfiguration.DEFAULT_ROUTER_PIPELINE_CACHE_MAX_SIZE);
+    this.userPipelineMap = Collections.synchronizedMap(
+        new LRUCacheHashMap<String, RequestInterceptorChainWrapper>(
+            maxCacheSize, true));
+
+    Configuration serverConf = new Configuration(conf);
+
+    int numWorkerThreads =
+        serverConf.getInt(YarnConfiguration.RM_ADMIN_CLIENT_THREAD_COUNT,
+            YarnConfiguration.DEFAULT_RM_ADMIN_CLIENT_THREAD_COUNT);
+
+    this.server = rpc.getServer(ResourceManagerAdministrationProtocol.class,
+        this, listenerEndpoint, serverConf, null, numWorkerThreads);
+
+    this.server.start();
+    LOG.info("Router RMAdminService listening on address: "
+        + this.server.getListenerAddress());
+    super.serviceStart();
+  }
+
+  /**
+   * Stops the RPC server and drops the cached pipelines. Both fields are
+   * only assigned in {@link #serviceStart()}, so each is checked for
+   * {@code null} to keep a stop without a prior start from failing.
+   */
+  @Override
+  protected void serviceStop() throws Exception {
+    LOG.info("Stopping Router RMAdminService");
+    if (this.server != null) {
+      this.server.stop();
+    }
+    if (this.userPipelineMap != null) {
+      this.userPipelineMap.clear();
+    }
+    super.serviceStop();
+  }
+
+  /**
+   * Returns the comma separated interceptor class names from the
+   * configuration.
+   *
+   * @param conf the configuration to read the interceptor pipeline from
+   * @return the trimmed interceptor class names as an instance of ArrayList
+   */
+  private List<String> getInterceptorClassNames(Configuration conf) {
+    String configuredInterceptorClassNames =
+        conf.get(YarnConfiguration.ROUTER_RMADMIN_INTERCEPTOR_CLASS_PIPELINE,
+            YarnConfiguration.DEFAULT_ROUTER_RMADMIN_INTERCEPTOR_CLASS);
+
+    List<String> interceptorClassNames = new ArrayList<String>();
+    Collection<String> tempList =
+        StringUtils.getStringCollection(configuredInterceptorClassNames);
+    for (String item : tempList) {
+      interceptorClassNames.add(item.trim());
+    }
+
+    return interceptorClassNames;
+  }
+
+  /**
+   * Returns the pipeline of the calling user, creating it on first use.
+   *
+   * @return the initialized pipeline wrapper of the current user
+   * @throws IOException if the current user cannot be determined
+   */
+  private RequestInterceptorChainWrapper getInterceptorChain()
+      throws IOException {
+    String user = UserGroupInformation.getCurrentUser().getUserName();
+    // A single lookup: the reference we hold stays valid even if the entry is
+    // evicted from the LRU cache right afterwards.
+    RequestInterceptorChainWrapper chain = userPipelineMap.get(user);
+    if (chain != null && chain.getRootInterceptor() != null) {
+      return chain;
+    }
+    return initializePipeline(user);
+  }
+
+  /**
+   * Gets the Request interceptor chains for all the users.
+   *
+   * @return the request interceptor chains.
+   */
+  @VisibleForTesting
+  protected Map<String, RequestInterceptorChainWrapper> getPipelines() {
+    return this.userPipelineMap;
+  }
+
+  /**
+   * This method creates and returns reference of the first interceptor in the
+   * chain of request interceptor instances.
+   *
+   * @return the reference of the first interceptor in the chain
+   * @throws YarnRuntimeException if a configured class cannot be loaded, is
+   *           not an {@link RMAdminRequestInterceptor}, or no interceptor is
+   *           configured at all
+   */
+  @VisibleForTesting
+  protected RMAdminRequestInterceptor createRequestInterceptorChain() {
+    Configuration conf = getConfig();
+
+    List<String> interceptorClassNames = getInterceptorClassNames(conf);
+
+    RMAdminRequestInterceptor pipeline = null;
+    RMAdminRequestInterceptor current = null;
+    for (String interceptorClassName : interceptorClassNames) {
+      try {
+        Class<?> interceptorClass = conf.getClassByName(interceptorClassName);
+        if (!RMAdminRequestInterceptor.class
+            .isAssignableFrom(interceptorClass)) {
+          throw new YarnRuntimeException(
+              "Class: " + interceptorClassName + " not instance of "
+                  + RMAdminRequestInterceptor.class.getCanonicalName());
+        }
+        RMAdminRequestInterceptor interceptorInstance =
+            (RMAdminRequestInterceptor) ReflectionUtils
+                .newInstance(interceptorClass, conf);
+        if (pipeline == null) {
+          // First interceptor: it becomes the head of the chain.
+          pipeline = interceptorInstance;
+        } else {
+          current.setNextInterceptor(interceptorInstance);
+        }
+        current = interceptorInstance;
+      } catch (ClassNotFoundException e) {
+        throw new YarnRuntimeException(
+            "Could not instantiate RMAdminRequestInterceptor: "
+                + interceptorClassName,
+            e);
+      }
+    }
+
+    if (pipeline == null) {
+      throw new YarnRuntimeException(
+          "RequestInterceptor pipeline is not configured in the system");
+    }
+    return pipeline;
+  }
+
+  /**
+   * Initializes the request interceptor pipeline for the specified user, or
+   * returns the existing one if another thread created it in the meantime.
+   *
+   * @param user the name of the user the pipeline is created for
+   * @return the initialized pipeline wrapper registered for the user
+   */
+  private RequestInterceptorChainWrapper initializePipeline(String user) {
+    synchronized (this.userPipelineMap) {
+      RequestInterceptorChainWrapper existing = this.userPipelineMap.get(user);
+      if (existing != null) {
+        LOG.info("Request to start an already existing user: {}"
+            + " was received, so ignoring.", user);
+        return existing;
+      }
+
+      // The wrapper is registered only once the chain is fully initialized,
+      // so concurrent requests of the same user never observe a wrapper
+      // without a root interceptor. If initialization throws, nothing has
+      // been registered and there is nothing to roll back.
+      LOG.info("Initializing request processing pipeline for the user: {}",
+          user);
+
+      RMAdminRequestInterceptor interceptorChain =
+          this.createRequestInterceptorChain();
+      interceptorChain.init(user);
+
+      RequestInterceptorChainWrapper chainWrapper =
+          new RequestInterceptorChainWrapper();
+      chainWrapper.init(interceptorChain);
+      this.userPipelineMap.put(user, chainWrapper);
+      return chainWrapper;
+    }
+  }
+
+  /**
+   * Private structure for encapsulating RequestInterceptor and user instances.
+   *
+   */
+  @Private
+  public static class RequestInterceptorChainWrapper {
+    private RMAdminRequestInterceptor rootInterceptor;
+
+    /**
+     * Initializes the wrapper with the specified parameters.
+     *
+     * @param interceptor the first interceptor in the pipeline
+     */
+    public synchronized void init(RMAdminRequestInterceptor interceptor) {
+      this.rootInterceptor = interceptor;
+    }
+
+    /**
+     * Gets the root request interceptor.
+     *
+     * @return the root request interceptor
+     */
+    public synchronized RMAdminRequestInterceptor getRootInterceptor() {
+      return rootInterceptor;
+    }
+
+    /**
+     * Shutdown the chain of interceptors when the object is destroyed. Does
+     * nothing when the wrapper was never initialized.
+     */
+    @Override
+    protected void finalize() {
+      if (rootInterceptor != null) {
+        rootInterceptor.shutdown();
+      }
+    }
+  }
+
+  // Every protocol method below resolves the calling user's pipeline and
+  // delegates to its root interceptor.
+
+  @Override
+  public String[] getGroupsForUser(String user) throws IOException {
+    RequestInterceptorChainWrapper pipeline = getInterceptorChain();
+    return pipeline.getRootInterceptor().getGroupsForUser(user);
+  }
+
+  @Override
+  public RefreshQueuesResponse refreshQueues(RefreshQueuesRequest request)
+      throws StandbyException, YarnException, IOException {
+    RequestInterceptorChainWrapper pipeline = getInterceptorChain();
+    return pipeline.getRootInterceptor().refreshQueues(request);
+  }
+
+  @Override
+  public RefreshNodesResponse refreshNodes(RefreshNodesRequest request)
+      throws StandbyException, YarnException, IOException {
+    RequestInterceptorChainWrapper pipeline = getInterceptorChain();
+    return pipeline.getRootInterceptor().refreshNodes(request);
+  }
+
+  @Override
+  public RefreshSuperUserGroupsConfigurationResponse refreshSuperUserGroupsConfiguration(
+      RefreshSuperUserGroupsConfigurationRequest request)
+      throws StandbyException, YarnException, IOException {
+    RequestInterceptorChainWrapper pipeline = getInterceptorChain();
+    return pipeline.getRootInterceptor()
+        .refreshSuperUserGroupsConfiguration(request);
+  }
+
+  @Override
+  public RefreshUserToGroupsMappingsResponse refreshUserToGroupsMappings(
+      RefreshUserToGroupsMappingsRequest request)
+      throws StandbyException, YarnException, IOException {
+    RequestInterceptorChainWrapper pipeline = getInterceptorChain();
+    return pipeline.getRootInterceptor().refreshUserToGroupsMappings(request);
+  }
+
+  @Override
+  public RefreshAdminAclsResponse refreshAdminAcls(
+      RefreshAdminAclsRequest request) throws YarnException, IOException {
+    RequestInterceptorChainWrapper pipeline = getInterceptorChain();
+    return pipeline.getRootInterceptor().refreshAdminAcls(request);
+  }
+
+  @Override
+  public RefreshServiceAclsResponse refreshServiceAcls(
+      RefreshServiceAclsRequest request) throws YarnException, IOException {
+    RequestInterceptorChainWrapper pipeline = getInterceptorChain();
+    return pipeline.getRootInterceptor().refreshServiceAcls(request);
+  }
+
+  @Override
+  public UpdateNodeResourceResponse updateNodeResource(
+      UpdateNodeResourceRequest request) throws YarnException, IOException {
+    RequestInterceptorChainWrapper pipeline = getInterceptorChain();
+    return pipeline.getRootInterceptor().updateNodeResource(request);
+  }
+
+  @Override
+  public RefreshNodesResourcesResponse refreshNodesResources(
+      RefreshNodesResourcesRequest request) throws YarnException, IOException {
+    RequestInterceptorChainWrapper pipeline = getInterceptorChain();
+    return pipeline.getRootInterceptor().refreshNodesResources(request);
+  }
+
+  @Override
+  public AddToClusterNodeLabelsResponse addToClusterNodeLabels(
+      AddToClusterNodeLabelsRequest request) throws YarnException, IOException {
+    RequestInterceptorChainWrapper pipeline = getInterceptorChain();
+    return pipeline.getRootInterceptor().addToClusterNodeLabels(request);
+  }
+
+  @Override
+  public RemoveFromClusterNodeLabelsResponse removeFromClusterNodeLabels(
+      RemoveFromClusterNodeLabelsRequest request)
+      throws YarnException, IOException {
+    RequestInterceptorChainWrapper pipeline = getInterceptorChain();
+    return pipeline.getRootInterceptor().removeFromClusterNodeLabels(request);
+  }
+
+  @Override
+  public ReplaceLabelsOnNodeResponse replaceLabelsOnNode(
+      ReplaceLabelsOnNodeRequest request) throws YarnException, IOException {
+    RequestInterceptorChainWrapper pipeline = getInterceptorChain();
+    return pipeline.getRootInterceptor().replaceLabelsOnNode(request);
+  }
+
+  @Override
+  public CheckForDecommissioningNodesResponse checkForDecommissioningNodes(
+      CheckForDecommissioningNodesRequest checkForDecommissioningNodesRequest)
+      throws YarnException, IOException {
+    RequestInterceptorChainWrapper pipeline = getInterceptorChain();
+    return pipeline.getRootInterceptor()
+        .checkForDecommissioningNodes(checkForDecommissioningNodesRequest);
+  }
+
+  @Override
+  public RefreshClusterMaxPriorityResponse refreshClusterMaxPriority(
+      RefreshClusterMaxPriorityRequest request)
+      throws YarnException, IOException {
+    RequestInterceptorChainWrapper pipeline = getInterceptorChain();
+    return pipeline.getRootInterceptor().refreshClusterMaxPriority(request);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/67846a55/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/package-info.java
new file mode 100644
index 0000000..98a7ed0
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/rmadmin/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/** Router RM Admin Proxy Service package. **/
+package org.apache.hadoop.yarn.server.router.rmadmin;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/67846a55/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/BaseRouterClientRMTest.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/BaseRouterClientRMTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/BaseRouterClientRMTest.java
index a283a62..7e15084 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/BaseRouterClientRMTest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/BaseRouterClientRMTest.java
@@ -133,7 +133,7 @@ public abstract class BaseRouterClientRMTest {
             + "," + mockPassThroughInterceptorClass + ","
             + MockClientRequestInterceptor.class.getName());
 
-    this.conf.setInt(YarnConfiguration.ROUTER_CLIENTRM_PIPELINE_CACHE_MAX_SIZE,
+    this.conf.setInt(YarnConfiguration.ROUTER_PIPELINE_CACHE_MAX_SIZE,
         TEST_MAX_CACHE_SIZE);
 
     this.dispatcher = new AsyncDispatcher();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/67846a55/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/BaseRouterRMAdminTest.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/BaseRouterRMAdminTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/BaseRouterRMAdminTest.java
new file mode 100644
index 0000000..d3eba61
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/BaseRouterRMAdminTest.java
@@ -0,0 +1,346 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.router.rmadmin;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.HashMap;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshClusterMaxPriorityRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshClusterMaxPriorityResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResourcesRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResourcesResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshQueuesRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshQueuesResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshServiceAclsRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshServiceAclsResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsConfigurationRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsConfigurationResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLabelsRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLabelsResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceResponse;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+
+/**
+ * Base class for all the RouterRMAdminService test cases. It starts a
+ * RouterRMAdminService before each test, stops it afterwards, and provides
+ * helpers that invoke each admin call on behalf of a given remote user.
+ */
+public abstract class BaseRouterRMAdminTest {
+
+  /** Service under test; created in setUp() and stopped in tearDown(). */
+  private MockRouterRMAdminService rmAdminService;
+  /** Thread pool used for asynchronous operations. */
+  private static ExecutorService threadpool = Executors.newCachedThreadPool();
+  /** Configuration built in setUp() and used to initialize the service. */
+  private Configuration conf;
+  /** Dispatcher started in setUp() and stopped in tearDown(). */
+  private AsyncDispatcher dispatcher;
+
+  /** Pipeline cache size that setUp() writes into the configuration. */
+  public final static int TEST_MAX_CACHE_SIZE = 10;
+
+  /** Returns the service under test, asserting that it has been created. */
+  protected MockRouterRMAdminService getRouterRMAdminService() {
+    Assert.assertNotNull(rmAdminService);
+    return rmAdminService;
+  }
+
+  /** Builds the test configuration and starts dispatcher and service. */
+  @Before
+  public void setUp() {
+    conf = new YarnConfiguration();
+
+    // Create a request interceptor pipeline for testing. The last one in the
+    // chain will call the mock resource manager. The others in the chain will
+    // simply forward the request to the next one in the chain.
+    String passThrough = PassThroughRMAdminRequestInterceptor.class.getName();
+    String mock = MockRMAdminRequestInterceptor.class.getName();
+    conf.set(YarnConfiguration.ROUTER_RMADMIN_INTERCEPTOR_CLASS_PIPELINE,
+        String.join(",", passThrough, passThrough, passThrough, mock));
+    conf.setInt(YarnConfiguration.ROUTER_PIPELINE_CACHE_MAX_SIZE,
+        TEST_MAX_CACHE_SIZE);
+
+    dispatcher = new AsyncDispatcher();
+    dispatcher.init(conf);
+    dispatcher.start();
+    rmAdminService = createAndStartRouterRMAdminService();
+  }
+
+  /** Stops the service and the dispatcher if they were created. */
+  @After
+  public void tearDown() {
+    if (rmAdminService != null) {
+      rmAdminService.stop();
+      rmAdminService = null;
+    }
+    if (dispatcher != null) {
+      dispatcher.stop();
+    }
+  }
+
+  /** Returns the thread pool shared by the test cases. */
+  protected ExecutorService getThreadPool() {
+    return threadpool;
+  }
+
+  /** Creates a MockRouterRMAdminService, then inits and starts it. */
+  protected MockRouterRMAdminService createAndStartRouterRMAdminService() {
+    MockRouterRMAdminService service = new MockRouterRMAdminService();
+    service.init(conf);
+    service.start();
+    return service;
+  }
+
+  /** RouterRMAdminService subclass used by the tests; adds no behavior. */
+  protected static class MockRouterRMAdminService extends RouterRMAdminService {
+    public MockRouterRMAdminService() {
+    }
+  }
+
+  /** Calls refreshQueues on the router admin service as {@code user}. */
+  protected RefreshQueuesResponse refreshQueues(String user)
+      throws IOException, InterruptedException {
+    PrivilegedExceptionAction<RefreshQueuesResponse> action =
+        new PrivilegedExceptionAction<RefreshQueuesResponse>() {
+          @Override
+          public RefreshQueuesResponse run() throws Exception {
+            RefreshQueuesRequest req = RefreshQueuesRequest.newInstance();
+            return getRouterRMAdminService().refreshQueues(req);
+          }
+        };
+    return UserGroupInformation.createRemoteUser(user).doAs(action);
+  }
+
+  /** Calls refreshNodes on the router admin service as {@code user}. */
+  protected RefreshNodesResponse refreshNodes(String user)
+      throws IOException, InterruptedException {
+    PrivilegedExceptionAction<RefreshNodesResponse> action =
+        new PrivilegedExceptionAction<RefreshNodesResponse>() {
+          @Override
+          public RefreshNodesResponse run() throws Exception {
+            RefreshNodesRequest req = RefreshNodesRequest.newInstance();
+            return getRouterRMAdminService().refreshNodes(req);
+          }
+        };
+    return UserGroupInformation.createRemoteUser(user).doAs(action);
+  }
+
+  /** Calls refreshSuperUserGroupsConfiguration as {@code user}. */
+  protected RefreshSuperUserGroupsConfigurationResponse refreshSuperUserGroupsConfiguration(
+      String user) throws IOException, InterruptedException {
+    PrivilegedExceptionAction<RefreshSuperUserGroupsConfigurationResponse> action =
+        new PrivilegedExceptionAction<RefreshSuperUserGroupsConfigurationResponse>() {
+          @Override
+          public RefreshSuperUserGroupsConfigurationResponse run()
+              throws Exception {
+            RefreshSuperUserGroupsConfigurationRequest req =
+                RefreshSuperUserGroupsConfigurationRequest.newInstance();
+            return getRouterRMAdminService()
+                .refreshSuperUserGroupsConfiguration(req);
+          }
+        };
+    return UserGroupInformation.createRemoteUser(user).doAs(action);
+  }
+
+  /** Calls refreshUserToGroupsMappings as remote user {@code user}. */
+  protected RefreshUserToGroupsMappingsResponse refreshUserToGroupsMappings(
+      String user) throws IOException, InterruptedException {
+    PrivilegedExceptionAction<RefreshUserToGroupsMappingsResponse> action =
+        new PrivilegedExceptionAction<RefreshUserToGroupsMappingsResponse>() {
+          @Override
+          public RefreshUserToGroupsMappingsResponse run() throws Exception {
+            RefreshUserToGroupsMappingsRequest req =
+                RefreshUserToGroupsMappingsRequest.newInstance();
+            return getRouterRMAdminService().refreshUserToGroupsMappings(req);
+          }
+        };
+    return UserGroupInformation.createRemoteUser(user).doAs(action);
+  }
+
+  /** Calls refreshAdminAcls on the router admin service as {@code user}. */
+  protected RefreshAdminAclsResponse refreshAdminAcls(String user)
+      throws IOException, InterruptedException {
+    PrivilegedExceptionAction<RefreshAdminAclsResponse> action =
+        new PrivilegedExceptionAction<RefreshAdminAclsResponse>() {
+          @Override
+          public RefreshAdminAclsResponse run() throws Exception {
+            RefreshAdminAclsRequest req = RefreshAdminAclsRequest.newInstance();
+            return getRouterRMAdminService().refreshAdminAcls(req);
+          }
+        };
+    return UserGroupInformation.createRemoteUser(user).doAs(action);
+  }
+
+  /** Calls refreshServiceAcls on the router admin service as {@code user}. */
+  protected RefreshServiceAclsResponse refreshServiceAcls(String user)
+      throws IOException, InterruptedException {
+    PrivilegedExceptionAction<RefreshServiceAclsResponse> action =
+        new PrivilegedExceptionAction<RefreshServiceAclsResponse>() {
+          @Override
+          public RefreshServiceAclsResponse run() throws Exception {
+            RefreshServiceAclsRequest req =
+                RefreshServiceAclsRequest.newInstance();
+            return getRouterRMAdminService().refreshServiceAcls(req);
+          }
+        };
+    return UserGroupInformation.createRemoteUser(user).doAs(action);
+  }
+
+  /** Calls updateNodeResource with a null argument as {@code user}. */
+  protected UpdateNodeResourceResponse updateNodeResource(String user)
+      throws IOException, InterruptedException {
+    PrivilegedExceptionAction<UpdateNodeResourceResponse> action =
+        new PrivilegedExceptionAction<UpdateNodeResourceResponse>() {
+          @Override
+          public UpdateNodeResourceResponse run() throws Exception {
+            UpdateNodeResourceRequest req =
+                UpdateNodeResourceRequest.newInstance(null);
+            return getRouterRMAdminService().updateNodeResource(req);
+          }
+        };
+    return UserGroupInformation.createRemoteUser(user).doAs(action);
+  }
+
+  /** Calls refreshNodesResources as remote user {@code user}. */
+  protected RefreshNodesResourcesResponse refreshNodesResources(String user)
+      throws IOException, InterruptedException {
+    PrivilegedExceptionAction<RefreshNodesResourcesResponse> action =
+        new PrivilegedExceptionAction<RefreshNodesResourcesResponse>() {
+          @Override
+          public RefreshNodesResourcesResponse run() throws Exception {
+            RefreshNodesResourcesRequest req =
+                RefreshNodesResourcesRequest.newInstance();
+            return getRouterRMAdminService().refreshNodesResources(req);
+          }
+        };
+    return UserGroupInformation.createRemoteUser(user).doAs(action);
+  }
+
+  /** Calls addToClusterNodeLabels with a null argument as {@code user}. */
+  protected AddToClusterNodeLabelsResponse addToClusterNodeLabels(String user)
+      throws IOException, InterruptedException {
+    PrivilegedExceptionAction<AddToClusterNodeLabelsResponse> action =
+        new PrivilegedExceptionAction<AddToClusterNodeLabelsResponse>() {
+          @Override
+          public AddToClusterNodeLabelsResponse run() throws Exception {
+            AddToClusterNodeLabelsRequest req =
+                AddToClusterNodeLabelsRequest.newInstance(null);
+            return getRouterRMAdminService().addToClusterNodeLabels(req);
+          }
+        };
+    return UserGroupInformation.createRemoteUser(user).doAs(action);
+  }
+
+  /** Calls removeFromClusterNodeLabels with a null argument as the user. */
+  protected RemoveFromClusterNodeLabelsResponse removeFromClusterNodeLabels(
+      String user) throws IOException, InterruptedException {
+    PrivilegedExceptionAction<RemoveFromClusterNodeLabelsResponse> action =
+        new PrivilegedExceptionAction<RemoveFromClusterNodeLabelsResponse>() {
+          @Override
+          public RemoveFromClusterNodeLabelsResponse run() throws Exception {
+            RemoveFromClusterNodeLabelsRequest req =
+                RemoveFromClusterNodeLabelsRequest.newInstance(null);
+            return getRouterRMAdminService().removeFromClusterNodeLabels(req);
+          }
+        };
+    return UserGroupInformation.createRemoteUser(user).doAs(action);
+  }
+
+  /** Calls replaceLabelsOnNode with an empty map as {@code user}. */
+  protected ReplaceLabelsOnNodeResponse replaceLabelsOnNode(String user)
+      throws IOException, InterruptedException {
+    PrivilegedExceptionAction<ReplaceLabelsOnNodeResponse> action =
+        new PrivilegedExceptionAction<ReplaceLabelsOnNodeResponse>() {
+          @Override
+          public ReplaceLabelsOnNodeResponse run() throws Exception {
+            ReplaceLabelsOnNodeRequest req = ReplaceLabelsOnNodeRequest
+                .newInstance(new HashMap<NodeId, Set<String>>());
+            return getRouterRMAdminService().replaceLabelsOnNode(req);
+          }
+        };
+    return UserGroupInformation.createRemoteUser(user).doAs(action);
+  }
+
+  /** Calls checkForDecommissioningNodes as remote user {@code user}. */
+  protected CheckForDecommissioningNodesResponse checkForDecommissioningNodes(
+      String user) throws IOException, InterruptedException {
+    PrivilegedExceptionAction<CheckForDecommissioningNodesResponse> action =
+        new PrivilegedExceptionAction<CheckForDecommissioningNodesResponse>() {
+          @Override
+          public CheckForDecommissioningNodesResponse run() throws Exception {
+            CheckForDecommissioningNodesRequest req =
+                CheckForDecommissioningNodesRequest.newInstance();
+            return getRouterRMAdminService().checkForDecommissioningNodes(req);
+          }
+        };
+    return UserGroupInformation.createRemoteUser(user).doAs(action);
+  }
+
+  /** Calls refreshClusterMaxPriority as remote user {@code user}. */
+  protected RefreshClusterMaxPriorityResponse refreshClusterMaxPriority(
+      String user) throws IOException, InterruptedException {
+    PrivilegedExceptionAction<RefreshClusterMaxPriorityResponse> action =
+        new PrivilegedExceptionAction<RefreshClusterMaxPriorityResponse>() {
+          @Override
+          public RefreshClusterMaxPriorityResponse run() throws Exception {
+            RefreshClusterMaxPriorityRequest req =
+                RefreshClusterMaxPriorityRequest.newInstance();
+            return getRouterRMAdminService().refreshClusterMaxPriority(req);
+          }
+        };
+    return UserGroupInformation.createRemoteUser(user).doAs(action);
+  }
+
+  /** Calls getGroupsForUser for {@code user}, running as that same user. */
+  protected String[] getGroupsForUser(String user)
+      throws IOException, InterruptedException {
+    PrivilegedExceptionAction<String[]> action =
+        new PrivilegedExceptionAction<String[]>() {
+          @Override
+          public String[] run() throws Exception {
+            return getRouterRMAdminService().getGroupsForUser(user);
+          }
+        };
+    return UserGroupInformation.createRemoteUser(user).doAs(action);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/67846a55/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/MockRMAdminRequestInterceptor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/MockRMAdminRequestInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/MockRMAdminRequestInterceptor.java
new file mode 100644
index 0000000..ab7bdb4
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/MockRMAdminRequestInterceptor.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.router.rmadmin;
+
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.MockResourceManagerFacade;
+
+/**
+ * This class mocks the RMAdminRequestInterceptor: {@link #init(String)} wires
+ * in a MockResourceManagerFacade via setRMAdmin; the user name is not used.
+ */
+public class MockRMAdminRequestInterceptor
+    extends DefaultRMAdminRequestInterceptor {
+
+  @Override
+  public void init(String user) {
+    super.setRMAdmin(new MockResourceManagerFacade(
+        new YarnConfiguration(super.getConf()), 0));
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/67846a55/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/PassThroughRMAdminRequestInterceptor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/PassThroughRMAdminRequestInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/PassThroughRMAdminRequestInterceptor.java
new file mode 100644
index 0000000..38dcc3d
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/rmadmin/PassThroughRMAdminRequestInterceptor.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.router.rmadmin;
+
+import java.io.IOException;
+
+import org.apache.hadoop.ipc.StandbyException;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.AddToClusterNodeLabelsResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.CheckForDecommissioningNodesResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshClusterMaxPriorityRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshClusterMaxPriorityResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResourcesRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResourcesResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshNodesResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshQueuesRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshQueuesResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshServiceAclsRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshServiceAclsResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsConfigurationRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsConfigurationResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLabelsRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.RemoveFromClusterNodeLabelsResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceResponse;
+
+/**
+ * Mock interceptor that does not do anything other than forwarding each
+ * request, unchanged, to the next interceptor in the chain.
+ */
+public class PassThroughRMAdminRequestInterceptor
+    extends AbstractRMAdminRequestInterceptor {
+
+  @Override
+  public RefreshQueuesResponse refreshQueues(RefreshQueuesRequest request)
+      throws StandbyException, YarnException, IOException {
+    return getNextInterceptor().refreshQueues(request);
+  }
+
+  @Override
+  public RefreshNodesResponse refreshNodes(RefreshNodesRequest request)
+      throws StandbyException, YarnException, IOException {
+    return getNextInterceptor().refreshNodes(request);
+  }
+
+  @Override
+  public RefreshSuperUserGroupsConfigurationResponse refreshSuperUserGroupsConfiguration(
+      RefreshSuperUserGroupsConfigurationRequest request)
+      throws StandbyException, YarnException, IOException {
+    return getNextInterceptor().refreshSuperUserGroupsConfiguration(request);
+  }
+
+  @Override
+  public RefreshUserToGroupsMappingsResponse refreshUserToGroupsMappings(
+      RefreshUserToGroupsMappingsRequest request)
+      throws StandbyException, YarnException, IOException {
+    return getNextInterceptor().refreshUserToGroupsMappings(request);
+  }
+
+  @Override
+  public RefreshAdminAclsResponse refreshAdminAcls(
+      RefreshAdminAclsRequest request) throws YarnException, IOException {
+    return getNextInterceptor().refreshAdminAcls(request);
+  }
+
+  @Override
+  public RefreshServiceAclsResponse refreshServiceAcls(
+      RefreshServiceAclsRequest request) throws YarnException, IOException {
+    return getNextInterceptor().refreshServiceAcls(request);
+  }
+
+  @Override
+  public UpdateNodeResourceResponse updateNodeResource(
+      UpdateNodeResourceRequest request) throws YarnException, IOException {
+    return getNextInterceptor().updateNodeResource(request);
+  }
+
+  @Override
+  public RefreshNodesResourcesResponse refreshNodesResources(
+      RefreshNodesResourcesRequest request) throws YarnException, IOException {
+    return getNextInterceptor().refreshNodesResources(request);
+  }
+
+  @Override
+  public AddToClusterNodeLabelsResponse addToClusterNodeLabels(
+      AddToClusterNodeLabelsRequest request) throws YarnException, IOException {
+    return getNextInterceptor().addToClusterNodeLabels(request);
+  }
+
+  @Override
+  public RemoveFromClusterNodeLabelsResponse removeFromClusterNodeLabels(
+      RemoveFromClusterNodeLabelsRequest request)
+      throws YarnException, IOException {
+    return getNextInterceptor().removeFromClusterNodeLabels(request);
+  }
+
+  @Override
+  public ReplaceLabelsOnNodeResponse replaceLabelsOnNode(
+      ReplaceLabelsOnNodeRequest request) throws YarnException, IOException {
+    return getNextInterceptor().replaceLabelsOnNode(request);
+  }
+
+  @Override
+  public CheckForDecommissioningNodesResponse checkForDecommissioningNodes(
+      CheckForDecommissioningNodesRequest checkForDecommissioningNodesRequest)
+      throws YarnException, IOException {
+    return getNextInterceptor()
+        .checkForDecommissioningNodes(checkForDecommissioningNodesRequest);
+  }
+
+  @Override
+  public RefreshClusterMaxPriorityResponse refreshClusterMaxPriority(
+      RefreshClusterMaxPriorityRequest request)
+      throws YarnException, IOException {
+    return getNextInterceptor().refreshClusterMaxPriority(request);
+  }
+
+  @Override
+  public String[] getGroupsForUser(String user) throws IOException {
+    return getNextInterceptor().getGroupsForUser(user);
+  }
+
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org