You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by tj...@apache.org on 2011/10/12 10:51:32 UTC
svn commit: r1182265 - in
/incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp:
BSPApplicationMaster.java BSPTaskLauncher.java
Author: tjungblut
Date: Wed Oct 12 08:51:32 2011
New Revision: 1182265
URL: http://svn.apache.org/viewvc?rev=1182265&view=rev
Log:
several runtime fixes
Modified:
incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/BSPApplicationMaster.java
incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/BSPTaskLauncher.java
Modified: incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/BSPApplicationMaster.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/BSPApplicationMaster.java?rev=1182265&r1=1182264&r2=1182265&view=diff
==============================================================================
--- incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/BSPApplicationMaster.java (original)
+++ incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/BSPApplicationMaster.java Wed Oct 12 08:51:32 2011
@@ -28,6 +28,8 @@ import java.util.concurrent.Future;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.ipc.ProtocolSignature;
@@ -117,6 +119,12 @@ public class BSPApplicationMaster implem
this.clientServer = RPC.getServer(this, hostname, clientPort, 10, false,
jobConf);
+ /*
+ * Make sure that this executes after the start of the sync server, because
+ * we are readjusting the configuration.
+ */
+ rewriteSubmitConfiguration(jobFile, jobConf);
+
amrmRPC = getYarnRPCConnection(localConf);
registerApplicationMaster(amrmRPC, appAttemptId, hostname, clientPort, null);
}
@@ -139,7 +147,7 @@ public class BSPApplicationMaster implem
LOG.info("Waiting for the Sync Master at " + syncAddress);
RPC.waitForProxy(SyncServer.class, SyncServer.versionID, syncAddress,
jobConf);
- jobConf.set("bsp.sync.server.address", hostname + ":" + syncPort);
+ jobConf.set("hama.sync.server.address", hostname + ":" + syncPort);
}
/**
@@ -237,6 +245,8 @@ public class BSPApplicationMaster implem
case FAILED:
finishReq.setFinishApplicationStatus(FinalApplicationStatus.FAILED);
break;
+ default:
+ finishReq.setFinishApplicationStatus(FinalApplicationStatus.FAILED);
}
this.amrmRPC.finishApplicationMaster(finishReq);
}
@@ -250,7 +260,6 @@ public class BSPApplicationMaster implem
master.start();
} catch (Exception e) {
LOG.fatal("Error starting BSPApplicationMaster", e);
- System.exit(1);
} finally {
if (master != null) {
master.cleanup();
@@ -273,6 +282,23 @@ public class BSPApplicationMaster implem
}
/**
+ * Writes the current configuration to a given path to reflect changes. For
+ * example, the sync server address is set after the file has been written.
+ * TODO this should upload to HDFS to a given path as well.
+ *
+ * @throws IOException
+ */
+ private void rewriteSubmitConfiguration(String path, Configuration conf)
+ throws IOException {
+ Path jobSubmitPath = new Path(path);
+ FileSystem fs = FileSystem.get(conf);
+ FSDataOutputStream out = fs.create(jobSubmitPath);
+ conf.writeXml(out);
+ out.close();
+ LOG.info("Written new configuration back to " + path);
+ }
+
+ /**
* Uses MINA's AvailablePortFinder to find a port, starting at 14000.
*
* @return a free port.
Modified: incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/BSPTaskLauncher.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/BSPTaskLauncher.java?rev=1182265&r1=1182264&r2=1182265&view=diff
==============================================================================
--- incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/BSPTaskLauncher.java (original)
+++ incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/BSPTaskLauncher.java Wed Oct 12 08:51:32 2011
@@ -87,7 +87,8 @@ public class BSPTaskLauncher implements
LOG.info("Spawned task with id: " + this.id
+ " for allocated container id: "
+ this.allocatedContainer.getId().toString());
- final GetContainerStatusRequest statusRequest = setupContainer(allocatedContainer, cm, user, id);
+ final GetContainerStatusRequest statusRequest = setupContainer(
+ allocatedContainer, cm, user, id);
ContainerStatus lastStatus;
while ((lastStatus = cm.getContainerStatus(statusRequest).getStatus())
@@ -111,11 +112,13 @@ public class BSPTaskLauncher implements
ctx.setResource(allocatedContainer.getResource());
ctx.setUser(user);
+ /*
+ * jar
+ */
LocalResource packageResource = Records.newRecord(LocalResource.class);
FileSystem fs = FileSystem.get(conf);
Path packageFile = new Path(conf.get("bsp.jar"));
- URL packageUrl = ConverterUtils.getYarnUrlFromPath(new Path(conf
- .get("bsp.jar")));
+ URL packageUrl = ConverterUtils.getYarnUrlFromPath(packageFile);
FileStatus fileStatus = fs.getFileStatus(packageFile);
packageResource.setResource(packageUrl);
@@ -123,14 +126,33 @@ public class BSPTaskLauncher implements
packageResource.setTimestamp(fileStatus.getModificationTime());
packageResource.setType(LocalResourceType.ARCHIVE);
packageResource.setVisibility(LocalResourceVisibility.APPLICATION);
+ LOG.info("Package resource: " + packageResource.getResource());
- ctx.setCommands(Arrays.asList("${JAVA_HOME}"
- + "/bin/java -cp './package/*' ", BSPTaskLauncher.class
- .getCanonicalName(), jobId.getJtIdentifier(), id + "", this.jobFile
- .makeQualified(fs.getUri(), fs.getWorkingDirectory()).toString(), " 1>"
- + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout", " 2>"
- + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr"));
ctx.setLocalResources(Collections.singletonMap("package", packageResource));
+
+ /*
+ * TODO The package classpath does not seem to work in pseudo-distributed
+ * mode: because the resource does not need to be moved, it is never unpacked.
+ * So we check whether our jar file has the file:// prefix and, if so, put it
+ * into the classpath directly.
+ */
+ String cp = "$CLASSPATH:./*:./package/*:./*:";
+ if (packageUrl.getScheme() != null && packageUrl.getScheme().equals("file")) {
+ cp += packageFile.makeQualified(fs.getUri(), fs.getWorkingDirectory())
+ .toString() + ":";
+ LOG.info("Localized file scheme detected, adjusting CP to: " + cp);
+ }
+ String[] cmds = {
+ "${JAVA_HOME}" + "/bin/java -cp '" + cp + "' "
+ + BSPRunner.class.getCanonicalName(),
+ jobId.getJtIdentifier(),
+ id + "",
+ this.jobFile.makeQualified(fs.getUri(), fs.getWorkingDirectory())
+ .toString(),
+ " 1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout",
+ " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr" };
+ ctx.setCommands(Arrays.asList(cmds));
+ LOG.info("Starting command: " + Arrays.toString(cmds));
StartContainerRequest startReq = Records
.newRecord(StartContainerRequest.class);