Posted to common-issues@hadoop.apache.org by GitBox <gi...@apache.org> on 2022/01/31 19:34:01 UTC

[GitHub] [hadoop] minni31 opened a new pull request #3948: YARN-7898. [FederationStateStore] Create a proxy chain for Federation…

minni31 opened a new pull request #3948:
URL: https://github.com/apache/hadoop/pull/3948


   …StateStore API in the Router
   
   <!--
     Thanks for sending a pull request!
       1. If this is your first time, please read our contributor guidelines: https://cwiki.apache.org/confluence/display/HADOOP/How+To+Contribute
       2. Make sure your PR title starts with JIRA issue id, e.g., 'HADOOP-17799. Your PR title ...'.
   -->
   
   ### Description of PR
   
   
   ### How was this patch tested?
   
   
   ### For code changes:
   
    - [ ] Does the title of this PR start with the corresponding JIRA issue id (e.g. 'HADOOP-17799. Your PR title ...')?
   - [ ] Object storage: have the integration tests been executed and the endpoint declared according to the connector-specific documentation?
   - [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)?
   - [ ] If applicable, have you updated the `LICENSE`, `LICENSE-binary`, `NOTICE-binary` files?
   
   


---------------------------------------------------------------------


[GitHub] [hadoop] goiri commented on a change in pull request #3948: YARN-7898. [FederationStateStore] Create a proxy chain for Federation…

Posted by GitBox <gi...@apache.org>.
goiri commented on a change in pull request #3948:
URL: https://github.com/apache/hadoop/pull/3948#discussion_r796841858



##########
File path: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
##########
@@ -3427,6 +3427,37 @@ public static boolean isAclEnabled(Configuration conf) {
   public static final boolean DEFAULT_ROUTER_WEBAPP_PARTIAL_RESULTS_ENABLED =
       false;
 
+  public static final String ROUTER_SSPROXY_PREFIX = ROUTER_PREFIX + "ssproxy.";
+
+  public static final String ROUTER_SSPROXY_INTERCEPTOR_CLASS_PIPELINE =
+      ROUTER_SSPROXY_PREFIX + "interceptor-class.pipeline";
+
+  public static final String DEFAULT_ROUTER_SSPROXY_INTERCEPTOR_CLASS =
+      "org.apache.hadoop.yarn.server.router.ssproxy."
+          + "StateStoreProxyDefaultInterceptor";
+
+  public static final String FEDERATION_STATESTORE_HTTP_PROXY_PREFIX =
+      FEDERATION_PREFIX + "state-store.http-proxy.";
+
+  public static final String FEDERATION_STATESTORE_HTTP_PROXY_URL =
+      FEDERATION_STATESTORE_HTTP_PROXY_PREFIX + "url";
+
+  public static final String DEFAULT_FEDERATION_STATESTORE_HTTP_PROXY_URL =
+      DEFAULT_ROUTER_WEBAPP_ADDRESS;
+
+  public static final String
+      FEDERATION_STATESTORE_HTTP_PROXY_CONNECT_TIMEOUT_MS =
+      FEDERATION_STATESTORE_HTTP_PROXY_PREFIX + "connect.timeout-ms";
+
+  public static final int
+      DEFAULT_FEDERATION_STATESTORE_HTTP_PROXY_CONNECT_TIMEOUT_MS = 60000;
+
+  public static final String FEDERATION_STATESTORE_HTTP_PROXY_READ_TIMEOUT_MS =

Review comment:
       Can we leverage Configuration#getTimeDuration()?
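
   For example, the connect timeout could be read as a duration. A rough,
   untested sketch (assumes java.util.concurrent.TimeUnit is imported):
   ```
   // Sketch: read the connect timeout via getTimeDuration() instead of getInt().
   long connectTimeoutMs = conf.getTimeDuration(
       YarnConfiguration.FEDERATION_STATESTORE_HTTP_PROXY_CONNECT_TIMEOUT_MS,
       YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_HTTP_PROXY_CONNECT_TIMEOUT_MS,
       TimeUnit.MILLISECONDS);
   ```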

##########
File path: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java
##########
@@ -226,6 +204,17 @@ public static FederationStateStoreFacade getInstance() {
     return FACADE;
   }
 
+  /**
+   * Register a <em>subcluster</em> identified by {@code SubClusterId} to
+   * indicate participation in federation.
+   *
+   * @param info the SubClusterInfo for the SubCluster that is participating
+   *             in federation
+   * @throws YarnException if the registration fails
+   */
+  public void registerSubCluster(SubClusterInfo info) throws YarnException {
+    stateStore.registerSubCluster(SubClusterRegisterRequest.newInstance(info));

Review comment:
       Let's extract this newInstance.
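
   Something along these lines (sketch):
   ```
   // Extract the request so the facade call stays short.
   SubClusterRegisterRequest request =
       SubClusterRegisterRequest.newInstance(info);
   stateStore.registerSubCluster(request);
   ```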

##########
File path: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
##########
@@ -3427,6 +3427,37 @@ public static boolean isAclEnabled(Configuration conf) {
   public static final boolean DEFAULT_ROUTER_WEBAPP_PARTIAL_RESULTS_ENABLED =
       false;
 
+  public static final String ROUTER_SSPROXY_PREFIX = ROUTER_PREFIX + "ssproxy.";
+
+  public static final String ROUTER_SSPROXY_INTERCEPTOR_CLASS_PIPELINE =
+      ROUTER_SSPROXY_PREFIX + "interceptor-class.pipeline";
+
+  public static final String DEFAULT_ROUTER_SSPROXY_INTERCEPTOR_CLASS =
+      "org.apache.hadoop.yarn.server.router.ssproxy."

Review comment:
       Leverage Configuration#getClass()
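
   A rough sketch of the typed lookup for the single-interceptor case (class
   names are the ones from this patch):
   ```
   // Sketch: resolve the interceptor class via Configuration#getClass()
   // instead of a raw string constant.
   Class<? extends StateStoreProxyInterceptor> clazz = conf.getClass(
       YarnConfiguration.ROUTER_SSPROXY_INTERCEPTOR_CLASS_PIPELINE,
       StateStoreProxyDefaultInterceptor.class,
       StateStoreProxyInterceptor.class);
   StateStoreProxyInterceptor interceptor =
       ReflectionUtils.newInstance(clazz, conf);
   ```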

##########
File path: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/ssproxy/RouterStateStoreProxyWebServices.java
##########
@@ -0,0 +1,412 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.yarn.server.router.ssproxy;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.DELETE;
+import javax.ws.rs.DefaultValue;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.PUT;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.server.federation.store.impl.HttpProxyFederationStateStoreConsts;
+import org.apache.hadoop.yarn.server.router.Router;
+import org.apache.hadoop.yarn.server.federation.store.records.dao.ApplicationHomeSubClusterDAO;
+import org.apache.hadoop.yarn.server.federation.store.records.dao.SubClusterDeregisterDAO;
+import org.apache.hadoop.yarn.server.federation.store.records.dao.SubClusterHeartbeatDAO;
+import org.apache.hadoop.yarn.server.federation.store.records.dao.SubClusterInfoDAO;
+import org.apache.hadoop.yarn.server.federation.store.records.dao.SubClusterPolicyConfigurationDAO;
+import org.apache.hadoop.yarn.server.router.webapp.RouterWebServices;
+import org.apache.hadoop.yarn.util.LRUCacheHashMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+/**
+ * RouterStateStoreProxyWebServices is a service that runs on each router and
+ * can be used to intercept and inspect {@link StateStoreWebServiceProtocol}
+ * messages from the client to the cluster FederationStateStore. It listens
+ * for {@link StateStoreWebServiceProtocol} REST messages from the client and
+ * creates a request interception pipeline instance for each client. The
+ * pipeline is a chain of {@link StateStoreProxyInterceptor} instances that can
+ * inspect and modify the request/response as needed. The main difference from
+ * AMRMProxyService is the protocol they implement.
+ */
+@Singleton
+@Path(HttpProxyFederationStateStoreConsts.ROOT)
+public class RouterStateStoreProxyWebServices
+    implements StateStoreWebServiceProtocol {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(RouterStateStoreProxyWebServices.class);
+  private final Router router;
+  private final Configuration conf;
+  private @Context HttpServletResponse response;
+
+  private Map<String, RequestInterceptorChainWrapper> userPipelineMap;
+
+  @Inject
+  public RouterStateStoreProxyWebServices(final Router router,
+      Configuration conf) {
+    this.router = router;
+    this.conf = conf;
+    int maxCacheSize =

Review comment:
       Just for making it look better:
   ```
   int maxCacheSize = conf.getInt(
       YarnConfiguration.ROUTER_PIPELINE_CACHE_MAX_SIZE,
       YarnConfiguration.DEFAULT_ROUTER_PIPELINE_CACHE_MAX_SIZE);
   ```

##########
File path: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/ssproxy/StateStoreProxyDefaultInterceptor.java
##########
@@ -0,0 +1,343 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.router.ssproxy;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.server.federation.store.impl.HttpProxyFederationStateStoreConsts;
+import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
+import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
+import org.apache.hadoop.yarn.server.federation.store.records.dao.ApplicationHomeSubClusterDAO;
+import org.apache.hadoop.yarn.server.federation.store.records.dao.ApplicationsHomeSubClusterDAO;
+import org.apache.hadoop.yarn.server.federation.store.records.dao.SubClusterDeregisterDAO;
+import org.apache.hadoop.yarn.server.federation.store.records.dao.SubClusterHeartbeatDAO;
+import org.apache.hadoop.yarn.server.federation.store.records.dao.SubClusterIdDAO;
+import org.apache.hadoop.yarn.server.federation.store.records.dao.SubClusterInfoDAO;
+import org.apache.hadoop.yarn.server.federation.store.records.dao.SubClusterPoliciesConfigurationsDAO;
+import org.apache.hadoop.yarn.server.federation.store.records.dao.SubClusterPolicyConfigurationDAO;
+import org.apache.hadoop.yarn.server.federation.store.records.dao.SubClusterStateDAO;
+import org.apache.hadoop.yarn.server.federation.store.records.dao.SubClustersInfoDAO;
+import org.apache.hadoop.yarn.util.Apps;
+import org.apache.hadoop.yarn.webapp.NotFoundException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.ws.rs.PathParam;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.Response;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Implementation of the {@code StateStoreProxyInterceptor} interface that
+ * simply forwards the client requests to the FederationStateStore.
+ */
+public class StateStoreProxyDefaultInterceptor
+    implements StateStoreProxyInterceptor {
+
+  private static final Logger LOG = LoggerFactory
+      .getLogger(StateStoreProxyDefaultInterceptor.class.getName());
+
+  private FederationStateStoreFacade stateStoreFacade;
+  private @Context Configuration conf;
+
+  @Override
+  public void init(String user) {
+    this.stateStoreFacade = FederationStateStoreFacade.getInstance();
+  }
+
+  @Override
+  public void shutdown() {
+
+  }
+
+  @Override
+  public void setNextInterceptor(
+      StateStoreProxyInterceptor nextInterceptor) {
+    throw new YarnRuntimeException(
+        this.getClass().getName() + " should be the last interceptor");
+  }
+
+  @Override
+  public StateStoreProxyInterceptor getNextInterceptor() {
+    return null;
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  @Override
+  public Response registerSubCluster(SubClusterInfoDAO scInfoDAO)
+      throws YarnException {
+    LOG.info("Registering SubCluster {}", scInfoDAO.subClusterId);
+    try {
+      stateStoreFacade.registerSubCluster(scInfoDAO.toSubClusterInfo());
+    } catch (YarnException e) {
+      LOG.error("Could not register SubCluster {}", scInfoDAO.subClusterId, e);
+      throw e;
+    }
+    LOG.info("Registered SubCluster {}", scInfoDAO.subClusterId);
+    return Response.status(Response.Status.OK).build();
+  }
+
+  @Override
+  public Response deregisterSubCluster(
+      SubClusterDeregisterDAO deregisterDAO) throws YarnException {
+    SubClusterIdDAO scIdDAO = deregisterDAO.getSubClusterId();
+    SubClusterStateDAO scStateDAO = deregisterDAO.getScState();
+    LOG.info("Deregistering SubCluster {}", scIdDAO.subClusterId);
+    try {
+      stateStoreFacade
+          .deregisterSubCluster(SubClusterId.newInstance(scIdDAO.subClusterId),
+              scStateDAO.getSubClusterState());
+    } catch (YarnException e) {
+      LOG.error("Could not deregister SubCluster {}", scIdDAO.subClusterId, e);
+      throw e;
+    }
+    LOG.info("Deregistered SubCluster {}", scIdDAO.subClusterId);
+    return Response.status(Response.Status.OK).build();
+  }
+
+  @Override
+  public Response subClusterHeartBeat(
+      SubClusterHeartbeatDAO heartbeatDAO) throws YarnException {
+    SubClusterIdDAO scIdDAO = heartbeatDAO.getSubClusterId();
+    SubClusterStateDAO scStateDAO = heartbeatDAO.getScState();
+    LOG.info("Heartbeating for SubCluster {}", scIdDAO.subClusterId);
+    try {
+      stateStoreFacade
+          .subClusterHeartBeat(SubClusterId.newInstance(scIdDAO.subClusterId),
+              scStateDAO.getSubClusterState(), heartbeatDAO.getCapability());
+    } catch (YarnException e) {
+      LOG.error("Could not heartbeat for SubCluster {}", scIdDAO.subClusterId,
+          e);
+      throw e;
+    }
+    LOG.info("Heartbeat for SubCluster {}", scIdDAO.subClusterId);
+    return Response.status(Response.Status.OK).build();
+  }
+
+  @Override
+  public Response getSubCluster(String subClusterId)
+      throws YarnException {
+
+    if (subClusterId == null || subClusterId.isEmpty()) {
+      throw new NotFoundException("subClusterId is empty or null");
+    }
+
+    LOG.debug("Fetching subcluster info for subcluster: {}", subClusterId);
+
+    SubClusterInfo resp =
+        stateStoreFacade.getSubCluster(SubClusterId.newInstance(subClusterId));
+
+    LOG.debug("Retrieved subcluster info for subcluster: {}."
+        + " Subcluster details: {}", subClusterId, resp);
+
+    if (resp == null) {
+      LOG.warn("Subcluster {} does not exist", subClusterId);
+      return Response.status(Response.Status.NOT_FOUND).build();
+    }
+
+    return Response.status(Response.Status.OK)
+        .entity(new SubClusterInfoDAO(resp)).build();
+  }
+
+  @Override
+  public Response getSubClusters(boolean filterInactiveSubClusters)
+      throws YarnException {
+    LOG.debug("Fetching info for all subclusters. filterInactiveSubClusters={}",
+        filterInactiveSubClusters);
+
+    Map<SubClusterId, SubClusterInfo> resp =
+        stateStoreFacade.getSubClusters(filterInactiveSubClusters);
+
+    LOG.info(
+        "Retrieved subcluster info for all subclusters. filter={}, count={}",
+        filterInactiveSubClusters, resp.size());
+
+    return Response.status(Response.Status.OK)
+        .entity(new SubClustersInfoDAO(resp.values())).build();
+  }
+
+  @Override
+  public Response getPoliciesConfigurations() throws YarnException {
+    LOG.debug("Fetching policy info for all queues");
+
+    Map<String, SubClusterPolicyConfiguration> resp =
+        stateStoreFacade.getPoliciesConfigurations();
+
+    LOG.debug("Retrieved policy info for all queues. Policy count={}",
+        resp.size());
+
+    return Response.status(Response.Status.OK)
+        .entity(new SubClusterPoliciesConfigurationsDAO(resp.values())).build();

Review comment:
       Extract values at least.
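
   i.e., something like:
   ```
   // Extract the values before building the response.
   Collection<SubClusterPolicyConfiguration> policies = resp.values();
   return Response.status(Response.Status.OK)
       .entity(new SubClusterPoliciesConfigurationsDAO(policies))
       .build();
   ```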

##########
File path: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/ssproxy/RouterStateStoreProxyWebServices.java
##########
@@ -0,0 +1,412 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.yarn.server.router.ssproxy;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.DELETE;
+import javax.ws.rs.DefaultValue;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.PUT;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.server.federation.store.impl.HttpProxyFederationStateStoreConsts;
+import org.apache.hadoop.yarn.server.router.Router;
+import org.apache.hadoop.yarn.server.federation.store.records.dao.ApplicationHomeSubClusterDAO;
+import org.apache.hadoop.yarn.server.federation.store.records.dao.SubClusterDeregisterDAO;
+import org.apache.hadoop.yarn.server.federation.store.records.dao.SubClusterHeartbeatDAO;
+import org.apache.hadoop.yarn.server.federation.store.records.dao.SubClusterInfoDAO;
+import org.apache.hadoop.yarn.server.federation.store.records.dao.SubClusterPolicyConfigurationDAO;
+import org.apache.hadoop.yarn.server.router.webapp.RouterWebServices;
+import org.apache.hadoop.yarn.util.LRUCacheHashMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+/**
+ * RouterStateStoreProxyWebServices is a service that runs on each router and
+ * can be used to intercept and inspect {@link StateStoreWebServiceProtocol}
+ * messages from the client to the cluster FederationStateStore. It listens
+ * for {@link StateStoreWebServiceProtocol} REST messages from the client and
+ * creates a request interception pipeline instance for each client. The
+ * pipeline is a chain of {@link StateStoreProxyInterceptor} instances that can
+ * inspect and modify the request/response as needed. The main difference from
+ * AMRMProxyService is the protocol they implement.
+ */
+@Singleton
+@Path(HttpProxyFederationStateStoreConsts.ROOT)
+public class RouterStateStoreProxyWebServices
+    implements StateStoreWebServiceProtocol {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(RouterStateStoreProxyWebServices.class);
+  private final Router router;
+  private final Configuration conf;
+  private @Context HttpServletResponse response;
+
+  private Map<String, RequestInterceptorChainWrapper> userPipelineMap;
+
+  @Inject
+  public RouterStateStoreProxyWebServices(final Router router,
+      Configuration conf) {
+    this.router = router;
+    this.conf = conf;
+    int maxCacheSize =
+        conf.getInt(YarnConfiguration.ROUTER_PIPELINE_CACHE_MAX_SIZE,
+            YarnConfiguration.DEFAULT_ROUTER_PIPELINE_CACHE_MAX_SIZE);
+    this.userPipelineMap = Collections.synchronizedMap(
+        new LRUCacheHashMap<String, RequestInterceptorChainWrapper>(
+            maxCacheSize, true));
+  }
+
+  /**
+   * Returns the comma-separated interceptor class names from the
+   * configuration.
+   *
+   * @param config the configuration to read the interceptor class names from
+   * @return the interceptor class names as a list
+   */
+  private List<String> getInterceptorClassNames(Configuration config) {
+    String configuredInterceptorClassNames = config

Review comment:
       Move the get up.
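
   i.e. (sketch):
   ```
   String configuredInterceptorClassNames = config.get(
       YarnConfiguration.ROUTER_SSPROXY_INTERCEPTOR_CLASS_PIPELINE,
       YarnConfiguration.DEFAULT_ROUTER_SSPROXY_INTERCEPTOR_CLASS);
   ```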

##########
File path: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java
##########
@@ -49,29 +49,7 @@
 import org.apache.hadoop.yarn.server.federation.resolver.SubClusterResolver;
 import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
 import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreRetriableException;
-import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest;
-import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterResponse;
-import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
-import org.apache.hadoop.yarn.server.federation.store.records.DeleteApplicationHomeSubClusterRequest;
-import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterRequest;
-import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterResponse;
-import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterRequest;
-import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterResponse;
-import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoRequest;
-import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoResponse;
-import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsRequest;
-import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsResponse;
-import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationRequest;
-import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationResponse;
-import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoRequest;
-import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoResponse;
-import org.apache.hadoop.yarn.server.federation.store.records.SetSubClusterPolicyConfigurationRequest;
-import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterRequest;
-import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
-import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
-import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
-import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
-import org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationHomeSubClusterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.*;

Review comment:
       Avoid collapsing imports.

##########
File path: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/ssproxy/RouterStateStoreProxyWebServices.java
##########
@@ -0,0 +1,412 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.yarn.server.router.ssproxy;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.DELETE;
+import javax.ws.rs.DefaultValue;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.PUT;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.server.federation.store.impl.HttpProxyFederationStateStoreConsts;
+import org.apache.hadoop.yarn.server.router.Router;
+import org.apache.hadoop.yarn.server.federation.store.records.dao.ApplicationHomeSubClusterDAO;
+import org.apache.hadoop.yarn.server.federation.store.records.dao.SubClusterDeregisterDAO;
+import org.apache.hadoop.yarn.server.federation.store.records.dao.SubClusterHeartbeatDAO;
+import org.apache.hadoop.yarn.server.federation.store.records.dao.SubClusterInfoDAO;
+import org.apache.hadoop.yarn.server.federation.store.records.dao.SubClusterPolicyConfigurationDAO;
+import org.apache.hadoop.yarn.server.router.webapp.RouterWebServices;
+import org.apache.hadoop.yarn.util.LRUCacheHashMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+/**
+ * RouterStateStoreProxyWebServices is a service that runs on each router and
+ * can be used to intercept and inspect {@link StateStoreWebServiceProtocol}
+ * messages from the client to the cluster FederationStateStore. It listens
+ * for {@link StateStoreWebServiceProtocol} REST messages from the client and
+ * creates a request interception pipeline instance for each client. The
+ * pipeline is a chain of {@link StateStoreProxyInterceptor} instances that can
+ * inspect and modify the request/response as needed. The main difference from
+ * AMRMProxyService is the protocol they implement.
+ */
+@Singleton
+@Path(HttpProxyFederationStateStoreConsts.ROOT)
+public class RouterStateStoreProxyWebServices
+    implements StateStoreWebServiceProtocol {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(RouterStateStoreProxyWebServices.class);
+  private final Router router;
+  private final Configuration conf;
+  private @Context HttpServletResponse response;
+
+  private Map<String, RequestInterceptorChainWrapper> userPipelineMap;
+
+  @Inject
+  public RouterStateStoreProxyWebServices(final Router router,
+      Configuration conf) {
+    this.router = router;
+    this.conf = conf;
+    int maxCacheSize =
+        conf.getInt(YarnConfiguration.ROUTER_PIPELINE_CACHE_MAX_SIZE,
+            YarnConfiguration.DEFAULT_ROUTER_PIPELINE_CACHE_MAX_SIZE);
+    this.userPipelineMap = Collections.synchronizedMap(
+        new LRUCacheHashMap<String, RequestInterceptorChainWrapper>(
+            maxCacheSize, true));
+  }
+
+  /**
+   * Returns the comma-separated interceptor class names from the
+   * configuration.
+   *
+   * @param config the configuration to read the interceptor class names from
+   * @return the interceptor class names as a list
+   */
+  private List<String> getInterceptorClassNames(Configuration config) {
+    String configuredInterceptorClassNames = config
+        .get(YarnConfiguration.ROUTER_SSPROXY_INTERCEPTOR_CLASS_PIPELINE,
+            YarnConfiguration.DEFAULT_ROUTER_SSPROXY_INTERCEPTOR_CLASS);
+
+    List<String> interceptorClassNames = new ArrayList<String>();
+    Collection<String> tempList =
+        StringUtils.getStringCollection(configuredInterceptorClassNames);
+    for (String item : tempList) {
+      interceptorClassNames.add(item.trim());

Review comment:
       BTW, why can't we just use the collection coming from getStringCollection()?
   Configuration can also return a collection of strings directly via Configuration#getStringCollection().
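
   Untested sketch; getTrimmedStrings() also takes care of the trimming
   (assumes java.util.Arrays is imported):
   ```
   // Let Configuration do the splitting and trimming in one call.
   String[] names = conf.getTrimmedStrings(
       YarnConfiguration.ROUTER_SSPROXY_INTERCEPTOR_CLASS_PIPELINE,
       YarnConfiguration.DEFAULT_ROUTER_SSPROXY_INTERCEPTOR_CLASS);
   List<String> interceptorClassNames = Arrays.asList(names);
   ```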

##########
File path: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/ssproxy/StateStoreProxyDefaultInterceptor.java
##########
@@ -0,0 +1,343 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.router.ssproxy;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.server.federation.store.impl.HttpProxyFederationStateStoreConsts;
+import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
+import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
+import org.apache.hadoop.yarn.server.federation.store.records.dao.ApplicationHomeSubClusterDAO;
+import org.apache.hadoop.yarn.server.federation.store.records.dao.ApplicationsHomeSubClusterDAO;
+import org.apache.hadoop.yarn.server.federation.store.records.dao.SubClusterDeregisterDAO;
+import org.apache.hadoop.yarn.server.federation.store.records.dao.SubClusterHeartbeatDAO;
+import org.apache.hadoop.yarn.server.federation.store.records.dao.SubClusterIdDAO;
+import org.apache.hadoop.yarn.server.federation.store.records.dao.SubClusterInfoDAO;
+import org.apache.hadoop.yarn.server.federation.store.records.dao.SubClusterPoliciesConfigurationsDAO;
+import org.apache.hadoop.yarn.server.federation.store.records.dao.SubClusterPolicyConfigurationDAO;
+import org.apache.hadoop.yarn.server.federation.store.records.dao.SubClusterStateDAO;
+import org.apache.hadoop.yarn.server.federation.store.records.dao.SubClustersInfoDAO;
+import org.apache.hadoop.yarn.util.Apps;
+import org.apache.hadoop.yarn.webapp.NotFoundException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.ws.rs.PathParam;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.Response;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Implementation of the {@code StateStoreProxyInterceptor} interface that
+ * simply forwards the client requests to the FederationStateStore.
+ */
+public class StateStoreProxyDefaultInterceptor
+    implements StateStoreProxyInterceptor {
+
+  private static final Logger LOG = LoggerFactory
+      .getLogger(StateStoreProxyDefaultInterceptor.class.getName());
+
+  private FederationStateStoreFacade stateStoreFacade;
+  private @Context Configuration conf;
+
+  @Override
+  public void init(String user) {
+    this.stateStoreFacade = FederationStateStoreFacade.getInstance();
+  }
+
+  @Override
+  public void shutdown() {
+
+  }
+
+  @Override
+  public void setNextInterceptor(
+      StateStoreProxyInterceptor nextInterceptor) {
+    throw new YarnRuntimeException(
+        this.getClass().getName() + " should be the last interceptor");
+  }
+
+  @Override
+  public StateStoreProxyInterceptor getNextInterceptor() {
+    return null;
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  @Override
+  public Response registerSubCluster(SubClusterInfoDAO scInfoDAO)
+      throws YarnException {
+    LOG.info("Registering SubCluster {}", scInfoDAO.subClusterId);
+    try {
+      stateStoreFacade.registerSubCluster(scInfoDAO.toSubClusterInfo());
+    } catch (YarnException e) {
+      LOG.error("Could not register SubCluster {}", scInfoDAO.subClusterId, e);
+      throw e;
+    }
+    LOG.info("Registered SubCluster {}", scInfoDAO.subClusterId);
+    return Response.status(Response.Status.OK).build();
+  }
+
+  @Override
+  public Response deregisterSubCluster(
+      SubClusterDeregisterDAO deregisterDAO) throws YarnException {
+    SubClusterIdDAO scIdDAO = deregisterDAO.getSubClusterId();
+    SubClusterStateDAO scStateDAO = deregisterDAO.getScState();
+    LOG.info("Deregistering SubCluster {}", scIdDAO.subClusterId);
+    try {
+      stateStoreFacade
+          .deregisterSubCluster(SubClusterId.newInstance(scIdDAO.subClusterId),
+              scStateDAO.getSubClusterState());
+    } catch (YarnException e) {
+      LOG.error("Could not deregister SubCluster {}", scIdDAO.subClusterId, e);
+      throw e;
+    }
+    LOG.info("Deregistered SubCluster {}", scIdDAO.subClusterId);
+    return Response.status(Response.Status.OK).build();
+  }
+
+  @Override
+  public Response subClusterHeartBeat(
+      SubClusterHeartbeatDAO heartbeatDAO) throws YarnException {
+    SubClusterIdDAO scIdDAO = heartbeatDAO.getSubClusterId();
+    SubClusterStateDAO scStateDAO = heartbeatDAO.getScState();
+    LOG.info("Heartbeating for SubCluster {}", scIdDAO.subClusterId);
+    try {
+      stateStoreFacade

Review comment:
       Extract.
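
   e.g. (sketch):
   ```
   SubClusterId scId = SubClusterId.newInstance(scIdDAO.subClusterId);
   stateStoreFacade.subClusterHeartBeat(scId,
       scStateDAO.getSubClusterState(), heartbeatDAO.getCapability());
   ```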

##########
File path: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/ssproxy/StateStoreProxyDefaultInterceptor.java
##########
@@ -0,0 +1,343 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.router.ssproxy;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.server.federation.store.impl.HttpProxyFederationStateStoreConsts;
+import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
+import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
+import org.apache.hadoop.yarn.server.federation.store.records.dao.ApplicationHomeSubClusterDAO;
+import org.apache.hadoop.yarn.server.federation.store.records.dao.ApplicationsHomeSubClusterDAO;
+import org.apache.hadoop.yarn.server.federation.store.records.dao.SubClusterDeregisterDAO;
+import org.apache.hadoop.yarn.server.federation.store.records.dao.SubClusterHeartbeatDAO;
+import org.apache.hadoop.yarn.server.federation.store.records.dao.SubClusterIdDAO;
+import org.apache.hadoop.yarn.server.federation.store.records.dao.SubClusterInfoDAO;
+import org.apache.hadoop.yarn.server.federation.store.records.dao.SubClusterPoliciesConfigurationsDAO;
+import org.apache.hadoop.yarn.server.federation.store.records.dao.SubClusterPolicyConfigurationDAO;
+import org.apache.hadoop.yarn.server.federation.store.records.dao.SubClusterStateDAO;
+import org.apache.hadoop.yarn.server.federation.store.records.dao.SubClustersInfoDAO;
+import org.apache.hadoop.yarn.util.Apps;
+import org.apache.hadoop.yarn.webapp.NotFoundException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.ws.rs.PathParam;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.Response;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Implementation of the {@code StateStoreProxyInterceptor} interface that
+ * simply forwards the client requests to the FederationStateStore.
+ */
+public class StateStoreProxyDefaultInterceptor
+    implements StateStoreProxyInterceptor {
+
+  private static final Logger LOG = LoggerFactory
+      .getLogger(StateStoreProxyDefaultInterceptor.class.getName());
+
+  private FederationStateStoreFacade stateStoreFacade;
+  private @Context Configuration conf;
+
+  @Override
+  public void init(String user) {
+    this.stateStoreFacade = FederationStateStoreFacade.getInstance();
+  }
+
+  @Override
+  public void shutdown() {
+
+  }
+
+  @Override
+  public void setNextInterceptor(
+      StateStoreProxyInterceptor nextInterceptor) {
+    throw new YarnRuntimeException(
+        this.getClass().getName() + " should be the last interceptor");
+  }
+
+  @Override
+  public StateStoreProxyInterceptor getNextInterceptor() {
+    return null;
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  @Override
+  public Response registerSubCluster(SubClusterInfoDAO scInfoDAO)
+      throws YarnException {
+    LOG.info("Registering SubCluster {}", scInfoDAO.subClusterId);
+    try {
+      stateStoreFacade.registerSubCluster(scInfoDAO.toSubClusterInfo());
+    } catch (YarnException e) {
+      LOG.error("Could not register SubCluster {}", scInfoDAO.subClusterId, e);
+      throw e;
+    }
+    LOG.info("Registered SubCluster {}", scInfoDAO.subClusterId);
+    return Response.status(Response.Status.OK).build();
+  }
+
+  @Override
+  public Response deregisterSubCluster(
+      SubClusterDeregisterDAO deregisterDAO) throws YarnException {
+    SubClusterIdDAO scIdDAO = deregisterDAO.getSubClusterId();
+    SubClusterStateDAO scStateDAO = deregisterDAO.getScState();
+    LOG.info("Deregistering SubCluster {}", scIdDAO.subClusterId);
+    try {
+      stateStoreFacade
+          .deregisterSubCluster(SubClusterId.newInstance(scIdDAO.subClusterId),

Review comment:
       Can we extract this a little?
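
   Something like (sketch):
   ```
   SubClusterId scId = SubClusterId.newInstance(scIdDAO.subClusterId);
   stateStoreFacade.deregisterSubCluster(scId,
       scStateDAO.getSubClusterState());
   ```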

##########
File path: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/ssproxy/RouterStateStoreProxyWebServices.java
##########
@@ -0,0 +1,412 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.yarn.server.router.ssproxy;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.DELETE;
+import javax.ws.rs.DefaultValue;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.PUT;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.server.federation.store.impl.HttpProxyFederationStateStoreConsts;
+import org.apache.hadoop.yarn.server.router.Router;
+import org.apache.hadoop.yarn.server.federation.store.records.dao.ApplicationHomeSubClusterDAO;
+import org.apache.hadoop.yarn.server.federation.store.records.dao.SubClusterDeregisterDAO;
+import org.apache.hadoop.yarn.server.federation.store.records.dao.SubClusterHeartbeatDAO;
+import org.apache.hadoop.yarn.server.federation.store.records.dao.SubClusterInfoDAO;
+import org.apache.hadoop.yarn.server.federation.store.records.dao.SubClusterPolicyConfigurationDAO;
+import org.apache.hadoop.yarn.server.router.webapp.RouterWebServices;
+import org.apache.hadoop.yarn.util.LRUCacheHashMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+/**
+ * RouterStateStoreProxyWebServices is a service that runs on each router and
+ * can be used to intercept and inspect {@link StateStoreWebServiceProtocol}
+ * messages from the client to the cluster FederationStateStore. It listens
+ * for {@link StateStoreWebServiceProtocol} REST messages from the client and
+ * creates a request interception pipeline instance for each client. The
+ * pipeline is a chain of {@link StateStoreProxyInterceptor} instances that can
+ * inspect and modify the request/response as needed. The main difference from
+ * AMRMProxyService is the protocol they implement.
+ */
+@Singleton
+@Path(HttpProxyFederationStateStoreConsts.ROOT)
+public class RouterStateStoreProxyWebServices
+    implements StateStoreWebServiceProtocol {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(RouterStateStoreProxyWebServices.class);
+  private final Router router;
+  private final Configuration conf;
+  private @Context HttpServletResponse response;
+
+  private Map<String, RequestInterceptorChainWrapper> userPipelineMap;
+
+  @Inject
+  public RouterStateStoreProxyWebServices(final Router router,
+      Configuration conf) {
+    this.router = router;
+    this.conf = conf;
+    int maxCacheSize =
+        conf.getInt(YarnConfiguration.ROUTER_PIPELINE_CACHE_MAX_SIZE,
+            YarnConfiguration.DEFAULT_ROUTER_PIPELINE_CACHE_MAX_SIZE);
+    this.userPipelineMap = Collections.synchronizedMap(
+        new LRUCacheHashMap<String, RequestInterceptorChainWrapper>(
+            maxCacheSize, true));
+  }
+
+  /**
+   * Returns the comma-separated interceptor class names from the
+   * configuration.
+   *
+   * @param config the configuration to read the interceptor class names from
+   * @return the interceptor class names as a list
+   */
+  private List<String> getInterceptorClassNames(Configuration config) {
+    String configuredInterceptorClassNames = config
+        .get(YarnConfiguration.ROUTER_SSPROXY_INTERCEPTOR_CLASS_PIPELINE,
+            YarnConfiguration.DEFAULT_ROUTER_SSPROXY_INTERCEPTOR_CLASS);
+
+    List<String> interceptorClassNames = new ArrayList<String>();
+    Collection<String> tempList =
+        StringUtils.getStringCollection(configuredInterceptorClassNames);
+    for (String item : tempList) {
+      interceptorClassNames.add(item.trim());
+    }
+
+    return interceptorClassNames;
+  }
+
+  private void init() {
+    // clear content type
+    response.setContentType(null);
+  }
+
+  @POST
+  @Path(HttpProxyFederationStateStoreConsts.PATH_REGISTER)
+  @Consumes({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
+  public Response registerSubCluster(
+      SubClusterInfoDAO scInfoDAO) throws YarnException {
+    init();
+    RequestInterceptorChainWrapper pipeline = getInterceptorChain(null);
+    return pipeline.getRootInterceptor().registerSubCluster(scInfoDAO);
+  }
+
+  @POST
+  @Path(HttpProxyFederationStateStoreConsts.PATH_DEREGISTER)
+  @Consumes({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
+  public Response deregisterSubCluster(
+      SubClusterDeregisterDAO deregisterDAO) throws YarnException {
+    init();
+    RequestInterceptorChainWrapper pipeline = getInterceptorChain(null);
+    return pipeline.getRootInterceptor().deregisterSubCluster(deregisterDAO);
+  }
+
+  @POST @Path(HttpProxyFederationStateStoreConsts.PATH_HEARTBEAT)
+  @Consumes({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
+  public Response subClusterHeartBeat(
+      SubClusterHeartbeatDAO heartbeatDAO) throws YarnException {
+    init();
+    RequestInterceptorChainWrapper pipeline = getInterceptorChain(null);
+    return pipeline.getRootInterceptor().subClusterHeartBeat(heartbeatDAO);
+  }
+
+  @GET
+  @Path(HttpProxyFederationStateStoreConsts.PATH_SUBCLUSTERS_SCID)
+  @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
+  public Response getSubCluster(@PathParam(HttpProxyFederationStateStoreConsts.PARAM_SCID)
+      String subClusterId) throws YarnException {
+    init();
+    RequestInterceptorChainWrapper pipeline = getInterceptorChain(null);
+    return pipeline.getRootInterceptor().getSubCluster(subClusterId);
+  }
+
+  @GET
+  @Path(HttpProxyFederationStateStoreConsts.PATH_SUBCLUSTERS)
+  @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
+  public Response getSubClusters(
+      @QueryParam(HttpProxyFederationStateStoreConsts.QUERY_SC_FILTER)
+      @DefaultValue("true") boolean filterInactiveSubClusters)
+      throws YarnException {
+    init();
+    RequestInterceptorChainWrapper pipeline = getInterceptorChain(null);
+    return pipeline.getRootInterceptor()
+        .getSubClusters(filterInactiveSubClusters);
+  }
+
+  @GET
+  @Path(HttpProxyFederationStateStoreConsts.PATH_POLICY)
+  @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
+  public Response getPoliciesConfigurations()
+      throws YarnException {
+    init();
+    RequestInterceptorChainWrapper pipeline = getInterceptorChain(null);
+    return pipeline.getRootInterceptor().getPoliciesConfigurations();
+  }
+
+  @GET
+  @Path(HttpProxyFederationStateStoreConsts.PATH_POLICY_QUEUE)
+  @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
+  public Response getPolicyConfiguration(
+      @PathParam(HttpProxyFederationStateStoreConsts.PARAM_QUEUE) String queue)
+      throws YarnException {
+    init();
+    RequestInterceptorChainWrapper pipeline = getInterceptorChain(null);
+    return pipeline.getRootInterceptor().getPolicyConfiguration(queue);
+  }
+
+  @POST
+  @Path(HttpProxyFederationStateStoreConsts.PATH_POLICY)
+  @Consumes({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
+  public Response setPolicyConfiguration(
+      SubClusterPolicyConfigurationDAO policyConf) throws YarnException {
+    init();
+    RequestInterceptorChainWrapper pipeline = getInterceptorChain(null);
+    return pipeline.getRootInterceptor().setPolicyConfiguration(policyConf);
+  }
+
+  @POST
+  @Path(HttpProxyFederationStateStoreConsts.PATH_APP_HOME)
+  @Consumes({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
+  @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
+  public Response addApplicationHomeSubCluster(
+      ApplicationHomeSubClusterDAO appHomeDAO) throws YarnException {
+    init();
+    RequestInterceptorChainWrapper pipeline = getInterceptorChain(null);
+    return pipeline.getRootInterceptor()
+        .addApplicationHomeSubCluster(appHomeDAO);
+  }
+
+  @PUT
+  @Path(HttpProxyFederationStateStoreConsts.PATH_APP_HOME)
+  @Consumes({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
+  public Response updateApplicationHomeSubCluster(
+      ApplicationHomeSubClusterDAO appHomeDAO) throws YarnException {
+    init();
+    RequestInterceptorChainWrapper pipeline = getInterceptorChain(null);
+    return pipeline.getRootInterceptor()
+        .updateApplicationHomeSubCluster(appHomeDAO);
+  }
+
+  @GET
+  @Path(HttpProxyFederationStateStoreConsts.PATH_APP_HOME_APPID)
+  @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
+  public Response getApplicationHomeSubCluster(
+      @PathParam(HttpProxyFederationStateStoreConsts.PARAM_APPID) String appId)
+      throws YarnException {
+    init();
+    RequestInterceptorChainWrapper pipeline = getInterceptorChain(null);
+    return pipeline.getRootInterceptor().getApplicationHomeSubCluster(appId);
+  }
+
+  @GET
+  @Path(HttpProxyFederationStateStoreConsts.PATH_APP_HOME)
+  @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
+  public Response getApplicationsHomeSubCluster()
+      throws YarnException {
+    init();
+    RequestInterceptorChainWrapper pipeline = getInterceptorChain(null);
+    return pipeline.getRootInterceptor().getApplicationsHomeSubCluster();
+  }
+
+  @DELETE
+  @Path(HttpProxyFederationStateStoreConsts.PATH_APP_HOME_APPID)
+  @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
+  public Response deleteApplicationHomeSubCluster(
+      @PathParam(HttpProxyFederationStateStoreConsts.PARAM_APPID) String appId)
+      throws YarnException {
+    init();
+    RequestInterceptorChainWrapper pipeline = getInterceptorChain(null);
+    return pipeline.getRootInterceptor().deleteApplicationHomeSubCluster(appId);
+  }
+
+  @VisibleForTesting
+  protected RequestInterceptorChainWrapper getInterceptorChain(
+      HttpServletRequest hsr) {
+    String user = "";
+    if (hsr != null) {
+      user = hsr.getRemoteUser();
+    }
+    try {
+      if (user == null || user.equals("")) {
+        // Yarn Router user
+        user = UserGroupInformation.getCurrentUser().getUserName();
+      }
+    } catch (IOException e) {
+      LOG.error("IOException when getting the current user", e);
+    }
+    if (!userPipelineMap.containsKey(user)) {
+      initializePipeline(user);
+    }
+    RequestInterceptorChainWrapper chain = userPipelineMap.get(user);
+    if (chain != null && chain.getRootInterceptor() != null) {
+      return chain;
+    }
+    return initializePipeline(user);
+  }
+
+  /**
+   * Gets the request interceptor chains for all the users.
+   *
+   * @return the request interceptor chains.
+   */
+  @VisibleForTesting
+  protected Map<String, RequestInterceptorChainWrapper> getPipelines() {
+    return this.userPipelineMap;
+  }
+
+  /**
+   * This method creates and returns a reference to the first interceptor in
+   * the chain of request interceptor instances.
+   *
+   * @return the reference to the first interceptor in the chain
+   */
+  @VisibleForTesting
+  protected StateStoreProxyInterceptor createRequestInterceptorChain() {
+    List<String> interceptorClassNames = getInterceptorClassNames(conf);
+
+    StateStoreProxyInterceptor pipeline = null;
+    StateStoreProxyInterceptor current = null;
+    for (String interceptorClassName : interceptorClassNames) {
+      try {
+        Class<?> interceptorClass = conf.getClassByName(interceptorClassName);

Review comment:
       Take a look at Configuration#getClass()
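
   A rough sketch for the whole pipeline; Configuration#getClasses() resolves
   the comma-separated list in one call (the default shown is an assumption):
   ```
   // Sketch: typed class resolution for the interceptor pipeline.
   Class<?>[] interceptorClasses = conf.getClasses(
       YarnConfiguration.ROUTER_SSPROXY_INTERCEPTOR_CLASS_PIPELINE,
       StateStoreProxyDefaultInterceptor.class);
   for (Class<?> clazz : interceptorClasses) {
     StateStoreProxyInterceptor interceptor = (StateStoreProxyInterceptor)
         ReflectionUtils.newInstance(clazz, conf);
     // chain the interceptor as in the existing loop
   }
   ```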

##########
File path: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/ssproxy/StateStoreProxyDefaultInterceptor.java
##########
@@ -0,0 +1,343 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.router.ssproxy;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.server.federation.store.impl.HttpProxyFederationStateStoreConsts;
+import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
+import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
+import org.apache.hadoop.yarn.server.federation.store.records.dao.ApplicationHomeSubClusterDAO;
+import org.apache.hadoop.yarn.server.federation.store.records.dao.ApplicationsHomeSubClusterDAO;
+import org.apache.hadoop.yarn.server.federation.store.records.dao.SubClusterDeregisterDAO;
+import org.apache.hadoop.yarn.server.federation.store.records.dao.SubClusterHeartbeatDAO;
+import org.apache.hadoop.yarn.server.federation.store.records.dao.SubClusterIdDAO;
+import org.apache.hadoop.yarn.server.federation.store.records.dao.SubClusterInfoDAO;
+import org.apache.hadoop.yarn.server.federation.store.records.dao.SubClusterPoliciesConfigurationsDAO;
+import org.apache.hadoop.yarn.server.federation.store.records.dao.SubClusterPolicyConfigurationDAO;
+import org.apache.hadoop.yarn.server.federation.store.records.dao.SubClusterStateDAO;
+import org.apache.hadoop.yarn.server.federation.store.records.dao.SubClustersInfoDAO;
+import org.apache.hadoop.yarn.util.Apps;
+import org.apache.hadoop.yarn.webapp.NotFoundException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.ws.rs.PathParam;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.Response;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Implements the {@code StateStoreProxyInterceptor} interface and simply
+ * forwards the client requests to the FederationStateStore.
+ */
+public class StateStoreProxyDefaultInterceptor
+    implements StateStoreProxyInterceptor {
+
+  private static final Logger LOG = LoggerFactory
+      .getLogger(StateStoreProxyDefaultInterceptor.class.getName());
+
+  private FederationStateStoreFacade stateStoreFacade;
+  private @Context Configuration conf;
+
+  @Override
+  public void init(String user) {
+    this.stateStoreFacade = FederationStateStoreFacade.getInstance();
+  }
+
+  @Override
+  public void shutdown() {
+
+  }
+
+  @Override
+  public void setNextInterceptor(
+      StateStoreProxyInterceptor nextInterceptor) {
+    throw new YarnRuntimeException(
+        this.getClass().getName() + " should be the last interceptor");
+  }
+
+  @Override
+  public StateStoreProxyInterceptor getNextInterceptor() {
+    return null;
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  @Override
+  public Response registerSubCluster(SubClusterInfoDAO scInfoDAO)
+      throws YarnException {
+    LOG.info("Registering SubCluster {}", scInfoDAO.subClusterId);
+    try {
+      stateStoreFacade.registerSubCluster(scInfoDAO.toSubClusterInfo());
+    } catch (YarnException e) {
+      LOG.error("Could not register SubCluster {}", scInfoDAO.subClusterId, e);
+      throw e;
+    }
+    LOG.info("Registered SubCluster {}", scInfoDAO.subClusterId);
+    return Response.status(Response.Status.OK).build();
+  }
+
+  @Override
+  public Response deregisterSubCluster(
+      SubClusterDeregisterDAO deregisterDAO) throws YarnException {
+    SubClusterIdDAO scIdDAO = deregisterDAO.getSubClusterId();
+    SubClusterStateDAO scStateDAO = deregisterDAO.getScState();
+    LOG.info("Deregistering SubCluster {}", scIdDAO.subClusterId);
+    try {
+      stateStoreFacade
+          .deregisterSubCluster(SubClusterId.newInstance(scIdDAO.subClusterId),
+              scStateDAO.getSubClusterState());
+    } catch (YarnException e) {
+      LOG.error("Could not deregister SubCluster {}", scIdDAO.subClusterId, e);
+      throw e;
+    }
+    LOG.info("Deregistered SubCluster {}", scIdDAO.subClusterId);
+    return Response.status(Response.Status.OK).build();
+  }
+
+  @Override
+  public Response subClusterHeartBeat(
+      SubClusterHeartbeatDAO heartbeatDAO) throws YarnException {
+    SubClusterIdDAO scIdDAO = heartbeatDAO.getSubClusterId();
+    SubClusterStateDAO scStateDAO = heartbeatDAO.getScState();
+    LOG.info("Heartbeating for SubCluster {}", scIdDAO.subClusterId);
+    try {
+      stateStoreFacade
+          .subClusterHeartBeat(SubClusterId.newInstance(scIdDAO.subClusterId),
+              scStateDAO.getSubClusterState(), heartbeatDAO.getCapability());
+    } catch (YarnException e) {
+      LOG.error("Could not heartbeat for SubCluster {}", scIdDAO.subClusterId,
+          e);
+      throw e;
+    }
+    LOG.info("Heartbeat for SubCluster {}", scIdDAO.subClusterId);
+    return Response.status(Response.Status.OK).build();
+  }
+
+  @Override
+  public Response getSubCluster(String subClusterId)
+      throws YarnException {
+
+    if (subClusterId == null || subClusterId.isEmpty()) {
+      throw new NotFoundException("subClusterId is empty or null");
+    }
+
+    LOG.debug("Fetching subcluster info for subcluster: " + subClusterId);
+
+    SubClusterInfo resp =
+        stateStoreFacade.getSubCluster(SubClusterId.newInstance(subClusterId));
+
+    LOG.debug("Retrieved subcluster info for subcluster: " + subClusterId
+        + ". Subcluster details:" + resp);
+
+    if (resp == null) {
+      LOG.warn("Subcluster {} does not exist", subClusterId);
+      return Response.status(Response.Status.NOT_FOUND).build();
+    }
+
+    return Response.status(Response.Status.OK)
+        .entity(new SubClusterInfoDAO(resp)).build();
+  }
+
+  @Override
+  public Response getSubClusters(boolean filterInactiveSubClusters)
+      throws YarnException {
+    LOG.debug("Fetching info for all subclusters. filterInactiveSubClusters="
+        + filterInactiveSubClusters);
+
+    Map<SubClusterId, SubClusterInfo> resp =
+        stateStoreFacade.getSubClusters(filterInactiveSubClusters);
+
+    LOG.info(
+        "Retrieved subcluster info for all subclusters filter={} count is={}",
+        filterInactiveSubClusters, resp.size());
+
+    return Response.status(Response.Status.OK)
+        .entity(new SubClustersInfoDAO(resp.values())).build();
+  }
+
+  @Override
+  public Response getPoliciesConfigurations() throws YarnException {
+    LOG.debug("Fetching policy info for all queues");
+
+    Map<String, SubClusterPolicyConfiguration> resp =
+        stateStoreFacade.getPoliciesConfigurations();
+
+    LOG.debug(
+        "Retrieved policy info for all queues. Policy count is=" + resp.size());

Review comment:
       Use the SLF4J {} placeholder format instead of string concatenation.
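       i.e. something like:

       ```java
       LOG.debug("Retrieved policy info for all queues. Policy count is={}",
           resp.size());
       ```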

##########
File path: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/ssproxy/StateStoreProxyDefaultInterceptor.java
##########
+  @Override
+  public Response getSubCluster(String subClusterId)
+      throws YarnException {
+
+    if (subClusterId == null || subClusterId.isEmpty()) {
+      throw new NotFoundException("subClusterId is empty or null");
+    }
+
+    LOG.debug("Fetching subcluster info for subcluster: " + subClusterId);
+
+    SubClusterInfo resp =
+        stateStoreFacade.getSubCluster(SubClusterId.newInstance(subClusterId));
+
+    LOG.debug("Retrieved subcluster info for subcluster: " + subClusterId
+        + ". Subcluster details:" + resp);

Review comment:
       Use the {} logger format instead of string concatenation.
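       For example:

       ```java
       LOG.debug("Retrieved subcluster info for subcluster: {}. Subcluster details: {}",
           subClusterId, resp);
       ```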

##########
File path: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/ssproxy/RouterStateStoreProxyWebServices.java
##########
@@ -0,0 +1,412 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.yarn.server.router.ssproxy;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.DELETE;
+import javax.ws.rs.DefaultValue;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.PUT;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.server.federation.store.impl.HttpProxyFederationStateStoreConsts;
+import org.apache.hadoop.yarn.server.router.Router;
+import org.apache.hadoop.yarn.server.federation.store.records.dao.ApplicationHomeSubClusterDAO;
+import org.apache.hadoop.yarn.server.federation.store.records.dao.SubClusterDeregisterDAO;
+import org.apache.hadoop.yarn.server.federation.store.records.dao.SubClusterHeartbeatDAO;
+import org.apache.hadoop.yarn.server.federation.store.records.dao.SubClusterInfoDAO;
+import org.apache.hadoop.yarn.server.federation.store.records.dao.SubClusterPolicyConfigurationDAO;
+import org.apache.hadoop.yarn.server.router.webapp.RouterWebServices;
+import org.apache.hadoop.yarn.util.LRUCacheHashMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+/**
+ * RouterStateStoreProxyWebServices is a service that runs on each router that
+ * can be used to intercept and inspect {@link StateStoreWebServiceProtocol}
+ * messages from the client to the cluster FederationStateStore. It listens
+ * for {@link StateStoreWebServiceProtocol} REST messages from the client and
+ * creates a request-intercepting pipeline instance for each client. The
+ * pipeline is a chain of {@link StateStoreProxyInterceptor} instances that can
+ * inspect and modify the request/response as needed. The main difference with
+ * AMRMProxyService is the protocol they implement.
+ */
+@Singleton
+@Path(HttpProxyFederationStateStoreConsts.ROOT)
+public class RouterStateStoreProxyWebServices
+    implements StateStoreWebServiceProtocol {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(RouterStateStoreProxyWebServices.class);
+  private final Router router;
+  private final Configuration conf;
+  private @Context HttpServletResponse response;
+
+  private Map<String, RequestInterceptorChainWrapper> userPipelineMap;
+
+  @Inject
+  public RouterStateStoreProxyWebServices(final Router router,
+      Configuration conf) {
+    this.router = router;
+    this.conf = conf;
+    int maxCacheSize =
+        conf.getInt(YarnConfiguration.ROUTER_PIPELINE_CACHE_MAX_SIZE,
+            YarnConfiguration.DEFAULT_ROUTER_PIPELINE_CACHE_MAX_SIZE);
+    this.userPipelineMap = Collections.synchronizedMap(
+        new LRUCacheHashMap<String, RequestInterceptorChainWrapper>(
+            maxCacheSize, true));
+  }
+
+  /**
+   * Returns the comma-separated interceptor class names from the
+   * configuration.
+   *
+   * @param config the configuration to read the interceptor class names from
+   * @return the interceptor class names as an instance of ArrayList
+   */
+  private List<String> getInterceptorClassNames(Configuration config) {
+    String configuredInterceptorClassNames = config
+        .get(YarnConfiguration.ROUTER_SSPROXY_INTERCEPTOR_CLASS_PIPELINE,
+            YarnConfiguration.DEFAULT_ROUTER_SSPROXY_INTERCEPTOR_CLASS);
+
+    List<String> interceptorClassNames = new ArrayList<String>();
+    Collection<String> tempList =
+        StringUtils.getStringCollection(configuredInterceptorClassNames);
+    for (String item : tempList) {
+      interceptorClassNames.add(item.trim());

Review comment:
       Extract item.trim() into a local variable, something like interceptorClassName.
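       i.e.:

       ```java
       for (String item : tempList) {
         String interceptorClassName = item.trim();
         interceptorClassNames.add(interceptorClassName);
       }
       ```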

##########
File path: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/ssproxy/StateStoreProxyDefaultInterceptor.java
##########
+  @Override
+  public Response getPolicyConfiguration(
+      @PathParam(HttpProxyFederationStateStoreConsts.PARAM_QUEUE) String queue)
+      throws YarnException {
+
+    if (queue == null || queue.isEmpty()) {
+      throw new NotFoundException("queue name is empty or null");
+    }
+
+    LOG.debug("Fetching policy info for queue: " + queue);

Review comment:
       Same here: use the {} logger format.
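       i.e.:

       ```java
       LOG.debug("Fetching policy info for queue: {}", queue);
       ```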

##########
File path: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java
##########
@@ -483,6 +472,20 @@ public Configuration getConf() {
     return this.conf;
   }
 
+  /**
+   * Periodic heartbeat from a <code>ResourceManager</code> to indicate
+   * liveness.
+   *
+   * @param subClusterId the id of the sub cluster sending the heartbeat
+   * @param state the state of the sub cluster
+   * @param capability the capability of the sub cluster
+   * @throws YarnException if the heartbeat cannot be recorded in the store
+   */
+  public void subClusterHeartBeat(SubClusterId subClusterId,
+      SubClusterState state, String capability) throws YarnException {
+    stateStore.subClusterHeartbeat(SubClusterHeartbeatRequest

Review comment:
       Let's extract the request into a local variable.
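       Something like this, assuming the three-argument newInstance factory on
       SubClusterHeartbeatRequest:

       ```java
       SubClusterHeartbeatRequest request =
           SubClusterHeartbeatRequest.newInstance(subClusterId, state, capability);
       stateStore.subClusterHeartbeat(request);
       ```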





---------------------------------------------------------------------
To unsubscribe, e-mail: common-issues-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-issues-help@hadoop.apache.org