You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by to...@apache.org on 2011/08/01 16:12:56 UTC
svn commit: r1152788 [5/9] - in /incubator/hama/trunk: ./ bin/ conf/ core/
core/bin/ core/conf/ core/src/ core/src/main/ core/src/main/java/
core/src/main/java/org/ core/src/main/java/org/apache/
core/src/main/java/org/apache/hama/ core/src/main/java/o...
Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskInProgress.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskInProgress.java?rev=1152788&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskInProgress.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskInProgress.java Mon Aug 1 14:12:46 2011
@@ -0,0 +1,288 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.IOException;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobStatus;
+
+/**
+ * TaskInProgress maintains all the info needed for a Task in the lifetime of
+ * its owning Job.
+ */
+class TaskInProgress {
+ public static final Log LOG = LogFactory.getLog(TaskInProgress.class);
+
+ private Configuration conf;
+
+ // Constants
+ static final int MAX_TASK_EXECS = 1;
+ int maxTaskAttempts = 4;
+ private boolean failed = false;
+ private static final int NUM_ATTEMPTS_PER_RESTART = 1000;
+
+ // Job Meta
+ private String jobFile = null;
+ private int partition;
+ private BSPMaster bspMaster;
+ private TaskID id;
+ private JobInProgress job;
+ private int completes = 0;
+
+ // Status
+ // private double progress = 0;
+ // private String state = "";
+ private long startTime = 0;
+
+ // The 'next' usable taskid of this tip
+ int nextTaskId = 0;
+
+ // The taskid that took this TIP to SUCCESS
+ private TaskAttemptID successfulTaskId;
+
+ // The first taskid of this tip
+ private TaskAttemptID firstTaskId;
+
+ // Map from task Id -> GroomServer Id, contains tasks that are
+ // currently running
+ private TreeMap<TaskAttemptID, String> activeTasks = new TreeMap<TaskAttemptID, String>();
+ // All attempt Ids of this TIP
+ // private TreeSet<TaskAttemptID> tasks = new TreeSet<TaskAttemptID>();
+ /**
+ * Map from taskId -> TaskStatus
+ */
+ private TreeMap<TaskAttemptID, TaskStatus> taskStatuses = new TreeMap<TaskAttemptID, TaskStatus>();
+
+ private BSPJobID jobId;
+
+ /**
+ * Creates a TaskInProgress identified only by its job id, job file and
+ * partition. This constructor does not assign the owning job, the BSPMaster
+ * or the configuration; those fields keep their default values unless they
+ * are set later (conf and bspMaster have setters, job does not).
+ *
+ * @param jobId is identification of JobInProgress.
+ * @param jobFile the path of job file
+ * @param partition which partition this TaskInProgress owns.
+ */
+ public TaskInProgress(BSPJobID jobId, String jobFile, int partition) {
+ this.jobId = jobId;
+ this.jobFile = jobFile;
+ this.partition = partition;
+
+ // The TIP id is derived from the job id and the partition number.
+ this.id = new TaskID(jobId, partition);
+ }
+
+ /**
+ * Creates a TaskInProgress that also knows its owning job, the BSPMaster and
+ * the configuration.
+ *
+ * @param jobId is identification of JobInProgress.
+ * @param jobFile the path of job file
+ * @param master the BSPMaster, stored through {@link #setBspMaster}
+ * @param conf the configuration, stored through {@link #setConf}
+ * @param job the owning job
+ * @param partition which partition this TaskInProgress owns.
+ */
+ public TaskInProgress(BSPJobID jobId, String jobFile, BSPMaster master,
+ Configuration conf, JobInProgress job, int partition) {
+ this.jobId = jobId;
+ this.jobFile = jobFile;
+ this.setBspMaster(master);
+ this.job = job;
+ this.setConf(conf);
+ this.partition = partition;
+
+ // The TIP id is derived from the job id and the partition number.
+ this.id = new TaskID(jobId, partition);
+ }
+
+ /**
+  * Return a Task that can be sent to a GroomServer for execution.
+  *
+  * A new attempt id is derived from the job's restart count and the next
+  * free attempt number of this tip. The attempt is registered as active on
+  * the groom named by <code>status</code>, and the very first attempt handed
+  * out is remembered so that {@link #isFirstAttempt(TaskAttemptID)} can
+  * recognise it.
+  *
+  * @param status status of the GroomServer that will run the task
+  * @return the new task, or <code>null</code> when the attempt limit
+  *         (MAX_TASK_EXECS + maxTaskAttempts) has been reached
+  * @throws IOException declared for callers; not thrown by the visible code
+  */
+ public Task getTaskToRun(GroomServerStatus status) throws IOException {
+   int attemptLimit = MAX_TASK_EXECS + maxTaskAttempts;
+   if (nextTaskId >= attemptLimit) {
+     LOG.warn("Exceeded limit of " + attemptLimit
+         + " attempts for the tip '" + getTIPId() + "'");
+     return null;
+   }
+
+   int attemptId = job.getNumRestarts() * NUM_ATTEMPTS_PER_RESTART
+       + nextTaskId;
+   TaskAttemptID taskid = new TaskAttemptID(id, attemptId);
+   ++nextTaskId;
+
+   // Remember the first attempt; without this isFirstAttempt() could never
+   // return true because nothing else assigns firstTaskId.
+   if (firstTaskId == null) {
+     firstTaskId = taskid;
+   }
+
+   Task t = new BSPTask(jobId, jobFile, taskid, partition);
+   activeTasks.put(taskid, status.getGroomName());
+
+   return t;
+ }
+
+ // //////////////////////////////////
+ // Accessors
+ // //////////////////////////////////
+ /**
+ * Return the start time. The field is initialised to 0 and is not assigned
+ * anywhere else in this class.
+ */
+ public long getStartTime() {
+ return startTime;
+ }
+
+ /**
+ * Return the parent job. This is unset when the three-argument constructor
+ * was used.
+ */
+ public JobInProgress getJob() {
+ return job;
+ }
+
+ /**
+ * Return the id of this tip. Same value as {@link #getTaskId()}.
+ */
+ public TaskID getTIPId() {
+ return id;
+ }
+
+ /**
+ * Return the id of this tip. Same value as {@link #getTIPId()}.
+ */
+ public TaskID getTaskId() {
+ return this.id;
+ }
+
+ /**
+ * Return the map of active attempts (attempt id -> groom name). The
+ * internal map itself is returned, not a copy.
+ */
+ public TreeMap<TaskAttemptID, String> getTasks() {
+ return activeTasks;
+ }
+
+ /**
+  * Tells whether the given attempt is the first attempt of this tip.
+  *
+  * @param taskId the attempt to compare with the recorded first attempt
+  * @return true only when a first attempt has been recorded and it equals
+  *         <code>taskId</code>
+  */
+ public boolean isFirstAttempt(TaskAttemptID taskId) {
+   if (firstTaskId == null) {
+     return false;
+   }
+   return firstTaskId.equals(taskId);
+ }
+
+ /**
+ * Is this tip currently running any tasks? A tip counts as running while
+ * its map of active attempts is not empty.
+ *
+ * @return true if any tasks are running
+ */
+ public boolean isRunning() {
+ return !activeTasks.isEmpty();
+ }
+
+ /**
+ * Is this tip complete? The tip is complete once at least one attempt has
+ * been reported through {@link #completed(TaskAttemptID)}.
+ *
+ * @return <code>true</code> if the tip is complete, else <code>false</code>
+ */
+ // NOTE(review): this reader is synchronized but completed(), which
+ // increments the counter, is not - confirm the intended locking.
+ public synchronized boolean isComplete() {
+ return (completes > 0);
+ }
+
+ /**
+  * Checks whether the given attempt is the one that completed this tip.
+  *
+  * @param taskid attempt to check; must not be null
+  * @return <code>true</code> if the tip has completed and
+  *         <code>taskid</code> equals the successful attempt, else
+  *         <code>false</code>
+  */
+ public boolean isComplete(TaskAttemptID taskid) {
+   if (completes <= 0) {
+     return false;
+   }
+   return taskid.equals(getSuccessfulTaskid());
+ }
+
+ private TreeSet<TaskAttemptID> tasksReportedClosed = new TreeSet<TaskAttemptID>();
+
+ /**
+  * Decides whether an attempt should be told to close because its job is no
+  * longer running. Each attempt is reported at most once.
+  *
+  * @param taskid the attempt to check
+  * @return true if a status is known for the attempt, it has not been
+  *         reported before and the job's run state is not RUNNING
+  */
+ public boolean shouldCloseForClosedJob(TaskAttemptID taskid) {
+   if (taskStatuses.get(taskid) == null) {
+     return false;
+   }
+   if (tasksReportedClosed.contains(taskid)) {
+     return false;
+   }
+   if (job.getStatus().getRunState() == JobStatus.RUNNING) {
+     return false;
+   }
+   tasksReportedClosed.add(taskid);
+   return true;
+ }
+
+ /**
+  * Marks the given attempt as the successful attempt of this tip.
+  *
+  * The recorded status (if any) is switched to SUCCEEDED, the attempt is
+  * removed from the active set, remembered as the successful attempt, and
+  * the completion counter is incremented.
+  *
+  * @param taskid the attempt that completed; must not be null
+  */
+ public void completed(TaskAttemptID taskid) {
+   LOG.info("Task '" + taskid.getTaskID().toString() + "' has completed.");
+
+   TaskStatus status = taskStatuses.get(taskid);
+   if (status != null) {
+     status.setRunState(TaskStatus.State.SUCCEEDED);
+   } else {
+     // No status was ever reported through updateStatus(); still record the
+     // completion instead of failing with a NullPointerException.
+     LOG.warn("No status recorded for completed task attempt '" + taskid
+         + "'");
+   }
+   activeTasks.remove(taskid);
+
+   // Note the successful taskid
+   setSuccessfulTaskid(taskid);
+
+   //
+   // Now that the TIP is complete, any other attempts will be closed when
+   // the owning groom server reports in and shouldCloseForClosedJob() is
+   // called on this object.
+   //
+
+   this.completes++;
+ }
+
+ /**
+  * Marks the given attempt as failed and removes it from the active set.
+  *
+  * @param taskid the attempt that failed; must not be null
+  */
+ public void terminated(TaskAttemptID taskid) {
+   LOG.info("Task '" + taskid.getTaskID().toString() + "' has failed.");
+
+   TaskStatus status = taskStatuses.get(taskid);
+   if (status != null) {
+     status.setRunState(TaskStatus.State.FAILED);
+   } else {
+     // No status was ever reported through updateStatus(); still drop the
+     // attempt instead of failing with a NullPointerException.
+     LOG.warn("No status recorded for failed task attempt '" + taskid + "'");
+   }
+   activeTasks.remove(taskid);
+ }
+
+ /** Remembers the attempt that took this tip to success. */
+ private void setSuccessfulTaskid(TaskAttemptID taskid) {
+ this.successfulTaskId = taskid;
+ }
+
+ /** Returns the successful attempt, or null if none has been recorded. */
+ private TaskAttemptID getSuccessfulTaskid() {
+ return successfulTaskId;
+ }
+
+ /**
+ * Stores the given status under its own attempt id, replacing any status
+ * previously stored for that attempt.
+ *
+ * @param status the latest status of an attempt
+ */
+ public void updateStatus(TaskStatus status) {
+ taskStatuses.put(status.getTaskId(), status);
+ }
+
+ /**
+ * @param taskId the attempt to look up
+ * @return the stored status, or null if none was stored for the attempt
+ */
+ public TaskStatus getTaskStatus(TaskAttemptID taskId) {
+ return this.taskStatuses.get(taskId);
+ }
+
+ /**
+ * Flags this tip as failed. Only the flag is set here; active attempts are
+ * not touched by this method.
+ */
+ public void kill() {
+ this.failed = true;
+ }
+
+ /**
+ * @return true once {@link #kill()} has been called
+ */
+ public boolean isFailed() {
+ return failed;
+ }
+
+ /**
+ * Replaces the configuration held by this tip.
+ *
+ * @param conf the conf to set
+ */
+ public void setConf(Configuration conf) {
+ this.conf = conf;
+ }
+
+ /**
+ * @return the conf, or null if none has been set
+ */
+ public Configuration getConf() {
+ return conf;
+ }
+
+ /**
+ * Replaces the BSPMaster reference held by this tip.
+ *
+ * @param bspMaster the bspMaster to set
+ */
+ public void setBspMaster(BSPMaster bspMaster) {
+ this.bspMaster = bspMaster;
+ }
+
+ /**
+ * @return the bspMaster, or null if none has been set
+ */
+ public BSPMaster getBspMaster() {
+ return bspMaster;
+ }
+
+}
Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskLog.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskLog.java?rev=1152788&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskLog.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskLog.java Mon Aug 1 14:12:46 2011
@@ -0,0 +1,343 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.File;
+import java.io.FileFilter;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hama.HamaConfiguration;
+
+/**
+ * A simple logger to handle the task-specific user logs.
+ */
+public class TaskLog {
+ private static final Log LOG = LogFactory.getLog(TaskLog.class.getName());
+
+ // Root directory of all per-attempt log directories: "userlogs" below the
+ // directory named by the "hama.log.dir" system property.
+ // NOTE(review): if the property is unset the parent is null, which
+ // presumably resolves "userlogs" against the working directory - confirm.
+ private static final File LOG_DIR = new File(
+ System.getProperty("hama.log.dir"), "userlogs").getAbsoluteFile();
+
+ // Create the root directory when the class is loaded. The result of
+ // mkdirs() is not checked.
+ static {
+ if (!LOG_DIR.exists()) {
+ LOG_DIR.mkdirs();
+ }
+ }
+
+ public static File getTaskLogFile(TaskAttemptID taskid, LogName filter) {
+ return new File(new File(LOG_DIR, taskid.toString()), filter.toString());
+ }
+
+ /**
+  * The kinds of user log kept per task attempt. {@link #toString()} yields
+  * the file name used for that kind.
+  */
+ public static enum LogName {
+   /** Standard output of the task. */
+   STDOUT("stdout"),
+
+   /** Standard error of the task. */
+   STDERR("stderr"),
+
+   /** System log of the task. */
+   SYSLOG("syslog"),
+
+   /** The java profiler information. */
+   PROFILE("profile.out"),
+
+   /** Standard output of the debug script. */
+   DEBUGOUT("debugout");
+
+   private final String prefix;
+
+   LogName(String prefix) {
+     this.prefix = prefix;
+   }
+
+   /** Returns the file name associated with this kind of log. */
+   @Override
+   public String toString() {
+     return prefix;
+   }
+ }
+
+ private static class TaskLogsPurgeFilter implements FileFilter {
+ long purgeTimeStamp;
+
+ TaskLogsPurgeFilter(long purgeTimeStamp) {
+ this.purgeTimeStamp = purgeTimeStamp;
+ }
+
+ public boolean accept(File file) {
+ LOG.debug("PurgeFilter - file: " + file + ", mtime: "
+ + file.lastModified() + ", purge: " + purgeTimeStamp);
+ return file.lastModified() < purgeTimeStamp;
+ }
+ }
+
+ /**
+  * Purge old user logs: every entry directly below the user log directory
+  * whose modification time is older than the retention period is deleted
+  * recursively.
+  *
+  * @param logsRetainHours how many hours of logs to keep
+  * @throws IOException if deleting an entry fails
+  */
+ public static synchronized void cleanup(int logsRetainHours)
+     throws IOException {
+   long retainMillis = logsRetainHours * 60L * 60 * 1000;
+   long purgeTimeStamp = System.currentTimeMillis() - retainMillis;
+
+   File[] oldTaskLogs = LOG_DIR.listFiles(new TaskLogsPurgeFilter(
+       purgeTimeStamp));
+   if (oldTaskLogs == null) {
+     // listFiles() yields null when the directory cannot be listed
+     return;
+   }
+   for (File oldLog : oldTaskLogs) {
+     FileUtil.fullyDelete(oldLog);
+   }
+ }
+
+ /**
+  * An input stream over a byte window [start, end) of one task log file.
+  */
+ static class Reader extends InputStream {
+   // Bytes of the window that have not been delivered yet; never negative.
+   private long bytesRemaining;
+   private FileInputStream file;
+
+   /**
+    * Read a log file from start to end positions. The offsets may be
+    * negative, in which case they are relative to the end of the file. For
+    * example, Reader(taskid, kind, 0, -1) is the entire file and
+    * Reader(taskid, kind, -4197, -1) is the last 4196 bytes. Offsets are
+    * clamped to the file; a window whose end lies before its start is
+    * treated as empty.
+    *
+    * @param taskid the id of the task to read the log file for
+    * @param kind the kind of log to read
+    * @param start the offset to read from (negative is relative to tail)
+    * @param end the offset to read upto (negative is relative to tail)
+    * @throws IOException if the file cannot be opened or positioned
+    */
+   public Reader(TaskAttemptID taskid, LogName kind, long start, long end)
+       throws IOException {
+     // find the right log file
+     File filename = getTaskLogFile(taskid, kind);
+     // calculate the start and stop
+     long size = filename.length();
+     if (start < 0) {
+       start += size + 1;
+     }
+     if (end < 0) {
+       end += size + 1;
+     }
+     start = Math.max(0, Math.min(start, size));
+     end = Math.max(0, Math.min(end, size));
+     // An inverted window would otherwise give a negative count, which
+     // later reaches file.read() as a negative length.
+     bytesRemaining = Math.max(0, end - start);
+     file = new FileInputStream(filename);
+     try {
+       // skip upto start
+       long pos = 0;
+       while (pos < start) {
+         long result = file.skip(start - pos);
+         if (result < 0) {
+           bytesRemaining = 0;
+           break;
+         }
+         pos += result;
+       }
+     } catch (IOException e) {
+       // Do not leak the descriptor: the caller never receives this object
+       // and so cannot close it.
+       try {
+         file.close();
+       } catch (IOException ignored) {
+         // the original failure is the one worth reporting
+       }
+       throw e;
+     }
+   }
+
+   /**
+    * Reads one byte of the window.
+    *
+    * @return the byte, or -1 when the window or the file is exhausted
+    */
+   @Override
+   public int read() throws IOException {
+     if (bytesRemaining <= 0) {
+       return -1;
+     }
+     int result = file.read();
+     if (result >= 0) {
+       // only count bytes that were actually delivered
+       bytesRemaining -= 1;
+     }
+     return result;
+   }
+
+   /**
+    * Reads up to <code>length</code> bytes of the window.
+    *
+    * @return the number of bytes read, 0 if <code>length</code> is 0, or -1
+    *         when the window or the file is exhausted
+    */
+   @Override
+   public int read(byte[] buffer, int offset, int length) throws IOException {
+     if (length > 0 && bytesRemaining <= 0) {
+       // Signal end of stream; returning 0 here would make callers that
+       // loop until -1 spin forever.
+       return -1;
+     }
+     length = (int) Math.min(length, bytesRemaining);
+     int bytes = file.read(buffer, offset, length);
+     if (bytes > 0) {
+       bytesRemaining -= bytes;
+     }
+     return bytes;
+   }
+
+   /**
+    * @return the smaller of the bytes left in the window and the bytes the
+    *         underlying file reports as available
+    */
+   @Override
+   public int available() throws IOException {
+     return (int) Math.min(bytesRemaining, file.available());
+   }
+
+   /** Closes the underlying file. */
+   @Override
+   public void close() throws IOException {
+     file.close();
+   }
+ }
+
+ private static final String bashCommand = "bash";
+ private static final String tailCommand = "tail";
+
+ /**
+ * Get the desired maximum length of task's logs.
+ *
+ * @param conf the job to look in
+ * @return the number of bytes to cap the log files at
+ */
+ public static long getTaskLogLength(HamaConfiguration conf) {
+ return conf.getLong("mapred.userlog.limit.kb", 100) * 1024;
+ }
+
+ /**
+  * Wrap a command in a shell to capture stdout and stderr to files, without
+  * any setup commands. If the tailLength is not positive, the entire output
+  * will be saved.
+  *
+  * @param cmd The command and the arguments that should be run
+  * @param stdoutFilename The filename that stdout should be saved to
+  * @param stderrFilename The filename that stderr should be saved to
+  * @param tailLength The length of the tail to be saved.
+  * @return the modified command that should be run
+  * @throws IOException if a path cannot be converted to a shell path
+  */
+ public static List<String> captureOutAndError(List<String> cmd,
+     File stdoutFilename, File stderrFilename, long tailLength)
+     throws IOException {
+   List<String> noSetup = null;
+   return captureOutAndError(noSetup, cmd, stdoutFilename, stderrFilename,
+       tailLength);
+ }
+
+ /**
+  * Wrap a command in a shell to capture stdout and stderr to files. Setup
+  * commands such as setting memory limit can be passed which will be
+  * executed before exec. If the tailLength is not positive, the entire
+  * output will be saved; otherwise both streams are piped through "tail -c".
+  *
+  * @param setup The setup commands for the execed process; may be null.
+  * @param cmd The command and the arguments that should be run
+  * @param stdoutFilename The filename that stdout should be saved to
+  * @param stderrFilename The filename that stderr should be saved to
+  * @param tailLength The length of the tail to be saved.
+  * @return the modified command that should be run: bash, "-c" and the
+  *         generated script
+  * @throws IOException if a path cannot be converted to a shell path
+  */
+ public static List<String> captureOutAndError(List<String> setup,
+     List<String> cmd, File stdoutFilename, File stderrFilename,
+     long tailLength) throws IOException {
+   String stdout = FileUtil.makeShellPath(stdoutFilename);
+   String stderr = FileUtil.makeShellPath(stderrFilename);
+   boolean tailing = tailLength > 0;
+
+   StringBuilder script = new StringBuilder();
+   if (setup != null && !setup.isEmpty()) {
+     script.append(addCommand(setup, false)).append(";");
+   }
+   // With tailing the command runs in a subshell so its streams can be
+   // piped; without it the shell is replaced by the command.
+   script.append(tailing ? "(" : "exec ");
+   script.append(addCommand(cmd, true)).append(" < /dev/null ");
+   if (tailing) {
+     script.append(" | ").append(tailCommand).append(" -c ")
+         .append(tailLength).append(" >> ").append(stdout);
+     script.append(" ; exit $PIPESTATUS ) 2>&1 | ").append(tailCommand)
+         .append(" -c ").append(tailLength).append(" >> ").append(stderr);
+     script.append(" ; exit $PIPESTATUS");
+   } else {
+     script.append(" 1>> ").append(stdout);
+     script.append(" 2>> ").append(stderr);
+   }
+
+   List<String> result = new ArrayList<String>(3);
+   result.add(bashCommand);
+   result.add("-c");
+   result.add(script.toString());
+   return result;
+ }
+
+ /**
+ * Add quotes to each of the command strings and return as a single string
+ *
+ * @param cmd The command to be quoted
+ * @param isExecutable makes shell path if the first argument is executable
+ * @return returns The quoted string.
+ * @throws IOException
+ */
+ public static String addCommand(List<String> cmd, boolean isExecutable)
+ throws IOException {
+ StringBuffer command = new StringBuffer();
+ for (String s : cmd) {
+ command.append('\'');
+ if (isExecutable) {
+ // the executable name needs to be expressed as a shell path for the
+ // shell to find it.
+ command.append(FileUtil.makeShellPath(new File(s)));
+ isExecutable = false;
+ } else {
+ command.append(s);
+ }
+ command.append('\'');
+ command.append(" ");
+ }
+ return command.toString();
+ }
+
+ /**
+  * Wrap a command in a shell to capture debug script's stdout and stderr to
+  * debugout. Unlike {@link #addCommand(List, boolean)}, the arguments are
+  * not quoted here.
+  *
+  * @param cmd The command and the arguments that should be run
+  * @param debugoutFilename The filename that stdout and stderr should be
+  *          saved to.
+  * @return the modified command that should be run: bash, "-c" and the
+  *         generated script
+  * @throws IOException if a path cannot be converted to a shell path
+  */
+ public static List<String> captureDebugOut(List<String> cmd,
+     File debugoutFilename) throws IOException {
+   String debugout = FileUtil.makeShellPath(debugoutFilename);
+
+   StringBuilder script = new StringBuilder("exec ");
+   boolean pendingExecutable = true;
+   for (String word : cmd) {
+     if (pendingExecutable) {
+       // the executable name needs to be expressed as a shell path for the
+       // shell to find it.
+       script.append(FileUtil.makeShellPath(new File(word)));
+       pendingExecutable = false;
+     } else {
+       script.append(word);
+     }
+     script.append(" ");
+   }
+   script.append(" < /dev/null ").append(" >").append(debugout)
+       .append(" 2>&1 ");
+
+   List<String> result = new ArrayList<String>(3);
+   result.add(bashCommand);
+   result.add("-c");
+   result.add(script.toString());
+   return result;
+ }
+
+} // TaskLog
Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskLogAppender.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskLogAppender.java?rev=1152788&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskLogAppender.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskLogAppender.java Mon Aug 1 14:12:46 2011
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.util.LinkedList;
+import java.util.Queue;
+
+import org.apache.log4j.FileAppender;
+import org.apache.log4j.spi.LoggingEvent;
+
+/**
+ * A simple log4j-appender for the task child's BSP system logs.
+ */
+public class TaskLogAppender extends FileAppender {
+  // Rough number of bytes assumed per logging event when converting between
+  // a log size and a number of retained events.
+  private static final int EVENT_SIZE = 100;
+
+  private String taskId; // taskId should be managed as String rather than
+  // TaskID object
+  // so that log4j can configure it from the configuration(log4j.properties).
+  private int maxEvents;
+  // Holds the most recent events when a limit is configured; null means
+  // events are written straight through.
+  private Queue<LoggingEvent> tail = null;
+
+  /**
+   * Points the appender at the syslog file of the configured task attempt
+   * and, when a positive event limit is set, switches to keeping only the
+   * tail of the log in memory.
+   */
+  @Override
+  public void activateOptions() {
+    synchronized (this) {
+      if (maxEvents > 0) {
+        tail = new LinkedList<LoggingEvent>();
+      }
+      setFile(TaskLog.getTaskLogFile(TaskAttemptID.forName(taskId),
+          TaskLog.LogName.SYSLOG).toString());
+      setAppend(true);
+      super.activateOptions();
+    }
+  }
+
+  /**
+   * Writes the event immediately when no limit is configured; otherwise
+   * buffers it, dropping the oldest buffered event once the limit is
+   * reached.
+   */
+  @Override
+  public void append(LoggingEvent event) {
+    synchronized (this) {
+      if (tail == null) {
+        super.append(event);
+      } else {
+        if (tail.size() >= maxEvents) {
+          tail.remove();
+        }
+        tail.add(event);
+      }
+    }
+  }
+
+  /**
+   * Writes any buffered events to the file and closes the appender.
+   */
+  @Override
+  public synchronized void close() {
+    if (tail != null) {
+      for (LoggingEvent event : tail) {
+        super.append(event);
+      }
+    }
+    super.close();
+  }
+
+  /**
+   * Getter/Setter methods for log4j.
+   */
+
+  public String getTaskId() {
+    return taskId;
+  }
+
+  public void setTaskId(String taskId) {
+    this.taskId = taskId;
+  }
+
+  /**
+   * @return the configured limit expressed in bytes
+   */
+  public long getTotalLogFileSize() {
+    // widen before multiplying so large limits do not overflow int
+    return (long) maxEvents * EVENT_SIZE;
+  }
+
+  /**
+   * Sets the limit from a size in bytes; it is stored as a number of events.
+   *
+   * @param logSize the desired total log size in bytes
+   */
+  public void setTotalLogFileSize(long logSize) {
+    // Divide as long first: casting logSize to int before the division
+    // truncated sizes above Integer.MAX_VALUE.
+    maxEvents = (int) Math.min(Integer.MAX_VALUE, logSize / EVENT_SIZE);
+  }
+
+}
Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskRunner.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskRunner.java?rev=1152788&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskRunner.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskRunner.java Mon Aug 1 14:12:46 2011
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.net.InetSocketAddress;
+import java.util.Vector;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.util.RunJar;
+
+/**
+ * Base class that runs a task in a separate process.
+ */
+public class TaskRunner extends Thread {
+
+ public static final Log LOG = LogFactory.getLog(TaskRunner.class);
+
+ boolean killed = false;
+ private Process process;
+ private Task task;
+ private BSPJob conf;
+ private GroomServer groomServer;
+
+ /**
+  * Creates a runner for one task.
+  *
+  * @param bspTask the task to run in a child process
+  * @param groom the GroomServer that owns this runner
+  * @param conf the job the task belongs to
+  */
+ public TaskRunner(BSPTask bspTask, GroomServer groom, BSPJob conf) {
+   this.groomServer = groom;
+   this.conf = conf;
+   this.task = bspTask;
+ }
+
+ /**
+ * @return the task this runner was created for
+ */
+ public Task getTask() {
+ return task;
+ }
+
+ /**
+ * Called to assemble this task's input. This method is run in the parent
+ * process before the child is spawned. It should not execute user code, only
+ * system code.
+ *
+ * @return always true in this base implementation
+ */
+ public boolean prepare() throws IOException {
+ return true;
+ }
+
+ /**
+  * Assembles the command line of the child JVM for this task and runs it
+  * until it exits. The child uses the parent's JVM and class path, extended
+  * by the contents of the job jar (if any), and is started in the task's
+  * work directory. I/O failures are logged, not rethrown.
+  */
+ @Override
+ public void run() {
+   try {
+     String sep = System.getProperty("path.separator");
+     File workDir = new File(new File(task.getJobFile()).getParent(), "work");
+     boolean isCreated = workDir.mkdirs();
+     if (!isCreated) {
+       // mkdirs() also returns false when the directory already exists
+       LOG.debug("TaskRunner.workDir : " + workDir);
+     }
+
+     StringBuffer classPath = new StringBuffer();
+     // start with same classpath as parent process
+     classPath.append(System.getProperty("java.class.path"));
+     classPath.append(sep);
+
+     String jar = conf.getJar();
+     if (jar != null) { // if a jar is configured, unpack it into workDir
+       RunJar.unJar(new File(jar), workDir);
+       File[] libs = new File(workDir, "lib").listFiles();
+       if (libs != null) {
+         for (int i = 0; i < libs.length; i++) {
+           classPath.append(sep); // add libs from jar to classpath
+           classPath.append(libs[i]);
+         }
+       }
+       classPath.append(sep);
+       classPath.append(new File(workDir, "classes"));
+       classPath.append(sep);
+       classPath.append(workDir);
+     }
+
+     // Build exec child jvm args.
+     Vector<String> vargs = new Vector<String>();
+     File jvm = // use same jvm as parent
+     new File(new File(System.getProperty("java.home"), "bin"), "java");
+     vargs.add(jvm.toString());
+
+     // bsp.child.java.opts
+     String javaOpts = conf.getConf().get("bsp.child.java.opts", "-Xmx200m");
+     javaOpts = javaOpts.replace("@taskid@", task.getTaskID().toString());
+
+     // Split on any run of whitespace and drop empty tokens: an empty
+     // argument would be taken by the JVM as the main class name.
+     String[] javaOptsSplit = javaOpts.trim().split("\\s+");
+     for (int i = 0; i < javaOptsSplit.length; i++) {
+       if (javaOptsSplit[i].length() > 0) {
+         vargs.add(javaOptsSplit[i]);
+       }
+     }
+
+     // Add classpath.
+     vargs.add("-classpath");
+     vargs.add(classPath.toString());
+     // Add main class and its arguments
+     vargs.add(GroomServer.Child.class.getName()); // main of Child
+
+     InetSocketAddress addr = groomServer.getTaskTrackerReportAddress();
+     vargs.add(addr.getHostName());
+     vargs.add(Integer.toString(addr.getPort()));
+     vargs.add(task.getTaskID().toString());
+     vargs.add(groomServer.groomHostName);
+
+     // Run java
+     runChild(vargs.toArray(new String[0]), workDir);
+   } catch (IOException e) {
+     // pass the exception as the throwable so the stack trace is logged
+     LOG.error("Error running child task " + task.getTaskID(), e);
+   }
+ }
+
+ /**
+  * Run the child process and wait for it to exit. The child's stderr is
+  * copied to the log on a separate thread while its stdout is copied on the
+  * calling thread. The process is always destroyed and this runner marked
+  * as killed before the method returns.
+  *
+  * @param args the command line of the child
+  * @param dir the working directory of the child
+  * @throws IOException if the child cannot be started, exits with a nonzero
+  *           status without having been killed, or the wait is interrupted
+  */
+ private void runChild(String[] args, File dir) throws IOException {
+   this.process = Runtime.getRuntime().exec(args, null, dir);
+   try {
+     new Thread() {
+       public void run() {
+         logStream(process.getErrorStream()); // copy log output
+       }
+     }.start();
+
+     logStream(process.getInputStream()); // normally empty
+
+     int exitCode = process.waitFor();
+     if (!killed && exitCode != 0) {
+       throw new IOException("Task process exit with nonzero status of "
+           + exitCode + ".");
+     }
+
+   } catch (InterruptedException e) {
+     // Restore the interrupt status for code further up the stack and keep
+     // the original exception as the cause.
+     Thread.currentThread().interrupt();
+     IOException ioe = new IOException(e.toString());
+     ioe.initCause(e);
+     throw ioe;
+   } finally {
+     kill();
+   }
+ }
+
+ /**
+ * Kill the child process
+ */
+ public void kill() {
+ if (process != null) {
+ process.destroy();
+ }
+ killed = true;
+ }
+
+ /**
+  * Copies the given stream line by line to the log, each line prefixed with
+  * the task id, and closes the stream afterwards. Read and close failures
+  * are logged as warnings.
+  *
+  * @param output a stream of the child process
+  */
+ private void logStream(InputStream output) {
+   try {
+     BufferedReader in = new BufferedReader(new InputStreamReader(output));
+     for (String line = in.readLine(); line != null; line = in.readLine()) {
+       LOG.info(task.getTaskID() + " " + line);
+     }
+   } catch (IOException e) {
+     LOG.warn(task.getTaskID() + " Error reading child output", e);
+   } finally {
+     try {
+       output.close();
+     } catch (IOException e) {
+       LOG.warn(task.getTaskID() + " Error closing child output", e);
+     }
+   }
+ }
+
+}
Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskScheduler.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskScheduler.java?rev=1152788&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskScheduler.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskScheduler.java Mon Aug 1 14:12:46 2011
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.IOException;
+import java.util.Collection;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Used by a {@link BSPMaster} to schedule {@link Task}s on {@link GroomServer}
+ * s.
+ */
+abstract class TaskScheduler implements Configurable {
+
+ // Configuration handed in through setConf(); null until then.
+ protected Configuration conf;
+ // Manager handed in through setGroomServerManager(); null until then.
+ protected GroomServerManager groomServerManager;
+
+ /**
+ * @return the configuration last passed to {@link #setConf}
+ */
+ public Configuration getConf() {
+ return conf;
+ }
+
+ /**
+ * @param conf the configuration this scheduler should use
+ */
+ public void setConf(Configuration conf) {
+ this.conf = conf;
+ }
+
+ /**
+ * @param groomServerManager the manager this scheduler should use
+ */
+ public synchronized void setGroomServerManager(
+ GroomServerManager groomServerManager) {
+ this.groomServerManager = groomServerManager;
+ }
+
+ /**
+ * Lifecycle method to allow the scheduler to start any work in separate
+ * threads. This base implementation does nothing.
+ *
+ * @throws IOException declared for overriding implementations
+ */
+ public void start() throws IOException {
+ // do nothing
+ }
+
+ /**
+ * Lifecycle method to allow the scheduler to stop any work it is doing.
+ * This base implementation does nothing.
+ *
+ * @throws IOException declared for overriding implementations
+ */
+ public void terminate() throws IOException {
+ // do nothing
+ }
+
+ // public abstract void addJob(JobInProgress job);
+
+ /**
+ * Returns a collection of jobs in an order which is specific to the
+ * particular scheduler.
+ *
+ * @param queue name of the queue whose jobs are requested
+ * @return the jobs that correspond to the specified queue
+ */
+ public abstract Collection<JobInProgress> getJobs(String queue);
+}
Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskStatus.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskStatus.java?rev=1152788&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskStatus.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskStatus.java Mon Aug 1 14:12:46 2011
@@ -0,0 +1,279 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+
+/**
+ * Describes the current status of a task. This is not intended to be a
+ * comprehensive piece of data.
+ */
+class TaskStatus implements Writable, Cloneable {
+ static final Log LOG = LogFactory.getLog(TaskStatus.class);
+
+ /** Enumeration for reporting the current phase of a task. */
+ public static enum Phase {
+ STARTING, COMPUTE, BARRIER_SYNC, CLEANUP
+ }
+
+ /**
+ * The run state a task is in. Serialized by name in
+ * {@link #write(DataOutput)}, so constants must not be renamed without
+ * updating every peer that exchanges task statuses.
+ */
+ public static enum State {
+ RUNNING, SUCCEEDED, FAILED, UNASSIGNED, KILLED, COMMIT_PENDING, FAILED_UNCLEAN, KILLED_UNCLEAN
+ }
+
+ private BSPJobID jobId;
+ private TaskAttemptID taskId;
+ private float progress;
+ private volatile State runState;
+ private String stateString;
+ private String groomServer;
+ private long superstepCount;
+
+ private long startTime;
+ private long finishTime;
+
+ private volatile Phase phase = Phase.STARTING;
+
+ /**
+ * Creates an empty status with freshly constructed job and task ids and a
+ * superstep count of zero. All other fields keep their defaults: the run
+ * state and state string are null and the phase is {@link Phase#STARTING}.
+ *
+ * NOTE(review): presumably intended to be populated through
+ * {@link #readFields(DataInput)} - confirm against callers.
+ */
+ public TaskStatus() {
+ jobId = new BSPJobID();
+ taskId = new TaskAttemptID();
+ this.superstepCount = 0;
+ }
+
+ /**
+ * Creates a status from explicit values. The superstep count starts at zero
+ * and the start/finish times are left unset (0).
+ *
+ * @param jobId id of the job
+ * @param taskId id of the task attempt
+ * @param progress initial progress value
+ * @param runState initial run state
+ * @param stateString free-form state description
+ * @param groomServer name of the groom server; kept locally only, it is not
+ * written by {@link #write(DataOutput)}
+ * @param phase initial phase
+ */
+ public TaskStatus(BSPJobID jobId, TaskAttemptID taskId, float progress,
+ State runState, String stateString, String groomServer, Phase phase) {
+ this.jobId = jobId;
+ this.taskId = taskId;
+ this.progress = progress;
+ this.runState = runState;
+ this.stateString = stateString;
+ this.groomServer = groomServer;
+ this.phase = phase;
+ this.superstepCount = 0;
+ }
+
+ // //////////////////////////////////////////////////
+ // Accessors and Modifiers
+ // //////////////////////////////////////////////////
+
+ /** @return the job id held by this status. */
+ public BSPJobID getJobId() {
+ return jobId;
+ }
+
+ /** @return the task attempt id held by this status. */
+ public TaskAttemptID getTaskId() {
+ return taskId;
+ }
+
+ /** @return the last recorded progress value. */
+ public float getProgress() {
+ return progress;
+ }
+
+ /** @param progress the new progress value */
+ public void setProgress(float progress) {
+ this.progress = progress;
+ }
+
+ /** @return the current run state; null if it has never been set. */
+ public State getRunState() {
+ return runState;
+ }
+
+ /** @param state the new run state */
+ public void setRunState(State state) {
+ this.runState = state;
+ }
+
+ /** @return the free-form state description; null if it has never been set. */
+ public String getStateString() {
+ return stateString;
+ }
+
+ /** @param stateString the new free-form state description */
+ public void setStateString(String stateString) {
+ this.stateString = stateString;
+ }
+
+ /** @return the groom server name stored in this status, possibly null. */
+ public String getGroomServer() {
+ return groomServer;
+ }
+
+ /** @param groomServer the groom server name to store */
+ public void setGroomServer(String groomServer) {
+ this.groomServer = groomServer;
+ }
+
+ /** @return the finish time, or 0 if it has not been set. */
+ public long getFinishTime() {
+ return finishTime;
+ }
+
+ /** @param finishTime the finish time to store */
+ void setFinishTime(long finishTime) {
+ this.finishTime = finishTime;
+ }
+
+ /**
+ * Get start time of the task.
+ *
+ * @return 0 if start time is not set, else returns start time.
+ */
+ public long getStartTime() {
+ return startTime;
+ }
+
+ /**
+ * Set startTime of the task.
+ *
+ * @param startTime start time
+ */
+ void setStartTime(long startTime) {
+ this.startTime = startTime;
+ }
+
+ /**
+ * Get current phase of this task.
+ *
+ * @return the current phase; {@link Phase#STARTING} until changed.
+ */
+ public Phase getPhase() {
+ return this.phase;
+ }
+
+ /**
+ * Set current phase of this task.
+ *
+ * @param phase phase of this task
+ */
+ void setPhase(Phase phase) {
+ this.phase = phase;
+ }
+
+ /**
+ * Update the progress and state string of the task.
+ *
+ * This update is done by ping thread before sending the status.
+ *
+ * @param progress the new progress value
+ * @param state the new free-form state description
+ */
+ synchronized void statusUpdate(float progress, String state) {
+ setProgress(progress);
+ setStateString(state);
+ }
+
+ /**
+ * Update the status of the task from another status object.
+ *
+ * Progress, run state, state string and phase are always copied. Start and
+ * finish time are copied only when the incoming value is non-zero, so an
+ * unset time never overwrites one already recorded. The job id, task id,
+ * groom server and superstep count are not touched.
+ *
+ * @param status updated status
+ */
+ synchronized void statusUpdate(TaskStatus status) {
+ this.progress = status.getProgress();
+ this.runState = status.getRunState();
+ this.stateString = status.getStateString();
+
+ // 0 means "not set" for both timestamps; keep the existing value then.
+ if (status.getStartTime() != 0) {
+ this.startTime = status.getStartTime();
+ }
+ if (status.getFinishTime() != 0) {
+ this.finishTime = status.getFinishTime();
+ }
+
+ this.phase = status.getPhase();
+ }
+
+ /**
+ * Update specific fields of task status
+ *
+ * This update is done in BSPMaster when a cleanup attempt of task reports its
+ * status. Then update only specific fields, not all.
+ *
+ * @param runState the new run state
+ * @param progress the new progress value
+ * @param state the new free-form state description
+ * @param phase the new phase
+ * @param finishTime the new finish time; ignored when 0 so that an unset
+ * value does not overwrite a recorded one
+ */
+ synchronized void statusUpdate(State runState, float progress, String state,
+ Phase phase, long finishTime) {
+ setRunState(runState);
+ setProgress(progress);
+ setStateString(state);
+ setPhase(phase);
+ if (finishTime != 0) {
+ this.finishTime = finishTime;
+ }
+ }
+
+ /**
+ * @return The number of BSP super steps executed by the task.
+ */
+ public long getSuperstepCount() {
+ return superstepCount;
+ }
+
+ /**
+ * Adds one to the number of BSP super steps executed by the task. The
+ * method is not synchronized.
+ */
+ public void incrementSuperstepCount() {
+ superstepCount++;
+ }
+
+ @Override
+ public Object clone() {
+ try {
+ return super.clone();
+ } catch (CloneNotSupportedException cnse) {
+ // Shouldn't happen since we do implement Clonable
+ throw new InternalError(cnse.toString());
+ }
+ }
+
+ // ////////////////////////////////////////////
+ // Writable
+ // ////////////////////////////////////////////
+
+ /**
+ * Reads this status from the stream in the order written by
+ * {@link #write(DataOutput)}: job id, task id, progress, run state, state
+ * string, phase, start time, finish time, superstep count. The groom server
+ * name is not part of the serialized form and is left unchanged.
+ *
+ * @param in the stream to read from
+ * @throws IOException if reading from the stream fails
+ */
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ this.jobId.readFields(in);
+ this.taskId.readFields(in);
+ this.progress = in.readFloat();
+ this.runState = WritableUtils.readEnum(in, State.class);
+ this.stateString = Text.readString(in);
+ this.phase = WritableUtils.readEnum(in, Phase.class);
+ this.startTime = in.readLong();
+ this.finishTime = in.readLong();
+ this.superstepCount = in.readLong();
+ }
+
+ /**
+ * Writes this status to the stream: job id, task id, progress, run state,
+ * state string, phase, start time, finish time, superstep count. The groom
+ * server name is not written.
+ *
+ * NOTE(review): the run state and state string are null after the no-arg
+ * constructor; the enum and string writers presumably do not accept null -
+ * confirm that callers always set both before serializing.
+ *
+ * @param out the stream to write to
+ * @throws IOException if writing to the stream fails
+ */
+ @Override
+ public void write(DataOutput out) throws IOException {
+ jobId.write(out);
+ taskId.write(out);
+ out.writeFloat(progress);
+ WritableUtils.writeEnum(out, runState);
+ Text.writeString(out, stateString);
+ WritableUtils.writeEnum(out, phase);
+ out.writeLong(startTime);
+ out.writeLong(finishTime);
+ out.writeLong(superstepCount);
+ }
+}
Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/package.html
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/package.html?rev=1152788&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/package.html (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/package.html Mon Aug 1 14:12:46 2011
@@ -0,0 +1,23 @@
+<html>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<body>
+BSP computing framework.
+</body>
+</html>
Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/http/HttpServer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/http/HttpServer.java?rev=1152788&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/http/HttpServer.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/http/HttpServer.java Mon Aug 1 14:12:46 2011
@@ -0,0 +1,501 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.http;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.net.BindException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.log.LogLevel;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.mortbay.jetty.Connector;
+import org.mortbay.jetty.Handler;
+import org.mortbay.jetty.Server;
+import org.mortbay.jetty.handler.ContextHandlerCollection;
+import org.mortbay.jetty.nio.SelectChannelConnector;
+import org.mortbay.jetty.security.SslSocketConnector;
+import org.mortbay.jetty.servlet.Context;
+import org.mortbay.jetty.servlet.DefaultServlet;
+import org.mortbay.jetty.servlet.FilterHolder;
+import org.mortbay.jetty.servlet.FilterMapping;
+import org.mortbay.jetty.servlet.ServletHandler;
+import org.mortbay.jetty.servlet.ServletHolder;
+import org.mortbay.jetty.webapp.WebAppContext;
+import org.mortbay.thread.BoundedThreadPool;
+import org.mortbay.util.MultiException;
+
+/**
+ * Create a Jetty embedded server to answer http requests. The primary goal is
+ * to serve up status information for the server. There are three contexts:
+ * <ul>
+ * <li>"/logs/" -&gt; points to the log directory</li>
+ * <li>"/static/" -&gt; points to common static files
+ * (src/main/webapp/static)</li>
+ * <li>"/" -&gt; the jsp server code from (src/main/webapp/&lt;name&gt;)</li>
+ * </ul>
+ */
+public class HttpServer {
+ public static final Log LOG = LogFactory.getLog(HttpServer.class);
+
+ static final String FILTER_INITIALIZER_PROPERTY = "hama.http.filter.initializers";
+
+ protected final Server webServer;
+ protected final Connector listener;
+ protected final WebAppContext webAppContext;
+ protected final boolean findPort;
+ protected final Map<Context, Boolean> defaultContexts = new HashMap<Context, Boolean>();
+ protected final List<String> filterNames = new ArrayList<String>();
+ private static final int MAX_RETRIES = 10;
+
+ /**
+ * Same as this(name, bindAddress, port, findPort, new Configuration()).
+ *
+ * @param name The name of the server
+ * @param bindAddress The address the listener binds to
+ * @param port The port to use on the server
+ * @param findPort whether the server should start at the given port and
+ * increment by 1 until it finds a free port.
+ */
+ public HttpServer(String name, String bindAddress, int port, boolean findPort)
+ throws IOException {
+ this(name, bindAddress, port, findPort, new Configuration());
+ }
+
+ /**
+ * Create a status server on the given port. The jsp scripts are taken from
+ * src/main/webapp/&lt;name&gt;.
+ *
+ * Construction only wires up the Jetty server, its listener and contexts;
+ * nothing is bound until {@link #start()} is called. As a side effect it
+ * sets the JVM-wide system properties "java.naming.factory.initial" and
+ * "java.naming.factory.url.pkgs".
+ *
+ * @param name The name of the server
+ * @param bindAddress The address the listener binds to
+ * @param port The port to use on the server
+ * @param findPort whether the server should start at the given port and
+ * increment by 1 until it finds a free port.
+ * @param conf Configuration
+ * @throws IOException if the listener or the default contexts cannot be
+ * created
+ */
+ public HttpServer(String name, String bindAddress, int port,
+ boolean findPort, Configuration conf) throws IOException {
+ webServer = new Server();
+ this.findPort = findPort;
+
+ listener = createBaseListener(conf);
+ listener.setHost(bindAddress);
+ listener.setPort(port);
+ webServer.addConnector(listener);
+
+ webServer.setThreadPool(new BoundedThreadPool());
+
+ final String appDir = getWebAppsPath();
+ ContextHandlerCollection contexts = new ContextHandlerCollection();
+ webServer.setHandler(contexts);
+ webAppContext = new WebAppContext();
+
+ // These are process-wide settings, not scoped to this server instance.
+ System.setProperty("java.naming.factory.initial",
+ "org.mortbay.naming.InitialContextFactory");
+ System.setProperty("java.naming.factory.url.pkgs", "org.mortbay.naming");
+
+ // The main web application is served from <appDir>/<name> at the root path.
+ webAppContext.setContextPath("/");
+ webAppContext.setWar(appDir + "/" + name);
+ webServer.addHandler(webAppContext);
+
+ addDefaultApps(contexts, appDir);
+
+ addDefaultServlets();
+ }
+
+ /**
+ * Create a required listener for the Jetty instance listening on the port
+ * provided. This wrapper and all subclasses must create at least one
+ * listener.
+ */
+ protected Connector createBaseListener(Configuration conf) throws IOException {
+ SelectChannelConnector ret = new SelectChannelConnector();
+ ret.setLowResourceMaxIdleTime(10000);
+ ret.setAcceptQueueSize(128);
+ ret.setResolveNames(false);
+ ret.setUseDirectBuffers(false);
+ return ret;
+ }
+
+ /**
+ * Add default apps: a "/logs" context (only when the "hama.log.dir" system
+ * property is set) and a "/static" context served from
+ * &lt;appDir&gt;/static. Both are recorded in the default context map with
+ * the value true.
+ *
+ * @param parent The handler collection the new contexts are attached to
+ * @param appDir The application directory
+ * @throws IOException never thrown here; declared for overriding
+ * implementations
+ */
+ protected void addDefaultApps(ContextHandlerCollection parent,
+ final String appDir) throws IOException {
+ // set up the context for "/logs/" if "hama.log.dir" property is defined.
+ String logDir = System.getProperty("hama.log.dir");
+ if (logDir != null) {
+ Context logContext = new Context(parent, "/logs");
+ logContext.setResourceBase(logDir);
+ logContext.addServlet(DefaultServlet.class, "/");
+ defaultContexts.put(logContext, true);
+ }
+ // set up the context for "/static/*"
+ Context staticContext = new Context(parent, "/static");
+ staticContext.setResourceBase(appDir + "/static");
+ staticContext.addServlet(DefaultServlet.class, "/*");
+ defaultContexts.put(staticContext, true);
+ }
+
+ /**
+ * Add default servlets.
+ */
+ protected void addDefaultServlets() {
+ // set up default servlets
+ addServlet("stacks", "/stacks", StackServlet.class);
+ addServlet("logLevel", "/logLevel", LogLevel.Servlet.class);
+ }
+
+ /**
+ * Add an already constructed context to the server and record it in the
+ * default context map.
+ *
+ * @param ctxt The context to add as a handler
+ * @param isFiltered the flag stored for this context in the map
+ * @throws IOException never thrown here; declared for overriding
+ * implementations
+ */
+ public void addContext(Context ctxt, boolean isFiltered) throws IOException {
+ webServer.addHandler(ctxt);
+ defaultContexts.put(ctxt, isFiltered);
+ }
+
+ /**
+ * Add a web application context served from the given directory.
+ *
+ * @param pathSpec The path spec for the context
+ * @param dir The directory containing the context
+ * @param isFiltered if true, the servlet is added to the filter path mapping
+ * @throws RuntimeException if the server has no handlers registered
+ * @throws IOException if the context cannot be added
+ */
+ protected void addContext(String pathSpec, String dir, boolean isFiltered)
+ throws IOException {
+ if (0 == webServer.getHandlers().length) {
+ throw new RuntimeException("Couldn't find handler");
+ }
+ WebAppContext webAppCtx = new WebAppContext();
+ webAppCtx.setContextPath(pathSpec);
+ webAppCtx.setWar(dir);
+ // Pass the caller's flag through; it used to be ignored in favour of a
+ // hard-coded true.
+ addContext(webAppCtx, isFiltered);
+ }
+
+ /**
+ * Set a value in the webapp context. These values are available to the jsp
+ * pages as "application.getAttribute(name)".
+ *
+ * @param name The name of the attribute
+ * @param value The value of the attribute
+ */
+ public void setAttribute(String name, Object value) {
+ webAppContext.setAttribute(name, value);
+ }
+
+ /**
+ * Add a servlet in the server.
+ *
+ * @param name The name of the servlet (can be passed as null)
+ * @param pathSpec The path spec for the servlet
+ * @param clazz The servlet class
+ */
+ public void addServlet(String name, String pathSpec,
+ Class<? extends HttpServlet> clazz) {
+ addInternalServlet(name, pathSpec, clazz);
+ addFilterPathMapping(pathSpec, webAppContext);
+ }
+
+ /**
+ * Add an internal servlet in the server.
+ *
+ * @param name The name of the servlet (can be passed as null)
+ * @param pathSpec The path spec for the servlet
+ * @param clazz The servlet class
+ * @deprecated this is a temporary method
+ */
+ @Deprecated
+ public void addInternalServlet(String name, String pathSpec,
+ Class<? extends HttpServlet> clazz) {
+ ServletHolder holder = new ServletHolder(clazz);
+ if (name != null) {
+ holder.setName(name);
+ }
+ webAppContext.addServlet(holder, pathSpec);
+ }
+
+ /**
+ * Define a filter for a context and set up default url mappings.
+ */
+ protected void defineFilter(Context ctx, String name, String classname,
+ Map<String, String> parameters, String[] urls) {
+
+ FilterHolder holder = new FilterHolder();
+ holder.setName(name);
+ holder.setClassName(classname);
+ holder.setInitParameters(parameters);
+ FilterMapping fmap = new FilterMapping();
+ fmap.setPathSpecs(urls);
+ fmap.setDispatches(Handler.ALL);
+ fmap.setFilterName(name);
+ ServletHandler handler = ctx.getServletHandler();
+ handler.addFilter(holder, fmap);
+ }
+
+ /**
+ * Add the path spec to the filter path mapping.
+ *
+ * @param pathSpec The path spec
+ * @param webAppCtx The WebApplicationContext to add to
+ */
+ protected void addFilterPathMapping(String pathSpec, Context webAppCtx) {
+ ServletHandler handler = webAppCtx.getServletHandler();
+ for (String name : filterNames) {
+ FilterMapping fmap = new FilterMapping();
+ fmap.setPathSpec(pathSpec);
+ fmap.setFilterName(name);
+ fmap.setDispatches(Handler.ALL);
+ handler.addFilterMapping(fmap);
+ }
+ }
+
+ /**
+ * Get the value in the webapp context.
+ *
+ * @param name The name of the attribute
+ * @return The value of the attribute
+ */
+ public Object getAttribute(String name) {
+ return webAppContext.getAttribute(name);
+ }
+
+ /**
+ * Get the pathname to the webapps files.
+ *
+ * The classpath lookup is currently commented out, so this always returns
+ * the hard-coded relative path "src/main/webapp", which is resolved against
+ * the process working directory.
+ *
+ * @return the relative pathname "src/main/webapp"
+ * @throws IOException never thrown by this implementation; declared for the
+ * disabled classpath lookup and for overriding implementations
+ */
+ protected String getWebAppsPath() throws IOException {
+ // Disabled classpath-based lookup, kept for reference:
+ // URL url = BSPMaster.class.getClassLoader().getResource("webapps");
+ // if (url == null)
+ // throw new IOException("webapps not found in CLASSPATH");
+ // return url.toString();
+ return "src/main/webapp";
+ }
+
+ /**
+ * Get the port that the server is on. The value is read from the first
+ * connector registered on the web server.
+ *
+ * @return the local port reported by the first connector
+ */
+ public int getPort() {
+ return webServer.getConnectors()[0].getLocalPort();
+ }
+
+ /**
+ * Set the min, max number of worker threads (simultaneous connections).
+ *
+ * @param min minimum number of threads
+ * @param max maximum number of threads
+ * @throws ClassCastException if the server's thread pool has been replaced
+ * by something other than a BoundedThreadPool
+ */
+ public void setThreads(int min, int max) {
+ // The constructor installs a BoundedThreadPool, which this cast relies on.
+ BoundedThreadPool pool = (BoundedThreadPool) webServer.getThreadPool();
+ pool.setMinThreads(min);
+ pool.setMaxThreads(max);
+ }
+
+ /**
+ * Configure an ssl listener on the server. Must be called before the server
+ * is started.
+ *
+ * @param addr address to listen on
+ * @param keystore location of the keystore
+ * @param storPass password for the keystore
+ * @param keyPass password for the key
+ * @throws IOException if the web server has already been started
+ * @deprecated Use
+ * {@link #addSslListener(InetSocketAddress, Configuration, boolean)}
+ */
+ @Deprecated
+ public void addSslListener(InetSocketAddress addr, String keystore,
+ String storPass, String keyPass) throws IOException {
+ // Connectors can only be added while the server is not running.
+ if (webServer.isStarted()) {
+ throw new IOException("Failed to add ssl listener");
+ }
+ SslSocketConnector sslListener = new SslSocketConnector();
+ sslListener.setHost(addr.getHostName());
+ sslListener.setPort(addr.getPort());
+ sslListener.setKeystore(keystore);
+ sslListener.setPassword(storPass);
+ sslListener.setKeyPassword(keyPass);
+ webServer.addConnector(sslListener);
+ }
+
+ /**
+ * Configure an ssl listener on the server. Must be called before the server
+ * is started.
+ *
+ * Keystore settings are read from the "ssl.server.keystore.*" keys of the
+ * given configuration. When client authentication is requested, the
+ * "ssl.server.truststore.*" values are copied into the JVM-wide
+ * "javax.net.ssl.trustStore*" system properties.
+ *
+ * @param addr address to listen on
+ * @param sslConf conf to retrieve ssl options
+ * @param needClientAuth whether client authentication is required
+ * @throws IOException if the web server has already been started
+ */
+ public void addSslListener(InetSocketAddress addr, Configuration sslConf,
+ boolean needClientAuth) throws IOException {
+ // Connectors can only be added while the server is not running.
+ if (webServer.isStarted()) {
+ throw new IOException("Failed to add ssl listener");
+ }
+ if (needClientAuth) {
+ // setting up SSL truststore for authenticating clients
+ // (process-wide system properties, not scoped to this listener)
+ System.setProperty("javax.net.ssl.trustStore", sslConf.get(
+ "ssl.server.truststore.location", ""));
+ System.setProperty("javax.net.ssl.trustStorePassword", sslConf.get(
+ "ssl.server.truststore.password", ""));
+ System.setProperty("javax.net.ssl.trustStoreType", sslConf.get(
+ "ssl.server.truststore.type", "jks"));
+ }
+ SslSocketConnector sslListener = new SslSocketConnector();
+ sslListener.setHost(addr.getHostName());
+ sslListener.setPort(addr.getPort());
+ // No default is supplied for the keystore location, unlike the other keys.
+ sslListener.setKeystore(sslConf.get("ssl.server.keystore.location"));
+ sslListener.setPassword(sslConf.get("ssl.server.keystore.password", ""));
+ sslListener.setKeyPassword(sslConf.get("ssl.server.keystore.keypassword",
+ ""));
+ sslListener.setKeystoreType(sslConf.get("ssl.server.keystore.type", "jks"));
+ sslListener.setNeedClientAuth(needClientAuth);
+ webServer.addConnector(sslListener);
+ }
+
+ /**
+ * Start the server. Does not wait for the server to start.
+ *
+ * The listener is opened on the requested port. If binding fails with a
+ * BindException and port searching is enabled, the port is incremented by
+ * one and the attempt is repeated; otherwise the BindException is rethrown.
+ * Two workarounds re-open the listener or restart the web server, with
+ * sleeps in between, when the listener reports a negative local port.
+ *
+ * @throws IOException if binding fails and port searching is disabled, if a
+ * non-bind IOException occurs, or wrapping any other exception
+ * (including MultiException and InterruptedException) raised while
+ * starting
+ */
+ public void start() throws IOException {
+ try {
+ int port = 0;
+ int oriPort = listener.getPort(); // The original requested port
+ // Each iteration is one bind attempt; the loop ends via break or throw.
+ while (true) {
+ try {
+ port = webServer.getConnectors()[0].getLocalPort();
+ LOG.info("Port returned by webServer.getConnectors()[0]."
+ + "getLocalPort() before open() is " + port
+ + ". Opening the listener on " + oriPort);
+ listener.open();
+ port = listener.getLocalPort();
+ LOG.info("listener.getLocalPort() returned "
+ + listener.getLocalPort()
+ + " webServer.getConnectors()[0].getLocalPort() returned "
+ + webServer.getConnectors()[0].getLocalPort());
+ // Workaround to handle the problem reported in HADOOP-4744
+ if (port < 0) {
+ Thread.sleep(100);
+ int numRetries = 1;
+ while (port < 0) {
+ LOG.warn("listener.getLocalPort returned " + port);
+ if (numRetries++ > MAX_RETRIES) {
+ throw new Exception(" listener.getLocalPort is returning "
+ + "less than 0 even after " + numRetries + " resets");
+ }
+ // First poll the port twice more before bouncing the listener.
+ for (int i = 0; i < 2; i++) {
+ LOG.info("Retrying listener.getLocalPort()");
+ port = listener.getLocalPort();
+ if (port > 0) {
+ break;
+ }
+ Thread.sleep(200);
+ }
+ if (port > 0) {
+ break;
+ }
+ LOG.info("Bouncing the listener");
+ listener.close();
+ Thread.sleep(1000);
+ // Port 0 is kept as-is; otherwise move on to the next port number.
+ listener.setPort(oriPort == 0 ? 0 : (oriPort += 1));
+ listener.open();
+ Thread.sleep(100);
+ port = listener.getLocalPort();
+ }
+ } // Workaround end
+ LOG.info("Jetty bound to port " + port);
+ webServer.start();
+ // Workaround for HADOOP-6386
+ port = listener.getLocalPort();
+ if (port < 0) {
+ LOG.warn("Bounds port is " + port + " after webserver start");
+ // NOTE(review): this loop runs MAX_RETRIES / 2 times, while the
+ // failure message below reports MAX_RETRIES resets.
+ for (int i = 0; i < MAX_RETRIES / 2; i++) {
+ try {
+ webServer.stop();
+ } catch (Exception e) {
+ LOG.warn("Can't stop web-server", e);
+ }
+ Thread.sleep(1000);
+
+ listener.setPort(oriPort == 0 ? 0 : (oriPort += 1));
+ listener.open();
+ Thread.sleep(100);
+ webServer.start();
+ LOG.info(i + "attempts to restart webserver");
+ port = listener.getLocalPort();
+ if (port > 0)
+ break;
+ }
+ if (port < 0)
+ throw new Exception("listener.getLocalPort() is returning "
+ + "less than 0 even after " + MAX_RETRIES + " resets");
+ }
+ // End of HADOOP-6386 workaround
+ break;
+ } catch (IOException ex) {
+ // if this is a bind exception,
+ // then try the next port number.
+ if (ex instanceof BindException) {
+ if (!findPort) {
+ throw (BindException) ex;
+ }
+ } else {
+ LOG.info("HttpServer.start() threw a non Bind IOException");
+ throw ex;
+ }
+ } catch (MultiException ex) {
+ LOG.info("HttpServer.start() threw a MultiException");
+ throw ex;
+ }
+ // Only reached after a BindException with findPort enabled.
+ listener.setPort((oriPort += 1));
+ }
+ } catch (IOException e) {
+ throw e;
+ } catch (Exception e) {
+ // Also covers InterruptedException from the sleeps above; the thread's
+ // interrupt status is not restored here.
+ throw new IOException("Problem starting http server", e);
+ }
+ }
+
+ /**
+ * Stop the server: close the primary listener, then stop the web server.
+ * The web server is stopped even when closing the listener fails, so that
+ * its threads and remaining connectors are not left running.
+ *
+ * @throws Exception if closing the listener or stopping the web server
+ * fails; if both fail, the web server's exception is the one
+ * propagated
+ */
+ public void stop() throws Exception {
+ try {
+ listener.close();
+ } finally {
+ // Must run regardless of the outcome above.
+ webServer.stop();
+ }
+ }
+
+ /**
+ * Delegates to the web server's join().
+ *
+ * @throws InterruptedException if the calling thread is interrupted while
+ * waiting
+ */
+ public void join() throws InterruptedException {
+ webServer.join();
+ }
+
+ /**
+ * A very simple servlet to serve up a text representation of the current
+ * stack traces. It both returns the stacks to the caller and logs them.
+ * The two dumps are taken one after the other, so the logged stacks are not
+ * guaranteed to be exactly the same data as those returned to the caller.
+ */
+ public static class StackServlet extends HttpServlet {
+ private static final long serialVersionUID = -6284183679759467039L;
+
+ /**
+ * Writes the thread dump to the response body, closes the writer, then
+ * logs a second dump.
+ *
+ * @param request the incoming request; not inspected
+ * @param response the response the dump is written to
+ * @throws ServletException declared by the servlet contract
+ * @throws IOException if the response output stream cannot be obtained
+ */
+ @Override
+ public void doGet(HttpServletRequest request, HttpServletResponse response)
+ throws ServletException, IOException {
+
+ // The writer is created without an explicit charset.
+ PrintWriter out = new PrintWriter(response.getOutputStream());
+ ReflectionUtils.printThreadInfo(out, "");
+ out.close();
+ ReflectionUtils.logThreadInfo(LOG, "jsp requested", 1);
+ }
+ }
+}
Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/http/package.html
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/http/package.html?rev=1152788&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/http/package.html (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/http/package.html Mon Aug 1 14:12:46 2011
@@ -0,0 +1,23 @@
+<html>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<body>
+Contains the administrative web interfaces.
+</body>
+</html>
Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/ipc/BSPPeerProtocol.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/ipc/BSPPeerProtocol.java?rev=1152788&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/ipc/BSPPeerProtocol.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/ipc/BSPPeerProtocol.java Mon Aug 1 14:12:46 2011
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.ipc;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+import org.apache.hama.Constants;
+import org.apache.hama.bsp.PeerNames;
+import org.apache.hama.bsp.Task;
+import org.apache.hama.bsp.TaskAttemptID;
+
+/**
+ * Protocol that task child process uses to contact its parent process.
+ */
+public interface BSPPeerProtocol extends HamaRPCProtocolVersion, Closeable,
+ Constants {
+
+ /**
+ * Called when a child task process starts, to get its task.
+ *
+ * @param taskid id of the task attempt asking for its task
+ * @return the task for the given attempt
+ */
+ Task getTask(TaskAttemptID taskid) throws IOException;
+
+ /**
+ * Periodically called by child to check if parent is still alive.
+ *
+ * @param taskid id of the task attempt that is pinging
+ * @return True if the task is known
+ */
+ boolean ping(TaskAttemptID taskid) throws IOException;
+
+ /**
+ * Report that the task is successfully completed. Failure is assumed if the
+ * task process exits without calling this.
+ *
+ * @param taskid task's id
+ * @param shouldBePromoted whether to promote the task's output or not
+ */
+ void done(TaskAttemptID taskid, boolean shouldBePromoted) throws IOException;
+
+ /**
+ * Report that the task encountered a local filesystem error.
+ *
+ * @param taskId task's id
+ * @param message description of the error
+ */
+ void fsError(TaskAttemptID taskId, String message) throws IOException;
+
+ /**
+ * Presumably tells the parent to increment the superstep count recorded for
+ * the given task attempt - confirm against the implementing class.
+ *
+ * @param taskid task's id
+ */
+ void incrementSuperstepCount(TaskAttemptID taskid) throws IOException;
+
+ /**
+ * Unlike the other methods of this protocol, this one does not declare
+ * IOException.
+ *
+ * @return the names of all BSP peers.
+ */
+ PeerNames getAllPeerNames();
+
+}
Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/ipc/GroomProtocol.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/ipc/GroomProtocol.java?rev=1152788&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/ipc/GroomProtocol.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/ipc/GroomProtocol.java Mon Aug 1 14:12:46 2011
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.ipc;
+
+import java.io.IOException;
+
+import org.apache.hama.bsp.Directive;
+
+/**
+ * A protocol the BSPMaster uses to talk to a GroomServer.
+ */
+public interface GroomProtocol extends HamaRPCProtocolVersion {
+
+ /**
+ * Instructs the GroomServer to perform tasks.
+ *
+ * @param directive instructs a GroomServer to perform the necessary
+ * execution.
+ * @throws IOException
+ */
+ void dispatch(Directive directive) throws IOException;
+
+}
Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/ipc/HamaRPCProtocolVersion.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/ipc/HamaRPCProtocolVersion.java?rev=1152788&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/ipc/HamaRPCProtocolVersion.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/ipc/HamaRPCProtocolVersion.java Mon Aug 1 14:12:46 2011
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.ipc;
+
+import org.apache.hadoop.ipc.VersionedProtocol;
+
+/**
+ * Holds the one version id shared by all the RPC interfaces. If any
+ * interface is changed, the versionID must be changed here.
+ */
+public interface HamaRPCProtocolVersion extends VersionedProtocol {
+  long versionID = 1L; // interface fields are implicitly public static final
+}
Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/ipc/JobSubmissionProtocol.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/ipc/JobSubmissionProtocol.java?rev=1152788&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/ipc/JobSubmissionProtocol.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/ipc/JobSubmissionProtocol.java Mon Aug 1 14:12:46 2011
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.ipc;
+
+import java.io.IOException;
+
+import org.apache.hama.bsp.ClusterStatus;
+import org.apache.hama.bsp.BSPJobID;
+import org.apache.hama.bsp.JobProfile;
+import org.apache.hama.bsp.JobStatus;
+import org.apache.hama.bsp.TaskAttemptID;
+
+/**
+ * Protocol that a job client and the central BSPMaster use to communicate.
+ * The client uses it to submit jobs, to query job and cluster state, and to
+ * kill jobs or task attempts.
+ */
+public interface JobSubmissionProtocol extends HamaRPCProtocolVersion {
+
+  /**
+   * Allocates a new id for a job.
+   *
+   * @return job id
+   * @throws IOException
+   */
+  BSPJobID getNewJobId() throws IOException;
+
+  /**
+   * Submits a job for execution and returns the status of that job. The job
+   * files should be submitted in <b>system-dir</b>/<b>jobName</b>.
+   *
+   * @param jobID id of the job being submitted
+   * @param jobFile job file of the job being submitted
+   * @return jobStatus
+   * @throws IOException
+   */
+  JobStatus submitJob(BSPJobID jobID, String jobFile) throws IOException;
+
+  /**
+   * Gets the current status of the cluster.
+   *
+   * @param detailed if true then report groom names as well
+   * @return summary of the state of the cluster
+   */
+  ClusterStatus getClusterStatus(boolean detailed) throws IOException;
+
+  /**
+   * Grabs the profile of a job that is already known to the BSPMaster.
+   *
+   * @param jobid id of the job to look up
+   * @return Profile of the job, or null if not found.
+   */
+  JobProfile getJobProfile(BSPJobID jobid) throws IOException;
+
+  /**
+   * Grabs the status of a job that is already known to the BSPMaster.
+   *
+   * @param jobid id of the job to look up
+   * @return Status of the job, or null if not found.
+   */
+  JobStatus getJobStatus(BSPJobID jobid) throws IOException;
+
+  /**
+   * A BSP system always operates on a single filesystem. This function returns
+   * the fs name ('local' if the localfs; 'addr:port' if dfs). The client can
+   * then copy files into the right locations prior to submitting the job.
+   */
+  String getFilesystemName() throws IOException;
+
+  /**
+   * Gets the jobs that are neither completed nor failed.
+   *
+   * @return array of JobStatus for the running/to-be-run jobs.
+   */
+  JobStatus[] jobsToComplete() throws IOException;
+
+  /**
+   * Gets all the jobs submitted.
+   *
+   * @return array of JobStatus for the submitted jobs
+   */
+  JobStatus[] getAllJobs() throws IOException;
+
+  /**
+   * Grabs the bspmaster system directory path where job-specific files are to
+   * be placed.
+   *
+   * @return the system directory where job-specific files are to be placed.
+   */
+  String getSystemDir();
+
+  /**
+   * Kills the indicated job.
+   */
+  void killJob(BSPJobID jobid) throws IOException;
+
+  /**
+   * Kills the indicated task attempt.
+   *
+   * @param taskId the id of the task to kill.
+   * @param shouldFail if true the task is failed and added to failed tasks
+   *          list, otherwise it is just killed, w/o affecting job failure status.
+   * @return presumably whether the attempt was killed; confirm in BSPMaster.
+   */
+  boolean killTask(TaskAttemptID taskId, boolean shouldFail)
+      throws IOException;
+
+}
Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/ipc/MasterProtocol.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/ipc/MasterProtocol.java?rev=1152788&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/ipc/MasterProtocol.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/ipc/MasterProtocol.java Mon Aug 1 14:12:46 2011
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.ipc;
+
+import java.io.IOException;
+
+import org.apache.hama.bsp.GroomServerStatus;
+import org.apache.hama.bsp.Directive;
+
+/**
+ * A protocol for GroomServers to communicate with the BSPMaster. Paired with
+ * {@link GroomProtocol}, it lets GroomServers enrol with the BSPMaster, so
+ * that the BSPMaster can dispatch tasks to GroomServers.
+ */
+public interface MasterProtocol extends HamaRPCProtocolVersion {
+
+  /**
+   * Registers a GroomServer with the BSPMaster, which caches its status.
+   *
+   * @param status to be updated in cache.
+   * @return true if successfully registered with BSPMaster; false on failure.
+   */
+  boolean register(GroomServerStatus status) throws IOException;
+
+  /**
+   * A GroomServer (periodically) reports task statuses back to the BSPMaster.
+   * @param directive carries the statuses being reported.
+   */
+  boolean report(Directive directive) throws IOException;
+
+  /** @return the system directory, presumably as in JobSubmissionProtocol. */
+  String getSystemDir();
+
+}
Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/ipc/package.html
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/ipc/package.html?rev=1152788&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/ipc/package.html (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/ipc/package.html Mon Aug 1 14:12:46 2011
@@ -0,0 +1,23 @@
+<html>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<body>
+Tools to help define network clients and servers.
+</body>
+</html>
Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/package.html
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/package.html?rev=1152788&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/package.html (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/package.html Mon Aug 1 14:12:46 2011
@@ -0,0 +1,23 @@
+<html>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<body>
+Hama base package.
+</body>
+</html>
Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/util/BSPServletUtil.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/util/BSPServletUtil.java?rev=1152788&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/util/BSPServletUtil.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/util/BSPServletUtil.java Mon Aug 1 14:12:46 2011
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.util;
+
+import java.io.IOException;
+import java.util.Date;
+import java.util.Calendar;
+
+import org.apache.hadoop.util.ServletUtil;
+import org.apache.hama.bsp.JobStatus;
+
+/** Static helpers that render HTML fragments for the Hama jsps. */
+public class BSPServletUtil extends ServletUtil {
+
+  /** Shared HTML footer; the year is fixed when this class is loaded. */
+  public static final String HTML_TAIL = "<hr />\n"
+      + "<a href='http://incubator.apache.org/hama/'>Hama</a>, "
+      + Calendar.getInstance().get(Calendar.YEAR) + ".\n" + "</body></html>";
+
+  /**
+   * HTML footer to be added in the jsps.
+   *
+   * @return the HTML footer.
+   */
+  public static String htmlFooter() {
+    return HTML_TAIL;
+  }
+
+  /**
+   * Generates the job table for the job pages. User and job names are
+   * HTML-escaped because they may carry arbitrary user-supplied text.
+   *
+   * @param label display heading for the table (currently unused).
+   * @param jobs jobs to be displayed; null or empty yields a plain notice.
+   * @param refresh refresh interval of the jobdetails page (currently unused).
+   * @param rowId beginning row id of the table (currently unused).
+   * @return generated HTML
+   * @throws IOException never thrown here; declared for existing callers.
+   */
+  public static String generateJobTable(String label, JobStatus[] jobs,
+      int refresh, int rowId) throws IOException {
+    if (jobs == null || jobs.length == 0) {
+      return "No jobs found!";
+    }
+    StringBuilder sb = new StringBuilder();
+    sb.append("<table border=\"1\" cellpadding=\"5\" cellspacing=\"0\">\n");
+    sb.append("<tr><th>Jobid</th>" + "<th>User</th>" + "<th>Name</th>"
+        + "<th>SuperStep</th>" + "<th>Starttime</th>" + "</tr>\n");
+    for (JobStatus status : jobs) {
+      sb.append("<tr><td><a href=\"bspjob.jsp?jobid=").append(status.getJobID());
+      sb.append("\">").append(status.getJobID()).append("</a></td><td>");
+      sb.append(escapeHtml(status.getUsername())).append("</td><td>");
+      sb.append(escapeHtml(status.getName())).append("</td><td>");
+      sb.append(status.getSuperstepCount()).append("</td><td>");
+      sb.append(new Date(status.getStartTime())).append("</td></tr>\n");
+    }
+    return sb.append("</table>").toString();
+  }
+
+  /** Replaces the HTML meta characters of a value with character entities. */
+  private static String escapeHtml(Object value) {
+    // '&' goes first so that the entities produced below are not re-escaped.
+    return String.valueOf(value).replace("&", "&amp;").replace("<", "&lt;")
+        .replace(">", "&gt;").replace("\"", "&quot;").replace("'", "&#39;");
+  }
+
+}