You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by st...@apache.org on 2009/11/28 21:26:22 UTC
svn commit: r885145 [30/34] - in /hadoop/mapreduce/branches/MAPREDUCE-233:
./ .eclipse.templates/ .eclipse.templates/.launches/ conf/ ivy/ lib/
src/benchmarks/gridmix/ src/benchmarks/gridmix/pipesort/
src/benchmarks/gridmix2/ src/benchmarks/gridmix2/sr...
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestNodeRefresh.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestNodeRefresh.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestNodeRefresh.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestNodeRefresh.java Sat Nov 28 20:26:01 2009
@@ -33,6 +33,7 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UnixUserGroupInformation;
import org.apache.hadoop.security.UserGroupInformation;
@@ -128,13 +129,13 @@
}
/**
- * Check default value of mapred.hosts.exclude. Also check if only
+ * Check default value of HOSTS_EXCLUDE. Also check if only
* owner/supergroup user is allowed to this command.
*/
public void testMRRefreshDefault() throws IOException {
// start a cluster with 2 hosts and no exclude-hosts file
Configuration conf = new Configuration();
- conf.set("mapred.hosts.exclude", "");
+ conf.set(JTConfig.JT_HOSTS_EXCLUDE_FILENAME, "");
startCluster(2, 1, 0, conf);
conf = mr.createJobConf(new JobConf(conf));
@@ -204,7 +205,7 @@
UnixUserGroupInformation.saveToConf(conf,
UnixUserGroupInformation.UGI_PROPERTY_NAME, ugi);
// set the supergroup
- conf.set("mapred.permissions.supergroup", "abc");
+ conf.set(JTConfig.JT_SUPERGROUP, "abc");
startCluster(2, 1, 0, conf);
conf = mr.createJobConf(new JobConf(conf));
@@ -378,85 +379,4 @@
stopCluster();
}
- /**
- * Check if excluded hosts are decommissioned across restart
- */
- public void testMRExcludeHostsAcrossRestarts() throws IOException {
- // start a cluster with 2 hosts and empty exclude-hosts file
- Configuration conf = new Configuration();
- conf.setBoolean("mapred.jobtracker.restart.recover", true);
-
- File file = new File("hosts.exclude");
- file.delete();
- startCluster(1, 1, 0, conf);
- String hostToDecommission = getHostname(1);
- conf = mr.createJobConf(new JobConf(conf));
-
- // submit a job
- Path inDir = new Path("input");
- Path outDir = new Path("output");
- Path signalFilename = new Path("share");
- JobConf newConf = new JobConf(conf);
- UtilsForTests.configureWaitingJobConf(newConf, inDir, outDir, 30, 1,
- "restart-decommission", signalFilename.toString(),
- signalFilename.toString());
-
- JobClient jobClient = new JobClient(newConf);
- RunningJob job = jobClient.submitJob(newConf);
- JobID id = job.getID();
-
- // wait for 50%
- UtilsForTests.waitForJobHalfDone(job);
-
-
- // change the exclude-hosts file to include one host
- FileOutputStream out = new FileOutputStream(file);
- LOG.info("Writing excluded nodes to log file " + file.toString());
- BufferedWriter writer = null;
- try {
- writer = new BufferedWriter(new OutputStreamWriter(out));
- writer.write( hostToDecommission + "\n"); // decommission first host
- } finally {
- if (writer != null) {
- writer.close();
- }
- out.close();
- }
- file.deleteOnExit();
-
- // restart the jobtracker
- mr.stopJobTracker();
- mr.startJobTracker();
-
- // Wait for the JT to be ready
- UtilsForTests.waitForJobTracker(jobClient);
-
- jt = mr.getJobTrackerRunner().getJobTracker();
- UtilsForTests.signalTasks(dfs, dfs.getFileSystem(),
- signalFilename.toString(), signalFilename.toString(), 1);
-
- assertTrue("Decommissioning of tracker has no effect restarted job",
- jt.getJob(job.getID()).failedMapTasks > 0);
-
- // check the cluster status and tracker size
- assertEquals("Tracker is not lost upon host decommissioning",
- 0, jt.getClusterStatus(false).getTaskTrackers());
- assertEquals("Excluded node count is incorrect",
- 1, jt.getClusterStatus(false).getNumExcludedNodes());
-
- // check if the host is disallowed
- for (TaskTrackerStatus status : jt.taskTrackers()) {
- assertFalse("Tracker from decommissioned host still exist",
- status.getHost().equals(hostToDecommission));
- }
-
- // start a tracker so that the jobs move to completion
- String newTrackerHostName = getHostname(2);
- mr.startTaskTracker(newTrackerHostName, null, 2, 1);
-
- // wait for the job
- job.waitForCompletion();
-
- stopCluster();
- }
}
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestParallelInitialization.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestParallelInitialization.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestParallelInitialization.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestParallelInitialization.java Sat Nov 28 20:26:01 2009
@@ -27,6 +27,7 @@
import junit.framework.TestCase;
import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapred.FakeObjectUtilities.FakeJobHistory;
import org.apache.hadoop.mapred.JobInProgress.KillInterruptedException;
import org.apache.hadoop.mapred.JobStatusChangeEvent.EventType;
@@ -43,14 +44,16 @@
class FakeJobInProgress extends JobInProgress {
public FakeJobInProgress(JobConf jobConf,
- FakeTaskTrackerManager taskTrackerManager) throws IOException {
- super(new JobID("test", ++jobCounter), jobConf, null);
+ FakeTaskTrackerManager taskTrackerManager,
+ JobTracker jt) throws IOException {
+ super(new JobID("test", ++jobCounter), jobConf, jt);
this.startTime = System.currentTimeMillis();
this.status = new JobStatus(getJobID(), 0f, 0f, JobStatus.PREP,
jobConf.getUser(),
jobConf.getJobName(), "", "");
this.status.setJobPriority(JobPriority.NORMAL);
this.status.setStartTime(startTime);
+ this.jobHistory = new FakeJobHistory();
}
@Override
@@ -231,7 +234,8 @@
// will be inited first and that will hang
for (int i = 0; i < NUM_JOBS; i++) {
- jobs[i] = new FakeJobInProgress(jobConf, taskTrackerManager);
+ jobs[i] = new FakeJobInProgress(jobConf, taskTrackerManager,
+ UtilsForTests.getJobTracker());
jobs[i].getStatus().setRunState(JobStatus.PREP);
taskTrackerManager.submitJob(jobs[i]);
}
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestQueueManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestQueueManager.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestQueueManager.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestQueueManager.java Sat Nov 28 20:26:01 2009
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -18,690 +18,564 @@
package org.apache.hadoop.mapred;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import static org.apache.hadoop.mapred.QueueManagerTestUtils.*;
+import static org.apache.hadoop.mapred.QueueConfigurationParser.*;
+import static org.junit.Assert.*;
+
+import org.apache.hadoop.mapreduce.QueueState;
+import org.apache.hadoop.security.UnixUserGroupInformation;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.SecurityUtil.AccessControlList;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.junit.After;
+import org.junit.Test;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+import org.w3c.dom.Node;
+
import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
import java.util.Properties;
import java.util.Set;
-import java.util.TreeSet;
-
-import javax.security.auth.login.LoginException;
+import java.util.Map.Entry;
-import junit.framework.TestCase;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.examples.SleepJob;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.security.UnixUserGroupInformation;
-import org.apache.hadoop.security.UserGroupInformation;
+public class TestQueueManager {
-public class TestQueueManager extends TestCase {
+ private static final Log LOG = LogFactory.getLog(
+ TestQueueManager.class);
- private static final Log LOG = LogFactory.getLog(TestQueueManager.class);
-
- private MiniDFSCluster miniDFSCluster;
- private MiniMRCluster miniMRCluster;
+ @After
+ public void tearDown() throws Exception {
+ new File(CONFIG).delete();
+ }
+
+ @Test
+ public void testDefault() throws Exception {
+ QueueManager qm = new QueueManager();
+ Queue root = qm.getRoot();
+ assertEquals(root.getChildren().size(), 1);
+ assertEquals(root.getChildren().iterator().next().getName(), "default");
+ assertFalse(qm.isAclsEnabled());
+ assertNull(root.getChildren().iterator().next().getChildren());
+ }
+
+ @Test
+ public void testXMLParsing() throws Exception {
+ checkForConfigFile();
+ Document doc = createDocument();
+ createSimpleDocument(doc);
+ writeToFile(doc, CONFIG);
+ QueueManager qm = new QueueManager(CONFIG);
+ Set<Queue> rootQueues = qm.getRoot().getChildren();
+ List<String> names = new ArrayList<String>();
+ for (Queue q : rootQueues) {
+ names.add(q.getName());
+ }
+
+ //Size of root.
+ assertEquals(rootQueues.size(), 2);
+
+ //check root level queues
+ assertTrue(names.contains("q1"));
+ assertTrue(names.contains("p1"));
+
+
+ //check for leaf names
+ Set<String> leafNames = qm.getLeafQueueNames();
+ Queue p = qm.getQueue("p1");
+ Set<Queue> children = p.getChildren();
+ assertTrue(children.size() == 2);
+
+ //check leaf level queues
+ assertTrue(
+ leafNames.contains(
+ "p1" + NAME_SEPARATOR + "p11"));
+ assertTrue(
+ leafNames.contains(
+ "p1" + NAME_SEPARATOR + "p12"));
+
+
+ Queue q = qm.getQueue(
+ "p1" + NAME_SEPARATOR + "p12");
+
+ assertTrue(
+ q.getAcls().get(
+ QueueManager.toFullPropertyName(
+ q.getName(), ACL_SUBMIT_JOB_TAG)).getUsers().contains(
+ "u1"));
+
+ assertTrue(
+ q.getAcls().get(
+ QueueManager.toFullPropertyName(
+ q.getName(),
+ ACL_ADMINISTER_JOB_TAG))
+ .getUsers().contains("u2"));
+ assertTrue(q.getState().equals(QueueState.STOPPED));
+ }
+
+ @Test
+ public void testhasAccess() throws Exception {
+ checkForConfigFile();
+ Document doc = createDocument();
+ createSimpleDocumentWithAcls(doc,"true");
+ writeToFile(doc, CONFIG);
+ QueueManager qm = new QueueManager(CONFIG);
+
+ UserGroupInformation ugi;
+ // test for acls access when acls are set with *
+ ugi = new UnixUserGroupInformation("u1", new String[]{" "});
+ assertTrue(qm.hasAccess("p1" + NAME_SEPARATOR + "p12",
+ Queue.QueueOperation.SUBMIT_JOB, ugi));
+ ugi = new UnixUserGroupInformation("u2", new String[]{" "});
+ assertTrue(qm.hasAccess("p1" + NAME_SEPARATOR + "p12",
+ Queue.QueueOperation.ADMINISTER_JOBS, ugi));
+
+ // test for acls access when acls are not set with *
+ ugi = new UnixUserGroupInformation("u1", new String[]{" "});
+ assertTrue(qm.hasAccess("p1" + NAME_SEPARATOR + "p11",
+ Queue.QueueOperation.SUBMIT_JOB, ugi));
+ ugi = new UnixUserGroupInformation("u2", new String[]{" "});
+ assertTrue(qm.hasAccess("p1" + NAME_SEPARATOR + "p11",
+ Queue.QueueOperation.ADMINISTER_JOBS, ugi));
+
+ // test for acls access when acls are not specified but acls is enabled
+ ugi = new UnixUserGroupInformation("u1", new String[]{" "});
+ assertTrue(qm.hasAccess("p1" + NAME_SEPARATOR + "p13",
+ Queue.QueueOperation.SUBMIT_JOB, ugi));
+ ugi = new UnixUserGroupInformation("u2", new String[]{" "});
+ assertTrue(qm.hasAccess("p1" + NAME_SEPARATOR + "p13",
+ Queue.QueueOperation.ADMINISTER_JOBS, ugi));
+
+ assertTrue(qm.isRunning("p1" + NAME_SEPARATOR + "p13"));
+ }
+
+ @Test
+ public void testQueueView() throws Exception {
+ checkForConfigFile();
+ Document doc = createDocument();
+ createSimpleDocument(doc);
+ writeToFile(doc, CONFIG);
+ QueueManager qm = new QueueManager(CONFIG);
+
+ for (Queue queue : qm.getRoot().getChildren()) {
+ checkHierarchy(queue, qm);
+ }
+ }
+
+ private void checkHierarchy(Queue queue, QueueManager queueManager) {
+ JobQueueInfo jobQueueInfo = queueManager.getJobQueueInfo(queue.getName());
+ assertEquals(queue.getName(),jobQueueInfo.getQueueName());
+ assertEquals(queue.getState(),jobQueueInfo.getState());
+ if (queue.getChildren() !=null && queue.getChildren().size() > 0) {
+ for (Queue childQueue : queue.getChildren()) {
+ checkHierarchy(childQueue, queueManager);
+ }
+ }
+ }
+
+ @Test
+ public void testhasAccessForParent() throws Exception {
+ checkForConfigFile();
+ Document doc = createDocument();
+ createSimpleDocument(doc);
+ writeToFile(doc, CONFIG);
+ QueueManager qm = new QueueManager(CONFIG);
+
+ UserGroupInformation ugi =
+ new UnixUserGroupInformation("u1", new String[]{" "});
+ assertFalse(
+ qm.hasAccess(
+ "p1",
+ Queue.QueueOperation.SUBMIT_JOB, ugi));
+ }
+
+ @Test
+ public void testValidation() throws Exception {
+ checkForConfigFile();
+ Document doc = createDocument();
+ Element queues = createQueuesNode(doc, "false");
+ Element q1 = createQueue(doc, "q1");
+
+ q1.appendChild(createAcls(doc, "acl-submit-job", "u1"));
+ q1.appendChild(createAcls(doc, "acl-administer-jobs", "u2"));
+ q1.appendChild(createQueue(doc, "p15"));
+ q1.appendChild(createQueue(doc, "p16"));
- public void testDefaultQueueConfiguration() {
- JobConf conf = new JobConf();
- QueueManager qMgr = new QueueManager(conf);
- Set<String> expQueues = new TreeSet<String>();
- expQueues.add("default");
- verifyQueues(expQueues, qMgr.getQueues());
- // pass true so it will fail if the key is not found.
- assertFalse(conf.getBoolean("mapred.acls.enabled", true));
- }
-
- public void testMultipleQueues() {
- JobConf conf = new JobConf();
- conf.set("mapred.queue.names", "q1,q2,Q3");
- QueueManager qMgr = new QueueManager(conf);
- Set<String> expQueues = new TreeSet<String>();
- expQueues.add("q1");
- expQueues.add("q2");
- expQueues.add("Q3");
- verifyQueues(expQueues, qMgr.getQueues());
- }
-
- public void testSchedulerInfo() {
- JobConf conf = new JobConf();
- conf.set("mapred.queue.names", "qq1,qq2");
- QueueManager qMgr = new QueueManager(conf);
- qMgr.setSchedulerInfo("qq1", "queueInfoForqq1");
- qMgr.setSchedulerInfo("qq2", "queueInfoForqq2");
- assertEquals(qMgr.getSchedulerInfo("qq2"), "queueInfoForqq2");
- assertEquals(qMgr.getSchedulerInfo("qq1"), "queueInfoForqq1");
- }
-
- public void testAllEnabledACLForJobSubmission()
- throws IOException, InterruptedException, ClassNotFoundException {
- JobConf conf = setupConf("mapred.queue.default.acl-submit-job", "*");
- verifyJobSubmission(conf, true);
- }
-
- public void testAllDisabledACLForJobSubmission()
- throws IOException, InterruptedException, ClassNotFoundException {
- JobConf conf = setupConf("mapred.queue.default.acl-submit-job", "");
- verifyJobSubmission(conf, false);
- }
-
- public void testUserDisabledACLForJobSubmission()
- throws IOException, InterruptedException, ClassNotFoundException {
- JobConf conf = setupConf("mapred.queue.default.acl-submit-job",
- "3698-non-existent-user");
- verifyJobSubmission(conf, false);
- }
-
- public void testDisabledACLForNonDefaultQueue()
- throws IOException, InterruptedException, ClassNotFoundException {
- // allow everyone in default queue
- JobConf conf = setupConf("mapred.queue.default.acl-submit-job", "*");
- // setup a different queue
- conf.set("mapred.queue.names", "default,q1");
- // setup a different acl for this queue.
- conf.set("mapred.queue.q1.acl-submit-job", "dummy-user");
- // verify job submission to other queue fails.
- verifyJobSubmission(conf, false, "q1");
- }
-
- public void testSubmissionToInvalidQueue()
- throws IOException, InterruptedException, ClassNotFoundException {
- JobConf conf = new JobConf();
- conf.set("mapred.queue.names","default");
- setUpCluster(conf);
- String queueName = "q1";
+ queues.appendChild(q1);
+ writeToFile(doc, CONFIG);
try {
- Job rjob = submitSleepJob(1, 1, 100, 100, true, null, queueName);
- } catch (IOException ioe) {
- assertTrue(ioe.getMessage().contains("Queue \"" + queueName + "\" does not exist"));
- return;
- } finally {
- tearDownCluster();
+ new QueueManager(CONFIG);
+ fail("Should throw an exception as configuration is wrong ");
+ } catch (RuntimeException re) {
+ LOG.info(re.getMessage());
}
- fail("Job submission to invalid queue job shouldnot complete , it should fail with proper exception ");
- }
-
- public void testEnabledACLForNonDefaultQueue() throws IOException,
- LoginException, InterruptedException, ClassNotFoundException {
- // login as self...
- UserGroupInformation ugi = UnixUserGroupInformation.login();
- String userName = ugi.getUserName();
- // allow everyone in default queue
- JobConf conf = setupConf("mapred.queue.default.acl-submit-job", "*");
- // setup a different queue
- conf.set("mapred.queue.names", "default,q2");
- // setup a different acl for this queue.
- conf.set("mapred.queue.q2.acl-submit-job", userName);
- // verify job submission to other queue fails.
- verifyJobSubmission(conf, true, "q2");
- }
-
- public void testUserEnabledACLForJobSubmission()
- throws IOException, LoginException,
- InterruptedException, ClassNotFoundException {
- // login as self...
- UserGroupInformation ugi = UnixUserGroupInformation.login();
- String userName = ugi.getUserName();
- JobConf conf = setupConf("mapred.queue.default.acl-submit-job",
- "3698-junk-user," + userName
- + " 3698-junk-group1,3698-junk-group2");
- verifyJobSubmission(conf, true);
- }
-
- public void testGroupsEnabledACLForJobSubmission()
- throws IOException, LoginException,
- InterruptedException, ClassNotFoundException {
- // login as self, get one group, and add in allowed list.
- UserGroupInformation ugi = UnixUserGroupInformation.login();
- String[] groups = ugi.getGroupNames();
- assertTrue(groups.length > 0);
- JobConf conf = setupConf("mapred.queue.default.acl-submit-job",
- "3698-junk-user1,3698-junk-user2 "
- + groups[groups.length-1]
- + ",3698-junk-group");
- verifyJobSubmission(conf, true);
- }
-
- public void testAllEnabledACLForJobKill()
- throws IOException, InterruptedException, ClassNotFoundException {
- JobConf conf = setupConf("mapred.queue.default.acl-administer-jobs", "*");
- verifyJobKill(conf, true);
}
- public void testAllDisabledACLForJobKill()
- throws IOException, InterruptedException, ClassNotFoundException {
- JobConf conf = setupConf("mapred.queue.default.acl-administer-jobs", "");
- verifyJobKillAsOtherUser(conf, false, "dummy-user,dummy-user-group");
- }
-
- public void testOwnerAllowedForJobKill()
- throws IOException, InterruptedException, ClassNotFoundException {
- JobConf conf = setupConf("mapred.queue.default.acl-administer-jobs",
- "junk-user");
- verifyJobKill(conf, true);
- }
-
- public void testUserDisabledACLForJobKill()
- throws IOException, InterruptedException, ClassNotFoundException {
- //setup a cluster allowing a user to submit
- JobConf conf = setupConf("mapred.queue.default.acl-administer-jobs",
- "dummy-user");
- verifyJobKillAsOtherUser(conf, false, "dummy-user,dummy-user-group");
+ @Test
+ public void testInvalidName() throws Exception {
+ checkForConfigFile();
+ Document doc = createDocument();
+ Element queues = createQueuesNode(doc, "false");
+ Element q1 = createQueue(doc, "");
+ queues.appendChild(q1);
+ writeToFile(doc, CONFIG);
+ try {
+ new QueueManager(CONFIG);
+ fail("Should throw an exception as configuration is wrong ");
+ } catch (Exception re) {
+ re.printStackTrace();
+ LOG.info(re.getMessage());
+ }
+ checkForConfigFile();
+ doc = createDocument();
+ queues = createQueuesNode(doc, "false");
+ q1 = doc.createElement("queue");
+ queues.appendChild(q1);
+ writeToFile(doc, CONFIG);
+ try {
+ new QueueManager(CONFIG);
+ fail("Should throw an exception as configuration is wrong ");
+ } catch (RuntimeException re) {
+ re.printStackTrace();
+ LOG.info(re.getMessage());
+ }
}
-
- public void testUserEnabledACLForJobKill() throws IOException,
- LoginException, InterruptedException, ClassNotFoundException {
- // login as self...
- UserGroupInformation ugi = UnixUserGroupInformation.login();
- String userName = ugi.getUserName();
- JobConf conf = setupConf("mapred.queue.default.acl-administer-jobs",
- "dummy-user,"+userName);
- verifyJobKillAsOtherUser(conf, true, "dummy-user,dummy-user-group");
+
+ @Test
+ public void testEmptyProperties() throws Exception {
+ checkForConfigFile();
+ Document doc = createDocument();
+ Element queues = createQueuesNode(doc, "false");
+ Element q1 = createQueue(doc, "q1");
+ Element p = createProperties(doc, null);
+ q1.appendChild(p);
+ queues.appendChild(q1);
+ }
+
+ @Test
+ public void testEmptyFile() throws Exception {
+ checkForConfigFile();
+ Document doc = createDocument();
+ writeToFile(doc, CONFIG);
+ try {
+ new QueueManager(CONFIG);
+ fail("Should throw an exception as configuration is wrong ");
+ } catch (Exception re) {
+ re.printStackTrace();
+ LOG.info(re.getMessage());
+ }
}
-
- public void testUserDisabledForJobPriorityChange()
- throws IOException, InterruptedException, ClassNotFoundException {
- JobConf conf = setupConf("mapred.queue.default.acl-administer-jobs",
- "junk-user");
- verifyJobPriorityChangeAsOtherUser(conf, false,
- "junk-user,junk-user-group");
+
+ @Test
+ public void testJobQueueInfoGeneration() throws Exception {
+ checkForConfigFile();
+ Document doc = createDocument();
+ createSimpleDocument(doc);
+ writeToFile(doc, CONFIG);
+ QueueManager qm = new QueueManager(CONFIG);
+
+ List<JobQueueInfo> rootQueues =
+ qm.getRoot().getJobQueueInfo().getChildren();
+ assertEquals(rootQueues.size(), 2);
+ List<String> names = new ArrayList<String>();
+ for (JobQueueInfo q : rootQueues) {
+ names.add(q.getQueueName());
+ if (q.getQueueName().equals("q1")) {
+ Properties p = q.getProperties();
+ assertEquals(p.getProperty("capacity"), "10");
+ assertEquals(p.getProperty("maxCapacity"), "35");
+
+ assertTrue(q.getChildren().isEmpty());
+ } else if (q.getQueueName().equals("p1")) {
+ List<JobQueueInfo> children = q.getChildren();
+ assertEquals(children.size(), 2);
+ for (JobQueueInfo child : children) {
+ if (child.getQueueName().equals(
+ "p1" + NAME_SEPARATOR + "p12")) {
+ assertEquals(
+ child.getQueueState(), QueueState.STOPPED.getStateName());
+ } else if (child.getQueueName().equals(
+ "p1" + NAME_SEPARATOR + "p11")) {
+ assertEquals(
+ child.getQueueState(), QueueState.RUNNING.getStateName());
+ } else {
+ fail("Only 2 children");
+ }
+ }
+ } else {
+ fail("Only 2 queues with q1 and p1 ");
+ }
+ }
}
/**
- * Test to verify refreshing of queue properties by using MRAdmin tool.
+ * Test the refresh of queues.
*
* @throws Exception
*/
- public void testACLRefresh() throws Exception {
- String queueConfigPath =
- System.getProperty("test.build.extraconf", "build/test/extraconf");
- File queueConfigFile =
- new File(queueConfigPath, QueueManager.QUEUE_CONF_FILE_NAME);
- File hadoopConfigFile = new File(queueConfigPath, "mapred-site.xml");
- try {
- //Setting up default mapred-site.xml
- Properties hadoopConfProps = new Properties();
- //these properties should be retained.
- hadoopConfProps.put("mapred.queue.names", "default,q1,q2");
- hadoopConfProps.put("mapred.acls.enabled", "true");
- //These property should always be overridden
- hadoopConfProps.put("mapred.queue.default.acl-submit-job", "u1");
- hadoopConfProps.put("mapred.queue.q1.acl-submit-job", "u2");
- hadoopConfProps.put("mapred.queue.q2.acl-submit-job", "u1");
- UtilsForTests.setUpConfigFile(hadoopConfProps, hadoopConfigFile);
-
- //Actual property which would be used.
- Properties queueConfProps = new Properties();
- queueConfProps.put("mapred.queue.default.acl-submit-job", " ");
- //Writing out the queue configuration file.
- UtilsForTests.setUpConfigFile(queueConfProps, queueConfigFile);
-
- //Create a new configuration to be used with QueueManager
- JobConf conf = new JobConf();
- QueueManager queueManager = new QueueManager(conf);
- UserGroupInformation ugi = UnixUserGroupInformation.getCurrentUGI();
- //Job Submission should fail because ugi to be used is set to blank.
- assertFalse("User Job Submission Succeeded before refresh.",
- queueManager.hasAccess("default", Queue.QueueOperation.
- SUBMIT_JOB, ugi));
- assertFalse("User Job Submission Succeeded before refresh.",
- queueManager.hasAccess("q1", Queue.QueueOperation.
- SUBMIT_JOB, ugi));
- assertFalse("User Job Submission Succeeded before refresh.",
- queueManager.hasAccess("q2", Queue.QueueOperation.
- SUBMIT_JOB, ugi));
-
- //Test job submission as alternate user.
- Configuration alternateUserConfig = new Configuration();
- alternateUserConfig.set("hadoop.job.ugi","u1,users");
- UserGroupInformation alternateUgi =
- UserGroupInformation.readFrom(alternateUserConfig);
- assertTrue("Alternate User Job Submission failed before refresh.",
- queueManager.hasAccess("q2", Queue.QueueOperation.
- SUBMIT_JOB, alternateUgi));
-
- //Set acl for the current user.
- queueConfProps.put("mapred.queue.default.acl-submit-job", ugi.getUserName());
- queueConfProps.put("mapred.queue.q1.acl-submit-job", ugi.getUserName());
- queueConfProps.put("mapred.queue.q2.acl-submit-job", ugi.getUserName());
- //write out queue-acls.xml.
- UtilsForTests.setUpConfigFile(queueConfProps, queueConfigFile);
- //refresh configuration
- queueManager.refreshQueues(conf);
- //Submission should succeed
- assertTrue("User Job Submission failed after refresh.",
- queueManager.hasAccess("default", Queue.QueueOperation.
- SUBMIT_JOB, ugi));
- assertTrue("User Job Submission failed after refresh.",
- queueManager.hasAccess("q1", Queue.QueueOperation.
- SUBMIT_JOB, ugi));
- assertTrue("User Job Submission failed after refresh.",
- queueManager.hasAccess("q2", Queue.QueueOperation.
- SUBMIT_JOB, ugi));
- assertFalse("Alternate User Job Submission succeeded after refresh.",
- queueManager.hasAccess("q2", Queue.QueueOperation.
- SUBMIT_JOB, alternateUgi));
- //delete the ACL file.
- queueConfigFile.delete();
-
- //rewrite the mapred-site.xml
- hadoopConfProps.put("mapred.acls.enabled", "true");
- hadoopConfProps.put("mapred.queue.default.acl-submit-job", ugi.getUserName());
- UtilsForTests.setUpConfigFile(hadoopConfProps, hadoopConfigFile);
- queueManager.refreshQueues(conf);
- assertTrue("User Job Submission failed after refresh and no queue acls file.",
- queueManager.hasAccess("default", Queue.QueueOperation.
- SUBMIT_JOB, ugi));
- } finally{
- if(queueConfigFile.exists()) {
- queueConfigFile.delete();
+ @Test
+ public void testRefresh() throws Exception {
+ checkForConfigFile();
+ Document doc = createDocument();
+ createSimpleDocument(doc);
+ writeToFile(doc, CONFIG);
+ QueueManager qm = new QueueManager(CONFIG);
+ Queue beforeRefreshRoot = qm.getRoot();
+ //remove the file and create new one.
+ Set<Queue> rootQueues = beforeRefreshRoot.getChildren();
+ for (Queue qs : rootQueues) {
+ if (qs.getName().equals("q1")) {
+
+ assertEquals(qs.getProperties().getProperty("capacity"), "10");
+ assertEquals(qs.getProperties().getProperty("maxCapacity"), "35");
+
+ } else if (qs.getName().equals("p1")) {
+
+ Set<Queue> children = qs.getChildren();
+ for (Queue child : children) {
+ if (child.getName().equals(
+ "p1" + NAME_SEPARATOR + "p12")) {
+ assertTrue(
+ child.getAcls().get(
+ QueueManager.toFullPropertyName(
+ child.getName(), ACL_SUBMIT_JOB_TAG))
+ .getUsers().contains("u1"));
+
+ assertTrue(
+ child.getAcls().get(
+ QueueManager.toFullPropertyName(
+ child.getName(),
+ ACL_ADMINISTER_JOB_TAG))
+ .getUsers().contains("u2"));
+ assertTrue(child.getState().equals(QueueState.STOPPED));
+ } else {
+ assertTrue(child.getState().equals(QueueState.RUNNING));
+ }
+ }
}
- if(hadoopConfigFile.exists()) {
- hadoopConfigFile.delete();
+ }
+ checkForConfigFile();
+ doc = createDocument();
+ refreshSimpleDocument(doc);
+ writeToFile(doc, CONFIG);
+ QueueConfigurationParser cp = new QueueConfigurationParser(CONFIG);
+ qm.getRoot().isHierarchySameAs(cp.getRoot());
+ qm.setQueues(
+ cp.getRoot().getChildren().toArray(
+ new Queue[cp.getRoot().getChildren().size()]));
+ Queue afterRefreshRoot = qm.getRoot();
+ //remove the file and create new one.
+ rootQueues = afterRefreshRoot.getChildren();
+ for (Queue qs : rootQueues) {
+ if (qs.getName().equals("q1")) {
+
+ assertEquals(qs.getProperties().getProperty("capacity"), "70");
+ assertEquals(qs.getProperties().getProperty("maxCapacity"), "35");
+
+ } else if (qs.getName().equals("p1")) {
+
+ Set<Queue> children = qs.getChildren();
+ for (Queue child : children) {
+ if (child.getName().equals(
+ "p1" + NAME_SEPARATOR + "p12")) {
+ assertTrue(
+ child.getAcls().get(
+ QueueManager.toFullPropertyName(
+ child.getName(),
+ ACL_SUBMIT_JOB_TAG))
+ .getUsers().contains("u3"));
+
+ assertTrue(
+ child.getAcls().get(
+ QueueManager.toFullPropertyName(
+ child.getName(),
+ ACL_ADMINISTER_JOB_TAG))
+ .getUsers().contains("u4"));
+ assertTrue(child.getState().equals(QueueState.RUNNING));
+ } else {
+ assertTrue(child.getState().equals(QueueState.STOPPED));
+ }
+ }
}
}
}
+ @Test
+ public void testRefreshWithInvalidFile() throws Exception {
+ checkForConfigFile();
+ Document doc = createDocument();
+ createSimpleDocument(doc);
+ writeToFile(doc, CONFIG);
+ QueueManager qm = new QueueManager(CONFIG);
+
+ checkForConfigFile();
+ doc = createDocument();
+ Element queues = createQueuesNode(doc, "false");
+ Element q1 = createQueue(doc, "");
+ queues.appendChild(q1);
+ writeToFile(doc, CONFIG);
+ try {
+ QueueConfigurationParser cp = new QueueConfigurationParser(CONFIG);
+ fail("Should throw an exception as configuration is wrong ");
+ } catch (Throwable re) {
+ re.printStackTrace();
+ LOG.info(re.getMessage());
+ }
+ }
/**
- * Test to verify refreshing of queue properties by using MRAdmin tool.
- *
- * @throws Exception
+ * Class to store the array of queues retrieved by parsing the string
+ * that is dumped in Json format
*/
- public void testStateRefresh() throws Exception {
- String queueConfigPath =
- System.getProperty("test.build.extraconf", "build/test/extraconf");
- File queueConfigFile =
- new File(queueConfigPath, QueueManager.QUEUE_CONF_FILE_NAME);
- try {
- //Setting up default mapred-site.xml
- Properties queueConfProps = new Properties();
- //these properties should be retained.
- queueConfProps.put("mapred.queue.names", "default,qu1");
- queueConfProps.put("mapred.acls.enabled", "true");
- //These property should always be overridden
- queueConfProps.put("mapred.queue.default.state", "running");
- queueConfProps.put("mapred.queue.qu1.state", "stopped");
- UtilsForTests.setUpConfigFile(queueConfProps, queueConfigFile);
-
- //Create a new configuration to be used with QueueManager
- JobConf conf = new JobConf();
- setUpCluster(conf);
- QueueManager queueManager =
- this.miniMRCluster.getJobTrackerRunner().getJobTracker().getQueueManager();
-
- try{
- Job job = submitSleepJob(10, 2, 10, 10, true,null, "default" );
- assert(job.isSuccessful());
- }catch(Exception e){
- fail("submit job in default queue should be sucessful ");
- }
+ static class JsonQueueTree {
+ boolean acls_enabled;
+
+ JsonQueue[] queues;
- try{
- submitSleepJob(10, 2, 10, 10, true,null, "qu1" );
- fail("submit job in default queue should be failed ");
- }catch(Exception e){
- assert(e.getMessage().contains("Queue \"" + "qu1" + "\" is not running"));
- }
+ public JsonQueue[] getQueues() {
+ return queues;
+ }
- // verify state of queues before refresh
- JobQueueInfo queueInfo = queueManager.getJobQueueInfo("default");
- assertEquals(Queue.QueueState.RUNNING.getStateName(),
- queueInfo.getQueueState());
- queueInfo = queueManager.getJobQueueInfo("qu1");
- assertEquals(Queue.QueueState.STOPPED.getStateName(),
- queueInfo.getQueueState());
-
- queueConfProps.put("mapred.queue.default.state", "stopped");
- queueConfProps.put("mapred.queue.qu1.state", "running");
- UtilsForTests.setUpConfigFile(queueConfProps, queueConfigFile);
-
- //refresh configuration
- queueManager.refreshQueues(conf);
-
- //Job Submission should pass now because ugi to be used is set to blank.
- try{
- submitSleepJob(10, 2, 10, 10, true,null,"qu1");
- }catch(Exception e){
- fail("submit job in qu1 queue should be sucessful ");
- }
+ public void setQueues(JsonQueue[] queues) {
+ this.queues = queues;
+ }
- try{
- submitSleepJob(10, 2, 10, 10, true,null, "default" );
- fail("submit job in default queue should be failed ");
- }catch(Exception e){
- assert(e.getMessage().contains("Queue \"" + "default" + "\" is not running"));
- }
-
- // verify state of queues after refresh
- queueInfo = queueManager.getJobQueueInfo("default");
- assertEquals(Queue.QueueState.STOPPED.getStateName(),
- queueInfo.getQueueState());
- queueInfo = queueManager.getJobQueueInfo("qu1");
- assertEquals(Queue.QueueState.RUNNING.getStateName(),
- queueInfo.getQueueState());
- } finally{
- if(queueConfigFile.exists()) {
- queueConfigFile.delete();
- }
- this.tearDownCluster();
+ public boolean isAcls_enabled() {
+ return acls_enabled;
}
- }
- public void testQueueAclRefreshWithInvalidConfFile() throws IOException {
- String queueConfigPath =
- System.getProperty("test.build.extraconf", "build/test/extraconf");
- File queueConfigFile =
- new File(queueConfigPath, QueueManager.QUEUE_CONF_FILE_NAME);
- File hadoopConfigFile = new File(queueConfigPath, "hadoop-site.xml");
- try {
- // queue properties with which the cluster is started.
- Properties hadoopConfProps = new Properties();
- hadoopConfProps.put("mapred.queue.names", "default,q1,q2");
- hadoopConfProps.put("mapred.acls.enabled", "true");
- UtilsForTests.setUpConfigFile(hadoopConfProps, hadoopConfigFile);
-
- //properties for mapred-queue-acls.xml
- Properties queueConfProps = new Properties();
- UserGroupInformation ugi = UnixUserGroupInformation.getCurrentUGI();
- queueConfProps.put("mapred.queue.default.acl-submit-job", ugi.getUserName());
- queueConfProps.put("mapred.queue.q1.acl-submit-job", ugi.getUserName());
- queueConfProps.put("mapred.queue.q2.acl-submit-job", ugi.getUserName());
- UtilsForTests.setUpConfigFile(queueConfProps, queueConfigFile);
-
- Configuration conf = new JobConf();
- QueueManager queueManager = new QueueManager(conf);
- //Testing access to queue.
- assertTrue("User Job Submission failed.",
- queueManager.hasAccess("default", Queue.QueueOperation.
- SUBMIT_JOB, ugi));
- assertTrue("User Job Submission failed.",
- queueManager.hasAccess("q1", Queue.QueueOperation.
- SUBMIT_JOB, ugi));
- assertTrue("User Job Submission failed.",
- queueManager.hasAccess("q2", Queue.QueueOperation.
- SUBMIT_JOB, ugi));
-
- //Write out a new incomplete invalid configuration file.
- PrintWriter writer = new PrintWriter(new FileOutputStream(queueConfigFile));
- writer.println("<configuration>");
- writer.println("<property>");
- writer.flush();
- writer.close();
- try {
- //Exception to be thrown by queue manager because configuration passed
- //is invalid.
- queueManager.refreshQueues(conf);
- fail("Refresh of ACLs should have failed with invalid conf file.");
- } catch (Exception e) {
- }
- assertTrue("User Job Submission failed after invalid conf file refresh.",
- queueManager.hasAccess("default", Queue.QueueOperation.
- SUBMIT_JOB, ugi));
- assertTrue("User Job Submission failed after invalid conf file refresh.",
- queueManager.hasAccess("q1", Queue.QueueOperation.
- SUBMIT_JOB, ugi));
- assertTrue("User Job Submission failed after invalid conf file refresh.",
- queueManager.hasAccess("q2", Queue.QueueOperation.
- SUBMIT_JOB, ugi));
- } finally {
- //Cleanup the configuration files in all cases
- if(hadoopConfigFile.exists()) {
- hadoopConfigFile.delete();
- }
- if(queueConfigFile.exists()) {
- queueConfigFile.delete();
- }
+ public void setAcls_enabled(boolean aclsEnabled) {
+ acls_enabled = aclsEnabled;
}
}
-
- private JobConf setupConf(String aclName, String aclValue) {
- JobConf conf = new JobConf();
- conf.setBoolean("mapred.acls.enabled", true);
- conf.set(aclName, aclValue);
- return conf;
- }
-
- private void verifyQueues(Set<String> expectedQueues,
- Set<String> actualQueues) {
- assertEquals(expectedQueues.size(), actualQueues.size());
- for (String queue : expectedQueues) {
- assertTrue(actualQueues.contains(queue));
+ /**
+ * Class to store the contents of each queue that is dumped in JSON format.
+ */
+ static class JsonQueue {
+ String name;
+ String state;
+ String acl_submit_job;
+ String acl_administer_jobs;
+ JsonProperty[] properties;
+ JsonQueue[] children;
+ public String getName() {
+ return name;
}
- }
-
- private void verifyJobSubmission(JobConf conf, boolean shouldSucceed)
- throws IOException, InterruptedException, ClassNotFoundException {
- verifyJobSubmission(conf, shouldSucceed, "default");
- }
-
- private void verifyJobSubmission(JobConf conf, boolean shouldSucceed,
- String queue)
- throws IOException, InterruptedException, ClassNotFoundException {
- setUpCluster(conf);
- try {
- runAndVerifySubmission(conf, shouldSucceed, queue, null);
- } finally {
- tearDownCluster();
+ public String getState() {
+ return state;
}
- }
-
- private void runAndVerifySubmission(JobConf conf, boolean shouldSucceed,
- String queue, String userInfo)
- throws IOException, InterruptedException, ClassNotFoundException {
- try {
- Job rjob = submitSleepJob(1, 1, 100, 100, true, null, queue);
- if (shouldSucceed) {
- assertTrue(rjob.isSuccessful());
- } else {
- fail("Job submission should have failed.");
- }
- } catch (IOException ioe) {
- if (shouldSucceed) {
- throw ioe;
- } else {
- LOG.info("exception while submitting job: " + ioe.getMessage());
- assertTrue(ioe.getMessage().
- contains("cannot perform operation " +
- "SUBMIT_JOB on queue " + queue));
- // check if the system directory gets cleaned up or not
- JobTracker jobtracker = miniMRCluster.getJobTrackerRunner().getJobTracker();
- Path sysDir = new Path(jobtracker.getSystemDir());
- FileSystem fs = sysDir.getFileSystem(conf);
- int size = fs.listStatus(sysDir).length;
- while (size > 1) { // ignore the jobtracker.info file
- System.out.println("Waiting for the job files in sys directory to be cleaned up");
- UtilsForTests.waitFor(100);
- size = fs.listStatus(sysDir).length;
- }
- }
- } finally {
- tearDownCluster();
+ public JsonProperty[] getProperties() {
+ return properties;
}
-}
-
- private void verifyJobKill(JobConf conf, boolean shouldSucceed)
- throws IOException, InterruptedException, ClassNotFoundException {
- setUpCluster(conf);
- try {
- Job rjob = submitSleepJob(1, 1, 1000, 1000, false);
- assertFalse(rjob.isComplete());
- while(rjob.mapProgress() == 0.0f) {
- try {
- Thread.sleep(10);
- } catch (InterruptedException ie) {
- break;
- }
- }
- rjob.killJob();
- while (!rjob.isComplete()) {
- try {
- Thread.sleep(10);
- } catch (InterruptedException ie) {
- break;
- }
- }
- if (shouldSucceed) {
- assertTrue(!rjob.isSuccessful());
- } else {
- fail("Job kill should have failed.");
- }
- } catch (IOException ioe) {
- if (shouldSucceed) {
- throw ioe;
- } else {
- LOG.info("exception while submitting job: " + ioe.getMessage());
- assertTrue(ioe.getMessage().
- contains("cannot perform operation " +
- "ADMINISTER_JOBS on queue default"));
- }
- } finally {
- tearDownCluster();
+ public JsonQueue[] getChildren() {
+ return children;
}
- }
-
-
- private void verifyJobKillAsOtherUser(JobConf conf, boolean shouldSucceed,
- String otherUserInfo)
- throws IOException, InterruptedException, ClassNotFoundException {
- setUpCluster(conf);
- try {
- // submit a job as another user.
- String userInfo = otherUserInfo;
- Job job = submitSleepJob(1, 1, 1000, 1000, false, userInfo);
- assertFalse(job.isComplete());
-
- //try to kill as self
- try {
- JobClient jc = new JobClient(miniMRCluster.createJobConf());
- RunningJob rjob = jc.getJob((JobID)job.getID());
- rjob.killJob();
- if (!shouldSucceed) {
- fail("should fail kill operation");
- }
- } catch (IOException ioe) {
- if (shouldSucceed) {
- throw ioe;
- }
- //verify it fails
- LOG.info("exception while submitting job: " + ioe.getMessage());
- assertTrue(ioe.getMessage().
- contains("cannot perform operation " +
- "ADMINISTER_JOBS on queue default"));
- }
- //wait for job to complete on its own
- while (!job.isComplete()) {
- try {
- Thread.sleep(1000);
- } catch (InterruptedException ie) {
- break;
- }
- }
- } finally {
- tearDownCluster();
+ public void setName(String name) {
+ this.name = name;
}
- }
-
- private void verifyJobPriorityChangeAsOtherUser(JobConf conf,
- boolean shouldSucceed, String otherUserInfo)
- throws IOException, InterruptedException, ClassNotFoundException {
- setUpCluster(conf);
- try {
- // submit job as another user.
- String userInfo = otherUserInfo;
- Job job = submitSleepJob(1, 1, 1000, 1000, false, userInfo);
- assertFalse(job.isComplete());
-
- // try to change priority as self
- try {
- JobClient jc = new JobClient(miniMRCluster.createJobConf());
- RunningJob rjob = jc.getJob((JobID)job.getID());
- rjob.setJobPriority("VERY_LOW");
- if (!shouldSucceed) {
- fail("changing priority should fail.");
- }
- } catch (IOException ioe) {
- //verify it fails
- LOG.info("exception while submitting job: " + ioe.getMessage());
- assertTrue(ioe.getMessage().
- contains("cannot perform operation " +
- "ADMINISTER_JOBS on queue default"));
- }
- //wait for job to complete on its own
- while (!job.isComplete()) {
- try {
- Thread.sleep(1000);
- } catch (InterruptedException ie) {
- break;
- }
- }
- } finally {
- tearDownCluster();
+ public void setState(String state) {
+ this.state = state;
+ }
+ public void setProperties(JsonProperty[] properties) {
+ this.properties = properties;
+ }
+ public void setChildren(JsonQueue[] children) {
+ this.children = children;
+ }
+ public String getAcl_submit_job() {
+ return acl_submit_job;
+ }
+ public void setAcl_submit_job(String aclSubmitJob) {
+ acl_submit_job = aclSubmitJob;
+ }
+ public String getAcl_administer_jobs() {
+ return acl_administer_jobs;
+ }
+ public void setAcl_administer_jobs(String aclAdministerJobs) {
+ acl_administer_jobs = aclAdministerJobs;
}
}
- private void setUpCluster(JobConf conf) throws IOException {
- miniDFSCluster = new MiniDFSCluster(conf, 1, true, null);
- FileSystem fileSys = miniDFSCluster.getFileSystem();
- String namenode = fileSys.getUri().toString();
- miniMRCluster = new MiniMRCluster(1, namenode, 3,
- null, null, conf);
- }
-
- private void tearDownCluster() throws IOException {
- if (miniMRCluster != null) { miniMRCluster.shutdown(); }
- if (miniDFSCluster != null) { miniDFSCluster.shutdown(); }
- }
-
- private Job submitSleepJob(int numMappers, int numReducers,
- long mapSleepTime, long reduceSleepTime,
- boolean shouldComplete)
- throws IOException, InterruptedException, ClassNotFoundException {
- return submitSleepJob(numMappers, numReducers, mapSleepTime,
- reduceSleepTime, shouldComplete, null);
- }
-
- private Job submitSleepJob(int numMappers, int numReducers,
- long mapSleepTime, long reduceSleepTime,
- boolean shouldComplete, String userInfo)
- throws IOException, InterruptedException, ClassNotFoundException {
- return submitSleepJob(numMappers, numReducers, mapSleepTime,
- reduceSleepTime, shouldComplete, userInfo, null);
- }
-
- private Job submitSleepJob(int numMappers, int numReducers,
- long mapSleepTime, long reduceSleepTime,
- boolean shouldComplete, String userInfo,
- String queueName)
- throws IOException, InterruptedException, ClassNotFoundException {
- Configuration clientConf = new Configuration();
- clientConf.set("mapred.job.tracker", "localhost:"
- + miniMRCluster.getJobTrackerPort());
- if (userInfo != null) {
- clientConf.set(UnixUserGroupInformation.UGI_PROPERTY_NAME, userInfo);
- }
- if (queueName != null) {
- clientConf.set("mapred.job.queue.name", queueName);
- }
- SleepJob sleep = new SleepJob();
- sleep.setConf(clientConf);
- Job job = sleep.createJob(numMappers, numReducers,
- mapSleepTime, (int)mapSleepTime/100,
- reduceSleepTime, (int)reduceSleepTime/100);
- if (shouldComplete) {
- job.waitForCompletion(false);
- } else {
- job.submit();
+ /**
+ * Class to store the contents of attribute "properties" in Json dump
+ */
+ static class JsonProperty {
+ String key;
+ String value;
+ public String getKey() {
+ return key;
+ }
+ public void setKey(String key) {
+ this.key = key;
+ }
+ public String getValue() {
+ return value;
+ }
+ public void setValue(String value) {
+ this.value = value;
}
- return job;
}
+ /**
+ * checks the format of the dump in JSON format when
+ * QueueManager.dumpConfiguration(Writer) is called.
+ * @throws Exception
+ */
+ @Test
+ public void testDumpConfiguration() throws Exception {
+ checkForConfigFile();
+ Document doc = createDocument();
+ createSimpleDocument(doc);
+ writeToFile(doc, CONFIG);
+ StringWriter out = new StringWriter();
+ QueueManager.dumpConfiguration(out,CONFIG,null);
+ ObjectMapper mapper = new ObjectMapper();
+ // parse the Json dump
+ JsonQueueTree queueTree =
+ mapper.readValue(out.toString(), JsonQueueTree.class);
+
+ // check if acls_enabled is correct
+ assertEquals(true, queueTree.isAcls_enabled());
+ // check for the number of top-level queues
+ assertEquals(2, queueTree.getQueues().length);
+
+ HashMap<String, JsonQueue> topQueues = new HashMap<String, JsonQueue>();
+ for (JsonQueue topQueue : queueTree.getQueues()) {
+ topQueues.put(topQueue.getName(), topQueue);
+ }
+
+ // check for consistency in number of children
+ assertEquals(2, topQueues.get("p1").getChildren().length);
+
+ HashMap<String, JsonQueue> childQueues = new HashMap<String, JsonQueue>();
+ for (JsonQueue child : topQueues.get("p1").getChildren()) {
+ childQueues.put(child.getName(), child);
+ }
+
+ // check for consistency in state
+ assertEquals("stopped", childQueues.get("p1:p12").getState());
+
+ // check for consistency in properties
+ HashMap<String, JsonProperty> q1_properties =
+ new HashMap<String, JsonProperty>();
+ for (JsonProperty prop : topQueues.get("q1").getProperties()) {
+ q1_properties.put(prop.getKey(), prop);
+ }
+ assertEquals("10", q1_properties.get("capacity").getValue());
+ assertEquals("35", q1_properties.get("maxCapacity").getValue());
+
+ // check for acls
+ assertEquals("u1", childQueues.get("p1:p12").getAcl_submit_job());
+ assertEquals("u2", childQueues.get("p1:p12").getAcl_administer_jobs());
+ }
}
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestRackAwareTaskPlacement.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestRackAwareTaskPlacement.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestRackAwareTaskPlacement.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestRackAwareTaskPlacement.java Sat Nov 28 20:26:01 2009
@@ -26,10 +26,13 @@
import junit.framework.TestSuite;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.FakeObjectUtilities.FakeJobHistory;
import org.apache.hadoop.mapred.FakeObjectUtilities.FakeJobTracker;
-import org.apache.hadoop.mapred.JobClient.RawSplit;
import org.apache.hadoop.mapred.UtilsForTests.FakeClock;
+import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobCounter;
+import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
+import org.apache.hadoop.mapreduce.Job.RawSplit;
import org.apache.hadoop.net.DNSToSwitchMapping;
import org.apache.hadoop.net.StaticMapping;
@@ -58,8 +61,8 @@
new TestSetup(new TestSuite(TestRackAwareTaskPlacement.class)) {
protected void setUp() throws Exception {
JobConf conf = new JobConf();
- conf.set("mapred.job.tracker", "localhost:0");
- conf.set("mapred.job.tracker.http.address", "0.0.0.0:0");
+ conf.set(JTConfig.JT_IPC_ADDRESS, "localhost:0");
+ conf.set(JTConfig.JT_HTTP_ADDRESS, "0.0.0.0:0");
conf.setClass("topology.node.switch.mapping.impl",
StaticMapping.class, DNSToSwitchMapping.class);
jobTracker = new FakeJobTracker(conf, new FakeClock(), trackers);
@@ -85,11 +88,12 @@
this.profile = new JobProfile(jc.getUser(), jobid,
jobFile.toString(), null, jc.getJobName(),
jc.getQueueName());
+ this.jobHistory = new FakeJobHistory();
}
@Override
public void initTasks() throws IOException {
- JobClient.RawSplit[] splits = createSplits();
+ Job.RawSplit[] splits = createSplits();
numMapTasks = splits.length;
createMapTasks(null, splits);
nonRunningMapCache = createCache(splits, maxLevel);
@@ -99,8 +103,8 @@
}
- protected JobClient.RawSplit[] createSplits() throws IOException {
- RawSplit[] splits = new RawSplit[numMaps];
+ protected Job.RawSplit[] createSplits() throws IOException {
+ Job.RawSplit[] splits = new Job.RawSplit[numMaps];
// Hand code for now.
// M0,2,3 reside in Host1
// M1 resides in Host3
@@ -108,7 +112,7 @@
String[] splitHosts0 = new String[] { allHosts[0] };
for (int i = 0; i < numMaps; i++) {
- splits[i] = new RawSplit();
+ splits[i] = new Job.RawSplit();
splits[i].setDataLength(0);
}
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestRecoveryManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestRecoveryManager.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestRecoveryManager.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestRecoveryManager.java Sat Nov 28 20:26:01 2009
@@ -30,8 +30,7 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.mapred.JobTracker.RecoveryManager;
-import org.apache.hadoop.mapred.MiniMRCluster.JobTrackerRunner;
-import org.apache.hadoop.mapred.TestJobInProgressListener.MyScheduler;
+import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
import org.apache.hadoop.security.UserGroupInformation;
/**
@@ -64,8 +63,7 @@
FileSystem fs = FileSystem.get(new Configuration());
fs.delete(TEST_DIR, true); // cleanup
- conf.set("mapred.jobtracker.job.history.block.size", "1024");
- conf.set("mapred.jobtracker.job.history.buffer.size", "1024");
+ conf.set(JTConfig.JT_JOBHISTORY_BLOCK_SIZE, "1024");
MiniMRCluster mr = new MiniMRCluster(1, "file:///", 1, null, null, conf);
@@ -100,7 +98,8 @@
String sysDir = mr.getJobTrackerRunner().getJobTracker().getSystemDir();
mr.stopJobTracker();
- // delete the job.xml of job #1 causing the job to fail in constructor
+ // delete the job.xml of job #1 causing the job to fail in submit Job
+ //while recovery itself
Path jobFile =
new Path(sysDir, rJob1.getID().toString() + Path.SEPARATOR + "job.xml");
LOG.info("Deleting job.xml file : " + jobFile.toString());
@@ -123,8 +122,7 @@
out.close();
// make sure that the jobtracker is in recovery mode
- mr.getJobTrackerConf().setBoolean("mapred.jobtracker.restart.recover",
- true);
+ mr.getJobTrackerConf().setBoolean(JTConfig.JT_RESTART_ENABLED, true);
// start the jobtracker
LOG.info("Starting jobtracker");
mr.startJobTracker();
@@ -134,7 +132,12 @@
// check if the jobtracker came up or not
assertEquals("JobTracker crashed!",
JobTracker.State.RUNNING, status.getJobTrackerState());
-
+
+ // assert the no of recovered jobs
+ assertEquals("No of recovered jobs not correct",
+ 1, mr.getJobTrackerRunner().getJobTracker().
+ recoveryManager.getRecovered());
+
mr.shutdown();
}
@@ -159,8 +162,9 @@
fs.delete(TEST_DIR, true);
JobConf conf = new JobConf();
- conf.set("mapred.jobtracker.job.history.block.size", "1024");
- conf.set("mapred.jobtracker.job.history.buffer.size", "1024");
+ conf.set(JTConfig.JT_JOBHISTORY_BLOCK_SIZE, "1024");
+ conf.set(
+ DeprecatedQueueConfigurationParser.MAPRED_QUEUE_NAMES_KEY, "default");
MiniMRCluster mr = new MiniMRCluster(1, "file:///", 1, null, null, conf);
JobTracker jobtracker = mr.getJobTrackerRunner().getJobTracker();
@@ -216,6 +220,8 @@
LOG.info("Submitted job " + rJob3.getID() + " with different user");
jip = jobtracker.getJob(rJob3.getID());
+ assertEquals("Restart count is not correct",
+ 0, jip.getNumRestarts());
while (!jip.inited()) {
LOG.info("Waiting for job " + jip.getJobID() + " to be inited");
@@ -227,9 +233,9 @@
mr.stopJobTracker();
// make sure that the jobtracker is in recovery mode
- mr.getJobTrackerConf().setBoolean("mapred.jobtracker.restart.recover",
+ mr.getJobTrackerConf().setBoolean(JTConfig.JT_RESTART_ENABLED,
true);
- mr.getJobTrackerConf().setInt("mapred.jobtracker.maxtasks.per.job", 25);
+ mr.getJobTrackerConf().setInt(JTConfig.JT_TASKS_PER_JOB, 25);
mr.getJobTrackerConf().setBoolean("mapred.acls.enabled" , true);
UserGroupInformation ugi = UserGroupInformation.readFrom(job1);
@@ -247,6 +253,11 @@
assertEquals("Recovery manager failed to tolerate job failures",
2, jobtracker.getAllJobs().length);
+ // assert the no of recovered jobs
+ assertEquals("No of recovered jobs not correct",
+ 2, jobtracker.recoveryManager.getRecovered());
+ assertEquals("Restart count is not correct",
+ 1, jobtracker.getJob(rJob2.getID()).getNumRestarts());
// check if the job#1 has failed
JobStatus status = jobtracker.getJobStatus(rJob1.getID());
assertEquals("Faulty job not failed",
@@ -260,141 +271,6 @@
mr.shutdown();
}
-
- /**
- * Test if restart count of the jobtracker is correctly managed.
- * Steps are as follows :
- * - start the jobtracker and check if the info file gets created.
- * - stops the jobtracker, deletes the jobtracker.info file and checks if
- * upon restart the recovery is 'off'
- * - submit a job to the jobtracker.
- * - restart the jobtracker k times and check if the restart count on ith
- * iteration is i.
- * - submit a new job and check if its restart count is 0.
- * - garble the jobtracker.info file and restart he jobtracker, the
- * jobtracker should crash.
- */
- public void testRestartCount() throws Exception {
- LOG.info("Testing restart-count");
- String signalFile = new Path(TEST_DIR, "signal").toString();
-
- // clean up
- FileSystem fs = FileSystem.get(new Configuration());
- fs.delete(TEST_DIR, true);
-
- JobConf conf = new JobConf();
- conf.set("mapred.jobtracker.job.history.block.size", "1024");
- conf.set("mapred.jobtracker.job.history.buffer.size", "1024");
- conf.setBoolean("mapred.jobtracker.restart.recover", true);
- // since there is no need for initing
- conf.setClass("mapred.jobtracker.taskScheduler", MyScheduler.class,
- TaskScheduler.class);
-
- MiniMRCluster mr = new MiniMRCluster(1, "file:///", 1, null, null, conf);
- JobTracker jobtracker = mr.getJobTrackerRunner().getJobTracker();
- JobClient jc = new JobClient(mr.createJobConf());
-
- // check if the jobtracker info file exists
- Path infoFile = jobtracker.recoveryManager.getRestartCountFile();
- assertTrue("Jobtracker infomation is missing", fs.exists(infoFile));
-
- // check if garbling the system files disables the recovery process
- LOG.info("Stopping jobtracker for testing with system files deleted");
- mr.stopJobTracker();
-
- // delete the info file
- Path rFile = jobtracker.recoveryManager.getRestartCountFile();
- fs.delete(rFile,false);
-
- // start the jobtracker
- LOG.info("Starting jobtracker with system files deleted");
- mr.startJobTracker();
-
- UtilsForTests.waitForJobTracker(jc);
- jobtracker = mr.getJobTrackerRunner().getJobTracker();
-
- // check if the recovey is disabled
- assertFalse("Recovery is not disabled upon missing system files",
- jobtracker.recoveryManager.shouldRecover());
-
- // check if the system dir is sane
- assertTrue("Recovery file is missing upon restart", fs.exists(rFile));
- Path tFile = jobtracker.recoveryManager.getTempRestartCountFile();
- assertFalse("Temp recovery file exists upon restart", fs.exists(tFile));
-
- // submit a job
- JobConf job = mr.createJobConf();
-
- UtilsForTests.configureWaitingJobConf(job,
- new Path(TEST_DIR, "input"), new Path(TEST_DIR, "output6"), 2, 0,
- "test-recovery-manager", signalFile, signalFile);
-
- // submit the faulty job
- RunningJob rJob = jc.submitJob(job);
- LOG.info("Submitted first job " + rJob.getID());
-
- // kill the jobtracker multiple times and check if the count is correct
- for (int i = 1; i <= 5; ++i) {
- LOG.info("Stopping jobtracker for " + i + " time");
- mr.stopJobTracker();
-
- // start the jobtracker
- LOG.info("Starting jobtracker for " + i + " time");
- mr.startJobTracker();
-
- UtilsForTests.waitForJobTracker(jc);
-
- // check if the system dir is sane
- assertTrue("Recovery file is missing upon restart", fs.exists(rFile));
- assertFalse("Temp recovery file exists upon restart", fs.exists(tFile));
-
- jobtracker = mr.getJobTrackerRunner().getJobTracker();
- JobInProgress jip = jobtracker.getJob(rJob.getID());
-
- // assert if restart count is correct
- assertEquals("Recovery manager failed to recover restart count",
- i, jip.getNumRestarts());
- }
-
- // kill the old job
- rJob.killJob();
-
- // II. Submit a new job and check if the restart count is 0
- JobConf job1 = mr.createJobConf();
-
- UtilsForTests.configureWaitingJobConf(job1,
- new Path(TEST_DIR, "input"), new Path(TEST_DIR, "output7"), 50, 0,
- "test-recovery-manager", signalFile, signalFile);
-
- // make sure that the job id's dont clash
- jobtracker.getNewJobId();
-
- // submit a new job
- rJob = jc.submitJob(job1);
- LOG.info("Submitted first job after restart" + rJob.getID());
-
- // assert if restart count is correct
- JobInProgress jip = jobtracker.getJob(rJob.getID());
- assertEquals("Restart count for new job is incorrect",
- 0, jip.getNumRestarts());
-
- LOG.info("Stopping jobtracker for testing the fs errors");
- mr.stopJobTracker();
-
- // check if system.dir problems in recovery kills the jobtracker
- fs.delete(rFile, false);
- FSDataOutputStream out = fs.create(rFile);
- out.writeBoolean(true);
- out.close();
-
- // start the jobtracker
- LOG.info("Starting jobtracker with fs errors");
- mr.startJobTracker();
- JobTrackerRunner runner = mr.getJobTrackerRunner();
- assertFalse("JobTracker is still alive", runner.isActive());
-
- mr.shutdown();
- }
/**
* Test if the jobtracker waits for the info file to be created before
@@ -411,8 +287,8 @@
// start the jobtracker
JobConf conf = new JobConf();
FileSystem.setDefaultUri(conf, namenode);
- conf.set("mapred.job.tracker", "localhost:0");
- conf.set("mapred.job.tracker.http.address", "127.0.0.1:0");
+ conf.set(JTConfig.JT_IPC_ADDRESS, "localhost:0");
+ conf.set(JTConfig.JT_HTTP_ADDRESS, "127.0.0.1:0");
JobTracker jobtracker = new JobTracker(conf);
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestReduceFetch.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestReduceFetch.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestReduceFetch.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestReduceFetch.java Sat Nov 28 20:26:01 2009
@@ -33,13 +33,13 @@
public void testReduceFromDisk() throws Exception {
final int MAP_TASKS = 8;
JobConf job = mrCluster.createJobConf();
- job.set("mapred.job.reduce.input.buffer.percent", "0.0");
+ job.set(JobContext.REDUCE_INPUT_BUFFER_PERCENT, "0.0");
job.setNumMapTasks(MAP_TASKS);
job.set(JobConf.MAPRED_REDUCE_TASK_JAVA_OPTS, "-Xmx128m");
- job.setInt("mapred.job.reduce.total.mem.bytes", 128 << 20);
- job.set("mapred.job.shuffle.input.buffer.percent", "0.14");
- job.setInt("io.sort.factor", 2);
- job.setInt("mapred.inmem.merge.threshold", 4);
+ job.setLong(JobContext.REDUCE_MEMORY_TOTAL_BYTES, 128 << 20);
+ job.set(JobContext.SHUFFLE_INPUT_BUFFER_PERCENT, "0.05");
+ job.setInt(JobContext.IO_SORT_FACTOR, 2);
+ job.setInt(JobContext.REDUCE_MERGE_INMEM_THRESHOLD, 4);
Counters c = runJob(job);
final long spill = c.findCounter(TaskCounter.SPILLED_RECORDS).getCounter();
final long out = c.findCounter(TaskCounter.MAP_OUTPUT_RECORDS).getCounter();
@@ -56,9 +56,9 @@
public void testReduceFromMem() throws Exception {
final int MAP_TASKS = 3;
JobConf job = mrCluster.createJobConf();
- job.set("mapred.job.reduce.input.buffer.percent", "1.0");
- job.set("mapred.job.shuffle.input.buffer.percent", "1.0");
- job.setInt("mapred.job.reduce.total.mem.bytes", 128 << 20);
+ job.set(JobContext.REDUCE_INPUT_BUFFER_PERCENT, "1.0");
+ job.set(JobContext.SHUFFLE_INPUT_BUFFER_PERCENT, "1.0");
+ job.setLong(JobContext.REDUCE_MEMORY_TOTAL_BYTES, 128 << 20);
job.setNumMapTasks(MAP_TASKS);
Counters c = runJob(job);
final long spill = c.findCounter(TaskCounter.SPILLED_RECORDS).getCounter();
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestReduceFetchFromPartialMem.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestReduceFetchFromPartialMem.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestReduceFetchFromPartialMem.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestReduceFetchFromPartialMem.java Sat Nov 28 20:26:01 2009
@@ -77,17 +77,17 @@
/** Verify that at least one segment does not hit disk */
public void testReduceFromPartialMem() throws Exception {
- final int MAP_TASKS = 5;
+ final int MAP_TASKS = 7;
JobConf job = mrCluster.createJobConf();
job.setNumMapTasks(MAP_TASKS);
- job.setInt("mapred.inmem.merge.threshold", 0);
- job.set("mapred.job.reduce.input.buffer.percent", "1.0");
- job.setInt("mapred.reduce.parallel.copies", 1);
- job.setInt("io.sort.mb", 10);
+ job.setInt(JobContext.REDUCE_MERGE_INMEM_THRESHOLD, 0);
+ job.set(JobContext.REDUCE_INPUT_BUFFER_PERCENT, "1.0");
+ job.setInt(JobContext.SHUFFLE_PARALLEL_COPIES, 1);
+ job.setInt(JobContext.IO_SORT_MB, 10);
job.set(JobConf.MAPRED_REDUCE_TASK_JAVA_OPTS, "-Xmx128m");
- job.setInt("mapred.job.reduce.total.mem.bytes", 128 << 20);
- job.set("mapred.job.shuffle.input.buffer.percent", "0.14");
- job.set("mapred.job.shuffle.merge.percent", "1.0");
+ job.setLong(JobContext.REDUCE_MEMORY_TOTAL_BYTES, 128 << 20);
+ job.set(JobContext.SHUFFLE_INPUT_BUFFER_PERCENT, "0.14");
+ job.set(JobContext.SHUFFLE_MERGE_EPRCENT, "1.0");
Counters c = runJob(job);
final long out = c.findCounter(TaskCounter.MAP_OUTPUT_RECORDS).getCounter();
final long spill = c.findCounter(TaskCounter.SPILLED_RECORDS).getCounter();
@@ -112,7 +112,7 @@
@Override
public void configure(JobConf conf) {
nMaps = conf.getNumMapTasks();
- id = nMaps - conf.getInt("mapred.task.partition", -1) - 1;
+ id = nMaps - conf.getInt(JobContext.TASK_PARTITION, -1) - 1;
Arrays.fill(b, 0, 4096, (byte)'V');
((StringBuilder)fmt.out()).append(keyfmt);
}
@@ -248,8 +248,8 @@
conf.setNumReduceTasks(1);
conf.setInputFormat(FakeIF.class);
conf.setNumTasksToExecutePerJvm(1);
- conf.setInt("mapred.map.max.attempts", 0);
- conf.setInt("mapred.reduce.max.attempts", 0);
+ conf.setInt(JobContext.MAP_MAX_ATTEMPTS, 0);
+ conf.setInt(JobContext.REDUCE_MAX_ATTEMPTS, 0);
FileInputFormat.setInputPaths(conf, new Path("/in"));
final Path outp = new Path("/out");
FileOutputFormat.setOutputPath(conf, outp);
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestReduceTask.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestReduceTask.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestReduceTask.java Sat Nov 28 20:26:01 2009
@@ -90,7 +90,7 @@
@SuppressWarnings("unchecked")
RawKeyValueIterator rawItr =
Merger.merge(conf, rfs, Text.class, Text.class, codec, new Path[]{path},
- false, conf.getInt("io.sort.factor", 100), tmpDir,
+ false, conf.getInt(JobContext.IO_SORT_FACTOR, 100), tmpDir,
new Text.Comparator(), new NullProgress(), null, null, null);
@SuppressWarnings("unchecked") // WritableComparators are not generic
ReduceTask.ValuesIterator valItr =
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestResourceEstimation.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestResourceEstimation.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestResourceEstimation.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestResourceEstimation.java Sat Nov 28 20:26:01 2009
@@ -1,4 +1,4 @@
-/*
+/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -18,7 +18,9 @@
package org.apache.hadoop.mapred;
import junit.framework.TestCase;
-import org.apache.hadoop.mapred.JobClient.RawSplit;
+
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Job.RawSplit;
public class TestResourceEstimation extends TestCase {
@@ -32,7 +34,8 @@
jc.setNumMapTasks(maps);
jc.setNumReduceTasks(reduces);
- JobInProgress jip = new JobInProgress(jid, jc, null);
+ JobInProgress jip = new JobInProgress(jid, jc,
+ UtilsForTests.getJobTracker());
//unfortunately, we can't set job input size from here.
ResourceEstimator re = new ResourceEstimator(jip);
@@ -44,7 +47,7 @@
TaskStatus ts = new MapTaskStatus();
ts.setOutputSize(singleMapOutputSize);
- RawSplit split = new RawSplit();
+ Job.RawSplit split = new Job.RawSplit();
split.setDataLength(0);
TaskInProgress tip = new TaskInProgress(jid, "", split, null, jc, jip, 0, 1);
re.updateWithCompletedTask(ts, tip);
@@ -64,7 +67,8 @@
jc.setNumMapTasks(maps);
jc.setNumReduceTasks(reduces);
- JobInProgress jip = new JobInProgress(jid, jc, null) {
+ JobInProgress jip = new JobInProgress(jid, jc,
+ UtilsForTests.getJobTracker()) {
long getInputLength() {
return singleMapInputSize*desiredMaps();
}
@@ -79,7 +83,7 @@
TaskStatus ts = new MapTaskStatus();
ts.setOutputSize(singleMapOutputSize);
- RawSplit split = new RawSplit();
+ Job.RawSplit split = new Job.RawSplit();
split.setDataLength(singleMapInputSize);
TaskInProgress tip = new TaskInProgress(jid, "", split, null, jc, jip, 0, 1);
re.updateWithCompletedTask(ts, tip);
@@ -91,7 +95,7 @@
//add one more map task with input size as 0
TaskStatus ts = new MapTaskStatus();
ts.setOutputSize(singleMapOutputSize);
- RawSplit split = new RawSplit();
+ Job.RawSplit split = new Job.RawSplit();
split.setDataLength(0);
TaskInProgress tip = new TaskInProgress(jid, "", split, null, jc, jip, 0, 1);
re.updateWithCompletedTask(ts, tip);
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestSequenceFileAsBinaryOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestSequenceFileAsBinaryOutputFormat.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestSequenceFileAsBinaryOutputFormat.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestSequenceFileAsBinaryOutputFormat.java Sat Nov 28 20:26:01 2009
@@ -53,7 +53,7 @@
fail("Failed to create output directory");
}
- job.set("mapred.task.id", attempt);
+ job.set(JobContext.TASK_ATTEMPT_ID, attempt);
FileOutputFormat.setOutputPath(job, dir.getParent().getParent());
FileOutputFormat.setWorkOutputPath(job, dir);
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestSetupAndCleanupFailure.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestSetupAndCleanupFailure.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestSetupAndCleanupFailure.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestSetupAndCleanupFailure.java Sat Nov 28 20:26:01 2009
@@ -27,6 +27,8 @@
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.mapred.lib.IdentityMapper;
import org.apache.hadoop.mapred.lib.IdentityReducer;
+import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
+import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
/**
* Tests various failures in setup/cleanup of job, like
@@ -47,16 +49,16 @@
}
}
- // Commiter with cleanupJob throwing exception
- static class CommitterWithFailCleanup extends FileOutputCommitter {
+ // Commiter with commitJob throwing exception
+ static class CommitterWithFailCommit extends FileOutputCommitter {
@Override
- public void cleanupJob(JobContext context) throws IOException {
+ public void commitJob(JobContext context) throws IOException {
throw new IOException();
}
}
// Committer waits for a file to be created on dfs.
- static class CommitterWithLongSetupAndCleanup extends FileOutputCommitter {
+ static class CommitterWithLongSetupAndCommit extends FileOutputCommitter {
private void waitForSignalFile(FileSystem fs, Path signalFile)
throws IOException {
@@ -76,9 +78,9 @@
}
@Override
- public void cleanupJob(JobContext context) throws IOException {
+ public void commitJob(JobContext context) throws IOException {
waitForSignalFile(FileSystem.get(context.getJobConf()), cleanupSignalFile);
- super.cleanupJob(context);
+ super.commitJob(context);
}
}
@@ -121,7 +123,7 @@
throws IOException {
// launch job with waiting setup/cleanup
JobConf jobConf = mr.createJobConf();
- jobConf.setOutputCommitter(CommitterWithLongSetupAndCleanup.class);
+ jobConf.setOutputCommitter(CommitterWithLongSetupAndCommit.class);
RunningJob job = UtilsForTests.runJob(jobConf, inDir, outDir);
JobTracker jt = mr.getJobTrackerRunner().getJobTracker();
JobInProgress jip = jt.getJob(job.getID());
@@ -229,15 +231,14 @@
dfs = new MiniDFSCluster(conf, 4, true, null);
fileSys = dfs.getFileSystem();
JobConf jtConf = new JobConf();
- jtConf.setInt("mapred.tasktracker.map.tasks.maximum", 1);
- jtConf.setInt("mapred.tasktracker.reduce.tasks.maximum", 1);
- jtConf.setLong("mapred.tasktracker.expiry.interval", 10 * 1000);
- jtConf.setInt("mapred.reduce.copy.backoff", 4);
+ jtConf.setInt(TTConfig.TT_MAP_SLOTS, 1);
+ jtConf.setInt(TTConfig.TT_REDUCE_SLOTS, 1);
+ jtConf.setLong(JTConfig.JT_TRACKER_EXPIRY_INTERVAL, 10 * 1000);
mr = new MiniMRCluster(taskTrackers, fileSys.getUri().toString(), 1,
null, null, jtConf);
// test setup/cleanup throwing exceptions
testFailCommitter(CommitterWithFailSetup.class, mr.createJobConf());
- testFailCommitter(CommitterWithFailCleanup.class, mr.createJobConf());
+ testFailCommitter(CommitterWithFailCommit.class, mr.createJobConf());
// test the command-line kill for setup/cleanup attempts.
testSetupAndCleanupKill(mr, dfs, true);
// remove setup/cleanup signal files.