You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ed...@apache.org on 2011/10/05 05:38:57 UTC
svn commit: r1179045 -
/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskRunner.java
Author: edwardyoon
Date: Wed Oct 5 03:38:57 2011
New Revision: 1179045
URL: http://svn.apache.org/viewvc?rev=1179045&view=rev
Log:
Re-formatting
Modified:
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskRunner.java
Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskRunner.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskRunner.java?rev=1179045&r1=1179044&r2=1179045&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskRunner.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskRunner.java Wed Oct 5 03:38:57 2011
@@ -46,290 +46,283 @@ import org.apache.hama.checkpoint.Checkp
*/
public class TaskRunner extends Thread {
- public static final Log LOG = LogFactory.getLog(TaskRunner.class);
- private static final String SYSTEM_PATH_SEPARATOR = System
- .getProperty("path.separator");
-
- private enum LogType {
- STDOUT, ERROR
- }
-
- boolean bspKilled = false;
- private Process bspProcess;
-
- private final Task task;
- private final BSPJob conf;
- private final GroomServer groomServer;
-
- private File logDir;
-
- class BspChildRunner implements Callable<Object> {
- private final List<String> commands;
- private final File workDir;
- private final ScheduledExecutorService sched;
- private final AtomicReference<ScheduledFuture<Object>> future;
-
- BspChildRunner(List<String> commands, File workDir) {
- this.commands = commands;
- this.workDir = workDir;
- this.sched = Executors.newScheduledThreadPool(1);
- this.future = new AtomicReference<ScheduledFuture<Object>>();
- }
-
- void start() {
- this.future.set(this.sched.schedule(this, 0, SECONDS));
- LOG.info("Start building BSPPeer process.");
- }
-
- void stop() {
- killBsp();
- this.sched.schedule(this, 0, SECONDS);
- LOG.info("Stop BSPPeer process.");
- }
-
- void join() throws InterruptedException, ExecutionException {
- this.future.get().get();
- }
-
- public Object call() throws Exception {
- ProcessBuilder builder = new ProcessBuilder(commands);
- builder.directory(workDir);
- try {
- bspProcess = builder.start();
- new Thread() {
- public void run() {
- logStream(bspProcess.getErrorStream(), LogType.ERROR);
- }
- }.start();
-
- new Thread() {
- public void run() {
- logStream(bspProcess.getInputStream(), LogType.STDOUT);
- }
- }.start();
-
- int exit_code = bspProcess.waitFor();
- if (!bspKilled && exit_code != 0) {
- throw new IOException(
- "BSP task process exit with nonzero status of "
- + exit_code + ".");
- }
- } catch (InterruptedException e) {
- LOG.warn(
- "Thread is interrupted when execeuting Checkpointer process.",
- e);
- } catch (IOException ioe) {
- LOG.error("Error when executing BSPPeer process.", ioe);
- } finally {
- killBsp();
- }
- return null;
- }
- }
-
- public TaskRunner(BSPTask bspTask, GroomServer groom, BSPJob conf) {
- this.task = bspTask;
- this.conf = conf;
- this.groomServer = groom;
- }
-
- public Task getTask() {
- return task;
- }
-
- /**
- * Called to assemble this task's input. This method is run in the parent
- * process before the child is spawned. It should not execute user code,
- * only system code.
- */
- public boolean prepare() throws IOException {
- return true;
- }
-
- private File createWorkDirectory() {
- File workDir = new File(new File(task.getJobFile()).getParent(), "work");
- boolean isCreated = workDir.mkdirs();
- if (isCreated) {
- LOG.debug("TaskRunner.workDir : " + workDir);
- }
- return workDir;
- }
-
- private String assembleClasspath(BSPJob jobConf, File workDir) {
- StringBuffer classPath = new StringBuffer();
- // start with same classpath as parent process
- classPath.append(System.getProperty("java.class.path"));
- classPath.append(SYSTEM_PATH_SEPARATOR);
-
- String jar = jobConf.getJar();
- if (jar != null) { // if jar exists, it into workDir
- try {
- RunJar.unJar(new File(jar), workDir);
- } catch (IOException ioe) {
- LOG.error(
- "Unable to uncompressing file to " + workDir.toString(),
- ioe);
- }
- File[] libs = new File(workDir, "lib").listFiles();
- if (libs != null) {
- for (int i = 0; i < libs.length; i++) {
- // add libs from jar to classpath
- classPath.append(SYSTEM_PATH_SEPARATOR);
- classPath.append(libs[i]);
- }
- }
- classPath.append(SYSTEM_PATH_SEPARATOR);
- classPath.append(new File(workDir, "classes"));
- classPath.append(SYSTEM_PATH_SEPARATOR);
- classPath.append(workDir);
- }
- return classPath.toString();
- }
-
- private List<String> buildJvmArgs(BSPJob jobConf, String classPath,
- Class<?> child) {
- // Build exec child jmv args.
- List<String> vargs = new ArrayList<String>();
- File jvm = // use same jvm as parent
- new File(new File(System.getProperty("java.home"), "bin"), "java");
- vargs.add(jvm.toString());
-
- // bsp.child.java.opts
- String javaOpts = jobConf.getConf().get("bsp.child.java.opts",
- "-Xmx200m");
- javaOpts = javaOpts.replace("@taskid@", task.getTaskID().toString());
-
- String[] javaOptsSplit = javaOpts.split(" ");
- for (int i = 0; i < javaOptsSplit.length; i++) {
- vargs.add(javaOptsSplit[i]);
- }
-
- // Add classpath.
- vargs.add("-classpath");
- vargs.add(classPath.toString());
- // Add main class and its arguments
- LOG.debug("Executing child Process " + child.getName());
- vargs.add(child.getName()); // main of bsp or checkpointer Child
-
- if (GroomServer.BSPPeerChild.class.equals(child)) {
- InetSocketAddress addr = groomServer.getTaskTrackerReportAddress();
- vargs.add(addr.getHostName());
- vargs.add(Integer.toString(addr.getPort()));
- vargs.add(task.getTaskID().toString());
- vargs.add(groomServer.groomHostName);
- }
-
- if (jobConf.getConf().getBoolean("bsp.checkpoint.enabled", false)) {
- String ckptPort = jobConf.getConf().get("bsp.checkpoint.port",
- CheckpointRunner.DEFAULT_PORT);
- LOG.debug("Checkpointer's port:" + ckptPort);
- vargs.add(ckptPort);
- }
-
- return vargs;
- }
-
- /**
- * Build working environment and launch BSPPeer and Checkpointer processes.
- * And transmit data from BSPPeer's inputstream to Checkpointer's
- * OutputStream.
- */
- public void run() {
- File workDir = createWorkDirectory();
- logDir = createLogDirectory();
- String classPath = assembleClasspath(conf, workDir);
- LOG.debug("Spawned child's classpath " + classPath);
- List<String> bspArgs = buildJvmArgs(conf, classPath,
- GroomServer.BSPPeerChild.class);
-
- BspChildRunner bspPeer = new BspChildRunner(bspArgs, workDir);
- bspPeer.start();
- try {
- bspPeer.join();
- } catch (InterruptedException ie) {
- LOG.error("BSPPeer child process is interrupted.", ie);
- } catch (ExecutionException ee) {
- LOG.error("Failure occurs when retrieving tasks result.", ee);
- }
- LOG.info("Finishes executing BSPPeer child process.");
- }
-
- /**
- * Creates the tasks log directory if needed.
- *
- * @return the top directory of the tasks logging area.
- */
- private File createLogDirectory() {
- // our log dir looks following: log/tasklogs/job_id/
- File f = new File(System.getProperty("hama.log.dir") + File.separator
- + "tasklogs" + File.separator + task.jobId.id);
- // TODO if we have attemps: + File.separator+ task.getTaskID());
-
- if (!f.exists()) {
- f.mkdirs();
- }
-
- return f;
- }
-
- /**
- * Kill bsppeer child process.
- */
- public void killBsp() {
- if (bspProcess != null) {
- bspProcess.destroy();
- }
- bspKilled = true;
- }
-
- /**
- * Log process's stream.
- *
- * @param input
- * stream to be logged.
- * @param stdout
- * type of the log
- */
- private void logStream(InputStream input, LogType type) {
- // STDOUT file can be found under LOG_DIR/task_attempt_id.log
- // ERROR file can be found under LOG_DIR/task_attempt_id.err
- File taskLogFile = new File(logDir, task.getTaskAttemptId()
- + getFileEndingForType(type));
- BufferedWriter writer = null;
- try {
- writer = new BufferedWriter(new FileWriter(taskLogFile));
- BufferedReader in = new BufferedReader(new InputStreamReader(input));
- String line;
- while ((line = in.readLine()) != null) {
- writer.write(line);
- writer.newLine();
- }
- } catch (IOException e) {
- LOG.warn(task.getTaskID() + " Error reading child output", e);
- } finally {
- try {
- input.close();
- } catch (IOException e) {
- LOG.warn(task.getTaskID() + " Error closing child output", e);
- }
- try {
- writer.close();
- } catch (IOException e) {
- LOG.warn(task.getTaskID() + " Error closing log file", e);
- }
- }
- }
-
- /**
- * Returns the ending of the logfile for each LogType. e.G. ".log".
- *
- * @param type
- * @return an ending, including a dot.
- */
- private String getFileEndingForType(LogType type) {
- if (type != LogType.ERROR)
- return ".err";
- else
- return ".log";
- }
+ public static final Log LOG = LogFactory.getLog(TaskRunner.class);
+ private static final String SYSTEM_PATH_SEPARATOR = System
+ .getProperty("path.separator");
+
+ private enum LogType {
+ STDOUT, ERROR
+ }
+
+ boolean bspKilled = false;
+ private Process bspProcess;
+
+ private final Task task;
+ private final BSPJob conf;
+ private final GroomServer groomServer;
+
+ private File logDir;
+
+ class BspChildRunner implements Callable<Object> {
+ private final List<String> commands;
+ private final File workDir;
+ private final ScheduledExecutorService sched;
+ private final AtomicReference<ScheduledFuture<Object>> future;
+
+ BspChildRunner(List<String> commands, File workDir) {
+ this.commands = commands;
+ this.workDir = workDir;
+ this.sched = Executors.newScheduledThreadPool(1);
+ this.future = new AtomicReference<ScheduledFuture<Object>>();
+ }
+
+ void start() {
+ this.future.set(this.sched.schedule(this, 0, SECONDS));
+ LOG.info("Start building BSPPeer process.");
+ }
+
+ void stop() {
+ killBsp();
+ this.sched.schedule(this, 0, SECONDS);
+ LOG.info("Stop BSPPeer process.");
+ }
+
+ void join() throws InterruptedException, ExecutionException {
+ this.future.get().get();
+ }
+
+ public Object call() throws Exception {
+ ProcessBuilder builder = new ProcessBuilder(commands);
+ builder.directory(workDir);
+ try {
+ bspProcess = builder.start();
+ new Thread() {
+ public void run() {
+ logStream(bspProcess.getErrorStream(), LogType.ERROR);
+ }
+ }.start();
+
+ new Thread() {
+ public void run() {
+ logStream(bspProcess.getInputStream(), LogType.STDOUT);
+ }
+ }.start();
+
+ int exit_code = bspProcess.waitFor();
+ if (!bspKilled && exit_code != 0) {
+ throw new IOException("BSP task process exit with nonzero status of "
+ + exit_code + ".");
+ }
+ } catch (InterruptedException e) {
+ LOG.warn("Thread is interrupted when execeuting Checkpointer process.",
+ e);
+ } catch (IOException ioe) {
+ LOG.error("Error when executing BSPPeer process.", ioe);
+ } finally {
+ killBsp();
+ }
+ return null;
+ }
+ }
+
+ public TaskRunner(BSPTask bspTask, GroomServer groom, BSPJob conf) {
+ this.task = bspTask;
+ this.conf = conf;
+ this.groomServer = groom;
+ }
+
+ public Task getTask() {
+ return task;
+ }
+
+ /**
+ * Called to assemble this task's input. This method is run in the parent
+ * process before the child is spawned. It should not execute user code, only
+ * system code.
+ */
+ public boolean prepare() throws IOException {
+ return true;
+ }
+
+ private File createWorkDirectory() {
+ File workDir = new File(new File(task.getJobFile()).getParent(), "work");
+ boolean isCreated = workDir.mkdirs();
+ if (isCreated) {
+ LOG.debug("TaskRunner.workDir : " + workDir);
+ }
+ return workDir;
+ }
+
+ private String assembleClasspath(BSPJob jobConf, File workDir) {
+ StringBuffer classPath = new StringBuffer();
+ // start with same classpath as parent process
+ classPath.append(System.getProperty("java.class.path"));
+ classPath.append(SYSTEM_PATH_SEPARATOR);
+
+ String jar = jobConf.getJar();
+ if (jar != null) { // if jar exists, it into workDir
+ try {
+ RunJar.unJar(new File(jar), workDir);
+ } catch (IOException ioe) {
+ LOG.error("Unable to uncompressing file to " + workDir.toString(), ioe);
+ }
+ File[] libs = new File(workDir, "lib").listFiles();
+ if (libs != null) {
+ for (int i = 0; i < libs.length; i++) {
+ // add libs from jar to classpath
+ classPath.append(SYSTEM_PATH_SEPARATOR);
+ classPath.append(libs[i]);
+ }
+ }
+ classPath.append(SYSTEM_PATH_SEPARATOR);
+ classPath.append(new File(workDir, "classes"));
+ classPath.append(SYSTEM_PATH_SEPARATOR);
+ classPath.append(workDir);
+ }
+ return classPath.toString();
+ }
+
+ private List<String> buildJvmArgs(BSPJob jobConf, String classPath,
+ Class<?> child) {
+ // Build exec child jmv args.
+ List<String> vargs = new ArrayList<String>();
+ File jvm = // use same jvm as parent
+ new File(new File(System.getProperty("java.home"), "bin"), "java");
+ vargs.add(jvm.toString());
+
+ // bsp.child.java.opts
+ String javaOpts = jobConf.getConf().get("bsp.child.java.opts", "-Xmx200m");
+ javaOpts = javaOpts.replace("@taskid@", task.getTaskID().toString());
+
+ String[] javaOptsSplit = javaOpts.split(" ");
+ for (int i = 0; i < javaOptsSplit.length; i++) {
+ vargs.add(javaOptsSplit[i]);
+ }
+
+ // Add classpath.
+ vargs.add("-classpath");
+ vargs.add(classPath.toString());
+ // Add main class and its arguments
+ LOG.debug("Executing child Process " + child.getName());
+ vargs.add(child.getName()); // main of bsp or checkpointer Child
+
+ if (GroomServer.BSPPeerChild.class.equals(child)) {
+ InetSocketAddress addr = groomServer.getTaskTrackerReportAddress();
+ vargs.add(addr.getHostName());
+ vargs.add(Integer.toString(addr.getPort()));
+ vargs.add(task.getTaskID().toString());
+ vargs.add(groomServer.groomHostName);
+ }
+
+ if (jobConf.getConf().getBoolean("bsp.checkpoint.enabled", false)) {
+ String ckptPort = jobConf.getConf().get("bsp.checkpoint.port",
+ CheckpointRunner.DEFAULT_PORT);
+ LOG.debug("Checkpointer's port:" + ckptPort);
+ vargs.add(ckptPort);
+ }
+
+ return vargs;
+ }
+
+ /**
+ * Build working environment and launch BSPPeer and Checkpointer processes.
+ * And transmit data from BSPPeer's inputstream to Checkpointer's
+ * OutputStream.
+ */
+ public void run() {
+ File workDir = createWorkDirectory();
+ logDir = createLogDirectory();
+ String classPath = assembleClasspath(conf, workDir);
+ LOG.debug("Spawned child's classpath " + classPath);
+ List<String> bspArgs = buildJvmArgs(conf, classPath,
+ GroomServer.BSPPeerChild.class);
+
+ BspChildRunner bspPeer = new BspChildRunner(bspArgs, workDir);
+ bspPeer.start();
+ try {
+ bspPeer.join();
+ } catch (InterruptedException ie) {
+ LOG.error("BSPPeer child process is interrupted.", ie);
+ } catch (ExecutionException ee) {
+ LOG.error("Failure occurs when retrieving tasks result.", ee);
+ }
+ LOG.info("Finishes executing BSPPeer child process.");
+ }
+
+ /**
+ * Creates the tasks log directory if needed.
+ *
+ * @return the top directory of the tasks logging area.
+ */
+ private File createLogDirectory() {
+ // our log dir looks following: log/tasklogs/job_id/
+ File f = new File(System.getProperty("hama.log.dir") + File.separator
+ + "tasklogs" + File.separator + task.jobId.id);
+ // TODO if we have attemps: + File.separator+ task.getTaskID());
+
+ if (!f.exists()) {
+ f.mkdirs();
+ }
+
+ return f;
+ }
+
+ /**
+ * Kill bsppeer child process.
+ */
+ public void killBsp() {
+ if (bspProcess != null) {
+ bspProcess.destroy();
+ }
+ bspKilled = true;
+ }
+
+ /**
+ * Log process's stream.
+ *
+ * @param input stream to be logged.
+ * @param stdout type of the log
+ */
+ private void logStream(InputStream input, LogType type) {
+ // STDOUT file can be found under LOG_DIR/task_attempt_id.log
+ // ERROR file can be found under LOG_DIR/task_attempt_id.err
+ File taskLogFile = new File(logDir, task.getTaskAttemptId()
+ + getFileEndingForType(type));
+ BufferedWriter writer = null;
+ try {
+ writer = new BufferedWriter(new FileWriter(taskLogFile));
+ BufferedReader in = new BufferedReader(new InputStreamReader(input));
+ String line;
+ while ((line = in.readLine()) != null) {
+ writer.write(line);
+ writer.newLine();
+ }
+ } catch (IOException e) {
+ LOG.warn(task.getTaskID() + " Error reading child output", e);
+ } finally {
+ try {
+ input.close();
+ } catch (IOException e) {
+ LOG.warn(task.getTaskID() + " Error closing child output", e);
+ }
+ try {
+ writer.close();
+ } catch (IOException e) {
+ LOG.warn(task.getTaskID() + " Error closing log file", e);
+ }
+ }
+ }
+
+ /**
+ * Returns the ending of the logfile for each LogType. e.G. ".log".
+ *
+ * @param type
+ * @return an ending, including a dot.
+ */
+ private String getFileEndingForType(LogType type) {
+ if (type != LogType.ERROR)
+ return ".err";
+ else
+ return ".log";
+ }
}