You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 04:23:39 UTC

svn commit: r1076933 - in /hadoop/common/branches/branch-0.20-security-patches/src: contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/ mapred/org/apache/hadoop/mapred/

Author: omalley
Date: Fri Mar  4 03:23:38 2011
New Revision: 1076933

URL: http://svn.apache.org/viewvc?rev=1076933&view=rev
Log:
commit b3b11c4936a501eda5991fbfa4498fc1f336cf9d
Author: Lee Tucker <lt...@yahoo-inc.com>
Date:   Thu Jul 30 17:40:17 2009 -0700

    Applying patch 2709938.5739.patch

Added:
    hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestJobTrackerRestartWithCS.java
Modified:
    hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/ClusterWithCapacityScheduler.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java

Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/ClusterWithCapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/ClusterWithCapacityScheduler.java?rev=1076933&r1=1076932&r2=1076933&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/ClusterWithCapacityScheduler.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/ClusterWithCapacityScheduler.java Fri Mar  4 03:23:38 2011
@@ -197,6 +197,13 @@ public class ClusterWithCapacitySchedule
     }
   }
 
+  /**
+   * @return the mrCluster
+   */
+  public MiniMRCluster getMrCluster() {
+    return mrCluster;
+  }
+
   static class MyClassLoader extends ClassLoader {
     @Override
     public URL getResource(String name) {

Added: hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestJobTrackerRestartWithCS.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestJobTrackerRestartWithCS.java?rev=1076933&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestJobTrackerRestartWithCS.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestJobTrackerRestartWithCS.java Fri Mar  4 03:23:38 2011
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.util.Properties;
+import org.apache.hadoop.mapred.ControlledMapReduceJob.ControlledMapReduceJobRunner;
+
+
+public class TestJobTrackerRestartWithCS extends ClusterWithCapacityScheduler {
+
+  /**
+   * Test single queue.
+   *
+   * <p>
+   *
+   * Submit a job with more M/R tasks than total capacity. Full queue capacity
+   * should be utilized and remaining M/R tasks should wait for slots to be
+   * available.
+   *
+   * @throws Exception
+   */
+  public void testJobTrackerRestartWithCS()
+          throws Exception {
+    try {
+      Properties schedulerProps = new Properties();
+      schedulerProps.put(
+              "mapred.capacity-scheduler.queue.default.guaranteed-capacity", "100");
+      Properties clusterProps = new Properties();
+      clusterProps.put("mapred.tasktracker.map.tasks.maximum", String.valueOf(2));
+      clusterProps.put("mapred.tasktracker.reduce.tasks.maximum", String.valueOf(0));
+
+      // cluster capacity 2 maps, 0 reduces
+      startCluster(1, clusterProps, schedulerProps);
+
+      ControlledMapReduceJobRunner jobRunner =
+              ControlledMapReduceJobRunner.getControlledMapReduceJobRunner(
+              getJobConf(), 4, 0);
+      jobRunner.start();
+      ControlledMapReduceJob controlledJob = jobRunner.getJob();
+      JobID myJobID = jobRunner.getJobID();
+      JobInProgress myJob = getJobTracker().getJob(myJobID);
+      ControlledMapReduceJob.waitTillNTasksStartRunning(myJob, true, 2);
+
+      LOG.info("Trying to finish 2 maps");
+      controlledJob.finishNTasks(true, 2);
+      ControlledMapReduceJob.waitTillNTotalTasksFinish(myJob, true, 2);
+      assertTrue("Number of maps finished", myJob.finishedMaps() == 2);
+
+      JobClient jobClient = new JobClient(getMrCluster().createJobConf());
+      getMrCluster().stopJobTracker();
+
+      getMrCluster().getJobTrackerConf().setBoolean("mapred.jobtracker.restart.recover",
+              true);
+      getMrCluster().startJobTracker();
+
+      UtilsForTests.waitForJobTracker(jobClient);
+      ControlledMapReduceJob.waitTillNTasksStartRunning(myJob, true, 1);
+
+      controlledJob.finishNTasks(true, 2);
+      ControlledMapReduceJob.waitTillNTotalTasksFinish(myJob, true, 2);
+    } catch (Exception e) {
+      e.printStackTrace();
+    } finally {
+      tearDown();
+    }
+  }
+}

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=1076933&r1=1076932&r2=1076933&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java Fri Mar  4 03:23:38 2011
@@ -739,7 +739,7 @@ public class JobTracker implements MRCon
         
         TaskID id = TaskID.forName(taskId);
         TaskInProgress tip = getTip(id);
-        
+
         updateTip(tip, task);
       }
 
@@ -752,7 +752,10 @@ public class JobTracker implements MRCon
         
         // Check if the transaction for this attempt can be committed
         String taskStatus = attempt.get(Keys.TASK_STATUS);
-        
+        TaskAttemptID taskID = TaskAttemptID.forName(taskAttemptId);
+        JobInProgress jip = getJob(taskID.getJobID());
+        JobStatus prevStatus = (JobStatus)jip.getStatus().clone();
+
         if (taskStatus.length() > 0) {
           // This means this is an update event
           if (taskStatus.equals(Values.SUCCESS.name())) {
@@ -766,6 +769,16 @@ public class JobTracker implements MRCon
         } else {
           createTaskAttempt(jip, id, attempt);
         }
+        
+        JobStatus newStatus = (JobStatus)jip.getStatus().clone();
+        if (prevStatus.getRunState() != newStatus.getRunState()) {
+          if(LOG.isDebugEnabled())
+            LOG.debug("Status changed hence informing prevStatus" +  prevStatus + " currentStatus "+ newStatus);
+          JobStatusChangeEvent event =
+            new JobStatusChangeEvent(jip, EventType.RUN_STATE_CHANGED,
+                                     prevStatus, newStatus);
+          updateJobInProgressListeners(event);
+        }
       }
 
       public void handle(JobHistory.RecordTypes recType, Map<Keys,