You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ed...@apache.org on 2011/10/05 05:38:57 UTC

svn commit: r1179045 - /incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskRunner.java

Author: edwardyoon
Date: Wed Oct  5 03:38:57 2011
New Revision: 1179045

URL: http://svn.apache.org/viewvc?rev=1179045&view=rev
Log:
Re-formatting

Modified:
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskRunner.java

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskRunner.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskRunner.java?rev=1179045&r1=1179044&r2=1179045&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskRunner.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/TaskRunner.java Wed Oct  5 03:38:57 2011
@@ -46,290 +46,283 @@ import org.apache.hama.checkpoint.Checkp
  */
 public class TaskRunner extends Thread {
 
-	public static final Log LOG = LogFactory.getLog(TaskRunner.class);
-	private static final String SYSTEM_PATH_SEPARATOR = System
-			.getProperty("path.separator");
-
-	private enum LogType {
-		STDOUT, ERROR
-	}
-
-	boolean bspKilled = false;
-	private Process bspProcess;
-
-	private final Task task;
-	private final BSPJob conf;
-	private final GroomServer groomServer;
-
-	private File logDir;
-
-	class BspChildRunner implements Callable<Object> {
-		private final List<String> commands;
-		private final File workDir;
-		private final ScheduledExecutorService sched;
-		private final AtomicReference<ScheduledFuture<Object>> future;
-
-		BspChildRunner(List<String> commands, File workDir) {
-			this.commands = commands;
-			this.workDir = workDir;
-			this.sched = Executors.newScheduledThreadPool(1);
-			this.future = new AtomicReference<ScheduledFuture<Object>>();
-		}
-
-		void start() {
-			this.future.set(this.sched.schedule(this, 0, SECONDS));
-			LOG.info("Start building BSPPeer process.");
-		}
-
-		void stop() {
-			killBsp();
-			this.sched.schedule(this, 0, SECONDS);
-			LOG.info("Stop BSPPeer process.");
-		}
-
-		void join() throws InterruptedException, ExecutionException {
-			this.future.get().get();
-		}
-
-		public Object call() throws Exception {
-			ProcessBuilder builder = new ProcessBuilder(commands);
-			builder.directory(workDir);
-			try {
-				bspProcess = builder.start();
-				new Thread() {
-					public void run() {
-						logStream(bspProcess.getErrorStream(), LogType.ERROR);
-					}
-				}.start();
-
-				new Thread() {
-					public void run() {
-						logStream(bspProcess.getInputStream(), LogType.STDOUT);
-					}
-				}.start();
-
-				int exit_code = bspProcess.waitFor();
-				if (!bspKilled && exit_code != 0) {
-					throw new IOException(
-							"BSP task process exit with nonzero status of "
-									+ exit_code + ".");
-				}
-			} catch (InterruptedException e) {
-				LOG.warn(
-						"Thread is interrupted when execeuting Checkpointer process.",
-						e);
-			} catch (IOException ioe) {
-				LOG.error("Error when executing BSPPeer process.", ioe);
-			} finally {
-				killBsp();
-			}
-			return null;
-		}
-	}
-
-	public TaskRunner(BSPTask bspTask, GroomServer groom, BSPJob conf) {
-		this.task = bspTask;
-		this.conf = conf;
-		this.groomServer = groom;
-	}
-
-	public Task getTask() {
-		return task;
-	}
-
-	/**
-	 * Called to assemble this task's input. This method is run in the parent
-	 * process before the child is spawned. It should not execute user code,
-	 * only system code.
-	 */
-	public boolean prepare() throws IOException {
-		return true;
-	}
-
-	private File createWorkDirectory() {
-		File workDir = new File(new File(task.getJobFile()).getParent(), "work");
-		boolean isCreated = workDir.mkdirs();
-		if (isCreated) {
-			LOG.debug("TaskRunner.workDir : " + workDir);
-		}
-		return workDir;
-	}
-
-	private String assembleClasspath(BSPJob jobConf, File workDir) {
-		StringBuffer classPath = new StringBuffer();
-		// start with same classpath as parent process
-		classPath.append(System.getProperty("java.class.path"));
-		classPath.append(SYSTEM_PATH_SEPARATOR);
-
-		String jar = jobConf.getJar();
-		if (jar != null) { // if jar exists, it into workDir
-			try {
-				RunJar.unJar(new File(jar), workDir);
-			} catch (IOException ioe) {
-				LOG.error(
-						"Unable to uncompressing file to " + workDir.toString(),
-						ioe);
-			}
-			File[] libs = new File(workDir, "lib").listFiles();
-			if (libs != null) {
-				for (int i = 0; i < libs.length; i++) {
-					// add libs from jar to classpath
-					classPath.append(SYSTEM_PATH_SEPARATOR);
-					classPath.append(libs[i]);
-				}
-			}
-			classPath.append(SYSTEM_PATH_SEPARATOR);
-			classPath.append(new File(workDir, "classes"));
-			classPath.append(SYSTEM_PATH_SEPARATOR);
-			classPath.append(workDir);
-		}
-		return classPath.toString();
-	}
-
-	private List<String> buildJvmArgs(BSPJob jobConf, String classPath,
-			Class<?> child) {
-		// Build exec child jmv args.
-		List<String> vargs = new ArrayList<String>();
-		File jvm = // use same jvm as parent
-		new File(new File(System.getProperty("java.home"), "bin"), "java");
-		vargs.add(jvm.toString());
-
-		// bsp.child.java.opts
-		String javaOpts = jobConf.getConf().get("bsp.child.java.opts",
-				"-Xmx200m");
-		javaOpts = javaOpts.replace("@taskid@", task.getTaskID().toString());
-
-		String[] javaOptsSplit = javaOpts.split(" ");
-		for (int i = 0; i < javaOptsSplit.length; i++) {
-			vargs.add(javaOptsSplit[i]);
-		}
-
-		// Add classpath.
-		vargs.add("-classpath");
-		vargs.add(classPath.toString());
-		// Add main class and its arguments
-		LOG.debug("Executing child Process " + child.getName());
-		vargs.add(child.getName()); // main of bsp or checkpointer Child
-
-		if (GroomServer.BSPPeerChild.class.equals(child)) {
-			InetSocketAddress addr = groomServer.getTaskTrackerReportAddress();
-			vargs.add(addr.getHostName());
-			vargs.add(Integer.toString(addr.getPort()));
-			vargs.add(task.getTaskID().toString());
-			vargs.add(groomServer.groomHostName);
-		}
-
-		if (jobConf.getConf().getBoolean("bsp.checkpoint.enabled", false)) {
-			String ckptPort = jobConf.getConf().get("bsp.checkpoint.port",
-					CheckpointRunner.DEFAULT_PORT);
-			LOG.debug("Checkpointer's port:" + ckptPort);
-			vargs.add(ckptPort);
-		}
-
-		return vargs;
-	}
-
-	/**
-	 * Build working environment and launch BSPPeer and Checkpointer processes.
-	 * And transmit data from BSPPeer's inputstream to Checkpointer's
-	 * OutputStream.
-	 */
-	public void run() {
-		File workDir = createWorkDirectory();
-		logDir = createLogDirectory();
-		String classPath = assembleClasspath(conf, workDir);
-		LOG.debug("Spawned child's classpath " + classPath);
-		List<String> bspArgs = buildJvmArgs(conf, classPath,
-				GroomServer.BSPPeerChild.class);
-
-		BspChildRunner bspPeer = new BspChildRunner(bspArgs, workDir);
-		bspPeer.start();
-		try {
-			bspPeer.join();
-		} catch (InterruptedException ie) {
-			LOG.error("BSPPeer child process is interrupted.", ie);
-		} catch (ExecutionException ee) {
-			LOG.error("Failure occurs when retrieving tasks result.", ee);
-		}
-		LOG.info("Finishes executing BSPPeer child process.");
-	}
-
-	/**
-	 * Creates the tasks log directory if needed.
-	 * 
-	 * @return the top directory of the tasks logging area.
-	 */
-	private File createLogDirectory() {
-		// our log dir looks following: log/tasklogs/job_id/
-		File f = new File(System.getProperty("hama.log.dir") + File.separator
-				+ "tasklogs" + File.separator + task.jobId.id);
-		// TODO if we have attemps: + File.separator+ task.getTaskID());
-
-		if (!f.exists()) {
-			f.mkdirs();
-		}
-
-		return f;
-	}
-
-	/**
-	 * Kill bsppeer child process.
-	 */
-	public void killBsp() {
-		if (bspProcess != null) {
-			bspProcess.destroy();
-		}
-		bspKilled = true;
-	}
-
-	/**
-	 * Log process's stream.
-	 * 
-	 * @param input
-	 *            stream to be logged.
-	 * @param stdout
-	 *            type of the log
-	 */
-	private void logStream(InputStream input, LogType type) {
-		// STDOUT file can be found under LOG_DIR/task_attempt_id.log
-		// ERROR file can be found under LOG_DIR/task_attempt_id.err
-		File taskLogFile = new File(logDir, task.getTaskAttemptId()
-				+ getFileEndingForType(type));
-		BufferedWriter writer = null;
-		try {
-			writer = new BufferedWriter(new FileWriter(taskLogFile));
-			BufferedReader in = new BufferedReader(new InputStreamReader(input));
-			String line;
-			while ((line = in.readLine()) != null) {
-				writer.write(line);
-				writer.newLine();
-			}
-		} catch (IOException e) {
-			LOG.warn(task.getTaskID() + " Error reading child output", e);
-		} finally {
-			try {
-				input.close();
-			} catch (IOException e) {
-				LOG.warn(task.getTaskID() + " Error closing child output", e);
-			}
-			try {
-				writer.close();
-			} catch (IOException e) {
-				LOG.warn(task.getTaskID() + " Error closing log file", e);
-			}
-		}
-	}
-
-	/**
-	 * Returns the ending of the logfile for each LogType. e.G. ".log".
-	 * 
-	 * @param type
-	 * @return an ending, including a dot.
-	 */
-	private String getFileEndingForType(LogType type) {
-		if (type != LogType.ERROR)
-			return ".err";
-		else
-			return ".log";
-	}
+  public static final Log LOG = LogFactory.getLog(TaskRunner.class);
+  private static final String SYSTEM_PATH_SEPARATOR = System
+      .getProperty("path.separator");
+
+  private enum LogType {
+    STDOUT, ERROR
+  }
+
+  boolean bspKilled = false;
+  private Process bspProcess;
+
+  private final Task task;
+  private final BSPJob conf;
+  private final GroomServer groomServer;
+
+  private File logDir;
+
+  class BspChildRunner implements Callable<Object> {
+    private final List<String> commands;
+    private final File workDir;
+    private final ScheduledExecutorService sched;
+    private final AtomicReference<ScheduledFuture<Object>> future;
+
+    BspChildRunner(List<String> commands, File workDir) {
+      this.commands = commands;
+      this.workDir = workDir;
+      this.sched = Executors.newScheduledThreadPool(1);
+      this.future = new AtomicReference<ScheduledFuture<Object>>();
+    }
+
+    void start() {
+      this.future.set(this.sched.schedule(this, 0, SECONDS));
+      LOG.info("Start building BSPPeer process.");
+    }
+
+    void stop() {
+      killBsp();
+      this.sched.schedule(this, 0, SECONDS);
+      LOG.info("Stop BSPPeer process.");
+    }
+
+    void join() throws InterruptedException, ExecutionException {
+      this.future.get().get();
+    }
+
+    public Object call() throws Exception {
+      ProcessBuilder builder = new ProcessBuilder(commands);
+      builder.directory(workDir);
+      try {
+        bspProcess = builder.start();
+        new Thread() {
+          public void run() {
+            logStream(bspProcess.getErrorStream(), LogType.ERROR);
+          }
+        }.start();
+
+        new Thread() {
+          public void run() {
+            logStream(bspProcess.getInputStream(), LogType.STDOUT);
+          }
+        }.start();
+
+        int exit_code = bspProcess.waitFor();
+        if (!bspKilled && exit_code != 0) {
+          throw new IOException("BSP task process exit with nonzero status of "
+              + exit_code + ".");
+        }
+      } catch (InterruptedException e) {
+        LOG.warn("Thread is interrupted when execeuting Checkpointer process.",
+            e);
+      } catch (IOException ioe) {
+        LOG.error("Error when executing BSPPeer process.", ioe);
+      } finally {
+        killBsp();
+      }
+      return null;
+    }
+  }
+
+  public TaskRunner(BSPTask bspTask, GroomServer groom, BSPJob conf) {
+    this.task = bspTask;
+    this.conf = conf;
+    this.groomServer = groom;
+  }
+
+  public Task getTask() {
+    return task;
+  }
+
+  /**
+   * Called to assemble this task's input. This method is run in the parent
+   * process before the child is spawned. It should not execute user code, only
+   * system code.
+   */
+  public boolean prepare() throws IOException {
+    return true;
+  }
+
+  private File createWorkDirectory() {
+    File workDir = new File(new File(task.getJobFile()).getParent(), "work");
+    boolean isCreated = workDir.mkdirs();
+    if (isCreated) {
+      LOG.debug("TaskRunner.workDir : " + workDir);
+    }
+    return workDir;
+  }
+
+  private String assembleClasspath(BSPJob jobConf, File workDir) {
+    StringBuffer classPath = new StringBuffer();
+    // start with same classpath as parent process
+    classPath.append(System.getProperty("java.class.path"));
+    classPath.append(SYSTEM_PATH_SEPARATOR);
+
+    String jar = jobConf.getJar();
+    if (jar != null) { // if jar exists, it into workDir
+      try {
+        RunJar.unJar(new File(jar), workDir);
+      } catch (IOException ioe) {
+        LOG.error("Unable to uncompressing file to " + workDir.toString(), ioe);
+      }
+      File[] libs = new File(workDir, "lib").listFiles();
+      if (libs != null) {
+        for (int i = 0; i < libs.length; i++) {
+          // add libs from jar to classpath
+          classPath.append(SYSTEM_PATH_SEPARATOR);
+          classPath.append(libs[i]);
+        }
+      }
+      classPath.append(SYSTEM_PATH_SEPARATOR);
+      classPath.append(new File(workDir, "classes"));
+      classPath.append(SYSTEM_PATH_SEPARATOR);
+      classPath.append(workDir);
+    }
+    return classPath.toString();
+  }
+
+  private List<String> buildJvmArgs(BSPJob jobConf, String classPath,
+      Class<?> child) {
+    // Build exec child jmv args.
+    List<String> vargs = new ArrayList<String>();
+    File jvm = // use same jvm as parent
+    new File(new File(System.getProperty("java.home"), "bin"), "java");
+    vargs.add(jvm.toString());
+
+    // bsp.child.java.opts
+    String javaOpts = jobConf.getConf().get("bsp.child.java.opts", "-Xmx200m");
+    javaOpts = javaOpts.replace("@taskid@", task.getTaskID().toString());
+
+    String[] javaOptsSplit = javaOpts.split(" ");
+    for (int i = 0; i < javaOptsSplit.length; i++) {
+      vargs.add(javaOptsSplit[i]);
+    }
+
+    // Add classpath.
+    vargs.add("-classpath");
+    vargs.add(classPath.toString());
+    // Add main class and its arguments
+    LOG.debug("Executing child Process " + child.getName());
+    vargs.add(child.getName()); // main of bsp or checkpointer Child
+
+    if (GroomServer.BSPPeerChild.class.equals(child)) {
+      InetSocketAddress addr = groomServer.getTaskTrackerReportAddress();
+      vargs.add(addr.getHostName());
+      vargs.add(Integer.toString(addr.getPort()));
+      vargs.add(task.getTaskID().toString());
+      vargs.add(groomServer.groomHostName);
+    }
+
+    if (jobConf.getConf().getBoolean("bsp.checkpoint.enabled", false)) {
+      String ckptPort = jobConf.getConf().get("bsp.checkpoint.port",
+          CheckpointRunner.DEFAULT_PORT);
+      LOG.debug("Checkpointer's port:" + ckptPort);
+      vargs.add(ckptPort);
+    }
+
+    return vargs;
+  }
+
+  /**
+   * Build working environment and launch BSPPeer and Checkpointer processes.
+   * And transmit data from BSPPeer's inputstream to Checkpointer's
+   * OutputStream.
+   */
+  public void run() {
+    File workDir = createWorkDirectory();
+    logDir = createLogDirectory();
+    String classPath = assembleClasspath(conf, workDir);
+    LOG.debug("Spawned child's classpath " + classPath);
+    List<String> bspArgs = buildJvmArgs(conf, classPath,
+        GroomServer.BSPPeerChild.class);
+
+    BspChildRunner bspPeer = new BspChildRunner(bspArgs, workDir);
+    bspPeer.start();
+    try {
+      bspPeer.join();
+    } catch (InterruptedException ie) {
+      LOG.error("BSPPeer child process is interrupted.", ie);
+    } catch (ExecutionException ee) {
+      LOG.error("Failure occurs when retrieving tasks result.", ee);
+    }
+    LOG.info("Finishes executing BSPPeer child process.");
+  }
+
+  /**
+   * Creates the tasks log directory if needed.
+   * 
+   * @return the top directory of the tasks logging area.
+   */
+  private File createLogDirectory() {
+    // our log dir looks following: log/tasklogs/job_id/
+    File f = new File(System.getProperty("hama.log.dir") + File.separator
+        + "tasklogs" + File.separator + task.jobId.id);
+    // TODO if we have attemps: + File.separator+ task.getTaskID());
+
+    if (!f.exists()) {
+      f.mkdirs();
+    }
+
+    return f;
+  }
+
+  /**
+   * Kill bsppeer child process.
+   */
+  public void killBsp() {
+    if (bspProcess != null) {
+      bspProcess.destroy();
+    }
+    bspKilled = true;
+  }
+
+  /**
+   * Log process's stream.
+   * 
+   * @param input stream to be logged.
+   * @param stdout type of the log
+   */
+  private void logStream(InputStream input, LogType type) {
+    // STDOUT file can be found under LOG_DIR/task_attempt_id.log
+    // ERROR file can be found under LOG_DIR/task_attempt_id.err
+    File taskLogFile = new File(logDir, task.getTaskAttemptId()
+        + getFileEndingForType(type));
+    BufferedWriter writer = null;
+    try {
+      writer = new BufferedWriter(new FileWriter(taskLogFile));
+      BufferedReader in = new BufferedReader(new InputStreamReader(input));
+      String line;
+      while ((line = in.readLine()) != null) {
+        writer.write(line);
+        writer.newLine();
+      }
+    } catch (IOException e) {
+      LOG.warn(task.getTaskID() + " Error reading child output", e);
+    } finally {
+      try {
+        input.close();
+      } catch (IOException e) {
+        LOG.warn(task.getTaskID() + " Error closing child output", e);
+      }
+      try {
+        writer.close();
+      } catch (IOException e) {
+        LOG.warn(task.getTaskID() + " Error closing log file", e);
+      }
+    }
+  }
+
+  /**
+   * Returns the ending of the logfile for each LogType. e.G. ".log".
+   * 
+   * @param type
+   * @return an ending, including a dot.
+   */
+  private String getFileEndingForType(LogType type) {
+    if (type != LogType.ERROR)
+      return ".err";
+    else
+      return ".log";
+  }
 }