You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ed...@apache.org on 2012/05/29 00:40:34 UTC
svn commit: r1343412 - in /incubator/hama/trunk: conf/
core/src/main/java/org/apache/hama/ core/src/main/java/org/apache/hama/bsp/
core/src/main/java/org/apache/hama/ipc/
core/src/main/java/org/apache/hama/util/
core/src/main/resources/webapp/groomserver/
Author: edwardyoon
Date: Mon May 28 22:40:33 2012
New Revision: 1343412
URL: http://svn.apache.org/viewvc?rev=1343412&view=rev
Log:
Task error logs should be displayed on the client side when a job fails.
Added:
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskCompletionEvent.java
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskLogServlet.java
incubator/hama/trunk/core/src/main/resources/webapp/groomserver/
incubator/hama/trunk/core/src/main/resources/webapp/groomserver/groomserver.jsp
incubator/hama/trunk/core/src/main/resources/webapp/groomserver/index.html
Modified:
incubator/hama/trunk/conf/hama-default.xml
incubator/hama/trunk/core/src/main/java/org/apache/hama/Constants.java
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMaster.java
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/JobInProgress.java
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/RunningJob.java
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskInProgress.java
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskLog.java
incubator/hama/trunk/core/src/main/java/org/apache/hama/ipc/JobSubmissionProtocol.java
incubator/hama/trunk/core/src/main/java/org/apache/hama/util/BSPServletUtil.java
Modified: incubator/hama/trunk/conf/hama-default.xml
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/conf/hama-default.xml?rev=1343412&r1=1343411&r2=1343412&view=diff
==============================================================================
--- incubator/hama/trunk/conf/hama-default.xml (original)
+++ incubator/hama/trunk/conf/hama-default.xml Mon May 28 22:40:33 2012
@@ -52,6 +52,12 @@
</description>
</property>
<property>
+ <name>bsp.http.groomserver.port</name>
+ <value>40015</value>
+ <description>The port where the web-interface can be seen.
+ </description>
+ </property>
+ <property>
<name>bsp.groom.report.address</name>
<value>127.0.0.1:0</value>
<description>The interface and port that groom server listens on.
Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/Constants.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/Constants.java?rev=1343412&r1=1343411&r2=1343412&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/Constants.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/Constants.java Mon May 28 22:40:33 2012
@@ -109,4 +109,6 @@ public interface Constants {
* An empty instance.
*/
static final byte[] EMPTY_BYTE_ARRAY = new byte[0];
+
+ public static final int DEFAULT_GROOM_INFO_SERVER = 40015;
}
Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java?rev=1343412&r1=1343411&r2=1343412&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java Mon May 28 22:40:33 2012
@@ -17,11 +17,19 @@
*/
package org.apache.hama.bsp;
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLConnection;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@@ -40,13 +48,12 @@ import org.apache.hadoop.fs.permission.F
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Shell;
@@ -203,6 +210,11 @@ public class BSPJobClient extends Config
throws IOException {
jobSubmitClient.killTask(taskId, shouldFail);
}
+
+ @Override
+ public TaskCompletionEvent[] getTaskCompletionEvents(int startFrom) {
+ return jobSubmitClient.getTaskCompletionEvents(getID(), startFrom, 10);
+ }
}
public BSPJobClient(Configuration conf) throws IOException {
@@ -218,8 +230,8 @@ public class BSPJobClient extends Config
if (masterAdress != null && !masterAdress.equals("local")) {
this.jobSubmitClient = (JobSubmissionProtocol) RPC.getProxy(
JobSubmissionProtocol.class, HamaRPCProtocolVersion.versionID,
- BSPMaster.getAddress(conf), conf,
- NetUtils.getSocketFactory(conf, JobSubmissionProtocol.class));
+ BSPMaster.getAddress(conf), conf, NetUtils.getSocketFactory(conf,
+ JobSubmissionProtocol.class));
} else {
LOG.debug("Using local BSP runner.");
this.jobSubmitClient = new LocalBSPRunner(conf);
@@ -377,7 +389,7 @@ public class BSPJobClient extends Config
}
}
- @SuppressWarnings({ "rawtypes", "unchecked" })
+ @SuppressWarnings( { "rawtypes", "unchecked" })
protected BSPJob partition(BSPJob job, int maxTasks) throws IOException {
InputSplit[] splits = job.getInputFormat().getSplits(
job,
@@ -416,8 +428,8 @@ public class BSPJobClient extends Config
job, null);
CompressionCodec codec = null;
if (outputCompressorClass != null) {
- codec = ReflectionUtils.newInstance(outputCompressorClass,
- job.getConf());
+ codec = ReflectionUtils.newInstance(outputCompressorClass, job
+ .getConf());
}
try {
@@ -471,7 +483,7 @@ public class BSPJobClient extends Config
/**
* Get the {@link CompressionType} for the output {@link SequenceFile}.
*
- * @param job the {@link Job}
+ * @param job the {@link BSPJob}
* @return the {@link CompressionType} for the output {@link SequenceFile},
* defaulting to {@link CompressionType#RECORD}
*/
@@ -487,7 +499,7 @@ public class BSPJobClient extends Config
/**
* Get the {@link CompressionCodec} for compressing the job outputs.
*
- * @param job the {@link Job} to look in
+ * @param job the {@link BSPJob} to look in
* @param defaultValue the {@link CompressionCodec} to return if not set
* @return the {@link CompressionCodec} to be used to compress the job outputs
* @throws IllegalArgumentException if the class was specified, but not found
@@ -604,21 +616,75 @@ public class BSPJobClient extends Config
LOG.info(report);
lastReport = report;
}
+
+ int eventCounter = 0;
+ TaskCompletionEvent[] events = info.getTaskCompletionEvents(eventCounter);
+ eventCounter += events.length;
+
+ for(TaskCompletionEvent event : events){
+ if (event.getTaskStatus() ==
+ TaskCompletionEvent.Status.FAILED){
+
+ // Displaying the task logs
+ displayTaskLogs(event.getTaskAttemptId(), event.getGroomServerInfo());
+ }
+ }
}
if (job.isSuccessful()) {
LOG.info("The total number of supersteps: " + info.getSuperstepCount());
- info.getStatus()
- .getCounter()
- .incrCounter(BSPPeerImpl.PeerCounter.SUPERSTEPS,
- info.getSuperstepCount());
+ info.getStatus().getCounter().incrCounter(
+ BSPPeerImpl.PeerCounter.SUPERSTEPS, info.getSuperstepCount());
info.getStatus().getCounter().log(LOG);
} else {
LOG.info("Job failed.");
}
+
return job.isSuccessful();
}
+ static String getTaskLogURL(TaskAttemptID taskId, String baseUrl) {
+ return (baseUrl + "/tasklog?plaintext=true&taskid=" + taskId);
+ }
+
+ private void displayTaskLogs(TaskAttemptID taskId,
+ String baseUrl)
+ throws MalformedURLException {
+ // The tasktracker for a 'failed/killed' job might not be around...
+ if (baseUrl != null) {
+ // Construct the url for the tasklogs
+ String taskLogUrl = getTaskLogURL(taskId, baseUrl);
+
+ // Copy tasks's stdout of the JobClient
+ getTaskLogs(taskId, new URL(taskLogUrl+"&filter=stdout"), System.out);
+ }
+ }
+
+ private static void getTaskLogs(TaskAttemptID taskId, URL taskLogUrl,
+ OutputStream out) {
+ try {
+ URLConnection connection = taskLogUrl.openConnection();
+ BufferedReader input =
+ new BufferedReader(new InputStreamReader(connection.getInputStream()));
+ BufferedWriter output =
+ new BufferedWriter(new OutputStreamWriter(out));
+ try {
+ String logData = null;
+ while ((logData = input.readLine()) != null) {
+ if (logData.length() > 0) {
+ output.write(taskId + ": " + logData + "\n");
+ output.flush();
+ }
+ }
+ } finally {
+ input.close();
+ }
+ }catch(IOException ioe){
+ LOG.warn("Error reading task output" + ioe.getMessage());
+ }
+ }
+
+
/**
* Grab the bspmaster system directory path where job-specific files are to be
* placed.
@@ -660,7 +726,9 @@ public class BSPJobClient extends Config
if (running.isSuccessful()) {
LOG.info("Job complete: " + jobId);
- LOG.info("The total number of supersteps: " + running.getSuperstepCount());
+ LOG
+ .info("The total number of supersteps: "
+ + running.getSuperstepCount());
running.getStatus().getCounter().log(LOG);
} else {
LOG.info("Job failed.");
@@ -813,9 +881,8 @@ public class BSPJobClient extends Config
System.out.println("Job name: " + job.getJobName());
System.out.printf("States are:\n\tRunning : 1\tSucceded : 2"
+ "\tFailed : 3\tPrep : 4\n");
- System.out.printf("%s\t%d\t%d\t%s\n", jobStatus.getJobID(),
- jobStatus.getRunState(), jobStatus.getStartTime(),
- jobStatus.getUsername());
+ System.out.printf("%s\t%d\t%d\t%s\n", jobStatus.getJobID(), jobStatus
+ .getRunState(), jobStatus.getStartTime(), jobStatus.getUsername());
exitCode = 0;
}
Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMaster.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMaster.java?rev=1343412&r1=1343411&r2=1343412&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMaster.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMaster.java Mon May 28 22:40:33 2012
@@ -66,7 +66,7 @@ import org.apache.zookeeper.data.Stat;
* jobs.
*/
public class BSPMaster implements JobSubmissionProtocol, MasterProtocol,
-GroomServerManager, Watcher {
+ GroomServerManager, Watcher {
public static final Log LOG = LogFactory.getLog(BSPMaster.class);
public static final String localModeMessage = "Local mode detected, no launch of the daemon needed.";
@@ -86,7 +86,8 @@ GroomServerManager, Watcher {
static long JOBINIT_SLEEP_INTERVAL = 2000;
// States
- final AtomicReference<State> state = new AtomicReference<State>(State.INITIALIZING);
+ final AtomicReference<State> state = new AtomicReference<State>(
+ State.INITIALIZING);
// Attributes
String masterIdentifier;
@@ -240,12 +241,12 @@ GroomServerManager, Watcher {
* Start the BSPMaster process, listen on the indicated hostname/port
*/
public BSPMaster(HamaConfiguration conf) throws IOException,
- InterruptedException {
+ InterruptedException {
this(conf, generateNewIdentifier());
}
BSPMaster(HamaConfiguration conf, String identifier) throws IOException,
- InterruptedException {
+ InterruptedException {
this.conf = conf;
this.masterIdentifier = identifier;
@@ -459,8 +460,8 @@ GroomServerManager, Watcher {
*/
private void initZK(HamaConfiguration conf) {
try {
- zk = new ZooKeeper(QuorumPeer.getZKQuorumServersString(conf),
- conf.getInt(Constants.ZOOKEEPER_SESSION_TIMEOUT, 1200000), this);
+ zk = new ZooKeeper(QuorumPeer.getZKQuorumServersString(conf), conf
+ .getInt(Constants.ZOOKEEPER_SESSION_TIMEOUT, 1200000), this);
} catch (IOException e) {
LOG.error("Exception during reinitialization!", e);
}
@@ -491,41 +492,42 @@ GroomServerManager, Watcher {
}
/**
- * Clears all sub-children of node bspRoot
+ * Clears all sub-children of node bspRoot
*/
- public void clearZKNodes(){
+ public void clearZKNodes() {
try {
Stat s = zk.exists(bspRoot, false);
- if(s != null){
+ if (s != null) {
clearZKNodes(bspRoot);
- }
+ }
} catch (Exception e) {
LOG.warn("Could not clear zookeeper nodes.", e);
- }
+ }
}
/**
* Clears all sub-children of node rooted at path.
+ *
* @param path
- * @throws InterruptedException
- * @throws KeeperException
+ * @throws InterruptedException
+ * @throws KeeperException
*/
- private void clearZKNodes(String path) throws KeeperException, InterruptedException{
+ private void clearZKNodes(String path) throws KeeperException,
+ InterruptedException {
ArrayList<String> list = (ArrayList<String>) zk.getChildren(path, false);
- if(list.size() == 0){
+ if (list.size() == 0) {
return;
- }else{
- for(String node:list){
- clearZKNodes(path+"/"+node);
- zk.delete(path+"/"+node, -1); //delete any version of this node.
+ } else {
+ for (String node : list) {
+ clearZKNodes(path + "/" + node);
+ zk.delete(path + "/" + node, -1); // delete any version of this node.
}
}
}
-
public void createJobRoot(String string) {
try {
zk.create("/" + string, new byte[0], Ids.OPEN_ACL_UNSAFE,
@@ -657,8 +659,8 @@ GroomServerManager, Watcher {
for (Map.Entry<GroomServerStatus, GroomProtocol> entry : groomServers
.entrySet()) {
GroomServerStatus s = entry.getKey();
- groomsMap.put(s.getGroomHostName() + ":" + Constants.DEFAULT_PEER_PORT,
- s);
+ groomsMap.put(s.getGroomHostName() + ":"
+ + Constants.DEFAULT_GROOM_INFO_SERVER, s);
}
}
@@ -666,7 +668,8 @@ GroomServerManager, Watcher {
this.totalTaskCapacity = tasksPerGroom * numGroomServers;
if (detailed) {
- return new ClusterStatus(groomsMap, totalTasks, totalTaskCapacity, state.get());
+ return new ClusterStatus(groomsMap, totalTasks, totalTaskCapacity, state
+ .get());
} else {
return new ClusterStatus(numGroomServers, totalTasks, totalTaskCapacity,
state.get());
@@ -868,4 +871,22 @@ GroomServerManager, Watcher {
// TODO Auto-generated method stub
}
+
+ TaskCompletionEvent[] EMPTY_EVENTS = new TaskCompletionEvent[0];
+
+ @Override
+ public TaskCompletionEvent[] getTaskCompletionEvents(BSPJobID jobid,
+ int fromEventId, int maxEvents) {
+ synchronized (this) {
+ JobInProgress job = this.jobs.get(jobid);
+ if (null != job) {
+ if (job.areTasksInited()) {
+ return job.getTaskCompletionEvents(fromEventId, maxEvents);
+ } else {
+ return EMPTY_EVENTS;
+ }
+ }
+ }
+ return null;
+ }
}
Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java?rev=1343412&r1=1343411&r2=1343412&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java Mon May 28 22:40:33 2012
@@ -59,6 +59,7 @@ import org.apache.hadoop.util.StringUtil
import org.apache.hama.Constants;
import org.apache.hama.HamaConfiguration;
import org.apache.hama.bsp.sync.SyncException;
+import org.apache.hama.http.HttpServer;
import org.apache.hama.ipc.BSPPeerProtocol;
import org.apache.hama.ipc.GroomProtocol;
import org.apache.hama.ipc.HamaRPCProtocolVersion;
@@ -93,6 +94,7 @@ public class GroomServer implements Runn
NORMAL, COMPUTE, SYNC, BARRIER, STALE, INTERRUPTED, DENIED
};
+ private HttpServer server;
private static ZooKeeper zk = null;
// Running States and its related things
@@ -236,8 +238,8 @@ public class GroomServer implements Runn
private BSPTasksMonitor() {
- outOfContactTasks = new ArrayList<GroomServer.TaskInProgress>(
- conf.getInt(Constants.MAX_TASKS_PER_GROOM, 3));
+ outOfContactTasks = new ArrayList<GroomServer.TaskInProgress>(conf
+ .getInt(Constants.MAX_TASKS_PER_GROOM, 3));
}
@Override
@@ -257,9 +259,9 @@ public class GroomServer implements Runn
LOG.debug("Purging task " + tip);
purgeTask(tip, true);
} catch (Exception e) {
- LOG.error(
- new StringBuilder("Error while removing a timed-out task - ")
- .append(tip.toString()), e);
+ LOG.error(new StringBuilder(
+ "Error while removing a timed-out task - ")
+ .append(tip.toString()), e);
}
}
@@ -283,8 +285,8 @@ public class GroomServer implements Runn
// this.localDirAllocator = new LocalDirAllocator("bsp.local.dir");
try {
- zk = new ZooKeeper(QuorumPeer.getZKQuorumServersString(conf),
- conf.getInt(Constants.ZOOKEEPER_SESSION_TIMEOUT, 1200000), this);
+ zk = new ZooKeeper(QuorumPeer.getZKQuorumServersString(conf), conf
+ .getInt(Constants.ZOOKEEPER_SESSION_TIMEOUT, 1200000), this);
} catch (IOException e) {
LOG.error("Exception during reinitialization!", e);
}
@@ -296,9 +298,8 @@ public class GroomServer implements Runn
}
if (localHostname == null) {
- this.localHostname = DNS.getDefaultHost(
- conf.get("bsp.dns.interface", "default"),
- conf.get("bsp.dns.nameserver", "default"));
+ this.localHostname = DNS.getDefaultHost(conf.get("bsp.dns.interface",
+ "default"), conf.get("bsp.dns.nameserver", "default"));
}
// check local disk
checkLocalDirs(getLocalDirs());
@@ -330,6 +331,20 @@ public class GroomServer implements Runn
LOG.info("Worker rpc server --> " + rpcServer);
}
+ server = new HttpServer("groomserver", rpcAddr, conf.getInt(
+ "bsp.http.groomserver.port", Constants.DEFAULT_GROOM_INFO_SERVER),
+ true, conf);
+
+ FileSystem local = FileSystem.getLocal(conf);
+ server.setAttribute("groom.server", this);
+ server.setAttribute("local.file.system", local);
+ server.setAttribute("conf", conf);
+ server.setAttribute("log", LOG);
+ server.addServlet("taskLog", "/tasklog", TaskLogServlet.class);
+
+ LOG.info("starting webserver: " + rpcAddr);
+ server.start();
+
@SuppressWarnings("deprecation")
String address = NetUtils.getServerAddress(conf,
"bsp.groom.report.bindAddress", "bsp.groom.report.port",
@@ -393,6 +408,8 @@ public class GroomServer implements Runn
this.running = true;
this.initialized = true;
+
+ // FIXME
}
/** Return the port at which the tasktracker bound to */
@@ -687,18 +704,20 @@ public class GroomServer implements Runn
.entrySet()) {
TaskInProgress tip = entry.getValue();
if (LOG.isDebugEnabled())
- LOG.debug("checking task: "
- + tip.getTask().getTaskID()
- + " starttime ="
- + tip.startTime
- + " lastping = "
- + tip.lastPingedTimestamp
- + " run state = "
- + tip.taskStatus.getRunState().toString()
- + " monitorPeriod = "
- + monitorPeriod
- + " check = "
- + (tip.taskStatus.getRunState().equals(TaskStatus.State.RUNNING) && (((tip.lastPingedTimestamp == 0 && ((currentTime - tip.startTime) > 10 * monitorPeriod)) || ((tip.lastPingedTimestamp > 0) && (currentTime - tip.lastPingedTimestamp) > monitorPeriod)))));
+ LOG
+ .debug("checking task: "
+ + tip.getTask().getTaskID()
+ + " starttime ="
+ + tip.startTime
+ + " lastping = "
+ + tip.lastPingedTimestamp
+ + " run state = "
+ + tip.taskStatus.getRunState().toString()
+ + " monitorPeriod = "
+ + monitorPeriod
+ + " check = "
+ + (tip.taskStatus.getRunState()
+ .equals(TaskStatus.State.RUNNING) && (((tip.lastPingedTimestamp == 0 && ((currentTime - tip.startTime) > 10 * monitorPeriod)) || ((tip.lastPingedTimestamp > 0) && (currentTime - tip.lastPingedTimestamp) > monitorPeriod)))));
// Task is out of contact if it has not pinged since more than
// monitorPeriod. A task is given a leeway of 10 times monitorPeriod
@@ -1056,6 +1075,24 @@ public class GroomServer implements Runn
}
}
+ public String getGroomServerName() {
+ return this.groomServerName;
+ }
+
+ /**
+ * Get the list of tasks that will be reported back to the job tracker in the
+ * next heartbeat cycle.
+ *
+ * @return a copy of the list of TaskStatus objects
+ */
+ public synchronized List<TaskStatus> getRunningTaskStatuses() {
+ List<TaskStatus> result = new ArrayList<TaskStatus>(runningTasks.size());
+ for (TaskInProgress tip : runningTasks.values()) {
+ result.add(tip.getStatus());
+ }
+ return result;
+ }
+
/**
* The main() for BSPPeer child processes.
*/
Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/JobInProgress.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/JobInProgress.java?rev=1343412&r1=1343411&r2=1343412&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/JobInProgress.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/JobInProgress.java Mon May 28 22:40:33 2012
@@ -19,8 +19,10 @@ package org.apache.hama.bsp;
import java.io.DataInputStream;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
+import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
@@ -29,6 +31,7 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hama.Constants;
/**
* JobInProgress maintains all the info for keeping a Job on the straight and
@@ -54,7 +57,6 @@ class JobInProgress {
static final Log LOG = LogFactory.getLog(JobInProgress.class);
boolean tasksInited = false;
- boolean jobInited = false;
Configuration conf;
JobProfile profile;
@@ -89,6 +91,8 @@ class JobInProgress {
// Used only for scheduling!
Map<GroomServerStatus, Integer> tasksInGroomMap;
+ private int taskCompletionEventTracker = 0;
+
public JobInProgress(BSPJobID jobId, Path jobFile, BSPMaster master,
Configuration conf) throws IOException {
this.conf = conf;
@@ -101,8 +105,8 @@ class JobInProgress {
this.tasksInGroomMap = new HashMap<GroomServerStatus, Integer>();
- this.status = new JobStatus(jobId, null, 0L, 0L,
- JobStatus.State.PREP.value(), counters);
+ this.status = new JobStatus(jobId, null, 0L, 0L, JobStatus.State.PREP
+ .value(), counters);
this.startTime = System.currentTimeMillis();
this.superstepCounter = 0;
this.restartCount = 0;
@@ -119,9 +123,11 @@ class JobInProgress {
this.jobSplit = job.getConf().get("bsp.job.split.file");
this.numBSPTasks = job.getNumBspTask();
+ this.taskCompletionEvents = new ArrayList<TaskCompletionEvent>(
+ numBSPTasks + 10);
- this.profile = new JobProfile(job.getUser(), jobId, jobFile.toString(),
- job.getJobName());
+ this.profile = new JobProfile(job.getUser(), jobId, jobFile.toString(), job
+ .getJobName());
this.setJobName(job.getJobName());
@@ -303,9 +309,9 @@ class JobInProgress {
}
if (allDone) {
- this.status = new JobStatus(this.status.getJobID(),
- this.profile.getUser(), superstepCounter, superstepCounter,
- superstepCounter, JobStatus.SUCCEEDED, superstepCounter, counters);
+ this.status = new JobStatus(this.status.getJobID(), this.profile
+ .getUser(), superstepCounter, superstepCounter, superstepCounter,
+ JobStatus.SUCCEEDED, superstepCounter, counters);
this.finishTime = System.currentTimeMillis();
this.status.setFinishTime(this.finishTime);
@@ -339,9 +345,8 @@ class JobInProgress {
// Kill job
this.kill();
// Send KillTaskAction to GroomServer
- this.status = new JobStatus(this.status.getJobID(),
- this.profile.getUser(), 0L, 0L, 0L, JobStatus.KILLED,
- superstepCounter, counters);
+ this.status = new JobStatus(this.status.getJobID(), this.profile
+ .getUser(), 0L, 0L, 0L, JobStatus.KILLED, superstepCounter, counters);
this.finishTime = System.currentTimeMillis();
this.status.setFinishTime(this.finishTime);
@@ -353,8 +358,40 @@ class JobInProgress {
public synchronized void updateTaskStatus(TaskInProgress tip,
TaskStatus taskStatus) {
+ TaskAttemptID taskid = taskStatus.getTaskId();
+
tip.updateStatus(taskStatus); // update tip
+ TaskStatus.State state = taskStatus.getRunState();
+ TaskCompletionEvent taskEvent = null;
+ // FIXME port number should be configurable
+ String httpTaskLogLocation = "http://"
+ + tip.getGroomServerStatus().getGroomHostName() + ":"
+ + conf.getInt("bsp.http.groomserver.port", Constants.DEFAULT_GROOM_INFO_SERVER);
+
+ if (state == TaskStatus.State.FAILED || state == TaskStatus.State.KILLED) {
+ int eventNumber;
+ if ((eventNumber = tip.getSuccessEventNumber()) != -1) {
+ TaskCompletionEvent t = this.taskCompletionEvents.get(eventNumber);
+ if (t.getTaskAttemptId().equals(taskid))
+ t.setTaskStatus(TaskCompletionEvent.Status.OBSOLETE);
+ }
+
+ // Did the task failure lead to tip failure?
+ TaskCompletionEvent.Status taskCompletionStatus = (state == TaskStatus.State.FAILED) ? TaskCompletionEvent.Status.FAILED
+ : TaskCompletionEvent.Status.KILLED;
+ if (tip.isFailed()) {
+ taskCompletionStatus = TaskCompletionEvent.Status.TIPFAILED;
+ }
+ taskEvent = new TaskCompletionEvent(taskCompletionEventTracker, taskid,
+ tip.idWithinJob(), taskCompletionStatus, httpTaskLogLocation);
+
+ if (taskEvent != null) {
+ this.taskCompletionEvents.add(taskEvent);
+ taskCompletionEventTracker++;
+ }
+ }
+
if (superstepCounter < taskStatus.getSuperstepCount()) {
superstepCounter = taskStatus.getSuperstepCount();
// TODO Later, we have to update JobInProgress status here
@@ -431,4 +468,22 @@ class JobInProgress {
return counters;
}
+ List<TaskCompletionEvent> taskCompletionEvents;
+
+ synchronized int getNumTaskCompletionEvents() {
+ return taskCompletionEvents.size();
+ }
+
+ public TaskCompletionEvent[] getTaskCompletionEvents(int fromEventId,
+ int maxEvents) {
+ TaskCompletionEvent[] events = TaskCompletionEvent.EMPTY_ARRAY;
+ if (taskCompletionEvents.size() > fromEventId) {
+ int actualMax = Math.min(maxEvents,
+ (taskCompletionEvents.size() - fromEventId));
+ events = taskCompletionEvents.subList(fromEventId,
+ actualMax + fromEventId).toArray(events);
+ }
+ return events;
+ }
+
}
Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java?rev=1343412&r1=1343411&r2=1343412&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java Mon May 28 22:40:33 2012
@@ -529,4 +529,10 @@ public class LocalBSPRunner implements J
}
}
+
+ @Override
+ public TaskCompletionEvent[] getTaskCompletionEvents(BSPJobID id,
+ int startFrom, int i) {
+ return TaskCompletionEvent.EMPTY_ARRAY;
+ }
}
Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/RunningJob.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/RunningJob.java?rev=1343412&r1=1343411&r2=1343412&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/RunningJob.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/RunningJob.java Mon May 28 22:40:33 2012
@@ -118,4 +118,6 @@ public interface RunningJob {
* @return the latest status of the job.
*/
public JobStatus getStatus();
+
+ public TaskCompletionEvent[] getTaskCompletionEvents(int eventCounter);
}
Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskCompletionEvent.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskCompletionEvent.java?rev=1343412&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskCompletionEvent.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskCompletionEvent.java Mon May 28 22:40:33 2012
@@ -0,0 +1,233 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+
+public class TaskCompletionEvent implements Writable {
+ static public enum Status {
+ FAILED, KILLED, SUCCEEDED, OBSOLETE, TIPFAILED
+ };
+
+ private int eventId;
+ private String groomServerInfo;
+ private int taskRunTime; // using int since runtime is the time difference
+ private TaskAttemptID taskId;
+ Status status;
+ private int idWithinJob;
+ public static final TaskCompletionEvent[] EMPTY_ARRAY = new TaskCompletionEvent[0];
+
+ /**
+ * Default constructor for Writable.
+ *
+ */
+ public TaskCompletionEvent() {
+ taskId = new TaskAttemptID();
+ }
+
+ /**
+ * Constructor. eventId should be created externally and incremented per event
+ * for each job.
+ *
+ * @param eventId event id, event id should be unique and assigned in
+ * incrementally, starting from 0.
+ * @param taskId task id
+ * @param status task's status
+ * @param taskTrackerHttp task tracker's host:port for http.
+ */
+ public TaskCompletionEvent(int eventId, TaskAttemptID taskId,
+ int idWithinJob, Status status, String groomServerInfo) {
+
+ this.taskId = taskId;
+ this.idWithinJob = idWithinJob;
+ this.eventId = eventId;
+ this.status = status;
+ this.groomServerInfo = groomServerInfo;
+ }
+
+ /**
+ * Returns event Id.
+ *
+ * @return event id
+ */
+ public int getEventId() {
+ return eventId;
+ }
+
+ /**
+ * Returns task id.
+ *
+ * @return task id
+ * @deprecated use {@link #getTaskAttemptId()} instead.
+ */
+ @Deprecated
+ public String getTaskId() {
+ return taskId.toString();
+ }
+
+ /**
+ * Returns task id.
+ *
+ * @return task id
+ */
+ public TaskAttemptID getTaskAttemptId() {
+ return taskId;
+ }
+
+ /**
+ * Returns enum Status.SUCESS or Status.FAILURE.
+ *
+ * @return task tracker status
+ */
+ public Status getTaskStatus() {
+ return status;
+ }
+
+ /**
+ * http location of the groomserver where this task ran.
+ *
+ * @return http location of groomserver tasklogs
+ */
+ public String getGroomServerInfo() {
+ return groomServerInfo;
+ }
+
+ /**
+ * Returns time (in millisec) the task took to complete.
+ */
+ public int getTaskRunTime() {
+ return taskRunTime;
+ }
+
+ /**
+ * Set the task completion time
+ *
+ * @param taskCompletionTime time (in millisec) the task took to complete
+ */
+ public void setTaskRunTime(int taskCompletionTime) {
+ this.taskRunTime = taskCompletionTime;
+ }
+
+ /**
+ * set event Id. should be assigned incrementally starting from 0.
+ *
+ * @param eventId
+ */
+ public void setEventId(int eventId) {
+ this.eventId = eventId;
+ }
+
+ /**
+ * Sets task id.
+ *
+ * @param taskId
+ * @deprecated use {@link #setTaskID(TaskAttemptID)} instead.
+ */
+ @Deprecated
+ public void setTaskId(String taskId) {
+ this.taskId = TaskAttemptID.forName(taskId);
+ }
+
+ /**
+ * Sets task id.
+ *
+ * @param taskId
+ */
+ public void setTaskID(TaskAttemptID taskId) {
+ this.taskId = taskId;
+ }
+
+ /**
+ * Set task status.
+ *
+ * @param status
+ */
+ public void setTaskStatus(Status status) {
+ this.status = status;
+ }
+
+ /**
+ * Set task tracker http location.
+ *
+ * @param taskTrackerHttp
+ */
+ public void setTaskTrackerHttp(String taskTrackerHttp) {
+ this.groomServerInfo = taskTrackerHttp;
+ }
+
+ @Override
+ public String toString() {
+ StringBuffer buf = new StringBuffer();
+ buf.append("Task Id : ");
+ buf.append(taskId);
+ buf.append(", Status : ");
+ buf.append(status.name());
+ return buf.toString();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o == null)
+ return false;
+ if (o.getClass().equals(TaskCompletionEvent.class)) {
+ TaskCompletionEvent event = (TaskCompletionEvent) o;
+ return this.eventId == event.getEventId()
+ && this.idWithinJob == event.idWithinJob()
+ && this.status.equals(event.getTaskStatus())
+ && this.taskId.equals(event.getTaskAttemptId())
+ && this.taskRunTime == event.getTaskRunTime()
+ && this.groomServerInfo.equals(event.getGroomServerInfo());
+ }
+ return false;
+ }
+
+ @Override
+ public int hashCode() {
+ return toString().hashCode();
+ }
+
+ public int idWithinJob() {
+ return idWithinJob;
+ }
+
+ // ////////////////////////////////////////////
+ // Writable
+ // ////////////////////////////////////////////
+ public void write(DataOutput out) throws IOException {
+ taskId.write(out);
+ WritableUtils.writeVInt(out, idWithinJob);
+ WritableUtils.writeEnum(out, status);
+ WritableUtils.writeString(out, groomServerInfo);
+ WritableUtils.writeVInt(out, taskRunTime);
+ WritableUtils.writeVInt(out, eventId);
+ }
+
+ public void readFields(DataInput in) throws IOException {
+ taskId.readFields(in);
+ idWithinJob = WritableUtils.readVInt(in);
+ status = WritableUtils.readEnum(in, Status.class);
+ groomServerInfo = WritableUtils.readString(in);
+ taskRunTime = WritableUtils.readVInt(in);
+ eventId = WritableUtils.readVInt(in);
+ }
+}
Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskInProgress.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskInProgress.java?rev=1343412&r1=1343411&r2=1343412&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskInProgress.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskInProgress.java Mon May 28 22:40:33 2012
@@ -51,13 +51,14 @@ class TaskInProgress {
private TaskID id;
private JobInProgress job;
private int completes = 0;
-
+
private GroomServerStatus myGroomStatus = null;
// Status
// private double progress = 0;
// private String state = "";
private long startTime = 0;
+ private int successEventNumber = -1;
// The 'next' usable taskid of this tip
int nextTaskId = 0;
@@ -81,7 +82,7 @@ class TaskInProgress {
private BSPJobID jobId;
private RawSplit rawSplit;
-
+
/**
* Constructor for new nexus between BSPMaster and GroomServer.
*
@@ -97,8 +98,8 @@ class TaskInProgress {
init(jobId);
}
- public TaskInProgress(BSPJobID jobId, String jobFile, RawSplit rawSplit, BSPMaster master,
- Configuration conf, JobInProgress job, int partition) {
+ public TaskInProgress(BSPJobID jobId, String jobFile, RawSplit rawSplit,
+ BSPMaster master, Configuration conf, JobInProgress job, int partition) {
this.jobId = jobId;
this.jobFile = jobFile;
this.rawSplit = rawSplit;
@@ -114,11 +115,11 @@ class TaskInProgress {
this.id = new TaskID(jobId, partition);
this.startTime = System.currentTimeMillis();
}
-
+
/**
* Return a Task that can be sent to a GroomServer for execution.
*/
- public Task getTaskToRun(Map<String, GroomServerStatus> grooms,
+ public Task getTaskToRun(Map<String, GroomServerStatus> grooms,
Map<GroomServerStatus, Integer> tasksInGroomMap) throws IOException {
Task t = null;
@@ -133,44 +134,44 @@ class TaskInProgress {
+ " attempts for the tip '" + getTIPId() + "'");
return null;
}
-
+
String splitClass = null;
BytesWritable split = null;
GroomServerStatus selectedGroom = null;
- if(rawSplit != null){
+ if (rawSplit != null) {
splitClass = rawSplit.getClassName();
split = rawSplit.getBytes();
String[] possibleLocations = rawSplit.getLocations();
- for (int i = 0; i < possibleLocations.length; ++i){
+ for (int i = 0; i < possibleLocations.length; ++i) {
String location = possibleLocations[i];
GroomServerStatus groom = grooms.get(location);
Integer taskInGroom = tasksInGroomMap.get(groom);
- taskInGroom = (taskInGroom == null)?0:taskInGroom;
- if(taskInGroom < groom.getMaxTasks() &&
- location.equals(groom.getGroomHostName())){
- selectedGroom = groom;
- t = new BSPTask(jobId, jobFile, taskid, partition, splitClass, split);
- activeTasks.put(taskid, groom.getGroomName());
-
- break;
+ taskInGroom = (taskInGroom == null) ? 0 : taskInGroom;
+ if (taskInGroom < groom.getMaxTasks()
+ && location.equals(groom.getGroomHostName())) {
+ selectedGroom = groom;
+ t = new BSPTask(jobId, jobFile, taskid, partition, splitClass, split);
+ activeTasks.put(taskid, groom.getGroomName());
+
+ break;
}
}
}
- //Failed in attempt to get data locality or there was no input split.
- if(selectedGroom == null){
+ // Failed in attempt to get data locality or there was no input split.
+ if (selectedGroom == null) {
Iterator<String> groomIter = grooms.keySet().iterator();
- while(groomIter.hasNext()) {
+ while (groomIter.hasNext()) {
GroomServerStatus groom = grooms.get(groomIter.next());
Integer taskInGroom = tasksInGroomMap.get(groom);
- taskInGroom = (taskInGroom == null)?0:taskInGroom;
- if(taskInGroom < groom.getMaxTasks()){
+ taskInGroom = (taskInGroom == null) ? 0 : taskInGroom;
+ if (taskInGroom < groom.getMaxTasks()) {
selectedGroom = groom;
t = new BSPTask(jobId, jobFile, taskid, partition, splitClass, split);
activeTasks.put(taskid, groom.getGroomName());
}
}
}
-
+
myGroomStatus = selectedGroom;
return t;
@@ -204,8 +205,8 @@ class TaskInProgress {
public TreeMap<TaskAttemptID, String> getTasks() {
return activeTasks;
}
-
- public GroomServerStatus getGroomServerStatus(){
+
+ public GroomServerStatus getGroomServerStatus() {
return myGroomStatus;
}
@@ -278,7 +279,7 @@ class TaskInProgress {
this.completes++;
}
-
+
public void terminated(TaskAttemptID taskid) {
LOG.info("Task '" + taskid.getTaskID().toString() + "' has failed.");
@@ -339,4 +340,29 @@ class TaskInProgress {
return bspMaster;
}
+ /**
+ * Remembers the number of the completion event raised for this tip.
+ *
+ * @param eventNumber the event number to store; the field starts at -1
+ * until this is called
+ */
+ public void setSuccessEventNumber(int eventNumber) {
+ this.successEventNumber = eventNumber;
+ }
+
+ /**
+ * Returns the event number stored by setSuccessEventNumber.
+ *
+ * @return the stored event number, or -1 if it was never set
+ */
+ public int getSuccessEventNumber() {
+ return this.successEventNumber;
+ }
+
+ /**
+ * Returns this tip's index within its job, taken from the {@code partition}
+ * field this tip was constructed with.
+ *
+ * @return the tip index
+ */
+ public int idWithinJob() {
+ return this.partition;
+ }
+
+ /**
+ * Returns the groom server recorded in the stored status of the given
+ * attempt.
+ *
+ * @param taskid an attempt of this tip
+ * @return the value of getGroomServer() on that attempt's status
+ * NOTE(review): throws NullPointerException if no status has been recorded
+ * for taskid.
+ */
+ public String machineWhereTaskRan(TaskAttemptID taskid) {
+ return this.taskStatuses.get(taskid).getGroomServer();
+ }
+
}
Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskLog.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskLog.java?rev=1343412&r1=1343411&r2=1343412&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskLog.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskLog.java Mon May 28 22:40:33 2012
@@ -37,7 +37,7 @@ public class TaskLog {
private static final Log LOG = LogFactory.getLog(TaskLog.class.getName());
private static final File LOG_DIR = new File(
- System.getProperty("hama.log.dir"), "userlogs").getAbsoluteFile();
+ System.getProperty("hama.log.dir"), "tasklogs").getAbsoluteFile();
static {
if (!LOG_DIR.exists()) {
@@ -46,7 +46,7 @@ public class TaskLog {
}
public static File getTaskLogFile(TaskAttemptID taskid, LogName filter) {
- return new File(new File(LOG_DIR, taskid.toString()), filter.toString());
+ return new File(LOG_DIR, taskid.getJobID() + "/" + taskid.toString() + ".log");
}
/**
Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskLogServlet.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskLogServlet.java?rev=1343412&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskLogServlet.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskLogServlet.java Mon May 28 22:40:33 2012
@@ -0,0 +1,235 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.charset.Charset;
+import java.util.Locale;
+
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * Serves the logs of a task attempt over HTTP, either wrapped in a simple
+ * HTML page (default) or as plain text ({@code plaintext=true}, which
+ * requires a {@code filter}).
+ */
+public class TaskLogServlet extends HttpServlet {
+  private static final long serialVersionUID = -6615764817774487321L;
+
+  /** Encoding used for all markup and messages this servlet generates. */
+  private static final Charset UTF8 = Charset.forName("UTF-8");
+
+  // Pre-encoded HTML entities used by quotedWrite().
+  private static final byte[] LT_ENTITY = "&lt;".getBytes(UTF8);
+  private static final byte[] GT_ENTITY = "&gt;".getBytes(UTF8);
+  private static final byte[] AMP_ENTITY = "&amp;".getBytes(UTF8);
+
+  /** Size of the copy buffer used while streaming a log. */
+  private static final int BUFFER_SIZE = 65536;
+
+  /**
+   * Returns whether the log file for the given attempt and log type is
+   * readable by this process.
+   */
+  private boolean haveTaskLog(TaskAttemptID taskId, TaskLog.LogName type) {
+    File f = TaskLog.getTaskLogFile(taskId, type);
+    return f.canRead();
+  }
+
+  /**
+   * Construct the URL under which this servlet serves an attempt's logs.
+   *
+   * @param taskTrackerHostName host serving the log (presumably the groom
+   *          server that ran the attempt)
+   * @param httpPort HTTP port of that host
+   * @param taskAttemptID string form of the task attempt id
+   * @return the taskLogUrl
+   */
+  public static String getTaskLogUrl(String taskTrackerHostName,
+      String httpPort, String taskAttemptID) {
+    return ("http://" + taskTrackerHostName + ":" + httpPort
+        + "/tasklog?taskid=" + taskAttemptID);
+  }
+
+  /**
+   * Find the next quotable character in the given array.
+   *
+   * @param data the bytes to look in
+   * @param offset the first index to look in
+   * @param end the index after the last one to look in
+   * @return the index of the quotable character or end if none was found
+   */
+  private static int findFirstQuotable(byte[] data, int offset, int end) {
+    while (offset < end) {
+      switch (data[offset]) {
+      case '<':
+      case '>':
+      case '&':
+        return offset;
+      default:
+        offset += 1;
+      }
+    }
+    return offset;
+  }
+
+  /**
+   * Writes {@code length} bytes of {@code data} starting at {@code offset},
+   * replacing '<', '>' and '&' with their HTML entities so log content
+   * cannot inject markup into the page.
+   */
+  private static void quotedWrite(OutputStream out, byte[] data, int offset,
+      int length) throws IOException {
+    int end = offset + length;
+    while (offset < end) {
+      int next = findFirstQuotable(data, offset, end);
+      out.write(data, offset, next - offset);
+      offset = next;
+      if (offset < end) {
+        switch (data[offset]) {
+        case '<':
+          out.write(LT_ENTITY);
+          break;
+        case '>':
+          out.write(GT_ENTITY);
+          break;
+        case '&':
+          out.write(AMP_ENTITY);
+          break;
+        default:
+          out.write(data[offset]);
+          break;
+        }
+        offset += 1;
+      }
+    }
+  }
+
+  /**
+   * Streams one log of an attempt to {@code out}, HTML-escaped and wrapped
+   * in a titled {@code <pre>} section unless {@code plainText} is set.
+   * A missing DEBUGOUT log is tolerated silently; for any other log a
+   * failure is reported to the client.
+   *
+   * @param isCleanup currently unused
+   */
+  private void printTaskLog(HttpServletResponse response, OutputStream out,
+      TaskAttemptID taskId, long start, long end, boolean plainText,
+      TaskLog.LogName filter, boolean isCleanup) throws IOException {
+    if (!plainText) {
+      out.write(("<br><b><u>" + filter + " logs</u></b><br>\n" + "<pre>\n")
+          .getBytes(UTF8));
+    }
+
+    try {
+      InputStream taskLogReader = new TaskLog.Reader(taskId, filter, start, end);
+      try {
+        byte[] b = new byte[BUFFER_SIZE];
+        int result;
+        while ((result = taskLogReader.read(b)) > 0) {
+          if (plainText) {
+            out.write(b, 0, result);
+          } else {
+            quotedWrite(out, b, 0, result);
+          }
+        }
+      } finally {
+        // Release the log file even if reading or writing fails midway.
+        taskLogReader.close();
+      }
+      if (!plainText) {
+        out.write("</pre><hr><br>\n".getBytes(UTF8));
+      }
+    } catch (IOException ioe) {
+      if (filter == TaskLog.LogName.DEBUGOUT) {
+        // The debug log is optional: just close the section.
+        if (!plainText) {
+          out.write("</pre><hr><br>\n".getBytes(UTF8));
+        }
+      } else if (!response.isCommitted()) {
+        response.sendError(HttpServletResponse.SC_GONE, "Failed to retrieve "
+            + filter + " log for task: " + taskId);
+      } else {
+        // Output was already flushed, so sendError() would throw
+        // IllegalStateException; report the failure inline instead.
+        byte[] msg = ("TaskLogServlet exception:\n"
+            + StringUtils.stringifyException(ioe) + "\n").getBytes(UTF8);
+        if (plainText) {
+          out.write(msg);
+        } else {
+          quotedWrite(out, msg, 0, msg.length);
+        }
+      }
+    }
+  }
+
+  /**
+   * Returns the value of an optional numeric request parameter.
+   *
+   * @return the parsed value, or {@code defaultValue} if the parameter is absent
+   * @throws NumberFormatException if the parameter is present but not a long
+   */
+  private static long longParameter(HttpServletRequest request, String name,
+      long defaultValue) {
+    String value = request.getParameter(name);
+    return value == null ? defaultValue : Long.parseLong(value);
+  }
+
+  /**
+   * Get the logs via http.
+   *
+   * Parameters: {@code taskid} (required), {@code filter} (log name,
+   * case-insensitive), {@code start}/{@code end} (byte range passed to
+   * TaskLog.Reader), {@code plaintext} and {@code cleanup} (booleans).
+   * Malformed parameters are answered with 400 Bad Request.
+   */
+  @Override
+  public void doGet(HttpServletRequest request, HttpServletResponse response)
+      throws ServletException, IOException {
+    long start = 0;
+    long end = -1;
+    boolean plainText = false;
+    TaskLog.LogName filter = null;
+    boolean isCleanup = false;
+
+    String taskIdStr = request.getParameter("taskid");
+    if (taskIdStr == null) {
+      response.sendError(HttpServletResponse.SC_BAD_REQUEST,
+          "Argument taskid is required");
+      return;
+    }
+    TaskAttemptID taskId;
+    try {
+      // Assumes forName reports a malformed id with IllegalArgumentException
+      // (as Hadoop's TaskAttemptID does) -- confirm for Hama's implementation.
+      taskId = TaskAttemptID.forName(taskIdStr);
+    } catch (IllegalArgumentException iae) {
+      response.sendError(HttpServletResponse.SC_BAD_REQUEST,
+          "Illegal value for taskid: " + taskIdStr);
+      return;
+    }
+
+    String logFilter = request.getParameter("filter");
+    if (logFilter != null) {
+      try {
+        // Fixed locale: under e.g. a Turkish default locale "profile" would
+        // upper-case to "PROFİLE" and never match LogName.PROFILE.
+        filter = TaskLog.LogName.valueOf(logFilter.toUpperCase(Locale.ENGLISH));
+      } catch (IllegalArgumentException iae) {
+        response.sendError(HttpServletResponse.SC_BAD_REQUEST,
+            "Illegal value for filter: " + logFilter);
+        return;
+      }
+    }
+
+    try {
+      start = longParameter(request, "start", start);
+      end = longParameter(request, "end", end);
+    } catch (NumberFormatException nfe) {
+      response.sendError(HttpServletResponse.SC_BAD_REQUEST,
+          "Illegal value for start/end: " + nfe.getMessage());
+      return;
+    }
+
+    String sPlainText = request.getParameter("plaintext");
+    if (sPlainText != null) {
+      plainText = Boolean.parseBoolean(sPlainText);
+    }
+
+    String sCleanup = request.getParameter("cleanup");
+    if (sCleanup != null) {
+      isCleanup = Boolean.parseBoolean(sCleanup);
+    }
+
+    OutputStream out = response.getOutputStream();
+    if (!plainText) {
+      out.write(("<html>\n" + "<title>Task Logs: '" + taskId + "'</title>\n"
+          + "<body>\n" + "<h1>Task Logs: '" + taskId + "'</h1><br>\n")
+          .getBytes(UTF8));
+
+      if (filter == null) {
+        // NOTE(review): TaskLog.getTaskLogFile in this revision does not use
+        // its LogName argument; if TaskLog.Reader resolves files the same
+        // way, every section below renders the same file -- confirm.
+        printTaskLog(response, out, taskId, start, end, plainText,
+            TaskLog.LogName.STDOUT, isCleanup);
+        printTaskLog(response, out, taskId, start, end, plainText,
+            TaskLog.LogName.STDERR, isCleanup);
+        printTaskLog(response, out, taskId, start, end, plainText,
+            TaskLog.LogName.SYSLOG, isCleanup);
+        if (haveTaskLog(taskId, TaskLog.LogName.DEBUGOUT)) {
+          printTaskLog(response, out, taskId, start, end, plainText,
+              TaskLog.LogName.DEBUGOUT, isCleanup);
+        }
+        if (haveTaskLog(taskId, TaskLog.LogName.PROFILE)) {
+          printTaskLog(response, out, taskId, start, end, plainText,
+              TaskLog.LogName.PROFILE, isCleanup);
+        }
+      } else {
+        printTaskLog(response, out, taskId, start, end, plainText, filter,
+            isCleanup);
+      }
+
+      out.write("</body></html>\n".getBytes(UTF8));
+      out.close();
+    } else if (filter == null) {
+      response.sendError(HttpServletResponse.SC_BAD_REQUEST,
+          "You must supply a value for `filter' (STDOUT, STDERR, or SYSLOG) if you set plainText = true");
+    } else {
+      printTaskLog(response, out, taskId, start, end, plainText, filter,
+          isCleanup);
+    }
+  }
+}
Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/ipc/JobSubmissionProtocol.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/ipc/JobSubmissionProtocol.java?rev=1343412&r1=1343411&r2=1343412&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/ipc/JobSubmissionProtocol.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/ipc/JobSubmissionProtocol.java Mon May 28 22:40:33 2012
@@ -19,11 +19,12 @@ package org.apache.hama.ipc;
import java.io.IOException;
-import org.apache.hama.bsp.ClusterStatus;
import org.apache.hama.bsp.BSPJobID;
+import org.apache.hama.bsp.ClusterStatus;
import org.apache.hama.bsp.JobProfile;
import org.apache.hama.bsp.JobStatus;
import org.apache.hama.bsp.TaskAttemptID;
+import org.apache.hama.bsp.TaskCompletionEvent;
/**
* Protocol that a groom server and the central BSP Master use to communicate.
@@ -120,4 +121,7 @@ public interface JobSubmissionProtocol e
public boolean killTask(TaskAttemptID taskId, boolean shouldFail)
throws IOException;
+ /**
+ * Fetches task completion events of a job.
+ *
+ * @param id the job whose events are requested
+ * @param startFrom index of the first event to return (presumably an offset
+ * into the job's event list -- confirm against the implementation)
+ * @param maxEvents upper bound on the number of events returned (presumably;
+ * confirm against the implementation)
+ * @return the requested events
+ * NOTE(review): unlike killTask above, this does not declare IOException,
+ * so RPC failures will likely surface as undeclared exceptions.
+ */
+ public TaskCompletionEvent[] getTaskCompletionEvents(BSPJobID id,
+ int startFrom, int maxEvents);
+
}
Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/util/BSPServletUtil.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/util/BSPServletUtil.java?rev=1343412&r1=1343411&r2=1343412&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/util/BSPServletUtil.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/util/BSPServletUtil.java Mon May 28 22:40:33 2012
@@ -31,7 +31,7 @@ import org.apache.hama.bsp.JobStatus;
public class BSPServletUtil extends ServletUtil {
public static final String HTML_TAIL = "<hr />\n"
- + "<a href='http://incubator.apache.org/hama/'>Hama</a>, "
+ + "<a href='http://hama.apache.org/'>Hama</a>, "
+ Calendar.getInstance().get(Calendar.YEAR) + ".\n" + "</body></html>";
/**
@@ -64,7 +64,8 @@ public class BSPServletUtil extends Serv
+ "<th>SuperSteps</th>" + "<th>Tasks</th>" + "<th>Starttime</th>"
+ "</tr>\n");
for (JobStatus status : jobs) {
- sb.append("<tr><td><a href=\"bspjob.jsp?jobid=").append(status.getJobID()).append("\">");
+ sb.append("<tr><td><a href=\"bspjob.jsp?jobid=").append(
+ status.getJobID()).append("\">");
sb.append(status.getJobID());
sb.append("</a></td><td>");
sb.append(status.getUsername());
@@ -102,8 +103,10 @@ public class BSPServletUtil extends Serv
for (Entry<String, GroomServerStatus> entry : status
.getActiveGroomServerStatus().entrySet()) {
sb.append("<tr><td>");
- sb.append(entry.getKey()).append("</td><td>");
- sb.append(entry.getValue().getGroomHostName()).append("</td>").append("<td>").append(entry.getValue().getMaxTasks()).append("</td><td>");
+ sb.append("<a href='http://" + entry.getKey() + "'>");
+ sb.append(entry.getKey()).append("</a></td><td>");
+ sb.append(entry.getValue().getGroomHostName()).append("</td>").append(
+ "<td>").append(entry.getValue().getMaxTasks()).append("</td><td>");
sb.append(entry.getValue().countTasks()).append("</td><td>");
sb.append(entry.getValue().getFailures()).append("</td><td>");
sb.append(entry.getValue().getLastSeen()).append("</td>");
Added: incubator/hama/trunk/core/src/main/resources/webapp/groomserver/groomserver.jsp
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/resources/webapp/groomserver/groomserver.jsp?rev=1343412&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/resources/webapp/groomserver/groomserver.jsp (added)
+++ incubator/hama/trunk/core/src/main/resources/webapp/groomserver/groomserver.jsp Mon May 28 22:40:33 2012
@@ -0,0 +1,61 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<%@ page
+  contentType="text/html; charset=UTF-8"
+  import="javax.servlet.*"
+  import="javax.servlet.http.*"
+  import="java.io.*"
+  import="java.util.*"
+  import="org.apache.hama.bsp.*"
+  import="org.apache.hama.util.*"
+%>
+<%! private static final long serialVersionUID = 1L;
+%>
+<%-- Status page of a single groom server: lists the task attempts it is
+     currently running and links to its local log directory. --%>
+<%
+  // Assumes the groom's HTTP server stored itself under "groom.server";
+  // if the attribute is missing the next line throws NullPointerException.
+  GroomServer groom = (GroomServer) application.getAttribute("groom.server");
+  String groomName = groom.getGroomServerName();
+%>
+
+<html>
+<head>
+<title><%= groomName %> - Server Status</title>
+</head>
+<body>
+<h2><%= groomName %></h2>
+<hr>
+<h2>Running tasks</h2>
+<table border=2 cellpadding="5" cellspacing="2">
+<tr><td align="center">Task Attempts</td><td>Status</td>
+  </tr>
+  <%
+    // One row per running attempt: its id and current run state.
+    for (Iterator itr = groom.getRunningTaskStatuses().iterator(); itr.hasNext();) {
+      TaskStatus status = (TaskStatus) itr.next();
+      out.print("<tr><td>" + status.getTaskId());
+      out.print("</td><td>" + status.getRunState());
+      out.print("</td></tr>\n");
+    }
+  %>
+</table>
+<hr>
+<h2>Local Logs</h2>
+<a href="/logs/">Log</a> directory
+
+<%
+  out.println(BSPServletUtil.htmlFooter());
+%>
\ No newline at end of file
Added: incubator/hama/trunk/core/src/main/resources/webapp/groomserver/index.html
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/resources/webapp/groomserver/index.html?rev=1343412&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/resources/webapp/groomserver/index.html (added)
+++ incubator/hama/trunk/core/src/main/resources/webapp/groomserver/index.html Mon May 28 22:40:33 2012
@@ -0,0 +1,17 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<meta HTTP-EQUIV="REFRESH" content="0;url=groomserver.jsp"/>
\ No newline at end of file