You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ed...@apache.org on 2012/05/29 00:40:34 UTC

svn commit: r1343412 - in /incubator/hama/trunk: conf/ core/src/main/java/org/apache/hama/ core/src/main/java/org/apache/hama/bsp/ core/src/main/java/org/apache/hama/ipc/ core/src/main/java/org/apache/hama/util/ core/src/main/resources/webapp/groomserver/

Author: edwardyoon
Date: Mon May 28 22:40:33 2012
New Revision: 1343412

URL: http://svn.apache.org/viewvc?rev=1343412&view=rev
Log:
Task's error logs should be displayed on client-end when job is failed.

Added:
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskCompletionEvent.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskLogServlet.java
    incubator/hama/trunk/core/src/main/resources/webapp/groomserver/
    incubator/hama/trunk/core/src/main/resources/webapp/groomserver/groomserver.jsp
    incubator/hama/trunk/core/src/main/resources/webapp/groomserver/index.html
Modified:
    incubator/hama/trunk/conf/hama-default.xml
    incubator/hama/trunk/core/src/main/java/org/apache/hama/Constants.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMaster.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/JobInProgress.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/RunningJob.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskInProgress.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskLog.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/ipc/JobSubmissionProtocol.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/util/BSPServletUtil.java

Modified: incubator/hama/trunk/conf/hama-default.xml
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/conf/hama-default.xml?rev=1343412&r1=1343411&r2=1343412&view=diff
==============================================================================
--- incubator/hama/trunk/conf/hama-default.xml (original)
+++ incubator/hama/trunk/conf/hama-default.xml Mon May 28 22:40:33 2012
@@ -52,6 +52,12 @@
     </description>
   </property>
   <property>
+    <name>bsp.http.groomserver.port</name>
+    <value>40015</value>
+    <description>The port where the web-interface can be seen.
+    </description>
+  </property>
+  <property>
     <name>bsp.groom.report.address</name>
     <value>127.0.0.1:0</value>
     <description>The interface and port that groom server listens on. 

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/Constants.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/Constants.java?rev=1343412&r1=1343411&r2=1343412&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/Constants.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/Constants.java Mon May 28 22:40:33 2012
@@ -109,4 +109,6 @@ public interface Constants {
    * An empty instance.
    */
   static final byte[] EMPTY_BYTE_ARRAY = new byte[0];
+
+  public static final int DEFAULT_GROOM_INFO_SERVER = 40015;
 }

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java?rev=1343412&r1=1343411&r2=1343412&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java Mon May 28 22:40:33 2012
@@ -17,11 +17,19 @@
  */
 package org.apache.hama.bsp;
 
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.DataOutputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLConnection;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -40,13 +48,12 @@ import org.apache.hadoop.fs.permission.F
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.Shell;
@@ -203,6 +210,11 @@ public class BSPJobClient extends Config
         throws IOException {
       jobSubmitClient.killTask(taskId, shouldFail);
     }
+
+    @Override
+    public TaskCompletionEvent[] getTaskCompletionEvents(int startFrom) {
+      return jobSubmitClient.getTaskCompletionEvents(getID(), startFrom, 10);
+    }
   }
 
   public BSPJobClient(Configuration conf) throws IOException {
@@ -218,8 +230,8 @@ public class BSPJobClient extends Config
     if (masterAdress != null && !masterAdress.equals("local")) {
       this.jobSubmitClient = (JobSubmissionProtocol) RPC.getProxy(
           JobSubmissionProtocol.class, HamaRPCProtocolVersion.versionID,
-          BSPMaster.getAddress(conf), conf,
-          NetUtils.getSocketFactory(conf, JobSubmissionProtocol.class));
+          BSPMaster.getAddress(conf), conf, NetUtils.getSocketFactory(conf,
+              JobSubmissionProtocol.class));
     } else {
       LOG.debug("Using local BSP runner.");
       this.jobSubmitClient = new LocalBSPRunner(conf);
@@ -377,7 +389,7 @@ public class BSPJobClient extends Config
     }
   }
 
