You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by tj...@apache.org on 2011/10/12 10:51:32 UTC

svn commit: r1182265 - in /incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp: BSPApplicationMaster.java BSPTaskLauncher.java

Author: tjungblut
Date: Wed Oct 12 08:51:32 2011
New Revision: 1182265

URL: http://svn.apache.org/viewvc?rev=1182265&view=rev
Log:
several runtime fixes

Modified:
    incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/BSPApplicationMaster.java
    incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/BSPTaskLauncher.java

Modified: incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/BSPApplicationMaster.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/BSPApplicationMaster.java?rev=1182265&r1=1182264&r2=1182265&view=diff
==============================================================================
--- incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/BSPApplicationMaster.java (original)
+++ incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/BSPApplicationMaster.java Wed Oct 12 08:51:32 2011
@@ -28,6 +28,8 @@ import java.util.concurrent.Future;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.ipc.ProtocolSignature;
@@ -117,6 +119,12 @@ public class BSPApplicationMaster implem
     this.clientServer = RPC.getServer(this, hostname, clientPort, 10, false,
         jobConf);
 
+    /*
+     * Make sure that this executes after the start of the sync server, because
+     * we are readjusting the configuration.
+     */
+    rewriteSubmitConfiguration(jobFile, jobConf);
+
     amrmRPC = getYarnRPCConnection(localConf);
     registerApplicationMaster(amrmRPC, appAttemptId, hostname, clientPort, null);
   }
@@ -139,7 +147,7 @@ public class BSPApplicationMaster implem
     LOG.info("Waiting for the Sync Master at " + syncAddress);
     RPC.waitForProxy(SyncServer.class, SyncServer.versionID, syncAddress,
         jobConf);
-    jobConf.set("bsp.sync.server.address", hostname + ":" + syncPort);
+    jobConf.set("hama.sync.server.address", hostname + ":" + syncPort);
   }
 
   /**
@@ -237,6 +245,8 @@ public class BSPApplicationMaster implem
       case FAILED:
         finishReq.setFinishApplicationStatus(FinalApplicationStatus.FAILED);
         break;
+      default:
+        finishReq.setFinishApplicationStatus(FinalApplicationStatus.FAILED);
     }
     this.amrmRPC.finishApplicationMaster(finishReq);
   }
@@ -250,7 +260,6 @@ public class BSPApplicationMaster implem
       master.start();
     } catch (Exception e) {
       LOG.fatal("Error starting BSPApplicationMaster", e);
-      System.exit(1);
     } finally {
       if (master != null) {
         master.cleanup();
@@ -273,6 +282,23 @@ public class BSPApplicationMaster implem
   }
 
   /**
+   * Writes the current configuration to a given path to reflect changes. For
+   * example the sync server address is put after the file has been written.
+   * TODO this should upload to HDFS to a given path as well.
+   * 
+   * @throws IOException
+   */
+  private void rewriteSubmitConfiguration(String path, Configuration conf)
+      throws IOException {
+    Path jobSubmitPath = new Path(path);
+    FileSystem fs = FileSystem.get(conf);
+    FSDataOutputStream out = fs.create(jobSubmitPath);
+    conf.writeXml(out);
+    out.close();
+    LOG.info("Written new configuration back to " + path);
+  }
+
+  /**
    * Uses Minas AvailablePortFinder to find a port, starting at 14000.
    * 
    * @return a free port.

Modified: incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/BSPTaskLauncher.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/BSPTaskLauncher.java?rev=1182265&r1=1182264&r2=1182265&view=diff
==============================================================================
--- incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/BSPTaskLauncher.java (original)
+++ incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/BSPTaskLauncher.java Wed Oct 12 08:51:32 2011
@@ -87,7 +87,8 @@ public class BSPTaskLauncher implements 
     LOG.info("Spawned task with id: " + this.id
         + " for allocated container id: "
         + this.allocatedContainer.getId().toString());
-    final GetContainerStatusRequest statusRequest = setupContainer(allocatedContainer, cm, user, id);
+    final GetContainerStatusRequest statusRequest = setupContainer(
+        allocatedContainer, cm, user, id);
 
     ContainerStatus lastStatus;
     while ((lastStatus = cm.getContainerStatus(statusRequest).getStatus())
@@ -111,11 +112,13 @@ public class BSPTaskLauncher implements 
     ctx.setResource(allocatedContainer.getResource());
     ctx.setUser(user);
 
+    /*
+     * jar
+     */
     LocalResource packageResource = Records.newRecord(LocalResource.class);
     FileSystem fs = FileSystem.get(conf);
     Path packageFile = new Path(conf.get("bsp.jar"));
-    URL packageUrl = ConverterUtils.getYarnUrlFromPath(new Path(conf
-        .get("bsp.jar")));
+    URL packageUrl = ConverterUtils.getYarnUrlFromPath(packageFile);
 
     FileStatus fileStatus = fs.getFileStatus(packageFile);
     packageResource.setResource(packageUrl);
@@ -123,14 +126,33 @@ public class BSPTaskLauncher implements 
     packageResource.setTimestamp(fileStatus.getModificationTime());
     packageResource.setType(LocalResourceType.ARCHIVE);
     packageResource.setVisibility(LocalResourceVisibility.APPLICATION);
+    LOG.info("Package resource: " + packageResource.getResource());
 
-    ctx.setCommands(Arrays.asList("${JAVA_HOME}"
-        + "/bin/java -cp './package/*' ", BSPTaskLauncher.class
-        .getCanonicalName(), jobId.getJtIdentifier(), id + "", this.jobFile
-        .makeQualified(fs.getUri(), fs.getWorkingDirectory()).toString(), " 1>"
-        + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout", " 2>"
-        + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr"));
     ctx.setLocalResources(Collections.singletonMap("package", packageResource));
+    
+    /*
+     * TODO Package classpath seems not to work if you're in pseudo distributed
+     * mode, because the resource must not be moved, it will never be unpacked.
+     * So we will check if our jar file has the file:// prefix and put it into
+     * the CP directly
+     */
+    String cp = "$CLASSPATH:./*:./package/*:./*:";
+    if (packageUrl.getScheme() != null && packageUrl.getScheme().equals("file")) {
+      cp += packageFile.makeQualified(fs.getUri(), fs.getWorkingDirectory())
+          .toString() + ":";
+      LOG.info("Localized file scheme detected, adjusting CP to: " + cp);
+    }
+    String[] cmds = {
+        "${JAVA_HOME}" + "/bin/java -cp '" + cp + "' "
+            + BSPRunner.class.getCanonicalName(),
+        jobId.getJtIdentifier(),
+        id + "",
+        this.jobFile.makeQualified(fs.getUri(), fs.getWorkingDirectory())
+            .toString(),
+        " 1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout",
+        " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr" };
+    ctx.setCommands(Arrays.asList(cmds));
+    LOG.info("Starting command: " + Arrays.toString(cmds));
 
     StartContainerRequest startReq = Records
         .newRecord(StartContainerRequest.class);