You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2006/12/07 21:38:18 UTC
svn commit: r483651 - in /lucene/hadoop/trunk: ./
src/java/org/apache/hadoop/mapred/
Author: cutting
Date: Thu Dec 7 12:38:17 2006
New Revision: 483651
URL: http://svn.apache.org/viewvc?view=rev&rev=483651
Log:
HADOOP-639. Restructure InterTrackerProtocol to make task accounting more reliable.
Added:
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/HeartbeatResponse.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/KillJobAction.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/KillTaskAction.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LaunchTaskAction.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReinitTrackerAction.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTrackerAction.java
Modified:
lucene/hadoop/trunk/CHANGES.txt
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InterTrackerProtocol.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=483651&r1=483650&r2=483651
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Thu Dec 7 12:38:17 2006
@@ -30,6 +30,9 @@
8. HADOOP-676. Improved exceptions and error messages for common job
input specification errors. (Sanjay Dahiya via cutting)
+ 9. HADOOP-639. Restructure InterTrackerProtocol to make task
+ accounting more reliable. (Arun C Murthy via cutting)
+
Release 0.9.1 - 2006-12-06
Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/HeartbeatResponse.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/HeartbeatResponse.java?view=auto&rev=483651
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/HeartbeatResponse.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/HeartbeatResponse.java Thu Dec 7 12:38:17 2006
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+
+/**
+ * The response sent by the {@link JobTracker} to the heartbeat sent
+ * periodically by the {@link TaskTracker}: a response id (echoed back by the
+ * tracker in its next heartbeat) plus the {@link TaskTrackerAction}s to perform.
+ * @author Arun C Murthy
+ */
+class HeartbeatResponse implements Writable, Configurable {
+ Configuration conf = null; // only stored by setConf(); write()/readFields() do not use it
+ short responseId; // echoed back by the TaskTracker in its next heartbeat
+ TaskTrackerAction[] actions; // may be null; null is serialized as zero actions
+
+ HeartbeatResponse() {} // fields stay at defaults until readFields() or the setters fill them
+
+ HeartbeatResponse(short responseId, TaskTrackerAction[] actions) {
+ this.responseId = responseId;
+ this.actions = actions;
+ }
+
+ public void setResponseId(short responseId) {
+ this.responseId = responseId;
+ }
+
+ public short getResponseId() {
+ return responseId;
+ }
+
+ public void setActions(TaskTrackerAction[] actions) {
+ this.actions = actions;
+ }
+
+ public TaskTrackerAction[] getActions() { // null when the response carried no actions
+ return actions;
+ }
+
+ public void setConf(Configuration conf) {
+ this.conf = conf;
+ }
+
+ public Configuration getConf() {
+ return conf;
+ }
+
+ public void write(DataOutput out) throws IOException { // format: short id, vint count, then (action type enum, action body) per action
+ out.writeShort(responseId);
+ if (actions == null) {
+ WritableUtils.writeVInt(out, 0);
+ } else {
+ WritableUtils.writeVInt(out, actions.length);
+ for (TaskTrackerAction action : actions) {
+ WritableUtils.writeEnum(out, action.getActionId()); // type tag lets readFields() pick the concrete class
+ action.write(out);
+ }
+ }
+ // Actions are written by hand (type tag + body) rather than via ObjectWritable.
+ }
+
+ public void readFields(DataInput in) throws IOException { // inverse of write()
+ this.responseId = in.readShort();
+ int length = WritableUtils.readVInt(in);
+ if (length > 0) {
+ actions = new TaskTrackerAction[length];
+ for (int i=0; i < length; ++i) {
+ TaskTrackerAction.ActionType actionType =
+ WritableUtils.readEnum(in, TaskTrackerAction.ActionType.class);
+ actions[i] = TaskTrackerAction.createAction(actionType); // empty instance of the tagged type
+ actions[i].readFields(in);
+ }
+ } else {
+ actions = null; // a count of 0 is read back as null, not as an empty array
+ }
+ // Read side of the hand-written format above (ObjectWritable is not used).
+ }
+}
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InterTrackerProtocol.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InterTrackerProtocol.java?view=diff&rev=483651&r1=483650&r2=483651
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InterTrackerProtocol.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/InterTrackerProtocol.java Thu Dec 7 12:38:17 2006
@@ -27,31 +27,40 @@
* The JobTracker is the Server, which implements this protocol.
*/
interface InterTrackerProtocol extends VersionedProtocol {
- // version 2 introduced to replace TaskStatus.State with an enum
- public static final long versionID = 2L;
+ /**
+ * Version 3 replaces the version 2 calls
+ * emitHeartbeat/pollForNewTask/pollForTaskWithClosedJob with the single
+ * {@link #heartbeat(TaskTrackerStatus, boolean, boolean, short)} call.
+ */
+ public static final long versionID = 3L;
public final static int TRACKERS_OK = 0;
public final static int UNKNOWN_TASKTRACKER = 1;
- /**
- * Called regularly by the task tracker to update the status of its tasks
- * within the job tracker. JobTracker responds with a code that tells the
- * TaskTracker whether all is well.
- *
- * TaskTracker must also indicate whether this is the first interaction
- * (since state refresh)
+ /**
+ * Called regularly by the {@link TaskTracker} to update the status of its
+ * tasks within the job tracker. {@link JobTracker} responds with a
+ * {@link HeartbeatResponse} that directs the
+ * {@link TaskTracker} to undertake a series of 'actions'
+ * (see {@link org.apache.hadoop.mapred.TaskTrackerAction.ActionType}).
+ *
+ * {@link TaskTracker} must also indicate whether this is the first
+ * interaction (since state refresh) and acknowledge the last response
+ * it received from the {@link JobTracker}.
+ *
+ * @param status the status update
+ * @param initialContact <code>true</code> if this is first interaction since
+ * 'refresh', <code>false</code> otherwise.
+ * @param acceptNewTasks <code>true</code> if the {@link TaskTracker} is
+ * ready to accept new tasks to run.
+ * @param responseId the responseId of the last {@link HeartbeatResponse}
+ * received (and thereby acknowledged) by the {@link TaskTracker}.
+ * @return a {@link org.apache.hadoop.mapred.HeartbeatResponse} with
+ * fresh instructions.
*/
- int emitHeartbeat(TaskTrackerStatus status,
- boolean initialContact) throws IOException;
-
- /** Called to get new tasks from from the job tracker for this tracker.*/
- Task pollForNewTask(String trackerName) throws IOException;
-
- /** Called to find which tasks that have been run by this tracker should now
- * be closed because their job is complete. This is used to, e.g.,
- * notify a map task that its output is no longer needed and may
- * be removed. */
- String[] pollForTaskWithClosedJob(String trackerName) throws IOException;
+ HeartbeatResponse heartbeat(TaskTrackerStatus status,
+ boolean initialContact, boolean acceptNewTasks, short responseId)
+ throws IOException;
/** Called by a reduce task to find which map tasks are completed.
*
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java?view=diff&rev=483651&r1=483650&r2=483651
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java Thu Dec 7 12:38:17 2006
@@ -427,6 +427,9 @@
// (trackerID->TreeSet of taskids running at that tracker)
TreeMap trackerToTaskMap = new TreeMap();
+ // (trackerID --> responseId of the last HeartbeatResponse sent to that tracker)
+ Map<String, Short> trackerToHeartbeatResponseIDMap = new TreeMap();
+
//
// Watch and expire TaskTracker objects using these structures.
// We can map from Name->TaskTrackerStatus, or we can expire by time.
@@ -723,6 +726,74 @@
////////////////////////////////////////////////////
/**
+ * Handles the periodic heartbeat from a {@link TaskTracker}.
+ * The tracker's status is recorded via processHeartbeat(); if that rejects it
+ * (e.g. an unknown tracker) the reply is a single {@link ReinitTrackerAction}.
+ * Otherwise the response carries at most one {@link LaunchTaskAction} (only
+ * when acceptNewTasks is set) followed by any kill actions from getTasksToKill().
+ * See {@link InterTrackerProtocol#heartbeat} for the parameter contract.
+ */
+ public synchronized HeartbeatResponse heartbeat(TaskTrackerStatus status,
+ boolean initialContact, boolean acceptNewTasks, short responseId)
+ throws IOException {
+ LOG.debug("Got heartbeat from: " + status.getTrackerName() +
+ " (initialContact: " + initialContact +
+ " acceptNewTasks: " + acceptNewTasks + ")" +
+ " with responseId: " + responseId);
+
+ // Pick the id for this response: responseId + 1, unless it disagrees with the id recorded for this tracker
+ String trackerName = status.getTrackerName();
+ Short oldResponseId = trackerToHeartbeatResponseIDMap.get(trackerName); // null if nothing recorded yet
+
+ short newResponseId = (short)(responseId + 1); // the (short) cast wraps on overflow
+ if (!initialContact && oldResponseId != null &&
+ oldResponseId.shortValue() != responseId) {
+ newResponseId = oldResponseId.shortValue(); // ids disagree (last response presumably lost): reuse the recorded id
+ }
+
+ // Process this heartbeat
+ if (!processHeartbeat(status, initialContact,
+ (newResponseId != responseId))) { // NOTE(review): always true, so lastSeen is refreshed on every heartbeat
+ if (oldResponseId != null) {
+ trackerToHeartbeatResponseIDMap.remove(trackerName);
+ }
+
+ return new HeartbeatResponse(newResponseId,
+ new TaskTrackerAction[] {new ReinitTrackerAction()});
+ }
+
+ // Build a fresh response (NOTE(review): actions of an unacknowledged earlier response are not re-sent)
+ HeartbeatResponse response = new HeartbeatResponse(newResponseId, null);
+ List<TaskTrackerAction> actions = new ArrayList();
+
+ // Check for new tasks to be executed on the tasktracker
+ if (acceptNewTasks) {
+ Task task = getNewTaskForTaskTracker(trackerName);
+ if (task != null) {
+ LOG.debug(trackerName + " -> LaunchTask: " + task.getTaskId());
+ actions.add(new LaunchTaskAction(task));
+ }
+ }
+
+ // Check for tasks to be killed
+ List<TaskTrackerAction> killTasksList = getTasksToKill(trackerName); // null if no tasks are mapped to this tracker
+ if (killTasksList != null) {
+ actions.addAll(killTasksList);
+ }
+
+ response.setActions(
+ actions.toArray(new TaskTrackerAction[actions.size()])); // never null here, possibly empty
+
+ // Record the responseId sent to this tracker.
+ if (newResponseId != responseId) { // NOTE(review): always true -- newResponseId is responseId + 1 or a recorded id that differs from responseId
+ trackerToHeartbeatResponseIDMap.put(trackerName,
+ new Short(newResponseId));
+ }
+
+ return response;
+ }
+
+ /**
* Update the last recorded status for the given task tracker.
* It assumes that the taskTrackers are locked on entry.
* @author Owen O'Malley
@@ -752,16 +823,21 @@
/**
* Process incoming heartbeat messages from the task trackers.
*/
- public synchronized int emitHeartbeat(TaskTrackerStatus trackerStatus, boolean initialContact) {
+ private synchronized boolean processHeartbeat(
+ TaskTrackerStatus trackerStatus,
+ boolean initialContact, boolean updateStatusTimestamp) {
String trackerName = trackerStatus.getTrackerName();
- trackerStatus.setLastSeen(System.currentTimeMillis());
+ if (initialContact || updateStatusTimestamp) {
+ trackerStatus.setLastSeen(System.currentTimeMillis());
+ }
synchronized (taskTrackers) {
synchronized (trackerExpiryQueue) {
boolean seenBefore = updateTaskTrackerStatus(trackerName,
trackerStatus);
if (initialContact) {
- // If it's first contact, then clear out any state hanging around
+ // If it's first contact, then clear out
+ // any state hanging around
if (seenBefore) {
lostTaskTracker(trackerName, trackerStatus.getHost());
}
@@ -770,7 +846,7 @@
if (!seenBefore) {
LOG.warn("Status from unknown Tracker : " + trackerName);
taskTrackers.remove(trackerName);
- return InterTrackerProtocol.UNKNOWN_TASKTRACKER;
+ return false;
}
}
@@ -782,18 +858,17 @@
updateTaskStatuses(trackerStatus);
//LOG.info("Got heartbeat from "+trackerName);
- return InterTrackerProtocol.TRACKERS_OK;
+ return true;
}
/**
- * A tracker wants to know if there's a Task to run. Returns
- * a task we'd like the TaskTracker to execute right now.
+ * Returns a task we'd like the TaskTracker to execute right now.
*
* Eventually this function should compute load on the various TaskTrackers,
* and incorporate knowledge of DFS file placement. But for right now, it
* just grabs a single item out of the pending task list and hands it back.
*/
- public synchronized Task pollForNewTask(String taskTracker) {
+ private synchronized Task getNewTaskForTaskTracker(String taskTracker) {
//
// Compute average map and reduce task numbers across pool
//
@@ -936,23 +1011,36 @@
* A tracker wants to know if any of its Tasks have been
* closed (because the job completed, whether successfully or not)
*/
- public synchronized String[] pollForTaskWithClosedJob(String taskTracker) {
- TreeSet taskIds = (TreeSet) trackerToTaskMap.get(taskTracker);
+ private synchronized List getTasksToKill(String taskTracker) { // returns null when no tasks are mapped to this tracker
+ Set<String> taskIds = (TreeSet) trackerToTaskMap.get(taskTracker);
if (taskIds != null) {
- ArrayList list = new ArrayList();
- for (Iterator it = taskIds.iterator(); it.hasNext(); ) {
- String taskId = (String) it.next();
- TaskInProgress tip = (TaskInProgress) taskidToTIPMap.get(taskId);
- if (tip.shouldCloseForClosedJob(taskId)) {
+ List<TaskTrackerAction> killList = new ArrayList();
+ Set<String> killJobIds = new TreeSet(); // finished jobs; the set ensures one KillJobAction per job
+ for (String killTaskId : taskIds ) {
+ TaskInProgress tip = (TaskInProgress) taskidToTIPMap.get(killTaskId);
+ if (tip.shouldCloseForClosedJob(killTaskId)) {
//
// This is how the JobTracker ends a task at the TaskTracker.
// It may be successfully completed, or may be killed in
// mid-execution.
//
- list.add(taskId);
+ if (tip.getJob().getStatus().getRunState() == JobStatus.RUNNING) { // job still running: kill only this task
+ killList.add(new KillTaskAction(killTaskId));
+ LOG.debug(taskTracker + " -> KillTaskAction: " + killTaskId);
+ } else {
+ // job no longer running: collect its id; KillJobActions are appended below
+ String killJobId = tip.getJob().getStatus().getJobId();
+ killJobIds.add(killJobId);
+ }
}
}
- return (String[]) list.toArray(new String[list.size()]);
+
+ for (String killJobId : killJobIds) { // kill-job actions follow all kill-task actions
+ killList.add(new KillJobAction(killJobId));
+ LOG.debug(taskTracker + " -> KillJobAction: " + killJobId);
+ }
+
+ return killList;
}
return null;
}
Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/KillJobAction.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/KillJobAction.java?view=auto&rev=483651
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/KillJobAction.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/KillJobAction.java Thu Dec 7 12:38:17 2006
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Text;
+
+/**
+ * Represents a directive from the {@link org.apache.hadoop.mapred.JobTracker}
+ * to the {@link org.apache.hadoop.mapred.TaskTracker} to kill all tasks of
+ * a job and clean up its resources; serialized as the job id (Text string).
+ *
+ * @author Arun C Murthy
+ */
+class KillJobAction extends TaskTrackerAction {
+ String jobId; // the only serialized field
+
+ public KillJobAction() { // jobId stays null until readFields()
+ super(ActionType.KILL_JOB);
+ }
+
+ public KillJobAction(String taskId) { // NOTE(review): the argument is a job id despite its name
+ super(ActionType.KILL_JOB);
+ this.jobId = taskId;
+ }
+
+ public String getJobId() {
+ return jobId;
+ }
+
+ public void write(DataOutput out) throws IOException {
+ Text.writeString(out, jobId);
+ }
+
+ public void readFields(DataInput in) throws IOException {
+ jobId = Text.readString(in);
+ }
+
+}
Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/KillTaskAction.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/KillTaskAction.java?view=auto&rev=483651
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/KillTaskAction.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/KillTaskAction.java Thu Dec 7 12:38:17 2006
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Text;
+
+/**
+ * Represents a directive from the {@link org.apache.hadoop.mapred.JobTracker}
+ * to the {@link org.apache.hadoop.mapred.TaskTracker} to kill a single task,
+ * identified by its task id (the only serialized field).
+ * @author Arun C Murthy
+ */
+class KillTaskAction extends TaskTrackerAction {
+ String taskId;
+
+ public KillTaskAction() { // taskId stays null until readFields()
+ super(ActionType.KILL_TASK);
+ }
+
+ public KillTaskAction(String taskId) {
+ super(ActionType.KILL_TASK);
+ this.taskId = taskId;
+ }
+
+ public String getTaskId() {
+ return taskId;
+ }
+
+ public void write(DataOutput out) throws IOException {
+ Text.writeString(out, taskId);
+ }
+
+ public void readFields(DataInput in) throws IOException {
+ taskId = Text.readString(in);
+ }
+}
Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LaunchTaskAction.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LaunchTaskAction.java?view=auto&rev=483651
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LaunchTaskAction.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LaunchTaskAction.java Thu Dec 7 12:38:17 2006
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * Represents a directive from the {@link org.apache.hadoop.mapred.JobTracker}
+ * to the {@link org.apache.hadoop.mapred.TaskTracker} to launch a new task;
+ * serialized as a map/reduce flag followed by the {@link Task} itself.
+ * @author Arun C Murthy
+ */
+class LaunchTaskAction extends TaskTrackerAction {
+ private Task task;
+
+ public LaunchTaskAction() { // task stays null until readFields()
+ super(ActionType.LAUNCH_TASK);
+ }
+
+ public LaunchTaskAction(Task task) {
+ super(ActionType.LAUNCH_TASK);
+ this.task = task;
+ }
+
+ public Task getTask() {
+ return task;
+ }
+
+ public void write(DataOutput out) throws IOException { // NOTE(review): assumes task is non-null
+ out.writeBoolean(task.isMapTask()); // flag tells readFields() which Task subclass to create
+ task.write(out);
+ }
+
+ public void readFields(DataInput in) throws IOException {
+ boolean isMapTask = in.readBoolean();
+ if (isMapTask) {
+ task = new MapTask();
+ } else {
+ task = new ReduceTask();
+ }
+ task.readFields(in); // the concrete task deserializes its own fields
+ }
+
+}
Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReinitTrackerAction.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReinitTrackerAction.java?view=auto&rev=483651
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReinitTrackerAction.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReinitTrackerAction.java Thu Dec 7 12:38:17 2006
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * Represents a directive from the {@link org.apache.hadoop.mapred.JobTracker}
+ * to the {@link org.apache.hadoop.mapred.TaskTracker} to reinitialize itself.
+ * It carries no payload beyond its {@link TaskTrackerAction.ActionType}.
+ * @author Arun C Murthy
+ */
+class ReinitTrackerAction extends TaskTrackerAction {
+
+ public ReinitTrackerAction() {
+ super(ActionType.REINIT_TRACKER);
+ }
+
+ public void write(DataOutput out) throws IOException {} // no payload to write
+
+ public void readFields(DataInput in) throws IOException {} // no payload to read
+
+}
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java?view=diff&rev=483651&r1=483650&r2=483651
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java Thu Dec 7 12:38:17 2006
@@ -228,7 +228,12 @@
(job.getStatus().getRunState() != JobStatus.RUNNING)) {
tasksReportedClosed.add(taskid);
return true;
- } else {
+ } else if( !isMapTask() && isComplete() &&
+ ! tasksReportedClosed.contains(taskid) ){
+ tasksReportedClosed.add(taskid);
+ return true;
+ }
+ else {
return false;
}
}
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?view=diff&rev=483651&r1=483650&r2=483651
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Thu Dec 7 12:38:17 2006
@@ -68,6 +68,9 @@
Server taskReportServer = null;
InterTrackerProtocol jobClient;
+
+ // responseId of the last HeartbeatResponse received (-1 before the first one)
+ short heartbeatResponseId = -1;
StatusHttpServer server = null;
@@ -187,7 +190,7 @@
}
}
}
-
+
static String getCacheSubdir() {
return TaskTracker.SUBDIR + Path.SEPARATOR + TaskTracker.CACHEDIR;
}
@@ -458,15 +461,23 @@
}
}
- if (!transmitHeartBeat()) {
+ // Send the heartbeat and process the jobtracker's directives
+ HeartbeatResponse heartbeatResponse = transmitHeartBeat();
+ TaskTrackerAction[] actions = heartbeatResponse.getActions();
+ LOG.debug("Got heartbeatResponse from JobTracker with responseId: " +
+ heartbeatResponse.getResponseId() + " and " +
+ ((actions != null) ? actions.length : 0) + " actions");
+
+ if (reinitTaskTracker(actions)) {
return State.STALE;
}
+
lastHeartbeat = now;
justStarted = false;
- checkForNewTasks();
+ checkAndStartNewTasks(actions);
markUnresponsiveTasks();
- closeCompletedTasks();
+ closeCompletedTasks(actions);
killOverflowingTasks();
//we've cleaned up, resume normal operation
@@ -498,56 +509,94 @@
* @return false if the tracker was unknown
* @throws IOException
*/
- private boolean transmitHeartBeat() throws IOException {
+ private HeartbeatResponse transmitHeartBeat() throws IOException { // NOTE(review): the "@return false" javadoc above is stale; this returns the JobTracker's response
//
// Build the heartbeat information for the JobTracker
//
- List<TaskStatus> taskReports = new ArrayList(runningTasks.size());
+ List<TaskStatus> taskReports =
+ new ArrayList<TaskStatus>(runningTasks.size());
synchronized (this) {
- for (TaskInProgress tip: runningTasks.values()) {
- taskReports.add(tip.createStatus());
- }
+ for (TaskInProgress tip: runningTasks.values()) {
+ taskReports.add(tip.createStatus());
+ }
}
TaskTrackerStatus status =
new TaskTrackerStatus(taskTrackerName, localHostname,
- httpPort, taskReports,
- failures);
-
+ httpPort, taskReports,
+ failures);
+
+ //
+ // Ask for a new task only if a slot is free, new tasks are accepted and there is enough local disk space
+ //
+ boolean askForNewTask = false;
+ if ((mapTotal < maxCurrentTasks || reduceTotal < maxCurrentTasks) &&
+ acceptNewTasks) {
+ checkLocalDirs(fConf.getLocalDirs());
+
+ if (enoughFreeSpace(minSpaceStart)) {
+ askForNewTask = true;
+ }
+ }
+
//
// Xmit the heartbeat
//
+ HeartbeatResponse heartbeatResponse = jobClient.heartbeat(status,
+ justStarted, askForNewTask,
+ heartbeatResponseId);
+ heartbeatResponseId = heartbeatResponse.getResponseId(); // recorded on receipt, before any action is carried out
- int resultCode = jobClient.emitHeartbeat(status, justStarted);
synchronized (this) {
- for (TaskStatus taskStatus: taskReports) {
- if (taskStatus.getRunState() != TaskStatus.State.RUNNING) {
- if (taskStatus.getIsMap()) {
- mapTotal--;
- } else {
- reduceTotal--;
- }
- myMetrics.completeTask();
- runningTasks.remove(taskStatus.getTaskId());
+ for (TaskStatus taskStatus : taskReports) { // drop tasks just reported as no longer RUNNING
+ if (taskStatus.getRunState() != TaskStatus.State.RUNNING) {
+ if (taskStatus.getIsMap()) {
+ mapTotal--;
+ } else {
+ reduceTotal--;
}
+ myMetrics.completeTask();
+ runningTasks.remove(taskStatus.getTaskId());
+ }
}
}
- return resultCode != InterTrackerProtocol.UNKNOWN_TASKTRACKER;
+ return heartbeatResponse;
}
/**
+ * Check whether the JobTracker's actions include a REINIT_TRACKER directive.
+ *
+ * @param actions the actions from the latest HeartbeatResponse; may be null.
+ * @return <code>true</code> if a ReinitTrackerAction is present (the caller
+ * then returns State.STALE), <code>false</code> otherwise.
+ */
+ private boolean reinitTaskTracker(TaskTrackerAction[] actions) {
+ if (actions != null) {
+ for (TaskTrackerAction action : actions) {
+ if (action.getActionId() ==
+ TaskTrackerAction.ActionType.REINIT_TRACKER) {
+ LOG.info("Recieved RenitTrackerAction from JobTracker"); // NOTE(review): log text misspells "Received" and "ReinitTrackerAction"
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+
+ /**
* Check to see if there are any new tasks that we should run.
* @throws IOException
*/
- private void checkForNewTasks() throws IOException {
- //
- // Check if we should ask for a new Task
- //
- if ((mapTotal < maxCurrentTasks || reduceTotal < maxCurrentTasks) &&
- acceptNewTasks) {
- checkLocalDirs(fConf.getLocalDirs());
-
- if (enoughFreeSpace(minSpaceStart)) {
- Task t = jobClient.pollForNewTask(taskTrackerName);
+ private void checkAndStartNewTasks(TaskTrackerAction[] actions)
+ throws IOException {
+ if (actions == null) {
+ return;
+ }
+
+ for (TaskTrackerAction action : actions) {
+ if (action.getActionId() ==
+ TaskTrackerAction.ActionType.LAUNCH_TASK) {
+ Task t = ((LaunchTaskAction)(action)).getTask();
+ LOG.info("LaunchTaskAction: " + t.getTaskId());
if (t != null) {
startNewTask(t);
}
@@ -580,24 +629,73 @@
* Ask the JobTracker if there are any tasks that we should clean up,
* either because we don't need them any more or because the job is done.
*/
- private void closeCompletedTasks() throws IOException {
- String[] toCloseIds = jobClient.pollForTaskWithClosedJob(taskTrackerName);
- if (toCloseIds != null) {
- synchronized (this) {
- for (int i = 0; i < toCloseIds.length; i++) {
- TaskInProgress tip = tasks.get(toCloseIds[i]);
- if (tip != null) {
- // remove the task from running jobs, removing the job if
- // it is the last task
- removeTaskFromJob(tip.getTask().getJobId(), tip);
- tasksToCleanup.put(tip);
+ private void closeCompletedTasks(TaskTrackerAction[] actions) // handles KILL_JOB and KILL_TASK; other action types are ignored here
+ throws IOException {
+ if (actions == null) {
+ return;
+ }
+
+ for (TaskTrackerAction action : actions) {
+ TaskTrackerAction.ActionType actionType = action.getActionId();
+
+ if (actionType == TaskTrackerAction.ActionType.KILL_JOB) {
+ String jobId = ((KillJobAction)action).getJobId();
+ LOG.info("Received 'KillJobAction' for job: " + jobId);
+ synchronized (runningJobs) {
+ RunningJob rjob = runningJobs.get(jobId);
+ if (rjob == null) {
+ LOG.warn("Unknown job " + jobId + " being deleted.");
} else {
- LOG.info("Attempt to cleanup unknown tip " + toCloseIds[i]);
+ synchronized (rjob) {
+ int noJobTasks = rjob.tasks.size();
+ int taskCtr = 0;
+
+ // Queue every tip of this job for cleanup
+ for (TaskInProgress tip : rjob.tasks) {
+ // Only the last tip in rjob.tasks also deletes the job's local files
+ if (++taskCtr == noJobTasks) {
+ tip.setPurgeJobFiles(true);
+ }
+
+ tasksToCleanup.put(tip);
+ }
+
+ // Forget the job entirely
+ rjob.tasks.clear();
+ runningJobs.remove(jobId);
+ }
}
}
+ } else if(actionType == TaskTrackerAction.ActionType.KILL_TASK) {
+ String taskId = ((KillTaskAction)action).getTaskId();
+ LOG.info("Received KillTaskAction for task: " + taskId);
+ purgeTask(tasks.get(taskId), false); // NOTE(review): unlike the removed code, not under synchronized (this); an unknown id (null tip) is now ignored without a log
}
}
}
+
+ /**
+ * Detach the tip from its job and queue it on tasksToCleanup.
+ *
+ * @param tip {@link TaskInProgress} to be removed; <code>null</code> is ignored.
+ * @param purgeJobFiles <code>true</code> if cleaning up this tip should also
+ * delete the job's local files, <code>false</code> otherwise.
+ */
+ private void purgeTask(TaskInProgress tip, boolean purgeJobFiles) {
+ if (tip != null) {
+ LOG.info("About to purge task: " + tip.getTask().getTaskId());
+
+ // Record whether cleanup of this tip should also delete the job files
+ tip.setPurgeJobFiles(purgeJobFiles);
+
+ // Remove the task from running jobs,
+ // removing the job if it's the last task
+ removeTaskFromJob(tip.getTask().getJobId(), tip);
+
+ // Add this tip to queue of tasks to be purged
+ tasksToCleanup.put(tip);
+ }
+ }
/** Check if we're dangerously low on disk space
* If so, kill jobs to free up space and make sure
@@ -829,6 +927,9 @@
private boolean alwaysKeepTaskFiles;
private TaskStatus taskStatus ;
private boolean keepJobFiles;
+
+ /** Cleanup the job files when the job is complete (done/failed) */
+ private boolean purgeJobFiles = false;
/**
*/
@@ -893,6 +994,10 @@
keepFailedTaskFiles = localJobConf.getKeepFailedTaskFiles();
}
+    /**
+     * Marks whether the job's local files are to be deleted together with
+     * this task's files when the task is cleaned up.
+     *
+     * @param purgeJobFiles <code>true</code> to purge the job files,
+     *        <code>false</code> to keep them.
+     */
+    public void setPurgeJobFiles(final boolean purgeJobFiles) {
+      this.purgeJobFiles = purgeJobFiles;
+    }
+
/**
*/
public synchronized TaskStatus createStatus() {
@@ -1046,10 +1151,12 @@
LOG.warn("Error in deleting reduce temporary output",e);
}
- // delete the job diretory for this task
- // since the job is done/failed
- this.defaultJobConf.deleteLocalFiles(SUBDIR + Path.SEPARATOR +
- JOBCACHE + Path.SEPARATOR + task.getJobId());
+ // Delete the job directory for this
+ // task if the job is done/failed
+ if (purgeJobFiles) {
+ this.defaultJobConf.deleteLocalFiles(SUBDIR + Path.SEPARATOR +
+ JOBCACHE + Path.SEPARATOR + task.getJobId());
+ }
}
/**
Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTrackerAction.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTrackerAction.java?view=auto&rev=483651
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTrackerAction.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTrackerAction.java Thu Dec 7 12:38:17 2006
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+
+/**
+ * A generic directive from the {@link org.apache.hadoop.mapred.JobTracker}
+ * to the {@link org.apache.hadoop.mapred.TaskTracker} to take some 'action'.
+ *
+ * <p>This base class serializes only the {@link ActionType} of the action.
+ *
+ * @author Arun C Murthy
+ */
+abstract class TaskTrackerAction implements Writable {
+
+  /**
+   * Enumeration of the various 'actions' that the {@link JobTracker}
+   * directs the {@link TaskTracker} to perform periodically.
+   *
+   * @author Arun C Murthy
+   */
+  public static enum ActionType {
+    /** Launch a new task. */
+    LAUNCH_TASK,
+
+    /** Kill a task. */
+    KILL_TASK,
+
+    /** Kill any tasks of this job and cleanup. */
+    KILL_JOB,
+
+    /** Reinitialize the tasktracker. */
+    REINIT_TRACKER
+  }
+
+  /**
+   * A factory-method that creates a new instance of the
+   * {@link TaskTrackerAction} subclass matching the given {@link ActionType}.
+   *
+   * @param actionType the {@link ActionType} of object to create.
+   * @return a new {@link TaskTrackerAction} of the given type; never
+   *         <code>null</code>.
+   * @throws IllegalArgumentException if <code>actionType</code> has no
+   *         corresponding action class.
+   */
+  public static TaskTrackerAction createAction(ActionType actionType) {
+    switch (actionType) {
+      case LAUNCH_TASK:
+        return new LaunchTaskAction();
+      case KILL_TASK:
+        return new KillTaskAction();
+      case KILL_JOB:
+        return new KillJobAction();
+      case REINIT_TRACKER:
+        return new ReinitTrackerAction();
+      default:
+        // Fail fast if a new ActionType is added without a matching case,
+        // instead of handing a null action to the caller.
+        throw new IllegalArgumentException("Unknown action type: "
+                                           + actionType);
+    }
+  }
+
+  /** The type of this action; overwritten by {@link #readFields(DataInput)}. */
+  private ActionType actionType;
+
+  /**
+   * @param actionType the {@link ActionType} of this action.
+   */
+  protected TaskTrackerAction(ActionType actionType) {
+    this.actionType = actionType;
+  }
+
+  /**
+   * Return the {@link ActionType}.
+   * @return the {@link ActionType}.
+   */
+  ActionType getActionId() {
+    return actionType;
+  }
+
+  /** Writes the {@link ActionType} of this action to <code>out</code>. */
+  public void write(DataOutput out) throws IOException {
+    WritableUtils.writeEnum(out, actionType);
+  }
+
+  /** Reads an {@link ActionType} from <code>in</code>, replacing the current one. */
+  public void readFields(DataInput in) throws IOException {
+    actionType = WritableUtils.readEnum(in, ActionType.class);
+  }
+}