You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 04:48:35 UTC
svn commit: r1077174 - in
/hadoop/common/branches/branch-0.20-security-patches/src:
mapred/org/apache/hadoop/mapred/JobInProgress.java
mapred/org/apache/hadoop/mapred/JobTracker.java
test/org/apache/hadoop/mapred/FakeObjectUtilities.java
Author: omalley
Date: Fri Mar 4 03:48:35 2011
New Revision: 1077174
URL: http://svn.apache.org/viewvc?rev=1077174&view=rev
Log:
commit 0ddbf9677b0a793b6fcd1bda1f4458c1bef52427
Author: Hemanth Yamijala <yh...@yahoo-inc.com>
Date: Thu Feb 18 12:15:13 2010 +0530
MAPREDUCE-686 from https://issues.apache.org/jira/secure/attachment/12436181/MAPREDUCE-686-y20.patch
+++ b/YAHOO-CHANGES.txt
+ MAPREDUCE-686. Move TestSpeculativeExecution.Fake* into a separate class
+ so that it can be used by other tests. (Jothi Padmanabhan via sharad)
+
Added:
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/FakeObjectUtilities.java
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java?rev=1077174&r1=1077173&r2=1077174&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java Fri Mar 4 03:48:35 2011
@@ -1915,13 +1915,13 @@ class JobInProgress {
/**
* Find a speculative task
* @param list a list of tips
- * @param taskTracker the tracker that has requested a tip
+ * @param ttStatus status of the tracker that has requested a tip
* @param avgProgress the average progress for speculation
* @param currentTime current time in milliseconds
* @param shouldRemove whether to remove the tips
* @return a tip that can be speculated on the tracker
*/
- private synchronized TaskInProgress findSpeculativeTask(
+ protected synchronized TaskInProgress findSpeculativeTask(
Collection<TaskInProgress> list, TaskTrackerStatus ttStatus,
double avgProgress, long currentTime, boolean shouldRemove) {
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=1077174&r1=1077173&r2=1077174&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java Fri Mar 4 03:48:35 2011
@@ -1960,6 +1960,11 @@ public class JobTracker implements MRCon
this(conf, generateNewIdentifier());
}
+ JobTracker(JobConf conf, Clock clock)
+ throws IOException, InterruptedException {
+ this(conf, generateNewIdentifier(), clock);
+ }
+
public static final String JT_USER_NAME = "mapreduce.jobtracker.user.name";
public static final String JT_KEYTAB_FILE =
"mapreduce.jobtracker.keytab.file";
Added: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/FakeObjectUtilities.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/FakeObjectUtilities.java?rev=1077174&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/FakeObjectUtilities.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/FakeObjectUtilities.java Fri Mar 4 03:48:35 2011
@@ -0,0 +1,191 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.Collection;
+import java.util.Date;
+import java.util.Iterator;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapred.TaskStatus.Phase;
+import org.apache.hadoop.mapreduce.split.*;
+import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
+
+/**
+ * Utilities used in unit tests: a fake JobTracker, a fake JobInProgress and heartbeat helpers.
+ *
+ */
+public class FakeObjectUtilities {
+
+ static final Log LOG = LogFactory.getLog(FakeObjectUtilities.class);
+
+ private static String jtIdentifier = "test";
+ private static int jobCounter;
+
+ /**
+ * A fake JobTracker for use in unit tests; its cluster status is built from the supplied tracker names and totalSlots.
+ */
+ static class FakeJobTracker extends JobTracker {
+
+ int totalSlots;
+ private String[] trackers;
+
+ FakeJobTracker(JobConf conf, Clock clock, String[] tts) throws IOException,
+ InterruptedException {
+ super(conf, clock);
+ this.trackers = tts;
+ //4 slots per tracker in total, so the max map and max reduce task capacities (totalSlots/2 each) are twice the cluster size
+ totalSlots = trackers.length * 4;
+ }
+ @Override
+ public ClusterStatus getClusterStatus(boolean detailed) {
+ return new ClusterStatus(trackers.length,
+ 0, 0, 0, 0, totalSlots/2, totalSlots/2, JobTracker.State.RUNNING, 0);
+ }
+
+ public void setNumSlots(int totalSlots) {
+ this.totalSlots = totalSlots;
+ }
+ }
+
+ static class FakeJobInProgress extends JobInProgress {
+ FakeJobInProgress(JobConf jobConf, JobTracker tracker) throws IOException {
+ super(new JobID(jtIdentifier, ++jobCounter), jobConf, tracker);
+ //initObjects(tracker, numMaps, numReduces);
+ }
+
+ @Override
+ public synchronized void initTasks() throws IOException {
+ maps = new TaskInProgress[numMapTasks];
+ for (int i = 0; i < numMapTasks; i++) {
+ maps[i] = new TaskInProgress(getJobID(), "test",
+ JobSplit.EMPTY_TASK_SPLIT, jobtracker, getJobConf(), this, i, 1);
+ nonLocalMaps.add(maps[i]);
+ }
+ reduces = new TaskInProgress[numReduceTasks];
+ for (int i = 0; i < numReduceTasks; i++) {
+ reduces[i] = new TaskInProgress(getJobID(), "test",
+ numMapTasks, i,
+ jobtracker, getJobConf(), this, 1);
+ nonRunningReduces.add(reduces[i]);
+ }
+ }
+
+ private TaskAttemptID findTask(String trackerName, String trackerHost,
+ Collection<TaskInProgress> nonRunningTasks,
+ Collection<TaskInProgress> runningTasks)
+ throws IOException {
+ TaskInProgress tip = null;
+ Iterator<TaskInProgress> iter = nonRunningTasks.iterator();
+ //look for a non-running task first
+ while (iter.hasNext()) {
+ TaskInProgress t = iter.next();
+ if (t.isRunnable() && !t.isRunning()) {
+ runningTasks.add(t);
+ iter.remove();
+ tip = t;
+ break;
+ }
+ }
+ if (tip == null) {
+ if (getJobConf().getSpeculativeExecution()) {
+ TaskTrackerStatus tts = jobtracker.getTaskTrackerStatus(trackerName);
+ tip = findSpeculativeTask(runningTasks, tts, status.mapProgress(),
+ jobtracker.getClock().getTime(), true);
+ }
+ }
+ if (tip != null) {
+ TaskAttemptID tId = tip.getTaskToRun(trackerName).getTaskID();
+ if (tip.isMapTask()) {
+ scheduleMap(tip);
+ } else {
+ scheduleReduce(tip);
+ }
+ //Set it to RUNNING
+ makeRunning(tId, tip, trackerName);
+ return tId;
+ }
+ return null;
+ }
+
+ public TaskAttemptID findMapTask(String trackerName)
+ throws IOException {
+ return findTask(trackerName,
+ JobInProgress.convertTrackerNameToHostName(trackerName),
+ nonLocalMaps, nonLocalRunningMaps);
+ }
+
+ public TaskAttemptID findReduceTask(String trackerName)
+ throws IOException {
+ return findTask(trackerName,
+ JobInProgress.convertTrackerNameToHostName(trackerName),
+ nonRunningReduces, runningReduces);
+ }
+
+ public void finishTask(TaskAttemptID taskId) {
+ TaskInProgress tip = jobtracker.taskidToTIPMap.get(taskId);
+ TaskStatus status = TaskStatus.createTaskStatus(tip.isMapTask(), taskId,
+ 1.0f, 1, TaskStatus.State.SUCCEEDED, "", "",
+ tip.machineWhereTaskRan(taskId),
+ tip.isMapTask() ? Phase.MAP : Phase.REDUCE, new Counters());
+ updateTaskStatus(tip, status);
+ }
+
+ private void makeRunning(TaskAttemptID taskId, TaskInProgress tip,
+ String taskTracker) {
+ addRunningTaskToTIP(tip, taskId, new TaskTrackerStatus(taskTracker,
+ JobInProgress.convertTrackerNameToHostName(taskTracker)), true);
+ TaskStatus status = TaskStatus.createTaskStatus(tip.isMapTask(), taskId,
+ 0.0f, 1, TaskStatus.State.RUNNING, "", "", taskTracker,
+ tip.isMapTask() ? Phase.MAP : Phase.REDUCE, new Counters());
+ updateTaskStatus(tip, status);
+ }
+
+ public void progressMade(TaskAttemptID taskId, float progress) {
+ TaskInProgress tip = jobtracker.taskidToTIPMap.get(taskId);
+ TaskStatus status = TaskStatus.createTaskStatus(tip.isMapTask(), taskId,
+ progress, 1, TaskStatus.State.RUNNING, "", "",
+ tip.machineWhereTaskRan(taskId),
+ tip.isMapTask() ? Phase.MAP : Phase.REDUCE, new Counters());
+ updateTaskStatus(tip, status);
+ }
+ }
+
+ static short sendHeartBeat(JobTracker jt, TaskTrackerStatus status,
+ boolean initialContact,
+ String tracker, short responseId)
+ throws IOException {
+ if (status == null) {
+ status = new TaskTrackerStatus(tracker,
+ JobInProgress.convertTrackerNameToHostName(tracker));
+
+ }
+ jt.heartbeat(status, false, initialContact, false, responseId);
+ return ++responseId ;
+ }
+
+ static void establishFirstContact(JobTracker jt, String tracker)
+ throws IOException {
+ sendHeartBeat(jt, null, true, tracker, (short) 0);
+ }
+
+}