You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ki...@apache.org on 2014/02/24 17:53:36 UTC

git commit: Fixing app status report generator

Repository: helix
Updated Branches:
  refs/heads/helix-provisioning 8992aa5a7 -> c072aca47


Fixing app status report generator


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/c072aca4
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/c072aca4
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/c072aca4

Branch: refs/heads/helix-provisioning
Commit: c072aca47327c85a811c886e7782a6bd51c8380f
Parents: 8992aa5
Author: Kishore Gopalakrishna <g....@gmail.com>
Authored: Mon Feb 24 08:53:12 2014 -0800
Committer: Kishore Gopalakrishna <g....@gmail.com>
Committed: Mon Feb 24 08:53:12 2014 -0800

----------------------------------------------------------------------
 .../helix/api/accessor/ClusterAccessor.java     |  4 +
 .../apache/helix/api/config/ResourceConfig.java | 14 ++++
 .../helix/provisioning/yarn/AppLauncher.java    | 69 +++++++++++-----
 .../yarn/AppStatusReportGenerator.java          | 27 +++---
 .../java/tools/UpdateProvisionerConfig.java     | 87 ++++++++++++++++++++
 recipes/helloworld-provisioning-yarn/run.sh     | 10 +--
 .../main/resources/hello_world_app_spec.yaml    |  4 +-
 7 files changed, 177 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/c072aca4/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
