You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by jh...@apache.org on 2019/09/06 17:17:26 UTC

[hadoop] branch trunk updated: YARN-9761. Allow overriding application submissions based on server side configs. Contributed by Pralabh Kumar

This is an automated email from the ASF dual-hosted git repository.

jhung pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new e7d44e4  YARN-9761. Allow overriding application submissions based on server side configs. Contributed by Pralabh Kumar
e7d44e4 is described below

commit e7d44e48f708ba41e14d2a9e32a6760a62617485
Author: Jonathan Hung <jh...@linkedin.com>
AuthorDate: Fri Sep 6 10:10:53 2019 -0700

    YARN-9761. Allow overriding application submissions based on server side configs. Contributed by Pralabh Kumar
---
 .../apache/hadoop/yarn/conf/YarnConfiguration.java |  18 ++
 .../src/main/resources/yarn-default.xml            |  21 ++
 .../server/resourcemanager/ClientRMService.java    |  23 +++
 .../preprocessor/ContextProcessor.java             |  44 ++++
 .../preprocessor/NodeLabelProcessor.java           |  33 +++
 .../preprocessor/QueueProcessor.java               |  34 ++++
 .../SubmissionContextPreProcessor.java             | 223 +++++++++++++++++++++
 .../preprocessor/TagAddProcessor.java              |  44 ++++
 .../resourcemanager/preprocessor/package-info.java |  28 +++
 .../resourcemanager/TestClientRMService.java       | 178 ++++++++++++++++
 .../preprocessor/TestContextProcessor.java         |  63 ++++++
 .../preprocessor/TestNodeLabelProcessor.java       |  45 +++++
 .../preprocessor/TestQueueProcessor.java           |  43 ++++
 .../preprocessor/TestTagAddProcessor.java          |  47 +++++
 14 files changed, 844 insertions(+)

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 0f1c544..f140d6f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -543,6 +543,24 @@ public class YarnConfiguration extends Configuration {
   public static final String RM_NODES_INCLUDE_FILE_PATH = 
     RM_PREFIX + "nodes.include-path";
   public static final String DEFAULT_RM_NODES_INCLUDE_FILE_PATH = "";
+
+  /** Enable submission pre-processor.*/
+  public static final String RM_SUBMISSION_PREPROCESSOR_ENABLED =
+      RM_PREFIX + "submission-preprocessor.enabled";
+  public static final boolean DEFAULT_RM_SUBMISSION_PREPROCESSOR_ENABLED =
+      false;
+
+  /** Path to file with hosts for the submission processor to handle.*/
+  public static final String RM_SUBMISSION_PREPROCESSOR_FILE_PATH =
+      RM_PREFIX + "submission-preprocessor.file-path";
+  public static final String DEFAULT_RM_SUBMISSION_PREPROCESSOR_FILE_PATH =
+      "";
+
+  /** Submission processor refresh interval.*/
+  public static final String RM_SUBMISSION_PREPROCESSOR_REFRESH_INTERVAL_MS =
+      RM_PREFIX + "submission-preprocessor.file-refresh-interval-ms";
+  public static final int
+      DEFAULT_RM_SUBMISSION_PREPROCESSOR_REFRESH_INTERVAL_MS = 0;
   
   /** Path to file with nodes to exclude.*/
   public static final String RM_NODES_EXCLUDE_FILE_PATH = 
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index e2be568..eb6bf14 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -4263,4 +4263,25 @@
     <name>yarn.nodemanager.containers-launcher.class</name>
     <value>org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncher</value>
   </property>
+
+  <property>
+    <description>
+      Enable the Pre processing of Application Submission context with server side configuration
+    </description>
+    <name>yarn.resourcemanager.submission-preprocessor.enabled</name>
+    <value>false</value>
+  </property>
+
+  <property>
+    <description>Path to file with hosts for the submission processor to handle.</description>
+    <name>yarn.resourcemanager.submission-preprocessor.file-path</name>
+    <value></value>
+  </property>
+
+  <property>
+    <description>Submission processor refresh interval</description>
+    <name>yarn.resourcemanager.submission-preprocessor.file-refresh-interval-ms</name>
+    <value>60000</value>
+  </property>
+
 </configuration>
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
index 1e54cd6..2b93ca7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
@@ -167,6 +167,7 @@ import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
 import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
 import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.Keys;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.preprocessor.SubmissionContextPreProcessor;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceProfilesManager;
 import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
 import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationAllocation;
@@ -232,6 +233,8 @@ public class ClientRMService extends AbstractService implements
   private ReservationSystem reservationSystem;
   private ReservationInputValidator rValidator;
 
+  private SubmissionContextPreProcessor contextPreProcessor;
+
   private boolean filterAppsByUser = false;
 
   private static final EnumSet<RMAppState> ACTIVE_APP_STATES = EnumSet.of(
@@ -313,6 +316,13 @@ public class ClientRMService extends AbstractService implements
     this.timelineServiceV2Enabled = YarnConfiguration.
         timelineServiceV2Enabled(conf);
 
+    if (conf.getBoolean(
+        YarnConfiguration.RM_SUBMISSION_PREPROCESSOR_ENABLED,
+        YarnConfiguration.DEFAULT_RM_SUBMISSION_PREPROCESSOR_ENABLED)) {
+      this.contextPreProcessor = new SubmissionContextPreProcessor();
+      this.contextPreProcessor.start(conf);
+    }
+
     super.serviceStart();
   }
 
@@ -321,6 +331,9 @@ public class ClientRMService extends AbstractService implements
     if (this.server != null) {
         this.server.stop();
     }
+    if (this.contextPreProcessor != null) {
+      this.contextPreProcessor.stop();
+    }
     super.serviceStop();
   }
 
