You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ki...@apache.org on 2013/11/20 22:13:04 UTC
[49/52] [abbrv] git commit: In the middle of changes,
need to pull in latest helix
In the middle of changes, need to pull in latest helix
Project: http://git-wip-us.apache.org/repos/asf/incubator-helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-helix/commit/342d0e7f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-helix/tree/342d0e7f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-helix/diff/342d0e7f
Branch: refs/heads/helix-yarn
Commit: 342d0e7f62859c46b980b3309862767d0490f8e3
Parents: 0b2da84
Author: Kishore Gopalakrishna <kg...@lm-lsv39442.linkedin.biz>
Authored: Tue Nov 19 11:26:16 2013 -0800
Committer: Kishore Gopalakrishna <kg...@lm-lsv39442.linkedin.biz>
Committed: Tue Nov 19 11:26:16 2013 -0800
----------------------------------------------------------------------
pom.xml | 5 +
recipes/auto-scale/pom.xml | 2 +-
.../impl/yarn/YarnContainerProviderProcess.java | 277 ++++++++++---------
.../autoscale/provider/ProviderRebalancer.java | 3 +-
4 files changed, 159 insertions(+), 128 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/342d0e7f/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 6840410..b21bee4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -169,6 +169,11 @@ under the License.
<name>SnakeYAML repository</name>
<url>http://oss.sonatype.org/content/groups/public/</url>
</repository>
+ <repository>
+ <id>jboss-fs-public</id>
+ <name>JBoss FuseSource repository</name>
+ <url>http://repository.jboss.org/nexus/content/groups/fs-public/</url>
+ </repository>
</repositories>
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/342d0e7f/recipes/auto-scale/pom.xml
----------------------------------------------------------------------
diff --git a/recipes/auto-scale/pom.xml b/recipes/auto-scale/pom.xml
index 95331f4..0482d3d 100644
--- a/recipes/auto-scale/pom.xml
+++ b/recipes/auto-scale/pom.xml
@@ -13,7 +13,7 @@
<name>Apache Helix :: Recipes :: Auto-Scale</name>
<properties>
- <hadoop.version>0.23.9</hadoop.version>
+ <hadoop.version>2.2.0</hadoop.version>
<ut.groups>unit</ut.groups>
<it.groups>local, shell</it.groups>
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/342d0e7f/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/yarn/YarnContainerProviderProcess.java
----------------------------------------------------------------------
diff --git a/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/yarn/YarnContainerProviderProcess.java b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/yarn/YarnContainerProviderProcess.java
index 20a8b92..37f942d 100644
--- a/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/yarn/YarnContainerProviderProcess.java
+++ b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/impl/yarn/YarnContainerProviderProcess.java
@@ -1,7 +1,6 @@
package org.apache.helix.autoscale.impl.yarn;
import java.io.File;
-import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@@ -10,22 +9,17 @@ import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.yarn.api.ApplicationConstants;
-import org.apache.hadoop.yarn.api.ClientRMProtocol;
-import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.client.api.YarnClientApplication;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
-import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.util.Records;
import org.apache.helix.autoscale.Service;
import org.apache.log4j.Logger;
@@ -38,121 +32,154 @@ import com.google.common.base.Preconditions;
*/
public class YarnContainerProviderProcess implements Service {
- static final Logger log = Logger.getLogger(YarnContainerProviderProcess.class);
-
- static String YARN_MASTER_COMMAND = "/bin/sh %s 1>%s/stdout 2>%s/stderr";
-
- Configuration conf;
- YarnRPC rpc;
- ClientRMProtocol rmClient;
- ApplicationId appId;
- File propertiesFile;
-
- YarnContainerProviderProperties properties;
-
- @Override
- public void configure(Properties properties) throws Exception {
- configure(YarnUtils.createContainerProviderProperties(properties));
- }
-
- private void configure(YarnContainerProviderProperties properties) {
- this.conf = new YarnConfiguration();
- this.conf.set(YarnConfiguration.RM_ADDRESS, properties.getResourceManager());
- this.conf.set(YarnConfiguration.RM_SCHEDULER_ADDRESS, properties.getScheduler());
- this.conf.set(FileSystem.FS_DEFAULT_NAME_KEY, properties.getHdfs());
-
- this.rpc = YarnRPC.create(conf);
-
- this.properties = properties;
- }
-
- @Override
- public void start() throws Exception {
- Preconditions.checkNotNull(properties);
- Preconditions.checkState(properties.isValid());
-
- connect();
-
- String command = String.format(YARN_MASTER_COMMAND, YarnUtils.YARN_MASTER_PATH, ApplicationConstants.LOG_DIR_EXPANSION_VAR,
- ApplicationConstants.LOG_DIR_EXPANSION_VAR);
-
- log.info(String.format("Starting application '%s' provider '%s' (masterCommand='%s')", properties.getYarnData(), properties.getName(), command));
-
- log.debug(String.format("Running master command \"%s\"", command));
-
- // app id
- GetNewApplicationRequest appRequest = Records.newRecord(GetNewApplicationRequest.class);
- GetNewApplicationResponse appResponse = rmClient.getNewApplication(appRequest);
-
- this.appId = appResponse.getApplicationId();
-
- log.info(String.format("Acquired app id '%s' for '%s'", appId.toString(), properties.getName()));
-
- // command
- ContainerLaunchContext launchContext = Records.newRecord(ContainerLaunchContext.class);
- launchContext.setCommands(Collections.singletonList(command));
-
- // resource limit
- Resource resource = Records.newRecord(Resource.class);
- resource.setMemory(256); // TODO make dynamic
- launchContext.setResource(resource);
-
- // environment
- Map<String, String> env = new HashMap<String, String>();
- launchContext.setEnvironment(env);
-
- // configuration
- propertiesFile = YarnUtils.writePropertiesToTemp(properties);
-
- // HDFS
- final String namespace = appId.toString();
- final Path masterArchive = YarnUtils.copyToHdfs(YarnUtils.YARN_MASTER_ARCHIVE_PATH, YarnUtils.YARN_MASTER_STAGING, namespace, conf);
- final Path masterProperties = YarnUtils.copyToHdfs(propertiesFile.getCanonicalPath(), YarnUtils.YARN_MASTER_PROPERTIES, namespace, conf);
- final Path containerArchive = YarnUtils.copyToHdfs(YarnUtils.YARN_CONTAINER_ARCHIVE_PATH, YarnUtils.YARN_CONTAINER_STAGING, namespace, conf);
-
- // local resources
- Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
- localResources.put(YarnUtils.YARN_MASTER_DESTINATION, YarnUtils.createHdfsResource(masterArchive, LocalResourceType.ARCHIVE, conf));
- localResources.put(YarnUtils.YARN_MASTER_PROPERTIES, YarnUtils.createHdfsResource(masterProperties, LocalResourceType.FILE, conf));
- localResources.put(YarnUtils.YARN_CONTAINER_STAGING, YarnUtils.createHdfsResource(containerArchive, LocalResourceType.FILE, conf));
-
- launchContext.setLocalResources(localResources);
-
- // user
- launchContext.setUser(properties.getUser());
-
- // app submission
- ApplicationSubmissionContext subContext = Records.newRecord(ApplicationSubmissionContext.class);
- subContext.setApplicationId(appId);
- subContext.setApplicationName(properties.getName());
- subContext.setAMContainerSpec(launchContext);
-
- SubmitApplicationRequest subRequest = Records.newRecord(SubmitApplicationRequest.class);
- subRequest.setApplicationSubmissionContext(subContext);
-
- log.info(String.format("Starting app id '%s'", appId.toString()));
-
- rmClient.submitApplication(subRequest);
-
- }
-
- @Override
- public void stop() throws YarnRemoteException {
- log.info(String.format("Stopping app id '%s'", appId.toString()));
- KillApplicationRequest killRequest = Records.newRecord(KillApplicationRequest.class);
- killRequest.setApplicationId(appId);
-
- rmClient.forceKillApplication(killRequest);
-
- try { YarnUtils.destroyHdfsNamespace(appId.toString(), conf); } catch(Exception ignore) {}
-
- propertiesFile.delete();
- }
-
- void connect() {
- YarnConfiguration yarnConf = new YarnConfiguration(conf);
- InetSocketAddress rmAddress = NetUtils.createSocketAddr(yarnConf.get(YarnConfiguration.RM_ADDRESS, YarnConfiguration.DEFAULT_RM_ADDRESS));
- log.info("Connecting to ResourceManager at: " + rmAddress);
- this.rmClient = ((ClientRMProtocol) rpc.getProxy(ClientRMProtocol.class, rmAddress, conf));
- }
+ static final Logger log = Logger
+ .getLogger(YarnContainerProviderProcess.class);
+
+ static String YARN_MASTER_COMMAND = "/bin/sh %s 1>%s/stdout 2>%s/stderr";
+
+ Configuration conf;
+ YarnClient yarnClient;
+ ApplicationId appId;
+ File propertiesFile;
+
+ YarnContainerProviderProperties properties;
+
+ @Override
+ public void configure(Properties properties) throws Exception {
+ configure(YarnUtils.createContainerProviderProperties(properties));
+ }
+
+ private void configure(YarnContainerProviderProperties properties) {
+ this.conf = new YarnConfiguration();
+ this.conf.set(YarnConfiguration.RM_ADDRESS,
+ properties.getResourceManager());
+ this.conf.set(YarnConfiguration.RM_SCHEDULER_ADDRESS,
+ properties.getScheduler());
+ this.conf.set(FileSystem.FS_DEFAULT_NAME_KEY, properties.getHdfs());
+ this.yarnClient = YarnClient.createYarnClient();
+
+ this.properties = properties;
+ }
+
+ @Override
+ public void start() throws Exception {
+ Preconditions.checkNotNull(properties);
+ Preconditions.checkState(properties.isValid());
+
+ connect();
+
+ String command = String.format(YARN_MASTER_COMMAND,
+ YarnUtils.YARN_MASTER_PATH,
+ ApplicationConstants.LOG_DIR_EXPANSION_VAR,
+ ApplicationConstants.LOG_DIR_EXPANSION_VAR);
+
+ log.info(String.format(
+ "Starting application '%s' provider '%s' (masterCommand='%s')",
+ properties.getYarnData(), properties.getName(), command));
+
+ log.debug(String.format("Setting app master command \"%s\"", command));
+
+ YarnClientApplication application;
+ application = yarnClient.createApplication();
+ ApplicationSubmissionContext appContext = application
+ .getApplicationSubmissionContext();
+ // LOG.info("Got new ApplicationId="
+ // + application.getNewApplicationResponse().getApplicationId());
+ // // app id
+ // GetNewApplicationRequest appRequest =
+ // Records.newRecord(GetNewApplicationRequest.class);
+ // GetNewApplicationResponse appResponse =
+ // rmClient.getNewApplication(appRequest);
+
+ this.appId = application.getApplicationSubmissionContext()
+ .getApplicationId();
+
+ log.info(String.format("Acquired app id '%s' for '%s'",
+ appId.toString(), properties.getName()));
+
+ // command
+ ContainerLaunchContext launchContext = Records
+ .newRecord(ContainerLaunchContext.class);
+ launchContext.setCommands(Collections.singletonList(command));
+
+ // environment
+ Map<String, String> env = new HashMap<String, String>();
+ launchContext.setEnvironment(env);
+
+ // configuration
+ propertiesFile = YarnUtils.writePropertiesToTemp(properties);
+
+ // HDFS
+ final String namespace = appId.toString();
+ final Path masterArchive = YarnUtils.copyToHdfs(
+ YarnUtils.YARN_MASTER_ARCHIVE_PATH,
+ YarnUtils.YARN_MASTER_STAGING, namespace, conf);
+ final Path masterProperties = YarnUtils.copyToHdfs(
+ propertiesFile.getCanonicalPath(),
+ YarnUtils.YARN_MASTER_PROPERTIES, namespace, conf);
+ final Path containerArchive = YarnUtils.copyToHdfs(
+ YarnUtils.YARN_CONTAINER_ARCHIVE_PATH,
+ YarnUtils.YARN_CONTAINER_STAGING, namespace, conf);
+
+ // local resources
+ Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
+ localResources.put(YarnUtils.YARN_MASTER_DESTINATION, YarnUtils
+ .createHdfsResource(masterArchive, LocalResourceType.ARCHIVE,
+ conf));
+ localResources.put(YarnUtils.YARN_MASTER_PROPERTIES, YarnUtils
+ .createHdfsResource(masterProperties, LocalResourceType.FILE,
+ conf));
+ localResources.put(YarnUtils.YARN_CONTAINER_STAGING, YarnUtils
+ .createHdfsResource(containerArchive, LocalResourceType.FILE,
+ conf));
+
+ launchContext.setLocalResources(localResources);
+
+ // user
+ // appContext.setUser(properties.getUser());
+
+ // app submission
+
+ // resource limit, cannot set resource constraint for starting App
+ // Master
+ Resource resource = Records.newRecord(Resource.class);
+ resource.setMemory(256); // TODO make dynamic
+ appContext.setResource(resource);
+
+ appContext.setApplicationId(appId);
+ appContext.setApplicationName(properties.getName());
+ appContext.setAMContainerSpec(launchContext);
+
+ // SubmitApplicationRequest subRequest =
+ // Records.newRecord(SubmitApplicationRequest.class);
+ // subRequest.setApplicationSubmissionContext(subContext);
+
+ log.info(String.format("Starting app id '%s'", appId.toString()));
+
+ yarnClient.submitApplication(appContext);
+
+ }
+
+ @Override
+ public void stop() {
+ log.info(String.format("Stopping app id '%s'", appId.toString()));
+ KillApplicationRequest killRequest = Records
+ .newRecord(KillApplicationRequest.class);
+ killRequest.setApplicationId(appId);
+ try {
+ yarnClient.killApplication(appId);
+ YarnUtils.destroyHdfsNamespace(appId.toString(), conf);
+ } catch (Exception e) {
+ log.error("Exception while stoppilg app " + appId.toString(), e);
+ }
+
+ propertiesFile.delete();
+ }
+
+ void connect() {
+ log.info("Connecting to ResourceManager at: "
+ + conf.get(YarnConfiguration.RM_ADDRESS));
+ yarnClient.init(conf);
+ yarnClient.start();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/342d0e7f/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/provider/ProviderRebalancer.java
----------------------------------------------------------------------
diff --git a/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/provider/ProviderRebalancer.java b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/provider/ProviderRebalancer.java
index 2b6e428..44e0762 100644
--- a/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/provider/ProviderRebalancer.java
+++ b/recipes/auto-scale/src/main/java/org/apache/helix/autoscale/provider/ProviderRebalancer.java
@@ -61,10 +61,9 @@ public class ProviderRebalancer implements Rebalancer {
}
@Override
- public ResourceAssignment computeResourceMapping(Resource resource, IdealState idealState, CurrentStateOutput currentStateOutput,
+ public ResourceAssignment computeResourceMapping(String resourceName, IdealState idealState, CurrentStateOutput currentStateOutput,
ClusterDataCache clusterData) {
- final String resourceName = resource.getResourceName();
final String containerType = resourceName;
final SortedSet<String> allContainers = Sets.newTreeSet(new IndexedNameComparator());