You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2008/09/11 20:49:26 UTC
svn commit: r694415 [2/2] - in /hadoop/core/trunk: conf/ src/contrib/
src/contrib/capacity-scheduler/ src/contrib/capacity-scheduler/src/
src/contrib/capacity-scheduler/src/java/
src/contrib/capacity-scheduler/src/java/org/ src/contrib/capacity-schedul...
Added: hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java?rev=694415&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java (added)
+++ hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java Thu Sep 11 11:49:22 2008
@@ -0,0 +1,792 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.io.BytesWritable;
+//import org.apache.hadoop.mapred.CapacityTaskScheduler;
+import org.apache.hadoop.conf.Configuration;
+
+public class TestCapacityScheduler extends TestCase {
+
+  // monotonically increasing id used to name submitted fake jobs (reset in setUp)
+  private static int jobCounter;
+
+  /**
+   * JobInProgress stub that hands out fake map/reduce tasks without any
+   * real cluster. Attempt IDs come from per-job counters, so tests can
+   * predict task names such as attempt_test_0001_m_000001_0.
+   */
+  static class FakeJobInProgress extends JobInProgress {
+    
+    private FakeTaskTrackerManager taskTrackerManager;
+    private int mapTaskCtr;
+    private int redTaskCtr;
+    // fake TIPs for every task handed out; surfaced through the
+    // getNonLocalRunningMaps()/getRunningReduces() overrides below so the
+    // scheduler can find candidates to kill when reclaiming capacity
+    private Set<TaskInProgress> mapTips = 
+      new HashSet<TaskInProgress>();
+    private Set<TaskInProgress> reduceTips = 
+      new HashSet<TaskInProgress>();
+    
+    public FakeJobInProgress(JobID jId, JobConf jobConf,
+        FakeTaskTrackerManager taskTrackerManager, String user) 
+        throws IOException {
+      super(jId, jobConf);
+      this.taskTrackerManager = taskTrackerManager;
+      this.startTime = System.currentTimeMillis();
+      this.status = new JobStatus();
+      this.status.setRunState(JobStatus.PREP);
+      // use the queue-aware JobProfile constructor only when a queue name is set
+      if (null == jobConf.getQueueName()) {
+        this.profile = new JobProfile(user, jId, 
+            null, null, null);
+      }
+      else {
+        this.profile = new JobProfile(user, jId, 
+            null, null, null, jobConf.getQueueName());
+      }
+      mapTaskCtr = 0;
+      redTaskCtr = 0;
+    }
+    
+    @Override
+    public synchronized void initTasks() throws IOException {
+      // no real task initialization; just flip the job to RUNNING
+      getStatus().setRunState(JobStatus.RUNNING);
+    }
+
+    @Override
+    public Task obtainNewMapTask(final TaskTrackerStatus tts, int clusterSize,
+        int ignored) throws IOException {
+      // no more tasks once every configured map is already running
+      if (runningMapTasks == numMapTasks) return null;
+      TaskAttemptID attemptId = getTaskAttemptID(true);
+      // toString() is overridden so checkAssignment() can compare against
+      // strings of the form "attempt_... on tt1"
+      Task task = new MapTask("", attemptId, 0, "", new BytesWritable()) {
+        @Override
+        public String toString() {
+          return String.format("%s on %s", getTaskID(), tts.getTrackerName());
+        }
+      };
+      taskTrackerManager.startTask(tts.getTrackerName(), task);
+      runningMapTasks++;
+      // create a fake TIP and keep track of it
+      mapTips.add(new FakeTaskInProgress(getJobID(), 
+          getJobConf(), task, true, this));
+      return task;
+    }
+    
+    @Override
+    public Task obtainNewReduceTask(final TaskTrackerStatus tts,
+        int clusterSize, int ignored) throws IOException {
+      if (runningReduceTasks == numReduceTasks) return null;
+      TaskAttemptID attemptId = getTaskAttemptID(false);
+      Task task = new ReduceTask("", attemptId, 0, 10) {
+        @Override
+        public String toString() {
+          return String.format("%s on %s", getTaskID(), tts.getTrackerName());
+        }
+      };
+      taskTrackerManager.startTask(tts.getTrackerName(), task);
+      runningReduceTasks++;
+      // create a fake TIP and keep track of it
+      reduceTips.add(new FakeTaskInProgress(getJobID(), 
+          getJobConf(), task, false, this));
+      return task;
+    }
+    
+    // counter bookkeeping driven by FakeTaskTrackerManager.finishTask()
+    // and FakeTaskInProgress.killTask()
+    public void mapTaskFinished() {
+      runningMapTasks--;
+      finishedMapTasks++;
+    }
+    
+    public void reduceTaskFinished() {
+      runningReduceTasks--;
+      finishedReduceTasks++;
+    }
+
+    private TaskAttemptID getTaskAttemptID(boolean isMap) {
+      JobID jobId = getJobID();
+      // attempt numbering starts at 1 because of the pre-increment
+      return new TaskAttemptID(jobId.getJtIdentifier(),
+          jobId.getId(), isMap, (isMap)?++mapTaskCtr: ++redTaskCtr, 0);
+    }
+    
+    @Override
+    Set<TaskInProgress> getNonLocalRunningMaps() {
+      return (Set<TaskInProgress>)mapTips;
+    }
+    @Override
+    Set<TaskInProgress> getRunningReduces() {
+      return (Set<TaskInProgress>)reduceTips;
+    }
+  }
+
+  /**
+   * TaskInProgress stub with a single active attempt and a canned RUNNING
+   * status at 50% progress. killTask() kills nothing real; it only
+   * updates the owning FakeJobInProgress counters.
+   */
+  static class FakeTaskInProgress extends TaskInProgress {
+    private boolean isMap;
+    private FakeJobInProgress fakeJob;
+    private TreeMap<TaskAttemptID, String> activeTasks;
+    private TaskStatus taskStatus;
+    FakeTaskInProgress(JobID jId, JobConf jobConf, Task t, 
+        boolean isMap, FakeJobInProgress job) {
+      super(jId, "", new JobClient.RawSplit(), null, jobConf, job, 0);
+      this.isMap = isMap;
+      this.fakeJob = job;
+      activeTasks = new TreeMap<TaskAttemptID, String>();
+      activeTasks.put(t.getTaskID(), "tt");
+      // create a fake status for a task that is running for a bit
+      this.taskStatus = TaskStatus.createTaskStatus(isMap);
+      taskStatus.setProgress(0.5f);
+      taskStatus.setRunState(TaskStatus.State.RUNNING);
+    }
+    
+    @Override
+    TreeMap<TaskAttemptID, String> getActiveTasks() {
+      return activeTasks;
+    }
+    @Override
+    public TaskStatus getTaskStatus(TaskAttemptID taskid) {
+      // return a status for a task that has run a bit
+      return taskStatus;
+    }
+    @Override
+    boolean killTask(TaskAttemptID taskId, boolean shouldFail) {
+      // pretend the kill succeeded and adjust the fake job's counters
+      if (isMap) {
+        fakeJob.mapTaskFinished();
+      }
+      else {
+        fakeJob.reduceTaskFinished();
+      }
+      return true;
+    }
+  }
+
+  /**
+   * QueueManager stub whose queue set is injected directly by the test
+   * instead of being read from configuration.
+   */
+  static class FakeQueueManager extends QueueManager {
+    // injected via setQueues(); remains null until a test configures it
+    private Set<String> queueNames = null;
+    FakeQueueManager() {
+      super(new Configuration());
+    }
+    void setQueues(Set<String> queues) {
+      queueNames = queues;
+    }
+    public synchronized Set<String> getQueues() {
+      return queueNames;
+    }
+  }
+
+  /**
+   * In-memory TaskTrackerManager with two default trackers (tt1, tt2),
+   * each offering 2 map and 1 reduce slot. Tests can add trackers,
+   * start/finish tasks, and define queues via the embedded
+   * FakeQueueManager.
+   */
+  static class FakeTaskTrackerManager implements TaskTrackerManager {
+    // cluster-wide running-task tallies, reported via getClusterStatus()
+    int maps = 0;
+    int reduces = 0;
+    int maxMapTasksPerTracker = 2;
+    int maxReduceTasksPerTracker = 1;
+    List<JobInProgressListener> listeners =
+      new ArrayList<JobInProgressListener>();
+    FakeQueueManager qm = new FakeQueueManager();
+    
+    private Map<String, TaskTrackerStatus> trackers =
+      new HashMap<String, TaskTrackerStatus>();
+    // keyed by task attempt id string; used by finishTask()
+    private Map<String, TaskStatus> taskStatuses = 
+      new HashMap<String, TaskStatus>();
+
+    public FakeTaskTrackerManager() {
+      trackers.put("tt1", new TaskTrackerStatus("tt1", "tt1.host", 1,
+          new ArrayList<TaskStatus>(), 0,
+          maxMapTasksPerTracker, maxReduceTasksPerTracker));
+      trackers.put("tt2", new TaskTrackerStatus("tt2", "tt2.host", 2,
+          new ArrayList<TaskStatus>(), 0,
+          maxMapTasksPerTracker, maxReduceTasksPerTracker));
+    }
+    
+    public void addTaskTracker(String ttName) {
+      trackers.put(ttName, new TaskTrackerStatus(ttName, ttName + ".host", 1,
+          new ArrayList<TaskStatus>(), 0,
+          maxMapTasksPerTracker, maxReduceTasksPerTracker));
+    }
+    
+    public ClusterStatus getClusterStatus() {
+      int numTrackers = trackers.size();
+      return new ClusterStatus(numTrackers, maps, reduces,
+          numTrackers * maxMapTasksPerTracker,
+          numTrackers * maxReduceTasksPerTracker,
+          JobTracker.State.RUNNING);
+    }
+
+    public int getNumberOfUniqueHosts() {
+      return 0;
+    }
+
+    public int getNextHeartbeatInterval() {
+      return MRConstants.HEARTBEAT_INTERVAL_MIN;
+    }
+
+    public Collection<TaskTrackerStatus> taskTrackers() {
+      return trackers.values();
+    }
+
+
+    public void addJobInProgressListener(JobInProgressListener listener) {
+      listeners.add(listener);
+    }
+
+    public void removeJobInProgressListener(JobInProgressListener listener) {
+      listeners.remove(listener);
+    }
+    
+    public void submitJob(JobInProgress job) {
+      // notify all registered listeners (e.g. the scheduler's queue manager)
+      for (JobInProgressListener listener : listeners) {
+        listener.jobAdded(job);
+      }
+    }
+    
+    public TaskTrackerStatus getTaskTracker(String trackerID) {
+      return trackers.get(trackerID);
+    }
+    
+    public void startTask(String taskTrackerName, final Task t) {
+      if (t.isMapTask()) {
+        maps++;
+      } else {
+        reduces++;
+      }
+      TaskStatus status = new TaskStatus() {
+        @Override
+        public boolean getIsMap() {
+          return t.isMapTask();
+        }
+      };
+      taskStatuses.put(t.getTaskID().toString(), status);
+      status.setRunState(TaskStatus.State.RUNNING);
+      // make the task visible in the tracker's report list
+      trackers.get(taskTrackerName).getTaskReports().add(status);
+    }
+    
+    // marks a previously-started task SUCCEEDED and updates the cluster
+    // tallies and the owning fake job's running/finished counters
+    public void finishTask(String taskTrackerName, String tipId, 
+        FakeJobInProgress j) {
+      TaskStatus status = taskStatuses.get(tipId);
+      if (status.getIsMap()) {
+        maps--;
+        j.mapTaskFinished();
+      } else {
+        reduces--;
+        j.reduceTaskFinished();
+      }
+      status.setRunState(TaskStatus.State.SUCCEEDED);
+    }
+    
+    void addQueues(String[] arr) {
+      Set<String> queues = new HashSet<String>();
+      for (String s: arr) {
+        queues.add(s);
+      }
+      qm.setQueues(queues);
+    }
+    
+    public QueueManager getQueueManager() {
+      return qm;
+    }
+  }
+
+ // represents a fake queue configuration info
+ static class FakeQueueInfo {
+ String queueName;
+ float gc;
+ int reclaimTimeLimit;
+ boolean supportsPrio;
+ int ulMin;
+
+ public FakeQueueInfo(String queueName, float gc,
+ int reclaimTimeLimit, boolean supportsPrio, int ulMin) {
+ this.queueName = queueName;
+ this.gc = gc;
+ this.reclaimTimeLimit = reclaimTimeLimit;
+ this.supportsPrio = supportsPrio;
+ this.ulMin = ulMin;
+ }
+ }
+
+  /**
+   * CapacitySchedulerConf stub backed by an in-memory map of
+   * FakeQueueInfo, so tests can define queue configuration without a
+   * config file. Queue iteration order is insertion order
+   * (LinkedHashMap), which several tests rely on.
+   */
+  static class FakeResourceManagerConf extends CapacitySchedulerConf {
+  
+    // map of queue names to queue info
+    private Map<String, FakeQueueInfo> queueMap = 
+      new LinkedHashMap<String, FakeQueueInfo>();
+    // name of the first queue passed to setFakeQueues()
+    String firstQueue;
+    
+    void setFakeQueues(List<FakeQueueInfo> queues) {
+      for (FakeQueueInfo q: queues) {
+        queueMap.put(q.queueName, q);
+      }
+      // assign the reference directly; the redundant new String(...) copy
+      // served no purpose (Strings are immutable)
+      firstQueue = queues.get(0).queueName;
+    }
+    
+    public synchronized Set<String> getQueues() {
+      return queueMap.keySet();
+    }
+    
+    public float getGuaranteedCapacity(String queue) {
+      return queueMap.get(queue).gc;
+    }
+    
+    public int getReclaimTimeLimit(String queue) {
+      return queueMap.get(queue).reclaimTimeLimit;
+    }
+    
+    public int getMinimumUserLimitPercent(String queue) {
+      return queueMap.get(queue).ulMin;
+    }
+    
+    public boolean isPrioritySupported(String queue) {
+      return queueMap.get(queue).supportsPrio;
+    }
+  }
+
+  /** Manually-advanced clock so tests can control scheduler time. */
+  protected class FakeClock extends CapacityTaskScheduler.Clock {
+    private long now = 0;
+    
+    // move the clock forward by the given number of milliseconds
+    public void advance(long millis) {
+      now = now + millis;
+    }
+
+    @Override
+    long getTime() {
+      return now;
+    }
+  }
+
+
+  // shared test fixture, (re)built in setUp() for every test
+  protected JobConf conf;
+  protected CapacityTaskScheduler scheduler;
+  private FakeTaskTrackerManager taskTrackerManager;
+  private FakeResourceManagerConf resConf;
+  private FakeClock clock;
+
+  @Override
+  protected void setUp() throws Exception {
+    // build a fresh fake cluster + scheduler wired to a controllable clock
+    jobCounter = 0;
+    taskTrackerManager = new FakeTaskTrackerManager();
+    clock = new FakeClock();
+    scheduler = new CapacityTaskScheduler(clock);
+    scheduler.setTaskTrackerManager(taskTrackerManager);
+
+    conf = new JobConf();
+    // set interval to a large number so thread doesn't interfere with us
+    conf.setLong("mapred.capacity-scheduler.reclaimCapacity.interval", 500);
+    scheduler.setConf(conf);
+    
+  }
+
+  @Override
+  protected void tearDown() throws Exception {
+    // stop the scheduler's background threads if setUp created one
+    if (scheduler == null) {
+      return;
+    }
+    scheduler.terminate();
+  }
+
+  /**
+   * Builds and submits a FakeJobInProgress with the given counts, queue
+   * (null means the default queue) and user, then returns it.
+   */
+  private FakeJobInProgress submitJob(int state, int maps, int reduces, 
+      String queue, String user) throws IOException {
+    JobConf jobConf = new JobConf(conf);
+    jobConf.setNumMapTasks(maps);
+    jobConf.setNumReduceTasks(reduces);
+    if (queue != null)
+      jobConf.setQueueName(queue);
+    FakeJobInProgress job = new FakeJobInProgress(
+        new JobID("test", ++jobCounter), jobConf, taskTrackerManager, user);
+    job.getStatus().setRunState(state);
+    taskTrackerManager.submitJob(job);
+    return job;
+  }
+
+ /*protected void submitJobs(int number, int state, int maps, int reduces)
+ throws IOException {
+ for (int i = 0; i < number; i++) {
+ submitJob(state, maps, reduces);
+ }
+ }*/
+
+ // basic tests, should be able to submit to queues
+ public void testSubmitToQueues() throws Exception {
+ // set up some queues
+ String[] qs = {"default", "q2"};
+ taskTrackerManager.addQueues(qs);
+ resConf = new FakeResourceManagerConf();
+ ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
+ queues.add(new FakeQueueInfo("default", 50.0f, 5000, true, 25));
+ queues.add(new FakeQueueInfo("q2", 50.0f, 5000, true, 25));
+ resConf.setFakeQueues(queues);
+ scheduler.setResourceManagerConf(resConf);
+ scheduler.start();
+
+ // submit a job with no queue specified. It should be accepted
+ // and given to the default queue.
+ JobInProgress j = submitJob(JobStatus.PREP, 10, 10, null, "u1");
+ // when we ask for a task, we should get one, from the job submitted
+ Task t;
+ t = checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+ // submit another job, to a different queue
+ j = submitJob(JobStatus.PREP, 10, 10, "q2", "u1");
+ // now when we get a task, it should be from the second job
+ t = checkAssignment("tt2", "attempt_test_0002_m_000001_0 on tt2");
+ }
+
+ // test capacity transfer
+ public void testCapacityTransfer() throws Exception {
+ // set up some queues
+ String[] qs = {"default", "q2"};
+ taskTrackerManager.addQueues(qs);
+ resConf = new FakeResourceManagerConf();
+ ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
+ queues.add(new FakeQueueInfo("default", 50.0f, 5000, true, 25));
+ queues.add(new FakeQueueInfo("q2", 50.0f, 5000, true, 25));
+ resConf.setFakeQueues(queues);
+ scheduler.setResourceManagerConf(resConf);
+ scheduler.start();
+
+ // submit a job
+ submitJob(JobStatus.PREP, 10, 10, "q2", "u1");
+ // for queue 'q2', the GC for maps is 2. Since we're the only user,
+ // we should get a task
+ checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+ // I should get another map task.
+ checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1");
+ // Now we're at full capacity for maps. If I ask for another map task,
+ // I should get a map task from the default queue's capacity.
+ checkAssignment("tt2", "attempt_test_0001_m_000003_0 on tt2");
+ // and another
+ checkAssignment("tt2", "attempt_test_0001_m_000004_0 on tt2");
+ }
+
+ // test user limits
+ public void testUserLimits() throws Exception {
+ // set up some queues
+ String[] qs = {"default", "q2"};
+ taskTrackerManager.addQueues(qs);
+ resConf = new FakeResourceManagerConf();
+ ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
+ queues.add(new FakeQueueInfo("default", 50.0f, 5000, true, 25));
+ queues.add(new FakeQueueInfo("q2", 50.0f, 5000, true, 25));
+ resConf.setFakeQueues(queues);
+ scheduler.setResourceManagerConf(resConf);
+ scheduler.start();
+
+ // submit a job
+ submitJob(JobStatus.PREP, 10, 10, "q2", "u1");
+ // for queue 'q2', the GC for maps is 2. Since we're the only user,
+ // we should get a task
+ checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+ // Submit another job, from a different user
+ submitJob(JobStatus.PREP, 10, 10, "q2", "u2");
+ // Now if I ask for a map task, it should come from the second job
+ checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
+ // Now we're at full capacity for maps. If I ask for another map task,
+ // I should get a map task from the default queue's capacity.
+ checkAssignment("tt2", "attempt_test_0001_m_000002_0 on tt2");
+ // and another
+ checkAssignment("tt2", "attempt_test_0002_m_000002_0 on tt2");
+ }
+
+  // test user limits when a 2nd job is submitted much after first job 
+  public void testUserLimits2() throws Exception {
+    // set up some queues
+    String[] qs = {"default", "q2"};
+    taskTrackerManager.addQueues(qs);
+    resConf = new FakeResourceManagerConf();
+    ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
+    queues.add(new FakeQueueInfo("default", 50.0f, 5000, true, 25));
+    queues.add(new FakeQueueInfo("q2", 50.0f, 5000, true, 25));
+    resConf.setFakeQueues(queues);
+    scheduler.setResourceManagerConf(resConf);
+    scheduler.start();
+
+    // submit a job  
+    submitJob(JobStatus.PREP, 10, 10, "q2", "u1");
+    // for queue 'q2', the GC for maps is 2. Since we're the only user, 
+    // we should get a task 
+    checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+    // since we're the only job, we get another map
+    checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1");
+    // Submit another job, from a different user
+    submitJob(JobStatus.PREP, 10, 10, "q2", "u2");
+    // Now if I ask for a map task, it should come from the second job
+    // (u1 is already at its user limit within the queue)
+    checkAssignment("tt2", "attempt_test_0002_m_000001_0 on tt2");
+    // and another
+    checkAssignment("tt2", "attempt_test_0002_m_000002_0 on tt2");
+  }
+
+  // test user limits when a 2nd job is submitted much after first job 
+  // and we need to wait for first job's task to complete
+  public void testUserLimits3() throws Exception {
+    // set up some queues
+    String[] qs = {"default", "q2"};
+    taskTrackerManager.addQueues(qs);
+    resConf = new FakeResourceManagerConf();
+    ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
+    queues.add(new FakeQueueInfo("default", 50.0f, 5000, true, 25));
+    queues.add(new FakeQueueInfo("q2", 50.0f, 5000, true, 25));
+    resConf.setFakeQueues(queues);
+    scheduler.setResourceManagerConf(resConf);
+    scheduler.start();
+
+    // submit a job  
+    FakeJobInProgress j1 = submitJob(JobStatus.PREP, 10, 10, "q2", "u1");
+    // for queue 'q2', the GC for maps is 2. Since we're the only user, 
+    // we should get a task 
+    checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+    // since we're the only job, we get another map
+    checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1");
+    // we get two more maps from 'default queue'
+    checkAssignment("tt2", "attempt_test_0001_m_000003_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0001_m_000004_0 on tt2");
+    // Submit another job, from a different user
+    FakeJobInProgress j2 = submitJob(JobStatus.PREP, 10, 10, "q2", "u2");
+    // one of the task finishes
+    taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000001_0", j1);
+    // Now if I ask for a map task, it should come from the second job 
+    checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
+    // another task from job1 finishes, another new task to job2
+    taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000002_0", j1);
+    checkAssignment("tt1", "attempt_test_0002_m_000002_0 on tt1");
+    // now we have equal number of tasks from each job. Whichever job's 
+    // task finishes, that job gets a new task
+    taskTrackerManager.finishTask("tt2", "attempt_test_0001_m_000003_0", j1);
+    checkAssignment("tt2", "attempt_test_0001_m_000005_0 on tt2");
+    taskTrackerManager.finishTask("tt1", "attempt_test_0002_m_000001_0", j2);
+    checkAssignment("tt1", "attempt_test_0002_m_000003_0 on tt1");
+  }
+
+  // test user limits with many users, more slots
+  public void testUserLimits4() throws Exception {
+    // set up one queue, with 10 slots
+    String[] qs = {"default"};
+    taskTrackerManager.addQueues(qs);
+    resConf = new FakeResourceManagerConf();
+    ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
+    queues.add(new FakeQueueInfo("default", 100.0f, 10000, true, 25));
+    resConf.setFakeQueues(queues);
+    scheduler.setResourceManagerConf(resConf);
+    scheduler.start();
+    // add some more TTs 
+    taskTrackerManager.addTaskTracker("tt3");
+    taskTrackerManager.addTaskTracker("tt4");
+    taskTrackerManager.addTaskTracker("tt5");
+
+    // u1 submits job
+    FakeJobInProgress j1 = submitJob(JobStatus.PREP, 10, 10, null, "u1");
+    // it gets the first 5 slots
+    checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1");
+    checkAssignment("tt2", "attempt_test_0001_m_000003_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0001_m_000004_0 on tt2");
+    checkAssignment("tt3", "attempt_test_0001_m_000005_0 on tt3");
+    // u2 submits job with 4 slots
+    FakeJobInProgress j2 = submitJob(JobStatus.PREP, 4, 4, null, "u2");
+    // u2 should get next 4 slots
+    checkAssignment("tt3", "attempt_test_0002_m_000001_0 on tt3");
+    checkAssignment("tt4", "attempt_test_0002_m_000002_0 on tt4");
+    checkAssignment("tt4", "attempt_test_0002_m_000003_0 on tt4");
+    checkAssignment("tt5", "attempt_test_0002_m_000004_0 on tt5");
+    // last slot should go to u1, since u2 has no more tasks
+    checkAssignment("tt5", "attempt_test_0001_m_000006_0 on tt5");
+    // u1 finishes a task
+    taskTrackerManager.finishTask("tt5", "attempt_test_0001_m_000006_0", j1);
+    // u1 submits a few more jobs 
+    submitJob(JobStatus.PREP, 10, 10, null, "u1");
+    submitJob(JobStatus.PREP, 10, 10, null, "u1");
+    submitJob(JobStatus.PREP, 10, 10, null, "u1");
+    // u2 also submits a job
+    submitJob(JobStatus.PREP, 10, 10, null, "u2");
+    // now u3 submits a job
+    submitJob(JobStatus.PREP, 2, 2, null, "u3");
+    // next slot should go to u3, even though u2 has an earlier job, since
+    // user limits have changed and u1/u2 are over limits
+    checkAssignment("tt5", "attempt_test_0007_m_000001_0 on tt5");
+    // some other task finishes and u3 gets it
+    // FIX: attempt_test_0002_* belongs to job 2, so j2 (not j1) must be
+    // passed, otherwise j1's running/finished counters drift out of sync
+    taskTrackerManager.finishTask("tt5", "attempt_test_0002_m_000004_0", j2);
+    checkAssignment("tt5", "attempt_test_0007_m_000002_0 on tt5");
+    // now, u2 finishes a task
+    // FIX: same mismatch here -- this is job 2's attempt, pass j2
+    taskTrackerManager.finishTask("tt4", "attempt_test_0002_m_000002_0", j2);
+    // next slot will go to u1, since u3 has nothing to run and u1's job is 
+    // first in the queue
+    checkAssignment("tt4", "attempt_test_0001_m_000007_0 on tt4");
+  }
+
+  // test code to reclaim capacity
+  public void testReclaimCapacity() throws Exception {
+    // set up some queues
+    String[] qs = {"default", "q2", "q3"};
+    taskTrackerManager.addQueues(qs);
+    resConf = new FakeResourceManagerConf();
+    ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
+    queues.add(new FakeQueueInfo("default", 50.0f, 1000000, true, 25));
+    queues.add(new FakeQueueInfo("q2", 25.0f, 1000000, true, 25));
+    queues.add(new FakeQueueInfo("q3", 25.0f, 1000000, true, 25));
+    resConf.setFakeQueues(queues);
+    scheduler.setResourceManagerConf(resConf);
+    scheduler.start();
+    
+    // set up a situation where q2 is under capacity, and default & q3
+    // are at/over capacity
+    FakeJobInProgress j1 = submitJob(JobStatus.PREP, 10, 10, null, "u1");
+    FakeJobInProgress j2 = submitJob(JobStatus.PREP, 10, 10, "q3", "u1");
+    checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
+    checkAssignment("tt2", "attempt_test_0001_m_000002_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0001_m_000003_0 on tt2");
+    // now submit a job to q2
+    FakeJobInProgress j3 = submitJob(JobStatus.PREP, 10, 10, "q2", "u1");
+    // update our structures
+    scheduler.updateQSIInfo();
+    // get scheduler to notice that q2 needs to reclaim
+    scheduler.reclaimCapacity();
+    // our queue reclaim time is 1000s, heartbeat interval is 5 sec, so 
+    // we start reclaiming when 15 secs are left. 
+    clock.advance(400000);
+    scheduler.reclaimCapacity();
+    // no tasks should have been killed yet
+    assertEquals(j1.runningMapTasks, 3);
+    assertEquals(j2.runningMapTasks, 1);
+    clock.advance(200000);
+    scheduler.reclaimCapacity();
+    // task from j1 (the most over-capacity queue, 'default') will be killed
+    assertEquals(j1.runningMapTasks, 2);
+    assertEquals(j2.runningMapTasks, 1);
+    
+  }
+
+  // test code to reclaim multiple capacity 
+  public void testReclaimCapacity2() throws Exception {
+    // set up some queues
+    String[] qs = {"default", "q2", "q3", "q4"};
+    taskTrackerManager.addQueues(qs);
+    resConf = new FakeResourceManagerConf();
+    ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
+    queues.add(new FakeQueueInfo("default", 50.0f, 1000000, true, 25));
+    queues.add(new FakeQueueInfo("q2", 20.0f, 1000000, true, 25));
+    queues.add(new FakeQueueInfo("q3", 20.0f, 1000000, true, 25));
+    queues.add(new FakeQueueInfo("q4", 10.0f, 1000000, true, 25));
+    resConf.setFakeQueues(queues);
+    scheduler.setResourceManagerConf(resConf);
+    scheduler.start();
+    
+    // add some more TTs so our total map capacity is 10
+    taskTrackerManager.addTaskTracker("tt3");
+    taskTrackerManager.addTaskTracker("tt4");
+    taskTrackerManager.addTaskTracker("tt5");
+
+    // q2 has nothing running, default is under cap, q3 and q4 are over cap
+    FakeJobInProgress j1 = submitJob(JobStatus.PREP, 2, 2, null, "u1");
+    checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+    FakeJobInProgress j2 = submitJob(JobStatus.PREP, 10, 10, "q3", "u1");
+    checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
+    FakeJobInProgress j3 = submitJob(JobStatus.PREP, 10, 10, "q4", "u1");
+    checkAssignment("tt2", "attempt_test_0003_m_000001_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0001_m_000002_0 on tt2");
+    checkAssignment("tt3", "attempt_test_0002_m_000002_0 on tt3");
+    checkAssignment("tt3", "attempt_test_0002_m_000003_0 on tt3");
+    checkAssignment("tt4", "attempt_test_0003_m_000002_0 on tt4");
+    checkAssignment("tt4", "attempt_test_0002_m_000004_0 on tt4");
+    checkAssignment("tt5", "attempt_test_0002_m_000005_0 on tt5");
+    checkAssignment("tt5", "attempt_test_0003_m_000003_0 on tt5");
+    // at this point, q3 is running 5 tasks (with a cap of 2), q4 is 
+    // running 3 tasks (with a cap of 1). 
+    // If we submit a job to 'default', we need to get 3 slots back. 
+    FakeJobInProgress j4 = submitJob(JobStatus.PREP, 10, 10, null, "u1");
+    // update our structures
+    scheduler.updateQSIInfo();
+    // get scheduler to notice that 'default' needs to reclaim capacity
+    scheduler.reclaimCapacity();
+    // our queue reclaim time is 1000s, heartbeat interval is 5 sec, so 
+    // we start reclaiming when 15 secs are left. 
+    clock.advance(400000);
+    scheduler.reclaimCapacity();
+    // nothing should have happened
+    assertEquals(j2.runningMapTasks, 5);
+    assertEquals(j3.runningMapTasks, 3);
+    // 3 tasks to kill, 5 running over cap. q3 should give up 3*3/5 = 2 slots.
+    // q4 should give up 2*3/5 = 1 slot. 
+    clock.advance(200000);
+    scheduler.reclaimCapacity();
+    assertEquals(j2.runningMapTasks, 3);
+    assertEquals(j3.runningMapTasks, 2);
+    
+  }
+
+  // test code to reclaim capacity in steps
+  public void testReclaimCapacityInSteps() throws Exception {
+    // set up some queues
+    String[] qs = {"default", "q2"};
+    taskTrackerManager.addQueues(qs);
+    resConf = new FakeResourceManagerConf();
+    ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
+    queues.add(new FakeQueueInfo("default", 50.0f, 1000000, true, 25));
+    queues.add(new FakeQueueInfo("q2", 50.0f, 1000000, true, 25));
+    resConf.setFakeQueues(queues);
+    scheduler.setResourceManagerConf(resConf);
+    scheduler.start();
+    
+    // set up a situation where q2 is under capacity, and default is
+    // at/over capacity
+    FakeJobInProgress j1 = submitJob(JobStatus.PREP, 10, 10, null, "u1");
+    checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1");
+    checkAssignment("tt2", "attempt_test_0001_m_000003_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0001_m_000004_0 on tt2");
+    // now submit a job to q2; it needs only 1 slot, so only part of the
+    // capacity has to be reclaimed at first
+    FakeJobInProgress j2 = submitJob(JobStatus.PREP, 1, 1, "q2", "u1");
+    // update our structures
+    scheduler.updateQSIInfo();
+    // get scheduler to notice that q2 needs to reclaim
+    scheduler.reclaimCapacity();
+    // our queue reclaim time is 1000s, heartbeat interval is 5 sec, so 
+    // we start reclaiming when 15 secs are left. 
+    clock.advance(400000);
+    // submit another job to q2 which causes more capacity to be reclaimed;
+    // note this starts a second, later reclaim timer
+    j2 = submitJob(JobStatus.PREP, 10, 10, "q2", "u2");
+    // update our structures
+    scheduler.updateQSIInfo();
+    clock.advance(200000);
+    scheduler.reclaimCapacity();
+    // one task from j1 will be killed (first reclaim request expires)
+    assertEquals(j1.runningMapTasks, 3);
+    clock.advance(300000);
+    scheduler.reclaimCapacity();
+    // timer for 2nd job hasn't fired, so nothing killed
+    assertEquals(j1.runningMapTasks, 3);
+    clock.advance(400000);
+    scheduler.reclaimCapacity();
+    // one task from j1 will be killed (second reclaim request expires)
+    assertEquals(j1.runningMapTasks, 2);
+    
+  }
+
+  /** Convenience lookup of a tracker's status by its name. */
+  protected TaskTrackerStatus tracker(String taskTrackerName) {
+    TaskTrackerStatus status = taskTrackerManager.getTaskTracker(taskTrackerName);
+    return status;
+  }
+
+  /**
+   * Asks the scheduler for tasks on the given tracker and asserts that
+   * exactly one task is returned whose toString() matches the expected
+   * "attempt_... on ttN" string. Returns the assigned task.
+   */
+  protected Task checkAssignment(String taskTrackerName,
+      String expectedTaskString) throws IOException {
+    List<Task> tasks = scheduler.assignTasks(tracker(taskTrackerName));
+    assertNotNull(expectedTaskString, tasks);
+    assertEquals(expectedTaskString, 1, tasks.size());
+    assertEquals(expectedTaskString, tasks.get(0).toString());
+    return tasks.get(0);
+  }
+
+}
Added: hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacitySchedulerConf.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacitySchedulerConf.java?rev=694415&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacitySchedulerConf.java (added)
+++ hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacitySchedulerConf.java Thu Sep 11 11:49:22 2008
@@ -0,0 +1,245 @@
+/** Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.PrintWriter;
+import java.util.HashMap;
+import java.util.Map;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Tests for {@link CapacitySchedulerConf}: built-in defaults, per-queue
+ * property parsing, fallback to defaults for keys a queue omits, and
+ * reloading after the config file is rewritten. Each test writes a small
+ * XML config under test.build.data and reads it back.
+ */
+public class TestCapacitySchedulerConf extends TestCase {
+
+ // Directory for the generated config file; falls back to "." when the
+ // test.build.data system property is unset (see static initializer).
+ private static String testDataDir = System.getProperty("test.build.data");
+ // Absolute path of the scheduler config file written by each test.
+ private static String testConfFile;
+
+ // Expected values when every queue property is left at its default.
+ private Map<String, String> defaultProperties;
+ private CapacitySchedulerConf testConf;
+ // Writer for the XML config; opened by openFile(), closed by endConfig().
+ private PrintWriter writer;
+
+ static {
+ if (testDataDir == null) {
+ testDataDir = ".";
+ } else {
+ new File(testDataDir).mkdirs();
+ }
+ testConfFile = new File(testDataDir, "test-conf.xml").getAbsolutePath();
+ }
+
+ public TestCapacitySchedulerConf() {
+ // These values must mirror the hard-coded defaults inside
+ // CapacitySchedulerConf; testDefaults() checks them against it.
+ defaultProperties = setupQueueProperties(
+ new String[] { "guaranteed-capacity",
+ "reclaim-time-limit",
+ "supports-priority",
+ "minimum-user-limit-percent" },
+ new String[] { "100",
+ "300",
+ "false",
+ "100" }
+ );
+ }
+
+
+ public void setUp() throws IOException {
+ openFile();
+ }
+
+ public void tearDown() throws IOException {
+ // Remove the generated config so each test starts from a clean slate.
+ File confFile = new File(testConfFile);
+ if (confFile.exists()) {
+ confFile.delete();
+ }
+ }
+
+ // A conf constructed without a file must expose the built-in defaults
+ // for the "default" queue.
+ public void testDefaults() {
+ testConf = new CapacitySchedulerConf();
+ Map<String, Map<String, String>> queueDetails
+ = new HashMap<String, Map<String,String>>();
+ queueDetails.put("default", defaultProperties);
+ checkQueueProperties(testConf, queueDetails);
+ }
+
+ // Two fully-specified queues ("default" and "research") should be read
+ // back with exactly the values written to the file.
+ public void testQueues() {
+
+ Map<String, String> q1Props = setupQueueProperties(
+ new String[] { "guaranteed-capacity",
+ "reclaim-time-limit",
+ "supports-priority",
+ "minimum-user-limit-percent" },
+ new String[] { "10",
+ "600",
+ "true",
+ "25" }
+ );
+
+ Map<String, String> q2Props = setupQueueProperties(
+ new String[] { "guaranteed-capacity",
+ "reclaim-time-limit",
+ "supports-priority",
+ "minimum-user-limit-percent" },
+ new String[] { "100",
+ "6000",
+ "false",
+ "50" }
+ );
+
+ startConfig();
+ writeQueueDetails("default", q1Props);
+ writeQueueDetails("research", q2Props);
+ endConfig();
+
+ testConf = new CapacitySchedulerConf(new Path(testConfFile));
+
+ Map<String, Map<String, String>> queueDetails
+ = new HashMap<String, Map<String,String>>();
+ queueDetails.put("default", q1Props);
+ queueDetails.put("research", q2Props);
+ checkQueueProperties(testConf, queueDetails);
+ }
+
+ // A queue that specifies only some properties should fall back to the
+ // defaults (reclaim-time-limit=300, supports-priority=false) for the rest.
+ public void testQueueWithDefaultProperties() {
+ Map<String, String> q1Props = setupQueueProperties(
+ new String[] { "guaranteed-capacity",
+ "minimum-user-limit-percent" },
+ new String[] { "20",
+ "75" }
+ );
+ startConfig();
+ writeQueueDetails("default", q1Props);
+ endConfig();
+
+ testConf = new CapacitySchedulerConf(new Path(testConfFile));
+
+ Map<String, Map<String, String>> queueDetails
+ = new HashMap<String, Map<String,String>>();
+ Map<String, String> expProperties = new HashMap<String, String>();
+ for (String key : q1Props.keySet()) {
+ expProperties.put(key, q1Props.get(key));
+ }
+ expProperties.put("reclaim-time-limit", "300");
+ expProperties.put("supports-priority", "false");
+ queueDetails.put("default", expProperties);
+ checkQueueProperties(testConf, queueDetails);
+ }
+
+ // After rewriting the file with new values (including a changed queue
+ // name, "production"), reloadConfiguration() must pick them up.
+ public void testReload() throws IOException {
+ // use the setup in the test case testQueues as a base...
+ testQueues();
+
+ // write new values to the file...
+ Map<String, String> q1Props = setupQueueProperties(
+ new String[] { "guaranteed-capacity",
+ "reclaim-time-limit",
+ "supports-priority",
+ "minimum-user-limit-percent" },
+ new String[] { "20.5",
+ "600",
+ "true",
+ "40" }
+ );
+
+ Map<String, String> q2Props = setupQueueProperties(
+ new String[] { "guaranteed-capacity",
+ "reclaim-time-limit",
+ "supports-priority",
+ "minimum-user-limit-percent" },
+ new String[] { "100",
+ "3000",
+ "false",
+ "50" }
+ );
+
+ openFile();
+ startConfig();
+ writeQueueDetails("default", q1Props);
+ writeQueueDetails("production", q2Props);
+ endConfig();
+
+ testConf.reloadConfiguration();
+
+ Map<String, Map<String, String>> queueDetails
+ = new HashMap<String, Map<String, String>>();
+ queueDetails.put("default", q1Props);
+ queueDetails.put("production", q2Props);
+ checkQueueProperties(testConf, queueDetails);
+ }
+
+ // Asserts that, for every queue in queueDetails, the conf's typed getters
+ // return the values recorded in the expected-property map.
+ private void checkQueueProperties(
+ CapacitySchedulerConf testConf,
+ Map<String, Map<String, String>> queueDetails) {
+ for (String queueName : queueDetails.keySet()) {
+ Map<String, String> map = queueDetails.get(queueName);
+ assertEquals(Float.parseFloat(map.get("guaranteed-capacity")),
+ testConf.getGuaranteedCapacity(queueName));
+ assertEquals(Integer.parseInt(map.get("minimum-user-limit-percent")),
+ testConf.getMinimumUserLimitPercent(queueName));
+ assertEquals(Integer.parseInt(map.get("reclaim-time-limit")),
+ testConf.getReclaimTimeLimit(queueName));
+ assertEquals(Boolean.parseBoolean(map.get("supports-priority")),
+ testConf.isPrioritySupported(queueName));
+ }
+ }
+
+ // Zips parallel key/value arrays into a map; arrays must be equal length.
+ private Map<String, String> setupQueueProperties(String[] keys,
+ String[] values) {
+ HashMap<String, String> map = new HashMap<String, String>();
+ for(int i=0; i<keys.length; i++) {
+ map.put(keys[i], values[i]);
+ }
+ return map;
+ }
+
+ // (Re)opens the config file for writing, truncating any previous content.
+ private void openFile() throws IOException {
+
+ if (testDataDir != null) {
+ File f = new File(testDataDir);
+ f.mkdirs();
+ }
+ FileWriter fw = new FileWriter(testConfFile);
+ BufferedWriter bw = new BufferedWriter(fw);
+ writer = new PrintWriter(bw);
+ }
+
+ // Writes the XML prolog and opening <configuration> element.
+ private void startConfig() {
+ writer.println("<?xml version=\"1.0\"?>");
+ writer.println("<configuration>");
+ }
+
+ // Emits one <property> element per entry, using the
+ // mapred.capacity-scheduler.queue.<queue>.<key> naming scheme.
+ private void writeQueueDetails(String queue, Map<String, String> props) {
+ for (String key : props.keySet()) {
+ writer.println("<property>");
+ writer.println("<name>mapred.capacity-scheduler.queue."
+ + queue + "." + key +
+ "</name>");
+ writer.println("<value>"+props.get(key)+"</value>");
+ writer.println("</property>");
+ }
+ }
+
+ // Closes </configuration> and flushes/closes the writer.
+ private void endConfig() {
+ writer.println("</configuration>");
+ writer.close();
+ }
+
+}
Modified: hadoop/core/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java?rev=694415&r1=694414&r2=694415&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java (original)
+++ hadoop/core/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java Thu Sep 11 11:49:22 2008
@@ -156,6 +156,11 @@
listeners.remove(listener);
}
+ // Test stub: the fake tracker manager always reports the minimum
+ // heartbeat interval, keeping scheduler timing deterministic in tests.
+ @Override
+ public int getNextHeartbeatInterval() {
+ return MRConstants.HEARTBEAT_INTERVAL_MIN;
+ }
+
// Test methods
public void submitJob(JobInProgress job) {
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java?rev=694415&r1=694414&r2=694415&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java Thu Sep 11 11:49:22 2008
@@ -72,6 +72,10 @@
int finishedReduceTasks = 0;
int failedMapTasks = 0;
int failedReduceTasks = 0;
+ // runningMapTasks include speculative tasks, so we need to capture
+ // speculative tasks separately
+ int speculativeMapTasks = 0;
+ int speculativeReduceTasks = 0;
int mapFailuresPercent = 0;
int reduceFailuresPercent = 0;
@@ -441,6 +445,14 @@
public synchronized int finishedReduces() {
return finishedReduceTasks;
}
+ // Maps still waiting to run: total minus running/failed/finished.
+ // speculativeMapTasks is added back because runningMapTasks counts
+ // speculative attempts too (see the field comment above).
+ public synchronized int pendingMaps() {
+ return numMapTasks - runningMapTasks - failedMapTasks -
+ finishedMapTasks + speculativeMapTasks;
+ }
+ // Reduces still waiting to run; mirrors pendingMaps(): speculative
+ // attempts are added back since runningReduceTasks includes them.
+ public synchronized int pendingReduces() {
+ return numReduceTasks - runningReduceTasks - failedReduceTasks -
+ finishedReduceTasks + speculativeReduceTasks;
+ }
public JobPriority getPriority() {
return this.priority;
}
@@ -484,7 +496,34 @@
TaskInProgress[] getReduceTasks() {
return reduces;
}
-
+
+ /**
+ * Return the nonLocalRunningMaps
+ * @return the live Set of running non-local map TIPs (not a copy)
+ */
+ Set<TaskInProgress> getNonLocalRunningMaps()
+ {
+ return nonLocalRunningMaps;
+ }
+
+ /**
+ * Return the runningMapCache
+ * @return the live Node-to-running-map-TIPs map (not a copy)
+ */
+ Map<Node, Set<TaskInProgress>> getRunningMapCache()
+ {
+ return runningMapCache;
+ }
+
+ /**
+ * Return runningReduces
+ * @return the live Set of running reduce TIPs (not a copy)
+ */
+ Set<TaskInProgress> getRunningReduces()
+ {
+ return runningReduces;
+ }
+
/**
* Get the job configuration
* @return the job's configuration
@@ -738,6 +777,8 @@
Task result = maps[target].getTaskToRun(tts.getTrackerName());
if (result != null) {
runningMapTasks += 1;
+ if (maps[target].getActiveTasks().size() > 1)
+ speculativeMapTasks++;
if (maps[target].isFirstAttempt(result.getTaskID())) {
JobHistory.Task.logStarted(maps[target].getTIPId(), Values.MAP.name(),
System.currentTimeMillis(),
@@ -849,6 +890,8 @@
Task result = reduces[target].getTaskToRun(tts.getTrackerName());
if (result != null) {
runningReduceTasks += 1;
+ if (reduces[target].getActiveTasks().size() > 1)
+ speculativeReduceTasks++;
if (reduces[target].isFirstAttempt(result.getTaskID())) {
JobHistory.Task.logStarted(reduces[target].getTIPId(), Values.REDUCE.name(),
System.currentTimeMillis(), "");
@@ -1467,6 +1510,7 @@
JobTrackerInstrumentation metrics)
{
TaskAttemptID taskid = status.getTaskID();
+ int oldNumAttempts = tip.getActiveTasks().size();
// Sanity check: is the TIP already complete?
// It _is_ safe to not decrement running{Map|Reduce}Tasks and
@@ -1514,10 +1558,14 @@
status.getCounters());
}
- // Update the running/finished map/reduce counts
+ int newNumAttempts = tip.getActiveTasks().size();
if (!tip.isCleanupTask()) {
if (tip.isMapTask()) {
runningMapTasks -= 1;
+ // check if this was a speculative task
+ if (oldNumAttempts > 1) {
+ speculativeMapTasks -= (oldNumAttempts - newNumAttempts);
+ }
finishedMapTasks += 1;
metrics.completeMap(taskid);
// remove the completed map from the resp running caches
@@ -1527,6 +1575,9 @@
}
} else {
runningReduceTasks -= 1;
+ if (oldNumAttempts > 1) {
+ speculativeReduceTasks -= (oldNumAttempts - newNumAttempts);
+ }
finishedReduceTasks += 1;
metrics.completeReduce(taskid);
// remove the completed reduces from the running reducers set
@@ -1620,7 +1671,7 @@
jobKilled = true;
}
}
-
+
/**
* A task assigned to this JobInProgress has reported in as failed.
* Most of the time, we'll just reschedule execution. However, after
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=694415&r1=694414&r2=694415&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java Thu Sep 11 11:49:22 2008
@@ -1272,7 +1272,7 @@
* Heartbeat interval is incremented 1second for every 50 nodes.
* @return next heartbeat interval.
*/
- private int getNextHeartbeatInterval() {
+ public int getNextHeartbeatInterval() {
// get the no of task trackers
int clusterSize = getClusterStatus().getTaskTrackers();
int heartbeatInterval = Math.max(
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerManager.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerManager.java?rev=694415&r1=694414&r2=694415&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerManager.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerManager.java Thu Sep 11 11:49:22 2008
@@ -63,4 +63,12 @@
* @return the {@link QueueManager}
*/
public QueueManager getQueueManager();
+
+ /**
+ * Return the current heartbeat interval that's used by {@link TaskTracker}s.
+ * May grow with cluster size; see JobTracker.getNextHeartbeatInterval().
+ *
+ * @return the heartbeat interval used by {@link TaskTracker}s
+ */
+ public int getNextHeartbeatInterval();
+
}
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java?rev=694415&r1=694414&r2=694415&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java Thu Sep 11 11:49:22 2008
@@ -144,6 +144,11 @@
return null;
}
+ // Test stub: the fake tracker manager always reports the minimum
+ // heartbeat interval, keeping scheduler timing deterministic in tests.
+ @Override
+ public int getNextHeartbeatInterval() {
+ return MRConstants.HEARTBEAT_INTERVAL_MIN;
+ }
+
// Test methods
public void submitJob(JobInProgress job) {