You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ki...@apache.org on 2013/11/20 22:13:04 UTC

[49/52] [abbrv] git commit: In the middle of changes, need to pull in latest helix

In the middle of changes, need to pull in latest helix


Project: http://git-wip-us.apache.org/repos/asf/incubator-helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-helix/commit/342d0e7f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-helix/tree/342d0e7f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-helix/diff/342d0e7f

Branch: refs/heads/helix-yarn
Commit: 342d0e7f62859c46b980b3309862767d0490f8e3
Parents: 0b2da84
Author: Kishore Gopalakrishna <kg...@lm-lsv39442.linkedin.biz>
Authored: Tue Nov 19 11:26:16 2013 -0800
Committer: Kishore Gopalakrishna <kg...@lm-lsv39442.linkedin.biz>
Committed: Tue Nov 19 11:26:16 2013 -0800

----------------------------------------------------------------------
 pom.xml                                         |   5 +
 recipes/auto-scale/pom.xml                      |   2 +-
 .../impl/yarn/YarnContainerProviderProcess.java | 277 ++++++++++---------
 .../autoscale/provider/ProviderRebalancer.java  |   3 +-
 4 files changed, 159 insertions(+), 128 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/342d0e7f/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 6840410..b21bee4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -169,6 +169,11 @@ under the License.
       <name>SnakeYAML repository</name>
       <url>http://oss.sonatype.org/content/groups/public/</url>
     </repository>
+    <repository>
+      <id>jboss-fs-public</id>
+      <name>JBoss FuseSource repository</name>
+      <url>http://repository.jboss.org/nexus/content/groups/fs-public/</url>
+    </repository>
   </repositories>
 
  

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/342d0e7f/recipes/auto-scale/pom.xml
----------------------------------------------------------------------
diff --git a/recipes/auto-scale/pom.xml b/recipes/auto-scale/pom.xml
index 95331f4..0482d3d 100644
--- a/recipes/auto-scale/pom.xml
+++ b/recipes/auto-scale/pom.xml
@@ -13,7 +13,7 @@
   <name>Apache Helix :: Recipes :: Auto-Scale</name>
   
   <properties>
-    <hadoop.version>0.23.9</hadoop.version>
+    <hadoop.version>2.2.0</hadoop.version>
 
     <ut.groups>unit</ut.groups>
     <it.groups>local, shell</it.groups>

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/342d0e7f/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/yarn/YarnContainerProviderProcess.java
----------------------------------------------------------------------
diff --git a/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/yarn/YarnContainerProviderProcess.java b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/yarn/YarnContainerProviderProcess.java
index 20a8b92..37f942d 100644
--- a/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/yarn/YarnContainerProviderProcess.java
+++ b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/yarn/YarnContainerProviderProcess.java
@@ -1,7 +1,6 @@
 package org.apache.helix.autoscale.impl.yarn;
 
 import java.io.File;
-import java.net.InetSocketAddress;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
@@ -10,22 +9,17 @@ import java.util.Properties;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
-import org.apache.hadoop.yarn.api.ClientRMProtocol;
-import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.LocalResourceType;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.client.api.YarnClientApplication;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
-import org.apache.hadoop.yarn.ipc.YarnRPC;
 import org.apache.hadoop.yarn.util.Records;
 import org.apache.helix.autoscale.Service;
 import org.apache.log4j.Logger;
