You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by jh...@apache.org on 2019/09/06 17:17:43 UTC
[hadoop] branch branch-3.2 updated: YARN-9761. Allow overriding
application submissions based on server side configs. Contributed by
Pralabh Kumar
This is an automated email from the ASF dual-hosted git repository.
jhung pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/branch-3.2 by this push:
new 1f685ef YARN-9761. Allow overriding application submissions based on server side configs. Contributed by Pralabh Kumar
1f685ef is described below
commit 1f685efc7380ede6fef8da19edd68a86958c7961
Author: Jonathan Hung <jh...@linkedin.com>
AuthorDate: Fri Sep 6 10:10:53 2019 -0700
YARN-9761. Allow overriding application submissions based on server side configs. Contributed by Pralabh Kumar
---
.../apache/hadoop/yarn/conf/YarnConfiguration.java | 18 ++
.../src/main/resources/yarn-default.xml | 21 ++
.../server/resourcemanager/ClientRMService.java | 24 +++
.../preprocessor/ContextProcessor.java | 44 ++++
.../preprocessor/NodeLabelProcessor.java | 33 +++
.../preprocessor/QueueProcessor.java | 34 ++++
.../SubmissionContextPreProcessor.java | 223 +++++++++++++++++++++
.../preprocessor/TagAddProcessor.java | 44 ++++
.../resourcemanager/preprocessor/package-info.java | 28 +++
.../resourcemanager/TestClientRMService.java | 178 ++++++++++++++++
.../preprocessor/TestContextProcessor.java | 63 ++++++
.../preprocessor/TestNodeLabelProcessor.java | 45 +++++
.../preprocessor/TestQueueProcessor.java | 43 ++++
.../preprocessor/TestTagAddProcessor.java | 47 +++++
14 files changed, 845 insertions(+)
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index c69d857..87d2f0c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -532,6 +532,24 @@ public class YarnConfiguration extends Configuration {
public static final String RM_NODES_INCLUDE_FILE_PATH =
RM_PREFIX + "nodes.include-path";
public static final String DEFAULT_RM_NODES_INCLUDE_FILE_PATH = "";
+
+ /**
+  * Whether the ResourceManager pre-processes application submission contexts
+  * with server-side rules before submitting the application.
+  */
+ public static final String RM_SUBMISSION_PREPROCESSOR_ENABLED =
+ RM_PREFIX + "submission-preprocessor.enabled";
+ public static final boolean DEFAULT_RM_SUBMISSION_PREPROCESSOR_ENABLED =
+ false;
+
+ /**
+  * Path to the rules file read by the submission pre-processor. Each line
+  * names a submitting host followed by the commands to apply for it.
+  */
+ public static final String RM_SUBMISSION_PREPROCESSOR_FILE_PATH =
+ RM_PREFIX + "submission-preprocessor.file-path";
+ public static final String DEFAULT_RM_SUBMISSION_PREPROCESSOR_FILE_PATH =
+ "";
+
+ /**
+  * Interval, in milliseconds, at which the submission pre-processor re-reads
+  * its rules file. A value of zero or less loads the file only once.
+  * NOTE(review): yarn-default.xml ships 60000 for this key while the code
+  * default below is 0 -- confirm which default is intended.
+  */
+ public static final String RM_SUBMISSION_PREPROCESSOR_REFRESH_INTERVAL_MS =
+ RM_PREFIX + "submission-preprocessor.file-refresh-interval-ms";
+ public static final int
+ DEFAULT_RM_SUBMISSION_PREPROCESSOR_REFRESH_INTERVAL_MS = 0;
/** Path to file with nodes to exclude.*/
public static final String RM_NODES_EXCLUDE_FILE_PATH =
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 3b9cf5d..0835d02 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -4082,4 +4082,25 @@
<name>yarn.nodemanager.containers-launcher.class</name>
<value>org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncher</value>
</property>
+
+ <property>
+ <description>
+ Enable pre-processing of the application submission context using server-side configuration.
+ </description>
+ <name>yarn.resourcemanager.submission-preprocessor.enabled</name>
+ <value>false</value>
+ </property>
+
+ <property>
+ <description>Path to the rules file that lists submitting hosts and, for each host, the submission context overrides (node label, queue, tag) to apply.</description>
+ <name>yarn.resourcemanager.submission-preprocessor.file-path</name>
+ <value></value>
+ </property>
+
+ <property>
+ <description>Interval, in milliseconds, at which the submission pre-processor re-reads its rules file. A value of 0 or less loads the file only once.</description>
+ <name>yarn.resourcemanager.submission-preprocessor.file-refresh-interval-ms</name>
+ <value>60000</value>
+ </property>
+
</configuration>
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
index 6cc7b43..3d1f01d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
@@ -167,6 +167,7 @@ import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.Keys;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.preprocessor.SubmissionContextPreProcessor;
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceProfilesManager;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationAllocation;
@@ -231,6 +232,8 @@ public class ClientRMService extends AbstractService implements
private ReservationSystem reservationSystem;
private ReservationInputValidator rValidator;
+ private SubmissionContextPreProcessor contextPreProcessor;
+
private boolean filterAppsByUser = false;
private static final EnumSet<RMAppState> ACTIVE_APP_STATES = EnumSet.of(
@@ -311,6 +314,14 @@ public class ClientRMService extends AbstractService implements
server.getListenerAddress());
this.timelineServiceV2Enabled = YarnConfiguration.
timelineServiceV2Enabled(conf);
+
+ if (conf.getBoolean(
+ YarnConfiguration.RM_SUBMISSION_PREPROCESSOR_ENABLED,
+ YarnConfiguration.DEFAULT_RM_SUBMISSION_PREPROCESSOR_ENABLED)) {
+ this.contextPreProcessor = new SubmissionContextPreProcessor();
+ this.contextPreProcessor.start(conf);
+ }
+
super.serviceStart();
}
@@ -319,6 +330,9 @@ public class ClientRMService extends AbstractService implements
if (this.server != null) {
this.server.stop();
}
+ if (this.contextPreProcessor != null) {
+ this.contextPreProcessor.stop();
+ }
super.serviceStop();
}
@@ -330,6 +344,11 @@ public class ClientRMService extends AbstractService implements
YarnConfiguration.DEFAULT_RM_PORT);
}
+ /**
+  * Returns the submission context pre-processor, or null when the
+  * pre-processor is not enabled or has not been created yet (it is only
+  * instantiated when the service starts with the feature enabled).
+  */
+ @VisibleForTesting
+ SubmissionContextPreProcessor getContextPreProcessor() {
+ return this.contextPreProcessor;
+ }
+
@Private
public InetSocketAddress getBindAddress() {
return clientBindAddress;
@@ -661,6 +680,11 @@ public class ClientRMService extends AbstractService implements
checkReservationACLs(submissionContext.getQueue(), AuditConstants
.SUBMIT_RESERVATION_REQUEST, reservationId);
+ if (this.contextPreProcessor != null) {
+ this.contextPreProcessor.preProcess(Server.getRemoteIp().getHostName(),
+ applicationId, submissionContext);
+ }
+
try {
// call RMAppManager to submit application directly
rmAppManager.submitApplication(submissionContext,
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/preprocessor/ContextProcessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/preprocessor/ContextProcessor.java
new file mode 100644
index 0000000..b33069c
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/preprocessor/ContextProcessor.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.preprocessor;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+
+
+
+/**
+ * Applies a single server-side value to an application submission context.
+ * The implementations in this package set the node label expression, set the
+ * queue, or add an application tag.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public interface ContextProcessor {
+ /**
+ * Enriches the application submission context with the given value.
+ * @param host Name of the host the application was submitted from.
+ * @param value Value to apply to the ApplicationSubmissionContext.
+ * @param applicationId Id of the application being submitted.
+ * @param submissionContext Submission context to modify in place.
+ */
+ void process(String host, String value, ApplicationId applicationId,
+ ApplicationSubmissionContext submissionContext);
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/preprocessor/NodeLabelProcessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/preprocessor/NodeLabelProcessor.java
new file mode 100644
index 0000000..30ea422
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/preprocessor/NodeLabelProcessor.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.preprocessor;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+
+/**
+ * Processor that sets the node label expression of the application
+ * submission context to the supplied value, replacing whatever was there.
+ */
+class NodeLabelProcessor implements ContextProcessor {
+ @Override
+ public void process(String host, String value, ApplicationId applicationId,
+ ApplicationSubmissionContext submissionContext) {
+ // host and applicationId are not consulted; the value is applied as given.
+ submissionContext.setNodeLabelExpression(value);
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/preprocessor/QueueProcessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/preprocessor/QueueProcessor.java
new file mode 100644
index 0000000..a47dcfe
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/preprocessor/QueueProcessor.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.preprocessor;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+
+
+/**
+ * Processor that sets the queue of the application submission context to the
+ * supplied value, replacing whatever queue was there.
+ */
+class QueueProcessor implements ContextProcessor {
+ @Override
+ public void process(String host, String value, ApplicationId applicationId,
+ ApplicationSubmissionContext submissionContext) {
+ // host and applicationId are not consulted; the value is applied as given.
+ submissionContext.setQueue(value);
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/preprocessor/SubmissionContextPreProcessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/preprocessor/SubmissionContextPreProcessor.java
new file mode 100644
index 0000000..68cc4cf
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/preprocessor/SubmissionContextPreProcessor.java
@@ -0,0 +1,223 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.preprocessor;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.regex.PatternSyntaxException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Pre-processes an {@link ApplicationSubmissionContext} with server-side
+ * rules keyed by the host the application was submitted from.
+ *
+ * <p>Rules are read from the file named by
+ * {@link YarnConfiguration#RM_SUBMISSION_PREPROCESSOR_FILE_PATH}. Each line
+ * holds a host followed by one or more {@code KEY=VALUE} commands separated
+ * by spaces or tabs, for example {@code host1 NL=foo Q=b}. The host may be an
+ * exact name, a regular expression, or {@code *} for the fallback rule;
+ * {@code KEY} is the name of a {@link ContextProp} constant. Lines whose
+ * first token starts with {@code #} are comments.
+ *
+ * <p>Lookup order in {@link #preProcess} is: exact host name, then the first
+ * regular expression (in file order) that is found in the host name, then
+ * the {@code *} rule.
+ *
+ * <p>A published rule table is never mutated; {@link #refresh()} builds a
+ * new table and swaps it in through a volatile field.
+ */
+public class SubmissionContextPreProcessor {
+
+  private static final Logger LOG = LoggerFactory.getLogger(
+      SubmissionContextPreProcessor.class);
+  /** Host key whose commands apply when no other rule matches. */
+  private static final String DEFAULT_COMMANDS = "*";
+  /** Delay, in milliseconds, before the rules file is loaded for the first time. */
+  private static final int INITIAL_DELAY = 1000;
+
+  /** Submission context properties that a rule can set, by command key. */
+  enum ContextProp {
+    // Node label Expression
+    NL(new NodeLabelProcessor()),
+    // Queue
+    Q(new QueueProcessor()),
+    // Tag Add
+    TA(new TagAddProcessor());
+
+    private final ContextProcessor cp;
+
+    ContextProp(ContextProcessor cp) {
+      this.cp = cp;
+    }
+  }
+
+  private String hostsFilePath;
+  private volatile long lastModified = -1;
+  private volatile Map<String, Map<ContextProp, String>> hostCommands =
+      new HashMap<>();
+  private ScheduledExecutorService executorService;
+
+  /**
+   * Reads the rules file location and refresh interval from the
+   * configuration and schedules loading of the rules on a single background
+   * thread. The first load runs after {@link #INITIAL_DELAY} milliseconds;
+   * with a positive interval the file is then re-checked at that fixed rate,
+   * otherwise it is loaded only once.
+   *
+   * @param conf configuration to read the pre-processor settings from.
+   */
+  public void start(Configuration conf) {
+    this.hostsFilePath =
+        conf.get(
+            YarnConfiguration.RM_SUBMISSION_PREPROCESSOR_FILE_PATH,
+            YarnConfiguration.DEFAULT_RM_SUBMISSION_PREPROCESSOR_FILE_PATH);
+    int refreshPeriod =
+        conf.getInt(
+            YarnConfiguration.RM_SUBMISSION_PREPROCESSOR_REFRESH_INTERVAL_MS,
+            YarnConfiguration.
+                DEFAULT_RM_SUBMISSION_PREPROCESSOR_REFRESH_INTERVAL_MS);
+
+    LOG.info("Submission Context Preprocessor enabled: file=[{}], "
+        + "interval=[{}]", this.hostsFilePath, refreshPeriod);
+
+    executorService = Executors.newSingleThreadScheduledExecutor();
+    Runnable refreshConf = new Runnable() {
+      @Override
+      public void run() {
+        try {
+          refresh();
+        } catch (Exception ex) {
+          // Swallow after logging: an exception escaping a periodic task
+          // would cancel all of its future executions.
+          LOG.error("Error while refreshing Submission PreProcessor file [{}]",
+              hostsFilePath, ex);
+        }
+      }
+    };
+    if (refreshPeriod > 0) {
+      executorService.scheduleAtFixedRate(refreshConf, INITIAL_DELAY,
+          refreshPeriod, TimeUnit.MILLISECONDS);
+    } else {
+      executorService.schedule(refreshConf, INITIAL_DELAY,
+          TimeUnit.MILLISECONDS);
+    }
+  }
+
+  /** Stops the background refresh thread, if {@link #start} created one. */
+  public void stop() {
+    if (this.executorService != null) {
+      this.executorService.shutdownNow();
+    }
+  }
+
+  /**
+   * Applies the commands registered for the submitting host to the
+   * submission context. Does nothing when no rule, including the default
+   * rule, applies.
+   *
+   * @param host name of the host the application was submitted from.
+   * @param applicationId id of the application being submitted.
+   * @param submissionContext submission context to modify in place.
+   */
+  public void preProcess(String host, ApplicationId applicationId,
+      ApplicationSubmissionContext submissionContext) {
+    // Read the volatile field once so that a concurrent refresh() cannot
+    // swap the table between the lookups below.
+    final Map<String, Map<ContextProp, String>> rules = hostCommands;
+
+    Map<ContextProp, String> cMap = rules.get(host);
+    if (cMap == null) {
+      cMap = findRegexMatch(rules, host);
+    }
+    // Set to default value
+    if (cMap == null) {
+      cMap = rules.get(DEFAULT_COMMANDS);
+    }
+    if (cMap != null) {
+      for (Map.Entry<ContextProp, String> entry : cMap.entrySet()) {
+        entry.getKey().cp.process(host, entry.getValue(),
+            applicationId, submissionContext);
+      }
+    }
+  }
+
+  /**
+   * Treats every rule key except the default one as a regular expression and
+   * returns the commands of the first rule, in table iteration order, whose
+   * pattern is found in the host name. Keys that are not valid regular
+   * expressions are logged and skipped.
+   *
+   * @return the matching commands, or null when no pattern matches.
+   */
+  private static Map<ContextProp, String> findRegexMatch(
+      Map<String, Map<ContextProp, String>> rules, String host) {
+    for (Map.Entry<String, Map<ContextProp, String>> entry :
+        rules.entrySet()) {
+      if (entry.getKey().equals(DEFAULT_COMMANDS)) {
+        // "*" is the fallback key, not a pattern.
+        continue;
+      }
+      try {
+        Pattern p = Pattern.compile(entry.getKey());
+        Matcher m = p.matcher(host);
+        if (m.find()) {
+          // Stop at the first hit instead of scanning the remaining rules.
+          return entry.getValue();
+        }
+      } catch (PatternSyntaxException exception) {
+        LOG.warn("Invalid regex pattern: " + entry.getKey());
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Reloads the rules file if it has been modified since the last successful
+   * load. The current rules are kept when the path is unset, the file is
+   * missing, the file is unchanged, the file yields no rules, or reading or
+   * parsing fails.
+   *
+   * @throws Exception if the file cannot be read or contains a command key
+   *     that is not a {@link ContextProp} name.
+   */
+  @VisibleForTesting
+  public void refresh() throws Exception {
+    if (null == hostsFilePath || hostsFilePath.isEmpty()) {
+      LOG.warn("Host list file path [{}] is empty or does not exist !!",
+          hostsFilePath);
+      return;
+    }
+    File hostFile = new File(hostsFilePath);
+    if (!hostFile.exists() || !hostFile.isFile()) {
+      LOG.warn("Host list file [{}] does not exist or is not a file !!",
+          hostFile);
+      return;
+    }
+    // Sample the timestamp before reading: if the file is rewritten while it
+    // is being read, the next refresh still sees it as modified.
+    final long modifiedAt = hostFile.lastModified();
+    if (modifiedAt <= lastModified) {
+      LOG.debug("Host list file [{}] has not been modified from last refresh",
+          hostFile);
+      return;
+    }
+
+    // Insertion-ordered so that regex rules are tried in file order.
+    Map<String, Map<ContextProp, String>> tempHostCommands =
+        new LinkedHashMap<>();
+    FileInputStream fileInputStream = null;
+    BufferedReader reader = null;
+    try {
+      fileInputStream = new FileInputStream(hostFile);
+      reader = new BufferedReader(new InputStreamReader(fileInputStream,
+          StandardCharsets.UTF_8));
+      String line;
+      while ((line = reader.readLine()) != null) {
+        parseLine(line, tempHostCommands);
+      }
+    } finally {
+      IOUtils.cleanupWithLogger(LOG, reader, fileInputStream);
+    }
+
+    // Only reached when the whole file was read and parsed without error.
+    lastModified = modifiedAt;
+    if (!tempHostCommands.isEmpty()) {
+      hostCommands = tempHostCommands;
+    }
+  }
+
+  /**
+   * Parses one line of the rules file and, if it yields at least one
+   * command, registers it in {@code rules}.
+   *
+   * <p>Lines should start with hostname and be followed with commands.
+   * Delimiter is any contiguous sequence of space or tab character.
+   * Commands are of the form {@code <KEY>=<VALUE>} where KEY can be 'NL',
+   * 'Q' or 'TA' (TA stands for 'Tag Add'). Sample lines:
+   * <pre>
+   *   host1 NL=foo Q=b
+   *   host2 Q=c NL=bar
+   * </pre>
+   *
+   * @throws IllegalArgumentException if a command key is not a
+   *     {@link ContextProp} name.
+   */
+  private static void parseLine(String line,
+      Map<String, Map<ContextProp, String>> rules) {
+    // Trim first: leading whitespace would otherwise produce an empty host
+    // token, which as a regular expression matches every host.
+    String[] commands = line.trim().split("[ \t\n\f\r]+");
+    if (commands.length < 2) {
+      return;
+    }
+    String host = commands[0];
+    if (host.startsWith("#")) {
+      // All lines starting with # is a comment
+      return;
+    }
+    Map<ContextProp, String> cMap = new HashMap<>();
+    for (int i = 1; i < commands.length; i++) {
+      String[] cSplit = commands[i].split("=");
+      if (cSplit.length != 2) {
+        LOG.error("No commands found for line [{}]", commands[i]);
+        continue;
+      }
+      cMap.put(ContextProp.valueOf(cSplit[0]), cSplit[1]);
+    }
+    if (!cMap.isEmpty()) {
+      rules.put(host, cMap);
+      LOG.info("Following commands registered for host[{}] : {}",
+          host, cMap);
+    }
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/preprocessor/TagAddProcessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/preprocessor/TagAddProcessor.java
new file mode 100644
index 0000000..22bf805
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/preprocessor/TagAddProcessor.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.preprocessor;
+
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+
+
+
+/**
+ * Processor that adds one tag to the application tags of the submission
+ * context, keeping any tags that are already present.
+ */
+class TagAddProcessor implements ContextProcessor {
+  @Override
+  public void process(String host, String value, ApplicationId applicationId,
+      ApplicationSubmissionContext submissionContext) {
+    // Build a fresh set so the one returned by the context is never mutated.
+    Set<String> mergedTags = new HashSet<>();
+    Set<String> existingTags = submissionContext.getApplicationTags();
+    if (existingTags != null) {
+      mergedTags.addAll(existingTags);
+    }
+    mergedTags.add(value);
+    submissionContext.setApplicationTags(mergedTags);
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/preprocessor/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/preprocessor/package-info.java
new file mode 100644
index 0000000..70648b7
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/preprocessor/package-info.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * This package contains classes to pre process the application submission
+ * context with server side configs.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package org.apache.hadoop.yarn.server.resourcemanager.preprocessor;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
index 0784d1e..e17cb1a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java
@@ -30,11 +30,15 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
+import java.io.BufferedWriter;
import java.io.File;
import java.io.FileOutputStream;
+import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStream;
+import java.net.InetAddress;
import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
import java.security.AccessControlException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
@@ -54,6 +58,7 @@ import org.apache.commons.io.FileUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.MockApps;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
@@ -195,6 +200,7 @@ public class TestClientRMService {
private final static String QUEUE_1 = "Q-1";
private final static String QUEUE_2 = "Q-2";
+ private final static String APPLICATION_TAG_SC_PREPROCESSOR ="mytag:foo";
private File resourceTypesFile = null;
@Test
@@ -972,6 +978,178 @@ public class TestClientRMService {
Assert.assertEquals(0, applications1.size());
}
+ /**
+  * Verifies that ClientRMService applies the submission pre-processor rules
+  * (node label, queue, tag) to submitted applications for an exact host
+  * match, the default "*" rule, a regex rule, and a rule that sets only the
+  * queue.
+  */
+ @Test (timeout = 30000)
+ @SuppressWarnings ("rawtypes")
+ public void testAppSubmitWithSubmissionPreProcessor() throws Exception {
+ ResourceScheduler scheduler = mockResourceScheduler();
+ RMContext rmContext = mock(RMContext.class);
+ mockRMContext(scheduler, rmContext);
+ YarnConfiguration yConf = new YarnConfiguration();
+ yConf.setBoolean(YarnConfiguration.RM_SUBMISSION_PREPROCESSOR_ENABLED,
+ true);
+ yConf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true);
+ // Override the YARN configuration.
+ when(rmContext.getYarnConfiguration()).thenReturn(yConf);
+ RMStateStore stateStore = mock(RMStateStore.class);
+ when(rmContext.getStateStore()).thenReturn(stateStore);
+ RMAppManager appManager = new RMAppManager(rmContext, scheduler,
+ null, mock(ApplicationACLsManager.class), new Configuration());
+ when(rmContext.getDispatcher().getEventHandler()).thenReturn(
+ new EventHandler<Event>() {
+ public void handle(Event event) {}
+ });
+ ApplicationId appId1 = getApplicationId(100);
+ ApplicationACLsManager mockAclsManager = mock(ApplicationACLsManager.class);
+ when(
+ mockAclsManager.checkAccess(UserGroupInformation.getCurrentUser(),
+ ApplicationAccessType.VIEW_APP, null, appId1)).thenReturn(true);
+
+ QueueACLsManager mockQueueACLsManager = mock(QueueACLsManager.class);
+ when(mockQueueACLsManager.checkAccess(any(UserGroupInformation.class),
+ any(QueueACL.class), any(RMApp.class), any(String.class),
+ any()))
+ .thenReturn(true);
+
+ ClientRMService rmService =
+ new ClientRMService(rmContext, scheduler, appManager,
+ mockAclsManager, mockQueueACLsManager, null);
+ File rulesFile = File.createTempFile("submission_rules", ".tmp");
+ rulesFile.deleteOnExit();
+ rulesFile.createNewFile();
+
+ yConf.set(YarnConfiguration.RM_SUBMISSION_PREPROCESSOR_FILE_PATH,
+ rulesFile.getAbsolutePath());
+ rmService.serviceInit(yConf);
+ rmService.serviceStart();
+
+ // The rules are written after the service has started and are loaded by
+ // the explicit refresh() call below.
+ BufferedWriter writer = new BufferedWriter(new FileWriter(rulesFile));
+ writer.write("host.cluster1.com NL=foo Q=bar TA=cluster:cluster1");
+ writer.newLine();
+ writer.write("host.cluster2.com Q=hello NL=zuess TA=cluster:cluster2");
+ writer.newLine();
+ writer.write("host.cluster.*.com Q=hello NL=reg TA=cluster:reg");
+ writer.newLine();
+ // NOTE(review): this rule repeats the previous line verbatim.
+ writer.write("host.cluster.*.com Q=hello NL=reg TA=cluster:reg");
+ writer.newLine();
+ writer.write("* TA=cluster:other Q=default NL=barfoo");
+ writer.newLine();
+ writer.write("host.testcluster1.com Q=default");
+ writer.flush();
+ writer.close();
+ rmService.getContextPreProcessor().refresh();
+ // Exact host match: tag, queue and node label come from the
+ // host.cluster1.com rule.
+ setupCurrentCall("host.cluster1.com");
+ SubmitApplicationRequest submitRequest1 = mockSubmitAppRequest(
+ appId1, null, null);
+ try {
+ rmService.submitApplication(submitRequest1);
+ } catch (YarnException e) {
+ Assert.fail("Exception is not expected.");
+ }
+ RMApp app1 = rmContext.getRMApps().get(appId1);
+ Assert.assertNotNull("app doesn't exist", app1);
+ Assert.assertEquals("app name doesn't match",
+ YarnConfiguration.DEFAULT_APPLICATION_NAME, app1.getName());
+ Assert.assertTrue("custom tag not present",
+ app1.getApplicationTags().contains("cluster:cluster1"));
+ Assert.assertEquals("app queue doesn't match", "bar", app1.getQueue());
+ Assert.assertEquals("app node label doesn't match",
+ "foo", app1.getApplicationSubmissionContext().getNodeLabelExpression());
+ // Exact host match with a client-supplied tag: the client tag must be
+ // kept alongside the tag added by the rule.
+ setupCurrentCall("host.cluster2.com");
+ ApplicationId appId2 = getApplicationId(101);
+ SubmitApplicationRequest submitRequest2 = mockSubmitAppRequest(
+ appId2, null, null);
+ submitRequest2.getApplicationSubmissionContext().setApplicationType(
+ "matchType");
+ Set<String> aTags = new HashSet<String>();
+ aTags.add(APPLICATION_TAG_SC_PREPROCESSOR);
+ submitRequest2.getApplicationSubmissionContext().setApplicationTags(aTags);
+ try {
+ rmService.submitApplication(submitRequest2);
+ } catch (YarnException e) {
+ Assert.fail("Exception is not expected.");
+ }
+ RMApp app2 = rmContext.getRMApps().get(appId2);
+ Assert.assertNotNull("app doesn't exist", app2);
+ Assert.assertEquals("app name doesn't match",
+ YarnConfiguration.DEFAULT_APPLICATION_NAME, app2.getName());
+ Assert.assertTrue("client tag not present",
+ app2.getApplicationTags().contains(APPLICATION_TAG_SC_PREPROCESSOR));
+ Assert.assertTrue("custom tag not present",
+ app2.getApplicationTags().contains("cluster:cluster2"));
+ Assert.assertEquals("app queue doesn't match", "hello", app2.getQueue());
+ Assert.assertEquals("app node label doesn't match",
+ "zuess",
+ app2.getApplicationSubmissionContext().getNodeLabelExpression());
+ // Test Default commands
+ setupCurrentCall("host2.cluster3.com");
+ ApplicationId appId3 = getApplicationId(102);
+ SubmitApplicationRequest submitRequest3 = mockSubmitAppRequest(
+ appId3, null, null);
+ submitRequest3.getApplicationSubmissionContext().setApplicationType(
+ "matchType");
+ submitRequest3.getApplicationSubmissionContext().setApplicationTags(aTags);
+ try {
+ rmService.submitApplication(submitRequest3);
+ } catch (YarnException e) {
+ Assert.fail("Exception is not expected.");
+ }
+ RMApp app3 = rmContext.getRMApps().get(appId3);
+ Assert.assertNotNull("app doesn't exist", app3);
+ Assert.assertEquals("app name doesn't match",
+ YarnConfiguration.DEFAULT_APPLICATION_NAME, app3.getName());
+ Assert.assertTrue("client tag not present",
+ app3.getApplicationTags().contains(APPLICATION_TAG_SC_PREPROCESSOR));
+ Assert.assertTrue("custom tag not present",
+ app3.getApplicationTags().contains("cluster:other"));
+ Assert.assertEquals("app queue doesn't match", "default", app3.getQueue());
+ Assert.assertEquals("app node label doesn't match",
+ "barfoo",
+ app3.getApplicationSubmissionContext().getNodeLabelExpression());
+ // Test regex
+ setupCurrentCall("host.cluster100.com");
+ ApplicationId appId4 = getApplicationId(103);
+ SubmitApplicationRequest submitRequest4 = mockSubmitAppRequest(
+ appId4, null, null);
+ try {
+ rmService.submitApplication(submitRequest4);
+ } catch (YarnException e) {
+ Assert.fail("Exception is not expected.");
+ }
+ RMApp app4 = rmContext.getRMApps().get(appId4);
+ Assert.assertTrue("custom tag not present",
+ app4.getApplicationTags().contains("cluster:reg"));
+ Assert.assertEquals("app node label doesn't match",
+ "reg", app4.getApplicationSubmissionContext().getNodeLabelExpression());
+ // Rule that only sets the queue: no tag or node label may be injected.
+ testSubmissionContextWithAbsentTAG(rmService, rmContext);
+ rmService.serviceStop();
+ }
+
+  /**
+   * Submits an application from a host whose rule only sets the queue and
+   * verifies that the queue is applied while no tag or node label is added.
+   *
+   * @param rmService service under test, with the rules already loaded.
+   * @param rmContext context used to look up the submitted application.
+   */
+  private void testSubmissionContextWithAbsentTAG(ClientRMService rmService,
+      RMContext rmContext) throws Exception {
+    setupCurrentCall("host.testcluster1.com");
+    ApplicationId appId5 = getApplicationId(104);
+    SubmitApplicationRequest submitRequest5 = mockSubmitAppRequest(
+        appId5, null, null);
+    try {
+      rmService.submitApplication(submitRequest5);
+    } catch (YarnException e) {
+      Assert.fail("Exception is not expected.");
+    }
+    RMApp app5 = rmContext.getRMApps().get(appId5);
+    // assertEquals takes (message, expected, actual); passing them in that
+    // order keeps the failure message truthful.
+    Assert.assertEquals("custom tag present",
+        0, app5.getApplicationTags().size());
+    Assert.assertNull("app node label present",
+        app5.getApplicationSubmissionContext().getNodeLabelExpression());
+    Assert.assertEquals("Queue name is not present",
+        "default", app5.getQueue());
+  }
+  /**
+   * Installs a mocked RPC call as the current call so that the caller's
+   * address resolves to the given host name.
+   *
+   * @param hostName host name the mocked caller address should carry.
+   */
+  private void setupCurrentCall(String hostName) throws UnknownHostException {
+    // The raw address is arbitrary; only the host name matters to the test.
+    byte[] rawAddress = new byte[]{123, 123, 123, 123};
+    InetAddress callerAddress = InetAddress.getByAddress(hostName, rawAddress);
+    Server.Call currentCall = mock(Server.Call.class);
+    when(currentCall.getHostInetAddress()).thenReturn(callerAddress);
+    Server.getCurCall().set(currentCall);
+  }
@Test (timeout = 30000)
@SuppressWarnings ("rawtypes")
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/preprocessor/TestContextProcessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/preprocessor/TestContextProcessor.java
new file mode 100644
index 0000000..039e794
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/preprocessor/TestContextProcessor.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.preprocessor;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.junit.Test;
+
+import static org.mockito.Mockito.*;
+
+
+/**
+ * Tests all three processors (NodeLabelProcessor, QueueProcessor and
+ * TagAddProcessor) together on the same
+ * ApplicationSubmissionContext.
+ */
+ public class TestContextProcessor {
+   /**
+    * Applies the node label, queue and tag processors to one mocked
+    * submission context and verifies each resulting setter call.
+    */
+   @Test
+   public void testContextProcessor() {
+     // Each processor is paired with the value it is asked to apply.
+     Map<ContextProcessor, String> valueByProcessor = new HashMap<>();
+     valueByProcessor.put(new NodeLabelProcessor(), "foo");
+     valueByProcessor.put(new QueueProcessor(), "queue1");
+     valueByProcessor.put(new TagAddProcessor(), "cluster:cluster1");
+     ApplicationId appId = ApplicationId.newInstance(123456, 111);
+     ApplicationSubmissionContext context =
+         mock(ApplicationSubmissionContext.class);
+
+     valueByProcessor.forEach((processor, value) ->
+         processor.process("host.cluster2.com", value, appId, context));
+
+     Set<String> expectedTags = new HashSet<>();
+     expectedTags.add("cluster:cluster1");
+     // verify(mock) checks for exactly one invocation by default.
+     verify(context).setNodeLabelExpression("foo");
+     verify(context).setQueue("queue1");
+     verify(context).setApplicationTags(expectedTags);
+   }
+ }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/preprocessor/TestNodeLabelProcessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/preprocessor/TestNodeLabelProcessor.java
new file mode 100644
index 0000000..bb55b72
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/preprocessor/TestNodeLabelProcessor.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.preprocessor;
+
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.junit.Test;
+
+import static org.mockito.Mockito.*;
+
+/**
+ * This class will test the functionality of NodeLabelProcessor.
+ */
+ public class TestNodeLabelProcessor {
+   /** Expects exactly one setNodeLabelExpression("foo") call on the mock. */
+   @Test
+   public void testNodeLabelProcessor() {
+     ContextProcessor processor = new NodeLabelProcessor();
+     ApplicationId appId = ApplicationId.newInstance(123456, 111);
+     ApplicationSubmissionContext context =
+         mock(ApplicationSubmissionContext.class);
+     when(context.getApplicationId()).thenReturn(appId);
+
+     processor.process("host.cluster2.com", "foo", appId, context);
+
+     verify(context).setNodeLabelExpression("foo");
+   }
+ }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/preprocessor/TestQueueProcessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/preprocessor/TestQueueProcessor.java
new file mode 100644
index 0000000..fc032bb
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/preprocessor/TestQueueProcessor.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.preprocessor;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.junit.Test;
+
+import static org.mockito.Mockito.*;
+
+
+/**
+ * This class will test the functionality of QueueProcessor.
+ */
+ public class TestQueueProcessor {
+   /** Expects exactly one setQueue("queue1") call on the mocked context. */
+   @Test
+   public void testQueueProcessor() {
+     ContextProcessor processor = new QueueProcessor();
+     ApplicationId appId = ApplicationId.newInstance(123456, 111);
+     ApplicationSubmissionContext context =
+         mock(ApplicationSubmissionContext.class);
+     when(context.getApplicationId()).thenReturn(appId);
+     processor.process("host.cluster2.com", "queue1", appId, context);
+     verify(context).setQueue("queue1");
+   }
+ }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/preprocessor/TestTagAddProcessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/preprocessor/TestTagAddProcessor.java
new file mode 100644
index 0000000..abe0674
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/preprocessor/TestTagAddProcessor.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.preprocessor;
+
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.junit.Test;
+import static org.mockito.Mockito.*;
+
+
+/**
+ * This class will test the functionality of TagAddProcessor.
+ */
+ public class TestTagAddProcessor {
+   /** Expects one setApplicationTags call with only "cluster:cluster1". */
+   @Test
+   public void testTagAddProcessor() {
+     ContextProcessor processor = new TagAddProcessor();
+     ApplicationId appId = ApplicationId.newInstance(123456, 111);
+     ApplicationSubmissionContext context =
+         mock(ApplicationSubmissionContext.class);
+     when(context.getApplicationId()).thenReturn(appId);
+     processor.process("host.cluster2.com", "cluster:cluster1", appId, context);
+
+     Set<String> expectedTags = new HashSet<>();
+     expectedTags.add("cluster:cluster1");
+     verify(context).setApplicationTags(expectedTags);
+   }
+ }
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org