You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ed...@apache.org on 2015/06/09 11:37:28 UTC
svn commit: r1684362 -
/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/ApplicationMaster.java
Author: edwardyoon
Date: Tue Jun 9 09:37:28 2015
New Revision: 1684362
URL: http://svn.apache.org/r1684362
Log:
HAMA-939: Refactor code that was implemented using an out-of-date status response
Modified:
hama/trunk/yarn/src/main/java/org/apache/hama/bsp/ApplicationMaster.java
Modified: hama/trunk/yarn/src/main/java/org/apache/hama/bsp/ApplicationMaster.java
URL: http://svn.apache.org/viewvc/hama/trunk/yarn/src/main/java/org/apache/hama/bsp/ApplicationMaster.java?rev=1684362&r1=1684361&r2=1684362&view=diff
==============================================================================
--- hama/trunk/yarn/src/main/java/org/apache/hama/bsp/ApplicationMaster.java (original)
+++ hama/trunk/yarn/src/main/java/org/apache/hama/bsp/ApplicationMaster.java Tue Jun 9 09:37:28 2015
@@ -67,7 +67,7 @@ import java.util.concurrent.ExecutorServ
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
-public class ApplicationMaster implements BSPClient, BSPPeerProtocol {
+public class ApplicationMaster implements BSPClient, BSPPeerProtocol {
private static final Log LOG = LogFactory.getLog(ApplicationMaster.class);
// Configuration
@@ -117,7 +117,6 @@ public class ApplicationMaster implemen
@VisibleForTesting
protected ApplicationAttemptId appAttemptID;
-
// TODO
// For status update for clients - yet to be implemented
// Hostname of the container
@@ -133,7 +132,8 @@ public class ApplicationMaster implemen
protected int numTotalContainers;
// Memory to request for the container on which the shell command will run
private int containerMemory;
- // VirtualCores to request for the container on which the shell command will run
+ // VirtualCores to request for the container on which the shell command will
+ // run
private int containerVirtualCores = 1;
// Priority of the request
@@ -160,8 +160,8 @@ public class ApplicationMaster implemen
private List<Thread> launchThreads = new ArrayList<Thread>();
@VisibleForTesting
- protected final Set<ContainerId> launchedContainers =
- Collections.newSetFromMap(new ConcurrentHashMap<ContainerId, Boolean>());
+ protected final Set<ContainerId> launchedContainers = Collections
+ .newSetFromMap(new ConcurrentHashMap<ContainerId, Boolean>());
public ApplicationMaster() {
// Set up the configuration
@@ -171,7 +171,7 @@ public class ApplicationMaster implemen
public static void main(String[] args) throws IOException {
boolean result = false;
ApplicationMaster appMaster = new ApplicationMaster();
-
+
try {
LOG.info("Initializing ApplicationMaster");
boolean doRun = appMaster.init(args);
@@ -188,7 +188,7 @@ public class ApplicationMaster implemen
LOG.info("Stop SyncServer and RPCServer.");
appMaster.close();
}
-
+
if (result) {
LOG.info("Application Master completed successfully. exiting");
System.exit(0);
@@ -254,7 +254,7 @@ public class ApplicationMaster implemen
/**
* Main run function for the application master
- *
+ *
* @throws org.apache.hadoop.yarn.exceptions.YarnException
* @throws IOException
*/
@@ -264,8 +264,8 @@ public class ApplicationMaster implemen
// Note: Credentials, Token, UserGroupInformation, DataOutputBuffer class
// are marked as LimitedPrivate
- Credentials credentials =
- UserGroupInformation.getCurrentUser().getCredentials();
+ Credentials credentials = UserGroupInformation.getCurrentUser()
+ .getCredentials();
DataOutputBuffer dob = new DataOutputBuffer();
credentials.writeTokenStorageToStream(dob);
// Now remove the AM->RM token so that containers cannot access it.
@@ -281,13 +281,12 @@ public class ApplicationMaster implemen
allTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
// Create appSubmitterUgi and add original tokens to it
- String appSubmitterUserName =
- System.getenv(ApplicationConstants.Environment.USER.name());
- appSubmitterUgi =
- UserGroupInformation.createRemoteUser(appSubmitterUserName);
+ String appSubmitterUserName = System
+ .getenv(ApplicationConstants.Environment.USER.name());
+ appSubmitterUgi = UserGroupInformation
+ .createRemoteUser(appSubmitterUserName);
appSubmitterUgi.addCredentials(credentials);
-
AMRMClientAsync.CallbackHandler allocListener = new RMCallbackHandler();
amRMClient = AMRMClientAsync.createAMRMClientAsync(1000, allocListener);
amRMClient.init(localConf);
@@ -328,23 +327,24 @@ public class ApplicationMaster implemen
if (containerVirtualCores > maxVCores) {
LOG.info("Container virtual cores specified above max threshold of cluster."
- + " Using max value." + ", specified=" + containerVirtualCores + ", max="
- + maxVCores);
+ + " Using max value."
+ + ", specified="
+ + containerVirtualCores
+ + ", max=" + maxVCores);
containerVirtualCores = maxVCores;
}
- List<Container> previousAMRunningContainers =
- response.getContainersFromPreviousAttempts();
+ List<Container> previousAMRunningContainers = response
+ .getContainersFromPreviousAttempts();
LOG.info(appAttemptID + " received " + previousAMRunningContainers.size()
+ " previous attempts' running containers on AM registration.");
- for(Container container: previousAMRunningContainers) {
+ for (Container container : previousAMRunningContainers) {
launchedContainers.add(container.getId());
}
numAllocatedContainers.addAndGet(previousAMRunningContainers.size());
-
- int numTotalContainersToRequest =
- numTotalContainers - previousAMRunningContainers.size();
+ int numTotalContainersToRequest = numTotalContainers
+ - previousAMRunningContainers.size();
// Setup ask for containers from RM
// Send request for containers to RM
// Until we get our fully allocated quota, we keep on polling RM for
@@ -366,11 +366,11 @@ public class ApplicationMaster implemen
@VisibleForTesting
protected boolean finish() {
// wait for completion.
- while (!done
- && (numCompletedContainers.get() != numTotalContainers)) {
+ while (!done && (numCompletedContainers.get() != numTotalContainers)) {
try {
Thread.sleep(200);
- } catch (InterruptedException ex) {}
+ } catch (InterruptedException ex) {
+ }
}
// Join all launched threads
@@ -396,8 +396,8 @@ public class ApplicationMaster implemen
FinalApplicationStatus appStatus;
String appMessage = null;
boolean success = true;
- if (numFailedContainers.get() == 0 &&
- numCompletedContainers.get() == numTotalContainers) {
+ if (numFailedContainers.get() == 0
+ && numCompletedContainers.get() == numTotalContainers) {
appStatus = FinalApplicationStatus.SUCCEEDED;
} else {
appStatus = FinalApplicationStatus.FAILED;
@@ -504,7 +504,7 @@ public class ApplicationMaster implemen
+ ", containerResourceVirtualCores"
+ allocatedContainer.getResource().getVirtualCores());
- Thread launchThread = createLaunchContainerThread(allocatedContainer, allocatedContainer.getId().getContainerId());
+ Thread launchThread = createLaunchContainerThread(allocatedContainer);
// launch and start the container on a separate thread to keep
// the main thread unblocked
@@ -541,11 +541,9 @@ public class ApplicationMaster implemen
}
@VisibleForTesting
- static class NMCallbackHandler
- implements NMClientAsync.CallbackHandler {
+ static class NMCallbackHandler implements NMClientAsync.CallbackHandler {
- private ConcurrentMap<ContainerId, Container> containers =
- new ConcurrentHashMap<ContainerId, Container>();
+ private ConcurrentMap<ContainerId, Container> containers = new ConcurrentHashMap<ContainerId, Container>();
private final ApplicationMaster applicationMaster;
public NMCallbackHandler(ApplicationMaster applicationMaster) {
@@ -568,8 +566,8 @@ public class ApplicationMaster implemen
public void onContainerStatusReceived(ContainerId containerId,
ContainerStatus containerStatus) {
if (LOG.isDebugEnabled()) {
- LOG.debug("Container Status: id=" + containerId + ", status=" +
- containerStatus);
+ LOG.debug("Container Status: id=" + containerId + ", status="
+ + containerStatus);
}
}
@@ -581,7 +579,8 @@ public class ApplicationMaster implemen
}
Container container = containers.get(containerId);
if (container != null) {
- applicationMaster.nmClientAsync.getContainerStatusAsync(containerId, container.getNodeId());
+ applicationMaster.nmClientAsync.getContainerStatusAsync(containerId,
+ container.getNodeId());
}
}
@@ -594,8 +593,7 @@ public class ApplicationMaster implemen
}
@Override
- public void onGetContainerStatusError(
- ContainerId containerId, Throwable t) {
+ public void onGetContainerStatusError(ContainerId containerId, Throwable t) {
LOG.error("Failed to query the status of Container " + containerId);
}
@@ -607,8 +605,8 @@ public class ApplicationMaster implemen
}
/**
- * Thread to connect to the {@link ContainerManagementProtocol} and launch the container
- * that will execute the shell command.
+ * Thread to connect to the {@link ContainerManagementProtocol} and launch the
+ * container that will execute the shell command.
*/
private class LaunchContainerRunnable implements Runnable {
@@ -619,25 +617,20 @@ public class ApplicationMaster implemen
Configuration conf;
- long taskAttemptId;
-
/**
- * @param lcontainer Allocated container
+ * @param lcontainer Allocated container
* @param containerListener Callback handler of the container
*/
- public LaunchContainerRunnable(
- Container lcontainer, NMCallbackHandler containerListener,
- Configuration conf, long taskAttemptId) {
+ public LaunchContainerRunnable(Container lcontainer,
+ NMCallbackHandler containerListener, Configuration conf) {
this.container = lcontainer;
this.containerListener = containerListener;
this.conf = conf;
- this.taskAttemptId = taskAttemptId;
}
/**
- * Connects to CM, sets up container launch context
- * for shell command and eventually dispatches the container
- * start request to the CM.
+ * Connects to CM, sets up container launch context for shell command and
+ * eventually dispatches the container start request to the CM.
*/
@Override
public void run() {
@@ -656,11 +649,12 @@ public class ApplicationMaster implemen
} catch (IOException e) {
e.printStackTrace();
}
- Path packageFile = new Path(System.getenv(YARNBSPConstants.HAMA_YARN_LOCATION));
+ Path packageFile = new Path(
+ System.getenv(YARNBSPConstants.HAMA_YARN_LOCATION));
URL packageUrl = null;
try {
packageUrl = ConverterUtils.getYarnUrlFromPath(packageFile
- .makeQualified(fs.getUri(), fs.getWorkingDirectory()));
+ .makeQualified(fs.getUri(), fs.getWorkingDirectory()));
LOG.info("PackageURL has been composed to " + packageUrl.toString());
LOG.info("Reverting packageURL to path: "
+ ConverterUtils.getPathFromYarnURL(packageUrl));
@@ -672,25 +666,27 @@ public class ApplicationMaster implemen
}
packageResource.setResource(packageUrl);
- packageResource.setSize(Long.parseLong(System.getenv(YARNBSPConstants.HAMA_YARN_SIZE)));
- packageResource.setTimestamp(Long.parseLong(System.getenv(YARNBSPConstants.HAMA_YARN_TIMESTAMP)));
+ packageResource.setSize(Long.parseLong(System
+ .getenv(YARNBSPConstants.HAMA_YARN_SIZE)));
+ packageResource.setTimestamp(Long.parseLong(System
+ .getenv(YARNBSPConstants.HAMA_YARN_TIMESTAMP)));
packageResource.setType(LocalResourceType.FILE);
packageResource.setVisibility(LocalResourceVisibility.APPLICATION);
localResources.put(YARNBSPConstants.APP_MASTER_JAR_PATH, packageResource);
- Path hamaReleaseFile = new Path(System.getenv(YARNBSPConstants.HAMA_LOCATION));
+ Path hamaReleaseFile = new Path(
+ System.getenv(YARNBSPConstants.HAMA_LOCATION));
URL hamaReleaseUrl = ConverterUtils.getYarnUrlFromPath(hamaReleaseFile
.makeQualified(fs.getUri(), fs.getWorkingDirectory()));
- LOG.info("Hama release URL has been composed to " + hamaReleaseUrl
- .toString());
+ LOG.info("Hama release URL has been composed to "
+ + hamaReleaseUrl.toString());
RemoteIterator<LocatedFileStatus> fileStatusListIterator = null;
try {
- fileStatusListIterator = fs.listFiles(
- hamaReleaseFile, true);
+ fileStatusListIterator = fs.listFiles(hamaReleaseFile, true);
- while(fileStatusListIterator.hasNext()) {
+ while (fileStatusListIterator.hasNext()) {
LocatedFileStatus lfs = fileStatusListIterator.next();
LocalResource localRsrc = LocalResource.newInstance(
ConverterUtils.getYarnUrlFromPath(lfs.getPath()),
@@ -704,16 +700,16 @@ public class ApplicationMaster implemen
ctx.setLocalResources(localResources);
- /*
- * TODO Package classpath seems not to work if you're in pseudo distributed
- * mode, because the resource must not be moved, it will never be unpacked.
- * So we will check if our jar file has the file:// prefix and put it into
- * the CP directly
- */
+ /*
+ * TODO Package classpath seems not to work if you're in pseudo
+ * distributed mode, because the resource must not be moved, it will never
+ * be unpacked. So we will check if our jar file has the file:// prefix
+ * and put it into the CP directly
+ */
StringBuilder classPathEnv = new StringBuilder(
- ApplicationConstants.Environment.CLASSPATH.$()).append(File.pathSeparatorChar)
- .append("./*");
+ ApplicationConstants.Environment.CLASSPATH.$()).append(
+ File.pathSeparatorChar).append("./*");
for (String c : conf.getStrings(
YarnConfiguration.YARN_APPLICATION_CLASSPATH,
YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) {
@@ -727,13 +723,14 @@ public class ApplicationMaster implemen
vargs.add(BSPRunner.class.getCanonicalName());
vargs.add(jobId.getJtIdentifier());
- vargs.add(Long.toString(taskAttemptId));
- vargs.add(
- new Path(jobFile).makeQualified(fs.getUri(), fs.getWorkingDirectory())
- .toString());
-
- vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/hama-worker.stdout");
- vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/hama-worker.stderr");
+ vargs.add(Long.toString(container.getId().getContainerId()));
+ vargs.add(new Path(jobFile).makeQualified(fs.getUri(),
+ fs.getWorkingDirectory()).toString());
+
+ vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR
+ + "/hama-worker.stdout");
+ vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR
+ + "/hama-worker.stderr");
// Get final commmand
StringBuilder command = new StringBuilder();
@@ -755,7 +752,7 @@ public class ApplicationMaster implemen
/**
* Setup the request that will be sent to the RM for the container ask.
- *
+ *
* @return the setup ResourceRequest to be sent to RM
*/
private AMRMClient.ContainerRequest setupContainerAskForRM() {
@@ -766,12 +763,13 @@ public class ApplicationMaster implemen
Priority pri = Priority.newInstance(requestPriority);
// Set up resource type requirements
- // For now, memory and CPU are supported so we set memory and cpu requirements
+ // For now, memory and CPU are supported so we set memory and cpu
+ // requirements
Resource capability = Resource.newInstance(containerMemory,
containerVirtualCores);
- AMRMClient.ContainerRequest request = new AMRMClient.ContainerRequest(capability, null, null,
- pri);
+ AMRMClient.ContainerRequest request = new AMRMClient.ContainerRequest(
+ capability, null, null, pri);
LOG.info("Requested container ask: " + request.toString());
return request;
}
@@ -786,7 +784,7 @@ public class ApplicationMaster implemen
FileSystem fs = FileSystem.get(URI.create(path), jobConf);
- InputStream in =fs.open(jobSubmitPath);
+ InputStream in = fs.open(jobSubmitPath);
jobConf.addResource(in);
return jobConf;
@@ -795,7 +793,7 @@ public class ApplicationMaster implemen
/**
* Gets the application attempt ID from the environment. This should be set by
* YARN when the container has been launched.
- *
+ *
* @return a new ApplicationAttemptId which is unique and identifies this
* task.
*/
@@ -816,7 +814,7 @@ public class ApplicationMaster implemen
* This method starts the sync server on a specific port and waits for it to
* come up. Be aware that this method adds the "bsp.sync.server.address" that
* is needed for a task to connect to the service.
- *
+ *
* @throws IOException
*/
private void startSyncServer() throws Exception {
@@ -828,8 +826,8 @@ public class ApplicationMaster implemen
}
/**
- * This method is to run Zookeeper in order to coordinates between BSPMaster and Groomservers
- * using Runnable interface in java.
+ * This method is to run Zookeeper in order to coordinates between BSPMaster
+ * and Groomservers using Runnable interface in java.
*/
private static class ZKServerThread implements Runnable {
SyncServer server;
@@ -852,12 +850,13 @@ public class ApplicationMaster implemen
* This method starts the needed RPC servers: client server and the task
* server. This method manipulates the configuration and therefore needs to be
* executed BEFORE the submitconfiguration gets rewritten.
- *
+ *
* @throws IOException
*/
private void startRPCServers() throws IOException {
// start the RPC server which talks to the client
- this.clientServer = RPC.getServer(BSPClient.class, hostname, clientPort, jobConf);
+ this.clientServer = RPC.getServer(BSPClient.class, hostname, clientPort,
+ jobConf);
this.clientServer.start();
// start the RPC server which talks to the tasks
@@ -886,6 +885,7 @@ public class ApplicationMaster implemen
/**
* Get container memory from "bsp.child.mem.in.mb" set on Hama configuration
+ *
* @return The memory of container.
*/
private int getMemoryRequirements(Configuration conf) {
@@ -907,8 +907,7 @@ public class ApplicationMaster implemen
}
if (!opts.contains("-Xmx")) {
- LOG.info(
- "No \"-Xmx\" option found in child opts, using default amount of memory!");
+ LOG.info("No \"-Xmx\" option found in child opts, using default amount of memory!");
return DEFAULT_MEMORY_MB;
} else {
// e.G: -Xmx512m
@@ -916,8 +915,8 @@ public class ApplicationMaster implemen
int startIndex = opts.indexOf("-Xmx") + 4;
String xmxString = opts.substring(startIndex);
char qualifier = xmxString.charAt(xmxString.length() - 1);
- int memory = Integer
- .valueOf(xmxString.substring(0, xmxString.length() - 1));
+ int memory = Integer.valueOf(xmxString.substring(0,
+ xmxString.length() - 1));
if (qualifier == 'm') {
return memory;
} else if (qualifier == 'g') {
@@ -931,9 +930,9 @@ public class ApplicationMaster implemen
}
@VisibleForTesting
- Thread createLaunchContainerThread(Container allocatedContainer, long taskAttemptId) {
- LaunchContainerRunnable runnableLaunchContainer =
- new LaunchContainerRunnable(allocatedContainer, containerListener, jobConf, taskAttemptId);
+ Thread createLaunchContainerThread(Container allocatedContainer) {
+ LaunchContainerRunnable runnableLaunchContainer = new LaunchContainerRunnable(
+ allocatedContainer, containerListener, jobConf);
return new Thread(runnableLaunchContainer);
}
@@ -946,7 +945,7 @@ public class ApplicationMaster implemen
public Task getTask(TaskAttemptID taskid) throws IOException {
BSPJobClient.RawSplit assignedSplit = null;
String splitName = NullInputFormat.NullInputSplit.class.getName();
- //String splitName = NullInputSplit.class.getCanonicalName();
+ // String splitName = NullInputSplit.class.getCanonicalName();
if (splits != null) {
assignedSplit = splits[taskid.id];
splitName = assignedSplit.getClassName();
@@ -1003,6 +1002,7 @@ public class ApplicationMaster implemen
this.clientServer.stop();
this.taskServer.stop();
this.syncServer.stopServer();
+ threadPool.shutdown();
}
@Override