index cacdf6c..e653338 100644
--- a/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
@@ -728,10 +728,14 @@ public class ClusterAccessor {
     BaseDataAccessor<?> baseAccessor = _accessor.getBaseDataAccessor();
     if (baseAccessor != null) {
       boolean[] existsResults = baseAccessor.exists(paths, 0);
+      int ind =0;
       for (boolean exists : existsResults) {
+        
         if (!exists) {
+          LOG.warn("Path does not exist:"+ paths.get(ind));
           return false;
         }
+        ind = ind + 1;
       }
     }
     return true;

http://git-wip-us.apache.org/repos/asf/helix/blob/c072aca4/helix-core/src/main/java/org/apache/helix/api/config/ResourceConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/config/ResourceConfig.java b/helix-core/src/main/java/org/apache/helix/api/config/ResourceConfig.java
index 0b2df4a..5443236 100644
--- a/helix-core/src/main/java/org/apache/helix/api/config/ResourceConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/api/config/ResourceConfig.java
@@ -180,6 +180,7 @@ public class ResourceConfig {
     private enum Fields {
       TYPE,
       REBALANCER_CONFIG,
+      PROVISIONER_CONFIG,
       USER_CONFIG,
       BUCKET_SIZE,
       BATCH_MESSAGE_MODE
@@ -220,6 +221,16 @@ public class ResourceConfig {
     }
 
     /**
+     * Set the provisioner configuration and mark it as an updated field
+     * @param config properties of interest for provisioning
+     * @return this Delta, so that further setters can be chained
+     */
+    public Delta setProvisionerConfig(ProvisionerConfig config) {
+      _builder.provisionerConfig(config);
+      _updateFields.add(Fields.PROVISIONER_CONFIG);
+      return this;
+    }
+    /**
      * Set the user configuration
      * @param userConfig user-specified properties
      * @return Delta
@@ -272,6 +283,9 @@ public class ResourceConfig {
         case REBALANCER_CONFIG:
           builder.rebalancerConfig(deltaConfig.getRebalancerConfig());
           break;
+        case PROVISIONER_CONFIG:
+          builder.provisionerConfig(deltaConfig.getProvisionerConfig());
+          break;
         case USER_CONFIG:
           builder.userConfig(deltaConfig.getUserConfig());
           break;

http://git-wip-us.apache.org/repos/asf/helix/blob/c072aca4/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppLauncher.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppLauncher.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppLauncher.java
index d2e901f..4b77105 100644
--- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppLauncher.java
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppLauncher.java
@@ -13,6 +13,10 @@ import java.util.List;
 import java.util.Map;
 import java.util.Vector;
 
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
 import org.apache.commons.compress.archivers.ArchiveStreamFactory;
 import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
 import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
@@ -117,7 +121,7 @@ public class AppLauncher {
     // Set up the container launch context for the application master
     ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class);
 
-    LOG.info("Copy App archive file from local filesystem and add to local environment");
+    LOG.info("Copy Application archive file from local filesystem and add to local environment");
     // Copy the application master jar to the filesystem
     // Create a local resource to point to the destination jar path
     FileSystem fs = FileSystem.get(_conf);
@@ -185,10 +189,9 @@ public class AppLauncher {
       classPathEnv.append(':');
       classPathEnv.append(System.getProperty("java.class.path"));
     }
-    LOG.info("\n\n Setting the classpath for AppMaster:\n\n" + classPathEnv.toString());
+    LOG.info("\n\n Setting the classpath to launch AppMaster:\n\n" );
     // Set the env variables to be setup in the env where the application master will be run
     Map<String, String> env = new HashMap<String, String>(_appMasterConfig.getEnv());
-    LOG.info("Set the environment for the application master" + env);
     env.put("CLASSPATH", classPathEnv.toString());
 
     amContainer.setEnvironment(env);
@@ -197,7 +200,7 @@ public class AppLauncher {
     Vector<CharSequence> vargs = new Vector<CharSequence>(30);
 
     // Set java executable command
-    LOG.info("Setting up app master command");
+    LOG.info("Setting up app master launch command");
     vargs.add(Environment.JAVA_HOME.$() + "/bin/java");
     int amMemory = 1024;
     // Set Xmx based on am memory size
@@ -265,13 +268,12 @@ public class AppLauncher {
     // Set the queue to which this application is to be submitted in the RM
     appContext.setQueue(amQueue);
 
-    // Submit the application to the applications manager
-    // SubmitApplicationResponse submitResp = applicationsManager.submitApplication(appRequest);
-    // Ignore the response as either a valid response object is returned on success
-    // or an exception thrown to denote some form of a failure
-    LOG.info("Submitting application to ASM");
 
-    yarnClient.submitApplication(appContext);
+    LOG.info("Submitting application to YARN Resource Manager");
+
+    ApplicationId applicationId = yarnClient.submitApplication(appContext);
+
+    LOG.info("Submitted application with applicationId:" + applicationId );
 
     return true;
   }
@@ -355,6 +357,8 @@ public class AppLauncher {
    */
   public boolean waitUntilDone() {
     String prevReport = "";
+    HelixConnection connection = null;
+
     while (true) {
       try {
         // Get application report for the appId we are interested in
@@ -381,15 +385,27 @@ public class AppLauncher {
           return false;
         }
         if (YarnApplicationState.RUNNING == state) {
-          HelixConnection connection = new ZkHelixConnection(report.getHost() + ":2181");
-          try{
-            connection.connect();
-          }catch(Exception e){
-            LOG.warn("AppMaster started but not yet initialized");
+          if (connection == null) {
+            String hostName = null;
+            int ind = report.getHost().indexOf('/');
+            if (ind > -1) {
+              hostName = report.getHost().substring(ind + 1);
+            } else {
+              hostName = report.getHost();
+            }
+            connection = new ZkHelixConnection(hostName + ":2181");
+
+            try {
+              connection.connect();
+            } catch (Exception e) {
+              LOG.warn("AppMaster started but not yet initialized");
+              connection = null;
+            }
           }
-          if(connection.isConnected()){
+          if (connection != null && connection.isConnected()) { // null after a failed connect()
             AppStatusReportGenerator generator = new AppStatusReportGenerator();
-            String generateReport = generator.generateReport(connection, ClusterId.from(_applicationSpec.getAppName()));
+            ClusterId clusterId = ClusterId.from(_applicationSpec.getAppName());
+            String generateReport = generator.generateReport(connection, clusterId);
             LOG.info(generateReport);
           }
         }
@@ -430,13 +446,24 @@ public class AppLauncher {
   }
 
   /**
-   * will take the input file and AppSpecFactory class name as input
-   * @param args
+   * Launches the application on a YARN cluster. Once launched, it will display (periodically) the status of the containers in the application.
+   * @param args app_spec_provider and app_config_spec
    * @throws Exception
    */
   public static void main(String[] args) throws Exception {
-    ApplicationSpecFactory applicationSpecFactory = HelixYarnUtil.createInstance(args[0]);
-    File yamlConfigFile = new File(args[1]);
+
+    Options opts = new Options();
+    opts.addOption(new Option("app_spec_provider",true, "Application Spec Factory Class that will parse the app_config_spec file"));
+    opts.addOption(new Option("app_config_spec",true, "YAML config file that provides the app specifications"));
+    CommandLine cliParser = new GnuParser().parse(opts, args);
+    String appSpecFactoryClass = cliParser.getOptionValue("app_spec_provider");
+    String yamlConfigFileName = cliParser.getOptionValue("app_config_spec");
+
+    ApplicationSpecFactory applicationSpecFactory = HelixYarnUtil.createInstance(appSpecFactoryClass);
+    File yamlConfigFile = new File(yamlConfigFileName);
+    if(!yamlConfigFile.exists()){
+      throw new IllegalArgumentException("YAML app_config_spec file: '"+ yamlConfigFileName + "' does not exist");
+    }
     final AppLauncher launcher = new AppLauncher(applicationSpecFactory, yamlConfigFile);
     launcher.launch();
     Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {

http://git-wip-us.apache.org/repos/asf/helix/blob/c072aca4/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppStatusReportGenerator.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppStatusReportGenerator.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppStatusReportGenerator.java
index 0443f8a..b083ac9 100644
--- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppStatusReportGenerator.java
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppStatusReportGenerator.java
@@ -17,7 +17,6 @@ import org.apache.helix.api.id.ResourceId;
 import org.apache.helix.controller.provisioner.ContainerId;
 import org.apache.helix.controller.provisioner.ContainerState;
 import org.apache.helix.manager.zk.ZkHelixConnection;
-import org.apache.helix.model.ExternalView;
 
 public class AppStatusReportGenerator {
   static String TAB = "\t";
@@ -39,26 +38,30 @@ public class AppStatusReportGenerator {
           resource.getExternalView().getStateMap(PartitionId.from(resourceId.stringify() + "_0"));
 
       builder.append(TAB).append("CONTAINER_NAME").append(TAB).append(TAB)
-          .append("CONTAINER_STATE").append(TAB).append("SERVICE_STATE").append(TAB).append("CONTAINER_ID").append(NEWLINE);
+          .append("CONTAINER_STATE").append(TAB).append("SERVICE_STATE").append(TAB)
+          .append("CONTAINER_ID").append(NEWLINE);
       for (Participant participant : participants.values()) {
         // need a better check
         if (!participant.getId().stringify().startsWith(resource.getId().stringify())) {
           continue;
         }
         ContainerConfig containerConfig = participant.getContainerConfig();
-        ContainerState containerState =ContainerState.UNDEFINED;
+        ContainerState containerState = ContainerState.UNDEFINED;
         ContainerId containerId = ContainerId.from("N/A");
 
         if (containerConfig != null) {
           containerId = containerConfig.getId();
           containerState = containerConfig.getState();
         }
-        State participantState = serviceStateMap.get(participant.getId());
+        State participantState = null;
+        if (serviceStateMap != null) {
+          participantState = serviceStateMap.get(participant.getId());
+        }
         if (participantState == null) {
           participantState = State.from("UNKNOWN");
         }
-        builder.append(TAB).append(participant.getId()).append(TAB)
-            .append(containerState).append(TAB).append(participantState).append(TAB).append(TAB).append(containerId);
+        builder.append(TAB).append(participant.getId()).append(TAB).append(containerState)
+            .append(TAB).append(participantState).append(TAB).append(TAB).append(containerId);
         builder.append(NEWLINE);
       }
 
@@ -67,13 +70,17 @@ public class AppStatusReportGenerator {
 
   }
 
-  public static void main(String[] args) {
+  public static void main(String[] args) throws InterruptedException {
     AppStatusReportGenerator generator = new AppStatusReportGenerator();
 
     ZkHelixConnection connection = new ZkHelixConnection("localhost:2181");
     connection.connect();
-    String generateReport = generator.generateReport(connection, ClusterId.from("testApp"));
-    System.out.println(generateReport);
-    connection.disconnect();
+    while (true) { // print a fresh report every 10 seconds; the loop has no exit
+      String generateReport = generator.generateReport(connection, ClusterId.from("testApp1"));
+      System.out.println(generateReport);
+      Thread.sleep(10000);
+      connection.createClusterManagementTool().addCluster("testApp1"); // NOTE(review): runs on every pass - confirm intended
+    }
+    // connection.disconnect(); // kept disabled: unreachable after the endless loop above
   }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/c072aca4/helix-provisioning/src/main/java/tools/UpdateProvisionerConfig.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/tools/UpdateProvisionerConfig.java b/helix-provisioning/src/main/java/tools/UpdateProvisionerConfig.java
new file mode 100644
index 0000000..89ee1c5
--- /dev/null
+++ b/helix-provisioning/src/main/java/tools/UpdateProvisionerConfig.java
@@ -0,0 +1,120 @@
+package tools;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.OptionGroup;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.helix.HelixConnection;
+import org.apache.helix.api.Resource;
+import org.apache.helix.api.accessor.ResourceAccessor;
+import org.apache.helix.api.config.ResourceConfig;
+import org.apache.helix.api.id.ClusterId;
+import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.manager.zk.ZkHelixConnection;
+import org.apache.helix.provisioning.yarn.YarnProvisionerConfig;
+import org.apache.log4j.Logger;
+/**
+ * Command-line tool that updates the provisioner config of a service in an application.
+ */
+public class UpdateProvisionerConfig {
+  private static final Logger LOG = Logger.getLogger(UpdateProvisionerConfig.class);
+  private static final String updateContainerCount = "updateContainerCount";
+  private final HelixConnection _connection;
+
+  /**
+   * Open a Helix connection through the given ZooKeeper address.
+   * @param zkAddress address handed to {@link ZkHelixConnection}
+   */
+  public UpdateProvisionerConfig(String zkAddress) {
+    _connection = new ZkHelixConnection(zkAddress);
+    _connection.connect();
+  }
+
+  /**
+   * Set the number of containers of a service through its provisioner config.
+   * @param appName application name, used as the cluster id
+   * @param serviceName service name, used as the resource id
+   * @param numContainers container count written into the new provisioner config
+   * @throws IllegalArgumentException if reading the service yields no resource
+   */
+  public void setNumContainers(String appName, String serviceName, int numContainers) {
+    ResourceId resourceId = ResourceId.from(serviceName);
+
+    ResourceAccessor resourceAccessor = _connection.createResourceAccessor(ClusterId.from(appName));
+    Resource resource = resourceAccessor.readResource(resourceId);
+    if (resource == null) {
+      // fail with the offending names instead of a NullPointerException below
+      throw new IllegalArgumentException("Service '" + serviceName
+          + "' not found in application '" + appName + "'");
+    }
+    LOG.info("Current provisioner config:"+ resource.getProvisionerConfig());
+
+    ResourceConfig.Delta delta = new ResourceConfig.Delta(resourceId);
+    // NOTE(review): the delta carries a fresh config holding only the container count;
+    // confirm that no other provisioner settings need to be copied over
+    YarnProvisionerConfig config = new YarnProvisionerConfig(resourceId);
+    config.setNumContainers(numContainers);
+    delta.setProvisionerConfig(config);
+    ResourceConfig updatedResourceConfig = resourceAccessor.updateResource(resourceId, delta);
+    LOG.info("Update provisioner config:"+ updatedResourceConfig.getProvisionerConfig());
+  }
+
+  /**
+   * Close the connection opened by the constructor.
+   */
+  public void disconnect() {
+    _connection.disconnect();
+  }
+
+  /**
+   * Parse the command line and apply the requested provisioner config update.
+   * @param args zookeeperAddress and updateContainerCount (appName serviceName numContainers)
+   * @throws ParseException if the command line is invalid or incomplete
+   */
+  @SuppressWarnings("static-access")
+  public static void main(String[] args) throws ParseException {
+    Option zkServerOption =
+        OptionBuilder.withLongOpt("zookeeperAddress").withDescription("Provide zookeeper address")
+            .create();
+    zkServerOption.setArgs(1);
+    zkServerOption.setRequired(true);
+    zkServerOption.setArgName("zookeeperAddress(Required)");
+
+    OptionGroup group = new OptionGroup();
+    group.setRequired(true);
+
+    // update container count per service
+    Option updateContainerCountOption =
+        OptionBuilder.withLongOpt(updateContainerCount)
+            .withDescription("set the number of containers per service").create();
+    updateContainerCountOption.setArgs(3);
+    updateContainerCountOption.setRequired(false);
+    updateContainerCountOption.setArgName("appName serviceName numContainers");
+
+    group.addOption(updateContainerCountOption);
+
+    Options options = new Options();
+    options.addOption(zkServerOption);
+    options.addOptionGroup(group);
+    CommandLine cliParser = new GnuParser().parse(options, args);
+
+    String zkAddress = cliParser.getOptionValue("zookeeperAddress");
+    UpdateProvisionerConfig updater = new UpdateProvisionerConfig(zkAddress);
+    try {
+      if (cliParser.hasOption(updateContainerCount)) {
+        String[] values = cliParser.getOptionValues(updateContainerCount);
+        if (values == null || values.length != 3) {
+          throw new ParseException(updateContainerCount
+              + " expects: appName serviceName numContainers");
+        }
+        updater.setNumContainers(values[0], values[1], Integer.parseInt(values[2]));
+      }
+    } finally {
+      // release the ZooKeeper connection even when the update fails
+      updater.disconnect();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/c072aca4/recipes/helloworld-provisioning-yarn/run.sh
----------------------------------------------------------------------
diff --git a/recipes/helloworld-provisioning-yarn/run.sh b/recipes/helloworld-provisioning-yarn/run.sh
index 51d4c35..07448bb 100755
--- a/recipes/helloworld-provisioning-yarn/run.sh
+++ b/recipes/helloworld-provisioning-yarn/run.sh
@@ -1,6 +1,6 @@
-cd ../../../../
-mvn clean install -DskipTests
-cd recipes/provisioning/yarn/helloworld/
+#cd ../../
+#mvn clean install -DskipTests
+#cd recipes/helloworld-provisioning-yarn
 mvn clean package -DskipTests
-chmod +x target/helloworld-pkg/bin/app-launcher.sh
-target/helloworld-pkg/bin/app-launcher.sh org.apache.helix.provisioning.yarn.example.HelloWordAppSpecFactory /Users/kgopalak/Documents/projects/incubator-helix/recipes/provisioning/yarn/helloworld/src/main/resources/hello_world_app_spec.yaml
+chmod +x target/helloworld-provisioning-yarn-pkg/bin/app-launcher.sh
+target/helloworld-provisioning-yarn-pkg/bin/app-launcher.sh --app_spec_provider org.apache.helix.provisioning.yarn.example.HelloWordAppSpecFactory --app_config_spec /Users/kgopalak/Documents/projects/incubator-helix/recipes/helloworld-provisioning-yarn/src/main/resources/hello_world_app_spec.yaml

http://git-wip-us.apache.org/repos/asf/helix/blob/c072aca4/recipes/helloworld-provisioning-yarn/src/main/resources/hello_world_app_spec.yaml
----------------------------------------------------------------------
diff --git a/recipes/helloworld-provisioning-yarn/src/main/resources/hello_world_app_spec.yaml b/recipes/helloworld-provisioning-yarn/src/main/resources/hello_world_app_spec.yaml
index d8d1dd2..baaddb5 100644
--- a/recipes/helloworld-provisioning-yarn/src/main/resources/hello_world_app_spec.yaml
+++ b/recipes/helloworld-provisioning-yarn/src/main/resources/hello_world_app_spec.yaml
@@ -3,7 +3,7 @@ appConfig:
   config: {
     k1: v1
   }
-appMasterPackageUri: 'file:///Users/kgopalak/Documents/projects/incubator-helix/recipes/provisioning/yarn/helloworld/target/helloworld-0.7.1-incubating-SNAPSHOT-pkg.tar'
+appMasterPackageUri: 'file:///Users/kgopalak/Documents/projects/incubator-helix/recipes/helloworld-provisioning-yarn/target/helloworld-provisioning-yarn-0.7.1-incubating-SNAPSHOT-pkg.tar'
 appName: testApp
 serviceConfigMap:
   HelloWorld: {
@@ -14,7 +14,7 @@ serviceMainClassMap: {
   HelloWorld: org.apache.helix.provisioning.yarn.example.HelloWorldService
 }
 servicePackageURIMap: {
-  HelloWorld: 'file:///Users/kgopalak/Documents/projects/incubator-helix/recipes/provisioning/yarn/helloworld/target/helloworld-0.7.1-incubating-SNAPSHOT-pkg.tar'
+  HelloWorld: 'file:///Users/kgopalak/Documents/projects/incubator-helix/recipes/helloworld-provisioning-yarn/target/helloworld-provisioning-yarn-0.7.1-incubating-SNAPSHOT-pkg.tar'
 }
 services: [
   HelloWorld]