You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nutch.apache.org by mc...@apache.org on 2006/02/02 18:58:44 UTC
svn commit: r374443 [2/2] - in /lucene/nutch/trunk/src:
java/org/apache/nutch/fs/ java/org/apache/nutch/mapred/
test/org/apache/nutch/mapred/ webapps/jobtracker/
Added: lucene/nutch/trunk/src/java/org/apache/nutch/mapred/TaskInProgress.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/trunk/src/java/org/apache/nutch/mapred/TaskInProgress.java?rev=374443&view=auto
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/mapred/TaskInProgress.java (added)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/mapred/TaskInProgress.java Thu Feb 2 09:58:38 2006
@@ -0,0 +1,445 @@
+/**
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.mapred;
+
+import org.apache.nutch.io.*;
+import org.apache.nutch.fs.*;
+import org.apache.nutch.ipc.*;
+import org.apache.nutch.util.*;
+
+import java.io.*;
+import java.net.*;
+import java.util.*;
+import java.util.logging.*;
+
+
+////////////////////////////////////////////////////////
+// TaskInProgress maintains all the info needed for a
+// Task in the lifetime of its owning Job. A given Task
+// might be speculatively executed or reexecuted, so we
+// need a level of indirection above the running-id itself.
+//
+// A given TaskInProgress contains multiple taskids,
+// 0 or more of which might be executing at any one time.
+// (That's what allows speculative execution.) A taskid
+// is now *never* recycled. A TIP allocates enough taskids
+// to account for all the speculation and failures it will
+// ever have to handle. Once those are up, the TIP is dead.
+//
+////////////////////////////////////////////////////////
+class TaskInProgress {
+ static final int MAX_TASK_EXECS = 10;
+ static final int MAX_TASK_FAILURES = 4;
+ static final double SPECULATIVE_GAP = 0.2;
+ static final long SPECULATIVE_LAG = 60 * 1000;
+
+ public static final Logger LOG = LogFormatter.getLogger("org.apache.nutch.mapred.TaskInProgress");
+
+ // Defines the TIP
+ String jobFile = null;
+ FileSplit split = null;
+ TaskInProgress predecessors[] = null;
+ int partition;
+ JobTracker jobtracker;
+ String id;
+ String totalTaskIds[];
+ JobInProgress job;
+
+ // Status of the TIP
+ int numTaskFailures = 0;
+ double progress = 0;
+ long startTime = 0;
+ int completes = 0;
+ boolean failed = false;
+ TreeSet usableTaskIds = new TreeSet();
+ TreeSet recentTasks = new TreeSet();
+ NutchConf nutchConf;
+
+ TreeMap taskDiagnosticData = new TreeMap();
+ TreeMap taskStatuses = new TreeMap();
+
+ TreeSet machinesWhereFailed = new TreeSet();
+ TreeSet tasksReportedClosed = new TreeSet();
+
+ /**
+ * Constructor for MapTask
+ */
+ public TaskInProgress(String jobFile, FileSplit split, JobTracker jobtracker, NutchConf nutchConf, JobInProgress job) {
+ this.jobFile = jobFile;
+ this.split = split;
+ this.jobtracker = jobtracker;
+ this.job = job;
+ this.nutchConf = nutchConf;
+ init();
+ }
+
+ /**
+ * Constructor for ReduceTask
+ */
+ public TaskInProgress(String jobFile, TaskInProgress predecessors[], int partition, JobTracker jobtracker, NutchConf nutchConf, JobInProgress job) {
+ this.jobFile = jobFile;
+ this.predecessors = predecessors;
+ this.partition = partition;
+ this.jobtracker = jobtracker;
+ this.job = job;
+ this.nutchConf = nutchConf;
+ init();
+ }
+
+ /**
+ * Initialization common to Map and Reduce
+ */
+ void init() {
+ this.startTime = System.currentTimeMillis();
+ this.id = "tip_" + jobtracker.createUniqueId();
+ this.totalTaskIds = new String[MAX_TASK_EXECS + MAX_TASK_FAILURES];
+ for (int i = 0; i < totalTaskIds.length; i++) {
+ if (isMapTask()) {
+ totalTaskIds[i] = "task_m_" + jobtracker.createUniqueId();
+ } else {
+ totalTaskIds[i] = "task_r_" + jobtracker.createUniqueId();
+ }
+ usableTaskIds.add(totalTaskIds[i]);
+ }
+ }
+
+ ////////////////////////////////////
+ // Accessors, info, profiles, etc.
+ ////////////////////////////////////
+
+ /**
+ * Return the parent job
+ */
+ public JobInProgress getJob() {
+ return job;
+ }
+ /**
+ * Return an ID for this task, not its component taskid-threads
+ */
+ public String getTIPId() {
+ return this.id;
+ }
+ /**
+ * Whether this is a map task
+ */
+ public boolean isMapTask() {
+ return split != null;
+ }
+ /**
+ */
+ public boolean isComplete() {
+ return (completes > 0);
+ }
+ /**
+ */
+ public boolean isComplete(String taskid) {
+ TaskStatus status = (TaskStatus) taskStatuses.get(taskid);
+ if (status == null) {
+ return false;
+ }
+ return ((completes > 0) && (status.getRunState() == TaskStatus.SUCCEEDED));
+ }
+ /**
+ */
+ public boolean isFailed() {
+ return failed;
+ }
+ /**
+ * Number of times the TaskInProgress has failed.
+ */
+ public int numTaskFailures() {
+ return numTaskFailures();
+ }
+ /**
+ * Get the overall progress (from 0 to 1.0) for this TIP
+ */
+ public double getProgress() {
+ return progress;
+ }
+ /**
+ * Returns whether a component task-thread should be
+ * closed because the containing JobInProgress has completed.
+ */
+ public boolean shouldCloseForClosedJob(String taskid) {
+ // If the thing has never been closed,
+ // and it belongs to this TIP,
+ // and this TIP is somehow FINISHED,
+ // then true
+ TaskStatus ts = (TaskStatus) taskStatuses.get(taskid);
+ if ((ts != null) &&
+ (! tasksReportedClosed.contains(taskid)) &&
+ (job.getStatus().getRunState() != JobStatus.RUNNING)) {
+ tasksReportedClosed.add(taskid);
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ /**
+ * A TaskInProgress might be speculatively executed, and so
+ * can have many taskids simultaneously. Reduce tasks rely on knowing
+ * their predecessor ids, so they can be sure that all the previous
+ * work has been completed.
+ *
+ * But we don't know ahead of time which task id will actually be
+ * the one that completes for a given Map task. We don't want the
+ * Reduce task to have to be recreated after Map-completion, or check
+ * in with the JobTracker. So instead, each TaskInProgress preallocates
+ * all the task-ids it could ever want to run simultaneously. Then the
+ * Reduce task can be told about all the ids task-ids for a given Map
+ * TaskInProgress. If any of the Map TIP's tasks complete, the Reduce
+ * task will know all is well, and can continue.
+ *
+ * Most of the time, only a small number of the possible task-ids will
+ * ever be used.
+ */
+ public String[] getAllPossibleTaskIds() {
+ return totalTaskIds;
+ }
+
+ /**
+ * Creates a "status report" for this task. Includes the
+ * task ID and overall status, plus reports for all the
+ * component task-threads that have ever been started.
+ */
+ Vector generateSingleReport() {
+ Vector report = new Vector();
+ report.add(getTIPId());
+ report.add("" + progress);
+
+ report.add(new Integer(taskDiagnosticData.size()));
+ for (Iterator it = taskDiagnosticData.keySet().iterator(); it.hasNext(); ) {
+ String taskid = (String) it.next();
+ Vector taskData = (Vector) taskDiagnosticData.get(taskid);
+
+ TaskStatus taskStatus = (TaskStatus) taskStatuses.get(taskid);
+ String taskStateString = taskStatus.getStateString();
+
+ report.add(taskData);
+ report.add(taskStateString);
+ }
+ return report;
+ }
+
+ ////////////////////////////////////////////////
+ // Update methods, usually invoked by the owning
+ // job.
+ ////////////////////////////////////////////////
+ /**
+ * A status message from a client has arrived.
+ * It updates the status of a single component-thread-task,
+ * which might result in an overall TaskInProgress status update.
+ */
+ public void updateStatus(TaskStatus status) {
+ String taskid = status.getTaskId();
+ String diagInfo = status.getDiagnosticInfo();
+ if (diagInfo != null && diagInfo.length() > 0) {
+ Vector diagHistory = (Vector) taskDiagnosticData.get(taskid);
+ if (diagHistory == null) {
+ diagHistory = new Vector();
+ taskDiagnosticData.put(taskid, diagHistory);
+ }
+ diagHistory.add(diagInfo);
+ }
+ taskStatuses.put(taskid, status);
+
+ // Recompute progress
+ recomputeProgress();
+ }
+
+ /**
+ * Indicate that one of the taskids in this TaskInProgress
+ * has failed.
+ */
+ public void failedSubTask(String taskid, String trackerName) {
+ //
+ // Note the failure and its location
+ //
+ LOG.info("Task '" + taskid + "' has been lost.");
+ TaskStatus status = (TaskStatus) taskStatuses.get(taskid);
+ if (status != null) {
+ status.setRunState(TaskStatus.FAILED);
+ }
+ this.recentTasks.remove(taskid);
+ this.completes--;
+
+ numTaskFailures++;
+ if (numTaskFailures >= MAX_TASK_FAILURES) {
+ LOG.info("TaskInProgress " + getTIPId() + " has failed " + numTaskFailures + " times.");
+ kill();
+ }
+ machinesWhereFailed.add(trackerName);
+
+ // Ask JobTracker to forget about this task
+ jobtracker.removeTaskEntry(taskid);
+
+ recomputeProgress();
+ }
+
+ /**
+ * Indicate that one of the taskids in this TaskInProgress
+ * has successfully completed!
+ */
+ public void completed(String taskid) {
+ LOG.info("Task '" + taskid + "' has completed.");
+ TaskStatus status = (TaskStatus) taskStatuses.get(taskid);
+ status.setRunState(TaskStatus.SUCCEEDED);
+ recentTasks.remove(taskid);
+
+ //
+ // Now that the TIP is complete, the other speculative
+ // subtasks will be closed when the owning tasktracker
+ // reports in and calls shouldClose() on this object.
+ //
+
+ this.completes++;
+ recomputeProgress();
+ }
+
+ /**
+ * The TIP's been ordered kill()ed.
+ */
+ public void kill() {
+ if (isComplete() || failed) {
+ return;
+ }
+ this.failed = true;
+ recomputeProgress();
+ }
+
+ /**
+ * This method is called whenever there's a status change
+ * for one of the TIP's sub-tasks. It recomputes the overall
+ * progress for the TIP. We examine all sub-tasks and find
+ * the one that's most advanced (and non-failed).
+ */
+ void recomputeProgress() {
+ if (isComplete()) {
+ this.progress = 1;
+ } else if (failed) {
+ this.progress = 0;
+ } else {
+ double bestProgress = 0;
+ for (Iterator it = taskStatuses.keySet().iterator(); it.hasNext(); ) {
+ String taskid = (String) it.next();
+ TaskStatus status = (TaskStatus) taskStatuses.get(taskid);
+ if (status.getRunState() == TaskStatus.SUCCEEDED) {
+ bestProgress = 1;
+ break;
+ } else if (status.getRunState() == TaskStatus.RUNNING) {
+ bestProgress = Math.max(bestProgress, status.getProgress());
+ }
+ }
+ this.progress = bestProgress;
+ }
+ }
+
+ /////////////////////////////////////////////////
+ // "Action" methods that actually require the TIP
+ // to do something.
+ /////////////////////////////////////////////////
+ /**
+ * Return whether this TIP has an NDFS cache-driven task
+ * to run at the given taskTracker.
+ */
+ boolean hasTaskWithCacheHit(String taskTracker, TaskTrackerStatus tts) {
+ if (failed || isComplete() || recentTasks.size() > 0) {
+ return false;
+ } else {
+ try {
+ if (isMapTask()) {
+ NutchFileSystem fs = NutchFileSystem.get(nutchConf);
+ String hints[][] = fs.getFileCacheHints(split.getFile(), split.getStart(), split.getLength());
+ for (int i = 0; i < hints.length; i++) {
+ for (int j = 0; j < hints[i].length; j++) {
+ if (hints[i][j].equals(tts.getHost())) {
+ return true;
+ }
+ }
+ }
+ }
+ } catch (IOException ie) {
+ }
+ return false;
+ }
+ }
+ /**
+ * Return whether this TIP has a non-speculative task to run
+ */
+ boolean hasTask() {
+ if (failed || isComplete() || recentTasks.size() > 0) {
+ return false;
+ } else {
+ for (Iterator it = taskStatuses.values().iterator(); it.hasNext(); ) {
+ TaskStatus ts = (TaskStatus) it.next();
+ if (ts.getRunState() == TaskStatus.RUNNING) {
+ return false;
+ }
+ }
+ return true;
+ }
+ }
+ /**
+ * Return whether the TIP has a speculative task to run. We
+ * only launch a speculative task if the current TIP is really
+ * far behind, and has been behind for a non-trivial amount of
+ * time.
+ */
+ boolean hasSpeculativeTask(double averageProgress) {
+ //
+ // REMIND - mjc - these constants should be examined
+ // in more depth eventually...
+ //
+ if (isMapTask() &&
+ (averageProgress - progress >= SPECULATIVE_GAP) &&
+ (System.currentTimeMillis() - startTime >= SPECULATIVE_LAG)) {
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * Return a Task that can be sent to a TaskTracker for execution.
+ */
+ public Task getTaskToRun(String taskTracker, TaskTrackerStatus tts, double avgProgress) {
+ Task t = null;
+ if (hasTaskWithCacheHit(taskTracker, tts) ||
+ hasTask() ||
+ hasSpeculativeTask(avgProgress)) {
+
+ String taskid = (String) usableTaskIds.first();
+ usableTaskIds.remove(taskid);
+
+ if (isMapTask()) {
+ t = new MapTask(jobFile, taskid, split);
+ } else {
+ String mapIdPredecessors[][] = new String[predecessors.length][];
+ for (int i = 0; i < mapIdPredecessors.length; i++) {
+ mapIdPredecessors[i] = predecessors[i].getAllPossibleTaskIds();
+ }
+ t = new ReduceTask(jobFile, taskid, mapIdPredecessors, partition);
+ }
+ t.setConf(nutchConf);
+
+ recentTasks.add(taskid);
+
+ // Ask JobTracker to note that the task exists
+ jobtracker.createTaskEntry(taskid, taskTracker, this);
+ }
+ return t;
+ }
+}
Modified: lucene/nutch/trunk/src/java/org/apache/nutch/mapred/TaskRunner.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/trunk/src/java/org/apache/nutch/mapred/TaskRunner.java?rev=374443&r1=374442&r2=374443&view=diff
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/mapred/TaskRunner.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/mapred/TaskRunner.java Thu Feb 2 09:58:38 2006
@@ -33,6 +33,7 @@
public static final Logger LOG =
LogFormatter.getLogger("org.apache.nutch.mapred.TaskRunner");
+ boolean killed = false;
private Process process;
private Task t;
private TaskTracker tracker;
@@ -51,7 +52,7 @@
/** Called to assemble this task's input. This method is run in the parent
* process before the child is spawned. It should not execute user code,
* only system code. */
- public void prepare() throws IOException {}
+ public boolean prepare() throws IOException {return true;}
/** Called when this task's output is no longer needed.
* This method is run in the parent process after the child exits. It should
@@ -62,7 +63,9 @@
public final void run() {
try {
- prepare();
+ if (! prepare()) {
+ return;
+ }
String sep = System.getProperty("path.separator");
File workDir = new File(new File(t.getJobFile()).getParent(), "work");
@@ -72,7 +75,7 @@
// start with same classpath as parent process
classPath.append(System.getProperty("java.class.path"));
classPath.append(sep);
-
+
JobConf job = new JobConf(t.getJobFile());
String jar = job.getJar();
if (jar != null) { // if jar exists, it into workDir
@@ -158,6 +161,7 @@
if (process != null) {
process.destroy();
}
+ killed = true;
}
/**
Modified: lucene/nutch/trunk/src/java/org/apache/nutch/mapred/TaskStatus.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/trunk/src/java/org/apache/nutch/mapred/TaskStatus.java?rev=374443&r1=374442&r2=374443&view=diff
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/mapred/TaskStatus.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/mapred/TaskStatus.java Thu Feb 2 09:58:38 2006
@@ -34,6 +34,7 @@
public static final int UNASSIGNED = 3;
private String taskid;
+ private boolean isMap;
private float progress;
private int runState;
private String diagnosticInfo;
@@ -41,8 +42,9 @@
public TaskStatus() {}
- public TaskStatus(String taskid, float progress, int runState, String diagnosticInfo, String stateString) {
+ public TaskStatus(String taskid, boolean isMap, float progress, int runState, String diagnosticInfo, String stateString) {
this.taskid = taskid;
+ this.isMap = isMap;
this.progress = progress;
this.runState = runState;
this.diagnosticInfo = diagnosticInfo;
@@ -50,6 +52,7 @@
}
public String getTaskId() { return taskid; }
+ public boolean getIsMap() { return isMap; }
public float getProgress() { return progress; }
public void setProgress(float progress) { this.progress = progress; }
public int getRunState() { return runState; }
@@ -64,6 +67,7 @@
//////////////////////////////////////////////
public void write(DataOutput out) throws IOException {
UTF8.writeString(out, taskid);
+ out.writeBoolean(isMap);
out.writeFloat(progress);
out.writeInt(runState);
UTF8.writeString(out, diagnosticInfo);
@@ -72,6 +76,7 @@
public void readFields(DataInput in) throws IOException {
this.taskid = UTF8.readString(in);
+ this.isMap = in.readBoolean();
this.progress = in.readFloat();
this.runState = in.readInt();
this.diagnosticInfo = UTF8.readString(in);
Modified: lucene/nutch/trunk/src/java/org/apache/nutch/mapred/TaskTracker.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/trunk/src/java/org/apache/nutch/mapred/TaskTracker.java?rev=374443&r1=374442&r2=374443&view=diff
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/mapred/TaskTracker.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/mapred/TaskTracker.java Thu Feb 2 09:58:38 2006
@@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.nutch.mapred;
+ package org.apache.nutch.mapred;
import org.apache.nutch.fs.*;
import org.apache.nutch.io.*;
@@ -33,7 +33,6 @@
* @author Mike Cafarella
*******************************************************/
public class TaskTracker implements MRConstants, TaskUmbilicalProtocol, MapOutputProtocol, Runnable {
- private int maxCurrentTask;
static final long WAIT_FOR_DONE = 3 * 1000;
private long taskTimeout;
@@ -66,6 +65,8 @@
private NutchConf fConf;
private MapOutputFile mapOutputFile;
+ private int maxCurrentTasks;
+
/**
* Start with the local machine name, and the default JobTracker
*/
@@ -77,9 +78,10 @@
* Start with the local machine name, and the addr of the target JobTracker
*/
public TaskTracker(InetSocketAddress jobTrackAddr, NutchConf conf) throws IOException {
+ maxCurrentTasks = conf.getInt("mapred.tasktracker.tasks.maximum", 2);
+
this.fConf = conf;
this.jobTrackAddr = jobTrackAddr;
- this.maxCurrentTask = conf.getInt("mapred.tasktracker.tasks.maximum", 2);
this.taskTimeout = conf.getInt("mapred.task.timeout", 10* 60 * 1000);
this.mapOutputFile = new MapOutputFile();
this.mapOutputFile.setConf(conf);
@@ -93,6 +95,7 @@
*/
void initialize() throws IOException {
this.taskTrackerName = "tracker_" + (Math.abs(r.nextInt()) % 100000);
+ LOG.info("Starting tracker " + taskTrackerName);
this.localHostname = InetAddress.getLocalHost().getHostName();
new JobConf(this.fConf).deleteLocalFiles(SUBDIR);
@@ -108,7 +111,7 @@
// RPC initialization
while (true) {
try {
- this.taskReportServer = RPC.getServer(this, this.taskReportPort, this.maxCurrentTask, false, this.fConf);
+ this.taskReportServer = RPC.getServer(this, this.taskReportPort, maxCurrentTasks, false, this.fConf);
this.taskReportServer.start();
break;
} catch (BindException e) {
@@ -119,7 +122,7 @@
}
while (true) {
try {
- this.mapOutputServer = RPC.getServer(this, this.mapOutputPort, this.maxCurrentTask, false, this.fConf);
+ this.mapOutputServer = RPC.getServer(this, this.mapOutputPort, maxCurrentTasks, false, this.fConf);
this.mapOutputServer.start();
break;
} catch (BindException e) {
@@ -142,9 +145,14 @@
* clean.
*/
public synchronized void close() throws IOException {
- // Kill running tasks
- while (tasks.size() > 0) {
- TaskInProgress tip = (TaskInProgress)tasks.get(tasks.firstKey());
+ //
+ // Kill running tasks. Do this in a 2nd vector, called 'tasksToClose',
+ // because calling jobHasFinished() may result in an edit to 'tasks'.
+ //
+ TreeMap tasksToClose = new TreeMap();
+ tasksToClose.putAll(tasks);
+ for (Iterator it = tasksToClose.values().iterator(); it.hasNext(); ) {
+ TaskInProgress tip = (TaskInProgress) it.next();
tip.jobHasFinished();
}
@@ -154,11 +162,21 @@
} catch (InterruptedException ie) {
}
- // Shutdown local RPC servers
- if (taskReportServer != null) {
- taskReportServer.stop();
- taskReportServer = null;
- }
+ //
+ // Shutdown local RPC servers. Do them
+ // in parallel, as RPC servers can take a long
+ // time to shutdown. (They need to wait a full
+ // RPC timeout, which might be 10-30 seconds.)
+ //
+ new Thread() {
+ public void run() {
+ if (taskReportServer != null) {
+ taskReportServer.stop();
+ taskReportServer = null;
+ }
+ }
+ }.start();
+
if (mapOutputServer != null) {
mapOutputServer.stop();
mapOutputServer = null;
@@ -227,7 +245,7 @@
//
// Check if we should create a new Task
//
- if (runningTasks.size() < this.maxCurrentTask) {
+ if (runningTasks.size() < maxCurrentTasks) {
Task t = jobClient.pollForNewTask(taskTrackerName);
if (t != null) {
TaskInProgress tip = new TaskInProgress(t, this.fConf);
@@ -255,9 +273,16 @@
}
//
+ // Check for any Tasks that should be killed, even if
+ // the containing Job is still ongoing. (This happens
+ // with speculative execution, when one version of the
+ // task finished before another
+ //
+
+ //
// Check for any Tasks whose job may have ended
//
- String toCloseId = jobClient.pollForClosedTask(taskTrackerName);
+ String toCloseId = jobClient.pollForTaskWithClosedJob(taskTrackerName);
if (toCloseId != null) {
synchronized (this) {
TaskInProgress tip = (TaskInProgress) tasks.get(toCloseId);
@@ -288,7 +313,8 @@
staleState = true;
}
} catch (Exception ex) {
- LOG.info("Lost connection to JobTracker [" + jobTrackAddr + "]. ex=" + ex + " Retrying...");
+ ex.printStackTrace();
+ LOG.info("Lost connection to JobTracker [" + jobTrackAddr + "]. Retrying...");
try {
Thread.sleep(5000);
} catch (InterruptedException ie) {
@@ -373,7 +399,7 @@
/**
*/
public TaskStatus createStatus() {
- TaskStatus status = new TaskStatus(task.getTaskId(), progress, runstate, diagnosticInfo.toString(), (stateString == null) ? "" : stateString);
+ TaskStatus status = new TaskStatus(task.getTaskId(), task.isMapTask(), progress, runstate, diagnosticInfo.toString(), (stateString == null) ? "" : stateString);
if (diagnosticInfo.length() > 0) {
diagnosticInfo = new StringBuffer();
}
Modified: lucene/nutch/trunk/src/java/org/apache/nutch/mapred/TaskTrackerStatus.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/trunk/src/java/org/apache/nutch/mapred/TaskTrackerStatus.java?rev=374443&r1=374442&r2=374443&view=diff
==============================================================================
--- lucene/nutch/trunk/src/java/org/apache/nutch/mapred/TaskTrackerStatus.java (original)
+++ lucene/nutch/trunk/src/java/org/apache/nutch/mapred/TaskTrackerStatus.java Thu Feb 2 09:58:38 2006
@@ -77,6 +77,28 @@
public Iterator taskReports() {
return taskReports.iterator();
}
+
+ /**
+ * Return the current MapTask count
+ */
+ public int countMapTasks() {
+ int mapCount = 0;
+ for (Iterator it = taskReports.iterator(); it.hasNext(); ) {
+ TaskStatus ts = (TaskStatus) it.next();
+ if (ts.getIsMap()) {
+ mapCount++;
+ }
+ }
+ return mapCount;
+ }
+
+ /**
+ * Return the current ReduceTask count
+ */
+ public int countReduceTasks() {
+ return taskReports.size() - countMapTasks();
+ }
+
/**
*/
public long getLastSeen() {
Added: lucene/nutch/trunk/src/test/org/apache/nutch/mapred/MapredLoadTest.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/trunk/src/test/org/apache/nutch/mapred/MapredLoadTest.java?rev=374443&view=auto
==============================================================================
--- lucene/nutch/trunk/src/test/org/apache/nutch/mapred/MapredLoadTest.java (added)
+++ lucene/nutch/trunk/src/test/org/apache/nutch/mapred/MapredLoadTest.java Thu Feb 2 09:58:38 2006
@@ -0,0 +1,317 @@
+/**
+ * Copyright 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.mapred;
+
+import org.apache.nutch.fs.*;
+import org.apache.nutch.io.*;
+import org.apache.nutch.util.*;
+import org.apache.nutch.mapred.lib.*;
+
+import java.io.*;
+import java.util.*;
+import java.math.*;
+
+/**********************************************************
+ * MapredLoadTest generates a bunch of work that exercises
+ * a Nutch Map-Reduce system (and NDFS, too). It goes through
+ * the following steps:
+ *
+ * 1) Take inputs 'range' and 'counts'.
+ * 2) Generate 'counts' random integers between 0 and range-1.
+ * 3) Create a file that lists each integer between 0 and range-1,
+ * and lists the number of times that integer was generated.
+ * 4) Emit a (very large) file that contains all the integers
+ * in the order generated.
+ * 5) After the file has been generated, read it back and count
+ * how many times each int was generated.
+ * 6) Compare this big count-map against the original one. If
+ * they match, then SUCCESS! Otherwise, FAILURE!
+ *
+ * OK, that's how we can think about it. What are the map-reduce
+ * steps that get the job done?
+ *
+ * 1) In a non-mapred thread, take the inputs 'range' and 'counts'.
+ * 2) In a non-mapread thread, generate the answer-key and write to disk.
+ * 3) In a mapred job, divide the answer key into K jobs.
+ * 4) A mapred 'generator' task consists of K map jobs. Each reads
+ * an individual "sub-key", and generates integers according to
+ * to it (though with a random ordering).
+ * 5) The generator's reduce task agglomerates all of those files
+ * into a single one.
+ * 6) A mapred 'reader' task consists of M map jobs. The output
+ * file is cut into M pieces. Each of the M jobs counts the
+ * individual ints in its chunk and creates a map of all seen ints.
+ * 7) A mapred job integrates all the count files into a single one.
+ *
+ **********************************************************/
+public class MapredLoadTest {
+ static class RandomGenMapper implements Mapper {
+ Random r = new Random();
+ public void configure(JobConf job) {
+ }
+
+ public void map(WritableComparable key, Writable val, OutputCollector out, Reporter reporter) throws IOException {
+ int randomVal = ((IntWritable) key).get();
+ int randomCount = ((IntWritable) val).get();
+
+ for (int i = 0; i < randomCount; i++) {
+ out.collect(new IntWritable(Math.abs(r.nextInt())), new IntWritable(randomVal));
+ }
+ }
+ }
+ static class RandomGenReducer implements Reducer {
+ public void configure(JobConf job) {
+ }
+
+ public void reduce(WritableComparable key, Iterator it, OutputCollector out, Reporter reporter) throws IOException {
+ int keyint = ((IntWritable) key).get();
+ while (it.hasNext()) {
+ int val = ((IntWritable) it.next()).get();
+ out.collect(new UTF8("" + val), new UTF8(""));
+ }
+ }
+ }
+ static class RandomCheckMapper implements Mapper {
+ public void configure(JobConf job) {
+ }
+
+ public void map(WritableComparable key, Writable val, OutputCollector out, Reporter reporter) throws IOException {
+ long pos = ((LongWritable) key).get();
+ UTF8 str = (UTF8) val;
+
+ out.collect(new IntWritable(Integer.parseInt(str.toString().trim())), new IntWritable(1));
+ }
+ }
+ static class RandomCheckReducer implements Reducer {
+ public void configure(JobConf job) {
+ }
+
+ public void reduce(WritableComparable key, Iterator it, OutputCollector out, Reporter reporter) throws IOException {
+ int keyint = ((IntWritable) key).get();
+ int count = 0;
+ while (it.hasNext()) {
+ it.next();
+ count++;
+ }
+ out.collect(new IntWritable(keyint), new IntWritable(count));
+ }
+ }
+
+ int range;
+ int counts;
+ Random r = new Random();
+ NutchConf nutchConf;
+
+ /**
+ * MapredLoadTest
+ */
+ public MapredLoadTest(int range, int counts, NutchConf nutchConf) throws IOException {
+ this.range = range;
+ this.counts = counts;
+ this.nutchConf = nutchConf;
+ }
+
+ /**
+ *
+ */
+ public void launch() throws IOException {
+ //
+ // Generate distribution of ints. This is the answer key.
+ //
+ int countsToGo = counts;
+ int dist[] = new int[range];
+ for (int i = 0; i < range; i++) {
+ double avgInts = (1.0 * countsToGo) / (range - i);
+ dist[i] = (int) Math.max(0, Math.round(avgInts + (Math.sqrt(avgInts) * r.nextGaussian())));
+ countsToGo -= dist[i];
+ }
+ if (countsToGo > 0) {
+ dist[dist.length-1] += countsToGo;
+ }
+
+ //
+ // Write the answer key to a file.
+ //
+ NutchFileSystem fs = NutchFileSystem.get(nutchConf);
+ File testdir = new File("mapred.loadtest");
+ fs.mkdirs(testdir);
+
+ File randomIns = new File(testdir, "genins");
+ fs.mkdirs(randomIns);
+
+ File answerkey = new File(randomIns, "answer.key");
+ SequenceFile.Writer out = new SequenceFile.Writer(fs, answerkey.getPath(), IntWritable.class, IntWritable.class);
+ try {
+ for (int i = 0; i < range; i++) {
+ out.append(new IntWritable(i), new IntWritable(dist[i]));
+ }
+ } finally {
+ out.close();
+ }
+
+ //
+ // Now we need to generate the random numbers according to
+ // the above distribution.
+ //
+ // We create a lot of map tasks, each of which takes at least
+ // one "line" of the distribution. (That is, a certain number
+ // X is to be generated Y number of times.)
+ //
+ // A map task emits Y key/val pairs. The val is X. The key
+ // is a randomly-generated number.
+ //
+ // The reduce task gets its input sorted by key. That is, sorted
+ // in random order. It then emits a single line of text that
+ // for the given values. It does not emit the key.
+ //
+ // Because there's just one reduce task, we emit a single big
+ // file of random numbers.
+ //
+ File randomOuts = new File(testdir, "genouts");
+ fs.mkdirs(randomOuts);
+
+
+ JobConf genJob = new JobConf(nutchConf);
+ genJob.setInputDir(randomIns);
+ genJob.setInputKeyClass(IntWritable.class);
+ genJob.setInputValueClass(IntWritable.class);
+ genJob.setInputFormat(SequenceFileInputFormat.class);
+ genJob.setMapperClass(RandomGenMapper.class);
+
+ genJob.setOutputDir(randomOuts);
+ genJob.setOutputKeyClass(IntWritable.class);
+ genJob.setOutputValueClass(IntWritable.class);
+ genJob.setOutputFormat(TextOutputFormat.class);
+ genJob.setReducerClass(RandomGenReducer.class);
+ genJob.setNumReduceTasks(1);
+
+ JobClient.runJob(genJob);
+
+ //
+ // Next, we read the big file in and regenerate the
+ // original map.
+ //
+ // We have many map tasks, each of which read at least one
+ // of the output numbers. For each number read in, the
+ // map task emits a key/value pair where the key is the
+ // number and the value is "1".
+ //
+ // We have a single reduce task, which receives its input
+ // sorted by the key emitted above. For each key, there will
+ // be a certain number of "1" values. The reduce task sums
+ // these values to compute how many times the given key was
+ // emitted.
+ //
+ // The reduce task then emits a key/val pair where the key
+ // is the number in question, and the value is the number of
+ // times the key was emitted. This is the same format as the
+ // original answer key (except that numbers emitted zero times
+ // will not appear in the regenerated key.)
+ //
+ File finalOuts = new File(testdir, "finalouts");
+ fs.mkdirs(finalOuts);
+ JobConf checkJob = new JobConf(nutchConf);
+ checkJob.setInputDir(randomOuts);
+ checkJob.setInputKeyClass(LongWritable.class);
+ checkJob.setInputValueClass(UTF8.class);
+ checkJob.setInputFormat(TextInputFormat.class);
+ checkJob.setMapperClass(RandomCheckMapper.class);
+
+ checkJob.setOutputDir(finalOuts);
+ checkJob.setOutputKeyClass(IntWritable.class);
+ checkJob.setOutputValueClass(IntWritable.class);
+ checkJob.setOutputFormat(SequenceFileOutputFormat.class);
+ checkJob.setReducerClass(RandomCheckReducer.class);
+ checkJob.setNumReduceTasks(1);
+
+ JobClient.runJob(checkJob);
+
+ //
+ // Finally, we compare the reconstructed answer key with the
+ // original one. Remember, we need to ignore zero-count items
+ // in the original key.
+ //
+ boolean success = true;
+ File recomputedkey = new File(finalOuts, "part-00000");
+ SequenceFile.Reader in = new SequenceFile.Reader(fs, recomputedkey.getPath(), nutchConf);
+ int totalseen = 0;
+ try {
+ IntWritable key = new IntWritable();
+ IntWritable val = new IntWritable();
+ for (int i = 0; i < range; i++) {
+ if (dist[i] == 0) {
+ continue;
+ }
+ if (! in.next(key, val)) {
+ System.err.println("Cannot read entry " + i);
+ success = false;
+ break;
+ } else {
+ if ( !((key.get() == i ) && (val.get() == dist[i]))) {
+ System.err.println("Mismatch! Pos=" + key.get() + ", i=" + i + ", val=" + val.get() + ", dist[i]=" + dist[i]);
+ success = false;
+ }
+ totalseen += val.get();
+ }
+ }
+ if (success) {
+ if (in.next(key, val)) {
+ System.err.println("Unnecessary lines in recomputed key!");
+ success = false;
+ }
+ }
+ } finally {
+ in.close();
+ }
+ int originalTotal = 0;
+ for (int i = 0; i < dist.length; i++) {
+ originalTotal += dist[i];
+ }
+ System.out.println("Original sum: " + originalTotal);
+ System.out.println("Recomputed sum: " + totalseen);
+
+ //
+ // Write to "results" whether the test succeeded or not.
+ //
+ File resultFile = new File(testdir, "results");
+ BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(fs.create(resultFile)));
+ try {
+ bw.write("Success=" + success + "\n");
+ System.out.println("Success=" + success);
+ } finally {
+ bw.close();
+ }
+ }
+
+ /**
+ * Launches all the tasks in order.
+ */
+ public static void main(String[] argv) throws Exception {
+ if (argv.length < 2) {
+ System.err.println("Usage: MapredLoadTest <range> <counts>");
+ System.err.println();
+ System.err.println("Note: a good test will have a <counts> value that is substantially larger than the <range>");
+ return;
+ }
+
+ int i = 0;
+ int range = Integer.parseInt(argv[i++]);
+ int counts = Integer.parseInt(argv[i++]);
+
+ MapredLoadTest mlt = new MapredLoadTest(range, counts, new NutchConf());
+ mlt.launch();
+ }
+}
Modified: lucene/nutch/trunk/src/webapps/jobtracker/jobdetails.jsp
URL: http://svn.apache.org/viewcvs/lucene/nutch/trunk/src/webapps/jobtracker/jobdetails.jsp?rev=374443&r1=374442&r2=374443&view=diff
==============================================================================
--- lucene/nutch/trunk/src/webapps/jobtracker/jobdetails.jsp (original)
+++ lucene/nutch/trunk/src/webapps/jobtracker/jobdetails.jsp Thu Feb 2 09:58:38 2006
@@ -9,21 +9,28 @@
<%
String jobid = request.getParameter("jobid");
JobTracker tracker = JobTracker.getTracker();
- JobTracker.JobInProgress job = (JobTracker.JobInProgress) tracker.getJob(jobid);
+ JobInProgress job = (JobInProgress) tracker.getJob(jobid);
JobProfile profile = (job != null) ? (job.getProfile()) : null;
JobStatus status = (job != null) ? (job.getStatus()) : null;
- Vector mapTaskReports[] = tracker.getMapTaskReport(jobid);
- Vector reduceTaskReports[] = tracker.getReduceTaskReport(jobid);
+ Vector mapTaskReports[] = (job != null) ? tracker.getMapTaskReport(jobid) : null;
+ Vector reduceTaskReports[] = (job != null) ? tracker.getReduceTaskReport(jobid) : null;
%>
<html>
<title>Nutch MapReduce Job Details</title>
<body>
+<%
+ if (job == null) {
+ %>
+ No job found<br>
+ <%
+ } else {
+ %>
<h1>Job '<%=jobid%>'</h1>
<b>Job File:</b> <%=profile.getJobFile()%><br>
-<b>Start time:</b> <%= new Date(job.getStartTime())%><br>
+<b>The job started at:</b> <%= new Date(job.getStartTime())%><br>
<%
if (status.getRunState() == JobStatus.RUNNING) {
out.print("The job is still running.<br>\n");
@@ -38,19 +45,28 @@
<h2>Map Tasks</h2>
<center>
<table border=2 cellpadding="5" cellspacing="2">
- <tr><td align="center">Map Task Id</td><td>Pct Complete</td><td>State</td><td>Diagnostic Text</td></tr>
+ <tr><td align="center">Map Task Id</td><td>Pct Complete</td><td>Diagnostic Data</td></tr>
<%
for (int i = 0; i < mapTaskReports.length; i++) {
Vector v = mapTaskReports[i];
- out.print("<tr><td>" + v.elementAt(0) + "</td><td>" + v.elementAt(1) + "</td><td>" + v.elementAt(2) + "</td>");
- if (v.size() == 3) {
- out.print("<td></td>");
- } else {
- for (int j = 3; j < v.size(); j++) {
- out.print("<td>" + v.elementAt(j) + "</td>");
+ String tipid = (String) v.elementAt(0);
+ String progress = (String) v.elementAt(1);
+ int diagnosticSize = ((Integer) v.elementAt(2)).intValue();
+
+ out.print("<tr><td>" + tipid + "</td><td>" + progress + "</td><td>");
+ for (int j = 0; j < diagnosticSize; j++) {
+ Vector taskData = (Vector) v.elementAt(3 + ((2 * j)));
+ String taskStateString = (String) v.elementAt(3 + ((2 * j) + 1));
+ out.print(taskStateString);
+ out.print("<b>");
+
+ for (Iterator it2 = taskData.iterator(); it2.hasNext(); ) {
+ out.print("" + it2.next());
+ out.println("<b>");
}
}
+ out.print("</td>");
out.print("</tr>\n");
}
%>
@@ -62,25 +78,36 @@
<h2>Reduce Tasks</h2>
<center>
<table border=2 cellpadding="5" cellspacing="2">
- <tr><td align="center">Reduce Task Id</td><td>Pct Complete</td><td>State</td><td>Diagnostic Text</td></tr>
+ <tr><td align="center">Reduce Task Id</td><td>Pct Complete</td><td>Diagnostic Data</td></tr>
<%
for (int i = 0; i < reduceTaskReports.length; i++) {
Vector v = reduceTaskReports[i];
- out.print("<tr><td>" + v.elementAt(0) + "</td><td>" + v.elementAt(1) + "</td><td>" + v.elementAt(2) + "</td>");
- if (v.size() == 3) {
- out.print("<td></td>");
- } else {
- for (int j = 3; j < v.size(); j++) {
- out.print("<td>" + v.elementAt(j) + "</td>");
+ String tipid = (String) v.elementAt(0);
+ String progress = (String) v.elementAt(1);
+ int diagnosticSize = ((Integer) v.elementAt(2)).intValue();
+
+ out.print("<tr><td>" + tipid + "</td><td>" + progress + "</td><td>");
+ for (int j = 0; j < diagnosticSize; j++) {
+ Vector taskData = (Vector) v.elementAt(3 + ((2 * j)));
+ String taskStateString = (String) v.elementAt(3 + ((2 * j) + 1));
+ out.print(taskStateString);
+ out.print("<b>");
+
+ for (Iterator it2 = taskData.iterator(); it2.hasNext(); ) {
+ out.print("" + it2.next());
+ out.println("<b>");
}
}
+ out.print("</td>");
out.print("</tr>\n");
}
%>
</table>
</center>
-
+ <%
+ }
+%>
<hr>
<a href="/jobtracker.jsp">Go back to JobTracker</a><br>
Modified: lucene/nutch/trunk/src/webapps/jobtracker/jobtracker.jsp
URL: http://svn.apache.org/viewcvs/lucene/nutch/trunk/src/webapps/jobtracker/jobtracker.jsp?rev=374443&r1=374442&r2=374443&view=diff
==============================================================================
--- lucene/nutch/trunk/src/webapps/jobtracker/jobtracker.jsp (original)
+++ lucene/nutch/trunk/src/webapps/jobtracker/jobtracker.jsp Thu Feb 2 09:58:38 2006
@@ -46,22 +46,22 @@
out.print("<tr><td align=\"center\" colspan=\"8\"><b>" + label + " Jobs </b></td></tr>\n");
if (jobs.size() > 0) {
- out.print("<tr><td><b>Jobid</b></td><td><b>% complete</b></td><td><b>Required maps</b></td><td><b>maps attempted</b></td><td><b>maps completed</b></td><td><b>Required reduces</b></td><td><b>reduces attempted</b></td><td><b>reduces completed</b></td></tr>\n");
+ out.print("<tr><td><b>Jobid</b></td><td><b>% complete</b></td><td><b>Required maps</b></td><td><b>maps completed</b></td><td><b>Required reduces</b></td><td><b>reduces completed</b></td></tr>\n");
 for (Iterator it = jobs.iterator(); it.hasNext(); ) {
- JobTracker.JobInProgress job = (JobTracker.JobInProgress) it.next();
+ JobInProgress job = (JobInProgress) it.next();
 JobProfile profile = job.getProfile();
 JobStatus status = job.getStatus();
 String jobid = profile.getJobId();
- float completedRatio = (100 * job.completedRatio());
+ double completedRatio = (0.5 * (100 * status.mapProgress())) +
+ (0.5 * (100 * status.reduceProgress()));
+ // Overall "% complete" weights map and reduce progress equally (50% each); assumes both progress values are fractions in [0, 1] — TODO confirm.
 int desiredMaps = job.desiredMaps();
- int attemptedMaps = job.attemptedMaps();
- int completedMaps = job.completedMaps();
 int desiredReduces = job.desiredReduces();
- int attemptedReduces = job.attemptedReduces();
- int completedReduces = job.completedReduces();
+ int completedMaps = job.finishedMaps();
+ int completedReduces = job.finishedReduces();
- out.print("<tr><td><a href=\"jobdetails.jsp?jobid=" + jobid + "\">" + jobid + "</a></td><td>" + completedRatio + "%</td><td>" + desiredMaps + "</td><td>" + attemptedMaps + "</td><td>" + completedMaps + "</td><td>" + desiredReduces + "</td><td>" + attemptedReduces + "</td><td> " + completedReduces + "</td></tr>\n");
+ out.print("<tr><td><a href=\"jobdetails.jsp?jobid=" + jobid + "\">" + jobid + "</a></td><td>" + completedRatio + "%</td><td>" + desiredMaps + "</td><td>" + completedMaps + "</td><td>" + desiredReduces + "</td><td> " + completedReduces + "</td></tr>\n");
 }
} else {
out.print("<tr><td align=\"center\" colspan=\"8\"><i>none</i></td></tr>\n");