You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2007/09/13 22:24:11 UTC
svn commit: r575438 - in /lucene/hadoop/trunk: ./
src/java/org/apache/hadoop/ipc/ src/java/org/apache/hadoop/mapred/
src/test/org/apache/hadoop/mapred/ src/webapps/job/
Author: cutting
Date: Thu Sep 13 13:24:10 2007
New Revision: 575438
URL: http://svn.apache.org/viewvc?rev=575438&view=rev
Log:
HADOOP-1819. Jobtracker cleanups, including binding ports before clearing state directories, so that inadvertently starting a second jobtracker doesn't trash one that's already running. Contributed by Owen.
Modified:
lucene/hadoop/trunk/CHANGES.txt
lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ClusterStatus.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobSubmissionProtocol.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java
lucene/hadoop/trunk/src/webapps/job/jobblacklistedtrackers.jsp
lucene/hadoop/trunk/src/webapps/job/jobconf.jsp
lucene/hadoop/trunk/src/webapps/job/jobdetails.jsp
lucene/hadoop/trunk/src/webapps/job/jobfailures.jsp
lucene/hadoop/trunk/src/webapps/job/jobtasks.jsp
lucene/hadoop/trunk/src/webapps/job/jobtracker.jsp
lucene/hadoop/trunk/src/webapps/job/machines.jsp
lucene/hadoop/trunk/src/webapps/job/taskdetails.jsp
lucene/hadoop/trunk/src/webapps/job/taskstats.jsp
Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?rev=575438&r1=575437&r2=575438&view=diff
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Thu Sep 13 13:24:10 2007
@@ -218,6 +218,12 @@
HADOOP-1718. Add ant targets for measuring code coverage with clover.
(simonwillnauer via nigel)
+ HADOOP-1819. Jobtracker cleanups, including binding ports before
+ clearing state directories, so that inadvertently starting a
+ second jobtracker doesn't trash one that's already running.
+ (omalley via cutting)
+
+
Release 0.14.1 - 2007-09-04
BUG FIXES
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java?rev=575438&r1=575437&r2=575438&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java Thu Sep 13 13:24:10 2007
@@ -22,8 +22,6 @@
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.BufferedOutputStream;
-import java.io.StringWriter;
-import java.io.PrintWriter;
import java.io.ByteArrayInputStream;
import java.nio.ByteBuffer;
@@ -32,9 +30,14 @@
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
+import java.net.BindException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
+import java.net.ServerSocket;
import java.net.Socket;
+import java.net.SocketAddress;
+import java.net.SocketException;
+import java.net.UnknownHostException;
import java.util.Collections;
import java.util.LinkedList;
@@ -44,7 +47,6 @@
import org.apache.commons.logging.*;
-import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
@@ -142,7 +144,35 @@
private Listener listener = null;
private int numConnections = 0;
private Handler[] handlers = null;
-
+
+ /**
+ * A convenience method to bind to a given address and report
+ * better exceptions if the address is not a valid host.
+ * @param socket the socket to bind
+ * @param address the address to bind to
+ * @param backlog the number of connections allowed in the queue
+ * @throws BindException if the address can't be bound
+ * @throws UnknownHostException if the address isn't a valid host name
+ * @throws IOException other random errors from bind
+ */
+ static void bind(ServerSocket socket, InetSocketAddress address,
+ int backlog) throws IOException {
+ try {
+ socket.bind(address, backlog);
+ } catch (BindException e) {
+ throw new BindException("Problem binding to " + address);
+ } catch (SocketException e) {
+ // If they try to bind to a different host's address, give a better
+ // error message.
+ if ("Unresolved address".equals(e.getMessage())) {
+ throw new UnknownHostException("Invalid hostname for server: " +
+ address.getHostName());
+ } else {
+ throw e;
+ }
+ }
+ }
+
/** A call queued for handling. */
private static class Call {
private int id; // the client's call id
@@ -182,7 +212,7 @@
acceptChannel.configureBlocking(false);
// Bind the server socket to the local host and port
- acceptChannel.socket().bind(address, backlogLength);
+ bind(acceptChannel.socket(), address, backlogLength);
port = acceptChannel.socket().getLocalPort(); //Could be an ephemeral port
// create a selector;
selector= Selector.open();
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ClusterStatus.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ClusterStatus.java?rev=575438&r1=575437&r2=575438&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ClusterStatus.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ClusterStatus.java Thu Sep 13 13:24:10 2007
@@ -23,34 +23,28 @@
import java.io.IOException;
import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableFactory;
-import org.apache.hadoop.io.WritableFactories;
+import org.apache.hadoop.io.WritableUtils;
/**
* Summarizes the size and current state of the cluster.
*/
public class ClusterStatus implements Writable {
- static { // register a ctor
- WritableFactories.setFactory
- (ClusterStatus.class,
- new WritableFactory() {
- public Writable newInstance() { return new ClusterStatus(); }
- });
- }
-
private int task_trackers;
private int map_tasks;
private int reduce_tasks;
private int max_tasks;
+ private JobTracker.State state;
ClusterStatus() {}
- ClusterStatus(int trackers, int maps, int reduces, int max) {
+ ClusterStatus(int trackers, int maps, int reduces, int max,
+ JobTracker.State state) {
task_trackers = trackers;
map_tasks = maps;
reduce_tasks = reduces;
max_tasks = max;
+ this.state = state;
}
@@ -81,12 +75,17 @@
public int getMaxTasks() {
return max_tasks;
}
-
+
+ public JobTracker.State getJobTrackerState() {
+ return state;
+ }
+
public void write(DataOutput out) throws IOException {
out.writeInt(task_trackers);
out.writeInt(map_tasks);
out.writeInt(reduce_tasks);
out.writeInt(max_tasks);
+ WritableUtils.writeEnum(out, state);
}
public void readFields(DataInput in) throws IOException {
@@ -94,6 +93,7 @@
map_tasks = in.readInt();
reduce_tasks = in.readInt();
max_tasks = in.readInt();
+ state = WritableUtils.readEnum(in, JobTracker.State.class);
}
}
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobSubmissionProtocol.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobSubmissionProtocol.java?rev=575438&r1=575437&r2=575438&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobSubmissionProtocol.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobSubmissionProtocol.java Thu Sep 13 13:24:10 2007
@@ -32,6 +32,7 @@
*Changing the versionID to 2L since the getTaskCompletionEvents method has
*changed.
*Changed to 4 since killTask(String,boolean) is added
+ *Version 4: added jobtracker state to ClusterStatus
*/
public static final long versionID = 4L;
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java?rev=575438&r1=575437&r2=575438&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java Thu Sep 13 13:24:10 2007
@@ -19,7 +19,9 @@
import java.io.IOException;
+import java.net.BindException;
import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
import java.text.NumberFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
@@ -66,6 +68,17 @@
static float TASK_ALLOC_EPSILON;
static float PAD_FRACTION;
static final int MIN_CLUSTER_SIZE_FOR_PADDING = 3;
+ public static enum State { INITIALIZING, RUNNING }
+ State state = State.INITIALIZING;
+
+ /**
+ * A client tried to submit a job before the Job Tracker was ready.
+ */
+ public static class IllegalStateException extends IOException {
+ public IllegalStateException(String msg) {
+ super(msg);
+ }
+ }
/**
* The maximum no. of 'completed' (successful/failed/killed)
@@ -85,9 +98,6 @@
private int nextJobId = 1;
public static final Log LOG = LogFactory.getLog("org.apache.hadoop.mapred.JobTracker");
-
- private static JobTracker tracker = null;
- private static boolean runTracker = true;
/**
* Start the JobTracker with given configuration.
@@ -99,17 +109,18 @@
* @param conf configuration for the JobTracker.
* @throws IOException
*/
- public static void startTracker(JobConf conf) throws IOException {
- if (tracker != null)
- throw new IOException("JobTracker already running.");
- runTracker = true;
- while (runTracker) {
+ public static JobTracker startTracker(JobConf conf) throws IOException {
+ JobTracker result = null;
+ while (true) {
try {
- tracker = new JobTracker(conf);
+ result = new JobTracker(conf);
break;
- } catch (VersionMismatch v) {
- // Can't recover from a version mismatch. Avoid the retry loop and re-throw
- throw v;
+ } catch (VersionMismatch e) {
+ throw e;
+ } catch (BindException e) {
+ throw e;
+ } catch (UnknownHostException e) {
+ throw e;
} catch (IOException e) {
LOG.warn("Error starting tracker: " +
StringUtils.stringifyException(e));
@@ -117,25 +128,17 @@
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
- }
+ }
}
- if (runTracker) {
+ if (result != null) {
JobEndNotifier.startNotifier();
- tracker.offerService();
}
+ return result;
}
- public static JobTracker getTracker() {
- return tracker;
- }
-
- public static void stopTracker() throws IOException {
- runTracker = false;
- if (tracker != null) {
- JobEndNotifier.stopNotifier();
- tracker.close();
- tracker = null;
- }
+ public void stopTracker() throws IOException {
+ JobEndNotifier.stopNotifier();
+ close();
}
public long getProtocolVersion(String protocol,
@@ -426,8 +429,9 @@
private int numReduceTasksCompleted = 0;
private int numJobsSubmitted = 0;
private int numJobsCompleted = 0;
+ private JobTracker tracker;
- JobTrackerMetrics(JobConf conf) {
+ JobTrackerMetrics(JobTracker tracker, JobConf conf) {
String sessionId = conf.getSessionId();
// Initiate JVM Metrics
JvmMetrics.init("JobTracker", sessionId);
@@ -435,6 +439,7 @@
MetricsContext context = MetricsUtil.getContext("mapred");
metricsRecord = MetricsUtil.createRecord(context, "jobtracker");
metricsRecord.setTag("sessionId", sessionId);
+ this.tracker = tracker;
context.registerUpdater(this);
}
@@ -459,14 +464,14 @@
numJobsCompleted = 0;
}
metricsRecord.update();
-
+
if (tracker != null) {
for (JobInProgress jip : tracker.getRunningJobs()) {
jip.updateMetrics();
}
}
}
-
+
synchronized void launchMap() {
++numMapTasksLaunched;
}
@@ -629,13 +634,6 @@
JobConf jobConf = new JobConf(conf);
this.systemDir = jobConf.getSystemDir();
this.fs = FileSystem.get(conf);
- fs.delete(systemDir);
- if (!fs.mkdirs(systemDir)) {
- throw new IOException("Mkdirs failed to create " + systemDir.toString());
- }
-
- // Same with 'localDir' except it's always on the local disk.
- jobConf.deleteLocalFiles(SUBDIR);
// Read the hosts/exclude files to restrict access to the jobtracker.
this.hostsReader = new HostsFileReader(conf.get("mapred.hosts", ""),
@@ -648,23 +646,26 @@
int handlerCount = conf.getInt("mapred.job.tracker.handler.count", 10);
this.interTrackerServer = RPC.getServer(this, addr.getHostName(), addr.getPort(), handlerCount, false, conf);
this.interTrackerServer.start();
- Properties p = System.getProperties();
- for (Iterator it = p.keySet().iterator(); it.hasNext();) {
- String key = (String) it.next();
- String val = (String) p.getProperty(key);
- LOG.info("Property '" + key + "' is " + val);
+ if (LOG.isDebugEnabled()) {
+ Properties p = System.getProperties();
+ for (Iterator it = p.keySet().iterator(); it.hasNext();) {
+ String key = (String) it.next();
+ String val = (String) p.getProperty(key);
+ LOG.debug("Property '" + key + "' is " + val);
+ }
}
this.infoPort = conf.getInt("mapred.job.tracker.info.port", 50030);
this.infoBindAddress = conf.get("mapred.job.tracker.info.bindAddress","0.0.0.0");
- this.infoServer = new StatusHttpServer("job", infoBindAddress, infoPort, false);
- this.infoServer.start();
+ infoServer = new StatusHttpServer("job", infoBindAddress, infoPort, false);
+ infoServer.setAttribute("job.tracker", this);
+ infoServer.start();
this.startTime = System.currentTimeMillis();
SimpleDateFormat dateFormat = new SimpleDateFormat("yyyyMMddHHmm");
trackerIdentifier = dateFormat.format(new Date());
- myMetrics = new JobTrackerMetrics(jobConf);
+ myMetrics = new JobTrackerMetrics(this, jobConf);
this.expireTrackersThread = new Thread(this.expireTrackers,
"expireTrackers");
@@ -683,6 +684,25 @@
this.infoPort = this.infoServer.getPort();
this.conf.setInt("mapred.job.tracker.info.port", this.infoPort);
LOG.info("JobTracker webserver: " + this.infoServer.getPort());
+
+ while (true) {
+ try {
+ fs.delete(systemDir);
+ if (fs.mkdirs(systemDir)) {
+ break;
+ }
+ LOG.error("Mkdirs failed to create " + systemDir);
+ } catch (IOException ie) {
+ LOG.info("problem cleaning system directory: " + systemDir, ie);
+ }
+ }
+
+ // Same with 'localDir' except it's always on the local disk.
+ jobConf.deleteLocalFiles(SUBDIR);
+ synchronized (this) {
+ state = State.RUNNING;
+ }
+ LOG.info("Starting RUNNING");
}
public static InetSocketAddress getAddress(Configuration conf) {
@@ -1456,13 +1476,21 @@
////////////////////////////////////////////////////
/**
+ * Make sure the JobTracker is done initializing.
+ */
+ private synchronized void ensureRunning() throws IllegalStateException {
+ if (state != State.RUNNING) {
+ throw new IllegalStateException("Job tracker still initializing");
+ }
+ }
+
+ /**
* Allocates a new JobId string.
*/
- public String getNewJobId() {
- synchronized (this) {
- return "job_" + getTrackerIdentifier() + "_" +
+ public synchronized String getNewJobId() throws IOException {
+ ensureRunning();
+ return "job_" + getTrackerIdentifier() + "_" +
idFormat.format(nextJobId++);
- }
}
/**
@@ -1478,6 +1506,7 @@
* the right TaskTracker/Block mapping.
*/
public synchronized JobStatus submitJob(String jobFile) throws IOException {
+ ensureRunning();
totalSubmissions++;
JobInProgress job = new JobInProgress(jobFile, this, this.conf);
synchronized (jobs) {
@@ -1526,7 +1555,8 @@
return new ClusterStatus(taskTrackers.size(),
totalMaps,
totalReduces,
- maxCurrentTasks);
+ maxCurrentTasks,
+ state);
}
}
@@ -1816,7 +1846,8 @@
}
try {
- startTracker(new JobConf());
+ JobTracker tracker = startTracker(new JobConf());
+ tracker.offerService();
} catch (Throwable e) {
LOG.fatal(StringUtils.stringifyException(e));
System.exit(-1);
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java?rev=575438&r1=575437&r2=575438&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java Thu Sep 13 13:24:10 2007
@@ -265,7 +265,7 @@
public LocalJobRunner(JobConf conf) throws IOException {
this.fs = FileSystem.get(conf);
this.conf = conf;
- myMetrics = new JobTrackerMetrics(new JobConf(conf));
+ myMetrics = new JobTrackerMetrics(null, new JobConf(conf));
}
// JobSubmissionProtocol methods
@@ -316,7 +316,8 @@
}
public ClusterStatus getClusterStatus() {
- return new ClusterStatus(1, map_tasks, reduce_tasks, 1);
+ return new ClusterStatus(1, map_tasks, reduce_tasks, 1,
+ JobTracker.State.RUNNING);
}
public JobStatus[] jobsToComplete() {return null;}
Modified: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java?rev=575438&r1=575437&r2=575438&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java (original)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java Thu Sep 13 13:24:10 2007
@@ -20,11 +20,15 @@
import java.io.*;
import java.util.*;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
/**
* This class creates a single-process Map-Reduce cluster for junit testing.
* One thread is created for each server.
*/
public class MiniMRCluster {
+ private static final Log LOG = LogFactory.getLog(MiniMRCluster.class);
private Thread jobTrackerThread;
private JobTrackerRunner jobTracker;
@@ -43,19 +47,20 @@
* An inner class that runs a job tracker.
*/
class JobTrackerRunner implements Runnable {
+ private JobTracker tracker = null;
JobConf jc = null;
public boolean isUp() {
- return (JobTracker.getTracker() != null);
+ return (tracker != null);
}
public int getJobTrackerPort() {
- return JobTracker.getAddress(jc).getPort();
+ return tracker.getTrackerPort();
}
public int getJobTrackerInfoPort() {
- return jc.getInt("mapred.job.tracker.info.port", 50030);
+ return tracker.getInfoPort();
}
/**
@@ -65,10 +70,10 @@
try {
jc = createJobConf();
jc.set("mapred.local.dir","build/test/mapred/local");
- JobTracker.startTracker(jc);
+ tracker = JobTracker.startTracker(jc);
+ tracker.offerService();
} catch (Throwable e) {
- System.err.println("Job tracker crashed:");
- e.printStackTrace();
+ LOG.error("Job tracker crashed", e);
}
}
@@ -77,10 +82,11 @@
*/
public void shutdown() {
try {
- JobTracker.stopTracker();
+ if (tracker != null) {
+ tracker.stopTracker();
+ }
} catch (Throwable e) {
- System.err.println("Unable to shut down job tracker:");
- e.printStackTrace();
+ LOG.error("Problem shutting down job tracker", e);
}
}
}
@@ -91,16 +97,40 @@
class TaskTrackerRunner implements Runnable {
volatile TaskTracker tt;
int trackerId;
+ JobConf conf = createJobConf();
// the localDirs for this taskTracker
- String[] localDir;
+ String[] localDirs;
volatile boolean isInitialized = false;
volatile boolean isDead = false;
- int numDir;
- TaskTrackerRunner(int trackerId, int numDir) {
+ int numDir;
+
+ TaskTrackerRunner(int trackerId, int numDir) throws IOException {
this.trackerId = trackerId;
this.numDir = numDir;
- // a maximum of 10 local dirs can be specified in MinMRCluster
- localDir = new String[10];
+ localDirs = new String[numDir];
+ conf = createJobConf();
+ conf.setInt("mapred.task.tracker.info.port", 0);
+ conf.setInt("mapred.task.tracker.report.port", taskTrackerPort);
+ File localDirBase =
+ new File(conf.get("mapred.local.dir")).getAbsoluteFile();
+ localDirBase.mkdirs();
+ StringBuffer localPath = new StringBuffer();
+ for(int i=0; i < numDir; ++i) {
+ File ttDir = new File(localDirBase,
+ Integer.toString(trackerId) + "_" + 0);
+ if (!ttDir.mkdirs()) {
+ if (!ttDir.isDirectory()) {
+ throw new IOException("Mkdirs failed to create " + ttDir);
+ }
+ }
+ localDirs[i] = ttDir.toString();
+ if (i != 0) {
+ localPath.append(",");
+ }
+ localPath.append(localDirs[i]);
+ }
+ conf.set("mapred.local.dir", localPath.toString());
+ LOG.info("mapred.local.dir is " + localPath);
}
/**
@@ -108,40 +138,13 @@
*/
public void run() {
try {
- JobConf jc = createJobConf();
- jc.setInt("mapred.task.tracker.info.port", 0);
- jc.setInt("mapred.task.tracker.report.port", taskTrackerPort);
- File localDir = new File(jc.get("mapred.local.dir"));
- String mapredDir = "";
- File ttDir = new File(localDir, Integer.toString(trackerId) + "_" + 0);
- if (!ttDir.mkdirs()) {
- if (!ttDir.isDirectory()) {
- throw new IOException("Mkdirs failed to create " + ttDir.toString());
- }
- }
- this.localDir[0] = ttDir.getAbsolutePath();
- mapredDir = ttDir.getAbsolutePath();
- for (int i = 1; i < numDir; i++){
- ttDir = new File(localDir, Integer.toString(trackerId) + "_" + i);
- ttDir.mkdirs();
- if (!ttDir.mkdirs()) {
- if (!ttDir.isDirectory()) {
- throw new IOException("Mkdirs failed to create " + ttDir.toString());
- }
- }
- this.localDir[i] = ttDir.getAbsolutePath();
- mapredDir = mapredDir + "," + ttDir.getAbsolutePath();
- }
- jc.set("mapred.local.dir", mapredDir);
- System.out.println("mapred.local.dir is " + mapredDir);
- tt = new TaskTracker(jc);
+ tt = new TaskTracker(conf);
isInitialized = true;
tt.run();
} catch (Throwable e) {
isDead = true;
tt = null;
- System.err.println("Task tracker crashed:");
- e.printStackTrace();
+ LOG.error("task tracker " + trackerId + " crashed", e);
}
}
@@ -152,11 +155,11 @@
* @return the absolute pathname
*/
public String getLocalDir() {
- return localDir[0];
+ return localDirs[0];
}
public String[] getLocalDirs(){
- return localDir;
+ return localDirs;
}
/**
* Shut down the server and wait for it to finish.
@@ -166,8 +169,8 @@
try {
tt.shutdown();
} catch (Throwable e) {
- System.err.println("Unable to shut down task tracker:");
- e.printStackTrace();
+ LOG.error("task tracker " + trackerId + " could not shut down",
+ e);
}
}
}
@@ -198,10 +201,10 @@
TaskTrackerRunner runner = (TaskTrackerRunner) itr.next();
while (!runner.isDead && (!runner.isInitialized || !runner.tt.isIdle())) {
if (!runner.isInitialized) {
- System.out.println("Waiting for task tracker to start.");
+ LOG.info("Waiting for task tracker to start.");
} else {
- System.out.println("Waiting for task tracker " + runner.tt.getName() +
- " to be idle.");
+ LOG.info("Waiting for task tracker " + runner.tt.getName() +
+ " to be idle.");
}
try {
Thread.sleep(1000);
@@ -270,6 +273,19 @@
jobTracker = new JobTrackerRunner();
jobTrackerThread = new Thread(jobTracker);
+ jobTrackerThread.start();
+ while (!jobTracker.isUp()) {
+ try { // let daemons get started
+ LOG.info("Waiting for JobTracker to start...");
+ Thread.sleep(1000);
+ } catch(InterruptedException e) {
+ }
+ }
+
+ // Set the configuration for the task-trackers
+ this.jobTrackerPort = jobTracker.getJobTrackerPort();
+ this.jobTrackerInfoPort = jobTracker.getJobTrackerInfoPort();
+
// Create the TaskTrackers
for (int idx = 0; idx < numTaskTrackers; idx++) {
TaskTrackerRunner taskTracker = new TaskTrackerRunner(idx, numDir);
@@ -286,19 +302,6 @@
}
}
- jobTrackerThread.start();
- while (!jobTracker.isUp()) {
- try { // let daemons get started
- System.err.println("Waiting for JobTracker to start...");
- Thread.sleep(1000);
- } catch(InterruptedException e) {
- }
- }
-
- // Set the configuration for the task-trackers
- this.jobTrackerPort = jobTracker.getJobTrackerPort();
- this.jobTrackerInfoPort = jobTracker.getJobTrackerInfoPort();
-
if (!taskTrackerFirst) {
for (Thread taskTrackerThread : taskTrackerThreadList){
taskTrackerThread.start();
@@ -323,7 +326,7 @@
try {
taskTrackerThread.join();
} catch (InterruptedException ex) {
- ex.printStackTrace();
+ LOG.error("Problem shutting down task tracker", ex);
}
}
jobTracker.shutdown();
@@ -331,7 +334,7 @@
try {
jobTrackerThread.join();
} catch (InterruptedException ex) {
- ex.printStackTrace();
+ LOG.error("Problem waiting for job tracker to finish", ex);
}
} finally {
File configDir = new File("build", "minimr");
@@ -341,11 +344,11 @@
}
public static void main(String[] args) throws IOException {
- System.out.println("Bringing up Jobtracker and tasktrackers.");
+ LOG.info("Bringing up Jobtracker and tasktrackers.");
MiniMRCluster mr = new MiniMRCluster(4, "local", 1);
- System.out.println("JobTracker and TaskTrackers are up.");
+ LOG.info("JobTracker and TaskTrackers are up.");
mr.shutdown();
- System.out.println("JobTracker and TaskTrackers brought down.");
+ LOG.info("JobTracker and TaskTrackers brought down.");
}
}
Modified: lucene/hadoop/trunk/src/webapps/job/jobblacklistedtrackers.jsp
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/webapps/job/jobblacklistedtrackers.jsp?rev=575438&r1=575437&r2=575438&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/webapps/job/jobblacklistedtrackers.jsp (original)
+++ lucene/hadoop/trunk/src/webapps/job/jobblacklistedtrackers.jsp Thu Sep 13 13:24:10 2007
@@ -8,11 +8,12 @@
import="org.apache.hadoop.util.*"
%>
-<%!
- JobTracker tracker = JobTracker.getTracker();
+<%
+ JobTracker tracker = (JobTracker) application.getAttribute("job.tracker");
String trackerName =
StringUtils.simpleHostname(tracker.getJobTrackerMachine());
-
+%>
+<%!
private void printBlackListedTrackers(JspWriter out,
JobInProgress job) throws IOException {
Map<String, Integer> trackerErrors = job.getTaskTrackerErrors();
Modified: lucene/hadoop/trunk/src/webapps/job/jobconf.jsp
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/webapps/job/jobconf.jsp?rev=575438&r1=575437&r2=575438&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/webapps/job/jobconf.jsp (original)
+++ lucene/hadoop/trunk/src/webapps/job/jobconf.jsp Thu Sep 13 13:24:10 2007
@@ -10,6 +10,7 @@
<%
+ JobTracker tracker = (JobTracker) application.getAttribute("job.tracker");
String jobId = request.getParameter("jobid");
if (jobId == null) {
out.println("<h2>Missing 'jobid' for fetching job configuration!</h2>");
@@ -25,8 +26,6 @@
<h2>Job Configuration: JobId - <%= jobId %></h2><br>
<%
- JobTracker tracker = JobTracker.getTracker();
-
JobInProgress job = (JobInProgress)tracker.getJob(jobId);
if (job == null) {
out.print("<h4>Job '" + jobId + "' not found!</h4><br>\n");
Modified: lucene/hadoop/trunk/src/webapps/job/jobdetails.jsp
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/webapps/job/jobdetails.jsp?rev=575438&r1=575437&r2=575438&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/webapps/job/jobdetails.jsp (original)
+++ lucene/hadoop/trunk/src/webapps/job/jobdetails.jsp Thu Sep 13 13:24:10 2007
@@ -11,15 +11,16 @@
import="org.apache.hadoop.dfs.JspHelper"
%>
+<%
+ JobTracker tracker = (JobTracker) application.getAttribute("job.tracker");
+ String trackerName =
+ StringUtils.simpleHostname(tracker.getJobTrackerMachine());
+%>
<%!
private static final String PRIVATE_ACTIONS_KEY
= "webinterface.private.actions";
-
- JobTracker tracker = JobTracker.getTracker();
- String trackerName =
- StringUtils.simpleHostname(tracker.getJobTrackerMachine());
-
+
private void printTaskSummary(JspWriter out,
String jobId,
String kind,
Modified: lucene/hadoop/trunk/src/webapps/job/jobfailures.jsp
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/webapps/job/jobfailures.jsp?rev=575438&r1=575437&r2=575438&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/webapps/job/jobfailures.jsp (original)
+++ lucene/hadoop/trunk/src/webapps/job/jobfailures.jsp Thu Sep 13 13:24:10 2007
@@ -8,12 +8,14 @@
import="org.apache.hadoop.util.*"
%>
-<%!
- JobTracker tracker = JobTracker.getTracker();
+<%
+ JobTracker tracker = (JobTracker) application.getAttribute("job.tracker");
String trackerName =
StringUtils.simpleHostname(tracker.getJobTrackerMachine());
-
+%>
+<%!
private void printFailedAttempts(JspWriter out,
+ JobTracker tracker,
String jobId,
TaskInProgress tip,
TaskStatus.State failState) throws IOException {
@@ -75,6 +77,7 @@
}
private void printFailures(JspWriter out,
+ JobTracker tracker,
String jobId,
String kind,
String cause) throws IOException {
@@ -122,13 +125,13 @@
if (includeMap) {
TaskInProgress[] tips = job.getMapTasks();
for(int i=0; i < tips.length; ++i) {
- printFailedAttempts(out, jobId, tips[i], state);
+ printFailedAttempts(out, tracker, jobId, tips[i], state);
}
}
if (includeReduce) {
TaskInProgress[] tips = job.getReduceTasks();
for(int i=0; i < tips.length; ++i) {
- printFailedAttempts(out, jobId, tips[i], state);
+ printFailedAttempts(out, tracker, jobId, tips[i], state);
}
}
out.print("</table>\n");
@@ -148,7 +151,7 @@
failures on <a href="jobtracker.jsp"><%=trackerName%></a></h1>
<%
- printFailures(out, jobId, kind, cause);
+ printFailures(out, tracker, jobId, kind, cause);
%>
<hr>
Modified: lucene/hadoop/trunk/src/webapps/job/jobtasks.jsp
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/webapps/job/jobtasks.jsp?rev=575438&r1=575437&r2=575438&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/webapps/job/jobtasks.jsp (original)
+++ lucene/hadoop/trunk/src/webapps/job/jobtasks.jsp Thu Sep 13 13:24:10 2007
@@ -11,15 +11,15 @@
%>
<%! static SimpleDateFormat dateFormat = new SimpleDateFormat("d-MMM-yyyy HH:mm:ss") ; %>
<%
+ JobTracker tracker = (JobTracker) application.getAttribute("job.tracker");
+ String trackerName =
+ StringUtils.simpleHostname(tracker.getJobTrackerMachine());
String jobid = request.getParameter("jobid");
String type = request.getParameter("type");
String pagenum = request.getParameter("pagenum");
int pnum = Integer.parseInt(pagenum);
int next_page = pnum+1;
int numperpage = 2000;
- JobTracker tracker = JobTracker.getTracker();
- String trackerLabel =
- StringUtils.simpleHostname(tracker.getJobTrackerMachine());
JobInProgress job = (JobInProgress) tracker.getJob(jobid);
JobProfile profile = (job != null) ? (job.getProfile()) : null;
JobStatus status = (job != null) ? (job.getStatus()) : null;
@@ -37,12 +37,12 @@
<html>
<head>
- <title>Hadoop <%=type%> task list for <%=jobid%> on <%=trackerLabel%></title>
+ <title>Hadoop <%=type%> task list for <%=jobid%> on <%=trackerName%></title>
</head>
<body>
<h1>Hadoop <%=type%> task list for
<a href="jobdetails.jsp?jobid=<%=jobid%>"><%=jobid%></a> on
-<a href="jobtracker.jsp"><%=trackerLabel%></a></h1>
+<a href="jobtracker.jsp"><%=trackerName%></a></h1>
<%
if (job == null) {
out.print("<b>Job " + jobid + " not found.</b><br>\n");
Modified: lucene/hadoop/trunk/src/webapps/job/jobtracker.jsp
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/webapps/job/jobtracker.jsp?rev=575438&r1=575437&r2=575438&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/webapps/job/jobtracker.jsp (original)
+++ lucene/hadoop/trunk/src/webapps/job/jobtracker.jsp Thu Sep 13 13:24:10 2007
@@ -8,10 +8,12 @@
import="org.apache.hadoop.mapred.*"
import="org.apache.hadoop.util.*"
%>
-<%!
- JobTracker tracker = JobTracker.getTracker();
- String trackerLabel =
+<%
+ JobTracker tracker = (JobTracker) application.getAttribute("job.tracker");
+ String trackerName =
StringUtils.simpleHostname(tracker.getJobTrackerMachine());
+%>
+<%!
private static DecimalFormat percentFormat = new DecimalFormat("##0.00");
public void generateJobTable(JspWriter out, String label, Vector jobs, int refresh) throws IOException {
@@ -60,7 +62,8 @@
out.print("</center>\n");
}
- public void generateSummaryTable(JspWriter out) throws IOException {
+ public void generateSummaryTable(JspWriter out,
+ JobTracker tracker) throws IOException {
ClusterStatus status = tracker.getClusterStatus();
out.print("<table border=\"2\" cellpadding=\"5\" cellspacing=\"2\">\n"+
"<tr><th>Maps</th><th>Reduces</th>" +
@@ -76,11 +79,12 @@
<html>
-<title><%= trackerLabel %> Hadoop Map/Reduce Administration</title>
+<title><%= trackerName %> Hadoop Map/Reduce Administration</title>
<body>
-<h1><%= trackerLabel %> Hadoop Map/Reduce Administration</h1>
+<h1><%= trackerName %> Hadoop Map/Reduce Administration</h1>
+<b>State:</b> <%= tracker.getClusterStatus().getJobTrackerState() %><br>
<b>Started:</b> <%= new Date(tracker.getStartTime())%><br>
<b>Version:</b> <%= VersionInfo.getVersion()%>,
r<%= VersionInfo.getRevision()%><br>
@@ -92,7 +96,7 @@
<h2>Cluster Summary</h2>
<center>
<%
- generateSummaryTable(out);
+ generateSummaryTable(out, tracker);
%>
</center>
<hr>
Modified: lucene/hadoop/trunk/src/webapps/job/machines.jsp
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/webapps/job/machines.jsp?rev=575438&r1=575437&r2=575438&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/webapps/job/machines.jsp (original)
+++ lucene/hadoop/trunk/src/webapps/job/machines.jsp Thu Sep 13 13:24:10 2007
@@ -8,12 +8,14 @@
import="org.apache.hadoop.mapred.*"
import="org.apache.hadoop.util.*"
%>
-<%!
- JobTracker tracker = JobTracker.getTracker();
- String trackerLabel =
+<%
+ JobTracker tracker = (JobTracker) application.getAttribute("job.tracker");
+ String trackerName =
StringUtils.simpleHostname(tracker.getJobTrackerMachine());
-
- public void generateTaskTrackerTable(JspWriter out) throws IOException {
+%>
+<%!
+ public void generateTaskTrackerTable(JspWriter out,
+ JobTracker tracker) throws IOException {
Collection c = tracker.taskTrackers();
if (c.size() == 0) {
@@ -63,14 +65,14 @@
<html>
-<title><%=trackerLabel%> Hadoop Machine List</title>
+<title><%=trackerName%> Hadoop Machine List</title>
<body>
-<h1><a href="jobtracker.jsp"><%=trackerLabel%></a> Hadoop Machine List</h1>
+<h1><a href="jobtracker.jsp"><%=trackerName%></a> Hadoop Machine List</h1>
<h2>Task Trackers</h2>
<%
- generateTaskTrackerTable(out);
+ generateTaskTrackerTable(out, tracker);
%>
<hr>
Modified: lucene/hadoop/trunk/src/webapps/job/taskdetails.jsp
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/webapps/job/taskdetails.jsp?rev=575438&r1=575437&r2=575438&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/webapps/job/taskdetails.jsp (original)
+++ lucene/hadoop/trunk/src/webapps/job/taskdetails.jsp Thu Sep 13 13:24:10 2007
@@ -27,8 +27,9 @@
+ "\">Cancel</a></td></tr></table></body></html>");
}%>
<%
- JobTracker tracker = JobTracker.getTracker();
+ JobTracker tracker = (JobTracker) application.getAttribute("job.tracker");
String jobid = request.getParameter("jobid");
+ JobInProgress job = (JobInProgress) tracker.getJob(jobid);
String tipid = request.getParameter("tipid");
String taskid = request.getParameter("taskid");
@@ -57,7 +58,6 @@
}
}
}
- JobInProgress job = (JobInProgress) tracker.getJob(jobid);
TaskStatus[] ts = (job != null) ? tracker.getTaskStatuses(jobid, tipid)
: null;
%>
Modified: lucene/hadoop/trunk/src/webapps/job/taskstats.jsp
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/webapps/job/taskstats.jsp?rev=575438&r1=575437&r2=575438&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/webapps/job/taskstats.jsp (original)
+++ lucene/hadoop/trunk/src/webapps/job/taskstats.jsp Thu Sep 13 13:24:10 2007
@@ -12,8 +12,10 @@
import="org.apache.hadoop.util.*"
%>
<%
+ JobTracker tracker = (JobTracker) application.getAttribute("job.tracker");
+ String trackerName =
+ StringUtils.simpleHostname(tracker.getJobTrackerMachine());
String jobid = request.getParameter("jobid");
- JobTracker tracker = JobTracker.getTracker();
JobInProgress job = (JobInProgress) tracker.getJob(jobid);
String tipid = request.getParameter("tipid");
String taskid = request.getParameter("taskid");