Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 05:19:16 UTC

svn commit: r1077479 - in /hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred: TestQueueManager.java TestQueueManagerForJobKillAndJobPriority.java TestQueueManagerForJobKillAndNonDefaultQueue.java

Author: omalley
Date: Fri Mar  4 04:19:16 2011
New Revision: 1077479

URL: http://svn.apache.org/viewvc?rev=1077479&view=rev
Log:
commit 5e5a0404458032decbedc973230431f22419b9ae
Author: Arun C Murthy <ac...@apache.org>
Date:   Thu May 20 23:01:43 2010 -0700

    MAPREDUCE-1807. Re-factor TestQueueManager. Contributed by Richard King.
    
    +++ b/YAHOO-CHANGES.txt
    +    MAPREDUCE-1807. Re-factor TestQueueManager. (Richard King via acmurthy)
    +

Added:
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestQueueManagerForJobKillAndJobPriority.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestQueueManagerForJobKillAndNonDefaultQueue.java
Modified:
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestQueueManager.java

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestQueueManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestQueueManager.java?rev=1077479&r1=1077478&r2=1077479&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestQueueManager.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestQueueManager.java Fri Mar  4 04:19:16 2011
@@ -18,12 +18,8 @@
 
 package org.apache.hadoop.mapred;
 
-import java.io.File;
-import java.io.FileOutputStream;
 import java.io.IOException;
-import java.io.PrintWriter;
 import java.security.PrivilegedExceptionAction;
-import java.util.Properties;
 import java.util.Set;
 import java.util.TreeSet;
 
@@ -33,7 +29,6 @@ import junit.framework.TestCase;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.examples.SleepJob;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -48,8 +43,8 @@ public class TestQueueManager extends Te
   String submitAcl = QueueACL.SUBMIT_JOB.getAclName();
   String adminAcl  = QueueACL.ADMINISTER_JOBS.getAclName();
 
-  private MiniDFSCluster miniDFSCluster;
-  private MiniMRCluster miniMRCluster;
+  MiniDFSCluster miniDFSCluster;
+  MiniMRCluster miniMRCluster = null;
   
   /**
    * For some tests it is necessary to sandbox them in a doAs with a fake user
@@ -57,7 +52,7 @@ public class TestQueueManager extends Te
    * necessary to then add the real user running the test to the fake users
    * so that child processes can write to the DFS.
    */
-  private UserGroupInformation createNecessaryUsers() throws IOException {
+  UserGroupInformation createNecessaryUsers() throws IOException {
     // Add real user to fake groups mapping so that child processes (tasks)
     // will have permissions on the dfs
     String j = UserGroupInformation.getCurrentUser().getShortUserName();
@@ -102,419 +97,102 @@ public class TestQueueManager extends Te
   
   public void testAllEnabledACLForJobSubmission() 
   throws IOException, InterruptedException {
-    JobConf conf = setupConf(QueueManager.toFullPropertyName(
-        "default", submitAcl), "*");
-    UserGroupInformation ugi = createNecessaryUsers();
-    String[] groups = ugi.getGroupNames();
-    verifyJobSubmissionToDefaultQueue(conf, true,
-        ugi.getShortUserName() + "," + groups[groups.length-1]);
+    try {
+      JobConf conf = setupConf(QueueManager.toFullPropertyName(
+                                                               "default", submitAcl), "*");
+      UserGroupInformation ugi = createNecessaryUsers();
+      String[] groups = ugi.getGroupNames();
+      verifyJobSubmissionToDefaultQueue(conf, true,
+                                        ugi.getShortUserName() + "," + groups[groups.length-1]);
+    } finally {
+      tearDownCluster();
+    }
   }
   
   public void testAllDisabledACLForJobSubmission() 
   throws IOException, InterruptedException {
-    createNecessaryUsers();
-    JobConf conf = setupConf(QueueManager.toFullPropertyName(
-        "default", submitAcl), " ");
-    String userName = "user1";
-    String groupName = "group1";
-    verifyJobSubmissionToDefaultQueue(conf, false, userName + "," + groupName);
+    try {
+      createNecessaryUsers();
+      JobConf conf = setupConf(QueueManager.toFullPropertyName(
+                                                               "default", submitAcl), " ");
+      String userName = "user1";
+      String groupName = "group1";
+      verifyJobSubmissionToDefaultQueue(conf, false, userName + "," + groupName);
     
-    // Check if admins can submit job
-    String user2 = "user2";
-    String group2 = "group2";
-    conf.set(JobConf.MR_ADMINS, user2 + " " + groupName);
-    verifyJobSubmissionToDefaultQueue(conf, true, userName + "," + groupName);
-    verifyJobSubmissionToDefaultQueue(conf, true, user2 + "," + group2);
+      // Check if admins can submit job
+      String user2 = "user2";
+      String group2 = "group2";
+      conf.set(JobConf.MR_ADMINS, user2 + " " + groupName);
+      tearDownCluster();
+      verifyJobSubmissionToDefaultQueue(conf, true, userName + "," + groupName);
+      verifyJobSubmissionToDefaultQueue(conf, true, user2 + "," + group2);
     
-    // Check if MROwner(user who started the mapreduce cluster) can submit job
-    UserGroupInformation mrOwner = UserGroupInformation.getCurrentUser();
-    userName = mrOwner.getShortUserName();
-    String[] groups = mrOwner.getGroupNames();
-    groupName = groups[groups.length - 1];
-    verifyJobSubmissionToDefaultQueue(conf, true, userName + "," + groupName);
+      // Check if MROwner (user who started the mapreduce cluster) can submit a job
+      UserGroupInformation mrOwner = UserGroupInformation.getCurrentUser();
+      userName = mrOwner.getShortUserName();
+      String[] groups = mrOwner.getGroupNames();
+      groupName = groups[groups.length - 1];
+      verifyJobSubmissionToDefaultQueue(conf, true, userName + "," + groupName);
+    } finally {
+      tearDownCluster();
+    }
   }
   
   public void testUserDisabledACLForJobSubmission() 
   throws IOException, InterruptedException {
-    JobConf conf = setupConf(QueueManager.toFullPropertyName(
-        "default", submitAcl), "3698-non-existent-user");
-    verifyJobSubmissionToDefaultQueue(conf, false, "user1,group1");
-  }
-  
-  public void testDisabledACLForNonDefaultQueue() 
-  throws IOException, InterruptedException {
-    // allow everyone in default queue
-    JobConf conf = setupConf(QueueManager.toFullPropertyName(
-        "default", submitAcl), "*");
-    // setup a different queue
-    conf.set("mapred.queue.names", "default,q1");
-    // setup a different acl for this queue.
-    conf.set(QueueManager.toFullPropertyName(
-        "q1", submitAcl), "dummy-user");
-    // verify job submission to other queue fails.
-    verifyJobSubmission(conf, false, "user1,group1", "q1");
+    try {
+      JobConf conf = setupConf(QueueManager.toFullPropertyName(
+                                                               "default", submitAcl), "3698-non-existent-user");
+      verifyJobSubmissionToDefaultQueue(conf, false, "user1,group1");
+    } finally {
+      tearDownCluster();
+    }
   }
 
   public void testSubmissionToInvalidQueue() 
   throws IOException, InterruptedException{
+    try {
     JobConf conf = new JobConf();
     conf.set("mapred.queue.names","default");
     setUpCluster(conf);
     String queueName = "q1";
     try {
-      RunningJob rjob = submitSleepJob(1, 1, 100, 100, true, null, queueName);
+      submitSleepJob(1, 1, 100, 100, true, null, queueName);
     } catch (IOException ioe) {      
        assertTrue(ioe.getMessage().contains("Queue \"" + queueName + "\" does not exist"));
        return;
     } finally {
       tearDownCluster();
     }
-    fail("Job submission to invalid queue job shouldnot complete , it should fail with proper exception ");   
-  }
-
-  public void testEnabledACLForNonDefaultQueue() 
-  throws IOException, LoginException, InterruptedException {
-    UserGroupInformation ugi = createNecessaryUsers();
-    String[] groups = ugi.getGroupNames();
-    String userName = ugi.getShortUserName();
-    // allow everyone in default queue
-    JobConf conf = setupConf(QueueManager.toFullPropertyName(
-        "default", submitAcl), "*");
-    // setup a different queue
-    conf.set("mapred.queue.names", "default,q2");
-    // setup a different acl for this queue.
-    conf.set(QueueManager.toFullPropertyName(
-        "q2", submitAcl), userName);
-    // verify job submission to other queue fails.
-    verifyJobSubmission(conf, true,
-        userName + "," + groups[groups.length-1], "q2");
+    fail("Job submission to invalid queue job shouldnot complete , it should fail with proper exception ");  
+    } finally {
+      tearDownCluster();
+    } 
   }
 
   public void testUserEnabledACLForJobSubmission() 
   throws IOException, LoginException, InterruptedException {
-    String userName = "user1";
-    JobConf conf = setupConf(QueueManager.toFullPropertyName(
-        "default", submitAcl), "3698-junk-user," + userName 
-                                    + " 3698-junk-group1,3698-junk-group2");
-    verifyJobSubmissionToDefaultQueue(conf, true, userName+",group1");
-  }
-
-  public void testGroupsEnabledACLForJobSubmission() 
-  throws IOException, LoginException, InterruptedException {
-    // login as self, get one group, and add in allowed list.
-    UserGroupInformation ugi = createNecessaryUsers();
-
-    String[] groups = ugi.getGroupNames();
-    JobConf conf = setupConf(QueueManager.toFullPropertyName(
-        "default", submitAcl), "3698-junk-user1,3698-junk-user2 " 
-                               + groups[groups.length-1] 
-                               + ",3698-junk-group");
-    verifyJobSubmissionToDefaultQueue(conf, true,
-        		ugi.getShortUserName()+","+groups[groups.length-1]);
-  }
-
-  public void testAllEnabledACLForJobKill() 
-  throws IOException, InterruptedException {
-    UserGroupInformation ugi = createNecessaryUsers();
-    // create other user who will try to kill the job of ugi.
-    final UserGroupInformation otherUGI = UserGroupInformation.
-        createUserForTesting("user1", new String [] {"group1"});
-
-    ugi.doAs(new PrivilegedExceptionAction<Object>() {
-
-      @Override
-      public Object run() throws Exception {
-        JobConf conf = setupConf(QueueManager.toFullPropertyName(
-            "default", adminAcl), "*");
-        verifyJobKill(otherUGI, conf, true);
-        return null;
-      }
-    });
-  }
-
-  public void testAllDisabledACLForJobKill() 
-  throws IOException, InterruptedException {
-    // Create a fake superuser for all processes to execute within
-    final UserGroupInformation ugi = createNecessaryUsers();
-
-    // create other users who will try to kill the job of ugi.
-    final UserGroupInformation otherUGI1 = UserGroupInformation.
-        createUserForTesting("user1", new String [] {"group1"});
-    final UserGroupInformation otherUGI2 = UserGroupInformation.
-    createUserForTesting("user2", new String [] {"group2"});
-
-    ugi.doAs(new PrivilegedExceptionAction<Object>() {
-
-      @Override
-      public Object run() throws Exception {
-        // No queue admins
-        JobConf conf = setupConf(QueueManager.toFullPropertyName(
-            "default", adminAcl), " ");
-        // Run job as ugi and try to kill job as user1, who (obviously)
-        // should not be able to kill the job.
-        verifyJobKill(otherUGI1, conf, false);
-        verifyJobKill(otherUGI2, conf, false);
-
-        // Check if cluster administrator can kill job
-        conf.set(JobConf.MR_ADMINS, "user1 group2");
-        verifyJobKill(otherUGI1, conf, true);
-        verifyJobKill(otherUGI2, conf, true);
-        
-        // Check if MROwner(user who started the mapreduce cluster) can kill job
-        verifyJobKill(ugi, conf, true);
-
-        return null;
-      }
-    });
-  }
-  
-  public void testOwnerAllowedForJobKill() 
-  throws IOException, InterruptedException {
-    final UserGroupInformation ugi = createNecessaryUsers();
-    
-    ugi.doAs(new PrivilegedExceptionAction<Object>() {
-
-      @Override
-      public Object run() throws Exception {
-
-        JobConf conf = setupConf(QueueManager.toFullPropertyName(
-            "default", adminAcl), "junk-user");
-        verifyJobKill(ugi, conf, true);
-        return null;
-      }
-    });
-  }
-  
-  public void testUserDisabledACLForJobKill() 
-  throws IOException, InterruptedException {
-    UserGroupInformation ugi = createNecessaryUsers();
-    // create other user who will try to kill the job of ugi.
-    final UserGroupInformation otherUGI = UserGroupInformation.
-        createUserForTesting("user1", new String [] {"group1"});
-
-    ugi.doAs(new PrivilegedExceptionAction<Object>() {
-
-      @Override
-      public Object run() throws Exception {
-        //setup a cluster allowing a user to submit
-        JobConf conf = setupConf(QueueManager.toFullPropertyName(
-            "default", adminAcl), "dummy-user");
-        // Run job as ugi and try to kill job as user1, who (obviously)
-        // should not able to kill the job.
-        verifyJobKill(otherUGI, conf, false);
-        return null;
-      }
-    });
-  }
-  
-  public void testUserEnabledACLForJobKill() 
-  throws IOException, LoginException, InterruptedException {
-  UserGroupInformation ugi = createNecessaryUsers();
-  // create other user who will try to kill the job of ugi.
-  final UserGroupInformation otherUGI = UserGroupInformation.
-      createUserForTesting("user1", new String [] {"group1"});
-
-  ugi.doAs(new PrivilegedExceptionAction<Object>() {
-    @Override
-    public Object run() throws Exception {
-      UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
-      String userName = ugi.getShortUserName();
-      JobConf conf = setupConf(QueueManager.toFullPropertyName(
-          "default", adminAcl), "user1");
-      // user1 should be able to kill the job
-      verifyJobKill(otherUGI, conf, true);
-      return null;
-      }
-    });
-  }
-
-  public void testUserDisabledForJobPriorityChange() 
-  throws IOException, InterruptedException {
-    UserGroupInformation ugi = createNecessaryUsers();
-    // create other user who will try to change priority of the job of ugi.
-    final UserGroupInformation otherUGI = UserGroupInformation.
-        createUserForTesting("user1", new String [] {"group1"});
-
-    ugi.doAs(new PrivilegedExceptionAction<Object>() {
-
-      @Override
-      public Object run() throws Exception {
-
-        JobConf conf = setupConf(QueueManager.toFullPropertyName(
-            "default", adminAcl), "junk-user");
-        verifyJobPriorityChangeAsOtherUser(otherUGI, conf, false);
-        return null;
-      }
-    });
-  }
-
-  /**
-   * Test to verify refreshing of queue properties by using MRAdmin tool.
-   * 
-   * @throws Exception
-   */
-  public void testACLRefresh() throws Exception {
-    String queueConfigPath =
-        System.getProperty("test.build.extraconf", "build/test/extraconf");
-    File queueConfigFile =
-        new File(queueConfigPath, QueueManager.QUEUE_ACLS_FILE_NAME);
-    File hadoopConfigFile = new File(queueConfigPath, "mapred-site.xml");
-    try {
-      //Setting up default mapred-site.xml
-      Properties hadoopConfProps = new Properties();
-      //these properties should be retained.
-      hadoopConfProps.put("mapred.queue.names", "default,q1,q2");
-      hadoopConfProps.put(JobConf.MR_ACLS_ENABLED, "true");
-      //These property should always be overridden
-      hadoopConfProps.put(QueueManager.toFullPropertyName(
-          "default", submitAcl), "u1");
-      hadoopConfProps.put(QueueManager.toFullPropertyName(
-          "q1", submitAcl), "u2");
-      hadoopConfProps.put(QueueManager.toFullPropertyName(
-          "q2", submitAcl), "u1");
-      UtilsForTests.setUpConfigFile(hadoopConfProps, hadoopConfigFile);
-      
-      //Actual property which would be used.
-      Properties queueConfProps = new Properties();
-      queueConfProps.put(QueueManager.toFullPropertyName(
-          "default", submitAcl), " ");
-      //Writing out the queue configuration file.
-      UtilsForTests.setUpConfigFile(queueConfProps, queueConfigFile);
-      
-      //Create a new configuration to be used with QueueManager
-      JobConf conf = new JobConf();
-      QueueManager queueManager = new QueueManager(conf);
-      UserGroupInformation ugi = UserGroupInformation.
-          createUserForTesting("user1", new String [] {"group1"});
-
-      //Job Submission should fail because ugi to be used is set to blank.
-      assertFalse("User Job Submission Succeeded before refresh.",
-          queueManager.hasAccess("default", QueueACL.SUBMIT_JOB, ugi));
-      assertFalse("User Job Submission Succeeded before refresh.",
-          queueManager.hasAccess("q1", QueueACL.SUBMIT_JOB, ugi));
-      assertFalse("User Job Submission Succeeded before refresh.",
-          queueManager.hasAccess("q2", QueueACL.SUBMIT_JOB, ugi));
-      
-      //Test job submission as alternate user.
-      UserGroupInformation alternateUgi = 
-        UserGroupInformation.createUserForTesting("u1", new String[]{"user"});
-      assertTrue("Alternate User Job Submission failed before refresh.",
-          queueManager.hasAccess("q2", QueueACL.SUBMIT_JOB, alternateUgi));
-      
-      //Set acl for user1.
-      queueConfProps.put(QueueManager.toFullPropertyName(
-          "default", submitAcl), ugi.getShortUserName());
-      queueConfProps.put(QueueManager.toFullPropertyName(
-          "q1", submitAcl), ugi.getShortUserName());
-      queueConfProps.put(QueueManager.toFullPropertyName(
-          "q2", submitAcl), ugi.getShortUserName());
-      //write out queue-acls.xml.
-      UtilsForTests.setUpConfigFile(queueConfProps, queueConfigFile);
-      //refresh configuration
-      queueManager.refreshAcls(conf);
-      //Submission should succeed
-      assertTrue("User Job Submission failed after refresh.",
-          queueManager.hasAccess("default", QueueACL.SUBMIT_JOB, ugi));
-      assertTrue("User Job Submission failed after refresh.",
-          queueManager.hasAccess("q1", QueueACL.SUBMIT_JOB, ugi));
-      assertTrue("User Job Submission failed after refresh.",
-          queueManager.hasAccess("q2", QueueACL.SUBMIT_JOB, ugi));
-      assertFalse("Alternate User Job Submission succeeded after refresh.",
-          queueManager.hasAccess("q2", QueueACL.SUBMIT_JOB, alternateUgi));
-      //delete the ACL file.
-      queueConfigFile.delete();
-      
-      //rewrite the mapred-site.xml
-      hadoopConfProps.put(JobConf.MR_ACLS_ENABLED, "true");
-      hadoopConfProps.put(QueueManager.toFullPropertyName(
-          "q1", submitAcl), ugi.getShortUserName());
-      UtilsForTests.setUpConfigFile(hadoopConfProps, hadoopConfigFile);
-      queueManager.refreshAcls(conf);
-      assertTrue("User Job Submission allowed after refresh and no queue acls file.",
-          queueManager.hasAccess("q1", QueueACL.SUBMIT_JOB, ugi));
-    } finally{
-      if(queueConfigFile.exists()) {
-        queueConfigFile.delete();
-      }
-      if(hadoopConfigFile.exists()) {
-        hadoopConfigFile.delete();
-      }
-    }
-  }
-
-  public void testQueueAclRefreshWithInvalidConfFile() throws IOException {
-    String queueConfigPath =
-      System.getProperty("test.build.extraconf", "build/test/extraconf");
-    File queueConfigFile =
-      new File(queueConfigPath, QueueManager.QUEUE_ACLS_FILE_NAME);
-    File hadoopConfigFile = new File(queueConfigPath, "hadoop-site.xml");
-    try {
-      // queue properties with which the cluster is started.
-      Properties hadoopConfProps = new Properties();
-      hadoopConfProps.put("mapred.queue.names", "default,q1,q2");
-      hadoopConfProps.put(JobConf.MR_ACLS_ENABLED, "true");
-      UtilsForTests.setUpConfigFile(hadoopConfProps, hadoopConfigFile);
-      
-      //properties for mapred-queue-acls.xml
-      Properties queueConfProps = new Properties();
-      UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
-      queueConfProps.put(QueueManager.toFullPropertyName(
-          "default", submitAcl), ugi.getShortUserName());
-      queueConfProps.put(QueueManager.toFullPropertyName(
-          "q1", submitAcl), ugi.getShortUserName());
-      queueConfProps.put(QueueManager.toFullPropertyName(
-          "q2", submitAcl), ugi.getShortUserName());
-      UtilsForTests.setUpConfigFile(queueConfProps, queueConfigFile);
-      
-      Configuration conf = new JobConf();
-      QueueManager queueManager = new QueueManager(conf);
-      //Testing access to queue.
-      assertTrue("User Job Submission failed.",
-          queueManager.hasAccess("default", QueueACL.SUBMIT_JOB, ugi));
-      assertTrue("User Job Submission failed.",
-          queueManager.hasAccess("q1", QueueACL.SUBMIT_JOB, ugi));
-      assertTrue("User Job Submission failed.",
-          queueManager.hasAccess("q2", QueueACL.SUBMIT_JOB, ugi));
-      
-      //Write out a new incomplete invalid configuration file.
-      PrintWriter writer = new PrintWriter(new FileOutputStream(queueConfigFile));
-      writer.println("<configuration>");
-      writer.println("<property>");
-      writer.flush();
-      writer.close();
-      try {
-        //Exception to be thrown by queue manager because configuration passed
-        //is invalid.
-        queueManager.refreshAcls(conf);
-        fail("Refresh of ACLs should have failed with invalid conf file.");
-      } catch (Exception e) {
-      }
-      assertTrue("User Job Submission failed after invalid conf file refresh.",
-          queueManager.hasAccess("default", QueueACL.SUBMIT_JOB, ugi));
-      assertTrue("User Job Submission failed after invalid conf file refresh.",
-          queueManager.hasAccess("q1", QueueACL.SUBMIT_JOB, ugi));
-      assertTrue("User Job Submission failed after invalid conf file refresh.",
-          queueManager.hasAccess("q2", QueueACL.SUBMIT_JOB, ugi));
-    } finally {
-      //Cleanup the configuration files in all cases
-      if(hadoopConfigFile.exists()) {
-        hadoopConfigFile.delete();
-      }
-      if(queueConfigFile.exists()) {
-        queueConfigFile.delete();
-      }
+    try {
+      String userName = "user1";
+      JobConf conf
+        = setupConf(QueueManager.toFullPropertyName
+                    ("default", submitAcl), "3698-junk-user," + userName 
+                    + " 3698-junk-group1,3698-junk-group2");
+      verifyJobSubmissionToDefaultQueue(conf, true, userName+",group1");
+    } finally {
+      tearDownCluster();
     }
   }
   
   
-  private JobConf setupConf(String aclName, String aclValue) {
+  JobConf setupConf(String aclName, String aclValue) {
     JobConf conf = new JobConf();
     conf.setBoolean(JobConf.MR_ACLS_ENABLED, true);
     conf.set(aclName, aclValue);
     return conf;
   }
   
-  private void verifyQueues(Set<String> expectedQueues, 
+  void verifyQueues(Set<String> expectedQueues, 
                                           Set<String> actualQueues) {
     assertEquals(expectedQueues.size(), actualQueues.size());
     for (String queue : expectedQueues) {
@@ -525,7 +203,7 @@ public class TestQueueManager extends Te
   /**
    *  Verify job submission as given user to the default queue
    */
-  private void verifyJobSubmissionToDefaultQueue(JobConf conf, boolean shouldSucceed,
+  void verifyJobSubmissionToDefaultQueue(JobConf conf, boolean shouldSucceed,
 		  String userInfo) throws IOException, InterruptedException {
     verifyJobSubmission(conf, shouldSucceed, userInfo, "default");
   }
@@ -533,20 +211,20 @@ public class TestQueueManager extends Te
   /**
    * Verify job submission as given user to the given queue
    */
-  private void verifyJobSubmission(JobConf conf, boolean shouldSucceed, 
+  void verifyJobSubmission(JobConf conf, boolean shouldSucceed, 
       String userInfo, String queue) throws IOException, InterruptedException {
     setUpCluster(conf);
     try {
       runAndVerifySubmission(conf, shouldSucceed, queue, userInfo);
     } finally {
-      tearDownCluster();
+      // tearDownCluster();
     }
   }
 
   /**
    * Verify if submission of job to the given queue will succeed or not
    */
-  private void runAndVerifySubmission(JobConf conf, boolean shouldSucceed,
+  void runAndVerifySubmission(JobConf conf, boolean shouldSucceed,
       String queue, String userInfo)
       throws IOException, InterruptedException {
     try {
@@ -576,9 +254,9 @@ public class TestQueueManager extends Te
         }
       }
     } finally {
-      tearDownCluster();
+      // tearDownCluster();
     }
-}
+  }
 
   /**
    * Submit job as current user and kill the job as user of ugi.
@@ -588,7 +266,7 @@ public class TestQueueManager extends Te
    * @throws IOException
    * @throws InterruptedException
    */
-  private void verifyJobKill(UserGroupInformation ugi, JobConf conf,
+  void verifyJobKill(UserGroupInformation ugi, JobConf conf,
 		  boolean shouldSucceed) throws IOException, InterruptedException {
     setUpCluster(conf);
     try {
@@ -636,12 +314,12 @@ public class TestQueueManager extends Te
             contains(" cannot perform operation KILL_JOB on "));
       }
     } finally {
-      tearDownCluster();
+      // tearDownCluster();
     }
   }
 
   
-  private void verifyJobKillAsOtherUser(JobConf conf, boolean shouldSucceed,
+  void verifyJobKillAsOtherUser(JobConf conf, boolean shouldSucceed,
                                         String otherUserInfo) 
                         throws IOException, InterruptedException {
     setUpCluster(conf);
@@ -679,7 +357,7 @@ public class TestQueueManager extends Te
         }
       }
     } finally {
-      tearDownCluster();
+      // tearDownCluster();
     }
   }
   
@@ -692,7 +370,7 @@ public class TestQueueManager extends Te
    * @throws IOException
    * @throws InterruptedException
    */
-  private void verifyJobPriorityChangeAsOtherUser(UserGroupInformation otherUGI,
+  void verifyJobPriorityChangeAsOtherUser(UserGroupInformation otherUGI,
       JobConf conf, final boolean shouldSucceed)
       throws IOException, InterruptedException {
     setUpCluster(conf);
@@ -737,28 +415,42 @@ public class TestQueueManager extends Te
         }
       }
     } finally {
-      tearDownCluster();
+      // tearDownCluster();
     }
   }
   
-  private void setUpCluster(JobConf conf) throws IOException {
-    miniDFSCluster = new MiniDFSCluster(conf, 1, true, null);
-    FileSystem fileSys = miniDFSCluster.getFileSystem();
-    TestMiniMRWithDFSWithDistinctUsers.mkdir(fileSys, "/user");
-    TestMiniMRWithDFSWithDistinctUsers.mkdir(fileSys,
-        conf.get("mapreduce.jobtracker.staging.root.dir",
-            "/tmp/hadoop/mapred/staging"));
-    String namenode = fileSys.getUri().toString();
-    miniMRCluster = new MiniMRCluster(1, namenode, 3, 
-                      null, null, conf);
-  }
-  
-  private void tearDownCluster() throws IOException {
-    if (miniMRCluster != null) { miniMRCluster.shutdown(); }
-    if (miniDFSCluster != null) { miniDFSCluster.shutdown(); }
+  void setUpCluster(JobConf conf) throws IOException {
+    if(miniMRCluster == null) {
+      miniDFSCluster = new MiniDFSCluster(conf, 1, true, null);
+      FileSystem fileSys = miniDFSCluster.getFileSystem();
+      TestMiniMRWithDFSWithDistinctUsers.mkdir(fileSys, "/user");
+      TestMiniMRWithDFSWithDistinctUsers.mkdir
+        (fileSys, conf.get("mapreduce.jobtracker.staging.root.dir",
+                           "/tmp/hadoop/mapred/staging"));
+      String namenode = fileSys.getUri().toString();
+      miniMRCluster = new MiniMRCluster(1, namenode, 3, 
+                                        null, null, conf);
+    }
   }
   
-  private RunningJob submitSleepJob(int numMappers, int numReducers, 
+  void tearDownCluster() throws IOException {
+    if (miniMRCluster != null) {
+      long mrTeardownStart = System.currentTimeMillis();
+      miniMRCluster.shutdown();
+      long mrTeardownEnd = System.currentTimeMillis();
+      if (miniDFSCluster != null) { miniDFSCluster.shutdown(); }
+      long dfsTeardownEnd = System.currentTimeMillis();
+      miniMRCluster = null;
+      miniDFSCluster = null;
+      System.err.println("An MR teardown took "
+                         + (mrTeardownEnd - mrTeardownStart)
+                         + " milliseconds.  A DFS teardown took "
+                         + ( dfsTeardownEnd - mrTeardownEnd )
+                         + " milliseconds.");
+    }
+  }
+
+  RunningJob submitSleepJob(int numMappers, int numReducers, 
                             long mapSleepTime, long reduceSleepTime,
                             boolean shouldComplete) 
                               throws IOException, InterruptedException {
@@ -766,7 +458,7 @@ public class TestQueueManager extends Te
                           reduceSleepTime, shouldComplete, null);
   }
   
-  private RunningJob submitSleepJob(int numMappers, int numReducers, 
+  RunningJob submitSleepJob(int numMappers, int numReducers, 
                                       long mapSleepTime, long reduceSleepTime,
                                       boolean shouldComplete, String userInfo) 
                                      throws IOException, InterruptedException {
@@ -774,7 +466,7 @@ public class TestQueueManager extends Te
                           reduceSleepTime, shouldComplete, userInfo, null);
   }
 
-  private RunningJob submitSleepJob(final int numMappers, final int numReducers, 
+  RunningJob submitSleepJob(final int numMappers, final int numReducers, 
       final long mapSleepTime,
       final long reduceSleepTime, final boolean shouldComplete, String userInfo,
                                     String queueName) 

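The heart of this refactor is the new setUpCluster/tearDownCluster pair above: the mini clusters are now created lazily and torn down idempotently, so the tests remaining in this class can share one MiniDFS/MiniMR cluster and call tearDownCluster() whenever a fresh configuration is needed. A minimal sketch of that pattern, assuming the 0.20 test classpath (MiniDFSCluster in org.apache.hadoop.hdfs, MiniMRCluster in org.apache.hadoop.mapred; the SharedMiniClusters name is illustrative, not from the patch):

    import java.io.IOException;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.hdfs.MiniDFSCluster;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.MiniMRCluster;

    class SharedMiniClusters {
      private MiniDFSCluster dfs;
      private MiniMRCluster mr;

      // Create the clusters only if none are live; repeated calls
      // between teardowns are no-ops, so consecutive tests share state.
      void setUp(JobConf conf) throws IOException {
        if (mr == null) {
          dfs = new MiniDFSCluster(conf, 1, true, null);
          FileSystem fs = dfs.getFileSystem();
          mr = new MiniMRCluster(1, fs.getUri().toString(), 3, null, null, conf);
        }
      }

      // Shut down and null the fields so a later setUp() can rebuild
      // with a different JobConf (e.g. after conf.set(JobConf.MR_ADMINS, ...)).
      void tearDown() throws IOException {
        if (mr != null) {
          mr.shutdown();
          if (dfs != null) { dfs.shutdown(); }
          mr = null;
          dfs = null;
        }
      }
    }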
Added: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestQueueManagerForJobKillAndJobPriority.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestQueueManagerForJobKillAndJobPriority.java?rev=1077479&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestQueueManagerForJobKillAndJobPriority.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestQueueManagerForJobKillAndJobPriority.java Fri Mar  4 04:19:16 2011
@@ -0,0 +1,320 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.security.PrivilegedExceptionAction;
+import java.util.Properties;
+
+import javax.security.auth.login.LoginException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.QueueManager.QueueACL;
+import org.apache.hadoop.security.UserGroupInformation;
+
+public class TestQueueManagerForJobKillAndJobPriority extends TestQueueManager {
+
+  public void testOwnerAllowedForJobKill() 
+  throws IOException, InterruptedException {
+    try {
+      final UserGroupInformation ugi = createNecessaryUsers();
+    
+      ugi.doAs(new PrivilegedExceptionAction<Object>() {
+
+                 @Override
+                   public Object run() throws Exception {
+
+                   JobConf conf
+                     = setupConf(QueueManager.toFullPropertyName
+                                 ("default", adminAcl), "junk-user");
+                   verifyJobKill(ugi, conf, true);
+                   return null;
+                 }
+               });
+    } finally {
+      tearDownCluster();
+    }
+  }
+  
+  public void testUserDisabledACLForJobKill() 
+  throws IOException, InterruptedException {
+    try {
+      UserGroupInformation ugi = createNecessaryUsers();
+      // create other user who will try to kill the job of ugi.
+      final UserGroupInformation otherUGI = UserGroupInformation.
+        createUserForTesting("user1", new String [] {"group1"});
+
+      ugi.doAs(new PrivilegedExceptionAction<Object>() {
+
+                 @Override
+                   public Object run() throws Exception {
+                   //setup a cluster allowing a user to submit
+                   JobConf conf
+                     = setupConf(QueueManager.toFullPropertyName
+                                 ("default", adminAcl), "dummy-user");
+                   // Run job as ugi and try to kill job as user1, who (obviously)
+                   // should not be able to kill the job.
+                   verifyJobKill(otherUGI, conf, false);
+                   return null;
+                 }
+               });
+    } finally {
+      tearDownCluster();
+    }
+  }
+  
+  public void testUserEnabledACLForJobKill() 
+  throws IOException, LoginException, InterruptedException {
+    try {
+      UserGroupInformation ugi = createNecessaryUsers();
+      // create other user who will try to kill the job of ugi.
+      final UserGroupInformation otherUGI = UserGroupInformation.
+        createUserForTesting("user1", new String [] {"group1"});
+
+      ugi.doAs(new PrivilegedExceptionAction<Object>() {
+                 @Override
+                   public Object run() throws Exception {
+                   UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+                   JobConf conf
+                     = setupConf(QueueManager.toFullPropertyName
+                                 ("default", adminAcl), "user1");
+                   // user1 should be able to kill the job
+                   verifyJobKill(otherUGI, conf, true);
+                   return null;
+                 }
+               });
+    } finally {
+      tearDownCluster();
+    }
+  }
+
+  public void testUserDisabledForJobPriorityChange() 
+  throws IOException, InterruptedException {
+    try {
+      UserGroupInformation ugi = createNecessaryUsers();
+      // create other user who will try to change priority of the job of ugi.
+      final UserGroupInformation otherUGI = UserGroupInformation.
+        createUserForTesting("user1", new String [] {"group1"});
+
+      ugi.doAs(new PrivilegedExceptionAction<Object>() {
+
+                 @Override
+                   public Object run() throws Exception {
+
+                   JobConf conf
+                     = setupConf(QueueManager.toFullPropertyName
+                                 ("default", adminAcl), "junk-user");
+                   verifyJobPriorityChangeAsOtherUser(otherUGI, conf, false);
+                   return null;
+                 }
+               });
+    } finally {
+      tearDownCluster();
+    }
+  }
+
+  /**
+   * Test to verify refreshing of queue properties by using MRAdmin tool.
+   * 
+   * @throws Exception
+   */
+  public void testACLRefresh() throws Exception {
+    try {
+      String queueConfigPath =
+        System.getProperty("test.build.extraconf", "build/test/extraconf");
+      File queueConfigFile =
+        new File(queueConfigPath, QueueManager.QUEUE_ACLS_FILE_NAME);
+      File hadoopConfigFile = new File(queueConfigPath, "mapred-site.xml");
+      try {
+        //Setting up default mapred-site.xml
+        Properties hadoopConfProps = new Properties();
+        //these properties should be retained.
+        hadoopConfProps.put("mapred.queue.names", "default,q1,q2");
+        hadoopConfProps.put(JobConf.MR_ACLS_ENABLED, "true");
+        //These properties should always be overridden
+        hadoopConfProps.put(QueueManager.toFullPropertyName
+                            ("default", submitAcl), "u1");
+        hadoopConfProps.put(QueueManager.toFullPropertyName
+                            ("q1", submitAcl), "u2");
+        hadoopConfProps.put(QueueManager.toFullPropertyName
+                            ("q2", submitAcl), "u1");
+        UtilsForTests.setUpConfigFile(hadoopConfProps, hadoopConfigFile);
+      
+        //Actual property which would be used.
+        Properties queueConfProps = new Properties();
+        queueConfProps.put(QueueManager.toFullPropertyName
+                           ("default", submitAcl), " ");
+        //Writing out the queue configuration file.
+        UtilsForTests.setUpConfigFile(queueConfProps, queueConfigFile);
+      
+        //Create a new configuration to be used with QueueManager
+        JobConf conf = new JobConf();
+        QueueManager queueManager = new QueueManager(conf);
+        UserGroupInformation ugi = UserGroupInformation.
+          createUserForTesting("user1", new String [] {"group1"});
+
+        //Job Submission should fail because ugi to be used is set to blank.
+        assertFalse("User Job Submission Succeeded before refresh.",
+                    queueManager.hasAccess("default", QueueACL.SUBMIT_JOB, ugi));
+        assertFalse("User Job Submission Succeeded before refresh.",
+                    queueManager.hasAccess("q1", QueueACL.SUBMIT_JOB, ugi));
+        assertFalse("User Job Submission Succeeded before refresh.",
+                    queueManager.hasAccess("q2", QueueACL.SUBMIT_JOB, ugi));
+      
+        //Test job submission as alternate user.
+        UserGroupInformation alternateUgi = 
+          UserGroupInformation.createUserForTesting("u1", new String[]{"user"});
+        assertTrue("Alternate User Job Submission failed before refresh.",
+                   queueManager.hasAccess("q2", QueueACL.SUBMIT_JOB, alternateUgi));
+      
+        //Set acl for user1.
+        queueConfProps.put(QueueManager.toFullPropertyName
+                           ("default", submitAcl), ugi.getShortUserName());
+        queueConfProps.put(QueueManager.toFullPropertyName
+                           ("q1", submitAcl), ugi.getShortUserName());
+        queueConfProps.put(QueueManager.toFullPropertyName
+                           ("q2", submitAcl), ugi.getShortUserName());
+        //write out queue-acls.xml.
+        UtilsForTests.setUpConfigFile(queueConfProps, queueConfigFile);
+        //refresh configuration
+        queueManager.refreshAcls(conf);
+        //Submission should succeed
+        assertTrue("User Job Submission failed after refresh.",
+                   queueManager.hasAccess("default", QueueACL.SUBMIT_JOB, ugi));
+        assertTrue("User Job Submission failed after refresh.",
+                   queueManager.hasAccess("q1", QueueACL.SUBMIT_JOB, ugi));
+        assertTrue("User Job Submission failed after refresh.",
+                   queueManager.hasAccess("q2", QueueACL.SUBMIT_JOB, ugi));
+        assertFalse("Alternate User Job Submission succeeded after refresh.",
+                    queueManager.hasAccess("q2", QueueACL.SUBMIT_JOB, alternateUgi));
+        //delete the ACL file.
+        queueConfigFile.delete();
+      
+        //rewrite the mapred-site.xml
+        hadoopConfProps.put(JobConf.MR_ACLS_ENABLED, "true");
+        hadoopConfProps.put(QueueManager.toFullPropertyName
+                            ("q1", submitAcl), ugi.getShortUserName());
+        UtilsForTests.setUpConfigFile(hadoopConfProps, hadoopConfigFile);
+        queueManager.refreshAcls(conf);
+        assertTrue("User Job Submission allowed after refresh and no queue acls file.",
+                   queueManager.hasAccess("q1", QueueACL.SUBMIT_JOB, ugi));
+      } finally{
+        if(queueConfigFile.exists()) {
+          queueConfigFile.delete();
+        }
+        if(hadoopConfigFile.exists()) {
+          hadoopConfigFile.delete();
+        }
+      }
+    } finally {
+      tearDownCluster();
+    }
+  }
+
+  public void testQueueAclRefreshWithInvalidConfFile() throws IOException {
+    try {
+      String queueConfigPath =
+        System.getProperty("test.build.extraconf", "build/test/extraconf");
+      File queueConfigFile =
+        new File(queueConfigPath, QueueManager.QUEUE_ACLS_FILE_NAME);
+      File hadoopConfigFile = new File(queueConfigPath, "hadoop-site.xml");
+      try {
+        // queue properties with which the cluster is started.
+        Properties hadoopConfProps = new Properties();
+        hadoopConfProps.put("mapred.queue.names", "default,q1,q2");
+        hadoopConfProps.put(JobConf.MR_ACLS_ENABLED, "true");
+        UtilsForTests.setUpConfigFile(hadoopConfProps, hadoopConfigFile);
+      
+        //properties for mapred-queue-acls.xml
+        Properties queueConfProps = new Properties();
+        UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+        queueConfProps.put(QueueManager.toFullPropertyName
+                           ("default", submitAcl), ugi.getShortUserName());
+        queueConfProps.put(QueueManager.toFullPropertyName
+                           ("q1", submitAcl), ugi.getShortUserName());
+        queueConfProps.put(QueueManager.toFullPropertyName
+                           ("q2", submitAcl), ugi.getShortUserName());
+        UtilsForTests.setUpConfigFile(queueConfProps, queueConfigFile);
+      
+        Configuration conf = new JobConf();
+        QueueManager queueManager = new QueueManager(conf);
+        //Testing access to queue.
+        assertTrue("User Job Submission failed.",
+                   queueManager.hasAccess("default", QueueACL.SUBMIT_JOB, ugi));
+        assertTrue("User Job Submission failed.",
+                   queueManager.hasAccess("q1", QueueACL.SUBMIT_JOB, ugi));
+        assertTrue("User Job Submission failed.",
+                   queueManager.hasAccess("q2", QueueACL.SUBMIT_JOB, ugi));
+      
+        //Write out a new incomplete invalid configuration file.
+        PrintWriter writer = new PrintWriter(new FileOutputStream(queueConfigFile));
+        writer.println("<configuration>");
+        writer.println("<property>");
+        writer.flush();
+        writer.close();
+        try {
+          //Exception to be thrown by queue manager because configuration passed
+          //is invalid.
+          queueManager.refreshAcls(conf);
+          fail("Refresh of ACLs should have failed with invalid conf file.");
+        } catch (Exception e) {
+        }
+        assertTrue("User Job Submission failed after invalid conf file refresh.",
+                   queueManager.hasAccess("default", QueueACL.SUBMIT_JOB, ugi));
+        assertTrue("User Job Submission failed after invalid conf file refresh.",
+                   queueManager.hasAccess("q1", QueueACL.SUBMIT_JOB, ugi));
+        assertTrue("User Job Submission failed after invalid conf file refresh.",
+                   queueManager.hasAccess("q2", QueueACL.SUBMIT_JOB, ugi));
+      } finally {
+        //Cleanup the configuration files in all cases
+        if(hadoopConfigFile.exists()) {
+          hadoopConfigFile.delete();
+        }
+        if(queueConfigFile.exists()) {
+          queueConfigFile.delete();
+        }
+      }
+    } finally {
+      tearDownCluster();
+    }
+  }
+
+  public void testGroupsEnabledACLForJobSubmission() 
+  throws IOException, LoginException, InterruptedException {
+    try {
+      // login as self, get one group, and add in allowed list.
+      UserGroupInformation ugi = createNecessaryUsers();
+
+      String[] groups = ugi.getGroupNames();
+      JobConf conf
+        = setupConf(QueueManager.toFullPropertyName
+                    ("default", submitAcl), "3698-junk-user1,3698-junk-user2 " 
+                    + groups[groups.length-1] 
+                    + ",3698-junk-group");
+      verifyJobSubmissionToDefaultQueue
+        (conf, true, ugi.getShortUserName()+","+groups[groups.length-1]);
+    } finally {
+      tearDownCluster();
+    }
+  }
+
+}

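The job-kill and priority tests in the file above all follow the sandboxing idiom that createNecessaryUsers() supports: mint an in-memory test user with UserGroupInformation.createUserForTesting and run the assertions inside its doAs, so ACL checks are evaluated against that identity rather than the user running the JVM. A condensed sketch (the DoAsSketch class and the Void type parameter are illustrative; the API calls mirror those in the tests above):

    import java.security.PrivilegedExceptionAction;
    import org.apache.hadoop.security.UserGroupInformation;

    class DoAsSketch {
      static void runAsFakeUser() throws Exception {
        // Fabricate an identity that exists only inside this JVM.
        final UserGroupInformation other =
            UserGroupInformation.createUserForTesting("user1", new String[] {"group1"});

        // Everything inside run() executes with user1/group1 credentials,
        // so queue-ACL checks (submit, kill, set-priority) see that user.
        other.doAs(new PrivilegedExceptionAction<Void>() {
          public Void run() throws Exception {
            // submit or kill a job here and assert on the outcome
            return null;
          }
        });
      }
    }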
Added: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestQueueManagerForJobKillAndNonDefaultQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestQueueManagerForJobKillAndNonDefaultQueue.java?rev=1077479&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestQueueManagerForJobKillAndNonDefaultQueue.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestQueueManagerForJobKillAndNonDefaultQueue.java Fri Mar  4 04:19:16 2011
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+
+import javax.security.auth.login.LoginException;
+
+import org.apache.hadoop.security.UserGroupInformation;
+
+public class TestQueueManagerForJobKillAndNonDefaultQueue extends TestQueueManager {
+
+  public void testDisabledACLForNonDefaultQueue() 
+  throws IOException, InterruptedException {
+    try {
+      // allow everyone in default queue
+      JobConf conf = setupConf(QueueManager.toFullPropertyName
+                               ("default", submitAcl), "*");
+      // setup a different queue
+      conf.set("mapred.queue.names", "default,q1");
+      // setup a different acl for this queue.
+      conf.set(QueueManager.toFullPropertyName
+               ("q1", submitAcl), "dummy-user");
+      // verify job submission to other queue fails.
+      verifyJobSubmission(conf, false, "user1,group1", "q1");
+    } finally {
+      tearDownCluster();
+    }
+  }
+
+  public void testEnabledACLForNonDefaultQueue() 
+  throws IOException, LoginException, InterruptedException {
+    try {
+      UserGroupInformation ugi = createNecessaryUsers();
+      String[] groups = ugi.getGroupNames();
+      String userName = ugi.getShortUserName();
+      // allow everyone in default queue
+      JobConf conf = setupConf(QueueManager.toFullPropertyName
+                               ("default", submitAcl), "*");
+      // setup a different queue
+      conf.set("mapred.queue.names", "default,q2");
+      // setup a different acl for this queue.
+      conf.set(QueueManager.toFullPropertyName
+               ("q2", submitAcl), userName);
+      // verify job submission to the other queue succeeds.
+      verifyJobSubmission(conf, true,
+                          userName + "," + groups[groups.length-1], "q2");
+    } finally {
+      tearDownCluster();
+    }
+  }
+
+  public void testAllEnabledACLForJobKill() 
+  throws IOException, InterruptedException {
+    try {
+      UserGroupInformation ugi = createNecessaryUsers();
+      // create other user who will try to kill the job of ugi.
+      final UserGroupInformation otherUGI = UserGroupInformation.
+        createUserForTesting("user1", new String [] {"group1"});
+
+      ugi.doAs(new PrivilegedExceptionAction<Object>() {
+
+                 @Override
+                   public Object run() throws Exception {
+                   JobConf conf = setupConf(QueueManager.toFullPropertyName
+                                            ("default", adminAcl), "*");
+                   verifyJobKill(otherUGI, conf, true);
+                   return null;
+                 }
+               });
+    } finally {
+      tearDownCluster();
+    }
+  }
+
+  public void testAllDisabledACLForJobKill() 
+  throws IOException, InterruptedException {
+    try {
+      // Create a fake superuser for all processes to execute within
+      final UserGroupInformation ugi = createNecessaryUsers();
+
+      // create other users who will try to kill the job of ugi.
+      final UserGroupInformation otherUGI1 = UserGroupInformation.
+        createUserForTesting("user1", new String [] {"group1"});
+      final UserGroupInformation otherUGI2 = UserGroupInformation.
+        createUserForTesting("user2", new String [] {"group2"});
+
+      ugi.doAs(new PrivilegedExceptionAction<Object>() {
+
+                 @Override
+                   public Object run() throws Exception {
+                   // No queue admins
+                   JobConf conf = setupConf(QueueManager.toFullPropertyName
+                                            ("default", adminAcl), " ");
+                   // Run job as ugi and try to kill job as user1, who
+                   // (obviously) should not be able to kill the job.
+                   verifyJobKill(otherUGI1, conf, false);
+                   verifyJobKill(otherUGI2, conf, false);
+
+                   // Check if cluster administrator can kill job
+                   conf.set(JobConf.MR_ADMINS, "user1 group2");
+                   tearDownCluster();
+                   verifyJobKill(otherUGI1, conf, true);
+                   verifyJobKill(otherUGI2, conf, true);
+        
+                   // Check if MROwner (user who started
+                   // the mapreduce cluster) can kill the job
+                   verifyJobKill(ugi, conf, true);
+
+                   return null;
+                 }
+               });
+    } finally {
+      tearDownCluster();
+    }
+  }
+
+  
+}
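
For reference when reading the setupConf(...) calls in these tests: QueueManager.toFullPropertyName(queue, acl) expands to a per-queue configuration key, so the common setup is roughly equivalent to the sketch below. The literal key name is an assumption based on the mapred.queue.<queue-name>.<acl-name> scheme used in this branch; ACL values list users, then a space, then groups, as in the test strings above.

    import org.apache.hadoop.mapred.JobConf;

    class AclConfSketch {
      static JobConf submitAclConf() {
        JobConf conf = new JobConf();
        conf.setBoolean(JobConf.MR_ACLS_ENABLED, true);  // turn ACL checks on
        // Roughly what setupConf(QueueManager.toFullPropertyName(
        //     "default", submitAcl), "user1 group1") produces (key name assumed):
        conf.set("mapred.queue.default.acl-submit-job", "user1 group1");
        return conf;
      }
    }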