You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by tj...@apache.org on 2011/10/04 11:13:56 UTC
svn commit: r1178735 - in
/incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp:
BSPApplicationMaster.java BSPJobImpl.java
Author: tjungblut
Date: Tue Oct 4 09:13:56 2011
New Revision: 1178735
URL: http://svn.apache.org/viewvc?rev=1178735&view=rev
Log:
Added resource allocation requests for application master and tasks.
Modified:
incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/BSPApplicationMaster.java
incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/BSPJobImpl.java
Modified: incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/BSPApplicationMaster.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/BSPApplicationMaster.java?rev=1178735&r1=1178734&r2=1178735&view=diff
==============================================================================
--- incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/BSPApplicationMaster.java (original)
+++ incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/BSPApplicationMaster.java Tue Oct 4 09:13:56 2011
@@ -18,17 +18,31 @@
package org.apache.hama.bsp;
import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.yarn.Clock;
+import org.apache.hadoop.yarn.SystemClock;
+import org.apache.hadoop.yarn.api.AMRMProtocol;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records;
import org.apache.hama.HamaConfiguration;
+import org.apache.mina.util.AvailablePortFinder;
/**
* BSPApplicationMaster is an application master for Apache Hamas BSP Engine.
@@ -42,9 +56,19 @@ public class BSPApplicationMaster {
private Configuration jobConf;
private FileSystem fs;
-
+ private Clock clock;
private YarnRPC yarnRPC;
+ private AMRMProtocol amrmRPC;
+
private ApplicationId appId;
+ private ApplicationAttemptId appAttemptId;
+ private String applicationName;
+ private String userName;
+ private long startTime;
+
+ // RPC info where the AM receive client side requests
+ private String hostname;
+ private int port;
private BSPApplicationMaster(String[] args) throws IOException {
if (args.length != 1) {
@@ -53,9 +77,79 @@ public class BSPApplicationMaster {
localConf = new YarnConfiguration();
jobConf = getSubmitConfiguration(args[0]);
+
+ applicationName = jobConf.get("bsp.job.name",
+ "<no bsp job name defined>");
+ userName = jobConf.get("user.name", "<no user defined>");
+
appId = Records.newRecord(ApplicationId.class);
+ appAttemptId = getApplicationAttemptId();
+
yarnRPC = YarnRPC.create(localConf);
fs = FileSystem.get(localConf);
+ clock = new SystemClock();
+ startTime = clock.getTime();
+
+ // TODO this is not localhost, is it?
+ // TODO this address of the client rpc server
+ hostname = InetAddress.getLocalHost().getHostName();
+ port = getFreePort();
+
+ amrmRPC = registerWithResourceManager(localConf, appAttemptId,
+ hostname, port, null);
+ }
+
+ private int getFreePort() {
+ int startPort = 14000;
+ while (!AvailablePortFinder.available(startPort)) {
+ startPort++;
+ LOG.debug("Testing port for availability: " + startPort);
+ }
+ return startPort;
+ }
+
+ private AMRMProtocol registerWithResourceManager(Configuration yarnConf,
+ ApplicationAttemptId appAttemptID, String appMasterHostName,
+ int appMasterRpcPort, String appMasterTrackingUrl)
+ throws YarnRemoteException {
+ // Connect to the Scheduler of the ResourceManager.
+ InetSocketAddress rmAddress = NetUtils.createSocketAddr(yarnConf.get(
+ YarnConfiguration.RM_SCHEDULER_ADDRESS,
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS));
+ LOG.info("Connecting to ResourceManager at " + rmAddress);
+ AMRMProtocol resourceManager = (AMRMProtocol) yarnRPC.getProxy(
+ AMRMProtocol.class, rmAddress, yarnConf);
+
+ RegisterApplicationMasterRequest appMasterRequest = Records
+ .newRecord(RegisterApplicationMasterRequest.class);
+ appMasterRequest.setApplicationAttemptId(appAttemptID);
+ appMasterRequest.setHost(appMasterHostName);
+ appMasterRequest.setRpcPort(appMasterRpcPort);
+ // TODO tracking URL
+ // appMasterRequest.setTrackingUrl(appMasterTrackingUrl);
+
+ RegisterApplicationMasterResponse response = resourceManager
+ .registerApplicationMaster(appMasterRequest);
+ LOG.debug("ApplicationMaster has maximum resource capability of: "
+ + response.getMaximumResourceCapability().getMemory());
+ return resourceManager;
+ }
+
+ /**
+ * Gets the application attempt ID from the environment. This should be set
+ * by YARN when the container has been launched.
+ *
+ * @return a new ApplicationAttemptId which is unique and identifies this
+ * task.
+ */
+ private ApplicationAttemptId getApplicationAttemptId() throws IOException {
+ Map<String, String> envs = System.getenv();
+ if (!envs.containsKey(ApplicationConstants.APPLICATION_ATTEMPT_ID_ENV)) {
+ throw new IllegalArgumentException(
+ "ApplicationAttemptId not set in the environment");
+ }
+ return ConverterUtils.toApplicationAttemptId(envs
+ .get(ApplicationConstants.APPLICATION_ATTEMPT_ID_ENV));
}
private Configuration getSubmitConfiguration(String path) {
Modified: incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/BSPJobImpl.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/BSPJobImpl.java?rev=1178735&r1=1178734&r2=1178735&view=diff
==============================================================================
--- incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/BSPJobImpl.java (original)
+++ incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/BSPJobImpl.java Tue Oct 4 09:13:56 2011
@@ -20,9 +20,41 @@ package org.apache.hama.bsp;
import java.util.Map;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.util.Records;
public class BSPJobImpl implements BSPJob {
+ private ResourceRequest requestTasks(int numBSPTasks, int memoryInMb,
+ int priority) {
+ // Resource Request
+ ResourceRequest rsrcRequest = Records.newRecord(ResourceRequest.class);
+
+ // setup requirements for hosts
+ // whether a particular rack/host is needed
+ // useful for applications that are sensitive
+ // to data locality
+ rsrcRequest.setHostName("*");
+
+ // set the priority for the request
+ Priority pri = Records.newRecord(Priority.class);
+ pri.setPriority(priority);
+ rsrcRequest.setPriority(pri);
+
+ // Set up resource type requirements
+ // For now, only memory is supported so we set memory requirements
+ Resource capability = Records.newRecord(Resource.class);
+ capability.setMemory(memoryInMb);
+ rsrcRequest.setCapability(capability);
+
+ // set no. of containers needed
+ // matching the specifications
+ rsrcRequest.setNumContainers(numBSPTasks);
+ return rsrcRequest;
+ }
+
@Override
public BSPJobID getID() {
// TODO Auto-generated method stub