@@ -332,6 +345,11 @@ public class ClientRMService extends AbstractService implements
             YarnConfiguration.DEFAULT_RM_PORT);
   }
 
+  @VisibleForTesting
+  SubmissionContextPreProcessor getContextPreProcessor() {
+    return this.contextPreProcessor;
+  }
+
   @Private
   public InetSocketAddress getBindAddress() {
     return clientBindAddress;
@@ -663,6 +681,11 @@ public class ClientRMService extends AbstractService implements
     checkReservationACLs(submissionContext.getQueue(), AuditConstants
             .SUBMIT_RESERVATION_REQUEST, reservationId);
 
+    if (this.contextPreProcessor != null) {
+      this.contextPreProcessor.preProcess(Server.getRemoteIp().getHostName(),
+          applicationId, submissionContext);
+    }
+
     try {
       // call RMAppManager to submit application directly
       rmAppManager.submitApplication(submissionContext,
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/preprocessor/ContextProcessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/preprocessor/ContextProcessor.java
new file mode 100644
index 0000000..b33069c
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/preprocessor/ContextProcessor.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.preprocessor;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+
+
+
+/**
+ * This is the interface providing functionality to process
+ * application submission context.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public interface ContextProcessor {
+  /**
+   * It will enrich the application submission context with value provided.
+   * @param host  Address of the host from where application launched.
+   * @param value  Value to be filled in ApplicationSubmissionContext.
+   * @param applicationId  Application Id of the application.
+   * @param submissionContext  Context of the application.
+   */
+  void process(String host, String value, ApplicationId applicationId,
+      ApplicationSubmissionContext submissionContext);
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/preprocessor/NodeLabelProcessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/preprocessor/NodeLabelProcessor.java
new file mode 100644
index 0000000..30ea422
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/preprocessor/NodeLabelProcessor.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.preprocessor;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+
+/**
+ * Processor will add the node label to application submission context.
+ */
+class NodeLabelProcessor implements ContextProcessor {
+  @Override
+  public void process(String host, String value, ApplicationId applicationId,
+      ApplicationSubmissionContext submissionContext) {
+    submissionContext.setNodeLabelExpression(value);
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/preprocessor/QueueProcessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/preprocessor/QueueProcessor.java
new file mode 100644
index 0000000..a47dcfe
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/preprocessor/QueueProcessor.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.preprocessor;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+
+
+/**
+ * Processor will add queue to application submission context.
+ */
+class QueueProcessor implements ContextProcessor {
+  @Override
+  public void process(String host, String value, ApplicationId applicationId,
+      ApplicationSubmissionContext submissionContext) {
+    submissionContext.setQueue(value);
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/preprocessor/SubmissionContextPreProcessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/preprocessor/SubmissionContextPreProcessor.java
new file mode 100644
index 0000000..68cc4cf
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/preprocessor/SubmissionContextPreProcessor.java
@@ -0,0 +1,223 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.preprocessor;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.regex.PatternSyntaxException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Pre process the ApplicationSubmissionContext with server side info.
+ */
+public class SubmissionContextPreProcessor {
+
+  private static final Logger LOG = LoggerFactory.getLogger(
+      SubmissionContextPreProcessor.class);
+  private static final String DEFAULT_COMMANDS = "*";
+  private static final int INITIAL_DELAY = 1000;
+
+  enum ContextProp {
+    // Node label Expression
+    NL(new NodeLabelProcessor()),
+    // Queue
+    Q(new QueueProcessor()),
+    // Tag Add
+    TA(new TagAddProcessor());
+
+    private ContextProcessor cp;
+    ContextProp(ContextProcessor cp) {
+      this.cp = cp;
+    }
+  }
+
+  private String hostsFilePath;
+  private volatile long lastModified = -1;
+  private volatile Map<String, Map<ContextProp, String>> hostCommands =
+      new HashMap<>();
+  private ScheduledExecutorService executorService;
+
+  public void start(Configuration conf) {
+    this.hostsFilePath =
+        conf.get(
+            YarnConfiguration.RM_SUBMISSION_PREPROCESSOR_FILE_PATH,
+            YarnConfiguration.DEFAULT_RM_SUBMISSION_PREPROCESSOR_FILE_PATH);
+    int refreshPeriod =
+        conf.getInt(
+            YarnConfiguration.RM_SUBMISSION_PREPROCESSOR_REFRESH_INTERVAL_MS,
+            YarnConfiguration.
+                DEFAULT_RM_SUBMISSION_PREPROCESSOR_REFRESH_INTERVAL_MS);
+
+    LOG.info("Submission Context Preprocessor enabled: file=[{}], "
+            + "interval=[{}]", this.hostsFilePath, refreshPeriod);
+
+    executorService = Executors.newSingleThreadScheduledExecutor();
+    Runnable refreshConf = new Runnable() {
+      @Override
+      public void run() {
+        try {
+          refresh();
+        } catch (Exception ex) {
+          LOG.error("Error while refreshing Submission PreProcessor file [{}]",
+              hostsFilePath, ex);
+        }
+      }
+    };
+    if (refreshPeriod > 0) {
+      executorService.scheduleAtFixedRate(refreshConf, INITIAL_DELAY,
+          refreshPeriod, TimeUnit.MILLISECONDS);
+    } else {
+      executorService.schedule(refreshConf, INITIAL_DELAY,
+          TimeUnit.MILLISECONDS);
+    }
+  }
+
+  public void stop() {
+    if (this.executorService != null) {
+      this.executorService.shutdownNow();
+    }
+  }
+
+  public void preProcess(String host, ApplicationId applicationId,
+      ApplicationSubmissionContext submissionContext) {
+    Map<ContextProp, String> cMap = hostCommands.get(host);
+
+    // Try regex match
+    if (cMap == null) {
+      for (Map.Entry<String, Map<ContextProp, String>> entry :
+          hostCommands.entrySet()) {
+        if (entry.getKey().equals(DEFAULT_COMMANDS)) {
+          continue;
+        }
+        try {
+          Pattern p = Pattern.compile(entry.getKey());
+          Matcher m = p.matcher(host);
+          if (m.find()) {
+            cMap = hostCommands.get(entry.getKey());
+          }
+        } catch (PatternSyntaxException exception) {
+          LOG.warn("Invalid regex pattern: " + entry.getKey());
+        }
+      }
+    }
+    // Set to default value
+    if (cMap == null) {
+      cMap = hostCommands.get(DEFAULT_COMMANDS);
+    }
+    if (cMap != null) {
+      for (Map.Entry<ContextProp, String> entry : cMap.entrySet()) {
+        entry.getKey().cp.process(host, entry.getValue(),
+            applicationId, submissionContext);
+      }
+    }
+  }
+
+  @VisibleForTesting
+  public void refresh() throws Exception {
+    if (null == hostsFilePath || hostsFilePath.isEmpty()) {
+      LOG.warn("Host list file path [{}] is empty or does not exist !!",
+          hostsFilePath);
+    } else {
+      File hostFile = new File(hostsFilePath);
+      if (!hostFile.exists() || !hostFile.isFile()) {
+        LOG.warn("Host list file [{}] does not exist or is not a file !!",
+            hostFile);
+      } else if (hostFile.lastModified() <= lastModified) {
+        LOG.debug("Host list file [{}] has not been modified from last refresh",
+            hostFile);
+      } else {
+        FileInputStream fileInputStream = new FileInputStream(hostFile);
+        BufferedReader reader = null;
+        Map<String, Map<ContextProp, String>> tempHostCommands =
+            new HashMap<>();
+        try {
+          reader = new BufferedReader(new InputStreamReader(fileInputStream,
+              StandardCharsets.UTF_8));
+          String line;
+          while ((line = reader.readLine()) != null) {
+            // Lines should start with hostname and be followed with commands.
+            // Delimiter is any contiguous sequence of space or tab character.
+            // Commands are of the form:
+            //   <KEY>=<VALUE>
+            //   where KEY can be 'NL', 'Q' or 'TA' (more can be added later)
+            //   (TA stands for 'Tag Add')
+            // Sample lines:
+            // ...
+            // host1  NL=foo   Q=b
+            // host2   Q=c NL=bar
+            // ...
+            String[] commands = line.split("[ \t\n\f\r]+");
+            if (commands != null && commands.length > 1) {
+              String host = commands[0].trim();
+              if (host.startsWith("#")) {
+                // All lines starting with # is a comment
+                continue;
+              }
+              Map<ContextProp, String> cMap = null;
+              for (int i = 1; i < commands.length; i++) {
+                String[] cSplit = commands[i].split("=");
+                if (cSplit == null || cSplit.length != 2) {
+                  LOG.error("No commands found for line [{}]", commands[i]);
+                  continue;
+                }
+                if (cMap == null) {
+                  cMap = new HashMap<>();
+                }
+                cMap.put(ContextProp.valueOf(cSplit[0]), cSplit[1]);
+              }
+              if (cMap != null && cMap.size() > 0) {
+                tempHostCommands.put(host, cMap);
+                LOG.info("Following commands registered for host[{}] : {}",
+                    host, cMap);
+              }
+            }
+          }
+          lastModified = hostFile.lastModified();
+        } catch (Exception ex) {
+          // Do not commit the new map if we have an Exception..
+          tempHostCommands = null;
+          throw ex;
+        } finally {
+          if (tempHostCommands != null && tempHostCommands.size() > 0) {
+            hostCommands = tempHostCommands;
+          }
+          IOUtils.cleanupWithLogger(LOG, reader, fileInputStream);
+        }
+      }
+    }
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/preprocessor/TagAddProcessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/preprocessor/TagAddProcessor.java
new file mode 100644
index 0000000..22bf805
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/preprocessor/TagAddProcessor.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.preprocessor;
+
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+
+
+
+/**
+ * This processor will add the tag to application submission context.
+ */
+class TagAddProcessor implements ContextProcessor {
+  @Override
+  public void process(String host, String value, ApplicationId applicationId,
+      ApplicationSubmissionContext submissionContext) {
+    Set<String> applicationTags = submissionContext.getApplicationTags();
+    if (applicationTags == null) {
+      applicationTags = new HashSet<>();
+    } else {
+      applicationTags = new HashSet<>(applicationTags);
+    }
+    applicationTags.add(value);
+    submissionContext.setApplicationTags(applicationTags);
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/preprocessor/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/preprocessor/package-info.java
new file mode 100644
index 0000000..70648b7
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/preprocessor/package-info.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * This package contains classes to pre process the application submission
+ * context with server side configs.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package org.apache.hadoop.yarn.server.resourcemanager.preprocessor;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
index 3eed0be..d422233 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
@@ -31,11 +31,15 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
 
+import java.io.BufferedWriter;
 import java.io.File;
 import java.io.FileOutputStream;
+import java.io.FileWriter;
 import java.io.IOException;
 import java.io.InputStream;
+import java.net.InetAddress;
 import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
 import java.security.AccessControlException;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
@@ -55,6 +59,7 @@ import org.apache.commons.io.FileUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.MockApps;
 import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
@@ -197,6 +202,7 @@ public class TestClientRMService {
   
   private final static String QUEUE_1 = "Q-1";
   private final static String QUEUE_2 = "Q-2";
+  private final static String APPLICATION_TAG_SC_PREPROCESSOR ="mytag:foo";
   private File resourceTypesFile = null;
 
   @Test
@@ -975,6 +981,178 @@ public class TestClientRMService {
     Assert.assertEquals(0, applications1.size());
   }
 
+  @Test (timeout = 30000)
+  @SuppressWarnings ("rawtypes")
+  public void testAppSubmitWithSubmissionPreProcessor() throws Exception {
+    ResourceScheduler scheduler = mockResourceScheduler();
+    RMContext rmContext = mock(RMContext.class);
+    mockRMContext(scheduler, rmContext);
+    YarnConfiguration yConf = new YarnConfiguration();
+    yConf.setBoolean(YarnConfiguration.RM_SUBMISSION_PREPROCESSOR_ENABLED,
+        true);
+    yConf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true);
+    // Override the YARN configuration.
+    when(rmContext.getYarnConfiguration()).thenReturn(yConf);
+    RMStateStore stateStore = mock(RMStateStore.class);
+    when(rmContext.getStateStore()).thenReturn(stateStore);
+    RMAppManager appManager = new RMAppManager(rmContext, scheduler,
+        null, mock(ApplicationACLsManager.class), new Configuration());
+    when(rmContext.getDispatcher().getEventHandler()).thenReturn(
+        new EventHandler<Event>() {
+          public void handle(Event event) {}
+        });
+    ApplicationId appId1 = getApplicationId(100);
+    ApplicationACLsManager mockAclsManager = mock(ApplicationACLsManager.class);
+    when(
+        mockAclsManager.checkAccess(UserGroupInformation.getCurrentUser(),
+            ApplicationAccessType.VIEW_APP, null, appId1)).thenReturn(true);
+
+    QueueACLsManager mockQueueACLsManager = mock(QueueACLsManager.class);
+    when(mockQueueACLsManager.checkAccess(any(UserGroupInformation.class),
+        any(QueueACL.class), any(RMApp.class), any(String.class),
+        any()))
+        .thenReturn(true);
+
+    ClientRMService rmService =
+        new ClientRMService(rmContext, scheduler, appManager,
+            mockAclsManager, mockQueueACLsManager, null);
+    File rulesFile = File.createTempFile("submission_rules", ".tmp");
+    rulesFile.deleteOnExit();
+    rulesFile.createNewFile();
+
+    yConf.set(YarnConfiguration.RM_SUBMISSION_PREPROCESSOR_FILE_PATH,
+        rulesFile.getAbsolutePath());
+    rmService.serviceInit(yConf);
+    rmService.serviceStart();
+
+    BufferedWriter writer = new BufferedWriter(new FileWriter(rulesFile));
+    writer.write("host.cluster1.com   NL=foo     Q=bar  TA=cluster:cluster1");
+    writer.newLine();
+    writer.write("host.cluster2.com   Q=hello  NL=zuess   TA=cluster:cluster2");
+    writer.newLine();
+    writer.write("host.cluster.*.com   Q=hello  NL=reg   TA=cluster:reg");
+    writer.newLine();
+    writer.write("host.cluster.*.com   Q=hello  NL=reg   TA=cluster:reg");
+    writer.newLine();
+    writer.write("*   TA=cluster:other    Q=default  NL=barfoo");
+    writer.newLine();
+    writer.write("host.testcluster1.com  Q=default");
+    writer.flush();
+    writer.close();
+    rmService.getContextPreProcessor().refresh();
+    setupCurrentCall("host.cluster1.com");
+    SubmitApplicationRequest submitRequest1 = mockSubmitAppRequest(
+        appId1, null, null);
+    try {
+      rmService.submitApplication(submitRequest1);
+    } catch (YarnException e) {
+      Assert.fail("Exception is not expected.");
+    }
+    RMApp app1 = rmContext.getRMApps().get(appId1);
+    Assert.assertNotNull("app doesn't exist", app1);
+    Assert.assertEquals("app name doesn't match",
+        YarnConfiguration.DEFAULT_APPLICATION_NAME, app1.getName());
+    Assert.assertTrue("custom tag not present",
+        app1.getApplicationTags().contains("cluster:cluster1"));
+    Assert.assertEquals("app queue doesn't match", "bar", app1.getQueue());
+    Assert.assertEquals("app node label doesn't match",
+        "foo", app1.getApplicationSubmissionContext().getNodeLabelExpression());
+    setupCurrentCall("host.cluster2.com");
+    ApplicationId appId2 = getApplicationId(101);
+    SubmitApplicationRequest submitRequest2 = mockSubmitAppRequest(
+        appId2, null, null);
+    submitRequest2.getApplicationSubmissionContext().setApplicationType(
+        "matchType");
+    Set<String> aTags = new HashSet<String>();
+    aTags.add(APPLICATION_TAG_SC_PREPROCESSOR);
+    submitRequest2.getApplicationSubmissionContext().setApplicationTags(aTags);
+    try {
+      rmService.submitApplication(submitRequest2);
+    } catch (YarnException e) {
+      Assert.fail("Exception is not expected.");
+    }
+    RMApp app2 = rmContext.getRMApps().get(appId2);
+    Assert.assertNotNull("app doesn't exist", app2);
+    Assert.assertEquals("app name doesn't match",
+        YarnConfiguration.DEFAULT_APPLICATION_NAME, app2.getName());
+    Assert.assertTrue("client tag not present",
+        app2.getApplicationTags().contains(APPLICATION_TAG_SC_PREPROCESSOR));
+    Assert.assertTrue("custom tag not present",
+        app2.getApplicationTags().contains("cluster:cluster2"));
+    Assert.assertEquals("app queue doesn't match", "hello", app2.getQueue());
+    Assert.assertEquals("app node label doesn't match",
+        "zuess",
+        app2.getApplicationSubmissionContext().getNodeLabelExpression());
+    // Test Default commands
+    setupCurrentCall("host2.cluster3.com");
+    ApplicationId appId3 = getApplicationId(102);
+    SubmitApplicationRequest submitRequest3 = mockSubmitAppRequest(
+        appId3, null, null);
+    submitRequest3.getApplicationSubmissionContext().setApplicationType(
+        "matchType");
+    submitRequest3.getApplicationSubmissionContext().setApplicationTags(aTags);
+    try {
+      rmService.submitApplication(submitRequest3);
+    } catch (YarnException e) {
+      Assert.fail("Exception is not expected.");
+    }
+    RMApp app3 = rmContext.getRMApps().get(appId3);
+    Assert.assertNotNull("app doesn't exist", app3);
+    Assert.assertEquals("app name doesn't match",
+        YarnConfiguration.DEFAULT_APPLICATION_NAME, app3.getName());
+    Assert.assertTrue("client tag not present",
+        app3.getApplicationTags().contains(APPLICATION_TAG_SC_PREPROCESSOR));
+    Assert.assertTrue("custom tag not present",
+        app3.getApplicationTags().contains("cluster:other"));
+    Assert.assertEquals("app queue doesn't match", "default", app3.getQueue());
+    Assert.assertEquals("app node label doesn't match",
+        "barfoo",
+        app3.getApplicationSubmissionContext().getNodeLabelExpression());
+    // Test regex
+    setupCurrentCall("host.cluster100.com");
+    ApplicationId appId4 = getApplicationId(103);
+    SubmitApplicationRequest submitRequest4 = mockSubmitAppRequest(
+        appId4, null, null);
+    try {
+      rmService.submitApplication(submitRequest4);
+    } catch (YarnException e) {
+      Assert.fail("Exception is not expected.");
+    }
+    RMApp app4 = rmContext.getRMApps().get(appId4);
+    Assert.assertTrue("custom tag not present",
+        app4.getApplicationTags().contains("cluster:reg"));
+    Assert.assertEquals("app node label doesn't match",
+        "reg", app4.getApplicationSubmissionContext().getNodeLabelExpression());
+    testSubmissionContextWithAbsentTAG(rmService, rmContext);
+    rmService.serviceStop();
+  }
+
+  private void testSubmissionContextWithAbsentTAG(ClientRMService rmService,
+      RMContext rmContext) throws Exception {
+    setupCurrentCall("host.testcluster1.com");
+    ApplicationId appId5 = getApplicationId(104);
+    SubmitApplicationRequest submitRequest5 = mockSubmitAppRequest(
+        appId5, null, null);
+    try {
+      rmService.submitApplication(submitRequest5);
+    } catch (YarnException e) {
+      Assert.fail("Exception is not expected.");
+    }
+    RMApp app5 = rmContext.getRMApps().get(appId5);
+    Assert.assertEquals("custom tag  present",
+        app5.getApplicationTags().size(), 0);
+    Assert.assertNull("app node label present",
+        app5.getApplicationSubmissionContext().getNodeLabelExpression());
+    Assert.assertEquals("Queue name is not present",
+        app5.getQueue(), "default");
+  }
+  private void setupCurrentCall(String hostName) throws UnknownHostException {
+    Server.Call mockCall = mock(Server.Call.class);
+    when(mockCall.getHostInetAddress()).thenReturn(
+                InetAddress.getByAddress(hostName,
+                        new byte[]{123, 123, 123, 123}));
+    Server.getCurCall().set(mockCall);
+  }
   
   @Test (timeout = 30000)
   @SuppressWarnings ("rawtypes")
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/preprocessor/TestContextProcessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/preprocessor/TestContextProcessor.java
new file mode 100644
index 0000000..039e794
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/preprocessor/TestContextProcessor.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.preprocessor;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.junit.Test;
+
+import static org.mockito.Mockito.*;
+
+
+/**
+ * This class will test the functionality of all the three
+ * processor(Node, Queue, Tag)  together on same
+ * ApplicationSubmissionContext.
+ */
+public class TestContextProcessor {
+  @Test
+  public void testContextProcessor() {
+    Map<ContextProcessor, String> contextProcessorsAndValues =
+        new HashMap<>();
+    contextProcessorsAndValues.put(new NodeLabelProcessor(), "foo");
+    contextProcessorsAndValues.put(new QueueProcessor(), "queue1");
+    contextProcessorsAndValues.put(new TagAddProcessor(), "cluster:cluster1");
+    ApplicationId app = ApplicationId.newInstance(123456, 111);
+    ApplicationSubmissionContext applicationSubmissionContext =
+        mock(ApplicationSubmissionContext.class);
+    for(Map.Entry<ContextProcessor, String> entry :
+        contextProcessorsAndValues.entrySet()){
+      entry.getKey().process("host.cluster2.com", entry.getValue(),
+          app, applicationSubmissionContext);
+    }
+    Set<String> applicationTags =new HashSet<String>();
+    applicationTags.add("cluster:cluster1");
+    verify(applicationSubmissionContext, times(1))
+        .setNodeLabelExpression("foo");
+    verify(applicationSubmissionContext, times(1))
+        .setQueue("queue1");
+    verify(applicationSubmissionContext, times(1))
+        .setApplicationTags(applicationTags);
+
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/preprocessor/TestNodeLabelProcessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/preprocessor/TestNodeLabelProcessor.java
new file mode 100644
index 0000000..bb55b72
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/preprocessor/TestNodeLabelProcessor.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.preprocessor;
+
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.junit.Test;
+
+import static org.mockito.Mockito.*;
+
+/**
+ * This class will test the functionality of NodeLabelProcessor.
+ */
+public class TestNodeLabelProcessor {
+
+  @Test
+  public void testNodeLabelProcessor() {
+    ContextProcessor nodeLabelProcessor = new NodeLabelProcessor();
+    ApplicationId app = ApplicationId.newInstance(123456, 111);
+    ApplicationSubmissionContext applicationSubmissionContext =
+        mock(ApplicationSubmissionContext.class);
+    when(applicationSubmissionContext.getApplicationId()).thenReturn(app);
+    nodeLabelProcessor.process("host.cluster2.com", "foo", app,
+        applicationSubmissionContext);
+    verify(applicationSubmissionContext, times(1))
+        .setNodeLabelExpression("foo");
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/preprocessor/TestQueueProcessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/preprocessor/TestQueueProcessor.java
new file mode 100644
index 0000000..fc032bb
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/preprocessor/TestQueueProcessor.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.preprocessor;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.junit.Test;
+
+import static org.mockito.Mockito.*;
+
+
+/**
+ * This class will test the functionality of QueueProcessor.
+ */
+public class TestQueueProcessor {
+  @Test
+  public void testQueueProcessor() {
+    ContextProcessor queueProcessor = new QueueProcessor();
+    ApplicationId app = ApplicationId.newInstance(123456, 111);
+    ApplicationSubmissionContext applicationSubmissionContext =
+        mock(ApplicationSubmissionContext.class);
+    when(applicationSubmissionContext.getApplicationId()).thenReturn(app);
+    queueProcessor.process("host.cluster2.com", "queue1",
+        app, applicationSubmissionContext);
+    verify(applicationSubmissionContext, times(1)).setQueue("queue1");
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/preprocessor/TestTagAddProcessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/preprocessor/TestTagAddProcessor.java
new file mode 100644
index 0000000..abe0674
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/preprocessor/TestTagAddProcessor.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.preprocessor;
+
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.junit.Test;
+import static org.mockito.Mockito.*;
+
+
+/**
+ * This class will test the functionality of TagAddProcessor.
+ */
+public class TestTagAddProcessor {
+  @Test
+  public void testTagAddProcessor() {
+    ContextProcessor tagAddProcessor = new TagAddProcessor();
+    ApplicationId app = ApplicationId.newInstance(123456, 111);
+    ApplicationSubmissionContext applicationSubmissionContext =
+        mock(ApplicationSubmissionContext.class);
+    when(applicationSubmissionContext.getApplicationId()).thenReturn(app);
+    tagAddProcessor.process("host.cluster2.com",
+        "cluster:cluster1", app, applicationSubmissionContext);
+    Set<String> applicationTags = new HashSet<String>();
+    applicationTags.add("cluster:cluster1");
+    verify(applicationSubmissionContext, times(1))
+        .setApplicationTags(applicationTags);
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org