You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by to...@apache.org on 2011/08/01 16:12:56 UTC

svn commit: r1152788 [5/9] - in /incubator/hama/trunk: ./ bin/ conf/ core/ core/bin/ core/conf/ core/src/ core/src/main/ core/src/main/java/ core/src/main/java/org/ core/src/main/java/org/apache/ core/src/main/java/org/apache/hama/ core/src/main/java/o...

Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskInProgress.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskInProgress.java?rev=1152788&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskInProgress.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskInProgress.java Mon Aug  1 14:12:46 2011
@@ -0,0 +1,288 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.IOException;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobStatus;
+
+/**
+ * TaskInProgress maintains all the info needed for a Task in the lifetime of
+ * its owning Job.
+ * <p>
+ * A TaskInProgress (TIP) owns one partition of a job and hands out
+ * {@link TaskAttemptID}s for it via {@link #getTaskToRun(GroomServerStatus)}.
+ * Apart from {@link #isComplete()} none of the methods performs any locking.
+ */
+class TaskInProgress {
+  public static final Log LOG = LogFactory.getLog(TaskInProgress.class);
+
+  private Configuration conf;
+
+  // Constants
+  static final int MAX_TASK_EXECS = 1;
+  int maxTaskAttempts = 4;
+  private boolean failed = false;
+  private static final int NUM_ATTEMPTS_PER_RESTART = 1000;
+
+  // Job Meta
+  private String jobFile = null;
+  private int partition;
+  private BSPMaster bspMaster;
+  private TaskID id;
+  private JobInProgress job;
+  private int completes = 0;
+
+  // Status
+  // private double progress = 0;
+  // private String state = "";
+  private long startTime = 0;
+
+  // The 'next' usable taskid of this tip
+  int nextTaskId = 0;
+
+  // The taskid that took this TIP to SUCCESS
+  private TaskAttemptID successfulTaskId;
+
+  // The first taskid of this tip
+  private TaskAttemptID firstTaskId;
+
+  // Map from task Id -> GroomServer Id, contains tasks that are
+  // currently running
+  private TreeMap<TaskAttemptID, String> activeTasks = new TreeMap<TaskAttemptID, String>();
+  // All attempt Ids of this TIP
+  // private TreeSet<TaskAttemptID> tasks = new TreeSet<TaskAttemptID>();
+  /**
+   * Map from taskId -> TaskStatus
+   */
+  private TreeMap<TaskAttemptID, TaskStatus> taskStatuses = new TreeMap<TaskAttemptID, TaskStatus>();
+
+  private BSPJobID jobId;
+
+  // Attempts for which shouldCloseForClosedJob() has already answered true
+  private TreeSet<TaskAttemptID> tasksReportedClosed = new TreeSet<TaskAttemptID>();
+
+  /**
+   * Constructor for new nexus between BSPMaster and GroomServer.
+   * <p>
+   * This constructor leaves the owning job, the master and the configuration
+   * unset.
+   * 
+   * @param jobId is identification of JobInProgress.
+   * @param jobFile the path of job file
+   * @param partition which partition this TaskInProgress owns.
+   */
+  public TaskInProgress(BSPJobID jobId, String jobFile, int partition) {
+    this.jobId = jobId;
+    this.jobFile = jobFile;
+    this.partition = partition;
+
+    this.id = new TaskID(jobId, partition);
+  }
+
+  /**
+   * Constructor that also wires the TIP to its master, configuration and
+   * owning job.
+   * 
+   * @param jobId is identification of JobInProgress.
+   * @param jobFile the path of job file
+   * @param master the BSPMaster stored on this TIP
+   * @param conf the configuration stored on this TIP
+   * @param job the owning job
+   * @param partition which partition this TaskInProgress owns.
+   */
+  public TaskInProgress(BSPJobID jobId, String jobFile, BSPMaster master,
+      Configuration conf, JobInProgress job, int partition) {
+    this.jobId = jobId;
+    this.jobFile = jobFile;
+    this.setBspMaster(master);
+    this.job = job;
+    this.setConf(conf);
+    this.partition = partition;
+
+    this.id = new TaskID(jobId, partition);
+  }
+
+  /**
+   * Return a Task that can be sent to a GroomServer for execution.
+   * <p>
+   * Each call consumes one attempt id. Once
+   * {@code MAX_TASK_EXECS + maxTaskAttempts} attempts have been handed out a
+   * warning is logged and {@code null} is returned.
+   * 
+   * @param status status of the groom the task is handed to; its groom name
+   *          is recorded against the new attempt
+   * @return the new task, or {@code null} when the attempt limit is exhausted
+   * @throws IOException declared for callers; not thrown by this method itself
+   */
+  public Task getTaskToRun(GroomServerStatus status) throws IOException {
+    final int attemptLimit = MAX_TASK_EXECS + maxTaskAttempts;
+    if (nextTaskId >= attemptLimit) {
+      LOG.warn("Exceeded limit of " + attemptLimit
+          + " attempts for the tip '" + getTIPId() + "'");
+      return null;
+    }
+
+    // A TIP built through the three-argument constructor has no owning job;
+    // count that as zero restarts instead of failing with an NPE.
+    int restarts = (job == null) ? 0 : job.getNumRestarts();
+    int attemptId = restarts * NUM_ATTEMPTS_PER_RESTART + nextTaskId;
+    TaskAttemptID taskid = new TaskAttemptID(id, attemptId);
+    ++nextTaskId;
+
+    // Remember the very first attempt so isFirstAttempt() has something to
+    // compare against.
+    if (firstTaskId == null) {
+      firstTaskId = taskid;
+    }
+
+    Task t = new BSPTask(jobId, jobFile, taskid, partition);
+    activeTasks.put(taskid, status.getGroomName());
+
+    return t;
+  }
+
+  // //////////////////////////////////
+  // Accessors
+  // //////////////////////////////////
+  /**
+   * Return the start time
+   */
+  public long getStartTime() {
+    return startTime;
+  }
+
+  /**
+   * Return the parent job
+   */
+  public JobInProgress getJob() {
+    return job;
+  }
+
+  /**
+   * @return the id of this tip
+   */
+  public TaskID getTIPId() {
+    return id;
+  }
+
+  /**
+   * @return the id of this tip (same value as {@link #getTIPId()})
+   */
+  public TaskID getTaskId() {
+    return this.id;
+  }
+
+  /**
+   * @return the live map of active attempt id -> groom name (not a copy)
+   */
+  public TreeMap<TaskAttemptID, String> getTasks() {
+    return activeTasks;
+  }
+
+  /**
+   * Is the Task associated with taskid the first attempt of the tip?
+   * 
+   * @param taskId the attempt to check
+   * @return Returns true if the Task is the first attempt of the tip
+   */
+  public boolean isFirstAttempt(TaskAttemptID taskId) {
+    return firstTaskId != null && firstTaskId.equals(taskId);
+  }
+
+  /**
+   * Is this tip currently running any tasks?
+   * 
+   * @return true if any tasks are running
+   */
+  public boolean isRunning() {
+    return !activeTasks.isEmpty();
+  }
+
+  /**
+   * Is this tip complete?
+   * 
+   * @return <code>true</code> if the tip is complete, else <code>false</code>
+   */
+  public synchronized boolean isComplete() {
+    return (completes > 0);
+  }
+
+  /**
+   * Is the given taskid the one that took this tip to completion?
+   * 
+   * @param taskid taskid of attempt to check for completion
+   * @return <code>true</code> if taskid is complete, else <code>false</code>
+   */
+  public boolean isComplete(TaskAttemptID taskid) {
+    // null can never be the successful attempt; avoid dereferencing it.
+    return completes > 0 && taskid != null
+        && taskid.equals(getSuccessfulTaskid());
+  }
+
+  /**
+   * Tells whether the given attempt should be closed because its job is no
+   * longer running. Answers <code>true</code> at most once per attempt.
+   * 
+   * @param taskid the attempt to check
+   * @return <code>true</code> if a status is known for the attempt, it has not
+   *         been reported closed before and the job's run state is not
+   *         RUNNING
+   */
+  public boolean shouldCloseForClosedJob(TaskAttemptID taskid) {
+    TaskStatus ts = taskStatuses.get(taskid);
+    if ((ts != null) && (!tasksReportedClosed.contains(taskid))
+        && (job.getStatus().getRunState() != JobStatus.RUNNING)) {
+      tasksReportedClosed.add(taskid);
+      return true;
+    } else {
+      return false;
+    }
+  }
+
+  /**
+   * Marks the given attempt as succeeded and this tip as complete.
+   * 
+   * @param taskid the attempt that completed
+   */
+  public void completed(TaskAttemptID taskid) {
+    LOG.info("Task '" + taskid.getTaskID().toString() + "' has completed.");
+
+    // No status may have been reported for this attempt yet; do not let that
+    // abort the completion bookkeeping below.
+    TaskStatus status = taskStatuses.get(taskid);
+    if (status != null) {
+      status.setRunState(TaskStatus.State.SUCCEEDED);
+    } else {
+      LOG.warn("No status recorded for completed task attempt '" + taskid
+          + "'");
+    }
+    activeTasks.remove(taskid);
+
+    // Note the successful taskid
+    setSuccessfulTaskid(taskid);
+
+    //
+    // Now that the TIP is complete, the other speculative
+    // subtasks will be closed when the owning groom server
+    // reports in and calls shouldClose() on this object.
+    //
+
+    this.completes++;
+  }
+
+  /**
+   * Marks the given attempt as failed and removes it from the active tasks.
+   * 
+   * @param taskid the attempt that failed
+   */
+  public void terminated(TaskAttemptID taskid) {
+    LOG.info("Task '" + taskid.getTaskID().toString() + "' has failed.");
+
+    // Same as in completed(): tolerate an attempt without a recorded status.
+    TaskStatus status = taskStatuses.get(taskid);
+    if (status != null) {
+      status.setRunState(TaskStatus.State.FAILED);
+    } else {
+      LOG.warn("No status recorded for failed task attempt '" + taskid + "'");
+    }
+    activeTasks.remove(taskid);
+  }
+
+  private void setSuccessfulTaskid(TaskAttemptID taskid) {
+    this.successfulTaskId = taskid;
+  }
+
+  private TaskAttemptID getSuccessfulTaskid() {
+    return successfulTaskId;
+  }
+
+  /**
+   * Stores the given status under its own task attempt id, replacing any
+   * earlier status for that attempt.
+   * 
+   * @param status the status to record
+   */
+  public void updateStatus(TaskStatus status) {
+    taskStatuses.put(status.getTaskId(), status);
+  }
+
+  /**
+   * @param taskId the attempt to look up
+   * @return the recorded status, or <code>null</code> if none was recorded
+   */
+  public TaskStatus getTaskStatus(TaskAttemptID taskId) {
+    return this.taskStatuses.get(taskId);
+  }
+
+  /**
+   * Flags this tip as failed. Only the flag is set; nothing else is touched.
+   */
+  public void kill() {
+    this.failed = true;
+  }
+
+  /**
+   * @return <code>true</code> once {@link #kill()} has been called
+   */
+  public boolean isFailed() {
+    return failed;
+  }
+
+  /**
+   * @param conf the conf to set
+   */
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  /**
+   * @return the conf
+   */
+  public Configuration getConf() {
+    return conf;
+  }
+
+  /**
+   * @param bspMaster the bspMaster to set
+   */
+  public void setBspMaster(BSPMaster bspMaster) {
+    this.bspMaster = bspMaster;
+  }
+
+  /**
+   * @return the bspMaster
+   */
+  public BSPMaster getBspMaster() {
+    return bspMaster;
+  }
+
+}

Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskLog.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskLog.java?rev=1152788&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskLog.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskLog.java Mon Aug  1 14:12:46 2011
@@ -0,0 +1,343 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.File;
+import java.io.FileFilter;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hama.HamaConfiguration;
+
+/**
+ * A simple logger to handle the task-specific user logs.
+ * <p>
+ * Log files live below <code>${hama.log.dir}/userlogs/&lt;attempt id&gt;/</code>.
+ * The class also builds the <code>bash -c</code> command lines used to
+ * redirect a child's output into those files.
+ */
+public class TaskLog {
+  private static final Log LOG = LogFactory.getLog(TaskLog.class.getName());
+
+  private static final File LOG_DIR = new File(
+      System.getProperty("hama.log.dir"), "userlogs").getAbsoluteFile();
+
+  static {
+    // mkdirs() also returns false when somebody else created the directory
+    // in the meantime, hence the final isDirectory() check before warning.
+    if (!LOG_DIR.exists() && !LOG_DIR.mkdirs() && !LOG_DIR.isDirectory()) {
+      LOG.warn("Unable to create user log directory " + LOG_DIR);
+    }
+  }
+
+  /**
+   * Builds the path of one log file of a task attempt.
+   * 
+   * @param taskid the attempt whose log is wanted
+   * @param filter which of the attempt's logs is wanted
+   * @return <code>LOG_DIR/&lt;taskid&gt;/&lt;filter&gt;</code>; the file need
+   *         not exist
+   */
+  public static File getTaskLogFile(TaskAttemptID taskid, LogName filter) {
+    return new File(new File(LOG_DIR, taskid.toString()), filter.toString());
+  }
+
+  /**
+   * The filter for userlogs.
+   */
+  public static enum LogName {
+    /** Log on the stdout of the task. */
+    STDOUT("stdout"),
+
+    /** Log on the stderr of the task. */
+    STDERR("stderr"),
+
+    /** Log on the system logs of the task. */
+    SYSLOG("syslog"),
+
+    /** The java profiler information. */
+    PROFILE("profile.out"),
+
+    /** Log the debug script's stdout */
+    DEBUGOUT("debugout");
+
+    private final String prefix;
+
+    private LogName(String prefix) {
+      this.prefix = prefix;
+    }
+
+    @Override
+    public String toString() {
+      return prefix;
+    }
+  }
+
+  /** Accepts files whose modification time lies before a given timestamp. */
+  private static class TaskLogsPurgeFilter implements FileFilter {
+    long purgeTimeStamp;
+
+    TaskLogsPurgeFilter(long purgeTimeStamp) {
+      this.purgeTimeStamp = purgeTimeStamp;
+    }
+
+    public boolean accept(File file) {
+      LOG.debug("PurgeFilter - file: " + file + ", mtime: "
+          + file.lastModified() + ", purge: " + purgeTimeStamp);
+      return file.lastModified() < purgeTimeStamp;
+    }
+  }
+
+  /**
+   * Purge old user logs.
+   * 
+   * @param logsRetainHours entries of the user log directory whose
+   *          modification time is older than this many hours are deleted
+   * @throws IOException
+   */
+  public static synchronized void cleanup(int logsRetainHours)
+      throws IOException {
+    // Everything last modified before this instant is purged.
+    long purgeTimeStamp = System.currentTimeMillis()
+        - (logsRetainHours * 60L * 60 * 1000);
+    File[] oldTaskLogs = LOG_DIR.listFiles(new TaskLogsPurgeFilter(
+        purgeTimeStamp));
+    // listFiles() returns null when LOG_DIR cannot be listed.
+    if (oldTaskLogs != null) {
+      for (File oldTaskLog : oldTaskLogs) {
+        FileUtil.fullyDelete(oldTaskLog);
+      }
+    }
+  }
+
+  /**
+   * Input stream over a byte window of one task log file.
+   */
+  static class Reader extends InputStream {
+    private long bytesRemaining;
+    private FileInputStream file;
+
+    /**
+     * Read a log file from start to end positions. The offsets may be negative,
+     * in which case they are relative to the end of the file. For example,
+     * Reader(taskid, kind, 0, -1) is the entire file and Reader(taskid, kind,
+     * -4197, -1) is the last 4196 bytes.
+     * 
+     * @param taskid the id of the task to read the log file for
+     * @param kind the kind of log to read
+     * @param start the offset to read from (negative is relative to tail)
+     * @param end the offset to read upto (negative is relative to tail)
+     * @throws IOException
+     */
+    public Reader(TaskAttemptID taskid, LogName kind, long start, long end)
+        throws IOException {
+      // find the right log file
+      File filename = getTaskLogFile(taskid, kind);
+      // calculate the start and stop
+      long size = filename.length();
+      if (start < 0) {
+        start += size + 1;
+      }
+      if (end < 0) {
+        end += size + 1;
+      }
+      start = Math.max(0, Math.min(start, size));
+      end = Math.max(0, Math.min(end, size));
+      // An inverted window (end < start) is an empty window, never negative.
+      bytesRemaining = Math.max(0, end - start);
+      file = new FileInputStream(filename);
+
+      // skip upto start; close the stream again if that fails so the file
+      // handle does not leak out of a half-constructed Reader.
+      boolean positioned = false;
+      try {
+        long pos = 0;
+        while (pos < start) {
+          long skipped = file.skip(start - pos);
+          if (skipped > 0) {
+            pos += skipped;
+          } else if (file.read() >= 0) {
+            // skip() may legally make no progress; a one byte read tells
+            // "no progress" apart from end of file.
+            pos++;
+          } else {
+            bytesRemaining = 0;
+            break;
+          }
+        }
+        positioned = true;
+      } finally {
+        if (!positioned) {
+          try {
+            file.close();
+          } catch (IOException ignored) {
+            // the exception that aborted positioning is the one to report
+          }
+        }
+      }
+    }
+
+    @Override
+    public int read() throws IOException {
+      int result = -1;
+      if (bytesRemaining > 0) {
+        bytesRemaining -= 1;
+        result = file.read();
+      }
+      return result;
+    }
+
+    @Override
+    public int read(byte[] buffer, int offset, int length) throws IOException {
+      // End of the window is end of stream: report -1 rather than 0 so that
+      // the usual "read until -1" loops terminate.
+      if (length > 0 && bytesRemaining <= 0) {
+        return -1;
+      }
+      length = (int) Math.min(length, bytesRemaining);
+      int bytes = file.read(buffer, offset, length);
+      if (bytes > 0) {
+        bytesRemaining -= bytes;
+      }
+      return bytes;
+    }
+
+    @Override
+    public int available() throws IOException {
+      return (int) Math.min(bytesRemaining, file.available());
+    }
+
+    @Override
+    public void close() throws IOException {
+      file.close();
+    }
+  }
+
+  private static final String bashCommand = "bash";
+  private static final String tailCommand = "tail";
+
+  /**
+   * Get the desired maximum length of task's logs.
+   * 
+   * @param conf the job to look in
+   * @return the number of bytes to cap the log files at
+   *         (<code>mapred.userlog.limit.kb</code>, default 100, times 1024)
+   */
+  public static long getTaskLogLength(HamaConfiguration conf) {
+    return conf.getLong("mapred.userlog.limit.kb", 100) * 1024;
+  }
+
+  /**
+   * Wrap a command in a shell to capture stdout and stderr to files. If the
+   * tailLength is 0, the entire output will be saved.
+   * 
+   * @param cmd The command and the arguments that should be run
+   * @param stdoutFilename The filename that stdout should be saved to
+   * @param stderrFilename The filename that stderr should be saved to
+   * @param tailLength The length of the tail to be saved.
+   * @return the modified command that should be run
+   */
+  public static List<String> captureOutAndError(List<String> cmd,
+      File stdoutFilename, File stderrFilename, long tailLength)
+      throws IOException {
+    return captureOutAndError(null, cmd, stdoutFilename, stderrFilename,
+        tailLength);
+  }
+
+  /**
+   * Wrap a command in a shell to capture stdout and stderr to files. Setup
+   * commands such as setting memory limit can be passed which will be executed
+   * before exec. If the tailLength is 0, the entire output will be saved.
+   * 
+   * @param setup The setup commands for the execed process.
+   * @param cmd The command and the arguments that should be run
+   * @param stdoutFilename The filename that stdout should be saved to
+   * @param stderrFilename The filename that stderr should be saved to
+   * @param tailLength The length of the tail to be saved.
+   * @return the modified command that should be run
+   */
+  public static List<String> captureOutAndError(List<String> setup,
+      List<String> cmd, File stdoutFilename, File stderrFilename,
+      long tailLength) throws IOException {
+    String stdout = FileUtil.makeShellPath(stdoutFilename);
+    String stderr = FileUtil.makeShellPath(stderrFilename);
+    List<String> result = new ArrayList<String>(3);
+    result.add(bashCommand);
+    result.add("-c");
+    StringBuilder mergedCmd = new StringBuilder();
+    if (setup != null && setup.size() > 0) {
+      mergedCmd.append(addCommand(setup, false));
+      mergedCmd.append(";");
+    }
+    if (tailLength > 0) {
+      // run in a subshell so both streams can be piped through tail
+      mergedCmd.append("(");
+    } else {
+      mergedCmd.append("exec ");
+    }
+    mergedCmd.append(addCommand(cmd, true));
+    mergedCmd.append(" < /dev/null ");
+    if (tailLength > 0) {
+      mergedCmd.append(" | ");
+      mergedCmd.append(tailCommand);
+      mergedCmd.append(" -c ");
+      mergedCmd.append(tailLength);
+      mergedCmd.append(" >> ");
+      mergedCmd.append(stdout);
+      mergedCmd.append(" ; exit $PIPESTATUS ) 2>&1 | ");
+      mergedCmd.append(tailCommand);
+      mergedCmd.append(" -c ");
+      mergedCmd.append(tailLength);
+      mergedCmd.append(" >> ");
+      mergedCmd.append(stderr);
+      mergedCmd.append(" ; exit $PIPESTATUS");
+    } else {
+      mergedCmd.append(" 1>> ");
+      mergedCmd.append(stdout);
+      mergedCmd.append(" 2>> ");
+      mergedCmd.append(stderr);
+    }
+    result.add(mergedCmd.toString());
+    return result;
+  }
+
+  /**
+   * Add quotes to each of the command strings and return as a single string.
+   * Every element is wrapped in single quotes; single quotes inside an
+   * element are escaped so an element can never terminate its own quoting.
+   * 
+   * @param cmd The command to be quoted
+   * @param isExecutable makes shell path if the first argument is executable
+   * @return returns The quoted string.
+   * @throws IOException
+   */
+  public static String addCommand(List<String> cmd, boolean isExecutable)
+      throws IOException {
+    StringBuilder command = new StringBuilder();
+    for (String s : cmd) {
+      String word;
+      if (isExecutable) {
+        // the executable name needs to be expressed as a shell path for the
+        // shell to find it.
+        word = FileUtil.makeShellPath(new File(s));
+        isExecutable = false;
+      } else {
+        word = s;
+      }
+      command.append('\'');
+      command.append(escapeSingleQuotes(word));
+      command.append('\'');
+      command.append(" ");
+    }
+    return command.toString();
+  }
+
+  /**
+   * Rewrites every <code>'</code> as <code>'\''</code> (close quote, escaped
+   * quote, reopen quote) so the text is safe inside a single-quoted shell
+   * word.
+   */
+  private static String escapeSingleQuotes(String word) {
+    return String.valueOf(word).replace("'", "'\\''");
+  }
+
+  /**
+   * Wrap a command in a shell to capture debug script's stdout and stderr to
+   * debugout.
+   * 
+   * @param cmd The command and the arguments that should be run
+   * @param debugoutFilename The filename that stdout and stderr should be saved
+   *          to.
+   * @return the modified command that should be run
+   * @throws IOException
+   */
+  public static List<String> captureDebugOut(List<String> cmd,
+      File debugoutFilename) throws IOException {
+    String debugout = FileUtil.makeShellPath(debugoutFilename);
+    List<String> result = new ArrayList<String>(3);
+    result.add(bashCommand);
+    result.add("-c");
+    StringBuilder mergedCmd = new StringBuilder();
+    mergedCmd.append("exec ");
+    boolean isExecutable = true;
+    // NOTE(review): unlike addCommand() the words are appended unquoted here,
+    // so the shell splits and expands them - confirm that is intended.
+    for (String s : cmd) {
+      if (isExecutable) {
+        // the executable name needs to be expressed as a shell path for the
+        // shell to find it.
+        mergedCmd.append(FileUtil.makeShellPath(new File(s)));
+        isExecutable = false;
+      } else {
+        mergedCmd.append(s);
+      }
+      mergedCmd.append(" ");
+    }
+    mergedCmd.append(" < /dev/null ");
+    mergedCmd.append(" >");
+    mergedCmd.append(debugout);
+    mergedCmd.append(" 2>&1 ");
+    result.add(mergedCmd.toString());
+    return result;
+  }
+
+} // TaskLog

Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskLogAppender.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskLogAppender.java?rev=1152788&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskLogAppender.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskLogAppender.java Mon Aug  1 14:12:46 2011
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.util.LinkedList;
+import java.util.Queue;
+
+import org.apache.log4j.FileAppender;
+import org.apache.log4j.spi.LoggingEvent;
+
+/**
+ * A simple log4j-appender for the task child's BSP system logs.
+ */
+public class TaskLogAppender extends FileAppender {
+  private String taskId; // taskId should be managed as String rather than
+                         // TaskID object
+  // so that log4j can configure it from the configuration(log4j.properties).
+  private int maxEvents;
+  private Queue<LoggingEvent> tail = null;
+
+  @Override
+  public void activateOptions() {
+    synchronized (this) {
+      if (maxEvents > 0) {
+        tail = new LinkedList<LoggingEvent>();
+      }
+      setFile(TaskLog.getTaskLogFile(TaskAttemptID.forName(taskId),
+          TaskLog.LogName.SYSLOG).toString());
+      setAppend(true);
+      super.activateOptions();
+    }
+  }
+
+  @Override
+  public void append(LoggingEvent event) {
+    synchronized (this) {
+      if (tail == null) {
+        super.append(event);
+      } else {
+        if (tail.size() >= maxEvents) {
+          tail.remove();
+        }
+        tail.add(event);
+      }
+    }
+  }
+
+  @Override
+  public synchronized void close() {
+    if (tail != null) {
+      for (LoggingEvent event : tail) {
+        super.append(event);
+      }
+    }
+    super.close();
+  }
+
+  /**
+   * Getter/Setter methods for log4j.
+   */
+
+  public String getTaskId() {
+    return taskId;
+  }
+
+  public void setTaskId(String taskId) {
+    this.taskId = taskId;
+  }
+
+  private static final int EVENT_SIZE = 100;
+
+  public long getTotalLogFileSize() {
+    return maxEvents * EVENT_SIZE;
+  }
+
+  public void setTotalLogFileSize(long logSize) {
+    maxEvents = (int) logSize / EVENT_SIZE;
+  }
+
+}

Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskRunner.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskRunner.java?rev=1152788&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskRunner.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskRunner.java Mon Aug  1 14:12:46 2011
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.net.InetSocketAddress;
+import java.util.Vector;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.util.RunJar;
+
+/**
+ * Base class that runs a task in a separate process.
+ * <p>
+ * {@link #run()} assembles the child JVM command line and blocks until the
+ * child exits; {@link #kill()} may be called from another thread to destroy
+ * the child.
+ */
+public class TaskRunner extends Thread {
+
+  public static final Log LOG = LogFactory.getLog(TaskRunner.class);
+
+  // Both fields are written by the runner thread and read by whichever thread
+  // calls kill(), hence volatile.
+  volatile boolean killed = false;
+  private volatile Process process;
+  private Task task;
+  private BSPJob conf;
+  private GroomServer groomServer;
+
+  /**
+   * @param bspTask the task to run in the child process
+   * @param groom the groom server the child reports back to
+   * @param conf the job providing the jar and the child JVM options
+   */
+  public TaskRunner(BSPTask bspTask, GroomServer groom, BSPJob conf) {
+    this.task = bspTask;
+    this.conf = conf;
+    this.groomServer = groom;
+  }
+
+  public Task getTask() {
+    return task;
+  }
+
+  /**
+   * Called to assemble this task's input. This method is run in the parent
+   * process before the child is spawned. It should not execute user code, only
+   * system code.
+   */
+  public boolean prepare() throws IOException {
+    return true;
+  }
+
+  /**
+   * Builds the child's class path and JVM arguments, starts the child and
+   * waits for it to finish. An {@link IOException} is logged, not rethrown.
+   */
+  @Override
+  public void run() {
+    try {
+      String sep = System.getProperty("path.separator");
+      File workDir = new File(new File(task.getJobFile()).getParent(), "work");
+      boolean isCreated = workDir.mkdirs();
+      if (!isCreated) {
+        LOG.debug("TaskRunner.workDir : " + workDir);
+      }
+
+      StringBuffer classPath = new StringBuffer();
+      // start with same classpath as parent process
+      classPath.append(System.getProperty("java.class.path"));
+      classPath.append(sep);
+
+      String jar = conf.getJar();
+      if (jar != null) { // if jar exists, unpack it into workDir
+        RunJar.unJar(new File(jar), workDir);
+        File[] libs = new File(workDir, "lib").listFiles();
+        if (libs != null) {
+          for (int i = 0; i < libs.length; i++) {
+            classPath.append(sep); // add libs from jar to classpath
+            classPath.append(libs[i]);
+          }
+        }
+        classPath.append(sep);
+        classPath.append(new File(workDir, "classes"));
+        classPath.append(sep);
+        classPath.append(workDir);
+      }
+
+      // Build exec child jvm args.
+      Vector<String> vargs = new Vector<String>();
+      File jvm = // use same jvm as parent
+      new File(new File(System.getProperty("java.home"), "bin"), "java");
+      vargs.add(jvm.toString());
+
+      // bsp.child.java.opts
+      String javaOpts = conf.getConf().get("bsp.child.java.opts", "-Xmx200m");
+      javaOpts = javaOpts.replace("@taskid@", task.getTaskID().toString());
+
+      // Split on runs of whitespace and drop empty tokens: an empty argument
+      // in front of the main class would be taken as the class name.
+      String[] javaOptsSplit = javaOpts.trim().split("\\s+");
+      for (int i = 0; i < javaOptsSplit.length; i++) {
+        if (javaOptsSplit[i].length() > 0) {
+          vargs.add(javaOptsSplit[i]);
+        }
+      }
+
+      // Add classpath.
+      vargs.add("-classpath");
+      vargs.add(classPath.toString());
+      // Add main class and its arguments
+      vargs.add(GroomServer.Child.class.getName()); // main of Child
+
+      InetSocketAddress addr = groomServer.getTaskTrackerReportAddress();
+      vargs.add(addr.getHostName());
+      vargs.add(Integer.toString(addr.getPort()));
+      vargs.add(task.getTaskID().toString());
+      vargs.add(groomServer.groomHostName);
+
+      // Run java
+      runChild(vargs.toArray(new String[0]), workDir);
+    } catch (IOException e) {
+      // pass the exception as the throwable so the stack trace is logged
+      LOG.error(task.getTaskID() + " Error running child", e);
+    }
+  }
+
+  /**
+   * Run the child process and wait for it to exit. Whatever the outcome,
+   * {@link #kill()} is invoked afterwards.
+   * 
+   * @param args the complete command line of the child
+   * @param dir the working directory of the child
+   * @throws IOException if the child cannot be started, exits with a non-zero
+   *           status without having been killed, or the wait is interrupted
+   */
+  private void runChild(String[] args, File dir) throws IOException {
+    this.process = Runtime.getRuntime().exec(args, null, dir);
+    try {
+      new Thread() {
+        @Override
+        public void run() {
+          logStream(process.getErrorStream()); // copy log output
+        }
+      }.start();
+
+      logStream(process.getInputStream()); // normally empty
+
+      int exit_code = process.waitFor();
+      if (!killed && exit_code != 0) {
+        throw new IOException("Task process exit with nonzero status of "
+            + exit_code + ".");
+      }
+
+    } catch (InterruptedException e) {
+      // Restore the interrupt flag and keep the original exception as cause.
+      Thread.currentThread().interrupt();
+      IOException wrapped = new IOException(e.toString());
+      wrapped.initCause(e);
+      throw wrapped;
+    } finally {
+      kill();
+    }
+  }
+
+  /**
+   * Kill the child process (if one was started) and mark this runner as
+   * killed.
+   */
+  public void kill() {
+    Process child = process;
+    if (child != null) {
+      child.destroy();
+    }
+    killed = true;
+  }
+
+  /**
+   * Copies the given child output stream line by line into the log, prefixed
+   * with the task id, and closes the stream when it is exhausted.
+   * 
+   * @param output stdout or stderr of the child process
+   */
+  private void logStream(InputStream output) {
+    try {
+      BufferedReader in = new BufferedReader(new InputStreamReader(output));
+      String line;
+      while ((line = in.readLine()) != null) {
+        LOG.info(task.getTaskID() + " " + line);
+      }
+    } catch (IOException e) {
+      LOG.warn(task.getTaskID() + " Error reading child output", e);
+    } finally {
+      try {
+        output.close();
+      } catch (IOException e) {
+        LOG.warn(task.getTaskID() + " Error closing child output", e);
+      }
+    }
+  }
+
+}

Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskScheduler.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskScheduler.java?rev=1152788&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskScheduler.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskScheduler.java Mon Aug  1 14:12:46 2011
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.IOException;
+import java.util.Collection;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Used by a {@link BSPMaster} to schedule {@link Task}s on
+ * {@link GroomServer}s.
+ */
+abstract class TaskScheduler implements Configurable {
+
+  protected Configuration conf;
+  protected GroomServerManager groomServerManager;
+
+  /**
+   * @return the configuration last passed to
+   *         {@link #setConf(Configuration)}, or null if none was set.
+   */
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  /**
+   * @param conf the configuration this scheduler should use.
+   */
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  /**
+   * @param groomServerManager the manager this scheduler should use.
+   */
+  public synchronized void setGroomServerManager(
+      GroomServerManager groomServerManager) {
+    this.groomServerManager = groomServerManager;
+  }
+
+  /**
+   * Lifecycle method to allow the scheduler to start any work in separate
+   * threads. The default implementation does nothing.
+   * 
+   * @throws IOException
+   */
+  public void start() throws IOException {
+    // do nothing
+  }
+
+  /**
+   * Lifecycle method to allow the scheduler to stop any work it is doing. The
+   * default implementation does nothing.
+   * 
+   * @throws IOException
+   */
+  public void terminate() throws IOException {
+    // do nothing
+  }
+
+  // public abstract void addJob(JobInProgress job);
+
+  /**
+   * Returns a collection of jobs in an order which is specific to the
+   * particular scheduler.
+   * 
+   * @param queue name of the queue.
+   * @return the jobs that correspond to the specified queue.
+   */
+  public abstract Collection<JobInProgress> getJobs(String queue);
+}

Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskStatus.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskStatus.java?rev=1152788&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskStatus.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskStatus.java Mon Aug  1 14:12:46 2011
@@ -0,0 +1,279 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+
+/**
+ * Describes the current status of a task. This is not intended to be a
+ * comprehensive piece of data.
+ */
+class TaskStatus implements Writable, Cloneable {
+  static final Log LOG = LogFactory.getLog(TaskStatus.class);
+
+  // enumeration for reporting current phase of a task.
+  public static enum Phase {
+    STARTING, COMPUTE, BARRIER_SYNC, CLEANUP
+  }
+
+  // what state is the task in?
+  public static enum State {
+    RUNNING, SUCCEEDED, FAILED, UNASSIGNED, KILLED, COMMIT_PENDING, FAILED_UNCLEAN, KILLED_UNCLEAN
+  }
+
+  private BSPJobID jobId;
+  private TaskAttemptID taskId;
+  private float progress;
+  private volatile State runState;
+  private String stateString;
+  // not part of the Writable form: write()/readFields() skip this field
+  private String groomServer;
+  private long superstepCount;
+
+  private long startTime;
+  private long finishTime;
+
+  private volatile Phase phase = Phase.STARTING;
+
+  /**
+   * Creates an empty status with fresh job and task attempt ids, so that
+   * {@link #readFields(DataInput)} has objects to read into. The run state and
+   * the state string are left null; {@link #write(DataOutput)} passes both to
+   * the serialization helpers as they are.
+   */
+  public TaskStatus() {
+    jobId = new BSPJobID();
+    taskId = new TaskAttemptID();
+    this.superstepCount = 0;
+  }
+
+  /**
+   * Creates a status with the given values and a superstep count of zero.
+   * Start and finish time are left at 0.
+   */
+  public TaskStatus(BSPJobID jobId, TaskAttemptID taskId, float progress,
+      State runState, String stateString, String groomServer, Phase phase) {
+    this.jobId = jobId;
+    this.taskId = taskId;
+    this.progress = progress;
+    this.runState = runState;
+    this.stateString = stateString;
+    this.groomServer = groomServer;
+    this.phase = phase;
+    this.superstepCount = 0;
+  }
+
+  // //////////////////////////////////////////////////
+  // Accessors and Modifiers
+  // //////////////////////////////////////////////////
+
+  public BSPJobID getJobId() {
+    return jobId;
+  }
+
+  public TaskAttemptID getTaskId() {
+    return taskId;
+  }
+
+  public float getProgress() {
+    return progress;
+  }
+
+  public void setProgress(float progress) {
+    this.progress = progress;
+  }
+
+  public State getRunState() {
+    return runState;
+  }
+
+  public void setRunState(State state) {
+    this.runState = state;
+  }
+
+  public String getStateString() {
+    return stateString;
+  }
+
+  public void setStateString(String stateString) {
+    this.stateString = stateString;
+  }
+
+  public String getGroomServer() {
+    return groomServer;
+  }
+
+  public void setGroomServer(String groomServer) {
+    this.groomServer = groomServer;
+  }
+
+  public long getFinishTime() {
+    return finishTime;
+  }
+
+  void setFinishTime(long finishTime) {
+    this.finishTime = finishTime;
+  }
+
+  /**
+   * Get start time of the task.
+   * 
+   * @return 0 if start time is not set, else returns start time.
+   */
+  public long getStartTime() {
+    return startTime;
+  }
+
+  /**
+   * Set startTime of the task.
+   * 
+   * @param startTime start time
+   */
+  void setStartTime(long startTime) {
+    this.startTime = startTime;
+  }
+
+  /**
+   * Get current phase of this task.
+   * 
+   * @return the current phase; {@link Phase#STARTING} until changed.
+   */
+  public Phase getPhase() {
+    return this.phase;
+  }
+
+  /**
+   * Set current phase of this task.
+   * 
+   * @param phase phase of this task
+   */
+  void setPhase(Phase phase) {
+    this.phase = phase;
+  }
+
+  /**
+   * Update the status of the task.
+   * 
+   * This update is done by ping thread before sending the status.
+   * 
+   * @param progress new progress value
+   * @param state new state string
+   */
+  synchronized void statusUpdate(float progress, String state) {
+    setProgress(progress);
+    setStateString(state);
+  }
+
+  /**
+   * Update the status of the task.
+   * 
+   * Copies progress, run state, state string and phase unconditionally; start
+   * and finish time are only taken over when they are non-zero in the given
+   * status. Job id, task id, groom server and superstep count are not copied.
+   * NOTE(review): confirm that leaving out the superstep count is intended.
+   * 
+   * @param status updated status
+   */
+  synchronized void statusUpdate(TaskStatus status) {
+    this.progress = status.getProgress();
+    this.runState = status.getRunState();
+    this.stateString = status.getStateString();
+
+    if (status.getStartTime() != 0) {
+      this.startTime = status.getStartTime();
+    }
+    if (status.getFinishTime() != 0) {
+      this.finishTime = status.getFinishTime();
+    }
+
+    this.phase = status.getPhase();
+  }
+
+  /**
+   * Update specific fields of task status
+   * 
+   * This update is done in BSPMaster when a cleanup attempt of task reports its
+   * status. Then update only specific fields, not all.
+   * 
+   * @param runState new run state
+   * @param progress new progress value
+   * @param state new state string
+   * @param phase new phase
+   * @param finishTime new finish time; ignored when it is 0
+   */
+  synchronized void statusUpdate(State runState, float progress, String state,
+      Phase phase, long finishTime) {
+    setRunState(runState);
+    setProgress(progress);
+    setStateString(state);
+    setPhase(phase);
+    if (finishTime != 0) {
+      this.finishTime = finishTime;
+    }
+  }
+
+  /**
+   * @return The number of BSP super steps executed by the task.
+   */
+  public long getSuperstepCount() {
+    return superstepCount;
+  }
+
+  /**
+   * Increments the number of BSP super steps executed by the task.
+   * 
+   * Unlike the statusUpdate methods this one is not synchronized.
+   */
+  public void incrementSuperstepCount() {
+    superstepCount += 1;
+  }
+
+  /**
+   * Returns a copy made by {@link Object#clone()}, i.e. a shallow one: the
+   * copy refers to the same job id and task id objects as this status.
+   */
+  @Override
+  public Object clone() {
+    try {
+      return super.clone();
+    } catch (CloneNotSupportedException cnse) {
+      // Shouldn't happen since we do implement Cloneable
+      throw new InternalError(cnse.toString());
+    }
+  }
+
+  // ////////////////////////////////////////////
+  // Writable
+  // ////////////////////////////////////////////
+
+  /**
+   * Reads the fields in exactly the order in which
+   * {@link #write(DataOutput)} emits them. The groom server name is not part
+   * of the serialized form and is left untouched.
+   */
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    this.jobId.readFields(in);
+    this.taskId.readFields(in);
+    this.progress = in.readFloat();
+    this.runState = WritableUtils.readEnum(in, State.class);
+    this.stateString = Text.readString(in);
+    this.phase = WritableUtils.readEnum(in, Phase.class);
+    this.startTime = in.readLong();
+    this.finishTime = in.readLong();
+    this.superstepCount = in.readLong();
+  }
+
+  /**
+   * Writes job id, task id, progress, run state, state string, phase, start
+   * time, finish time and superstep count, in that order. The groom server
+   * name is not written.
+   */
+  @Override
+  public void write(DataOutput out) throws IOException {
+    jobId.write(out);
+    taskId.write(out);
+    out.writeFloat(progress);
+    WritableUtils.writeEnum(out, runState);
+    Text.writeString(out, stateString);
+    WritableUtils.writeEnum(out, phase);
+    out.writeLong(startTime);
+    out.writeLong(finishTime);
+    out.writeLong(superstepCount);
+  }
+}

Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/package.html
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/package.html?rev=1152788&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/package.html (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/package.html Mon Aug  1 14:12:46 2011
@@ -0,0 +1,23 @@
+<html>
+
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+
+<body>
+BSP computing framework.
+</body>
+</html>

Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/http/HttpServer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/http/HttpServer.java?rev=1152788&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/http/HttpServer.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/http/HttpServer.java Mon Aug  1 14:12:46 2011
@@ -0,0 +1,501 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.http;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.net.BindException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.log.LogLevel;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.mortbay.jetty.Connector;
+import org.mortbay.jetty.Handler;
+import org.mortbay.jetty.Server;
+import org.mortbay.jetty.handler.ContextHandlerCollection;
+import org.mortbay.jetty.nio.SelectChannelConnector;
+import org.mortbay.jetty.security.SslSocketConnector;
+import org.mortbay.jetty.servlet.Context;
+import org.mortbay.jetty.servlet.DefaultServlet;
+import org.mortbay.jetty.servlet.FilterHolder;
+import org.mortbay.jetty.servlet.FilterMapping;
+import org.mortbay.jetty.servlet.ServletHandler;
+import org.mortbay.jetty.servlet.ServletHolder;
+import org.mortbay.jetty.webapp.WebAppContext;
+import org.mortbay.thread.BoundedThreadPool;
+import org.mortbay.util.MultiException;
+
+/**
+ * Create a Jetty embedded server to answer http requests. The primary goal is
+ * to serve up status information for the server. There are three contexts:
+ * <ul>
+ * <li>"/logs/" -&gt; points to the log directory</li>
+ * <li>"/static/" -&gt; points to common static files
+ * (src/main/webapp/static)</li>
+ * <li>"/" -&gt; the jsp server code from (src/main/webapp/&lt;name&gt;)</li>
+ * </ul>
+ */
+public class HttpServer {
+  public static final Log LOG = LogFactory.getLog(HttpServer.class);
+
+  static final String FILTER_INITIALIZER_PROPERTY = "hama.http.filter.initializers";
+
+  protected final Server webServer;
+  protected final Connector listener;
+  protected final WebAppContext webAppContext;
+  protected final boolean findPort;
+  protected final Map<Context, Boolean> defaultContexts = new HashMap<Context, Boolean>();
+  // names of the filters that addFilterPathMapping() maps new path specs to
+  protected final List<String> filterNames = new ArrayList<String>();
+  private static final int MAX_RETRIES = 10;
+
+  /**
+   * Same as this(name, bindAddress, port, findPort, new Configuration());
+   */
+  public HttpServer(String name, String bindAddress, int port, boolean findPort)
+      throws IOException {
+    this(name, bindAddress, port, findPort, new Configuration());
+  }
+
+  /**
+   * Create a status server on the given port. The jsp scripts are taken from
+   * src/main/webapp/&lt;name&gt;.
+   * 
+   * As a side effect the JVM-wide system properties
+   * "java.naming.factory.initial" and "java.naming.factory.url.pkgs" are set
+   * to the Jetty naming implementation.
+   * 
+   * @param name The name of the server
+   * @param bindAddress The host name or address the listener binds to
+   * @param port The port to use on the server
+   * @param findPort whether the server should start at the given port and
+   *          increment by 1 until it finds a free port.
+   * @param conf Configuration
+   */
+  public HttpServer(String name, String bindAddress, int port,
+      boolean findPort, Configuration conf) throws IOException {
+    webServer = new Server();
+    this.findPort = findPort;
+
+    listener = createBaseListener(conf);
+    listener.setHost(bindAddress);
+    listener.setPort(port);
+    webServer.addConnector(listener);
+
+    webServer.setThreadPool(new BoundedThreadPool());
+
+    final String appDir = getWebAppsPath();
+    ContextHandlerCollection contexts = new ContextHandlerCollection();
+    webServer.setHandler(contexts);
+    webAppContext = new WebAppContext();
+
+    System.setProperty("java.naming.factory.initial",
+        "org.mortbay.naming.InitialContextFactory");
+    System.setProperty("java.naming.factory.url.pkgs", "org.mortbay.naming");
+
+    webAppContext.setContextPath("/");
+    webAppContext.setWar(appDir + "/" + name);
+    webServer.addHandler(webAppContext);
+
+    addDefaultApps(contexts, appDir);
+
+    addDefaultServlets();
+  }
+
+  /**
+   * Create a required listener for the Jetty instance listening on the port
+   * provided. This wrapper and all subclasses must create at least one
+   * listener.
+   * 
+   * @param conf Configuration; not used by this implementation
+   */
+  protected Connector createBaseListener(Configuration conf) throws IOException {
+    SelectChannelConnector ret = new SelectChannelConnector();
+    ret.setLowResourceMaxIdleTime(10000);
+    ret.setAcceptQueueSize(128);
+    ret.setResolveNames(false);
+    ret.setUseDirectBuffers(false);
+    return ret;
+  }
+
+  /**
+   * Add default apps.
+   * 
+   * @param parent The handler collection the default contexts are added to
+   * @param appDir The application directory
+   * @throws IOException
+   */
+  protected void addDefaultApps(ContextHandlerCollection parent,
+      final String appDir) throws IOException {
+    // set up the context for "/logs/" if "hama.log.dir" property is defined.
+    String logDir = System.getProperty("hama.log.dir");
+    if (logDir != null) {
+      Context logContext = new Context(parent, "/logs");
+      logContext.setResourceBase(logDir);
+      logContext.addServlet(DefaultServlet.class, "/");
+      defaultContexts.put(logContext, true);
+    }
+    // set up the context for "/static/*"
+    Context staticContext = new Context(parent, "/static");
+    staticContext.setResourceBase(appDir + "/static");
+    staticContext.addServlet(DefaultServlet.class, "/*");
+    defaultContexts.put(staticContext, true);
+  }
+
+  /**
+   * Add default servlets.
+   */
+  protected void addDefaultServlets() {
+    // set up default servlets
+    addServlet("stacks", "/stacks", StackServlet.class);
+    addServlet("logLevel", "/logLevel", LogLevel.Servlet.class);
+  }
+
+  /**
+   * Add an already configured context to the server.
+   * 
+   * @param ctxt The context to add
+   * @param isFiltered the filter flag recorded for the context
+   * @throws IOException
+   */
+  public void addContext(Context ctxt, boolean isFiltered) throws IOException {
+    webServer.addHandler(ctxt);
+    defaultContexts.put(ctxt, isFiltered);
+  }
+
+  /**
+   * Add a context
+   * 
+   * @param pathSpec The path spec for the context
+   * @param dir The directory containing the context
+   * @param isFiltered if true, the servlet is added to the filter path mapping
+   * @throws IOException
+   */
+  protected void addContext(String pathSpec, String dir, boolean isFiltered)
+      throws IOException {
+    if (0 == webServer.getHandlers().length) {
+      throw new RuntimeException("Couldn't find handler");
+    }
+    WebAppContext webAppCtx = new WebAppContext();
+    webAppCtx.setContextPath(pathSpec);
+    webAppCtx.setWar(dir);
+    // honour the caller's flag instead of always registering as filtered
+    addContext(webAppCtx, isFiltered);
+  }
+
+  /**
+   * Set a value in the webapp context. These values are available to the jsp
+   * pages as "application.getAttribute(name)".
+   * 
+   * @param name The name of the attribute
+   * @param value The value of the attribute
+   */
+  public void setAttribute(String name, Object value) {
+    webAppContext.setAttribute(name, value);
+  }
+
+  /**
+   * Add a servlet in the server.
+   * 
+   * @param name The name of the servlet (can be passed as null)
+   * @param pathSpec The path spec for the servlet
+   * @param clazz The servlet class
+   */
+  public void addServlet(String name, String pathSpec,
+      Class<? extends HttpServlet> clazz) {
+    addInternalServlet(name, pathSpec, clazz);
+    addFilterPathMapping(pathSpec, webAppContext);
+  }
+
+  /**
+   * Add an internal servlet in the server.
+   * 
+   * @param name The name of the servlet (can be passed as null)
+   * @param pathSpec The path spec for the servlet
+   * @param clazz The servlet class
+   * @deprecated this is a temporary method
+   */
+  @Deprecated
+  public void addInternalServlet(String name, String pathSpec,
+      Class<? extends HttpServlet> clazz) {
+    ServletHolder holder = new ServletHolder(clazz);
+    if (name != null) {
+      holder.setName(name);
+    }
+    webAppContext.addServlet(holder, pathSpec);
+  }
+
+  /**
+   * Define a filter for a context and set up default url mappings.
+   * 
+   * @param ctx The context the filter is added to
+   * @param name The name of the filter
+   * @param classname The class name of the filter
+   * @param parameters The init parameters of the filter
+   * @param urls The path specs the filter is mapped to
+   */
+  protected void defineFilter(Context ctx, String name, String classname,
+      Map<String, String> parameters, String[] urls) {
+
+    FilterHolder holder = new FilterHolder();
+    holder.setName(name);
+    holder.setClassName(classname);
+    holder.setInitParameters(parameters);
+    FilterMapping fmap = new FilterMapping();
+    fmap.setPathSpecs(urls);
+    fmap.setDispatches(Handler.ALL);
+    fmap.setFilterName(name);
+    ServletHandler handler = ctx.getServletHandler();
+    handler.addFilter(holder, fmap);
+  }
+
+  /**
+   * Add the path spec to the filter path mapping, once for every name in
+   * {@link #filterNames}.
+   * 
+   * @param pathSpec The path spec
+   * @param webAppCtx The WebApplicationContext to add to
+   */
+  protected void addFilterPathMapping(String pathSpec, Context webAppCtx) {
+    ServletHandler handler = webAppCtx.getServletHandler();
+    for (String name : filterNames) {
+      FilterMapping fmap = new FilterMapping();
+      fmap.setPathSpec(pathSpec);
+      fmap.setFilterName(name);
+      fmap.setDispatches(Handler.ALL);
+      handler.addFilterMapping(fmap);
+    }
+  }
+
+  /**
+   * Get the value in the webapp context.
+   * 
+   * @param name The name of the attribute
+   * @return The value of the attribute
+   */
+  public Object getAttribute(String name) {
+    return webAppContext.getAttribute(name);
+  }
+
+  /**
+   * Get the pathname to the webapps files.
+   * 
+   * This implementation returns the fixed relative path "src/main/webapp",
+   * which is resolved against the working directory of the process; the
+   * CLASSPATH lookup is currently commented out.
+   * 
+   * @return the pathname of the webapps directory
+   * @throws IOException not thrown by this implementation
+   */
+  protected String getWebAppsPath() throws IOException {
+    // URL url = BSPMaster.class.getClassLoader().getResource("webapps");
+    // if (url == null)
+    // throw new IOException("webapps not found in CLASSPATH");
+    // return url.toString();
+    return "src/main/webapp";
+  }
+
+  /**
+   * Get the port that the server is on
+   * 
+   * @return the local port of the first connector
+   */
+  public int getPort() {
+    return webServer.getConnectors()[0].getLocalPort();
+  }
+
+  /**
+   * Set the min, max number of worker threads (simultaneous connections).
+   */
+  public void setThreads(int min, int max) {
+    BoundedThreadPool pool = (BoundedThreadPool) webServer.getThreadPool();
+    pool.setMinThreads(min);
+    pool.setMaxThreads(max);
+  }
+
+  /**
+   * Configure an ssl listener on the server.
+   * 
+   * @param addr address to listen on
+   * @param keystore location of the keystore
+   * @param storPass password for the keystore
+   * @param keyPass password for the key
+   * @throws IOException if the server has already been started
+   * @deprecated Use
+   *             {@link #addSslListener(InetSocketAddress, Configuration, boolean)}
+   */
+  @Deprecated
+  public void addSslListener(InetSocketAddress addr, String keystore,
+      String storPass, String keyPass) throws IOException {
+    if (webServer.isStarted()) {
+      throw new IOException("Failed to add ssl listener");
+    }
+    SslSocketConnector sslListener = new SslSocketConnector();
+    sslListener.setHost(addr.getHostName());
+    sslListener.setPort(addr.getPort());
+    sslListener.setKeystore(keystore);
+    sslListener.setPassword(storPass);
+    sslListener.setKeyPassword(keyPass);
+    webServer.addConnector(sslListener);
+  }
+
+  /**
+   * Configure an ssl listener on the server.
+   * 
+   * When client authentication is required, the JVM-wide
+   * "javax.net.ssl.trustStore*" system properties are set from the given
+   * configuration.
+   * 
+   * @param addr address to listen on
+   * @param sslConf conf to retrieve ssl options
+   * @param needClientAuth whether client authentication is required
+   * @throws IOException if the server has already been started
+   */
+  public void addSslListener(InetSocketAddress addr, Configuration sslConf,
+      boolean needClientAuth) throws IOException {
+    if (webServer.isStarted()) {
+      throw new IOException("Failed to add ssl listener");
+    }
+    if (needClientAuth) {
+      // setting up SSL truststore for authenticating clients
+      System.setProperty("javax.net.ssl.trustStore", sslConf.get(
+          "ssl.server.truststore.location", ""));
+      System.setProperty("javax.net.ssl.trustStorePassword", sslConf.get(
+          "ssl.server.truststore.password", ""));
+      System.setProperty("javax.net.ssl.trustStoreType", sslConf.get(
+          "ssl.server.truststore.type", "jks"));
+    }
+    SslSocketConnector sslListener = new SslSocketConnector();
+    sslListener.setHost(addr.getHostName());
+    sslListener.setPort(addr.getPort());
+    sslListener.setKeystore(sslConf.get("ssl.server.keystore.location"));
+    sslListener.setPassword(sslConf.get("ssl.server.keystore.password", ""));
+    sslListener.setKeyPassword(sslConf.get("ssl.server.keystore.keypassword",
+        ""));
+    sslListener.setKeystoreType(sslConf.get("ssl.server.keystore.type", "jks"));
+    sslListener.setNeedClientAuth(needClientAuth);
+    webServer.addConnector(sslListener);
+  }
+
+  /**
+   * Start the server. Does not wait for the server to start.
+   * 
+   * The listener is opened on the requested port. On a BindException the next
+   * port number is tried when findPort is set; otherwise the exception is
+   * rethrown. Two workarounds (HADOOP-4744, HADOOP-6386) retry when the
+   * listener reports a negative local port.
+   * 
+   * @throws IOException if the listener cannot be bound, if the calling
+   *           thread is interrupted while waiting, or wrapping any other
+   *           failure to start the server
+   */
+  public void start() throws IOException {
+    try {
+      int port = 0;
+      int oriPort = listener.getPort(); // The original requested port
+      while (true) {
+        try {
+          port = webServer.getConnectors()[0].getLocalPort();
+          LOG.info("Port returned by webServer.getConnectors()[0]."
+              + "getLocalPort() before open() is " + port
+              + ". Opening the listener on " + oriPort);
+          listener.open();
+          port = listener.getLocalPort();
+          LOG.info("listener.getLocalPort() returned "
+              + listener.getLocalPort()
+              + " webServer.getConnectors()[0].getLocalPort() returned "
+              + webServer.getConnectors()[0].getLocalPort());
+          // Workaround to handle the problem reported in HADOOP-4744
+          if (port < 0) {
+            Thread.sleep(100);
+            int numRetries = 1;
+            while (port < 0) {
+              LOG.warn("listener.getLocalPort returned " + port);
+              if (numRetries++ > MAX_RETRIES) {
+                throw new Exception(" listener.getLocalPort is returning "
+                    + "less than 0 even after " + numRetries + " resets");
+              }
+              for (int i = 0; i < 2; i++) {
+                LOG.info("Retrying listener.getLocalPort()");
+                port = listener.getLocalPort();
+                if (port > 0) {
+                  break;
+                }
+                Thread.sleep(200);
+              }
+              if (port > 0) {
+                break;
+              }
+              LOG.info("Bouncing the listener");
+              listener.close();
+              Thread.sleep(1000);
+              listener.setPort(oriPort == 0 ? 0 : (oriPort += 1));
+              listener.open();
+              Thread.sleep(100);
+              port = listener.getLocalPort();
+            }
+          } // Workaround end
+          LOG.info("Jetty bound to port " + port);
+          webServer.start();
+          // Workaround for HADOOP-6386
+          port = listener.getLocalPort();
+          if (port < 0) {
+            LOG.warn("Bounds port is " + port + " after webserver start");
+            for (int i = 0; i < MAX_RETRIES / 2; i++) {
+              try {
+                webServer.stop();
+              } catch (Exception e) {
+                LOG.warn("Can't stop  web-server", e);
+              }
+              Thread.sleep(1000);
+
+              listener.setPort(oriPort == 0 ? 0 : (oriPort += 1));
+              listener.open();
+              Thread.sleep(100);
+              webServer.start();
+              LOG.info(i + "attempts to restart webserver");
+              port = listener.getLocalPort();
+              if (port > 0)
+                break;
+            }
+            if (port < 0)
+              throw new Exception("listener.getLocalPort() is returning "
+                  + "less than 0 even after " + MAX_RETRIES + " resets");
+          }
+          // End of HADOOP-6386 workaround
+          break;
+        } catch (IOException ex) {
+          // if this is a bind exception,
+          // then try the next port number.
+          if (ex instanceof BindException) {
+            if (!findPort) {
+              throw (BindException) ex;
+            }
+          } else {
+            LOG.info("HttpServer.start() threw a non Bind IOException");
+            throw ex;
+          }
+        } catch (MultiException ex) {
+          LOG.info("HttpServer.start() threw a MultiException");
+          throw ex;
+        }
+        listener.setPort((oriPort += 1));
+      }
+    } catch (IOException e) {
+      throw e;
+    } catch (InterruptedException e) {
+      // Restore the interrupt status so that callers can still observe it;
+      // the exception type and message seen by callers are unchanged.
+      Thread.currentThread().interrupt();
+      throw new IOException("Problem starting http server", e);
+    } catch (Exception e) {
+      throw new IOException("Problem starting http server", e);
+    }
+  }
+
+  /**
+   * stop the server
+   * 
+   * The web server is stopped even when closing the listener fails.
+   */
+  public void stop() throws Exception {
+    try {
+      listener.close();
+    } finally {
+      webServer.stop();
+    }
+  }
+
+  /**
+   * Blocks until the web server has terminated, by delegating to the join()
+   * of the underlying Jetty server.
+   */
+  public void join() throws InterruptedException {
+    webServer.join();
+  }
+
+  /**
+   * A very simple servlet to serve up a text representation of the current
+   * stack traces. It both returns the stacks to the caller and logs them.
+   * Currently the stack traces are done sequentially rather than exactly the
+   * same data.
+   */
+  public static class StackServlet extends HttpServlet {
+    private static final long serialVersionUID = -6284183679759467039L;
+
+    @Override
+    public void doGet(HttpServletRequest request, HttpServletResponse response)
+        throws ServletException, IOException {
+
+      PrintWriter out = new PrintWriter(response.getOutputStream());
+      try {
+        ReflectionUtils.printThreadInfo(out, "");
+      } finally {
+        // release the writer even when dumping the threads fails
+        out.close();
+      }
+      ReflectionUtils.logThreadInfo(LOG, "jsp requested", 1);
+    }
+  }
+}

Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/http/package.html
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/http/package.html?rev=1152788&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/http/package.html (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/http/package.html Mon Aug  1 14:12:46 2011
@@ -0,0 +1,23 @@
+<html>
+
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+
+<body>
+Contains the administrative web interfaces. 
+</body>
+</html>

Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/ipc/BSPPeerProtocol.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/ipc/BSPPeerProtocol.java?rev=1152788&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/ipc/BSPPeerProtocol.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/ipc/BSPPeerProtocol.java Mon Aug  1 14:12:46 2011
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.ipc;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+import org.apache.hama.Constants;
+import org.apache.hama.bsp.PeerNames;
+import org.apache.hama.bsp.Task;
+import org.apache.hama.bsp.TaskAttemptID;
+
+/**
+ * Protocol that a task child process uses to contact its parent process.
+ * <p>
+ * {@link #incrementSuperstepCount(TaskAttemptID)} has no contract documented
+ * in this file; presumably it advances the superstep counter kept for the
+ * given task attempt (TODO confirm against the implementing class).
+ */
+public interface BSPPeerProtocol extends HamaRPCProtocolVersion, Closeable,
+    Constants {
+
+  /**
+   * Called when a child task process starts, to get its task.
+   *
+   * @param taskid id of the task attempt that is asking for its task
+   * @return the task for the given attempt id
+   * @throws IOException declared by the protocol; the exact failure
+   *           conditions depend on the implementation
+   */
+  Task getTask(TaskAttemptID taskid) throws IOException;
+
+  /**
+   * Periodically called by child to check if parent is still alive.
+   * 
+   * @param taskid id of the calling task attempt
+   * @return True if the task is known
+   * @throws IOException declared by the protocol; the exact failure
+   *           conditions depend on the implementation
+   */
+  boolean ping(TaskAttemptID taskid) throws IOException;
+
+  /**
+   * Report that the task is successfully completed. Failure is assumed if the
+   * task process exits without calling this.
+   * 
+   * @param taskid task's id
+   * @param shouldBePromoted whether to promote the task's output or not
+   * @throws IOException declared by the protocol; the exact failure
+   *           conditions depend on the implementation
+   */
+  void done(TaskAttemptID taskid, boolean shouldBePromoted) throws IOException;
+
+  /**
+   * Report that the task encountered a local filesystem error.
+   *
+   * @param taskId id of the task attempt that hit the error
+   * @param message description of the filesystem error
+   * @throws IOException declared by the protocol; the exact failure
+   *           conditions depend on the implementation
+   */
+  void fsError(TaskAttemptID taskId, String message) throws IOException;
+
+  void incrementSuperstepCount(TaskAttemptID taskid) throws IOException;
+
+  /**
+   * Returns the names of all BSPPeers. Unlike the other methods of this
+   * interface, this one does not declare {@link IOException}.
+   *
+   * @return the names of all BSPPeers.
+   */
+  PeerNames getAllPeerNames();
+
+}

Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/ipc/GroomProtocol.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/ipc/GroomProtocol.java?rev=1152788&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/ipc/GroomProtocol.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/ipc/GroomProtocol.java Mon Aug  1 14:12:46 2011
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.ipc;
+
+import java.io.IOException;
+
+import org.apache.hama.bsp.Directive;
+
+/**
+ * Protocol that the BSPMaster uses to talk to a GroomServer.
+ */
+public interface GroomProtocol extends HamaRPCProtocolVersion {
+
+  /**
+   * Instructs a GroomServer to perform tasks.
+   * 
+   * @param directive tells the GroomServer which execution it has to
+   *        perform.
+   * @throws IOException declared by the protocol; the exact failure
+   *         conditions depend on the implementation
+   */
+  void dispatch(Directive directive) throws IOException;
+
+}

Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/ipc/HamaRPCProtocolVersion.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/ipc/HamaRPCProtocolVersion.java?rev=1152788&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/ipc/HamaRPCProtocolVersion.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/ipc/HamaRPCProtocolVersion.java Mon Aug  1 14:12:46 2011
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.ipc;
+
+import org.apache.hadoop.ipc.VersionedProtocol;
+
+/**
+ * Common base of the Hama RPC interfaces (BSPPeerProtocol, GroomProtocol,
+ * JobSubmissionProtocol and MasterProtocol all extend it).
+ * <p>
+ * There is one version id for all the RPC interfaces. If any interface is
+ * changed, the versionID must be changed here.
+ */
+public interface HamaRPCProtocolVersion extends VersionedProtocol {
+  public static final long versionID = 1L;
+}

Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/ipc/JobSubmissionProtocol.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/ipc/JobSubmissionProtocol.java?rev=1152788&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/ipc/JobSubmissionProtocol.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/ipc/JobSubmissionProtocol.java Mon Aug  1 14:12:46 2011
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.ipc;
+
+import java.io.IOException;
+
+import org.apache.hama.bsp.ClusterStatus;
+import org.apache.hama.bsp.BSPJobID;
+import org.apache.hama.bsp.JobProfile;
+import org.apache.hama.bsp.JobStatus;
+import org.apache.hama.bsp.TaskAttemptID;
+
+/**
+ * Protocol for submitting jobs to the BSPMaster and for querying or killing
+ * them afterwards. Besides submitJob, killJob and killTask it exposes the
+ * cluster status, the filesystem name and the system directory.
+ * <p>
+ * NOTE(review): the previous description named the groom server as the peer
+ * of this protocol; the methods declared here are job submission and
+ * monitoring calls, so the caller is presumably a job client - confirm.
+ */
+public interface JobSubmissionProtocol extends HamaRPCProtocolVersion {
+
+  /**
+   * Allocate a new id for the job.
+   * 
+   * @return job id
+   * @throws IOException declared by the protocol; the exact failure
+   *           conditions depend on the implementation
+   */
+  public BSPJobID getNewJobId() throws IOException;
+
+  /**
+   * Submit a Job for execution. Returns the latest status for that job. The
+   * job files should be submitted in <b>system-dir</b>/<b>jobName</b>.
+   * 
+   * @param jobID id of the job to submit (presumably one obtained from
+   *          {@link #getNewJobId()} - TODO confirm)
+   * @param jobFile the job file, passed as a string
+   * @return status of the submitted job
+   * @throws IOException declared by the protocol; the exact failure
+   *           conditions depend on the implementation
+   */
+  // public JobStatus submitJob(BSPJobID jobName) throws IOException;
+
+  public JobStatus submitJob(BSPJobID jobID, String jobFile) throws IOException;
+
+  /**
+   * Get the current status of the cluster
+   * 
+   * @param detailed if true then report groom names as well
+   * @return summary of the state of the cluster
+   * @throws IOException declared by the protocol; the exact failure
+   *           conditions depend on the implementation
+   */
+  public ClusterStatus getClusterStatus(boolean detailed) throws IOException;
+
+  /**
+   * Look up the profile of a job that is already known to the BSPMaster.
+   * 
+   * @param jobid id of the job to look up
+   * @return Profile of the job, or null if not found.
+   * @throws IOException declared by the protocol; the exact failure
+   *           conditions depend on the implementation
+   */
+  public JobProfile getJobProfile(BSPJobID jobid) throws IOException;
+
+  /**
+   * Look up the status of a job that is already known to the BSPMaster.
+   * 
+   * @param jobid id of the job to look up
+   * @return Status of the job, or null if not found.
+   * @throws IOException declared by the protocol; the exact failure
+   *           conditions depend on the implementation
+   */
+  public JobStatus getJobStatus(BSPJobID jobid) throws IOException;
+
+  /**
+   * A BSP system always operates on a single filesystem. This function returns
+   * the fs name. ('local' if the localfs; 'addr:port' if dfs). The client can
+   * then copy files into the right locations prior to submitting the job.
+   *
+   * @return the filesystem name, in the form described above
+   * @throws IOException declared by the protocol; the exact failure
+   *           conditions depend on the implementation
+   */
+  public String getFilesystemName() throws IOException;
+
+  /**
+   * Get the jobs that are not completed and not failed
+   * 
+   * @return array of JobStatus for the running/to-be-run jobs.
+   * @throws IOException declared by the protocol; the exact failure
+   *           conditions depend on the implementation
+   */
+  public JobStatus[] jobsToComplete() throws IOException;
+
+  /**
+   * Get all the jobs submitted.
+   * 
+   * @return array of JobStatus for the submitted jobs
+   * @throws IOException declared by the protocol; the exact failure
+   *           conditions depend on the implementation
+   */
+  public JobStatus[] getAllJobs() throws IOException;
+
+  /**
+   * Grab the bspmaster system directory path where job-specific files are to be
+   * placed. Unlike the other methods of this interface, this one does not
+   * declare {@link IOException}.
+   * 
+   * @return the system directory where job-specific files are to be placed.
+   */
+  public String getSystemDir();
+
+  /**
+   * Kill the indicated job
+   *
+   * @param jobid id of the job to kill
+   * @throws IOException declared by the protocol; the exact failure
+   *           conditions depend on the implementation
+   */
+  public void killJob(BSPJobID jobid) throws IOException;
+
+  /**
+   * Kill indicated task attempt.
+   * 
+   * @param taskId the id of the task to kill.
+   * @param shouldFail if true the task is failed and added to failed tasks
+   *          list, otherwise it is just killed, w/o affecting job failure
+   *          status.
+   * @return presumably true if the task attempt was killed - the meaning of
+   *         the result is not documented here, TODO confirm
+   * @throws IOException declared by the protocol; the exact failure
+   *           conditions depend on the implementation
+   */
+  public boolean killTask(TaskAttemptID taskId, boolean shouldFail)
+      throws IOException;
+
+}

Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/ipc/MasterProtocol.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/ipc/MasterProtocol.java?rev=1152788&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/ipc/MasterProtocol.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/ipc/MasterProtocol.java Mon Aug  1 14:12:46 2011
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.ipc;
+
+import java.io.IOException;
+
+import org.apache.hama.bsp.GroomServerStatus;
+import org.apache.hama.bsp.Directive;
+
+/**
+ * Protocol that GroomServers use to communicate with the BSPMaster. Paired
+ * with WorkerProtocol, it lets GroomServers enrol with the BSPMaster so that
+ * the BSPMaster can dispatch tasks to them.
+ * <p>
+ * NOTE(review): no WorkerProtocol is visible next to this interface; the
+ * counterpart is presumably GroomProtocol - confirm.
+ * <p>
+ * {@link #getSystemDir()} is undocumented here; it presumably returns the
+ * BSPMaster system directory, like the method of the same name in
+ * JobSubmissionProtocol - TODO confirm.
+ */
+public interface MasterProtocol extends HamaRPCProtocolVersion {
+
+  /**
+   * A GroomServer registers with the BSPMaster by sending its status; the
+   * BSPMaster will update its GroomServers cache with it.
+   *
+   * @param status to be updated in cache.
+   * @return true if successfully register with BSPMaster; false if fail.
+   * @throws IOException declared by the protocol; the exact failure
+   *           conditions depend on the implementation
+   */
+  boolean register(GroomServerStatus status) throws IOException;
+
+  /**
+   * A GroomServer (periodically) reports task statuses back to the BSPMaster.
+   *
+   * @param directive the directive carrying the report
+   * @return presumably true if the BSPMaster accepted the report - the
+   *         meaning of the result is not documented here, TODO confirm
+   * @throws IOException declared by the protocol; the exact failure
+   *           conditions depend on the implementation
+   */
+  boolean report(Directive directive) throws IOException;
+
+  public String getSystemDir();  
+
+}

Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/ipc/package.html
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/ipc/package.html?rev=1152788&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/ipc/package.html (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/ipc/package.html Mon Aug  1 14:12:46 2011
@@ -0,0 +1,23 @@
+<html>
+
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+
+<body>
+Tools to help define network clients and servers. 
+</body>
+</html>

Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/package.html
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/package.html?rev=1152788&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/package.html (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/package.html Mon Aug  1 14:12:46 2011
@@ -0,0 +1,23 @@
+<html>
+
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+
+<body>
+Hama base package. 
+</body>
+</html>

Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/util/BSPServletUtil.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/util/BSPServletUtil.java?rev=1152788&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/util/BSPServletUtil.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/util/BSPServletUtil.java Mon Aug  1 14:12:46 2011
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.util;
+
+import java.io.IOException;
+import java.util.Date;
+import java.util.Calendar;
+
+import org.apache.hadoop.util.ServletUtil;
+import org.apache.hama.bsp.JobStatus;
+
+public class BSPServletUtil extends ServletUtil {
+
+  public static final String HTML_TAIL = "<hr />\n"
+      + "<a href='http://incubator.apache.org/hama/'>Hama</a>, "
+      + Calendar.getInstance().get(Calendar.YEAR) + ".\n" + "</body></html>";
+
+  /**
+   * HTML footer to be added in the jsps.
+   * 
+   * @return the HTML footer.
+   */
+  public static String htmlFooter() {
+    return HTML_TAIL;
+  }
+
+  /**
+   * Method used to generate the Job table for Job pages.
+   * 
+   * @param label display heading to be used in the job table.
+   * @param jobs vector of jobs to be displayed in table.
+   * @param refresh refresh interval to be used in jobdetails page.
+   * @param rowId beginning row id to be used in the table.
+   * @return generated HTML
+   * @throws IOException
+   */
+  public static String generateJobTable(String label, JobStatus[] jobs,
+      int refresh, int rowId) throws IOException {
+
+    StringBuffer sb = new StringBuffer();
+
+    if (jobs.length > 0) {
+      sb.append("<table border=\"1\" cellpadding=\"5\" cellspacing=\"0\">\n");
+      sb.append("<tr><th>Jobid</th>" + "<th>User</th>" + "<th>Name</th>"
+          + "<th>SuperStep</th>" + "<th>Starttime</th>" + "</tr>\n");
+      for (JobStatus status : jobs) {
+        sb.append("<tr><td><a href=\"bspjob.jsp?jobid="+status.getJobID()+ "\">");
+        sb.append(status.getJobID());
+        sb.append("</a></td><td>");
+        sb.append(status.getUsername());
+        sb.append("</td><td>");
+        sb.append(status.getName());
+        sb.append("</td><td>");
+        sb.append(status.getSuperstepCount());
+        sb.append("</td><td>");
+        sb.append(new Date(status.getStartTime()));
+        sb.append("</td></tr>\n");
+      }
+      sb.append("</table>");
+    } else {
+      sb.append("No jobs found!");
+    }
+
+    return sb.toString();
+  }
+
+}