You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 04:48:35 UTC

svn commit: r1077174 - in /hadoop/common/branches/branch-0.20-security-patches/src: mapred/org/apache/hadoop/mapred/JobInProgress.java mapred/org/apache/hadoop/mapred/JobTracker.java test/org/apache/hadoop/mapred/FakeObjectUtilities.java

Author: omalley
Date: Fri Mar  4 03:48:35 2011
New Revision: 1077174

URL: http://svn.apache.org/viewvc?rev=1077174&view=rev
Log:
commit 0ddbf9677b0a793b6fcd1bda1f4458c1bef52427
Author: Hemanth Yamijala <yh...@yahoo-inc.com>
Date:   Thu Feb 18 12:15:13 2010 +0530

    MAPREDUCE-686 from https://issues.apache.org/jira/secure/attachment/12436181/MAPREDUCE-686-y20.patch
    
    +++ b/YAHOO-CHANGES.txt
    +    MAPREDUCE-686. Move TestSpeculativeExecution.Fake* into a separate class
    +    so that it can be used by other tests. (Jothi Padmanabhan via sharad)
    +

Added:
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/FakeObjectUtilities.java
Modified:
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java?rev=1077174&r1=1077173&r2=1077174&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java Fri Mar  4 03:48:35 2011
@@ -1915,13 +1915,13 @@ class JobInProgress {
   /**
    * Find a speculative task
    * @param list a list of tips
-   * @param taskTracker the tracker that has requested a tip
+   * @param ttStatus status of the tracker that has requested a tip
    * @param avgProgress the average progress for speculation
    * @param currentTime current time in milliseconds
    * @param shouldRemove whether to remove the tips
    * @return a tip that can be speculated on the tracker
    */
-  private synchronized TaskInProgress findSpeculativeTask(
+  protected synchronized TaskInProgress findSpeculativeTask(
       Collection<TaskInProgress> list, TaskTrackerStatus ttStatus,
       double avgProgress, long currentTime, boolean shouldRemove) {
     

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=1077174&r1=1077173&r2=1077174&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java Fri Mar  4 03:48:35 2011
@@ -1960,6 +1960,11 @@ public class JobTracker implements MRCon
     this(conf, generateNewIdentifier());
   }
 
+  JobTracker(JobConf conf, Clock clock) // package-private overload that lets the caller supply the Clock
+  throws IOException, InterruptedException {
+    this(conf, generateNewIdentifier(), clock); // delegates to the three-argument constructor
+  }
+  
   public static final String JT_USER_NAME = "mapreduce.jobtracker.user.name";
   public static final String JT_KEYTAB_FILE =
     "mapreduce.jobtracker.keytab.file";

Added: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/FakeObjectUtilities.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/FakeObjectUtilities.java?rev=1077174&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/FakeObjectUtilities.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/FakeObjectUtilities.java Fri Mar  4 03:48:35 2011
@@ -0,0 +1,191 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.Collection;
+import java.util.Date;
+import java.util.Iterator;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapred.TaskStatus.Phase;
+import org.apache.hadoop.mapreduce.split.*;
+import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
+
+/** 
+ * Fake JobTracker and JobInProgress implementations, plus heartbeat helpers,
+ * for use in MapReduce unit tests.
+ */
+public class FakeObjectUtilities {
+
+  static final Log LOG = LogFactory.getLog(FakeObjectUtilities.class);
+  // jtIdentifier and jobCounter together form the JobID of every FakeJobInProgress.
+  private static String jtIdentifier = "test";
+  private static int jobCounter; // pre-incremented in the FakeJobInProgress constructor; not synchronized
+  
+  /**
+   * A fake JobTracker for unit tests; getClusterStatus returns a synthetic ClusterStatus.
+   */
+  static class FakeJobTracker extends JobTracker {
+    
+    int totalSlots; // halved in getClusterStatus for each of the two capacity arguments
+    private String[] trackers; // array passed in as tts; only its length is read in this class
+
+    FakeJobTracker(JobConf conf, Clock clock, String[] tts) throws IOException, 
+    InterruptedException {
+      super(conf, clock);
+      this.trackers = tts;
+      //4 slots per tracker in total: max map and max reduce capacity are each twice the cluster size
+      totalSlots = trackers.length * 4;
+    }
+    @Override
+    public ClusterStatus getClusterStatus(boolean detailed) { // 'detailed' is ignored
+      return new ClusterStatus(trackers.length,
+          0, 0, 0, 0, totalSlots/2, totalSlots/2, JobTracker.State.RUNNING, 0);
+    }
+    /** Replaces the total slot count that getClusterStatus halves. */
+    public void setNumSlots(int totalSlots) {
+      this.totalSlots = totalSlots;
+    }
+  }
+
+  static class FakeJobInProgress extends JobInProgress { // fake job whose tasks a test hands out, advances and finishes explicitly
+    FakeJobInProgress(JobConf jobConf, JobTracker tracker) throws IOException {
+      super(new JobID(jtIdentifier, ++jobCounter), jobConf, tracker);
+      //JobID is built from jtIdentifier and the pre-incremented jobCounter
+    }
+    /** Builds the map and reduce TIPs directly and queues them in nonLocalMaps / nonRunningReduces. */
+    @Override
+    public synchronized void initTasks() throws IOException {
+      maps = new TaskInProgress[numMapTasks];
+      for (int i = 0; i < numMapTasks; i++) {
+        maps[i] = new TaskInProgress(getJobID(), "test", 
+            JobSplit.EMPTY_TASK_SPLIT, jobtracker, getJobConf(), this, i, 1); // every map gets the empty split
+        nonLocalMaps.add(maps[i]);
+      }
+      reduces = new TaskInProgress[numReduceTasks];
+      for (int i = 0; i < numReduceTasks; i++) {
+        reduces[i] = new TaskInProgress(getJobID(), "test", 
+                                        numMapTasks, i, 
+                                        jobtracker, getJobConf(), this, 1);
+        nonRunningReduces.add(reduces[i]);
+      }
+    }
+    /** Returns a new attempt ID from a runnable non-running tip (else a speculative one), or null. */
+    private TaskAttemptID findTask(String trackerName, String trackerHost, // trackerHost is unused
+        Collection<TaskInProgress> nonRunningTasks, 
+        Collection<TaskInProgress> runningTasks)
+    throws IOException {
+      TaskInProgress tip = null;
+      Iterator<TaskInProgress> iter = nonRunningTasks.iterator();
+      //look for a non-running task first; the chosen tip is moved into runningTasks
+      while (iter.hasNext()) {
+        TaskInProgress t = iter.next();
+        if (t.isRunnable() && !t.isRunning()) {
+          runningTasks.add(t);
+          iter.remove();
+          tip = t;
+          break;
+        }
+      }
+      if (tip == null) { // nothing runnable: try speculation when the job conf enables it
+        if (getJobConf().getSpeculativeExecution()) {
+          TaskTrackerStatus tts = jobtracker.getTaskTrackerStatus(trackerName);
+          tip = findSpeculativeTask(runningTasks, tts, status.mapProgress(), // NOTE(review): map progress is passed on the reduce path too -- confirm intended
+                                    jobtracker.getClock().getTime(), true);
+        }
+      }
+      if (tip != null) {
+        TaskAttemptID tId = tip.getTaskToRun(trackerName).getTaskID();
+        if (tip.isMapTask()) {
+          scheduleMap(tip);
+        } else {
+          scheduleReduce(tip);
+        }
+        //Register the new attempt and report it as RUNNING
+        makeRunning(tId, tip, trackerName);
+        return tId;
+      }
+      return null;
+    }
+    /** Finds a map attempt for the tracker using nonLocalMaps / nonLocalRunningMaps. */
+    public TaskAttemptID findMapTask(String trackerName)
+    throws IOException {
+      return findTask(trackerName, 
+          JobInProgress.convertTrackerNameToHostName(trackerName),
+          nonLocalMaps, nonLocalRunningMaps);
+    }
+    /** Finds a reduce attempt for the tracker using nonRunningReduces / runningReduces. */
+    public TaskAttemptID findReduceTask(String trackerName) 
+    throws IOException {
+      return findTask(trackerName, 
+          JobInProgress.convertTrackerNameToHostName(trackerName),
+          nonRunningReduces, runningReduces);
+    }
+    /** Reports the given attempt as SUCCEEDED with progress 1.0 via updateTaskStatus. */
+    public void finishTask(TaskAttemptID taskId) {
+      TaskInProgress tip = jobtracker.taskidToTIPMap.get(taskId); // NOTE(review): result is not null-checked
+      TaskStatus status = TaskStatus.createTaskStatus(tip.isMapTask(), taskId, 
+          1.0f, 1, TaskStatus.State.SUCCEEDED, "", "", 
+          tip.machineWhereTaskRan(taskId), 
+          tip.isMapTask() ? Phase.MAP : Phase.REDUCE, new Counters());
+      updateTaskStatus(tip, status);
+    }
+    /** Adds the attempt to the tip via addRunningTaskToTIP, then reports it RUNNING at progress 0.0. */
+    private void makeRunning(TaskAttemptID taskId, TaskInProgress tip, 
+        String taskTracker) {
+      addRunningTaskToTIP(tip, taskId, new TaskTrackerStatus(taskTracker,
+          JobInProgress.convertTrackerNameToHostName(taskTracker)), true);
+      TaskStatus status = TaskStatus.createTaskStatus(tip.isMapTask(), taskId, 
+          0.0f, 1, TaskStatus.State.RUNNING, "", "", taskTracker,
+          tip.isMapTask() ? Phase.MAP : Phase.REDUCE, new Counters());
+      updateTaskStatus(tip, status);
+    }
+    /** Reports the given attempt as RUNNING with the supplied progress via updateTaskStatus. */
+    public void progressMade(TaskAttemptID taskId, float progress) {
+      TaskInProgress tip = jobtracker.taskidToTIPMap.get(taskId); // NOTE(review): result is not null-checked
+      TaskStatus status = TaskStatus.createTaskStatus(tip.isMapTask(), taskId, 
+          progress, 1, TaskStatus.State.RUNNING, "", "", 
+          tip.machineWhereTaskRan(taskId), 
+          tip.isMapTask() ? Phase.MAP : Phase.REDUCE, new Counters());
+      updateTaskStatus(tip, status);
+    }
+  }
+  
+  static short sendHeartBeat(JobTracker jt, TaskTrackerStatus status, // sends one heartbeat to jt
+                                             boolean initialContact, 
+                                             String tracker, short responseId) 
+    throws IOException {
+    if (status == null) { // no status supplied: build one from the tracker name and its derived host name
+      status = new TaskTrackerStatus(tracker, 
+          JobInProgress.convertTrackerNameToHostName(tracker));
+
+    }
+      jt.heartbeat(status, false, initialContact, false, responseId); // the heartbeat's return value is discarded
+      return ++responseId ; // returns the incremented responseId
+  }
+  
+  static void establishFirstContact(JobTracker jt, String tracker) // sends the tracker's initial-contact heartbeat
+    throws IOException {
+    sendHeartBeat(jt, null, true, tracker, (short) 0); // null status -> default TaskTrackerStatus; responseId starts at 0
+  }
+
+}