You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ki...@apache.org on 2014/02/24 17:53:36 UTC
git commit: Fixing app status report generator
Repository: helix
Updated Branches:
refs/heads/helix-provisioning 8992aa5a7 -> c072aca47
Fixing app status report generator
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/c072aca4
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/c072aca4
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/c072aca4
Branch: refs/heads/helix-provisioning
Commit: c072aca47327c85a811c886e7782a6bd51c8380f
Parents: 8992aa5
Author: Kishore Gopalakrishna <g....@gmail.com>
Authored: Mon Feb 24 08:53:12 2014 -0800
Committer: Kishore Gopalakrishna <g....@gmail.com>
Committed: Mon Feb 24 08:53:12 2014 -0800
----------------------------------------------------------------------
.../helix/api/accessor/ClusterAccessor.java | 4 +
.../apache/helix/api/config/ResourceConfig.java | 14 ++++
.../helix/provisioning/yarn/AppLauncher.java | 69 +++++++++++-----
.../yarn/AppStatusReportGenerator.java | 27 +++---
.../java/tools/UpdateProvisionerConfig.java | 87 ++++++++++++++++++++
recipes/helloworld-provisioning-yarn/run.sh | 10 +--
.../main/resources/hello_world_app_spec.yaml | 4 +-
7 files changed, 177 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/c072aca4/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
index cacdf6c..e653338 100644
--- a/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
@@ -728,10 +728,14 @@ public class ClusterAccessor {
BaseDataAccessor<?> baseAccessor = _accessor.getBaseDataAccessor();
if (baseAccessor != null) {
boolean[] existsResults = baseAccessor.exists(paths, 0);
+ int ind =0;
for (boolean exists : existsResults) {
+
if (!exists) {
+ LOG.warn("Path does not exist:"+ paths.get(ind));
return false;
}
+ ind = ind + 1;
}
}
return true;
http://git-wip-us.apache.org/repos/asf/helix/blob/c072aca4/helix-core/src/main/java/org/apache/helix/api/config/ResourceConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/config/ResourceConfig.java b/helix-core/src/main/java/org/apache/helix/api/config/ResourceConfig.java
index 0b2df4a..5443236 100644
--- a/helix-core/src/main/java/org/apache/helix/api/config/ResourceConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/api/config/ResourceConfig.java
@@ -180,6 +180,7 @@ public class ResourceConfig {
private enum Fields {
TYPE,
REBALANCER_CONFIG,
+ PROVISIONER_CONFIG,
USER_CONFIG,
BUCKET_SIZE,
BATCH_MESSAGE_MODE
@@ -220,6 +221,16 @@ public class ResourceConfig {
}
/**
+ * Set the provisioner configuration
+ * @param config properties of interest for provisioning
+ * @return Delta
+ */
+ public Delta setProvisionerConfig(ProvisionerConfig config) {
+ _builder.provisionerConfig(config);
+ _updateFields.add(Fields.PROVISIONER_CONFIG);
+ return this;
+ }
+ /**
* Set the user configuration
* @param userConfig user-specified properties
* @return Delta
@@ -272,6 +283,9 @@ public class ResourceConfig {
case REBALANCER_CONFIG:
builder.rebalancerConfig(deltaConfig.getRebalancerConfig());
break;
+ case PROVISIONER_CONFIG:
+ builder.provisionerConfig(deltaConfig.getProvisionerConfig());
+ break;
case USER_CONFIG:
builder.userConfig(deltaConfig.getUserConfig());
break;
http://git-wip-us.apache.org/repos/asf/helix/blob/c072aca4/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppLauncher.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppLauncher.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppLauncher.java
index d2e901f..4b77105 100644
--- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppLauncher.java
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppLauncher.java
@@ -13,6 +13,10 @@ import java.util.List;
import java.util.Map;
import java.util.Vector;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
import org.apache.commons.compress.archivers.ArchiveStreamFactory;
import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
@@ -117,7 +121,7 @@ public class AppLauncher {
// Set up the container launch context for the application master
ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class);
- LOG.info("Copy App archive file from local filesystem and add to local environment");
+ LOG.info("Copy Application archive file from local filesystem and add to local environment");
// Copy the application master jar to the filesystem
// Create a local resource to point to the destination jar path
FileSystem fs = FileSystem.get(_conf);
@@ -185,10 +189,9 @@ public class AppLauncher {
classPathEnv.append(':');
classPathEnv.append(System.getProperty("java.class.path"));
}
- LOG.info("\n\n Setting the classpath for AppMaster:\n\n" + classPathEnv.toString());
+ LOG.info("\n\n Setting the classpath to launch AppMaster:\n\n" );
// Set the env variables to be setup in the env where the application master will be run
Map<String, String> env = new HashMap<String, String>(_appMasterConfig.getEnv());
- LOG.info("Set the environment for the application master" + env);
env.put("CLASSPATH", classPathEnv.toString());
amContainer.setEnvironment(env);
@@ -197,7 +200,7 @@ public class AppLauncher {
Vector<CharSequence> vargs = new Vector<CharSequence>(30);
// Set java executable command
- LOG.info("Setting up app master command");
+ LOG.info("Setting up app master launch command");
vargs.add(Environment.JAVA_HOME.$() + "/bin/java");
int amMemory = 1024;
// Set Xmx based on am memory size
@@ -265,13 +268,12 @@ public class AppLauncher {
// Set the queue to which this application is to be submitted in the RM
appContext.setQueue(amQueue);
- // Submit the application to the applications manager
- // SubmitApplicationResponse submitResp = applicationsManager.submitApplication(appRequest);
- // Ignore the response as either a valid response object is returned on success
- // or an exception thrown to denote some form of a failure
- LOG.info("Submitting application to ASM");
- yarnClient.submitApplication(appContext);
+ LOG.info("Submitting application to YARN Resource Manager");
+
+ ApplicationId applicationId = yarnClient.submitApplication(appContext);
+
+ LOG.info("Submitted application with applicationId:" + applicationId );
return true;
}
@@ -355,6 +357,8 @@ public class AppLauncher {
*/
public boolean waitUntilDone() {
String prevReport = "";
+ HelixConnection connection = null;
+
while (true) {
try {
// Get application report for the appId we are interested in
@@ -381,15 +385,27 @@ public class AppLauncher {
return false;
}
if (YarnApplicationState.RUNNING == state) {
- HelixConnection connection = new ZkHelixConnection(report.getHost() + ":2181");
- try{
- connection.connect();
- }catch(Exception e){
- LOG.warn("AppMaster started but not yet initialized");
+ if (connection == null) {
+ String hostName = null;
+ int ind = report.getHost().indexOf('/');
+ if (ind > -1) {
+ hostName = report.getHost().substring(ind + 1);
+ } else {
+ hostName = report.getHost();
+ }
+ connection = new ZkHelixConnection(hostName + ":2181");
+ // connect() fails while the AppMaster is still starting; a null connection means "retry on the next poll"
+ try {
+ connection.connect();
+ } catch (Exception e) {
+ LOG.warn("AppMaster started but not yet initialized");
+ connection = null;
+ }
}
- if(connection.isConnected()){
+ if (connection != null && connection.isConnected()) {
AppStatusReportGenerator generator = new AppStatusReportGenerator();
- String generateReport = generator.generateReport(connection, ClusterId.from(_applicationSpec.getAppName()));
+ ClusterId clusterId = ClusterId.from(_applicationSpec.getAppName());
+ String generateReport = generator.generateReport(connection, clusterId);
LOG.info(generateReport);
}
}
@@ -430,13 +446,24 @@ public class AppLauncher {
}
/**
- * will take the input file and AppSpecFactory class name as input
- * @param args
+ * Launches the application on a YARN cluster. Once launched, it will display (periodically) the status of the containers in the application.
+ * @param args app_spec_provider and app_config_spec
* @throws Exception
*/
public static void main(String[] args) throws Exception {
- ApplicationSpecFactory applicationSpecFactory = HelixYarnUtil.createInstance(args[0]);
- File yamlConfigFile = new File(args[1]);
+
+ Options opts = new Options();
+ opts.addOption(new Option("app_spec_provider",true, "Application Spec Factory Class that will parse the app_config_spec file"));
+ opts.addOption(new Option("app_config_spec",true, "YAML config file that provides the app specifications"));
+ CommandLine cliParser = new GnuParser().parse(opts, args);
+ String appSpecFactoryClass = cliParser.getOptionValue("app_spec_provider");
+ String yamlConfigFileName = cliParser.getOptionValue("app_config_spec");
+
+ ApplicationSpecFactory applicationSpecFactory = HelixYarnUtil.createInstance(appSpecFactoryClass);
+ File yamlConfigFile = new File(yamlConfigFileName);
+ if(!yamlConfigFile.exists()){
+ throw new IllegalArgumentException("YAML app_config_spec file: '"+ yamlConfigFileName + "' does not exist");
+ }
final AppLauncher launcher = new AppLauncher(applicationSpecFactory, yamlConfigFile);
launcher.launch();
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
http://git-wip-us.apache.org/repos/asf/helix/blob/c072aca4/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppStatusReportGenerator.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppStatusReportGenerator.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppStatusReportGenerator.java
index 0443f8a..b083ac9 100644
--- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppStatusReportGenerator.java
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppStatusReportGenerator.java
@@ -17,7 +17,6 @@ import org.apache.helix.api.id.ResourceId;
import org.apache.helix.controller.provisioner.ContainerId;
import org.apache.helix.controller.provisioner.ContainerState;
import org.apache.helix.manager.zk.ZkHelixConnection;
-import org.apache.helix.model.ExternalView;
public class AppStatusReportGenerator {
static String TAB = "\t";
@@ -39,26 +38,30 @@ public class AppStatusReportGenerator {
resource.getExternalView().getStateMap(PartitionId.from(resourceId.stringify() + "_0"));
builder.append(TAB).append("CONTAINER_NAME").append(TAB).append(TAB)
- .append("CONTAINER_STATE").append(TAB).append("SERVICE_STATE").append(TAB).append("CONTAINER_ID").append(NEWLINE);
+ .append("CONTAINER_STATE").append(TAB).append("SERVICE_STATE").append(TAB)
+ .append("CONTAINER_ID").append(NEWLINE);
for (Participant participant : participants.values()) {
// need a better check
if (!participant.getId().stringify().startsWith(resource.getId().stringify())) {
continue;
}
ContainerConfig containerConfig = participant.getContainerConfig();
- ContainerState containerState =ContainerState.UNDEFINED;
+ ContainerState containerState = ContainerState.UNDEFINED;
ContainerId containerId = ContainerId.from("N/A");
if (containerConfig != null) {
containerId = containerConfig.getId();
containerState = containerConfig.getState();
}
- State participantState = serviceStateMap.get(participant.getId());
+ State participantState = null;
+ if (serviceStateMap != null) {
+ participantState = serviceStateMap.get(participant.getId());
+ }
if (participantState == null) {
participantState = State.from("UNKNOWN");
}
- builder.append(TAB).append(participant.getId()).append(TAB)
- .append(containerState).append(TAB).append(participantState).append(TAB).append(TAB).append(containerId);
+ builder.append(TAB).append(participant.getId()).append(TAB).append(containerState)
+ .append(TAB).append(participantState).append(TAB).append(TAB).append(containerId);
builder.append(NEWLINE);
}
@@ -67,13 +70,17 @@ public class AppStatusReportGenerator {
}
- public static void main(String[] args) {
+ public static void main(String[] args) throws InterruptedException {
AppStatusReportGenerator generator = new AppStatusReportGenerator();
ZkHelixConnection connection = new ZkHelixConnection("localhost:2181");
connection.connect();
- String generateReport = generator.generateReport(connection, ClusterId.from("testApp"));
- System.out.println(generateReport);
- connection.disconnect();
+ while (true) {
+ String generateReport = generator.generateReport(connection, ClusterId.from("testApp1"));
+ System.out.println(generateReport);
+ Thread.sleep(10000);
+ connection.createClusterManagementTool().addCluster("testApp1");
+ }
+ // connection.disconnect();
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/c072aca4/helix-provisioning/src/main/java/tools/UpdateProvisionerConfig.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/tools/UpdateProvisionerConfig.java b/helix-provisioning/src/main/java/tools/UpdateProvisionerConfig.java
new file mode 100644
index 0000000..89ee1c5
--- /dev/null
+++ b/helix-provisioning/src/main/java/tools/UpdateProvisionerConfig.java
@@ -0,0 +1,121 @@
+package tools;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.OptionGroup;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.helix.HelixConnection;
+import org.apache.helix.api.Resource;
+import org.apache.helix.api.accessor.ResourceAccessor;
+import org.apache.helix.api.config.ResourceConfig;
+import org.apache.helix.api.id.ClusterId;
+import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.manager.zk.ZkHelixConnection;
+import org.apache.helix.provisioning.yarn.YarnProvisionerConfig;
+import org.apache.log4j.Logger;
+/**
+ * Command-line tool that updates the provisioner config of a service (resource) of an application (cluster).
+ */
+public class UpdateProvisionerConfig {
+  private static final Logger LOG = Logger.getLogger(UpdateProvisionerConfig.class);
+  private static final String updateContainerCount = "updateContainerCount";
+  private final HelixConnection _connection;
+
+  /**
+   * Opens a Helix connection to the given ZooKeeper address.
+   * @param zkAddress ZooKeeper address to connect to
+   */
+  public UpdateProvisionerConfig(String zkAddress) {
+    _connection = new ZkHelixConnection(zkAddress);
+    _connection.connect();
+  }
+
+  /**
+   * Releases the connection opened by the constructor.
+   */
+  public void disconnect() {
+    _connection.disconnect();
+  }
+
+  /**
+   * Replaces the provisioner config of a service with a new YARN provisioner config that requests the given
+   * number of containers.
+   * @param appName application name, used as the cluster id
+   * @param serviceName service name, used as the resource id
+   * @param numContainers number of containers to request
+   * @throws IllegalArgumentException if the service cannot be read from the cluster
+   */
+  public void setNumContainers(String appName, String serviceName, int numContainers) {
+    ResourceId resourceId = ResourceId.from(serviceName);
+
+    ResourceAccessor resourceAccessor = _connection.createResourceAccessor(ClusterId.from(appName));
+    Resource resource = resourceAccessor.readResource(resourceId);
+    // NOTE(review): readResource presumably returns null for an unknown resource - confirm; fail clearly instead of with an NPE
+    if (resource == null) {
+      throw new IllegalArgumentException("Service '" + serviceName + "' not found in app '" + appName + "'");
+    }
+    LOG.info("Current provisioner config:"+ resource.getProvisionerConfig());
+
+    ResourceConfig.Delta delta = new ResourceConfig.Delta(resourceId);
+    YarnProvisionerConfig config = new YarnProvisionerConfig(resourceId);
+    config.setNumContainers(numContainers);
+    delta.setProvisionerConfig(config);
+    ResourceConfig updatedResourceConfig = resourceAccessor.updateResource(resourceId, delta);
+    LOG.info("Update provisioner config:"+ updatedResourceConfig.getProvisionerConfig());
+  }
+
+  /**
+   * Parses the command line and applies the requested update.
+   * Usage: {@code --zookeeperAddress <address> --updateContainerCount <appName> <serviceName> <numContainers>}
+   * @param args command-line arguments
+   * @throws ParseException if the arguments cannot be parsed or the update option has fewer than three values
+   */
+  @SuppressWarnings("static-access")
+  public static void main(String[] args) throws ParseException {
+    Option zkServerOption =
+        OptionBuilder.withLongOpt("zookeeperAddress").withDescription("Provide zookeeper address")
+            .create();
+    zkServerOption.setArgs(1);
+    zkServerOption.setRequired(true);
+    zkServerOption.setArgName("zookeeperAddress(Required)");
+
+    OptionGroup group = new OptionGroup();
+    group.setRequired(true);
+
+    // update container count per service
+    Option updateContainerCountOption =
+        OptionBuilder.withLongOpt(updateContainerCount)
+            .withDescription("set the number of containers per service").create();
+    updateContainerCountOption.setArgs(3);
+    updateContainerCountOption.setRequired(false);
+    updateContainerCountOption.setArgName("appName serviceName numContainers");
+
+    group.addOption(updateContainerCountOption);
+
+    Options options = new Options();
+    options.addOption(zkServerOption);
+    options.addOptionGroup(group);
+    CommandLine cliParser = new GnuParser().parse(options, args);
+
+    String zkAddress = cliParser.getOptionValue("zookeeperAddress");
+    UpdateProvisionerConfig updater = new UpdateProvisionerConfig(zkAddress);
+    try {
+      if (cliParser.hasOption(updateContainerCount)) {
+        String[] values = cliParser.getOptionValues(updateContainerCount);
+        if (values == null || values.length < 3) {
+          throw new ParseException("--" + updateContainerCount + " requires: appName serviceName numContainers");
+        }
+        String appName = values[0];
+        String serviceName = values[1];
+        int numContainers = Integer.parseInt(values[2]);
+        updater.setNumContainers(appName, serviceName, numContainers);
+      }
+    } finally {
+      // always release the connection, even when the update fails
+      updater.disconnect();
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/c072aca4/recipes/helloworld-provisioning-yarn/run.sh
----------------------------------------------------------------------
diff --git a/recipes/helloworld-provisioning-yarn/run.sh b/recipes/helloworld-provisioning-yarn/run.sh
index 51d4c35..07448bb 100755
--- a/recipes/helloworld-provisioning-yarn/run.sh
+++ b/recipes/helloworld-provisioning-yarn/run.sh
@@ -1,6 +1,6 @@
-cd ../../../../
-mvn clean install -DskipTests
-cd recipes/provisioning/yarn/helloworld/
+#cd ../../
+#mvn clean install -DskipTests
+#cd recipes/helloworld-provisioning-yarn
mvn clean package -DskipTests
-chmod +x target/helloworld-pkg/bin/app-launcher.sh
-target/helloworld-pkg/bin/app-launcher.sh org.apache.helix.provisioning.yarn.example.HelloWordAppSpecFactory /Users/kgopalak/Documents/projects/incubator-helix/recipes/provisioning/yarn/helloworld/src/main/resources/hello_world_app_spec.yaml
+chmod +x target/helloworld-provisioning-yarn-pkg/bin/app-launcher.sh
+target/helloworld-provisioning-yarn-pkg/bin/app-launcher.sh --app_spec_provider org.apache.helix.provisioning.yarn.example.HelloWordAppSpecFactory --app_config_spec /Users/kgopalak/Documents/projects/incubator-helix/recipes/helloworld-provisioning-yarn/src/main/resources/hello_world_app_spec.yaml
http://git-wip-us.apache.org/repos/asf/helix/blob/c072aca4/recipes/helloworld-provisioning-yarn/src/main/resources/hello_world_app_spec.yaml
----------------------------------------------------------------------
diff --git a/recipes/helloworld-provisioning-yarn/src/main/resources/hello_world_app_spec.yaml b/recipes/helloworld-provisioning-yarn/src/main/resources/hello_world_app_spec.yaml
index d8d1dd2..baaddb5 100644
--- a/recipes/helloworld-provisioning-yarn/src/main/resources/hello_world_app_spec.yaml
+++ b/recipes/helloworld-provisioning-yarn/src/main/resources/hello_world_app_spec.yaml
@@ -3,7 +3,7 @@ appConfig:
config: {
k1: v1
}
-appMasterPackageUri: 'file:///Users/kgopalak/Documents/projects/incubator-helix/recipes/provisioning/yarn/helloworld/target/helloworld-0.7.1-incubating-SNAPSHOT-pkg.tar'
+appMasterPackageUri: 'file:///Users/kgopalak/Documents/projects/incubator-helix/recipes/helloworld-provisioning-yarn/target/helloworld-provisioning-yarn-0.7.1-incubating-SNAPSHOT-pkg.tar'
appName: testApp
serviceConfigMap:
HelloWorld: {
@@ -14,7 +14,7 @@ serviceMainClassMap: {
HelloWorld: org.apache.helix.provisioning.yarn.example.HelloWorldService
}
servicePackageURIMap: {
- HelloWorld: 'file:///Users/kgopalak/Documents/projects/incubator-helix/recipes/provisioning/yarn/helloworld/target/helloworld-0.7.1-incubating-SNAPSHOT-pkg.tar'
+ HelloWorld: 'file:///Users/kgopalak/Documents/projects/incubator-helix/recipes/helloworld-provisioning-yarn/target/helloworld-provisioning-yarn-0.7.1-incubating-SNAPSHOT-pkg.tar'
}
services: [
HelloWorld]