Posted to mapreduce-commits@hadoop.apache.org by sh...@apache.org on 2009/11/27 05:48:29 UTC
svn commit: r884743 - in /hadoop/mapreduce/branches/branch-0.21: ./
src/java/org/apache/hadoop/mapred/ src/test/
src/test/mapred/org/apache/hadoop/mapred/
Author: sharad
Date: Fri Nov 27 04:48:28 2009
New Revision: 884743
URL: http://svn.apache.org/viewvc?rev=884743&view=rev
Log:
MAPREDUCE-28. Merge revision 883621 from trunk.
Added:
hadoop/mapreduce/branches/branch-0.21/src/test/mapred/org/apache/hadoop/mapred/TestQueueManagerWithDeprecatedConf.java
hadoop/mapreduce/branches/branch-0.21/src/test/mapred/org/apache/hadoop/mapred/TestQueueManagerWithJobTracker.java
Removed:
hadoop/mapreduce/branches/branch-0.21/src/test/mapred/org/apache/hadoop/mapred/TestQueueManagerForHierarchialQueues.java
Modified:
hadoop/mapreduce/branches/branch-0.21/CHANGES.txt
hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/QueueConfigurationParser.java
hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/QueueManager.java
hadoop/mapreduce/branches/branch-0.21/src/test/commit-tests
hadoop/mapreduce/branches/branch-0.21/src/test/mapred/org/apache/hadoop/mapred/QueueManagerTestUtils.java
hadoop/mapreduce/branches/branch-0.21/src/test/mapred/org/apache/hadoop/mapred/TestQueueManager.java
Modified: hadoop/mapreduce/branches/branch-0.21/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/CHANGES.txt?rev=884743&r1=884742&r2=884743&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/CHANGES.txt (original)
+++ hadoop/mapreduce/branches/branch-0.21/CHANGES.txt Fri Nov 27 04:48:28 2009
@@ -830,6 +830,9 @@
MAPREDUCE-1007. Fix NPE in CapacityTaskScheduler.getJobs().
(V.V.Chaitanya Krishna via sharad)
+ MAPREDUCE-28. Refactor TestQueueManager and fix default ACLs.
+ (V.V.Chaitanya Krishna and Rahul K Singh via sharad)
+
MAPREDUCE-1182. Fix overflow in reduce causing allocations to exceed the
configured threshold. (cdouglas)
Modified: hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/QueueConfigurationParser.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/QueueConfigurationParser.java?rev=884743&r1=884742&r2=884743&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/QueueConfigurationParser.java (original)
+++ hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/QueueConfigurationParser.java Fri Nov 27 04:48:28 2009
@@ -19,6 +19,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapred.Queue.QueueOperation;
import org.apache.hadoop.mapreduce.QueueState;
import org.apache.hadoop.security.SecurityUtil.AccessControlList;
import static org.apache.hadoop.mapred.QueueManager.toFullPropertyName;
@@ -232,6 +233,9 @@
validate(queueNode);
List<Element> subQueues = new ArrayList<Element>();
+ String submitKey = "";
+ String adminKey = "";
+
for (int j = 0; j < fields.getLength(); j++) {
Node fieldNode = fields.item(j);
if (!(fieldNode instanceof Element)) {
@@ -253,6 +257,10 @@
//parent.child
name += nameValue;
newQueue.setName(name);
+ submitKey = toFullPropertyName(name,
+ Queue.QueueOperation.SUBMIT_JOB.getAclName());
+ adminKey = toFullPropertyName(name,
+ Queue.QueueOperation.ADMINISTER_JOBS.getAclName());
}
if (QUEUE_TAG.equals(field.getTagName()) && field.hasChildNodes()) {
@@ -260,17 +268,11 @@
}
if(isAclsEnabled()) {
if (ACL_SUBMIT_JOB_TAG.equals(field.getTagName())) {
- String submitList = field.getTextContent();
- String aclKey = toFullPropertyName(
- name, Queue.QueueOperation.SUBMIT_JOB.getAclName());
- acls.put(aclKey, new AccessControlList(submitList));
+ acls.put(submitKey, new AccessControlList(field.getTextContent()));
}
if (ACL_ADMINISTER_JOB_TAG.equals(field.getTagName())) {
- String administerList = field.getTextContent();
- String aclKey = toFullPropertyName(
- name, Queue.QueueOperation.ADMINISTER_JOBS.getAclName());
- acls.put(aclKey, new AccessControlList(administerList));
+ acls.put(adminKey, new AccessControlList(field.getTextContent()));
}
}
@@ -284,6 +286,15 @@
newQueue.setState(QueueState.getState(state));
}
}
+
+ if (!acls.containsKey(submitKey)) {
+ acls.put(submitKey, new AccessControlList("*"));
+ }
+
+ if (!acls.containsKey(adminKey)) {
+ acls.put(adminKey, new AccessControlList("*"));
+ }
+
//Set acls
newQueue.setAcls(acls);
//At this point we have the queue ready at current height level.
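
The net effect of this hunk: the submit and administer ACL keys are computed once per queue, and if the queue's XML declares neither ACL tag the parser now falls back to a wildcard ACL instead of leaving the map entry absent. A minimal self-contained sketch of that fallback pattern follows; the key strings and the AclList holder are illustrative stand-ins, not the real QueueManager.toFullPropertyName() or AccessControlList:

import java.util.HashMap;
import java.util.Map;

// Standalone sketch of the "default to *" fallback added above. When ACLs are
// enabled but a <queue> element declares no acl-submit-job / acl-administer-jobs
// tags, a wildcard ACL is stored under the queue's full property name.
public class DefaultAclSketch {
  static final class AclList {
    private final String spec;
    AclList(String spec) { this.spec = spec; }
    @Override public String toString() { return spec; }
  }

  public static void main(String[] args) {
    Map<String, AclList> acls = new HashMap<String, AclList>();
    String submitKey = "mapred.queue.p1:p13.acl-submit-job";      // assumed key shape
    String adminKey  = "mapred.queue.p1:p13.acl-administer-jobs"; // assumed key shape

    // Nothing was parsed from the XML for this queue, so the map has no entries.
    if (!acls.containsKey(submitKey)) {
      acls.put(submitKey, new AclList("*"));   // default: anyone may submit
    }
    if (!acls.containsKey(adminKey)) {
      acls.put(adminKey, new AclList("*"));    // default: anyone may administer
    }

    System.out.println(submitKey + " = " + acls.get(submitKey));
    System.out.println(adminKey + " = " + acls.get(adminKey));
  }
}
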
Modified: hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/QueueManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/QueueManager.java?rev=884743&r1=884742&r2=884743&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/QueueManager.java (original)
+++ hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/QueueManager.java Fri Nov 27 04:48:28 2009
@@ -121,7 +121,8 @@
return new DeprecatedQueueConfigurationParser(conf);
} else {
URL filePath =
- ClassLoader.getSystemClassLoader().getResource(QUEUE_CONF_FILE_NAME);
+ Thread.currentThread().getContextClassLoader()
+ .getResource(QUEUE_CONF_FILE_NAME);
return new QueueConfigurationParser(filePath.getPath());
}
}
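
The resource lookup now goes through the thread's context classloader rather than the system classloader, so a caller (the new test utilities below depend on this) can install its own loader that redirects mapred-queues.xml to an arbitrary file. A minimal sketch of that pattern, assuming the resource name and file path shown here:

import java.io.File;
import java.net.MalformedURLException;
import java.net.URL;

// Sketch of why the context classloader matters: a loader installed on the
// current thread can redirect a named resource to an arbitrary file, which
// ClassLoader.getSystemClassLoader() would never see.
public class ContextClassLoaderSketch {
  static final String RESOURCE = "mapred-queues.xml"; // assumed resource name

  static class RedirectingLoader extends ClassLoader {
    private final File target;
    RedirectingLoader(File target) { this.target = target; }
    @Override
    public URL getResource(String name) {
      if (RESOURCE.equals(name) && target.exists()) {
        try {
          return target.toURI().toURL();
        } catch (MalformedURLException e) {
          // fall through to the parent lookup
        }
      }
      return super.getResource(name);
    }
  }

  public static void main(String[] args) {
    File conf = new File("test-mapred-queues.xml"); // hypothetical test file
    Thread.currentThread().setContextClassLoader(new RedirectingLoader(conf));

    // This is the lookup style the patch switches to; it honours the loader
    // installed above instead of only consulting the system classpath.
    URL found = Thread.currentThread().getContextClassLoader().getResource(RESOURCE);
    System.out.println("resolved to: " + found);
  }
}
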
Modified: hadoop/mapreduce/branches/branch-0.21/src/test/commit-tests
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/test/commit-tests?rev=884743&r1=884742&r2=884743&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/src/test/commit-tests (original)
+++ hadoop/mapreduce/branches/branch-0.21/src/test/commit-tests Fri Nov 27 04:48:28 2009
@@ -38,7 +38,7 @@
**/TestTaskTrackerBlacklisting.java
**/TestTaskTrackerLocalization
**/TestTrackerDistributedCacheManager
-**/TestQueueManagerForHierarchialQueues
+**/TestQueueManager
**/TestContainerQueue
**/TestCapacityScheduler
**/TestRefreshOfQueues
Modified: hadoop/mapreduce/branches/branch-0.21/src/test/mapred/org/apache/hadoop/mapred/QueueManagerTestUtils.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/test/mapred/org/apache/hadoop/mapred/QueueManagerTestUtils.java?rev=884743&r1=884742&r2=884743&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/src/test/mapred/org/apache/hadoop/mapred/QueueManagerTestUtils.java (original)
+++ hadoop/mapreduce/branches/branch-0.21/src/test/mapred/org/apache/hadoop/mapred/QueueManagerTestUtils.java Fri Nov 27 04:48:28 2009
@@ -19,11 +19,25 @@
package org.apache.hadoop.mapred;
//import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Cluster;
+import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.QueueState;
+import org.apache.hadoop.mapreduce.SleepJob;
+import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
+import org.apache.hadoop.security.UnixUserGroupInformation;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import static org.apache.hadoop.mapred.Queue.*;
import static org.apache.hadoop.mapred.QueueConfigurationParser.*;
+import static org.apache.hadoop.mapred.QueueManagerTestUtils.CONFIG;
+import static org.apache.hadoop.mapred.QueueManagerTestUtils.createAcls;
+import static org.apache.hadoop.mapred.QueueManagerTestUtils.createProperties;
+import static org.apache.hadoop.mapred.QueueManagerTestUtils.createQueue;
+import static org.apache.hadoop.mapred.QueueManagerTestUtils.createQueuesNode;
+import static org.apache.hadoop.mapred.QueueManagerTestUtils.createState;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.transform.TransformerException;
@@ -35,11 +49,15 @@
import java.util.Properties;
import java.util.Set;
import java.io.File;
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URL;
//@Private
public class QueueManagerTestUtils {
final static String CONFIG = new File("test-mapred-queues.xml")
- .getAbsolutePath();
+ .getAbsolutePath();
+ private static final Log LOG = LogFactory.getLog(QueueManagerTestUtils.class);
/**
* Create and return a new instance of a DOM Document object to build a queue
@@ -49,16 +67,15 @@
* @throws Exception
*/
public static Document createDocument() throws Exception {
- Document doc = DocumentBuilderFactory
- .newInstance().newDocumentBuilder().newDocument();
+ Document doc = DocumentBuilderFactory.newInstance().newDocumentBuilder()
+ .newDocument();
return doc;
}
- public static void createSimpleDocument(
- Document doc) throws Exception {
+ public static void createSimpleDocument(Document doc) throws Exception {
Element queues = createQueuesNode(doc, "true");
- //Create parent level queue q1.
+ // Create parent level queue q1.
Element q1 = createQueue(doc, "q1");
Properties props = new Properties();
props.setProperty("capacity", "10");
@@ -66,30 +83,74 @@
q1.appendChild(createProperties(doc, props));
queues.appendChild(q1);
- //Create another parent level p1
+ // Create another parent level p1
Element p1 = createQueue(doc, "p1");
- //append child p11 to p1
+ // append child p11 to p1
p1.appendChild(createQueue(doc, "p11"));
Element p12 = createQueue(doc, "p12");
p12.appendChild(createState(doc, QueueState.STOPPED.getStateName()));
- p12.appendChild(createAcls(doc, QueueConfigurationParser.ACL_SUBMIT_JOB_TAG, "u1"));
- p12.appendChild(createAcls(doc, QueueConfigurationParser.ACL_ADMINISTER_JOB_TAG, "u2"));
+ p12.appendChild(createAcls(doc,
+ QueueConfigurationParser.ACL_SUBMIT_JOB_TAG, "u1"));
+ p12.appendChild(createAcls(doc,
+ QueueConfigurationParser.ACL_ADMINISTER_JOB_TAG, "u2"));
- //append p12 to p1.
+ // append p12 to p1.
p1.appendChild(p12);
+ queues.appendChild(p1);
+ }
+
+ static void createSimpleDocumentWithAcls(Document doc, String aclsEnabled) {
+ Element queues = createQueuesNode(doc, aclsEnabled);
+
+ // Create parent level queue q1.
+ Element q1 = createQueue(doc, "q1");
+ Properties props = new Properties();
+ props.setProperty("capacity", "10");
+ props.setProperty("maxCapacity", "35");
+ q1.appendChild(createProperties(doc, props));
+ queues.appendChild(q1);
+
+ // Create another parent level p1
+ Element p1 = createQueue(doc, "p1");
+
+ // append child p11 to p1
+ Element p11 = createQueue(doc, "p11");
+ p11.appendChild(createAcls(doc,
+ QueueConfigurationParser.ACL_SUBMIT_JOB_TAG, "u1"));
+ p11.appendChild(createAcls(doc,
+ QueueConfigurationParser.ACL_ADMINISTER_JOB_TAG, "u2"));
+ p1.appendChild(p11);
+
+ // append child p12 to p1
+ Element p12 = createQueue(doc, "p12");
+ p12.appendChild(createState(doc, QueueState.RUNNING.getStateName()));
+ p12.appendChild(createAcls(doc,
+ QueueConfigurationParser.ACL_SUBMIT_JOB_TAG, "*"));
+ p12.appendChild(createAcls(doc,
+ QueueConfigurationParser.ACL_ADMINISTER_JOB_TAG, "*"));
+ p1.appendChild(p12);
+
+ // append child p13 to p1
+ Element p13 = createQueue(doc, "p13");
+ p13.appendChild(createState(doc, QueueState.RUNNING.getStateName()));
+ p1.appendChild(p13);
+
+ // append child p14 to p1
+ Element p14 = createQueue(doc, "p14");
+ p14.appendChild(createState(doc, QueueState.STOPPED.getStateName()));
+ p1.appendChild(p14);
queues.appendChild(p1);
}
- public static void refreshSimpleDocument(
- Document doc) throws Exception {
+ public static void refreshSimpleDocument(Document doc) throws Exception {
Element queues = createQueuesNode(doc, "true");
- //Create parent level queue q1.
+ // Create parent level queue q1.
Element q1 = createQueue(doc, "q1");
Properties props = new Properties();
props.setProperty("capacity", "70");
@@ -97,10 +158,10 @@
q1.appendChild(createProperties(doc, props));
queues.appendChild(q1);
- //Create another parent level p1
+ // Create another parent level p1
Element p1 = createQueue(doc, "p1");
- //append child p11 to p1
+ // append child p11 to p1
Element p11 = createQueue(doc, "p11");
p11.appendChild(createState(doc, QueueState.STOPPED.getStateName()));
p1.appendChild(p11);
@@ -111,10 +172,9 @@
p12.appendChild(createAcls(doc, "acl-submit-job", "u3"));
p12.appendChild(createAcls(doc, "acl-administer-jobs", "u4"));
- //append p12 to p1.
+ // append p12 to p1.
p1.appendChild(p12);
-
queues.appendChild(p1);
}
@@ -134,7 +194,7 @@
}
public static void writeToFile(Document doc, String filePath)
- throws TransformerException {
+ throws TransformerException {
Transformer trans = TransformerFactory.newInstance().newTransformer();
trans.setOutputProperty(OutputKeys.OMIT_XML_DECLARATION, "yes");
trans.setOutputProperty(OutputKeys.INDENT, "yes");
@@ -150,8 +210,8 @@
return queue;
}
- public static Element createAcls(
- Document doc, String aclName, String listNames) {
+ public static Element createAcls(Document doc, String aclName,
+ String listNames) {
Element acls = doc.createElement(aclName);
acls.setTextContent(listNames);
return acls;
@@ -192,8 +252,7 @@
* @throws Exception
*/
public static void writeQueueConfigurationFile(String filePath,
- JobQueueInfo[] rootQueues)
- throws Exception {
+ JobQueueInfo[] rootQueues) throws Exception {
Document doc = createDocument();
Element queueElements = createQueuesNode(doc, String.valueOf(true));
for (JobQueueInfo rootQ : rootQueues) {
@@ -202,4 +261,67 @@
}
writeToFile(doc, filePath);
}
+
+ static class QueueManagerConfigurationClassLoader extends ClassLoader {
+ @Override
+ public URL getResource(String name) {
+ if (!name.equals(QueueManager.QUEUE_CONF_FILE_NAME)) {
+ return super.getResource(name);
+ } else {
+ File resourceFile = new File(CONFIG);
+ if (!resourceFile.exists()) {
+ throw new IllegalStateException(
+ "Queue Manager configuration file not found");
+ }
+ try {
+ return resourceFile.toURL();
+ } catch (MalformedURLException e) {
+ LOG.fatal("Unable to form URL for the resource file : ");
+ }
+ return super.getResource(name);
+ }
+ }
+ }
+
+ static Job submitSleepJob(int numMappers, int numReducers, long mapSleepTime,
+ long reduceSleepTime, boolean shouldComplete, String userInfo,
+ String queueName, Configuration clientConf) throws IOException,
+ InterruptedException, ClassNotFoundException {
+ clientConf.set(JTConfig.JT_IPC_ADDRESS, "localhost:"
+ + miniMRCluster.getJobTrackerPort());
+ if (userInfo != null) {
+ clientConf.set(UnixUserGroupInformation.UGI_PROPERTY_NAME, userInfo);
+ }
+ if (queueName != null) {
+ clientConf.set(JobContext.QUEUE_NAME, queueName);
+ }
+ SleepJob sleep = new SleepJob();
+ sleep.setConf(clientConf);
+ Job job = sleep.createJob(numMappers, numReducers, mapSleepTime,
+ (int) mapSleepTime, reduceSleepTime, (int) reduceSleepTime);
+ if (shouldComplete) {
+ job.waitForCompletion(false);
+ } else {
+ job.submit();
+ // miniMRCluster.getJobTrackerRunner().getJobTracker().jobsToComplete()[]
+ Cluster cluster = new Cluster(miniMRCluster.createJobConf());
+ JobStatus[] status = miniMRCluster.getJobTrackerRunner().getJobTracker()
+ .jobsToComplete();
+ JobID id = status[status.length -1].getJobID();
+ Job newJob = cluster.getJob(id);
+ cluster.close();
+ return newJob;
+ }
+ return job;
+ }
+
+ static MiniMRCluster miniMRCluster;
+
+ static void setUpCluster(Configuration conf) throws IOException {
+ JobConf jobConf = new JobConf(conf);
+ String namenode = "file:///";
+ Thread.currentThread().setContextClassLoader(
+ new QueueManagerTestUtils.QueueManagerConfigurationClassLoader());
+ miniMRCluster = new MiniMRCluster(0, namenode, 3, null, null, jobConf);
+ }
}
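
Taken together, the new helpers are meant to be composed from a test in the same package: build a queue configuration document, write it to CONFIG, install the redirecting classloader by starting the mini cluster, then submit jobs into the hierarchical queues. A rough usage sketch, not part of the patch, with illustrative user and queue values:

package org.apache.hadoop.mapred;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.w3c.dom.Document;

// Hedged sketch of how the package-private helpers in QueueManagerTestUtils
// compose: write a queue config to CONFIG, start the cluster with the
// redirecting classloader, then submit a sleep job into a child queue.
public class QueueManagerUsageSketch {
  void exampleFlow() throws Exception {
    Document doc = QueueManagerTestUtils.createDocument();
    QueueManagerTestUtils.createSimpleDocumentWithAcls(doc, "true");
    QueueManagerTestUtils.writeToFile(doc, QueueManagerTestUtils.CONFIG);

    Configuration conf = new Configuration();
    // Installs QueueManagerConfigurationClassLoader as the context classloader
    // and starts a MiniMRCluster, so the JobTracker's QueueManager reads CONFIG.
    QueueManagerTestUtils.setUpCluster(conf);

    // "u1,g1" and the queue name "p1:p11" are illustrative values only.
    Job job = QueueManagerTestUtils.submitSleepJob(1, 1, 10, 10, true,
        "u1,g1", "p1:p11", conf);
    System.out.println("job successful: " + job.isSuccessful());
  }
}
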
Modified: hadoop/mapreduce/branches/branch-0.21/src/test/mapred/org/apache/hadoop/mapred/TestQueueManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/test/mapred/org/apache/hadoop/mapred/TestQueueManager.java?rev=884743&r1=884742&r2=884743&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/src/test/mapred/org/apache/hadoop/mapred/TestQueueManager.java (original)
+++ hadoop/mapreduce/branches/branch-0.21/src/test/mapred/org/apache/hadoop/mapred/TestQueueManager.java Fri Nov 27 04:48:28 2009
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -18,685 +18,564 @@
package org.apache.hadoop.mapred;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import static org.apache.hadoop.mapred.QueueManagerTestUtils.*;
+import static org.apache.hadoop.mapred.QueueConfigurationParser.*;
+import static org.junit.Assert.*;
+
+import org.apache.hadoop.mapreduce.QueueState;
+import org.apache.hadoop.security.UnixUserGroupInformation;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.SecurityUtil.AccessControlList;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.junit.After;
+import org.junit.Test;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+import org.w3c.dom.Node;
+
import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.util.Collection;
+import java.io.StringWriter;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
-import java.util.TreeSet;
+import java.util.Map.Entry;
-import javax.security.auth.login.LoginException;
-import junit.framework.TestCase;
+public class TestQueueManager {
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
-import org.apache.hadoop.mapreduce.QueueState;
-import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.mapreduce.SleepJob;
-import org.apache.hadoop.security.UnixUserGroupInformation;
-import org.apache.hadoop.security.UserGroupInformation;
-import static org.apache.hadoop.mapred.DeprecatedQueueConfigurationParser.*;
+ private static final Log LOG = LogFactory.getLog(
+ TestQueueManager.class);
-public class TestQueueManager extends TestCase {
+ @After
+ public void tearDown() throws Exception {
+ new File(CONFIG).delete();
+ }
+
+ @Test
+ public void testDefault() throws Exception {
+ QueueManager qm = new QueueManager();
+ Queue root = qm.getRoot();
+ assertEquals(root.getChildren().size(), 1);
+ assertEquals(root.getChildren().iterator().next().getName(), "default");
+ assertFalse(qm.isAclsEnabled());
+ assertNull(root.getChildren().iterator().next().getChildren());
+ }
+
+ @Test
+ public void testXMLParsing() throws Exception {
+ checkForConfigFile();
+ Document doc = createDocument();
+ createSimpleDocument(doc);
+ writeToFile(doc, CONFIG);
+ QueueManager qm = new QueueManager(CONFIG);
+ Set<Queue> rootQueues = qm.getRoot().getChildren();
+ List<String> names = new ArrayList<String>();
+ for (Queue q : rootQueues) {
+ names.add(q.getName());
+ }
+
+ //Size of root.
+ assertEquals(rootQueues.size(), 2);
+
+ //check root level queues
+ assertTrue(names.contains("q1"));
+ assertTrue(names.contains("p1"));
+
+
+ //check for leaf names
+ Set<String> leafNames = qm.getLeafQueueNames();
+ Queue p = qm.getQueue("p1");
+ Set<Queue> children = p.getChildren();
+ assertTrue(children.size() == 2);
+
+ //check leaf level queues
+ assertTrue(
+ leafNames.contains(
+ "p1" + NAME_SEPARATOR + "p11"));
+ assertTrue(
+ leafNames.contains(
+ "p1" + NAME_SEPARATOR + "p12"));
+
+
+ Queue q = qm.getQueue(
+ "p1" + NAME_SEPARATOR + "p12");
+
+ assertTrue(
+ q.getAcls().get(
+ QueueManager.toFullPropertyName(
+ q.getName(), ACL_SUBMIT_JOB_TAG)).getUsers().contains(
+ "u1"));
+
+ assertTrue(
+ q.getAcls().get(
+ QueueManager.toFullPropertyName(
+ q.getName(),
+ ACL_ADMINISTER_JOB_TAG))
+ .getUsers().contains("u2"));
+ assertTrue(q.getState().equals(QueueState.STOPPED));
+ }
+
+ @Test
+ public void testhasAccess() throws Exception {
+ checkForConfigFile();
+ Document doc = createDocument();
+ createSimpleDocumentWithAcls(doc,"true");
+ writeToFile(doc, CONFIG);
+ QueueManager qm = new QueueManager(CONFIG);
+
+ UserGroupInformation ugi;
+ // test for acls access when acls are set with *
+ ugi = new UnixUserGroupInformation("u1", new String[]{" "});
+ assertTrue(qm.hasAccess("p1" + NAME_SEPARATOR + "p12",
+ Queue.QueueOperation.SUBMIT_JOB, ugi));
+ ugi = new UnixUserGroupInformation("u2", new String[]{" "});
+ assertTrue(qm.hasAccess("p1" + NAME_SEPARATOR + "p12",
+ Queue.QueueOperation.ADMINISTER_JOBS, ugi));
+
+ // test for acls access when acls are not set with *
+ ugi = new UnixUserGroupInformation("u1", new String[]{" "});
+ assertTrue(qm.hasAccess("p1" + NAME_SEPARATOR + "p11",
+ Queue.QueueOperation.SUBMIT_JOB, ugi));
+ ugi = new UnixUserGroupInformation("u2", new String[]{" "});
+ assertTrue(qm.hasAccess("p1" + NAME_SEPARATOR + "p11",
+ Queue.QueueOperation.ADMINISTER_JOBS, ugi));
+
+ // test for acls access when acls are not specified but acls is enabled
+ ugi = new UnixUserGroupInformation("u1", new String[]{" "});
+ assertTrue(qm.hasAccess("p1" + NAME_SEPARATOR + "p13",
+ Queue.QueueOperation.SUBMIT_JOB, ugi));
+ ugi = new UnixUserGroupInformation("u2", new String[]{" "});
+ assertTrue(qm.hasAccess("p1" + NAME_SEPARATOR + "p13",
+ Queue.QueueOperation.ADMINISTER_JOBS, ugi));
+
+ assertTrue(qm.isRunning("p1" + NAME_SEPARATOR + "p13"));
+ }
+
+ @Test
+ public void testQueueView() throws Exception {
+ checkForConfigFile();
+ Document doc = createDocument();
+ createSimpleDocument(doc);
+ writeToFile(doc, CONFIG);
+ QueueManager qm = new QueueManager(CONFIG);
+
+ for (Queue queue : qm.getRoot().getChildren()) {
+ checkHierarchy(queue, qm);
+ }
+ }
+
+ private void checkHierarchy(Queue queue, QueueManager queueManager) {
+ JobQueueInfo jobQueueInfo = queueManager.getJobQueueInfo(queue.getName());
+ assertEquals(queue.getName(),jobQueueInfo.getQueueName());
+ assertEquals(queue.getState(),jobQueueInfo.getState());
+ if (queue.getChildren() !=null && queue.getChildren().size() > 0) {
+ for (Queue childQueue : queue.getChildren()) {
+ checkHierarchy(childQueue, queueManager);
+ }
+ }
+ }
+
+ @Test
+ public void testhasAccessForParent() throws Exception {
+ checkForConfigFile();
+ Document doc = createDocument();
+ createSimpleDocument(doc);
+ writeToFile(doc, CONFIG);
+ QueueManager qm = new QueueManager(CONFIG);
+
+ UserGroupInformation ugi =
+ new UnixUserGroupInformation("u1", new String[]{" "});
+ assertFalse(
+ qm.hasAccess(
+ "p1",
+ Queue.QueueOperation.SUBMIT_JOB, ugi));
+ }
+
+ @Test
+ public void testValidation() throws Exception {
+ checkForConfigFile();
+ Document doc = createDocument();
+ Element queues = createQueuesNode(doc, "false");
+ Element q1 = createQueue(doc, "q1");
+
+ q1.appendChild(createAcls(doc, "acl-submit-job", "u1"));
+ q1.appendChild(createAcls(doc, "acl-administer-jobs", "u2"));
+ q1.appendChild(createQueue(doc, "p15"));
+ q1.appendChild(createQueue(doc, "p16"));
- static final Log LOG = LogFactory.getLog(TestQueueManager.class);
-
- private MiniDFSCluster miniDFSCluster;
- private MiniMRCluster miniMRCluster;
-
- public void testMultipleQueues() {
- JobConf conf = new JobConf();
- conf.set("mapred.queue.names", "q1,q2,Q3");
- QueueManager qMgr = new QueueManager(conf);
- Set<String> expQueues = new TreeSet<String>();
- expQueues.add("q1");
- expQueues.add("q2");
- expQueues.add("Q3");
- verifyQueues(expQueues, qMgr.getLeafQueueNames());
- }
-
- public void testSchedulerInfo() {
- JobConf conf = new JobConf();
- conf.set("mapred.queue.names", "qq1,qq2");
- QueueManager qMgr = new QueueManager(conf);
- qMgr.setSchedulerInfo("qq1", "queueInfoForqq1");
- qMgr.setSchedulerInfo("qq2", "queueInfoForqq2");
- assertEquals(qMgr.getSchedulerInfo("qq2"), "queueInfoForqq2");
- assertEquals(qMgr.getSchedulerInfo("qq1"), "queueInfoForqq1");
- }
-
- public void testAllEnabledACLForJobSubmission()
- throws IOException, InterruptedException, ClassNotFoundException {
- JobConf conf = setupConf("mapred.queue.default.acl-submit-job", "*");
- verifyJobSubmission(conf, true);
- }
-
- public void testAllDisabledACLForJobSubmission()
- throws IOException, InterruptedException, ClassNotFoundException {
- JobConf conf = setupConf("mapred.queue.default.acl-submit-job", "");
- verifyJobSubmission(conf, false);
- }
-
- public void testUserDisabledACLForJobSubmission()
- throws IOException, InterruptedException, ClassNotFoundException {
- JobConf conf = setupConf("mapred.queue.default.acl-submit-job",
- "3698-non-existent-user");
- verifyJobSubmission(conf, false);
- }
-
- public void testDisabledACLForNonDefaultQueue()
- throws IOException, InterruptedException, ClassNotFoundException {
- // allow everyone in default queue
- JobConf conf = setupConf("mapred.queue.default.acl-submit-job", "*");
- // setup a different queue
- conf.set("mapred.queue.names", "default,q1");
- // setup a different acl for this queue.
- conf.set("mapred.queue.q1.acl-submit-job", "dummy-user");
- // verify job submission to other queue fails.
- verifyJobSubmission(conf, false, "q1");
- }
-
- public void testSubmissionToInvalidQueue()
- throws IOException, InterruptedException, ClassNotFoundException {
- JobConf conf = new JobConf();
- conf.set("mapred.queue.names","default");
- setUpCluster(conf);
- String queueName = "q1";
+ queues.appendChild(q1);
+ writeToFile(doc, CONFIG);
try {
- Job rjob = submitSleepJob(1, 1, 100, 100, true, null, queueName);
- } catch (IOException ioe) {
- assertTrue(ioe.getMessage().contains("Queue \"" + queueName + "\" does " +
- "not exist"));
- return;
- } finally {
- tearDownCluster();
- }
- fail("Job submission to invalid queue job shouldnot complete " +
- ", it should fail with proper exception ");
- }
-
- public void testEnabledACLForNonDefaultQueue() throws IOException,
- LoginException, InterruptedException, ClassNotFoundException {
- // login as self...
- UserGroupInformation ugi = UnixUserGroupInformation.login();
- String userName = ugi.getUserName();
- // allow everyone in default queue
- JobConf conf = setupConf("mapred.queue.default.acl-submit-job", "*");
- // setup a different queue
- conf.set("mapred.queue.names", "default,q2");
- // setup a different acl for this queue.
- conf.set("mapred.queue.q2.acl-submit-job", userName);
- // verify job submission to other queue fails.
- verifyJobSubmission(conf, true, "q2");
- }
-
- public void testUserEnabledACLForJobSubmission()
- throws IOException, LoginException,
- InterruptedException, ClassNotFoundException {
- // login as self...
- UserGroupInformation ugi = UnixUserGroupInformation.login();
- String userName = ugi.getUserName();
- JobConf conf = setupConf("mapred.queue.default.acl-submit-job",
- "3698-junk-user," + userName
- + " 3698-junk-group1,3698-junk-group2");
- verifyJobSubmission(conf, true);
- }
-
- public void testGroupsEnabledACLForJobSubmission()
- throws IOException, LoginException,
- InterruptedException, ClassNotFoundException {
- // login as self, get one group, and add in allowed list.
- UserGroupInformation ugi = UnixUserGroupInformation.login();
- String[] groups = ugi.getGroupNames();
- assertTrue(groups.length > 0);
- JobConf conf = setupConf("mapred.queue.default.acl-submit-job",
- "3698-junk-user1,3698-junk-user2 "
- + groups[groups.length-1]
- + ",3698-junk-group");
- verifyJobSubmission(conf, true);
- }
-
- public void testAllEnabledACLForJobKill()
- throws IOException, InterruptedException, ClassNotFoundException {
- JobConf conf = setupConf("mapred.queue.default.acl-administer-jobs", "*");
- verifyJobKill(conf, true);
- }
-
- public void testAllDisabledACLForJobKill()
- throws IOException, InterruptedException, ClassNotFoundException {
- JobConf conf = setupConf("mapred.queue.default.acl-administer-jobs", "");
- verifyJobKillAsOtherUser(conf, false, "dummy-user,dummy-user-group");
- }
-
- public void testOwnerAllowedForJobKill()
- throws IOException, InterruptedException, ClassNotFoundException {
- JobConf conf = setupConf("mapred.queue.default.acl-administer-jobs",
- "junk-user");
- verifyJobKill(conf, true);
- }
-
- public void testUserDisabledACLForJobKill()
- throws IOException, InterruptedException, ClassNotFoundException {
- //setup a cluster allowing a user to submit
- JobConf conf = setupConf("mapred.queue.default.acl-administer-jobs",
- "dummy-user");
- verifyJobKillAsOtherUser(conf, false, "dummy-user,dummy-user-group");
- }
-
- public void testUserEnabledACLForJobKill() throws IOException,
- LoginException, InterruptedException, ClassNotFoundException {
- // login as self...
- UserGroupInformation ugi = UnixUserGroupInformation.login();
- String userName = ugi.getUserName();
- JobConf conf = setupConf("mapred.queue.default.acl-administer-jobs",
- "dummy-user,"+userName);
- verifyJobKillAsOtherUser(conf, true, "dummy-user,dummy-user-group");
- }
-
- public void testUserDisabledForJobPriorityChange()
- throws IOException, InterruptedException, ClassNotFoundException {
- JobConf conf = setupConf("mapred.queue.default.acl-administer-jobs",
- "junk-user");
- verifyJobPriorityChangeAsOtherUser(conf, false,
- "junk-user,junk-user-group");
+ new QueueManager(CONFIG);
+ fail("Should throw an exception as configuration is wrong ");
+ } catch (RuntimeException re) {
+ LOG.info(re.getMessage());
+ }
}
-
- /**
- * Test to verify refreshing of queue properties by using MRAdmin tool.
- *
- * @throws Exception
- */
- public void testACLRefresh() throws Exception {
- String queueConfigPath =
- System.getProperty("test.build.extraconf", "build/test/extraconf");
- File hadoopConfigFile = new File(queueConfigPath, "mapred-site.xml");
+ @Test
+ public void testInvalidName() throws Exception {
+ checkForConfigFile();
+ Document doc = createDocument();
+ Element queues = createQueuesNode(doc, "false");
+ Element q1 = createQueue(doc, "");
+ queues.appendChild(q1);
+ writeToFile(doc, CONFIG);
try {
- //Setting up default mapred-site.xml
- Properties hadoopConfProps = new Properties();
- //these properties should be retained.
- hadoopConfProps.put("mapred.queue.names", "default,q1,q2");
- hadoopConfProps.put("mapred.acls.enabled", "true");
- //These property should always be overridden
- hadoopConfProps.put("mapred.queue.default.acl-submit-job", "u1");
- hadoopConfProps.put("mapred.queue.q1.acl-submit-job", "u2");
- hadoopConfProps.put("mapred.queue.q2.acl-submit-job", "u1");
- //Actual property which would be used.
- hadoopConfProps.put("mapred.queue.default.acl-submit-job", " ");
- //Writing out the queue configuration file.
- UtilsForTests.setUpConfigFile(hadoopConfProps, hadoopConfigFile);
-
- //Create a new configuration to be used with QueueManager
- JobConf conf = new JobConf();
- QueueManager queueManager = new QueueManager(conf);
- UserGroupInformation ugi =
- new UnixUserGroupInformation("unknownUser",new String[]{" "});
- //Job Submission should fail because ugi to be used is set to blank.
- assertFalse("User Job Submission Succeeded before refresh.",
- queueManager.hasAccess("default", Queue.QueueOperation.
- SUBMIT_JOB, ugi));
- assertFalse("User Job Submission Succeeded before refresh.",
- queueManager.hasAccess("q1", Queue.QueueOperation.
- SUBMIT_JOB, ugi));
- assertFalse("User Job Submission Succeeded before refresh.",
- queueManager.hasAccess("q2", Queue.QueueOperation.
- SUBMIT_JOB, ugi));
-
- //Test job submission as alternate user.
- Configuration alternateUserConfig = new Configuration();
- alternateUserConfig.set("hadoop.job.ugi","u1,users");
- UserGroupInformation alternateUgi =
- UserGroupInformation.readFrom(alternateUserConfig);
- assertTrue("Alternate User Job Submission failed before refresh.",
- queueManager.hasAccess("q2", Queue.QueueOperation.
- SUBMIT_JOB, alternateUgi));
-
- //Set acl for the current user.
- hadoopConfProps.put(MAPRED_QUEUE_NAMES_KEY, "default,q1,q2");
- hadoopConfProps.put("mapred.acls.enabled", "true");
- hadoopConfProps.put("mapred.queue.default.acl-submit-job", ugi.getUserName());
- hadoopConfProps.put("mapred.queue.q1.acl-submit-job", ugi.getUserName());
- hadoopConfProps.put("mapred.queue.q2.acl-submit-job", ugi.getUserName());
- UtilsForTests.setUpConfigFile(hadoopConfProps, hadoopConfigFile);
- //refresh configuration
- queueManager.refreshQueues(conf, null);
- //Submission should succeed
- assertTrue("User Job Submission failed after refresh.",
- queueManager.hasAccess("default", Queue.QueueOperation.
- SUBMIT_JOB, ugi));
- assertTrue("User Job Submission failed after refresh.",
- queueManager.hasAccess("q1", Queue.QueueOperation.
- SUBMIT_JOB, ugi));
- assertTrue("User Job Submission failed after refresh.",
- queueManager.hasAccess("q2", Queue.QueueOperation.
- SUBMIT_JOB, ugi));
- assertFalse("Alternate User Job Submission succeeded after refresh.",
- queueManager.hasAccess("q2", Queue.QueueOperation.
- SUBMIT_JOB, alternateUgi));
- //rewrite the mapred-site.xml
- hadoopConfProps.put("mapred.queue.names", "default,q1,q2");
- hadoopConfProps.put("mapred.acls.enabled", "true");
- hadoopConfProps.put("mapred.queue.default.acl-submit-job", ugi.getUserName());
- UtilsForTests.setUpConfigFile(hadoopConfProps, hadoopConfigFile);
- queueManager.refreshQueues(conf, null);
- assertTrue("User Job Submission failed after refresh and no queue acls file.",
- queueManager.hasAccess("default", Queue.QueueOperation.
- SUBMIT_JOB, ugi));
- } finally{
- if(hadoopConfigFile.exists()) {
- hadoopConfigFile.delete();
- }
+ new QueueManager(CONFIG);
+ fail("Should throw an exception as configuration is wrong ");
+ } catch (Exception re) {
+ re.printStackTrace();
+ LOG.info(re.getMessage());
+ }
+ checkForConfigFile();
+ doc = createDocument();
+ queues = createQueuesNode(doc, "false");
+ q1 = doc.createElement("queue");
+ queues.appendChild(q1);
+ writeToFile(doc, CONFIG);
+ try {
+ new QueueManager(CONFIG);
+ fail("Should throw an exception as configuration is wrong ");
+ } catch (RuntimeException re) {
+ re.printStackTrace();
+ LOG.info(re.getMessage());
}
}
- /**
- * Test to verify refreshing of queue properties by using MRAdmin tool.
- *
- * @throws Exception
- */
- public void testStateRefresh() throws Exception {
- String queueConfigPath =
- System.getProperty("test.build.extraconf", "build/test/extraconf");
- File hadoopConfigFile = new File(queueConfigPath, "mapred-site.xml");
-
+ @Test
+ public void testEmptyProperties() throws Exception {
+ checkForConfigFile();
+ Document doc = createDocument();
+ Element queues = createQueuesNode(doc, "false");
+ Element q1 = createQueue(doc, "q1");
+ Element p = createProperties(doc, null);
+ q1.appendChild(p);
+ queues.appendChild(q1);
+ }
+
+ @Test
+ public void testEmptyFile() throws Exception {
+ checkForConfigFile();
+ Document doc = createDocument();
+ writeToFile(doc, CONFIG);
try {
- //Setting up default mapred-site.xml
- Properties queueConfProps = new Properties();
- //these properties should be retained.
- queueConfProps.put("mapred.queue.names", "default,qu1");
- queueConfProps.put("mapred.acls.enabled", "true");
- //These property should always be overridden
- queueConfProps.put("mapred.queue.default.state", "running");
- queueConfProps.put("mapred.queue.qu1.state", "stopped");
- UtilsForTests.setUpConfigFile(queueConfProps, hadoopConfigFile);
-
- //Create a new configuration to be used with QueueManager
- JobConf conf = new JobConf();
- setUpCluster(conf);
- QueueManager queueManager =
- this.miniMRCluster.getJobTrackerRunner().getJobTracker().getQueueManager();
-
- try{
- Job job = submitSleepJob(10, 2, 10, 10, true,null, "default" );
- assert(job.isSuccessful());
- }catch(Exception e){
- fail("submit job in default queue should be sucessful ");
- }
-
- try{
- submitSleepJob(10, 2, 10, 10, true,null, "qu1" );
- fail("submit job in default queue should be failed ");
- }catch(Exception e){
- assert(e.getMessage().contains("Queue \"" + "qu1" + "\" is not running"));
- }
-
- // verify state of queues before refresh
- JobQueueInfo queueInfo = queueManager.getJobQueueInfo("default");
- assertEquals(QueueState.RUNNING.getStateName(),
- queueInfo.getQueueState());
- queueInfo = queueManager.getJobQueueInfo("qu1");
- assertEquals(QueueState.STOPPED.getStateName(),
- queueInfo.getQueueState());
- queueConfProps.put("mapred.queue.names", "default,qu1");
- queueConfProps.put("mapred.acls.enabled", "true");
- queueConfProps.put("mapred.queue.default.state", "stopped");
- queueConfProps.put("mapred.queue.qu1.state", "running");
- UtilsForTests.setUpConfigFile(queueConfProps, hadoopConfigFile);
-
- //refresh configuration
- queueManager.refreshQueues(conf, null);
-
- //Job Submission should pass now because ugi to be used is set to blank.
- try{
- submitSleepJob(10, 2, 10, 10, true,null,"qu1");
- }catch(Exception e){
- fail("submit job in qu1 queue should be sucessful ");
- }
-
- try{
- submitSleepJob(10, 2, 10, 10, true,null, "default" );
- fail("submit job in default queue should be failed ");
- }catch(Exception e){
- assert(e.getMessage().contains("Queue \"" + "default" + "\" is not running"));
- }
+ new QueueManager(CONFIG);
+ fail("Should throw an exception as configuration is wrong ");
+ } catch (Exception re) {
+ re.printStackTrace();
+ LOG.info(re.getMessage());
+ }
+ }
- // verify state of queues after refresh
- queueInfo = queueManager.getJobQueueInfo("default");
- assertEquals(QueueState.STOPPED.getStateName(),
- queueInfo.getQueueState());
- queueInfo = queueManager.getJobQueueInfo("qu1");
- assertEquals(QueueState.RUNNING.getStateName(),
- queueInfo.getQueueState());
- } finally{
- if(hadoopConfigFile.exists()) {
- hadoopConfigFile.delete();
+ @Test
+ public void testJobQueueInfoGeneration() throws Exception {
+ checkForConfigFile();
+ Document doc = createDocument();
+ createSimpleDocument(doc);
+ writeToFile(doc, CONFIG);
+ QueueManager qm = new QueueManager(CONFIG);
+
+ List<JobQueueInfo> rootQueues =
+ qm.getRoot().getJobQueueInfo().getChildren();
+ assertEquals(rootQueues.size(), 2);
+ List<String> names = new ArrayList<String>();
+ for (JobQueueInfo q : rootQueues) {
+ names.add(q.getQueueName());
+ if (q.getQueueName().equals("q1")) {
+ Properties p = q.getProperties();
+ assertEquals(p.getProperty("capacity"), "10");
+ assertEquals(p.getProperty("maxCapacity"), "35");
+
+ assertTrue(q.getChildren().isEmpty());
+ } else if (q.getQueueName().equals("p1")) {
+ List<JobQueueInfo> children = q.getChildren();
+ assertEquals(children.size(), 2);
+ for (JobQueueInfo child : children) {
+ if (child.getQueueName().equals(
+ "p1" + NAME_SEPARATOR + "p12")) {
+ assertEquals(
+ child.getQueueState(), QueueState.STOPPED.getStateName());
+ } else if (child.getQueueName().equals(
+ "p1" + NAME_SEPARATOR + "p11")) {
+ assertEquals(
+ child.getQueueState(), QueueState.RUNNING.getStateName());
+ } else {
+ fail("Only 2 children");
+ }
+ }
+ } else {
+ fail("Only 2 queues with q1 and p1 ");
}
- this.tearDownCluster();
}
}
- public void testQueueAclRefreshWithInvalidConfFile() throws IOException {
- String queueConfigPath =
- System.getProperty("test.build.extraconf", "build/test/extraconf");
-
- File hadoopConfigFile = new File(queueConfigPath, "mapred-site.xml");
- try {
- // queue properties with which the cluster is started.
- Properties hadoopConfProps = new Properties();
- hadoopConfProps.put("mapred.queue.names", "default,q1,q2");
- hadoopConfProps.put("mapred.acls.enabled", "true");
- UtilsForTests.setUpConfigFile(hadoopConfProps, hadoopConfigFile);
-
- //properties for mapred-queue-acls.xml
- UserGroupInformation ugi =
- new UnixUserGroupInformation("unknownUser",new String[]{" "});
- hadoopConfProps.put("mapred.queue.default.acl-submit-job", ugi.getUserName());
- hadoopConfProps.put("mapred.queue.q1.acl-submit-job", ugi.getUserName());
- hadoopConfProps.put("mapred.queue.q2.acl-submit-job", ugi.getUserName());
- UtilsForTests.setUpConfigFile(hadoopConfProps, hadoopConfigFile);
-
- Configuration conf = new JobConf();
- QueueManager queueManager = new QueueManager(conf);
- //Testing access to queue.
- assertTrue("User Job Submission failed.",
- queueManager.hasAccess("default", Queue.QueueOperation.
- SUBMIT_JOB, ugi));
- assertTrue("User Job Submission failed.",
- queueManager.hasAccess("q1", Queue.QueueOperation.
- SUBMIT_JOB, ugi));
- assertTrue("User Job Submission failed.",
- queueManager.hasAccess("q2", Queue.QueueOperation.
- SUBMIT_JOB, ugi));
-
- //Write out a new incomplete invalid configuration file.
- PrintWriter writer = new PrintWriter(new FileOutputStream(hadoopConfigFile));
- writer.println("<configuration>");
- writer.println("<property>");
- writer.flush();
- writer.close();
- try {
- //Exception to be thrown by queue manager because configuration passed
- //is invalid.
- queueManager.refreshQueues(conf, null);
- fail("Refresh of ACLs should have failed with invalid conf file.");
- } catch (Exception e) {
+ /**
+ * Test the refresh of queues.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testRefresh() throws Exception {
+ checkForConfigFile();
+ Document doc = createDocument();
+ createSimpleDocument(doc);
+ writeToFile(doc, CONFIG);
+ QueueManager qm = new QueueManager(CONFIG);
+ Queue beforeRefreshRoot = qm.getRoot();
+ //remove the file and create new one.
+ Set<Queue> rootQueues = beforeRefreshRoot.getChildren();
+ for (Queue qs : rootQueues) {
+ if (qs.getName().equals("q1")) {
+
+ assertEquals(qs.getProperties().getProperty("capacity"), "10");
+ assertEquals(qs.getProperties().getProperty("maxCapacity"), "35");
+
+ } else if (qs.getName().equals("p1")) {
+
+ Set<Queue> children = qs.getChildren();
+ for (Queue child : children) {
+ if (child.getName().equals(
+ "p1" + NAME_SEPARATOR + "p12")) {
+ assertTrue(
+ child.getAcls().get(
+ QueueManager.toFullPropertyName(
+ child.getName(), ACL_SUBMIT_JOB_TAG))
+ .getUsers().contains("u1"));
+
+ assertTrue(
+ child.getAcls().get(
+ QueueManager.toFullPropertyName(
+ child.getName(),
+ ACL_ADMINISTER_JOB_TAG))
+ .getUsers().contains("u2"));
+ assertTrue(child.getState().equals(QueueState.STOPPED));
+ } else {
+ assertTrue(child.getState().equals(QueueState.RUNNING));
+ }
+ }
}
- assertTrue("User Job Submission failed after invalid conf file refresh.",
- queueManager.hasAccess("default", Queue.QueueOperation.
- SUBMIT_JOB, ugi));
- assertTrue("User Job Submission failed after invalid conf file refresh.",
- queueManager.hasAccess("q1", Queue.QueueOperation.
- SUBMIT_JOB, ugi));
- assertTrue("User Job Submission failed after invalid conf file refresh.",
- queueManager.hasAccess("q2", Queue.QueueOperation.
- SUBMIT_JOB, ugi));
- } finally {
- //Cleanup the configuration files in all cases
- if(hadoopConfigFile.exists()) {
- hadoopConfigFile.delete();
+ }
+ checkForConfigFile();
+ doc = createDocument();
+ refreshSimpleDocument(doc);
+ writeToFile(doc, CONFIG);
+ QueueConfigurationParser cp = new QueueConfigurationParser(CONFIG);
+ qm.getRoot().isHierarchySameAs(cp.getRoot());
+ qm.setQueues(
+ cp.getRoot().getChildren().toArray(
+ new Queue[cp.getRoot().getChildren().size()]));
+ Queue afterRefreshRoot = qm.getRoot();
+ //remove the file and create new one.
+ rootQueues = afterRefreshRoot.getChildren();
+ for (Queue qs : rootQueues) {
+ if (qs.getName().equals("q1")) {
+
+ assertEquals(qs.getProperties().getProperty("capacity"), "70");
+ assertEquals(qs.getProperties().getProperty("maxCapacity"), "35");
+
+ } else if (qs.getName().equals("p1")) {
+
+ Set<Queue> children = qs.getChildren();
+ for (Queue child : children) {
+ if (child.getName().equals(
+ "p1" + NAME_SEPARATOR + "p12")) {
+ assertTrue(
+ child.getAcls().get(
+ QueueManager.toFullPropertyName(
+ child.getName(),
+ ACL_SUBMIT_JOB_TAG))
+ .getUsers().contains("u3"));
+
+ assertTrue(
+ child.getAcls().get(
+ QueueManager.toFullPropertyName(
+ child.getName(),
+ ACL_ADMINISTER_JOB_TAG))
+ .getUsers().contains("u4"));
+ assertTrue(child.getState().equals(QueueState.RUNNING));
+ } else {
+ assertTrue(child.getState().equals(QueueState.STOPPED));
+ }
+ }
}
}
}
- private JobConf setupConf(String aclName, String aclValue) {
- JobConf conf = new JobConf();
- if(conf.get("mapred.queue.names") == null) {
- conf.set("mapred.queue.names","default");
- }
- conf.setBoolean("mapred.acls.enabled", true);
- conf.set(aclName, aclValue);
- return conf;
- }
-
- private void verifyQueues(Set<String> expectedQueues,
- Set<String> actualQueues) {
- assertEquals(expectedQueues.size(), actualQueues.size());
- for (String queue : expectedQueues) {
- assertTrue(actualQueues.contains(queue));
+ @Test
+ public void testRefreshWithInvalidFile() throws Exception {
+ checkForConfigFile();
+ Document doc = createDocument();
+ createSimpleDocument(doc);
+ writeToFile(doc, CONFIG);
+ QueueManager qm = new QueueManager(CONFIG);
+
+ checkForConfigFile();
+ doc = createDocument();
+ Element queues = createQueuesNode(doc, "false");
+ Element q1 = createQueue(doc, "");
+ queues.appendChild(q1);
+ writeToFile(doc, CONFIG);
+ try {
+ QueueConfigurationParser cp = new QueueConfigurationParser(CONFIG);
+
+ fail("Should throw an exception as configuration is wrong ");
+ } catch (Throwable re) {
+ re.printStackTrace();
+ LOG.info(re.getMessage());
}
}
-
- private void verifyJobSubmission(JobConf conf, boolean shouldSucceed)
- throws IOException, InterruptedException, ClassNotFoundException {
- verifyJobSubmission(conf, shouldSucceed, "default");
- }
- private void verifyJobSubmission(JobConf conf, boolean shouldSucceed,
- String queue)
- throws IOException, InterruptedException, ClassNotFoundException {
- if(conf.get("mapred.queue.names") == null) {
- conf.set("mapred.queue.names","default");
- }
- setUpCluster(conf);
- try {
- runAndVerifySubmission(conf, shouldSucceed, queue, null);
- } finally {
- tearDownCluster();
+ /**
+ * Class to store the array of queues retrieved by parsing the string
+ * that is dumped in Json format
+ */
+ static class JsonQueueTree {
+ boolean acls_enabled;
+
+ JsonQueue[] queues;
+
+ public JsonQueue[] getQueues() {
+ return queues;
}
- }
- private void runAndVerifySubmission(JobConf conf, boolean shouldSucceed,
- String queue, String userInfo)
- throws IOException, InterruptedException, ClassNotFoundException {
- try {
- Job rjob = submitSleepJob(1, 1, 100, 100, true, null, queue);
- if (shouldSucceed) {
- assertTrue(rjob.isSuccessful());
- } else {
- fail("Job submission should have failed.");
- }
- } catch (IOException ioe) {
- if (shouldSucceed) {
- throw ioe;
- } else {
- LOG.info("exception while submitting job: " + ioe.getMessage());
- assertTrue(ioe.getMessage().
- contains("cannot perform operation " +
- "SUBMIT_JOB on queue " + queue));
- // check if the system directory gets cleaned up or not
- JobTracker jobtracker = miniMRCluster.getJobTrackerRunner().getJobTracker();
- Path sysDir = new Path(jobtracker.getSystemDir());
- FileSystem fs = sysDir.getFileSystem(conf);
- int size = fs.listStatus(sysDir).length;
- while (size > 1) { // ignore the jobtracker.info file
- System.out.println("Waiting for the job files in sys directory to be cleaned up");
- UtilsForTests.waitFor(100);
- size = fs.listStatus(sysDir).length;
- }
- }
- } finally {
- tearDownCluster();
+ public void setQueues(JsonQueue[] queues) {
+ this.queues = queues;
}
-}
- private void verifyJobKill(JobConf conf, boolean shouldSucceed)
- throws IOException, InterruptedException, ClassNotFoundException {
- setUpCluster(conf);
- try {
- Job rjob = submitSleepJob(1, 1, 1000, 1000, false);
- assertFalse(rjob.isComplete());
- while(rjob.mapProgress() == 0.0f) {
- try {
- Thread.sleep(10);
- } catch (InterruptedException ie) {
- break;
- }
- }
- rjob.killJob();
- while (!rjob.isComplete()) {
- try {
- Thread.sleep(10);
- } catch (InterruptedException ie) {
- break;
- }
- }
- if (shouldSucceed) {
- assertTrue(!rjob.isSuccessful());
- } else {
- fail("Job kill should have failed.");
- }
- } catch (IOException ioe) {
- if (shouldSucceed) {
- throw ioe;
- } else {
- LOG.info("exception while submitting job: " + ioe.getMessage());
- assertTrue(ioe.getMessage().
- contains("cannot perform operation " +
- "ADMINISTER_JOBS on queue default"));
- }
- } finally {
- tearDownCluster();
+ public boolean isAcls_enabled() {
+ return acls_enabled;
}
- }
-
- private void verifyJobKillAsOtherUser(JobConf conf, boolean shouldSucceed,
- String otherUserInfo)
- throws IOException, InterruptedException, ClassNotFoundException {
- setUpCluster(conf);
- try {
- // submit a job as another user.
- String userInfo = otherUserInfo;
- Job job = submitSleepJob(1, 1, 1000, 1000, false, userInfo);
- assertFalse(job.isComplete());
-
- //try to kill as self
- try {
- JobClient jc = new JobClient(miniMRCluster.createJobConf());
- RunningJob rjob = jc.getJob((JobID)job.getID());
- rjob.killJob();
- if (!shouldSucceed) {
- fail("should fail kill operation");
- }
- } catch (IOException ioe) {
- if (shouldSucceed) {
- throw ioe;
- }
- //verify it fails
- LOG.info("exception while submitting job: " + ioe.getMessage());
- assertTrue(ioe.getMessage().
- contains("cannot perform operation " +
- "ADMINISTER_JOBS on queue default"));
- }
- //wait for job to complete on its own
- while (!job.isComplete()) {
- try {
- Thread.sleep(1000);
- } catch (InterruptedException ie) {
- break;
- }
- }
- } finally {
- tearDownCluster();
+ public void setAcls_enabled(boolean aclsEnabled) {
+ acls_enabled = aclsEnabled;
}
}
- private void verifyJobPriorityChangeAsOtherUser(JobConf conf,
- boolean shouldSucceed, String otherUserInfo)
- throws IOException, InterruptedException, ClassNotFoundException {
- setUpCluster(conf);
- try {
- // submit job as another user.
- String userInfo = otherUserInfo;
- Job job = submitSleepJob(1, 1, 1000, 1000, false, userInfo);
- assertFalse(job.isComplete());
-
- // try to change priority as self
- try {
- JobClient jc = new JobClient(miniMRCluster.createJobConf());
- RunningJob rjob = jc.getJob((JobID)job.getID());
- rjob.setJobPriority("VERY_LOW");
- if (!shouldSucceed) {
- fail("changing priority should fail.");
- }
- } catch (IOException ioe) {
- //verify it fails
- LOG.info("exception while submitting job: " + ioe.getMessage());
- assertTrue(ioe.getMessage().
- contains("cannot perform operation " +
- "ADMINISTER_JOBS on queue default"));
- }
- //wait for job to complete on its own
- while (!job.isComplete()) {
- try {
- Thread.sleep(1000);
- } catch (InterruptedException ie) {
- break;
- }
- }
- } finally {
- tearDownCluster();
+ /**
+ * Class to store the contents of each queue that is dumped in JSON format.
+ */
+ static class JsonQueue {
+ String name;
+ String state;
+ String acl_submit_job;
+ String acl_administer_jobs;
+ JsonProperty[] properties;
+ JsonQueue[] children;
+ public String getName() {
+ return name;
+ }
+ public String getState() {
+ return state;
+ }
+ public JsonProperty[] getProperties() {
+ return properties;
+ }
+ public JsonQueue[] getChildren() {
+ return children;
+ }
+ public void setName(String name) {
+ this.name = name;
+ }
+ public void setState(String state) {
+ this.state = state;
+ }
+ public void setProperties(JsonProperty[] properties) {
+ this.properties = properties;
+ }
+ public void setChildren(JsonQueue[] children) {
+ this.children = children;
+ }
+ public String getAcl_submit_job() {
+ return acl_submit_job;
+ }
+ public void setAcl_submit_job(String aclSubmitJob) {
+ acl_submit_job = aclSubmitJob;
+ }
+ public String getAcl_administer_jobs() {
+ return acl_administer_jobs;
+ }
+ public void setAcl_administer_jobs(String aclAdministerJobs) {
+ acl_administer_jobs = aclAdministerJobs;
}
}
- private void setUpCluster(JobConf conf) throws IOException {
- if(conf.get("mapred.queue.names") == null)
- conf.set("mapred.queue.names","default");
- miniDFSCluster = new MiniDFSCluster(conf, 1, true, null);
- FileSystem fileSys = miniDFSCluster.getFileSystem();
- String namenode = fileSys.getUri().toString();
- miniMRCluster = new MiniMRCluster(1, namenode, 3,
- null, null, conf);
- }
-
- private void tearDownCluster() throws IOException {
- if (miniMRCluster != null) { miniMRCluster.shutdown(); }
- if (miniDFSCluster != null) { miniDFSCluster.shutdown(); }
- }
-
- private Job submitSleepJob(int numMappers, int numReducers,
- long mapSleepTime, long reduceSleepTime,
- boolean shouldComplete)
- throws IOException, InterruptedException, ClassNotFoundException {
- return submitSleepJob(numMappers, numReducers, mapSleepTime,
- reduceSleepTime, shouldComplete, null);
- }
-
- private Job submitSleepJob(int numMappers, int numReducers,
- long mapSleepTime, long reduceSleepTime,
- boolean shouldComplete, String userInfo)
- throws IOException, InterruptedException, ClassNotFoundException {
- return submitSleepJob(numMappers, numReducers, mapSleepTime,
- reduceSleepTime, shouldComplete, userInfo, null);
- }
-
- private Job submitSleepJob(int numMappers, int numReducers,
- long mapSleepTime, long reduceSleepTime,
- boolean shouldComplete, String userInfo,
- String queueName)
- throws IOException, InterruptedException, ClassNotFoundException {
- Configuration clientConf = new Configuration();
- clientConf.set(JTConfig.JT_IPC_ADDRESS, "localhost:"
- + miniMRCluster.getJobTrackerPort());
- if (userInfo != null) {
- clientConf.set(UnixUserGroupInformation.UGI_PROPERTY_NAME, userInfo);
- }
- if (queueName != null) {
- clientConf.set(JobContext.QUEUE_NAME, queueName);
- }
- SleepJob sleep = new SleepJob();
- sleep.setConf(clientConf);
- Job job = sleep.createJob(numMappers, numReducers,
- mapSleepTime, (int)mapSleepTime/100,
- reduceSleepTime, (int)reduceSleepTime/100);
- if (shouldComplete) {
- job.waitForCompletion(false);
- } else {
- job.submit();
+ /**
+ * Class to store the contents of attribute "properties" in Json dump
+ */
+ static class JsonProperty {
+ String key;
+ String value;
+ public String getKey() {
+ return key;
+ }
+ public void setKey(String key) {
+ this.key = key;
+ }
+ public String getValue() {
+ return value;
+ }
+ public void setValue(String value) {
+ this.value = value;
}
- return job;
}
+ /**
+ * checks the format of the dump in JSON format when
+ * QueueManager.dumpConfiguration(Writer) is called.
+ * @throws Exception
+ */
+ @Test
+ public void testDumpConfiguration() throws Exception {
+ checkForConfigFile();
+ Document doc = createDocument();
+ createSimpleDocument(doc);
+ writeToFile(doc, CONFIG);
+ StringWriter out = new StringWriter();
+ QueueManager.dumpConfiguration(out,CONFIG,null);
+ ObjectMapper mapper = new ObjectMapper();
+ // parse the Json dump
+ JsonQueueTree queueTree =
+ mapper.readValue(out.toString(), JsonQueueTree.class);
+
+ // check if acls_enabled is correct
+ assertEquals(true, queueTree.isAcls_enabled());
+ // check for the number of top-level queues
+ assertEquals(2, queueTree.getQueues().length);
+
+ HashMap<String, JsonQueue> topQueues = new HashMap<String, JsonQueue>();
+ for (JsonQueue topQueue : queueTree.getQueues()) {
+ topQueues.put(topQueue.getName(), topQueue);
+ }
+
+ // check for consistency in number of children
+ assertEquals(2, topQueues.get("p1").getChildren().length);
+
+ HashMap<String, JsonQueue> childQueues = new HashMap<String, JsonQueue>();
+ for (JsonQueue child : topQueues.get("p1").getChildren()) {
+ childQueues.put(child.getName(), child);
+ }
+
+ // check for consistency in state
+ assertEquals("stopped", childQueues.get("p1:p12").getState());
+
+ // check for consistency in properties
+ HashMap<String, JsonProperty> q1_properties =
+ new HashMap<String, JsonProperty>();
+ for (JsonProperty prop : topQueues.get("q1").getProperties()) {
+ q1_properties.put(prop.getKey(), prop);
+ }
+ assertEquals("10", q1_properties.get("capacity").getValue());
+ assertEquals("35", q1_properties.get("maxCapacity").getValue());
+
+ // check for acls
+ assertEquals("u1", childQueues.get("p1:p12").getAcl_submit_job());
+ assertEquals("u2", childQueues.get("p1:p12").getAcl_administer_jobs());
+ }
}
Added: hadoop/mapreduce/branches/branch-0.21/src/test/mapred/org/apache/hadoop/mapred/TestQueueManagerWithDeprecatedConf.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/test/mapred/org/apache/hadoop/mapred/TestQueueManagerWithDeprecatedConf.java?rev=884743&view=auto
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/src/test/mapred/org/apache/hadoop/mapred/TestQueueManagerWithDeprecatedConf.java (added)
+++ hadoop/mapreduce/branches/branch-0.21/src/test/mapred/org/apache/hadoop/mapred/TestQueueManagerWithDeprecatedConf.java Fri Nov 27 04:48:28 2009
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.TreeSet;
+
+import javax.security.auth.login.LoginException;
+
+import junit.framework.TestCase;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
+import org.apache.hadoop.mapreduce.QueueState;
+import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.mapreduce.SleepJob;
+import org.apache.hadoop.security.UnixUserGroupInformation;
+import org.apache.hadoop.security.UserGroupInformation;
+import static org.apache.hadoop.mapred.DeprecatedQueueConfigurationParser.*;
+import static org.apache.hadoop.mapred.QueueManagerTestUtils.*;
+
+public class TestQueueManagerWithDeprecatedConf extends TestCase {
+
+ static final Log LOG = LogFactory.getLog(TestQueueManagerWithDeprecatedConf.class);
+
+
+
+ public void testMultipleQueues() {
+ JobConf conf = new JobConf();
+ conf.set("mapred.queue.names", "q1,q2,Q3");
+ QueueManager qMgr = new QueueManager(conf);
+ Set<String> expQueues = new TreeSet<String>();
+ expQueues.add("q1");
+ expQueues.add("q2");
+ expQueues.add("Q3");
+ verifyQueues(expQueues, qMgr.getLeafQueueNames());
+ }
+
+ public void testSchedulerInfo() {
+ JobConf conf = new JobConf();
+ conf.set("mapred.queue.names", "qq1,qq2");
+ QueueManager qMgr = new QueueManager(conf);
+ qMgr.setSchedulerInfo("qq1", "queueInfoForqq1");
+ qMgr.setSchedulerInfo("qq2", "queueInfoForqq2");
+ assertEquals(qMgr.getSchedulerInfo("qq2"), "queueInfoForqq2");
+ assertEquals(qMgr.getSchedulerInfo("qq1"), "queueInfoForqq1");
+ }
+
+
+ public void testQueueManagerWithDeprecatedConf() throws IOException {
+ String queueConfigPath =
+ System.getProperty("test.build.extraconf", "build/test/extraconf");
+
+ File hadoopConfigFile = new File(queueConfigPath, "mapred-site.xml");
+ try {
+ // queue properties with which the cluster is started.
+ Properties hadoopConfProps = new Properties();
+ hadoopConfProps.put("mapred.queue.names", "default,q1,q2");
+ hadoopConfProps.put("mapred.acls.enabled", "true");
+ UtilsForTests.setUpConfigFile(hadoopConfProps, hadoopConfigFile);
+
+ //queue acl properties, written into the same mapred-site.xml using the deprecated keys
+ UserGroupInformation ugi =
+ new UnixUserGroupInformation("unknownUser",new String[]{" "});
+ hadoopConfProps.put("mapred.queue.default.acl-submit-job", ugi.getUserName());
+ hadoopConfProps.put("mapred.queue.q1.acl-submit-job", "u1");
+ hadoopConfProps.put("mapred.queue.q2.acl-submit-job", "*");
+ hadoopConfProps.put("mapred.queue.default.acl-administer-jobs", ugi.getUserName());
+ hadoopConfProps.put("mapred.queue.q1.acl-administer-jobs", "u1");
+ hadoopConfProps.put("mapred.queue.q2.acl-administer-jobs", "*");
+
+ UtilsForTests.setUpConfigFile(hadoopConfProps, hadoopConfigFile);
+
+ Configuration conf = new JobConf();
+ QueueManager queueManager = new QueueManager(conf);
+ //Testing access to queue.
+ assertTrue("User Job Submission failed.",
+ queueManager.hasAccess("default", Queue.QueueOperation.
+ SUBMIT_JOB, ugi));
+ assertFalse("User Job Submission failed.",
+ queueManager.hasAccess("q1", Queue.QueueOperation.
+ SUBMIT_JOB, ugi));
+ assertTrue("User Job Submission failed.",
+ queueManager.hasAccess("q2", Queue.QueueOperation.
+ SUBMIT_JOB, ugi));
+ //Testing the admin acls
+ assertTrue("User Job Submission failed.",
+ queueManager.hasAccess("default", Queue.QueueOperation.ADMINISTER_JOBS, ugi));
+ assertFalse("User Job Submission failed.",
+ queueManager.hasAccess("q1", Queue.QueueOperation.
+ ADMINISTER_JOBS, ugi));
+ assertTrue("User Job Submission failed.",
+ queueManager.hasAccess("q2", Queue.QueueOperation.
+ ADMINISTER_JOBS, ugi));
+
+
+ } finally {
+ //Cleanup the configuration files in all cases
+ if(hadoopConfigFile.exists()) {
+ hadoopConfigFile.delete();
+ }
+ }
+ }
+
+ private void verifyQueues(Set<String> expectedQueues,
+ Set<String> actualQueues) {
+ assertEquals(expectedQueues.size(), actualQueues.size());
+ for (String queue : expectedQueues) {
+ assertTrue(actualQueues.contains(queue));
+ }
+ }
+
+}
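For reference, the deprecated flat queue configuration exercised by this test can be summarised in a few lines of client code. The fragment below is a hand-written sketch, not part of this patch: it condenses the properties the test writes into mapred-site.xml, and the user and group passed to UnixUserGroupInformation are illustrative only (the classes used are the same org.apache.hadoop.mapred and org.apache.hadoop.security classes imported by the test above):

// Deprecated flat format: queue names come from "mapred.queue.names",
// per-queue ACLs from "mapred.queue.<name>.acl-submit-job" and
// "mapred.queue.<name>.acl-administer-jobs". "*" opens an ACL to everyone,
// a single user name restricts it to that user.
JobConf conf = new JobConf();
conf.set("mapred.queue.names", "default,q1,q2");
conf.set("mapred.acls.enabled", "true");
conf.set("mapred.queue.q1.acl-submit-job", "u1");
conf.set("mapred.queue.q2.acl-submit-job", "*");
QueueManager qMgr = new QueueManager(conf);

// Illustrative user; only "u1" passes the q1 check, anyone passes the q2 check.
UserGroupInformation ugi =
    new UnixUserGroupInformation("u1", new String[]{"g1"});
boolean canSubmitToQ1 =
    qMgr.hasAccess("q1", Queue.QueueOperation.SUBMIT_JOB, ugi);
boolean canSubmitToQ2 =
    qMgr.hasAccess("q2", Queue.QueueOperation.SUBMIT_JOB, ugi);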
Added: hadoop/mapreduce/branches/branch-0.21/src/test/mapred/org/apache/hadoop/mapred/TestQueueManagerWithJobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/test/mapred/org/apache/hadoop/mapred/TestQueueManagerWithJobTracker.java?rev=884743&view=auto
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/src/test/mapred/org/apache/hadoop/mapred/TestQueueManagerWithJobTracker.java (added)
+++ hadoop/mapreduce/branches/branch-0.21/src/test/mapred/org/apache/hadoop/mapred/TestQueueManagerWithJobTracker.java Fri Nov 27 04:48:28 2009
@@ -0,0 +1,309 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import static org.apache.hadoop.mapred.QueueConfigurationParser.NAME_SEPARATOR;
+import static org.apache.hadoop.mapred.QueueManagerTestUtils.CONFIG;
+import static org.apache.hadoop.mapred.QueueManagerTestUtils.checkForConfigFile;
+import static org.apache.hadoop.mapred.QueueManagerTestUtils.createAcls;
+import static org.apache.hadoop.mapred.QueueManagerTestUtils.createDocument;
+import static org.apache.hadoop.mapred.QueueManagerTestUtils.createProperties;
+import static org.apache.hadoop.mapred.QueueManagerTestUtils.createQueue;
+import static org.apache.hadoop.mapred.QueueManagerTestUtils.createQueuesNode;
+import static org.apache.hadoop.mapred.QueueManagerTestUtils.createSimpleDocumentWithAcls;
+import static org.apache.hadoop.mapred.QueueManagerTestUtils.createState;
+import static org.apache.hadoop.mapred.QueueManagerTestUtils.miniMRCluster;
+import static org.apache.hadoop.mapred.QueueManagerTestUtils.setUpCluster;
+import static org.apache.hadoop.mapred.QueueManagerTestUtils.submitSleepJob;
+import static org.apache.hadoop.mapred.QueueManagerTestUtils.writeToFile;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.tools.MRAdmin;
+import org.apache.hadoop.mapreduce.Cluster;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.QueueState;
+import org.apache.hadoop.mapreduce.JobStatus.State;
+import org.apache.hadoop.security.UnixUserGroupInformation;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+
+public class TestQueueManagerWithJobTracker {
+
+ private static Configuration conf;
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ checkForConfigFile();
+ Document doc = createDocument();
+ createSimpleDocumentWithAcls(doc, "true");
+ writeToFile(doc, CONFIG);
+ conf = new Configuration();
+ conf.addResource(CONFIG);
+ conf.set("mapred.committer.job.setup.cleanup.needed", "false");
+ setUpCluster(conf);
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ new File(CONFIG).delete();
+ }
+
+ /**
+ * Test to check that jobs cannot be submitted to a queue in STOPPED state
+ * @throws Exception
+ */
+ @Test(expected = IOException.class)
+ public void testSubmitJobForStoppedQueue() throws Exception {
+ submitSleepJob(10, 10, 100, 100, false, null,
+ "p1" + NAME_SEPARATOR + "p14", conf);
+ fail("queue p1:p14 is in stopped state and should not accept jobs");
+ }
+
+ /**
+ * Test to check that jobs cannot be submitted to a container queue
+ * @throws Exception
+ */
+ @Test(expected = IOException.class)
+ public void testSubmitJobForContainerQueue() throws Exception {
+ submitSleepJob(10, 10, 100, 100, false, null, "p1", conf);
+ fail("queue p1 is a container queue and cannot have jobs");
+ }
+
+ /**
+ * Tests job submission with the configured queue acls
+ * @throws Exception
+ */
+ @Test
+ public void testAclsForSubmitJob() throws Exception {
+ Job job;
+ UserGroupInformation.setCurrentUGI(UnixUserGroupInformation.login());
+ // submit job to queue p1:p13 with unspecified acls
+ job = submitSleepJob(0, 0, 0, 0, true, "u1,g1", "p1" + NAME_SEPARATOR
+ + "p13", conf);
+ assertTrue("Job submission for u1 failed in queue : p1:p13.",
+ job.isSuccessful());
+ // check for access to submit the job
+ try {
+ job = submitSleepJob(0, 0, 0, 0, false, "u2,g1", "p1" + NAME_SEPARATOR
+ + "p11", conf);
+ fail("user u2 cannot submit jobs to queue p1:p11");
+ } catch (Exception e) { // expected: u2 is not in the submit-job acl of p1:p11
+ }
+ // submit job to queue p1:p11 with acls-submit-job as u1
+ job = submitSleepJob(0, 0, 0, 0, true, "u1,g1", "p1"
+ + NAME_SEPARATOR + "p11", conf);
+ assertTrue("Job submission for u1 failed in queue : p1:p11.",
+ job.isSuccessful());
+ }
+
+ /**
+ * Tests access control for killing a job
+ * @throws Exception
+ */
+ @Test
+ public void testAccessToKillJob() throws Exception {
+ Job job = submitSleepJob(1, 1, 100, 100, false, "u1,g1", "p1"
+ + NAME_SEPARATOR + "p11", conf);
+ UserGroupInformation.setCurrentUGI(UnixUserGroupInformation.login());
+ JobConf jobConf = miniMRCluster.createJobConf();
+ Cluster cluster = null;
+ JobID jobID = job.getStatus().getJobID();
+ //Ensure that the JobInProgress is initialized before we issue a kill
+ //signal to the job.
+ JobTracker tracker = miniMRCluster.getJobTrackerRunner().getJobTracker();
+ JobInProgress jip = tracker.getJob(org.apache.hadoop.mapred.JobID
+ .downgrade(jobID));
+ tracker.initJob(jip);
+ try {
+ tracker.killJob(jobID);
+ fail("current user is neither u1 nor in the administer group list");
+ } catch (Exception e) {
+ Configuration userConf = new Configuration(miniMRCluster.createJobConf());
+ userConf.set("hadoop.job.ugi", "u1,g1");
+ cluster = new Cluster(userConf);
+ cluster.getJob(jobID).killJob();
+ // kill the running job
+ assertEquals("job submitted for u1 and queue p1:p11 is not killed.",
+ cluster.getJob(jobID).getStatus().getState(), (State.KILLED));
+ }
+
+ job = submitSleepJob(1, 1, 100, 100, false, "u1,g1", "p1" + NAME_SEPARATOR
+ + "p12", conf);
+ jobID = job.getStatus().getJobID();
+ //Ensure that the JobInProgress is initialized before we issue a kill
+ //signal to the job.
+ jip = tracker.getJob(org.apache.hadoop.mapred.JobID.downgrade(jobID));
+ tracker.initJob(jip);
+ tracker.killJob(job.getID());
+ // kill the job by the user who submitted the job
+ assertEquals("job submitted for u1 and queue p1:p11 is not killed.",
+ cluster.getJob(jobID).getStatus().getState(), (State.KILLED));
+
+ Configuration userConf = new Configuration(miniMRCluster.createJobConf());
+ userConf.set("hadoop.job.ugi", "u1,g1");
+ cluster = new Cluster(userConf);
+ job = submitSleepJob(1, 1, 10, 10, false, "u1,g1", "p1" + NAME_SEPARATOR
+ + "p11", conf);
+ jobID = job.getStatus().getJobID();
+ //Ensure that the JobInProgress is initialized before we issue a kill
+ //signal to the job.
+ jip = tracker.getJob(org.apache.hadoop.mapred.JobID.downgrade(jobID));
+ tracker.initJob(jip);
+ jobConf.set("hadoop.job.ugi", "u3,g3");
+ cluster = new Cluster(jobConf);
+ // try killing job with user not in administer list
+ try {
+ cluster.getJob(jobID).killJob();
+ fail("u3 not in administer list");
+ } catch (Exception e) {
+ jobConf.set("hadoop.job.ugi", "u1,g1");
+ cluster = new Cluster(jobConf);
+ assertFalse(cluster.getJob(jobID).isComplete());
+ cluster.getJob(jobID).killJob();
+ // kill the running job
+ assertEquals("job submitted for u1 and queue p1:p11 is not killed.",
+ cluster.getJob(jobID).getStatus().getState(), (State.KILLED));
+ }
+ }
+
+ /**
+ * Tests job submission after refresh
+ * @throws Exception
+ */
+ @Test
+ public void testSubmitJobsAfterRefresh() throws Exception {
+ // test for refresh
+ checkForConfigFile();
+ Document doc = createDocument();
+ refreshDocument(doc);
+ writeToFile(doc, CONFIG);
+ MRAdmin admin = new MRAdmin(miniMRCluster.createJobConf());
+ admin.run(new String[] { "-refreshQueues" });
+ try {
+ submitSleepJob(10, 10, 100, 100, false, "u1,g1", "p1"
+ + NAME_SEPARATOR + "p11", conf);
+ fail("user u1 is not in the submit jobs' list");
+ } catch (Exception e) {
+ }
+ checkForConfigFile();
+ doc = createDocument();
+ createSimpleDocumentWithAcls(doc, "true");
+ writeToFile(doc, CONFIG);
+ admin.run(new String[] { "-refreshQueues" });
+ }
+
+ private void refreshDocument(Document doc) {
+ Element queues = createQueuesNode(doc, "true");
+
+ // Create parent level queue q1.
+ Element q1 = createQueue(doc, "q1");
+ Properties props = new Properties();
+ props.setProperty("capacity", "10");
+ props.setProperty("maxCapacity", "35");
+ q1.appendChild(createProperties(doc, props));
+ queues.appendChild(q1);
+
+ // Create another parent level p1
+ Element p1 = createQueue(doc, "p1");
+
+ // append child p11 to p1
+ Element p11 = createQueue(doc, "p11");
+ p11.appendChild(createAcls(doc,
+ QueueConfigurationParser.ACL_SUBMIT_JOB_TAG, " "));
+ p11.appendChild(createAcls(doc,
+ QueueConfigurationParser.ACL_ADMINISTER_JOB_TAG, "u2"));
+ p1.appendChild(p11);
+
+ Element p12 = createQueue(doc, "p12");
+
+ p12.appendChild(createState(doc, QueueState.RUNNING.getStateName()));
+ p12.appendChild(createAcls(doc,
+ QueueConfigurationParser.ACL_SUBMIT_JOB_TAG, "*"));
+ p12.appendChild(createAcls(doc,
+ QueueConfigurationParser.ACL_ADMINISTER_JOB_TAG, "*"));
+
+ // append p12 to p1.
+ p1.appendChild(p12);
+ // append child p13 to p1
+ Element p13 = createQueue(doc, "p13");
+ p13.appendChild(createState(doc, QueueState.RUNNING.getStateName()));
+ p1.appendChild(p13);
+ // append child p14 to p1
+ Element p14 = createQueue(doc, "p14");
+ p14.appendChild(createState(doc, QueueState.STOPPED.getStateName()));
+ p1.appendChild(p14);
+ queues.appendChild(p1);
+ }
+
+ /**
+ * Tests job submission when acls are disabled
+ * @throws Exception
+ */
+ @Test
+ public void testAclsDisabled() throws Exception {
+ checkForConfigFile();
+ Document doc = createDocument();
+ createSimpleDocumentWithAcls(doc, "false");
+ writeToFile(doc, CONFIG);
+ MRAdmin admin = new MRAdmin(miniMRCluster.createJobConf());
+ admin.run(new String[] { "-refreshQueues" });
+
+ UserGroupInformation.setCurrentUGI(UnixUserGroupInformation.login());
+ // submit job to queue p1:p11 by any user not in acls-submit-job
+ Job job = submitSleepJob(0, 0, 0, 0, true, "u2,g1", "p1" + NAME_SEPARATOR
+ + "p11", conf);
+ assertTrue("Job submitted for u2 in queue p1:p11 is not successful.",
+ job.isSuccessful());
+
+ // submit job to queue p1:p11 by user in acls-submit-job
+ job = submitSleepJob(0, 0, 0, 0, true, "u1,g1", "p1" + NAME_SEPARATOR
+ + "p11", conf);
+ assertTrue("Job submitted for u2 in queue p1:p11 is not successful.",
+ job.isSuccessful());
+
+ job = submitSleepJob(1, 1, 0, 0, false, "u1,g1", "p1" + NAME_SEPARATOR
+ + "p11", conf);
+ // kill the job by any user
+ JobConf jobConf = miniMRCluster.createJobConf();
+ jobConf.set("hadoop.job.ugi", "u3,g3");
+ Cluster cluster = new Cluster(jobConf);
+ JobID jobID = job.getStatus().getJobID();
+ //Ensure that the JobInProgress is initialized before we issue a kill
+ //signal to the job.
+ JobInProgress jip = miniMRCluster.getJobTrackerRunner().getJobTracker()
+ .getJob(org.apache.hadoop.mapred.JobID.downgrade(jobID));
+ miniMRCluster.getJobTrackerRunner().getJobTracker().initJob(jip);
+ cluster.getJob(jobID).killJob();
+ assertEquals("job submitted for u1 and queue p1:p11 is not killed.",
+ cluster.getJob(jobID).getStatus().getState(), (State.KILLED));
+ }
+}
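To round out the picture, here is a client-side sketch, not part of this patch, of how a job targets a leaf queue in the hierarchical layout; it mirrors the submitSleepJob helper that moved from TestQueueManager into QueueManagerTestUtils. A job must name a leaf queue by its fully-qualified "parent:child" name; as the tests above show, submitting to a container queue such as "p1", or to a queue in STOPPED state, is rejected with an IOException:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.SleepJob;

// Inside client code that can see the cluster configuration:
Configuration clientConf = new Configuration();
// Fully-qualified leaf queue name ("p1" + NAME_SEPARATOR + "p11").
clientConf.set(JobContext.QUEUE_NAME, "p1:p11");
SleepJob sleep = new SleepJob();
sleep.setConf(clientConf);
// 1 map, 1 reduce, 100ms sleep per task with a single sleep count each.
Job job = sleep.createJob(1, 1, 100, 1, 100, 1);
job.submit();   // throws IOException if the queue is stopped or is a container queue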