You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 04:23:39 UTC
svn commit: r1076933 - in
/hadoop/common/branches/branch-0.20-security-patches/src:
contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/
mapred/org/apache/hadoop/mapred/
Author: omalley
Date: Fri Mar 4 03:23:38 2011
New Revision: 1076933
URL: http://svn.apache.org/viewvc?rev=1076933&view=rev
Log:
commit b3b11c4936a501eda5991fbfa4498fc1f336cf9d
Author: Lee Tucker <lt...@yahoo-inc.com>
Date: Thu Jul 30 17:40:17 2009 -0700
Applying patch 2709938.5739.patch
Added:
hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestJobTrackerRestartWithCS.java
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/ClusterWithCapacityScheduler.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java
Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/ClusterWithCapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/ClusterWithCapacityScheduler.java?rev=1076933&r1=1076932&r2=1076933&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/ClusterWithCapacityScheduler.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/ClusterWithCapacityScheduler.java Fri Mar 4 03:23:38 2011
@@ -197,6 +197,13 @@ public class ClusterWithCapacitySchedule
}
}
+ /**
+ * @return the mrCluster
+ */
+ public MiniMRCluster getMrCluster() {
+ return mrCluster;
+ }
+
static class MyClassLoader extends ClassLoader {
@Override
public URL getResource(String name) {
Added: hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestJobTrackerRestartWithCS.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestJobTrackerRestartWithCS.java?rev=1076933&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestJobTrackerRestartWithCS.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestJobTrackerRestartWithCS.java Fri Mar 4 03:23:38 2011
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.util.Properties;
+import org.apache.hadoop.mapred.ControlledMapReduceJob.ControlledMapReduceJobRunner;
+
+
+public class TestJobTrackerRestartWithCS extends ClusterWithCapacityScheduler {
+
+ /**
+ * Test single queue.
+ *
+ * <p>
+ *
+ * Submit a job with more M/R tasks than total capacity. Full queue capacity
+ * should be utilized and remaining M/R tasks should wait for slots to be
+ * available.
+ *
+ * @throws Exception
+ */
+ public void testJobTrackerRestartWithCS()
+ throws Exception {
+ try {
+ Properties schedulerProps = new Properties();
+ schedulerProps.put(
+ "mapred.capacity-scheduler.queue.default.guaranteed-capacity", "100");
+ Properties clusterProps = new Properties();
+ clusterProps.put("mapred.tasktracker.map.tasks.maximum", String.valueOf(2));
+ clusterProps.put("mapred.tasktracker.reduce.tasks.maximum", String.valueOf(0));
+
+ // cluster capacity 2 maps, 0 reduces
+ startCluster(1, clusterProps, schedulerProps);
+
+ ControlledMapReduceJobRunner jobRunner =
+ ControlledMapReduceJobRunner.getControlledMapReduceJobRunner(
+ getJobConf(), 4, 0);
+ jobRunner.start();
+ ControlledMapReduceJob controlledJob = jobRunner.getJob();
+ JobID myJobID = jobRunner.getJobID();
+ JobInProgress myJob = getJobTracker().getJob(myJobID);
+ ControlledMapReduceJob.waitTillNTasksStartRunning(myJob, true, 2);
+
+ LOG.info("Trying to finish 2 maps");
+ controlledJob.finishNTasks(true, 2);
+ ControlledMapReduceJob.waitTillNTotalTasksFinish(myJob, true, 2);
+ assertTrue("Number of maps finished", myJob.finishedMaps() == 2);
+
+ JobClient jobClient = new JobClient(getMrCluster().createJobConf());
+ getMrCluster().stopJobTracker();
+
+ getMrCluster().getJobTrackerConf().setBoolean("mapred.jobtracker.restart.recover",
+ true);
+ getMrCluster().startJobTracker();
+
+ UtilsForTests.waitForJobTracker(jobClient);
+ ControlledMapReduceJob.waitTillNTasksStartRunning(myJob, true, 1);
+
+ controlledJob.finishNTasks(true, 2);
+ ControlledMapReduceJob.waitTillNTotalTasksFinish(myJob, true, 2);
+ } catch (Exception e) {
+ e.printStackTrace();
+ } finally {
+ tearDown();
+ }
+ }
+}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=1076933&r1=1076932&r2=1076933&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java Fri Mar 4 03:23:38 2011
@@ -739,7 +739,7 @@ public class JobTracker implements MRCon
TaskID id = TaskID.forName(taskId);
TaskInProgress tip = getTip(id);
-
+
updateTip(tip, task);
}
@@ -752,7 +752,10 @@ public class JobTracker implements MRCon
// Check if the transaction for this attempt can be committed
String taskStatus = attempt.get(Keys.TASK_STATUS);
-
+ TaskAttemptID taskID = TaskAttemptID.forName(taskAttemptId);
+ JobInProgress jip = getJob(taskID.getJobID());
+ JobStatus prevStatus = (JobStatus)jip.getStatus().clone();
+
if (taskStatus.length() > 0) {
// This means this is an update event
if (taskStatus.equals(Values.SUCCESS.name())) {
@@ -766,6 +769,16 @@ public class JobTracker implements MRCon
} else {
createTaskAttempt(jip, id, attempt);
}
+
+ JobStatus newStatus = (JobStatus)jip.getStatus().clone();
+ if (prevStatus.getRunState() != newStatus.getRunState()) {
+ if(LOG.isDebugEnabled())
+ LOG.debug("Status changed hence informing prevStatus" + prevStatus + " currentStatus "+ newStatus);
+ JobStatusChangeEvent event =
+ new JobStatusChangeEvent(jip, EventType.RUN_STATE_CHANGED,
+ prevStatus, newStatus);
+ updateJobInProgressListeners(event);
+ }
}
public void handle(JobHistory.RecordTypes recType, Map<Keys,