-  @SuppressWarnings({ "rawtypes", "unchecked" })
+  @SuppressWarnings( { "rawtypes", "unchecked" })
   protected BSPJob partition(BSPJob job, int maxTasks) throws IOException {
     InputSplit[] splits = job.getInputFormat().getSplits(
         job,
@@ -416,8 +428,8 @@ public class BSPJobClient extends Config
           job, null);
       CompressionCodec codec = null;
       if (outputCompressorClass != null) {
-        codec = ReflectionUtils.newInstance(outputCompressorClass,
-            job.getConf());
+        codec = ReflectionUtils.newInstance(outputCompressorClass, job
+            .getConf());
       }
 
       try {
@@ -471,7 +483,7 @@ public class BSPJobClient extends Config
   /**
    * Get the {@link CompressionType} for the output {@link SequenceFile}.
    * 
-   * @param job the {@link Job}
+   * @param job the {@link BSPJob}
    * @return the {@link CompressionType} for the output {@link SequenceFile},
    *         defaulting to {@link CompressionType#RECORD}
    */
@@ -487,7 +499,7 @@ public class BSPJobClient extends Config
   /**
    * Get the {@link CompressionCodec} for compressing the job outputs.
    * 
-   * @param job the {@link Job} to look in
+   * @param job the {@link BSPJob} to look in
    * @param defaultValue the {@link CompressionCodec} to return if not set
    * @return the {@link CompressionCodec} to be used to compress the job outputs
    * @throws IllegalArgumentException if the class was specified, but not found
@@ -604,21 +616,75 @@ public class BSPJobClient extends Config
         LOG.info(report);
         lastReport = report;
       }
+
+      int eventCounter = 0;
+      TaskCompletionEvent[] events = info.getTaskCompletionEvents(eventCounter);
+      eventCounter += events.length;
+      
+      for(TaskCompletionEvent event : events){
+        if (event.getTaskStatus() == 
+          TaskCompletionEvent.Status.FAILED){
+          
+          // Displaying the task logs
+          displayTaskLogs(event.getTaskAttemptId(), event.getGroomServerInfo());
+        }
+      }
     }
 
     if (job.isSuccessful()) {
       LOG.info("The total number of supersteps: " + info.getSuperstepCount());
-      info.getStatus()
-          .getCounter()
-          .incrCounter(BSPPeerImpl.PeerCounter.SUPERSTEPS,
-              info.getSuperstepCount());
+      info.getStatus().getCounter().incrCounter(
+          BSPPeerImpl.PeerCounter.SUPERSTEPS, info.getSuperstepCount());
       info.getStatus().getCounter().log(LOG);
     } else {
       LOG.info("Job failed.");
     }
+    
     return job.isSuccessful();
   }
 
+  static String getTaskLogURL(TaskAttemptID taskId, String baseUrl) {
+    return (baseUrl + "/tasklog?plaintext=true&taskid=" + taskId); 
+  }
+  
+  private void displayTaskLogs(TaskAttemptID taskId,
+ String baseUrl)
+      throws MalformedURLException {
+    // The tasktracker for a 'failed/killed' job might not be around...
+    if (baseUrl != null) {
+      // Construct the url for the tasklogs
+      String taskLogUrl = getTaskLogURL(taskId, baseUrl);
+      
+      // Copy tasks's stdout of the JobClient
+      getTaskLogs(taskId, new URL(taskLogUrl+"&filter=stdout"), System.out);
+    }
+  }
+  
+  private static void getTaskLogs(TaskAttemptID taskId, URL taskLogUrl, 
+                                  OutputStream out) {
+    try {
+      URLConnection connection = taskLogUrl.openConnection();
+      BufferedReader input = 
+        new BufferedReader(new InputStreamReader(connection.getInputStream()));
+      BufferedWriter output = 
+        new BufferedWriter(new OutputStreamWriter(out));
+      try {
+        String logData = null;
+        while ((logData = input.readLine()) != null) {
+          if (logData.length() > 0) {
+            output.write(taskId + ": " + logData + "\n");
+            output.flush();
+          }
+        }
+      } finally {
+        input.close();
+      }
+    }catch(IOException ioe){
+      LOG.warn("Error reading task output" + ioe.getMessage()); 
+    }
+  }    
+
+  
   /**
    * Grab the bspmaster system directory path where job-specific files are to be
    * placed.
@@ -660,7 +726,9 @@ public class BSPJobClient extends Config
 
     if (running.isSuccessful()) {
       LOG.info("Job complete: " + jobId);
-      LOG.info("The total number of supersteps: " + running.getSuperstepCount());
+      LOG
+          .info("The total number of supersteps: "
+              + running.getSuperstepCount());
       running.getStatus().getCounter().log(LOG);
     } else {
       LOG.info("Job failed.");
@@ -813,9 +881,8 @@ public class BSPJobClient extends Config
         System.out.println("Job name: " + job.getJobName());
         System.out.printf("States are:\n\tRunning : 1\tSucceded : 2"
             + "\tFailed : 3\tPrep : 4\n");
-        System.out.printf("%s\t%d\t%d\t%s\n", jobStatus.getJobID(),
-            jobStatus.getRunState(), jobStatus.getStartTime(),
-            jobStatus.getUsername());
+        System.out.printf("%s\t%d\t%d\t%s\n", jobStatus.getJobID(), jobStatus
+            .getRunState(), jobStatus.getStartTime(), jobStatus.getUsername());
 
         exitCode = 0;
       }

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMaster.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMaster.java?rev=1343412&r1=1343411&r2=1343412&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMaster.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMaster.java Mon May 28 22:40:33 2012
@@ -66,7 +66,7 @@ import org.apache.zookeeper.data.Stat;
  * jobs.
  */
 public class BSPMaster implements JobSubmissionProtocol, MasterProtocol,
-GroomServerManager, Watcher {
+    GroomServerManager, Watcher {
 
   public static final Log LOG = LogFactory.getLog(BSPMaster.class);
   public static final String localModeMessage = "Local mode detected, no launch of the daemon needed.";
@@ -86,7 +86,8 @@ GroomServerManager, Watcher {
   static long JOBINIT_SLEEP_INTERVAL = 2000;
 
   // States
-  final AtomicReference<State> state = new AtomicReference<State>(State.INITIALIZING);
+  final AtomicReference<State> state = new AtomicReference<State>(
+      State.INITIALIZING);
 
   // Attributes
   String masterIdentifier;
@@ -240,12 +241,12 @@ GroomServerManager, Watcher {
    * Start the BSPMaster process, listen on the indicated hostname/port
    */
   public BSPMaster(HamaConfiguration conf) throws IOException,
-  InterruptedException {
+      InterruptedException {
     this(conf, generateNewIdentifier());
   }
 
   BSPMaster(HamaConfiguration conf, String identifier) throws IOException,
-  InterruptedException {
+      InterruptedException {
     this.conf = conf;
     this.masterIdentifier = identifier;
 
@@ -459,8 +460,8 @@ GroomServerManager, Watcher {
    */
   private void initZK(HamaConfiguration conf) {
     try {
-      zk = new ZooKeeper(QuorumPeer.getZKQuorumServersString(conf),
-          conf.getInt(Constants.ZOOKEEPER_SESSION_TIMEOUT, 1200000), this);
+      zk = new ZooKeeper(QuorumPeer.getZKQuorumServersString(conf), conf
+          .getInt(Constants.ZOOKEEPER_SESSION_TIMEOUT, 1200000), this);
     } catch (IOException e) {
       LOG.error("Exception during reinitialization!", e);
     }
@@ -491,41 +492,42 @@ GroomServerManager, Watcher {
   }
 
   /**
-   * Clears all sub-children of node bspRoot 
+   * Clears all sub-children of node bspRoot
    */
-  public void clearZKNodes(){
+  public void clearZKNodes() {
     try {
       Stat s = zk.exists(bspRoot, false);
-      if(s != null){
+      if (s != null) {
         clearZKNodes(bspRoot);
-      }      
+      }
 
     } catch (Exception e) {
       LOG.warn("Could not clear zookeeper nodes.", e);
-    }      
+    }
   }
 
   /**
    * Clears all sub-children of node rooted at path.
+   * 
    * @param path
-   * @throws InterruptedException 
-   * @throws KeeperException 
+   * @throws InterruptedException
+   * @throws KeeperException
    */
-  private void clearZKNodes(String path) throws KeeperException, InterruptedException{
+  private void clearZKNodes(String path) throws KeeperException,
+      InterruptedException {
     ArrayList<String> list = (ArrayList<String>) zk.getChildren(path, false);
 
-    if(list.size() == 0){
+    if (list.size() == 0) {
       return;
 
-    }else{
-      for(String node:list){
-        clearZKNodes(path+"/"+node);
-        zk.delete(path+"/"+node, -1); //delete any version of this node.
+    } else {
+      for (String node : list) {
+        clearZKNodes(path + "/" + node);
+        zk.delete(path + "/" + node, -1); // delete any version of this node.
       }
     }
   }
 
-
   public void createJobRoot(String string) {
     try {
       zk.create("/" + string, new byte[0], Ids.OPEN_ACL_UNSAFE,
@@ -657,8 +659,8 @@ GroomServerManager, Watcher {
       for (Map.Entry<GroomServerStatus, GroomProtocol> entry : groomServers
           .entrySet()) {
         GroomServerStatus s = entry.getKey();
-        groomsMap.put(s.getGroomHostName() + ":" + Constants.DEFAULT_PEER_PORT,
-            s);
+        groomsMap.put(s.getGroomHostName() + ":"
+            + Constants.DEFAULT_GROOM_INFO_SERVER, s);
       }
     }
 
@@ -666,7 +668,8 @@ GroomServerManager, Watcher {
     this.totalTaskCapacity = tasksPerGroom * numGroomServers;
 
     if (detailed) {
-      return new ClusterStatus(groomsMap, totalTasks, totalTaskCapacity, state.get());
+      return new ClusterStatus(groomsMap, totalTasks, totalTaskCapacity, state
+          .get());
     } else {
       return new ClusterStatus(numGroomServers, totalTasks, totalTaskCapacity,
           state.get());
@@ -868,4 +871,22 @@ GroomServerManager, Watcher {
     // TODO Auto-generated method stub
 
   }
+
+  TaskCompletionEvent[] EMPTY_EVENTS = new TaskCompletionEvent[0];
+
+  @Override
+  public TaskCompletionEvent[] getTaskCompletionEvents(BSPJobID jobid,
+      int fromEventId, int maxEvents) {
+    synchronized (this) {
+      JobInProgress job = this.jobs.get(jobid);
+      if (null != job) {
+        if (job.areTasksInited()) {
+          return job.getTaskCompletionEvents(fromEventId, maxEvents);
+        } else {
+          return EMPTY_EVENTS;
+        }
+      }
+    }
+    return null;
+  }
 }

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java?rev=1343412&r1=1343411&r2=1343412&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/GroomServer.java Mon May 28 22:40:33 2012
@@ -59,6 +59,7 @@ import org.apache.hadoop.util.StringUtil
 import org.apache.hama.Constants;
 import org.apache.hama.HamaConfiguration;
 import org.apache.hama.bsp.sync.SyncException;
+import org.apache.hama.http.HttpServer;
 import org.apache.hama.ipc.BSPPeerProtocol;
 import org.apache.hama.ipc.GroomProtocol;
 import org.apache.hama.ipc.HamaRPCProtocolVersion;
@@ -93,6 +94,7 @@ public class GroomServer implements Runn
     NORMAL, COMPUTE, SYNC, BARRIER, STALE, INTERRUPTED, DENIED
   };
 
+  private HttpServer server;
   private static ZooKeeper zk = null;
 
   // Running States and its related things
@@ -236,8 +238,8 @@ public class GroomServer implements Runn
 
     private BSPTasksMonitor() {
 
-      outOfContactTasks = new ArrayList<GroomServer.TaskInProgress>(
-          conf.getInt(Constants.MAX_TASKS_PER_GROOM, 3));
+      outOfContactTasks = new ArrayList<GroomServer.TaskInProgress>(conf
+          .getInt(Constants.MAX_TASKS_PER_GROOM, 3));
     }
 
     @Override
@@ -257,9 +259,9 @@ public class GroomServer implements Runn
           LOG.debug("Purging task " + tip);
           purgeTask(tip, true);
         } catch (Exception e) {
-          LOG.error(
-              new StringBuilder("Error while removing a timed-out task - ")
-                  .append(tip.toString()), e);
+          LOG.error(new StringBuilder(
+              "Error while removing a timed-out task - ")
+              .append(tip.toString()), e);
 
         }
       }
@@ -283,8 +285,8 @@ public class GroomServer implements Runn
     // this.localDirAllocator = new LocalDirAllocator("bsp.local.dir");
 
     try {
-      zk = new ZooKeeper(QuorumPeer.getZKQuorumServersString(conf),
-          conf.getInt(Constants.ZOOKEEPER_SESSION_TIMEOUT, 1200000), this);
+      zk = new ZooKeeper(QuorumPeer.getZKQuorumServersString(conf), conf
+          .getInt(Constants.ZOOKEEPER_SESSION_TIMEOUT, 1200000), this);
     } catch (IOException e) {
       LOG.error("Exception during reinitialization!", e);
     }
@@ -296,9 +298,8 @@ public class GroomServer implements Runn
     }
 
     if (localHostname == null) {
-      this.localHostname = DNS.getDefaultHost(
-          conf.get("bsp.dns.interface", "default"),
-          conf.get("bsp.dns.nameserver", "default"));
+      this.localHostname = DNS.getDefaultHost(conf.get("bsp.dns.interface",
+          "default"), conf.get("bsp.dns.nameserver", "default"));
     }
     // check local disk
     checkLocalDirs(getLocalDirs());
@@ -330,6 +331,20 @@ public class GroomServer implements Runn
       LOG.info("Worker rpc server --> " + rpcServer);
     }
 
+    server = new HttpServer("groomserver", rpcAddr, conf.getInt(
+        "bsp.http.groomserver.port", Constants.DEFAULT_GROOM_INFO_SERVER),
+        true, conf);
+
+    FileSystem local = FileSystem.getLocal(conf);
+    server.setAttribute("groom.server", this);
+    server.setAttribute("local.file.system", local);
+    server.setAttribute("conf", conf);
+    server.setAttribute("log", LOG);
+    server.addServlet("taskLog", "/tasklog", TaskLogServlet.class);
+
+    LOG.info("starting webserver: " + rpcAddr);
+    server.start();
+
     @SuppressWarnings("deprecation")
     String address = NetUtils.getServerAddress(conf,
         "bsp.groom.report.bindAddress", "bsp.groom.report.port",
@@ -393,6 +408,8 @@ public class GroomServer implements Runn
 
     this.running = true;
     this.initialized = true;
+
+    // FIXME
   }
 
   /** Return the port at which the tasktracker bound to */
@@ -687,18 +704,20 @@ public class GroomServer implements Runn
         .entrySet()) {
       TaskInProgress tip = entry.getValue();
       if (LOG.isDebugEnabled())
-        LOG.debug("checking task: "
-            + tip.getTask().getTaskID()
-            + " starttime ="
-            + tip.startTime
-            + " lastping = "
-            + tip.lastPingedTimestamp
-            + " run state = "
-            + tip.taskStatus.getRunState().toString()
-            + " monitorPeriod = "
-            + monitorPeriod
-            + " check = "
-            + (tip.taskStatus.getRunState().equals(TaskStatus.State.RUNNING) && (((tip.lastPingedTimestamp == 0 && ((currentTime - tip.startTime) > 10 * monitorPeriod)) || ((tip.lastPingedTimestamp > 0) && (currentTime - tip.lastPingedTimestamp) > monitorPeriod)))));
+        LOG
+            .debug("checking task: "
+                + tip.getTask().getTaskID()
+                + " starttime ="
+                + tip.startTime
+                + " lastping = "
+                + tip.lastPingedTimestamp
+                + " run state = "
+                + tip.taskStatus.getRunState().toString()
+                + " monitorPeriod = "
+                + monitorPeriod
+                + " check = "
+                + (tip.taskStatus.getRunState()
+                    .equals(TaskStatus.State.RUNNING) && (((tip.lastPingedTimestamp == 0 && ((currentTime - tip.startTime) > 10 * monitorPeriod)) || ((tip.lastPingedTimestamp > 0) && (currentTime - tip.lastPingedTimestamp) > monitorPeriod)))));
 
       // Task is out of contact if it has not pinged since more than
       // monitorPeriod. A task is given a leeway of 10 times monitorPeriod
@@ -1056,6 +1075,24 @@ public class GroomServer implements Runn
     }
   }
 
+  public String getGroomServerName() {
+    return this.groomServerName;
+  }
+
+  /**
+   * Get the list of tasks that will be reported back to the job tracker in the
+   * next heartbeat cycle.
+   * 
+   * @return a copy of the list of TaskStatus objects
+   */
+  public synchronized List<TaskStatus> getRunningTaskStatuses() {
+    List<TaskStatus> result = new ArrayList<TaskStatus>(runningTasks.size());
+    for (TaskInProgress tip : runningTasks.values()) {
+      result.add(tip.getStatus());
+    }
+    return result;
+  }
+
   /**
    * The main() for BSPPeer child processes.
    */

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/JobInProgress.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/JobInProgress.java?rev=1343412&r1=1343411&r2=1343412&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/JobInProgress.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/JobInProgress.java Mon May 28 22:40:33 2012
@@ -19,8 +19,10 @@ package org.apache.hama.bsp;
 
 import java.io.DataInputStream;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 
 import org.apache.commons.logging.Log;
@@ -29,6 +31,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hama.Constants;
 
 /**
  * JobInProgress maintains all the info for keeping a Job on the straight and
@@ -54,7 +57,6 @@ class JobInProgress {
 
   static final Log LOG = LogFactory.getLog(JobInProgress.class);
   boolean tasksInited = false;
-  boolean jobInited = false;
 
   Configuration conf;
   JobProfile profile;
@@ -89,6 +91,8 @@ class JobInProgress {
   // Used only for scheduling!
   Map<GroomServerStatus, Integer> tasksInGroomMap;
 
+  private int taskCompletionEventTracker = 0;
+
   public JobInProgress(BSPJobID jobId, Path jobFile, BSPMaster master,
       Configuration conf) throws IOException {
     this.conf = conf;
@@ -101,8 +105,8 @@ class JobInProgress {
 
     this.tasksInGroomMap = new HashMap<GroomServerStatus, Integer>();
 
-    this.status = new JobStatus(jobId, null, 0L, 0L,
-        JobStatus.State.PREP.value(), counters);
+    this.status = new JobStatus(jobId, null, 0L, 0L, JobStatus.State.PREP
+        .value(), counters);
     this.startTime = System.currentTimeMillis();
     this.superstepCounter = 0;
     this.restartCount = 0;
@@ -119,9 +123,11 @@ class JobInProgress {
     this.jobSplit = job.getConf().get("bsp.job.split.file");
 
     this.numBSPTasks = job.getNumBspTask();
+    this.taskCompletionEvents = new ArrayList<TaskCompletionEvent>(
+        numBSPTasks + 10);
 
-    this.profile = new JobProfile(job.getUser(), jobId, jobFile.toString(),
-        job.getJobName());
+    this.profile = new JobProfile(job.getUser(), jobId, jobFile.toString(), job
+        .getJobName());
 
     this.setJobName(job.getJobName());
 
@@ -303,9 +309,9 @@ class JobInProgress {
     }
 
     if (allDone) {
-      this.status = new JobStatus(this.status.getJobID(),
-          this.profile.getUser(), superstepCounter, superstepCounter,
-          superstepCounter, JobStatus.SUCCEEDED, superstepCounter, counters);
+      this.status = new JobStatus(this.status.getJobID(), this.profile
+          .getUser(), superstepCounter, superstepCounter, superstepCounter,
+          JobStatus.SUCCEEDED, superstepCounter, counters);
       this.finishTime = System.currentTimeMillis();
       this.status.setFinishTime(this.finishTime);
 
@@ -339,9 +345,8 @@ class JobInProgress {
       // Kill job
       this.kill();
       // Send KillTaskAction to GroomServer
-      this.status = new JobStatus(this.status.getJobID(),
-          this.profile.getUser(), 0L, 0L, 0L, JobStatus.KILLED,
-          superstepCounter, counters);
+      this.status = new JobStatus(this.status.getJobID(), this.profile
+          .getUser(), 0L, 0L, 0L, JobStatus.KILLED, superstepCounter, counters);
       this.finishTime = System.currentTimeMillis();
       this.status.setFinishTime(this.finishTime);
 
@@ -353,8 +358,40 @@ class JobInProgress {
 
   public synchronized void updateTaskStatus(TaskInProgress tip,
       TaskStatus taskStatus) {
+    TaskAttemptID taskid = taskStatus.getTaskId();
+
     tip.updateStatus(taskStatus); // update tip
 
+    TaskStatus.State state = taskStatus.getRunState();
+    TaskCompletionEvent taskEvent = null;
+    // FIXME port number should be configurable
+    String httpTaskLogLocation = "http://"
+        + tip.getGroomServerStatus().getGroomHostName() + ":"
+        + conf.getInt("bsp.http.groomserver.port", Constants.DEFAULT_GROOM_INFO_SERVER);
+
+    if (state == TaskStatus.State.FAILED || state == TaskStatus.State.KILLED) {
+      int eventNumber;
+      if ((eventNumber = tip.getSuccessEventNumber()) != -1) {
+        TaskCompletionEvent t = this.taskCompletionEvents.get(eventNumber);
+        if (t.getTaskAttemptId().equals(taskid))
+          t.setTaskStatus(TaskCompletionEvent.Status.OBSOLETE);
+      }
+
+      // Did the task failure lead to tip failure?
+      TaskCompletionEvent.Status taskCompletionStatus = (state == TaskStatus.State.FAILED) ? TaskCompletionEvent.Status.FAILED
+          : TaskCompletionEvent.Status.KILLED;
+      if (tip.isFailed()) {
+        taskCompletionStatus = TaskCompletionEvent.Status.TIPFAILED;
+      }
+      taskEvent = new TaskCompletionEvent(taskCompletionEventTracker, taskid,
+          tip.idWithinJob(), taskCompletionStatus, httpTaskLogLocation);
+
+      if (taskEvent != null) {
+        this.taskCompletionEvents.add(taskEvent);
+        taskCompletionEventTracker++;
+      }
+    }
+
     if (superstepCounter < taskStatus.getSuperstepCount()) {
       superstepCounter = taskStatus.getSuperstepCount();
       // TODO Later, we have to update JobInProgress status here
@@ -431,4 +468,22 @@ class JobInProgress {
     return counters;
   }
 
+  List<TaskCompletionEvent> taskCompletionEvents;
+
+  synchronized int getNumTaskCompletionEvents() {
+    return taskCompletionEvents.size();
+  }
+
+  public TaskCompletionEvent[] getTaskCompletionEvents(int fromEventId,
+      int maxEvents) {
+    TaskCompletionEvent[] events = TaskCompletionEvent.EMPTY_ARRAY;
+    if (taskCompletionEvents.size() > fromEventId) {
+      int actualMax = Math.min(maxEvents,
+          (taskCompletionEvents.size() - fromEventId));
+      events = taskCompletionEvents.subList(fromEventId,
+          actualMax + fromEventId).toArray(events);
+    }
+    return events;
+  }
+
 }

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java?rev=1343412&r1=1343411&r2=1343412&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java Mon May 28 22:40:33 2012
@@ -529,4 +529,10 @@ public class LocalBSPRunner implements J
 
     }
   }
+
+  @Override
+  public TaskCompletionEvent[] getTaskCompletionEvents(BSPJobID id,
+      int startFrom, int i) {
+    return TaskCompletionEvent.EMPTY_ARRAY;
+  }
 }

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/RunningJob.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/RunningJob.java?rev=1343412&r1=1343411&r2=1343412&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/RunningJob.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/RunningJob.java Mon May 28 22:40:33 2012
@@ -118,4 +118,6 @@ public interface RunningJob {
    * @return the latest status of the job.
    */
   public JobStatus getStatus();
+
+  public TaskCompletionEvent[] getTaskCompletionEvents(int eventCounter);
 }

Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskCompletionEvent.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskCompletionEvent.java?rev=1343412&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskCompletionEvent.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskCompletionEvent.java Mon May 28 22:40:33 2012
@@ -0,0 +1,233 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+
+public class TaskCompletionEvent implements Writable {
+  static public enum Status {
+    FAILED, KILLED, SUCCEEDED, OBSOLETE, TIPFAILED
+  };
+
+  private int eventId;
+  private String groomServerInfo;
+  private int taskRunTime; // using int since runtime is the time difference
+  private TaskAttemptID taskId;
+  Status status;
+  private int idWithinJob;
+  public static final TaskCompletionEvent[] EMPTY_ARRAY = new TaskCompletionEvent[0];
+
+  /**
+   * Default constructor for Writable.
+   * 
+   */
+  public TaskCompletionEvent() {
+    taskId = new TaskAttemptID();
+  }
+
+  /**
+   * Constructor. eventId should be created externally and incremented per event
+   * for each job.
+   * 
+   * @param eventId event id, event id should be unique and assigned in
+   *          incrementally, starting from 0.
+   * @param taskId task id
+   * @param status task's status
+   * @param taskTrackerHttp task tracker's host:port for http.
+   */
+  public TaskCompletionEvent(int eventId, TaskAttemptID taskId,
+      int idWithinJob, Status status, String groomServerInfo) {
+
+    this.taskId = taskId;
+    this.idWithinJob = idWithinJob;
+    this.eventId = eventId;
+    this.status = status;
+    this.groomServerInfo = groomServerInfo;
+  }
+
+  /**
+   * Returns event Id.
+   * 
+   * @return event id
+   */
+  public int getEventId() {
+    return eventId;
+  }
+
+  /**
+   * Returns task id.
+   * 
+   * @return task id
+   * @deprecated use {@link #getTaskAttemptId()} instead.
+   */
+  @Deprecated
+  public String getTaskId() {
+    return taskId.toString();
+  }
+
+  /**
+   * Returns task id.
+   * 
+   * @return task id
+   */
+  public TaskAttemptID getTaskAttemptId() {
+    return taskId;
+  }
+
+  /**
+   * Returns enum Status.SUCESS or Status.FAILURE.
+   * 
+   * @return task tracker status
+   */
+  public Status getTaskStatus() {
+    return status;
+  }
+
+  /**
+   * http location of the groomserver where this task ran.
+   * 
+   * @return http location of groomserver tasklogs
+   */
+  public String getGroomServerInfo() {
+    return groomServerInfo;
+  }
+
+  /**
+   * Returns time (in millisec) the task took to complete.
+   */
+  public int getTaskRunTime() {
+    return taskRunTime;
+  }
+
+  /**
+   * Set the task completion time
+   * 
+   * @param taskCompletionTime time (in millisec) the task took to complete
+   */
+  public void setTaskRunTime(int taskCompletionTime) {
+    this.taskRunTime = taskCompletionTime;
+  }
+
+  /**
+   * set event Id. should be assigned incrementally starting from 0.
+   * 
+   * @param eventId
+   */
+  public void setEventId(int eventId) {
+    this.eventId = eventId;
+  }
+
+  /**
+   * Sets task id.
+   * 
+   * @param taskId
+   * @deprecated use {@link #setTaskID(TaskAttemptID)} instead.
+   */
+  @Deprecated
+  public void setTaskId(String taskId) {
+    this.taskId = TaskAttemptID.forName(taskId);
+  }
+
+  /**
+   * Sets task id.
+   * 
+   * @param taskId
+   */
+  public void setTaskID(TaskAttemptID taskId) {
+    this.taskId = taskId;
+  }
+
+  /**
+   * Set task status.
+   * 
+   * @param status
+   */
+  public void setTaskStatus(Status status) {
+    this.status = status;
+  }
+
+  /**
+   * Set task tracker http location.
+   * 
+   * @param taskTrackerHttp
+   */
+  public void setTaskTrackerHttp(String taskTrackerHttp) {
+    this.groomServerInfo = taskTrackerHttp;
+  }
+
+  @Override
+  public String toString() {
+    StringBuffer buf = new StringBuffer();
+    buf.append("Task Id : ");
+    buf.append(taskId);
+    buf.append(", Status : ");
+    buf.append(status.name());
+    return buf.toString();
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (o == null)
+      return false;
+    if (o.getClass().equals(TaskCompletionEvent.class)) {
+      TaskCompletionEvent event = (TaskCompletionEvent) o;
+      return this.eventId == event.getEventId()
+          && this.idWithinJob == event.idWithinJob()
+          && this.status.equals(event.getTaskStatus())
+          && this.taskId.equals(event.getTaskAttemptId())
+          && this.taskRunTime == event.getTaskRunTime()
+          && this.groomServerInfo.equals(event.getGroomServerInfo());
+    }
+    return false;
+  }
+
+  @Override
+  public int hashCode() {
+    return toString().hashCode();
+  }
+
+  public int idWithinJob() {
+    return idWithinJob;
+  }
+
+  // ////////////////////////////////////////////
+  // Writable
+  // ////////////////////////////////////////////
+  public void write(DataOutput out) throws IOException {
+    taskId.write(out);
+    WritableUtils.writeVInt(out, idWithinJob);
+    WritableUtils.writeEnum(out, status);
+    WritableUtils.writeString(out, groomServerInfo);
+    WritableUtils.writeVInt(out, taskRunTime);
+    WritableUtils.writeVInt(out, eventId);
+  }
+
+  public void readFields(DataInput in) throws IOException {
+    taskId.readFields(in);
+    idWithinJob = WritableUtils.readVInt(in);
+    status = WritableUtils.readEnum(in, Status.class);
+    groomServerInfo = WritableUtils.readString(in);
+    taskRunTime = WritableUtils.readVInt(in);
+    eventId = WritableUtils.readVInt(in);
+  }
+}

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskInProgress.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskInProgress.java?rev=1343412&r1=1343411&r2=1343412&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskInProgress.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskInProgress.java Mon May 28 22:40:33 2012
@@ -51,13 +51,14 @@ class TaskInProgress {
   private TaskID id;
   private JobInProgress job;
   private int completes = 0;
-  
+
   private GroomServerStatus myGroomStatus = null;
 
   // Status
   // private double progress = 0;
   // private String state = "";
   private long startTime = 0;
+  private int successEventNumber = -1;
 
   // The 'next' usable taskid of this tip
   int nextTaskId = 0;
@@ -81,7 +82,7 @@ class TaskInProgress {
   private BSPJobID jobId;
 
   private RawSplit rawSplit;
-  
+
   /**
    * Constructor for new nexus between BSPMaster and GroomServer.
    * 
@@ -97,8 +98,8 @@ class TaskInProgress {
     init(jobId);
   }
 
-  public TaskInProgress(BSPJobID jobId, String jobFile, RawSplit rawSplit, BSPMaster master,
-      Configuration conf, JobInProgress job, int partition) {
+  public TaskInProgress(BSPJobID jobId, String jobFile, RawSplit rawSplit,
+      BSPMaster master, Configuration conf, JobInProgress job, int partition) {
     this.jobId = jobId;
     this.jobFile = jobFile;
     this.rawSplit = rawSplit;
@@ -114,11 +115,11 @@ class TaskInProgress {
     this.id = new TaskID(jobId, partition);
     this.startTime = System.currentTimeMillis();
   }
-  
+
   /**
    * Return a Task that can be sent to a GroomServer for execution.
    */
-  public Task getTaskToRun(Map<String, GroomServerStatus> grooms, 
+  public Task getTaskToRun(Map<String, GroomServerStatus> grooms,
       Map<GroomServerStatus, Integer> tasksInGroomMap) throws IOException {
     Task t = null;
 
@@ -133,44 +134,44 @@ class TaskInProgress {
           + " attempts for the tip '" + getTIPId() + "'");
       return null;
     }
-    
+
     String splitClass = null;
     BytesWritable split = null;
     GroomServerStatus selectedGroom = null;
-    if(rawSplit != null){
+    if (rawSplit != null) {
       splitClass = rawSplit.getClassName();
       split = rawSplit.getBytes();
       String[] possibleLocations = rawSplit.getLocations();
-      for (int i = 0; i < possibleLocations.length; ++i){
+      for (int i = 0; i < possibleLocations.length; ++i) {
         String location = possibleLocations[i];
         GroomServerStatus groom = grooms.get(location);
         Integer taskInGroom = tasksInGroomMap.get(groom);
-        taskInGroom = (taskInGroom == null)?0:taskInGroom;
-        if(taskInGroom < groom.getMaxTasks() && 
-            location.equals(groom.getGroomHostName())){
-            selectedGroom = groom;
-            t = new BSPTask(jobId, jobFile, taskid, partition, splitClass, split);
-            activeTasks.put(taskid, groom.getGroomName());
-            
-            break;
+        taskInGroom = (taskInGroom == null) ? 0 : taskInGroom;
+        if (taskInGroom < groom.getMaxTasks()
+            && location.equals(groom.getGroomHostName())) {
+          selectedGroom = groom;
+          t = new BSPTask(jobId, jobFile, taskid, partition, splitClass, split);
+          activeTasks.put(taskid, groom.getGroomName());
+
+          break;
         }
       }
     }
-    //Failed in attempt to get data locality or there was no input split.
-    if(selectedGroom == null){
+    // Failed in attempt to get data locality or there was no input split.
+    if (selectedGroom == null) {
       Iterator<String> groomIter = grooms.keySet().iterator();
-      while(groomIter.hasNext()) {
+      while (groomIter.hasNext()) {
         GroomServerStatus groom = grooms.get(groomIter.next());
         Integer taskInGroom = tasksInGroomMap.get(groom);
-        taskInGroom = (taskInGroom == null)?0:taskInGroom;
-        if(taskInGroom < groom.getMaxTasks()){
+        taskInGroom = (taskInGroom == null) ? 0 : taskInGroom;
+        if (taskInGroom < groom.getMaxTasks()) {
           selectedGroom = groom;
           t = new BSPTask(jobId, jobFile, taskid, partition, splitClass, split);
           activeTasks.put(taskid, groom.getGroomName());
         }
       }
     }
-    
+
     myGroomStatus = selectedGroom;
 
     return t;
@@ -204,8 +205,8 @@ class TaskInProgress {
   public TreeMap<TaskAttemptID, String> getTasks() {
     return activeTasks;
   }
-  
-  public GroomServerStatus getGroomServerStatus(){
+
+  public GroomServerStatus getGroomServerStatus() {
     return myGroomStatus;
   }
 
@@ -278,7 +279,7 @@ class TaskInProgress {
 
     this.completes++;
   }
-  
+
   public void terminated(TaskAttemptID taskid) {
     LOG.info("Task '" + taskid.getTaskID().toString() + "' has failed.");
 
@@ -339,4 +340,29 @@ class TaskInProgress {
     return bspMaster;
   }
 
+  /**
+   * Set the event number that was raised for this tip
+   */
+  public void setSuccessEventNumber(int eventNumber) {
+    successEventNumber = eventNumber;
+  }
+
+  /**
+   * Get the event number that was raised for this tip
+   */
+  public int getSuccessEventNumber() {
+    return successEventNumber;
+  }
+
+  /**
+   * @return int the tip index
+   */
+  public int idWithinJob() {
+    return partition;
+  }
+
+  public String machineWhereTaskRan(TaskAttemptID taskid) {
+    return taskStatuses.get(taskid).getGroomServer();
+  }
+
 }

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskLog.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskLog.java?rev=1343412&r1=1343411&r2=1343412&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskLog.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskLog.java Mon May 28 22:40:33 2012
@@ -37,7 +37,7 @@ public class TaskLog {
   private static final Log LOG = LogFactory.getLog(TaskLog.class.getName());
 
   private static final File LOG_DIR = new File(
-      System.getProperty("hama.log.dir"), "userlogs").getAbsoluteFile();
+      System.getProperty("hama.log.dir"), "tasklogs").getAbsoluteFile();
 
   static {
     if (!LOG_DIR.exists()) {
@@ -46,7 +46,7 @@ public class TaskLog {
   }
 
   public static File getTaskLogFile(TaskAttemptID taskid, LogName filter) {
-    return new File(new File(LOG_DIR, taskid.toString()), filter.toString());
+    return new File(LOG_DIR, taskid.getJobID() + "/" + taskid.toString() + ".log");
   }
 
   /**

Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskLogServlet.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskLogServlet.java?rev=1343412&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskLogServlet.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskLogServlet.java Mon May 28 22:40:33 2012
@@ -0,0 +1,235 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.hadoop.util.StringUtils;
+
+public class TaskLogServlet extends HttpServlet {
+  private static final long serialVersionUID = -6615764817774487321L;
+
+  private boolean haveTaskLog(TaskAttemptID taskId, TaskLog.LogName type) {
+    File f = TaskLog.getTaskLogFile(taskId, type);
+    return f.canRead();
+  }
+
+  /**
+   * Construct the taskLogUrl
+   * 
+   * @param taskTrackerHostName
+   * @param httpPort
+   * @param taskAttemptID
+   * @return the taskLogUrl
+   */
+  public static String getTaskLogUrl(String taskTrackerHostName,
+      String httpPort, String taskAttemptID) {
+    return ("http://" + taskTrackerHostName + ":" + httpPort
+        + "/tasklog?taskid=" + taskAttemptID);
+  }
+
+  /**
+   * Find the next quotable character in the given array.
+   * 
+   * @param data the bytes to look in
+   * @param offset the first index to look in
+   * @param end the index after the last one to look in
+   * @return the index of the quotable character or end if none was found
+   */
+  private static int findFirstQuotable(byte[] data, int offset, int end) {
+    while (offset < end) {
+      switch (data[offset]) {
+        case '<':
+        case '>':
+        case '&':
+          return offset;
+        default:
+          offset += 1;
+      }
+    }
+    return offset;
+  }
+
+  private static void quotedWrite(OutputStream out, byte[] data, int offset,
+      int length) throws IOException {
+    int end = offset + length;
+    while (offset < end) {
+      int next = findFirstQuotable(data, offset, end);
+      out.write(data, offset, next - offset);
+      offset = next;
+      if (offset < end) {
+        switch (data[offset]) {
+          case '<':
+            out.write("&lt;".getBytes());
+            break;
+          case '>':
+            out.write("&gt;".getBytes());
+            break;
+          case '&':
+            out.write("&amp;".getBytes());
+            break;
+          default:
+            out.write(data[offset]);
+            break;
+        }
+        offset += 1;
+      }
+    }
+  }
+
+  private void printTaskLog(HttpServletResponse response, OutputStream out,
+      TaskAttemptID taskId, long start, long end, boolean plainText,
+      TaskLog.LogName filter, boolean isCleanup) throws IOException {
+    if (!plainText) {
+      out.write(("<br><b><u>" + filter + " logs</u></b><br>\n" + "<pre>\n")
+          .getBytes());
+    }
+
+    try {
+      InputStream taskLogReader = new TaskLog.Reader(taskId, filter, start, end);
+      byte[] b = new byte[65536];
+      int result;
+      while (true) {
+        result = taskLogReader.read(b);
+        if (result > 0) {
+          if (plainText) {
+            out.write(b, 0, result);
+          } else {
+            quotedWrite(out, b, 0, result);
+          }
+        } else {
+          break;
+        }
+      }
+      taskLogReader.close();
+      if (!plainText) {
+        out.write("</pre></td></tr></table><hr><br>\n".getBytes());
+      }
+    } catch (IOException ioe) {
+      if (filter == TaskLog.LogName.DEBUGOUT) {
+        if (!plainText) {
+          out.write("</pre><hr><br>\n".getBytes());
+        }
+        // do nothing
+      } else {
+        response.sendError(HttpServletResponse.SC_GONE, "Failed to retrieve "
+            + filter + " log for task: " + taskId);
+        out.write(("TaskLogServlet exception:\n"
+            + StringUtils.stringifyException(ioe) + "\n").getBytes());
+      }
+    }
+  }
+
+  /**
+   * Get the logs via http.
+   */
+  @Override
+  public void doGet(HttpServletRequest request, HttpServletResponse response)
+      throws ServletException, IOException {
+    long start = 0;
+    long end = -1;
+    boolean plainText = false;
+    TaskLog.LogName filter = null;
+    boolean isCleanup = false;
+
+    String taskIdStr = request.getParameter("taskid");
+    if (taskIdStr == null) {
+      response.sendError(HttpServletResponse.SC_BAD_REQUEST,
+          "Argument taskid is required");
+      return;
+    }
+    TaskAttemptID taskId = TaskAttemptID.forName(taskIdStr);
+    String logFilter = request.getParameter("filter");
+    if (logFilter != null) {
+      try {
+        filter = TaskLog.LogName.valueOf(TaskLog.LogName.class, logFilter
+            .toUpperCase());
+      } catch (IllegalArgumentException iae) {
+        response.sendError(HttpServletResponse.SC_BAD_REQUEST,
+            "Illegal value for filter: " + logFilter);
+        return;
+      }
+    }
+
+    String sLogOff = request.getParameter("start");
+    if (sLogOff != null) {
+      start = Long.valueOf(sLogOff).longValue();
+    }
+
+    String sLogEnd = request.getParameter("end");
+    if (sLogEnd != null) {
+      end = Long.valueOf(sLogEnd).longValue();
+    }
+
+    String sPlainText = request.getParameter("plaintext");
+    if (sPlainText != null) {
+      plainText = Boolean.valueOf(sPlainText);
+    }
+
+    String sCleanup = request.getParameter("cleanup");
+    if (sCleanup != null) {
+      isCleanup = Boolean.valueOf(sCleanup);
+    }
+
+    OutputStream out = response.getOutputStream();
+    if (!plainText) {
+      out.write(("<html>\n" + "<title>Task Logs: '" + taskId + "'</title>\n"
+          + "<body>\n" + "<h1>Task Logs: '" + taskId + "'</h1><br>\n")
+          .getBytes());
+
+      if (filter == null) {
+        printTaskLog(response, out, taskId, start, end, plainText,
+            TaskLog.LogName.STDOUT, isCleanup);
+        printTaskLog(response, out, taskId, start, end, plainText,
+            TaskLog.LogName.STDERR, isCleanup);
+        printTaskLog(response, out, taskId, start, end, plainText,
+            TaskLog.LogName.SYSLOG, isCleanup);
+        if (haveTaskLog(taskId, TaskLog.LogName.DEBUGOUT)) {
+          printTaskLog(response, out, taskId, start, end, plainText,
+              TaskLog.LogName.DEBUGOUT, isCleanup);
+        }
+        if (haveTaskLog(taskId, TaskLog.LogName.PROFILE)) {
+          printTaskLog(response, out, taskId, start, end, plainText,
+              TaskLog.LogName.PROFILE, isCleanup);
+        }
+      } else {
+        printTaskLog(response, out, taskId, start, end, plainText, filter,
+            isCleanup);
+      }
+
+      out.write("</body></html>\n".getBytes());
+      out.close();
+    } else if (filter == null) {
+      response
+          .sendError(
+              HttpServletResponse.SC_BAD_REQUEST,
+              "You must supply a value for `filter' (STDOUT, STDERR, or SYSLOG) if you set plainText = true");
+    } else {
+      printTaskLog(response, out, taskId, start, end, plainText, filter,
+          isCleanup);
+    }
+  }
+}

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/ipc/JobSubmissionProtocol.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/ipc/JobSubmissionProtocol.java?rev=1343412&r1=1343411&r2=1343412&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/ipc/JobSubmissionProtocol.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/ipc/JobSubmissionProtocol.java Mon May 28 22:40:33 2012
@@ -19,11 +19,12 @@ package org.apache.hama.ipc;
 
 import java.io.IOException;
 
-import org.apache.hama.bsp.ClusterStatus;
 import org.apache.hama.bsp.BSPJobID;
+import org.apache.hama.bsp.ClusterStatus;
 import org.apache.hama.bsp.JobProfile;
 import org.apache.hama.bsp.JobStatus;
 import org.apache.hama.bsp.TaskAttemptID;
+import org.apache.hama.bsp.TaskCompletionEvent;
 
 /**
  * Protocol that a groom server and the central BSP Master use to communicate.
@@ -120,4 +121,7 @@ public interface JobSubmissionProtocol e
   public boolean killTask(TaskAttemptID taskId, boolean shouldFail)
       throws IOException;
 
+  public TaskCompletionEvent[] getTaskCompletionEvents(BSPJobID id,
+      int startFrom, int i);
+
 }

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/util/BSPServletUtil.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/util/BSPServletUtil.java?rev=1343412&r1=1343411&r2=1343412&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/util/BSPServletUtil.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/util/BSPServletUtil.java Mon May 28 22:40:33 2012
@@ -31,7 +31,7 @@ import org.apache.hama.bsp.JobStatus;
 public class BSPServletUtil extends ServletUtil {
 
   public static final String HTML_TAIL = "<hr />\n"
-      + "<a href='http://incubator.apache.org/hama/'>Hama</a>, "
+      + "<a href='http://hama.apache.org/'>Hama</a>, "
       + Calendar.getInstance().get(Calendar.YEAR) + ".\n" + "</body></html>";
 
   /**
@@ -64,7 +64,8 @@ public class BSPServletUtil extends Serv
           + "<th>SuperSteps</th>" + "<th>Tasks</th>" + "<th>Starttime</th>"
           + "</tr>\n");
       for (JobStatus status : jobs) {
-        sb.append("<tr><td><a href=\"bspjob.jsp?jobid=").append(status.getJobID()).append("\">");
+        sb.append("<tr><td><a href=\"bspjob.jsp?jobid=").append(
+            status.getJobID()).append("\">");
         sb.append(status.getJobID());
         sb.append("</a></td><td>");
         sb.append(status.getUsername());
@@ -102,8 +103,10 @@ public class BSPServletUtil extends Serv
     for (Entry<String, GroomServerStatus> entry : status
         .getActiveGroomServerStatus().entrySet()) {
       sb.append("<tr><td>");
-      sb.append(entry.getKey()).append("</td><td>");
-      sb.append(entry.getValue().getGroomHostName()).append("</td>").append("<td>").append(entry.getValue().getMaxTasks()).append("</td><td>");
+      sb.append("<a href='http://" + entry.getKey() + "'>");
+      sb.append(entry.getKey()).append("</a></td><td>");
+      sb.append(entry.getValue().getGroomHostName()).append("</td>").append(
+          "<td>").append(entry.getValue().getMaxTasks()).append("</td><td>");
       sb.append(entry.getValue().countTasks()).append("</td><td>");
       sb.append(entry.getValue().getFailures()).append("</td><td>");
       sb.append(entry.getValue().getLastSeen()).append("</td>");

Added: incubator/hama/trunk/core/src/main/resources/webapp/groomserver/groomserver.jsp
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/resources/webapp/groomserver/groomserver.jsp?rev=1343412&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/resources/webapp/groomserver/groomserver.jsp (added)
+++ incubator/hama/trunk/core/src/main/resources/webapp/groomserver/groomserver.jsp Mon May 28 22:40:33 2012
@@ -0,0 +1,61 @@
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+
+<%@ page
+  contentType="text/html; charset=UTF-8"
+  import="javax.servlet.*"
+  import="javax.servlet.http.*"
+  import="java.io.*"
+  import="java.util.*"
+  import="java.text.DecimalFormat"
+  import="org.apache.hama.bsp.*"
+  import="org.apache.hama.util.*"
+%>
+<%!	private static final long serialVersionUID = 1L;
+%>
+<%
+  GroomServer groom = (GroomServer) application.getAttribute("groom.server");
+  String groomName = groom.getGroomServerName();
+%>
+
+<html>
+
+<title><%= groomName %> - Server Status</title>
+<body>
+<h2><%= groomName %></h2>
+<hr>
+<h2>Running tasks</h2>
+<table border=2 cellpadding="5" cellspacing="2">
+<tr><td align="center">Task Attempts</td><td>Status</td>
+    </tr>
+  <%
+     Iterator itr = groom.getRunningTaskStatuses().iterator();
+     while (itr.hasNext()) {
+       TaskStatus status = (TaskStatus) itr.next();
+       out.print("<tr><td>" + status.getTaskId());
+       out.print("</td><td>" + status.getRunState()); 
+       out.print("</td></tr>\n");
+     }
+  %>
+</table>
+<hr>
+<h2>Local Logs</h2>
+<a href="/logs/">Log</a> directory
+
+<%
+  out.println(BSPServletUtil.htmlFooter());
+%>
\ No newline at end of file

Added: incubator/hama/trunk/core/src/main/resources/webapp/groomserver/index.html
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/resources/webapp/groomserver/index.html?rev=1343412&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/resources/webapp/groomserver/index.html (added)
+++ incubator/hama/trunk/core/src/main/resources/webapp/groomserver/index.html Mon May 28 22:40:33 2012
@@ -0,0 +1,17 @@
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+<meta HTTP-EQUIV="REFRESH" content="0;url=groomserver.jsp"/>
\ No newline at end of file