@@ -38,121 +32,154 @@ import com.google.common.base.Preconditions;
  */
 public class YarnContainerProviderProcess implements Service {
 
-    static final Logger             log                 = Logger.getLogger(YarnContainerProviderProcess.class);
-
-    static String                   YARN_MASTER_COMMAND = "/bin/sh %s 1>%s/stdout 2>%s/stderr";
-
-    Configuration                   conf;
-    YarnRPC                         rpc;
-    ClientRMProtocol                rmClient;
-    ApplicationId                   appId;
-    File                            propertiesFile;
-
-    YarnContainerProviderProperties properties;
-
-    @Override
-    public void configure(Properties properties) throws Exception {
-        configure(YarnUtils.createContainerProviderProperties(properties));
-    }
-
-    private void configure(YarnContainerProviderProperties properties) {
-        this.conf = new YarnConfiguration();
-        this.conf.set(YarnConfiguration.RM_ADDRESS, properties.getResourceManager());
-        this.conf.set(YarnConfiguration.RM_SCHEDULER_ADDRESS, properties.getScheduler());
-        this.conf.set(FileSystem.FS_DEFAULT_NAME_KEY, properties.getHdfs());
-
-        this.rpc = YarnRPC.create(conf);
-
-        this.properties = properties;
-    }
-
-    @Override
-    public void start() throws Exception {
-        Preconditions.checkNotNull(properties);
-        Preconditions.checkState(properties.isValid());
-
-        connect();
-
-        String command = String.format(YARN_MASTER_COMMAND, YarnUtils.YARN_MASTER_PATH, ApplicationConstants.LOG_DIR_EXPANSION_VAR,
-                ApplicationConstants.LOG_DIR_EXPANSION_VAR);
-
-        log.info(String.format("Starting application '%s' provider '%s' (masterCommand='%s')", properties.getYarnData(), properties.getName(), command));
-
-        log.debug(String.format("Running master command \"%s\"", command));
-
-        // app id
-        GetNewApplicationRequest appRequest = Records.newRecord(GetNewApplicationRequest.class);
-        GetNewApplicationResponse appResponse = rmClient.getNewApplication(appRequest);
-
-        this.appId = appResponse.getApplicationId();
-
-        log.info(String.format("Acquired app id '%s' for '%s'", appId.toString(), properties.getName()));
-
-        // command
-        ContainerLaunchContext launchContext = Records.newRecord(ContainerLaunchContext.class);
-        launchContext.setCommands(Collections.singletonList(command));
-
-        // resource limit
-        Resource resource = Records.newRecord(Resource.class);
-        resource.setMemory(256); // TODO make dynamic
-        launchContext.setResource(resource);
-
-        // environment
-        Map<String, String> env = new HashMap<String, String>();
-        launchContext.setEnvironment(env);
-
-        // configuration
-        propertiesFile = YarnUtils.writePropertiesToTemp(properties);
-
-        // HDFS
-        final String namespace = appId.toString();
-        final Path masterArchive = YarnUtils.copyToHdfs(YarnUtils.YARN_MASTER_ARCHIVE_PATH, YarnUtils.YARN_MASTER_STAGING, namespace, conf);
-        final Path masterProperties = YarnUtils.copyToHdfs(propertiesFile.getCanonicalPath(), YarnUtils.YARN_MASTER_PROPERTIES, namespace, conf);
-        final Path containerArchive = YarnUtils.copyToHdfs(YarnUtils.YARN_CONTAINER_ARCHIVE_PATH, YarnUtils.YARN_CONTAINER_STAGING, namespace, conf);
-
-        // local resources
-        Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
-        localResources.put(YarnUtils.YARN_MASTER_DESTINATION, YarnUtils.createHdfsResource(masterArchive, LocalResourceType.ARCHIVE, conf));
-        localResources.put(YarnUtils.YARN_MASTER_PROPERTIES, YarnUtils.createHdfsResource(masterProperties, LocalResourceType.FILE, conf));
-        localResources.put(YarnUtils.YARN_CONTAINER_STAGING, YarnUtils.createHdfsResource(containerArchive, LocalResourceType.FILE, conf));
-
-        launchContext.setLocalResources(localResources);
-
-        // user
-        launchContext.setUser(properties.getUser());
-
-        // app submission
-        ApplicationSubmissionContext subContext = Records.newRecord(ApplicationSubmissionContext.class);
-        subContext.setApplicationId(appId);
-        subContext.setApplicationName(properties.getName());
-        subContext.setAMContainerSpec(launchContext);
-
-        SubmitApplicationRequest subRequest = Records.newRecord(SubmitApplicationRequest.class);
-        subRequest.setApplicationSubmissionContext(subContext);
-
-        log.info(String.format("Starting app id '%s'", appId.toString()));
-
-        rmClient.submitApplication(subRequest);
-
-    }
-
-    @Override
-    public void stop() throws YarnRemoteException {
-        log.info(String.format("Stopping app id '%s'", appId.toString()));
-        KillApplicationRequest killRequest = Records.newRecord(KillApplicationRequest.class);
-        killRequest.setApplicationId(appId);
-
-        rmClient.forceKillApplication(killRequest);
-
-		try { YarnUtils.destroyHdfsNamespace(appId.toString(), conf); } catch(Exception ignore) {}
-		
-        propertiesFile.delete();
-    }
-
-    void connect() {
-        YarnConfiguration yarnConf = new YarnConfiguration(conf);
-        InetSocketAddress rmAddress = NetUtils.createSocketAddr(yarnConf.get(YarnConfiguration.RM_ADDRESS, YarnConfiguration.DEFAULT_RM_ADDRESS));
-        log.info("Connecting to ResourceManager at: " + rmAddress);
-        this.rmClient = ((ClientRMProtocol) rpc.getProxy(ClientRMProtocol.class, rmAddress, conf));
-    }
+	static final Logger log = Logger
+			.getLogger(YarnContainerProviderProcess.class);
+
+	static String YARN_MASTER_COMMAND = "/bin/sh %s 1>%s/stdout 2>%s/stderr";
+
+	Configuration conf;
+	YarnClient yarnClient;
+	ApplicationId appId;
+	File propertiesFile;
+
+	YarnContainerProviderProperties properties;
+
+	@Override
+	public void configure(Properties properties) throws Exception {
+		configure(YarnUtils.createContainerProviderProperties(properties));
+	}
+
+	private void configure(YarnContainerProviderProperties properties) {
+		this.conf = new YarnConfiguration();
+		this.conf.set(YarnConfiguration.RM_ADDRESS,
+				properties.getResourceManager());
+		this.conf.set(YarnConfiguration.RM_SCHEDULER_ADDRESS,
+				properties.getScheduler());
+		this.conf.set(FileSystem.FS_DEFAULT_NAME_KEY, properties.getHdfs());
+		this.yarnClient = YarnClient.createYarnClient();
+
+		this.properties = properties;
+	}
+
+	@Override
+	public void start() throws Exception {
+		Preconditions.checkNotNull(properties);
+		Preconditions.checkState(properties.isValid());
+
+		connect();
+
+		String command = String.format(YARN_MASTER_COMMAND,
+				YarnUtils.YARN_MASTER_PATH,
+				ApplicationConstants.LOG_DIR_EXPANSION_VAR,
+				ApplicationConstants.LOG_DIR_EXPANSION_VAR);
+
+		log.info(String.format(
+				"Starting application '%s' provider '%s' (masterCommand='%s')",
+				properties.getYarnData(), properties.getName(), command));
+
+		log.debug(String.format("Setting app master command \"%s\"", command));
+
+		YarnClientApplication application;
+		application = yarnClient.createApplication();
+		ApplicationSubmissionContext appContext = application
+				.getApplicationSubmissionContext();
+		// LOG.info("Got new ApplicationId="
+		// + application.getNewApplicationResponse().getApplicationId());
+		// // app id
+		// GetNewApplicationRequest appRequest =
+		// Records.newRecord(GetNewApplicationRequest.class);
+		// GetNewApplicationResponse appResponse =
+		// rmClient.getNewApplication(appRequest);
+
+		this.appId = application.getApplicationSubmissionContext()
+				.getApplicationId();
+
+		log.info(String.format("Acquired app id '%s' for '%s'",
+				appId.toString(), properties.getName()));
+
+		// command
+		ContainerLaunchContext launchContext = Records
+				.newRecord(ContainerLaunchContext.class);
+		launchContext.setCommands(Collections.singletonList(command));
+
+		// environment
+		Map<String, String> env = new HashMap<String, String>();
+		launchContext.setEnvironment(env);
+
+		// configuration
+		propertiesFile = YarnUtils.writePropertiesToTemp(properties);
+
+		// HDFS
+		final String namespace = appId.toString();
+		final Path masterArchive = YarnUtils.copyToHdfs(
+				YarnUtils.YARN_MASTER_ARCHIVE_PATH,
+				YarnUtils.YARN_MASTER_STAGING, namespace, conf);
+		final Path masterProperties = YarnUtils.copyToHdfs(
+				propertiesFile.getCanonicalPath(),
+				YarnUtils.YARN_MASTER_PROPERTIES, namespace, conf);
+		final Path containerArchive = YarnUtils.copyToHdfs(
+				YarnUtils.YARN_CONTAINER_ARCHIVE_PATH,
+				YarnUtils.YARN_CONTAINER_STAGING, namespace, conf);
+
+		// local resources
+		Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
+		localResources.put(YarnUtils.YARN_MASTER_DESTINATION, YarnUtils
+				.createHdfsResource(masterArchive, LocalResourceType.ARCHIVE,
+						conf));
+		localResources.put(YarnUtils.YARN_MASTER_PROPERTIES, YarnUtils
+				.createHdfsResource(masterProperties, LocalResourceType.FILE,
+						conf));
+		localResources.put(YarnUtils.YARN_CONTAINER_STAGING, YarnUtils
+				.createHdfsResource(containerArchive, LocalResourceType.FILE,
+						conf));
+
+		launchContext.setLocalResources(localResources);
+
+		// user
+		// appContext.setUser(properties.getUser());
+
+		// app submission
+
+		// resource limit, cannot set resource constraint for starting App
+		// Master
+		Resource resource = Records.newRecord(Resource.class);
+		resource.setMemory(256); // TODO make dynamic
+		appContext.setResource(resource);
+
+		appContext.setApplicationId(appId);
+		appContext.setApplicationName(properties.getName());
+		appContext.setAMContainerSpec(launchContext);
+
+		// SubmitApplicationRequest subRequest =
+		// Records.newRecord(SubmitApplicationRequest.class);
+		// subRequest.setApplicationSubmissionContext(subContext);
+
+		log.info(String.format("Starting app id '%s'", appId.toString()));
+
+		yarnClient.submitApplication(appContext);
+
+	}
+
+	@Override
+	public void stop() {
+		log.info(String.format("Stopping app id '%s'", appId.toString()));
+		KillApplicationRequest killRequest = Records
+				.newRecord(KillApplicationRequest.class);
+		killRequest.setApplicationId(appId);
+		try {
+			yarnClient.killApplication(appId);
+			YarnUtils.destroyHdfsNamespace(appId.toString(), conf);
+		} catch (Exception e) {
+			log.error("Exception while stoppilg app " + appId.toString(), e);
+		}
+
+		propertiesFile.delete();
+	}
+
+	void connect() {
+		log.info("Connecting to ResourceManager at: "
+				+ conf.get(YarnConfiguration.RM_ADDRESS));
+		yarnClient.init(conf);
+		yarnClient.start();
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/342d0e7f/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/provider/ProviderRebalancer.java
----------------------------------------------------------------------
diff --git a/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/provider/ProviderRebalancer.java b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/provider/ProviderRebalancer.java
index 2b6e428..44e0762 100644
--- a/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/provider/ProviderRebalancer.java
+++ b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/provider/ProviderRebalancer.java
@@ -61,10 +61,9 @@ public class ProviderRebalancer implements Rebalancer {
     }
 
     @Override
-    public ResourceAssignment computeResourceMapping(Resource resource, IdealState idealState, CurrentStateOutput currentStateOutput,
+    public ResourceAssignment computeResourceMapping(String resourceName, IdealState idealState, CurrentStateOutput currentStateOutput,
             ClusterDataCache clusterData) {
 
-        final String resourceName = resource.getResourceName();
         final String containerType = resourceName;
 
         final SortedSet<String> allContainers = Sets.newTreeSet(new IndexedNameComparator());