You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by tj...@apache.org on 2011/10/04 11:13:56 UTC

svn commit: r1178735 - in /incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp: BSPApplicationMaster.java BSPJobImpl.java

Author: tjungblut
Date: Tue Oct  4 09:13:56 2011
New Revision: 1178735

URL: http://svn.apache.org/viewvc?rev=1178735&view=rev
Log:
Added resource allocation requests for application master and tasks.

Modified:
    incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/BSPApplicationMaster.java
    incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/BSPJobImpl.java

Modified: incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/BSPApplicationMaster.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/BSPApplicationMaster.java?rev=1178735&r1=1178734&r2=1178735&view=diff
==============================================================================
--- incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/BSPApplicationMaster.java (original)
+++ incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/BSPApplicationMaster.java Tue Oct  4 09:13:56 2011
@@ -18,17 +18,31 @@
 package org.apache.hama.bsp;
 
 import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.yarn.Clock;
+import org.apache.hadoop.yarn.SystemClock;
+import org.apache.hadoop.yarn.api.AMRMProtocol;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.util.Records;
 import org.apache.hama.HamaConfiguration;
+import org.apache.mina.util.AvailablePortFinder;
 
 /**
  * BSPApplicationMaster is an application master for Apache Hamas BSP Engine.
@@ -42,9 +56,19 @@ public class BSPApplicationMaster {
 	private Configuration jobConf;
 
 	private FileSystem fs;
-
+	private Clock clock;
 	private YarnRPC yarnRPC;
+	private AMRMProtocol amrmRPC;
+
 	private ApplicationId appId;
+	private ApplicationAttemptId appAttemptId;
+	private String applicationName;
+	private String userName;
+	private long startTime;
+
+	// RPC info where the AM receive client side requests
+	private String hostname;
+	private int port;
 
 	private BSPApplicationMaster(String[] args) throws IOException {
 		if (args.length != 1) {
@@ -53,9 +77,79 @@ public class BSPApplicationMaster {
 
 		localConf = new YarnConfiguration();
 		jobConf = getSubmitConfiguration(args[0]);
+
+		applicationName = jobConf.get("bsp.job.name",
+				"<no bsp job name defined>");
+		userName = jobConf.get("user.name", "<no user defined>");
+
 		appId = Records.newRecord(ApplicationId.class);
+		appAttemptId = getApplicationAttemptId();
+
 		yarnRPC = YarnRPC.create(localConf);
 		fs = FileSystem.get(localConf);
+		clock = new SystemClock();
+		startTime = clock.getTime();
+
+		// TODO this is not localhost, is it?
+		// TODO this address of the client rpc server
+		hostname = InetAddress.getLocalHost().getHostName();
+		port = getFreePort();
+
+		amrmRPC = registerWithResourceManager(localConf, appAttemptId,
+				hostname, port, null);
+	}
+
+	private int getFreePort() {
+		int startPort = 14000;
+		while (!AvailablePortFinder.available(startPort)) {
+			startPort++;
+			LOG.debug("Testing port for availability: " + startPort);
+		}
+		return startPort;
+	}
+
+	private AMRMProtocol registerWithResourceManager(Configuration yarnConf,
+			ApplicationAttemptId appAttemptID, String appMasterHostName,
+			int appMasterRpcPort, String appMasterTrackingUrl)
+			throws YarnRemoteException {
+		// Connect to the Scheduler of the ResourceManager.
+		InetSocketAddress rmAddress = NetUtils.createSocketAddr(yarnConf.get(
+				YarnConfiguration.RM_SCHEDULER_ADDRESS,
+				YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS));
+		LOG.info("Connecting to ResourceManager at " + rmAddress);
+		AMRMProtocol resourceManager = (AMRMProtocol) yarnRPC.getProxy(
+				AMRMProtocol.class, rmAddress, yarnConf);
+
+		RegisterApplicationMasterRequest appMasterRequest = Records
+				.newRecord(RegisterApplicationMasterRequest.class);
+		appMasterRequest.setApplicationAttemptId(appAttemptID);
+		appMasterRequest.setHost(appMasterHostName);
+		appMasterRequest.setRpcPort(appMasterRpcPort);
+		// TODO tracking URL
+		// appMasterRequest.setTrackingUrl(appMasterTrackingUrl);
+
+		RegisterApplicationMasterResponse response = resourceManager
+				.registerApplicationMaster(appMasterRequest);
+		LOG.debug("ApplicationMaster has maximum resource capability of: "
+				+ response.getMaximumResourceCapability().getMemory());
+		return resourceManager;
+	}
+
+	/**
+	 * Gets the application attempt ID from the environment. This should be set
+	 * by YARN when the container has been launched.
+	 * 
+	 * @return a new ApplicationAttemptId which is unique and identifies this
+	 *         task.
+	 */
+	private ApplicationAttemptId getApplicationAttemptId() throws IOException {
+		Map<String, String> envs = System.getenv();
+		if (!envs.containsKey(ApplicationConstants.APPLICATION_ATTEMPT_ID_ENV)) {
+			throw new IllegalArgumentException(
+					"ApplicationAttemptId not set in the environment");
+		}
+		return ConverterUtils.toApplicationAttemptId(envs
+				.get(ApplicationConstants.APPLICATION_ATTEMPT_ID_ENV));
 	}
 
 	private Configuration getSubmitConfiguration(String path) {

Modified: incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/BSPJobImpl.java
URL: http://svn.apache.org/viewvc/incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/BSPJobImpl.java?rev=1178735&r1=1178734&r2=1178735&view=diff
==============================================================================
--- incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/BSPJobImpl.java (original)
+++ incubator/hama/branches/HamaV2/server/src/main/java/org/apache/hama/bsp/BSPJobImpl.java Tue Oct  4 09:13:56 2011
@@ -20,9 +20,41 @@ package org.apache.hama.bsp;
 import java.util.Map;
 
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.util.Records;
 
 public class BSPJobImpl implements BSPJob {
 
+	private ResourceRequest requestTasks(int numBSPTasks, int memoryInMb,
+			int priority) {
+		// Resource Request
+		ResourceRequest rsrcRequest = Records.newRecord(ResourceRequest.class);
+
+		// setup requirements for hosts
+		// whether a particular rack/host is needed
+		// useful for applications that are sensitive
+		// to data locality
+		rsrcRequest.setHostName("*");
+
+		// set the priority for the request
+		Priority pri = Records.newRecord(Priority.class);
+		pri.setPriority(priority);
+		rsrcRequest.setPriority(pri);
+
+		// Set up resource type requirements
+		// For now, only memory is supported so we set memory requirements
+		Resource capability = Records.newRecord(Resource.class);
+		capability.setMemory(memoryInMb);
+		rsrcRequest.setCapability(capability);
+
+		// set no. of containers needed
+		// matching the specifications
+		rsrcRequest.setNumContainers(numBSPTasks);
+		return rsrcRequest;
+	}
+
 	@Override
 	public BSPJobID getID() {
 		// TODO Auto-generated method stub