You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ka...@apache.org on 2014/07/10 19:05:01 UTC

[18/50] [abbrv] git commit: Moving packages around

Moving packages around


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/8992aa5a
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/8992aa5a
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/8992aa5a

Branch: refs/heads/master
Commit: 8992aa5a7e7bd39b63aa79ea96a9fe316f833014
Parents: b47e329
Author: Kishore Gopalakrishna <g....@gmail.com>
Authored: Sat Feb 22 11:50:07 2014 -0800
Committer: Kishore Gopalakrishna <g....@gmail.com>
Committed: Sat Feb 22 11:50:07 2014 -0800

----------------------------------------------------------------------
 .../controller/provisioner/ContainerState.java  |   3 +-
 helix-provisioning/pom.xml                      |   2 +-
 .../apache/helix/provisioning/AppConfig.java    |  17 +
 .../helix/provisioning/ApplicationSpec.java     |  29 +
 .../provisioning/ApplicationSpecFactory.java    |   9 +
 .../provisioning/ContainerAskResponse.java      |  17 +
 .../provisioning/ContainerLaunchResponse.java   |   5 +
 .../provisioning/ContainerReleaseResponse.java  |   5 +
 .../provisioning/ContainerStopResponse.java     |   5 +
 .../helix/provisioning/HelixYarnUtil.java       |  42 +
 .../helix/provisioning/ParticipantLauncher.java | 136 +++
 .../helix/provisioning/ServiceConfig.java       |  17 +
 .../apache/helix/provisioning/TaskConfig.java   |  13 +
 .../helix/provisioning/yarn/AppConfig.java      |  17 -
 .../helix/provisioning/yarn/AppLauncher.java    |  22 +-
 .../provisioning/yarn/AppMasterLauncher.java    | 166 ++++
 .../yarn/AppStatusReportGenerator.java          |  79 ++
 .../provisioning/yarn/ApplicationMaster.java    | 889 -------------------
 .../provisioning/yarn/ApplicationSpec.java      |  28 -
 .../yarn/ApplicationSpecFactory.java            |   9 -
 .../apache/helix/provisioning/yarn/Client.java  | 627 -------------
 .../provisioning/yarn/ContainerAskResponse.java |  17 -
 .../yarn/ContainerLaunchResponse.java           |   5 -
 .../yarn/ContainerReleaseResponse.java          |   5 -
 .../yarn/ContainerStopResponse.java             |   5 -
 .../helix/provisioning/yarn/DSConstants.java    |  47 -
 .../provisioning/yarn/FixedTargetProvider.java  |  20 +
 .../yarn/GenericApplicationMaster.java          |   4 +
 .../yarn/HelixYarnApplicationMasterMain.java    | 159 ----
 .../helix/provisioning/yarn/HelixYarnUtil.java  |  42 -
 .../provisioning/yarn/NMCallbackHandler.java    |   2 +
 .../provisioning/yarn/ParticipantLauncher.java  | 135 ---
 .../provisioning/yarn/RMCallbackHandler.java    |   3 +
 .../helix/provisioning/yarn/ServiceConfig.java  |  17 -
 .../helix/provisioning/yarn/TaskConfig.java     |  13 -
 .../provisioning/yarn/YarnProvisioner.java      |   6 +
 recipes/helloworld-provisioning-yarn/pom.xml    | 159 ++++
 recipes/helloworld-provisioning-yarn/run.sh     |   6 +
 .../src/assemble/assembly.xml                   |  60 ++
 .../src/main/config/log4j.properties            |  31 +
 .../yarn/example/HelloWordAppSpecFactory.java   |  92 ++
 .../yarn/example/HelloWorldService.java         |  41 +
 .../yarn/example/HelloWorldStateModel.java      |  33 +
 .../example/HelloWorldStateModelFactory.java    |  13 +
 .../yarn/example/HelloworldAppSpec.java         | 138 +++
 .../main/resources/hello_world_app_spec.yaml    |  24 +
 .../src/test/conf/testng.xml                    |  27 +
 recipes/pom.xml                                 |   2 +-
 recipes/provisioning/pom.xml                    |  50 --
 recipes/provisioning/yarn/helloworld/pom.xml    | 159 ----
 .../yarn/helloworld/src/assemble/assembly.xml   |  60 --
 .../helloworld/src/main/config/log4j.properties |  31 -
 .../yarn/example/HelloWordAppSpecFactory.java   |  92 --
 .../yarn/example/HelloWorldService.java         |  41 -
 .../yarn/example/HelloWorldStateModel.java      |  29 -
 .../example/HelloWorldStateModelFactory.java    |  13 -
 .../yarn/example/HelloworldAppSpec.java         | 138 ---
 .../main/resources/hello_world_app_spec.yaml    |  24 -
 .../yarn/helloworld/src/test/conf/testng.xml    |  27 -
 recipes/provisioning/yarn/pom.xml               |  50 --
 60 files changed, 1222 insertions(+), 2735 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/8992aa5a/helix-core/src/main/java/org/apache/helix/controller/provisioner/ContainerState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/provisioner/ContainerState.java b/helix-core/src/main/java/org/apache/helix/controller/provisioner/ContainerState.java
index 449f636..2f91275 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/provisioner/ContainerState.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/provisioner/ContainerState.java
@@ -29,5 +29,6 @@ public enum ContainerState {
   HALTED,
   FINALIZING,
   FINALIZED,
-  FAILED
+  FAILED, 
+  UNDEFINED
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/8992aa5a/helix-provisioning/pom.xml
----------------------------------------------------------------------
diff --git a/helix-provisioning/pom.xml b/helix-provisioning/pom.xml
index ea5a0fe..3ba7d39 100644
--- a/helix-provisioning/pom.xml
+++ b/helix-provisioning/pom.xml
@@ -36,7 +36,7 @@ under the License.
       org.apache.log4j,
       *
     </osgi.import>
-    <osgi.export>org.apache.helix.provisioning.yarn*;version="${project.version};-noimport:=true</osgi.export>
+    <osgi.export>org.apache.helix.provisioning*;version="${project.version};-noimport:=true</osgi.export>
   </properties>
 
   <dependencies>

http://git-wip-us.apache.org/repos/asf/helix/blob/8992aa5a/helix-provisioning/src/main/java/org/apache/helix/provisioning/AppConfig.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/AppConfig.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/AppConfig.java
new file mode 100644
index 0000000..a51db1c
--- /dev/null
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/AppConfig.java
@@ -0,0 +1,17 @@
+package org.apache.helix.provisioning;
+
+import java.util.HashMap;
+import java.util.Map;
+
+
+public class AppConfig {
+	public Map<String, String> config = new HashMap<String, String>();
+	
+	public String getValue(String key) {
+		return (config != null ? config.get(key) : null);
+	}
+	
+	public void setValue(String key, String value){
+	  config.put(key, value);
+	}
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/8992aa5a/helix-provisioning/src/main/java/org/apache/helix/provisioning/ApplicationSpec.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/ApplicationSpec.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/ApplicationSpec.java
new file mode 100644
index 0000000..f7454d2
--- /dev/null
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/ApplicationSpec.java
@@ -0,0 +1,29 @@
+package org.apache.helix.provisioning;
+
+import java.net.URI;
+import java.util.List;
+
+
+
+public interface ApplicationSpec {
+  /**
+   * Returns the name of the application
+   * @return
+   */
+  String getAppName();
+
+  AppConfig getConfig();
+
+  List<String> getServices();
+
+  URI getAppMasterPackage();
+  
+  URI getServicePackage(String serviceName);
+  
+  String getServiceMainClass(String service);
+
+  ServiceConfig getServiceConfig(String serviceName);
+
+  List<TaskConfig> getTaskConfigs();
+
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/8992aa5a/helix-provisioning/src/main/java/org/apache/helix/provisioning/ApplicationSpecFactory.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/ApplicationSpecFactory.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/ApplicationSpecFactory.java
new file mode 100644
index 0000000..0c524f2
--- /dev/null
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/ApplicationSpecFactory.java
@@ -0,0 +1,9 @@
+package org.apache.helix.provisioning;
+
+import java.io.InputStream;
+
+public interface ApplicationSpecFactory {
+  
+  ApplicationSpec fromYaml(InputStream yamlFile);
+
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/8992aa5a/helix-provisioning/src/main/java/org/apache/helix/provisioning/ContainerAskResponse.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/ContainerAskResponse.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/ContainerAskResponse.java
new file mode 100644
index 0000000..18f66d2
--- /dev/null
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/ContainerAskResponse.java
@@ -0,0 +1,17 @@
+package org.apache.helix.provisioning;
+
+import org.apache.hadoop.yarn.api.records.Container;
+
+public class ContainerAskResponse {
+  
+  Container container;
+
+  public Container getContainer() {
+    return container;
+  }
+
+  public void setContainer(Container container) {
+    this.container = container;
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/8992aa5a/helix-provisioning/src/main/java/org/apache/helix/provisioning/ContainerLaunchResponse.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/ContainerLaunchResponse.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/ContainerLaunchResponse.java
new file mode 100644
index 0000000..ea6ef12
--- /dev/null
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/ContainerLaunchResponse.java
@@ -0,0 +1,5 @@
+package org.apache.helix.provisioning;
+
+public class ContainerLaunchResponse {
+
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/8992aa5a/helix-provisioning/src/main/java/org/apache/helix/provisioning/ContainerReleaseResponse.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/ContainerReleaseResponse.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/ContainerReleaseResponse.java
new file mode 100644
index 0000000..e4a5dc4
--- /dev/null
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/ContainerReleaseResponse.java
@@ -0,0 +1,5 @@
+package org.apache.helix.provisioning;
+
+public class ContainerReleaseResponse {
+
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/8992aa5a/helix-provisioning/src/main/java/org/apache/helix/provisioning/ContainerStopResponse.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/ContainerStopResponse.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/ContainerStopResponse.java
new file mode 100644
index 0000000..d8c8a46
--- /dev/null
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/ContainerStopResponse.java
@@ -0,0 +1,5 @@
+package org.apache.helix.provisioning;
+
+public class ContainerStopResponse {
+
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/8992aa5a/helix-provisioning/src/main/java/org/apache/helix/provisioning/HelixYarnUtil.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/HelixYarnUtil.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/HelixYarnUtil.java
new file mode 100644
index 0000000..80ac16b
--- /dev/null
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/HelixYarnUtil.java
@@ -0,0 +1,42 @@
+package org.apache.helix.provisioning;
+
+import org.apache.log4j.Logger;
+
+public class HelixYarnUtil {
+  private static Logger LOG = Logger.getLogger(HelixYarnUtil.class);
+
+  @SuppressWarnings("unchecked")
+  public static <T extends ApplicationSpecFactory> T createInstance(String className) {
+    Class<ApplicationSpecFactory> factoryClazz = null;
+    {
+      try {
+        factoryClazz =
+            (Class<ApplicationSpecFactory>) Thread.currentThread().getContextClassLoader()
+                .loadClass(className);
+      } catch (ClassNotFoundException e) {
+        try {
+          factoryClazz =
+              (Class<ApplicationSpecFactory>) ClassLoader.getSystemClassLoader().loadClass(
+                  className);
+        } catch (ClassNotFoundException e1) {
+          try {
+            factoryClazz = (Class<ApplicationSpecFactory>) Class.forName(className);
+          } catch (ClassNotFoundException e2) {
+
+          }
+        }
+      }
+    }
+    System.out.println(System.getProperty("java.class.path"));
+    if (factoryClazz == null) {
+      LOG.error("Unable to find class:" + className);
+    }
+    ApplicationSpecFactory factory = null;
+    try {
+      factory = factoryClazz.newInstance();
+    } catch (Exception e) {
+      LOG.error("Unable to create instance of class: " + className, e);
+    }
+    return (T) factory;
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/8992aa5a/helix-provisioning/src/main/java/org/apache/helix/provisioning/ParticipantLauncher.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/ParticipantLauncher.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/ParticipantLauncher.java
new file mode 100644
index 0000000..55bb618
--- /dev/null
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/ParticipantLauncher.java
@@ -0,0 +1,136 @@
+package org.apache.helix.provisioning;
+
+import java.util.Arrays;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.Options;
+import org.apache.helix.HelixConnection;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.api.id.ClusterId;
+import org.apache.helix.api.id.ParticipantId;
+import org.apache.helix.manager.zk.AbstractParticipantService;
+import org.apache.helix.manager.zk.ZkHelixConnection;
+import org.apache.helix.messaging.handling.HelixTaskResult;
+import org.apache.helix.messaging.handling.MessageHandler;
+import org.apache.helix.messaging.handling.MessageHandlerFactory;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.Message.MessageType;
+import org.apache.log4j.Logger;
+
+/**
+ * Main class that invokes the Participant Api
+ */
+public class ParticipantLauncher {
+  private static Logger LOG = Logger.getLogger(ParticipantLauncher.class);
+
+  public static void main(String[] args) {
+
+    System.out.println("Starting Helix Participant: " + Arrays.toString(args));
+    Options opts;
+    opts = new Options();
+    opts.addOption("cluster", true, "Cluster name, default app name");
+    opts.addOption("participantId", true, "Participant Id");
+    opts.addOption("zkAddress", true, "Zookeeper address");
+    opts.addOption("participantClass", true, "Participant service class");
+    try {
+      CommandLine cliParser = new GnuParser().parse(opts, args);
+      String zkAddress = cliParser.getOptionValue("zkAddress");
+      final HelixConnection connection = new ZkHelixConnection(zkAddress);
+      connection.connect();
+      ClusterId clusterId = ClusterId.from(cliParser.getOptionValue("cluster"));
+      ParticipantId participantId = ParticipantId.from(cliParser.getOptionValue("participantId"));
+      String participantClass = cliParser.getOptionValue("participantClass");
+      @SuppressWarnings("unchecked")
+      Class<? extends AbstractParticipantService> clazz =
+          (Class<? extends AbstractParticipantService>) Class.forName(participantClass);
+      final AbstractParticipantService containerParticipant =
+          clazz.getConstructor(HelixConnection.class, ClusterId.class, ParticipantId.class)
+              .newInstance(connection, clusterId, participantId);
+      containerParticipant.startAsync();
+      containerParticipant.awaitRunning(60, TimeUnit.SECONDS);
+      containerParticipant
+          .getParticipant()
+          .getMessagingService()
+          .registerMessageHandlerFactory(MessageType.SHUTDOWN.toString(),
+              new ShutdownMessageHandlerFactory(containerParticipant, connection));
+      Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
+        @Override
+        public void run() {
+          LOG.info("Received a shutdown signal. Stopping participant");
+          containerParticipant.stopAsync();
+          containerParticipant.awaitTerminated();
+          connection.disconnect();
+        }
+      }) {
+
+      });
+      Thread.currentThread().join();
+    } catch (Exception e) {
+      e.printStackTrace();
+      System.out.println("Failed to start Helix participant" + e);
+      // System.exit(1);
+    }
+    try {
+      Thread.currentThread().join();
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    }
+
+  }
+
+  public static class ShutdownMessageHandlerFactory implements MessageHandlerFactory {
+    private final AbstractParticipantService _service;
+    private final HelixConnection _connection;
+
+    public ShutdownMessageHandlerFactory(AbstractParticipantService service,
+        HelixConnection connection) {
+      _service = service;
+      _connection = connection;
+    }
+
+    @Override
+    public MessageHandler createHandler(Message message, NotificationContext context) {
+      return new ShutdownMessageHandler(_service, _connection, message, context);
+    }
+
+    @Override
+    public String getMessageType() {
+      return MessageType.SHUTDOWN.toString();
+    }
+
+    @Override
+    public void reset() {
+    }
+
+  }
+
+  public static class ShutdownMessageHandler extends MessageHandler {
+    private final AbstractParticipantService _service;
+    private final HelixConnection _connection;
+
+    public ShutdownMessageHandler(AbstractParticipantService service, HelixConnection connection,
+        Message message, NotificationContext context) {
+      super(message, context);
+      _service = service;
+      _connection = connection;
+    }
+
+    @Override
+    public HelixTaskResult handleMessage() throws InterruptedException {
+      LOG.info("Received a shutdown message. Trying to shut down.");
+      _service.stopAsync();
+      _service.awaitTerminated();
+      _connection.disconnect();
+      LOG.info("Shutdown complete. Process exiting gracefully");
+      System.exit(0);
+      return null;
+    }
+
+    @Override
+    public void onError(Exception e, ErrorCode code, ErrorType type) {
+    }
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/8992aa5a/helix-provisioning/src/main/java/org/apache/helix/provisioning/ServiceConfig.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/ServiceConfig.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/ServiceConfig.java
new file mode 100644
index 0000000..262344b
--- /dev/null
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/ServiceConfig.java
@@ -0,0 +1,17 @@
+package org.apache.helix.provisioning;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.helix.api.Scope;
+import org.apache.helix.api.config.UserConfig;
+import org.apache.helix.api.id.ResourceId;
+
+public class ServiceConfig extends UserConfig{
+	public Map<String, String> config = new HashMap<String, String>();
+	
+	public ServiceConfig(Scope<ResourceId> scope) {
+	  super(scope);
+  }
+ 
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/8992aa5a/helix-provisioning/src/main/java/org/apache/helix/provisioning/TaskConfig.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/TaskConfig.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/TaskConfig.java
new file mode 100644
index 0000000..42203e9
--- /dev/null
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/TaskConfig.java
@@ -0,0 +1,13 @@
+package org.apache.helix.provisioning;
+
+import java.util.HashMap;
+import java.util.Map;
+
+
+public class TaskConfig {
+	public Map<String, String> config = new HashMap<String, String>();
+	
+	public String getValue(String key) {
+		return (config != null ? config.get(key) : null);
+	}
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/8992aa5a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppConfig.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppConfig.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppConfig.java
deleted file mode 100644
index 7ea3e42..0000000
--- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppConfig.java
+++ /dev/null
@@ -1,17 +0,0 @@
-package org.apache.helix.provisioning.yarn;
-
-import java.util.HashMap;
-import java.util.Map;
-
-
-public class AppConfig {
-	public Map<String, String> config = new HashMap<String, String>();
-	
-	public String getValue(String key) {
-		return (config != null ? config.get(key) : null);
-	}
-	
-	public void setValue(String key, String value){
-	  config.put(key, value);
-	}
-}

http://git-wip-us.apache.org/repos/asf/helix/blob/8992aa5a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppLauncher.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppLauncher.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppLauncher.java
index 1fe0a28..d2e901f 100644
--- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppLauncher.java
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppLauncher.java
@@ -44,6 +44,12 @@ import org.apache.hadoop.yarn.client.api.YarnClientApplication;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.util.Records;
+import org.apache.helix.HelixConnection;
+import org.apache.helix.api.id.ClusterId;
+import org.apache.helix.manager.zk.ZkHelixConnection;
+import org.apache.helix.provisioning.ApplicationSpec;
+import org.apache.helix.provisioning.ApplicationSpecFactory;
+import org.apache.helix.provisioning.HelixYarnUtil;
 
 /**
  * Main class to launch the job.
@@ -52,7 +58,7 @@ import org.apache.hadoop.yarn.util.Records;
  */
 public class AppLauncher {
 
-  private static final Log LOG = LogFactory.getLog(Client.class);
+  private static final Log LOG = LogFactory.getLog(AppLauncher.class);
 
   private ApplicationSpec _applicationSpec;
   private YarnClient yarnClient;
@@ -197,7 +203,7 @@ public class AppLauncher {
     // Set Xmx based on am memory size
     vargs.add("-Xmx" + amMemory + "m");
     // Set class name
-    vargs.add(HelixYarnApplicationMasterMain.class.getCanonicalName());
+    vargs.add(AppMasterLauncher.class.getCanonicalName());
     // Set params for Application Master
     // vargs.add("--num_containers " + String.valueOf(numContainers));
 
@@ -375,7 +381,17 @@ public class AppLauncher {
           return false;
         }
         if (YarnApplicationState.RUNNING == state) {
-
+          HelixConnection connection = new ZkHelixConnection(report.getHost() + ":2181");
+          try{
+            connection.connect();
+          }catch(Exception e){
+            LOG.warn("AppMaster started but not yet initialized");
+          }
+          if(connection.isConnected()){
+            AppStatusReportGenerator generator = new AppStatusReportGenerator();
+            String generateReport = generator.generateReport(connection, ClusterId.from(_applicationSpec.getAppName()));
+            LOG.info(generateReport);
+          }
         }
         prevReport = reportMessage;
         Thread.sleep(10000);

http://git-wip-us.apache.org/repos/asf/helix/blob/8992aa5a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppMasterLauncher.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppMasterLauncher.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppMasterLauncher.java
new file mode 100644
index 0000000..72d6ea9
--- /dev/null
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppMasterLauncher.java
@@ -0,0 +1,166 @@
+package org.apache.helix.provisioning.yarn;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Map;
+
+import org.I0Itec.zkclient.IDefaultNameSpace;
+import org.I0Itec.zkclient.ZkClient;
+import org.I0Itec.zkclient.ZkServer;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.helix.HelixController;
+import org.apache.helix.api.accessor.ClusterAccessor;
+import org.apache.helix.api.config.ClusterConfig;
+import org.apache.helix.api.config.ResourceConfig;
+import org.apache.helix.api.id.ClusterId;
+import org.apache.helix.api.id.ControllerId;
+import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.controller.provisioner.ProvisionerConfig;
+import org.apache.helix.controller.rebalancer.config.FullAutoRebalancerConfig;
+import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
+import org.apache.helix.manager.zk.ZkHelixConnection;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.provisioning.ApplicationSpec;
+import org.apache.helix.provisioning.ApplicationSpecFactory;
+import org.apache.helix.provisioning.HelixYarnUtil;
+import org.apache.helix.provisioning.ServiceConfig;
+import org.apache.helix.tools.StateModelConfigGenerator;
+import org.apache.log4j.Logger;
+
+/**
+ * This will <br/>
+ * <ul>
+ * <li>start zookeeper automatically</li>
+ * <li>create the cluster</li>
+ * <li>set up resource(s)</li>
+ * <li>start helix controller</li>
+ * </ul>
+ */
+public class AppMasterLauncher {
+  public static Logger LOG = Logger.getLogger(AppMasterLauncher.class);
+
+  @SuppressWarnings("unchecked")
+  public static void main(String[] args) throws Exception{
+    Map<String, String> env = System.getenv();
+    LOG.info("Starting app master with the following environment variables");
+    for (String key : env.keySet()) {
+      LOG.info(key + "\t\t=" + env.get(key));
+    }
+
+    Options opts;
+    opts = new Options();
+    opts.addOption("num_containers", true, "Number of containers");
+    try {
+      CommandLine cliParser = new GnuParser().parse(opts, args);
+    } catch (Exception e) {
+      LOG.error("Error parsing input arguments" + Arrays.toString(args), e);
+    }
+
+    // START ZOOKEEPER
+    String dataDir = "dataDir";
+    String logDir = "logDir";
+    IDefaultNameSpace defaultNameSpace = new IDefaultNameSpace() {
+      @Override
+      public void createDefaultNameSpace(ZkClient zkClient) {
+
+      }
+    };
+    try {
+      FileUtils.deleteDirectory(new File(dataDir));
+      FileUtils.deleteDirectory(new File(logDir));
+    } catch (IOException e) {
+      LOG.error(e);
+    }
+
+    final ZkServer server = new ZkServer(dataDir, logDir, defaultNameSpace);
+    server.start();
+
+    // start Generic AppMaster that interacts with Yarn RM
+    AppMasterConfig appMasterConfig = new AppMasterConfig();
+    String containerIdStr = appMasterConfig.getContainerId();
+    ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
+    ApplicationAttemptId appAttemptID = containerId.getApplicationAttemptId();
+
+    String configFile = AppMasterConfig.AppEnvironment.APP_SPEC_FILE.toString();
+    String className = appMasterConfig.getApplicationSpecFactory();
+   
+    GenericApplicationMaster genericApplicationMaster = new GenericApplicationMaster(appAttemptID);
+    try {
+      genericApplicationMaster.start();
+    } catch (Exception e) {
+      LOG.error("Unable to start application master: ", e);
+    }
+    ApplicationSpecFactory factory = HelixYarnUtil.createInstance(className);
+    
+    //TODO: Avoid setting static variable.
+    YarnProvisioner.applicationMaster = genericApplicationMaster;
+    YarnProvisioner.applicationMasterConfig = appMasterConfig;
+    ApplicationSpec applicationSpec = factory.fromYaml(new FileInputStream(configFile));
+    YarnProvisioner.applicationSpec = applicationSpec;
+    String zkAddress = appMasterConfig.getZKAddress();
+    String clusterName = appMasterConfig.getAppName();
+
+    // CREATE CLUSTER and setup the resources
+    // connect
+    ZkHelixConnection connection = new ZkHelixConnection(zkAddress);
+    connection.connect();
+
+    // create the cluster
+    ClusterId clusterId = ClusterId.from(clusterName);
+    ClusterAccessor clusterAccessor = connection.createClusterAccessor(clusterId);
+    StateModelDefinition statelessService =
+        new StateModelDefinition(StateModelConfigGenerator.generateConfigForStatelessService());
+    clusterAccessor.createCluster(new ClusterConfig.Builder(clusterId).addStateModelDefinition(
+        statelessService).build());
+    for (String service : applicationSpec.getServices()) {
+      String resourceName = service;
+      // add the resource with the local provisioner
+      ResourceId resourceId = ResourceId.from(resourceName);
+      
+      ServiceConfig serviceConfig = applicationSpec.getServiceConfig(resourceName);
+      serviceConfig.setSimpleField("service_name", service);
+      int numContainers = serviceConfig.getIntField("num_containers", 1);
+      
+      YarnProvisionerConfig provisionerConfig = new YarnProvisionerConfig(resourceId);
+      provisionerConfig.setNumContainers(numContainers);
+
+      FullAutoRebalancerConfig.Builder rebalancerConfigBuilder =
+          new FullAutoRebalancerConfig.Builder(resourceId);
+      RebalancerConfig rebalancerConfig =
+          rebalancerConfigBuilder.stateModelDefId(statelessService.getStateModelDefId())//
+              .build();
+      ResourceConfig.Builder resourceConfigBuilder =
+          new ResourceConfig.Builder(ResourceId.from(resourceName));
+      ResourceConfig resourceConfig = resourceConfigBuilder.provisionerConfig(provisionerConfig) //
+          .rebalancerConfig(rebalancerConfig) //
+          .userConfig(serviceConfig) //
+          .build();
+      clusterAccessor.addResourceToCluster(resourceConfig);
+    }
+    // start controller
+    ControllerId controllerId = ControllerId.from("controller1");
+    HelixController controller = connection.createController(clusterId, controllerId);
+    controller.start();
+
+    Thread shutdownhook = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        server.shutdown();
+      }
+    });
+    Runtime.getRuntime().addShutdownHook(shutdownhook);
+    Thread.sleep(10000);
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/8992aa5a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppStatusReportGenerator.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppStatusReportGenerator.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppStatusReportGenerator.java
new file mode 100644
index 0000000..0443f8a
--- /dev/null
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppStatusReportGenerator.java
@@ -0,0 +1,79 @@
+package org.apache.helix.provisioning.yarn;
+
+import java.util.Map;
+
+import jline.ConsoleReader;
+
+import org.apache.helix.HelixConnection;
+import org.apache.helix.api.Participant;
+import org.apache.helix.api.Resource;
+import org.apache.helix.api.State;
+import org.apache.helix.api.accessor.ClusterAccessor;
+import org.apache.helix.api.config.ContainerConfig;
+import org.apache.helix.api.id.ClusterId;
+import org.apache.helix.api.id.ParticipantId;
+import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.controller.provisioner.ContainerId;
+import org.apache.helix.controller.provisioner.ContainerState;
+import org.apache.helix.manager.zk.ZkHelixConnection;
+import org.apache.helix.model.ExternalView;
+
+public class AppStatusReportGenerator {
+  static String TAB = "\t";
+  static String NEWLINE = "\n";
+
+  String generateReport(HelixConnection connection, ClusterId clusterId) {
+    if (!connection.isConnected()) {
+      return "Unable to connect to cluster";
+    }
+    StringBuilder builder = new StringBuilder();
+    ClusterAccessor clusterAccessor = connection.createClusterAccessor(clusterId);
+    Map<ParticipantId, Participant> participants = clusterAccessor.readParticipants();
+    builder.append("AppName").append(TAB).append(clusterId).append(NEWLINE);
+    Map<ResourceId, Resource> resources = clusterAccessor.readResources();
+    for (ResourceId resourceId : resources.keySet()) {
+      builder.append("SERVICE").append(TAB).append(resourceId).append(NEWLINE);
+      Resource resource = resources.get(resourceId);
+      Map<ParticipantId, State> serviceStateMap =
+          resource.getExternalView().getStateMap(PartitionId.from(resourceId.stringify() + "_0"));
+
+      builder.append(TAB).append("CONTAINER_NAME").append(TAB).append(TAB)
+          .append("CONTAINER_STATE").append(TAB).append("SERVICE_STATE").append(TAB).append("CONTAINER_ID").append(NEWLINE);
+      for (Participant participant : participants.values()) {
+        // need a better check
+        if (!participant.getId().stringify().startsWith(resource.getId().stringify())) {
+          continue;
+        }
+        ContainerConfig containerConfig = participant.getContainerConfig();
+        ContainerState containerState =ContainerState.UNDEFINED;
+        ContainerId containerId = ContainerId.from("N/A");
+
+        if (containerConfig != null) {
+          containerId = containerConfig.getId();
+          containerState = containerConfig.getState();
+        }
+        State participantState = serviceStateMap.get(participant.getId());
+        if (participantState == null) {
+          participantState = State.from("UNKNOWN");
+        }
+        builder.append(TAB).append(participant.getId()).append(TAB)
+            .append(containerState).append(TAB).append(participantState).append(TAB).append(TAB).append(containerId);
+        builder.append(NEWLINE);
+      }
+
+    }
+    return builder.toString();
+
+  }
+
+  public static void main(String[] args) {
+    AppStatusReportGenerator generator = new AppStatusReportGenerator();
+
+    ZkHelixConnection connection = new ZkHelixConnection("localhost:2181");
+    connection.connect();
+    String generateReport = generator.generateReport(connection, ClusterId.from("testApp"));
+    System.out.println(generateReport);
+    connection.disconnect();
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/8992aa5a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ApplicationMaster.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ApplicationMaster.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ApplicationMaster.java
deleted file mode 100644
index d63b300..0000000
--- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ApplicationMaster.java
+++ /dev/null
@@ -1,889 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.helix.provisioning.yarn;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Vector;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.GnuParser;
-import org.apache.commons.cli.HelpFormatter;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.ParseException;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.yarn.api.ApplicationConstants;
-import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
-import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
-import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
-import org.apache.hadoop.yarn.api.records.ContainerState;
-import org.apache.hadoop.yarn.api.records.ContainerStatus;
-import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
-import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.hadoop.yarn.api.records.LocalResourceType;
-import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
-import org.apache.hadoop.yarn.api.records.NodeReport;
-import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.ResourceRequest;
-import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
-import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
-import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
-import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
-import org.apache.hadoop.yarn.util.ConverterUtils;
-import org.apache.hadoop.yarn.util.Records;
-
-/**
- * An ApplicationMaster for executing shell commands on a set of launched
- * containers using the YARN framework.
- * 
- * <p>
- * This class is meant to act as an example on how to write yarn-based
- * application masters.
- * </p>
- * 
- * <p>
- * The ApplicationMaster is started on a container by the
- * <code>ResourceManager</code>'s launcher. The first thing that the
- * <code>ApplicationMaster</code> needs to do is to connect and register itself
- * with the <code>ResourceManager</code>. The registration sets up information
- * within the <code>ResourceManager</code> regarding what host:port the
- * ApplicationMaster is listening on to provide any form of functionality to a
- * client as well as a tracking url that a client can use to keep track of
- * status/job history if needed. However, in the distributedshell, trackingurl
- * and appMasterHost:appMasterRpcPort are not supported.
- * </p>
- * 
- * <p>
- * The <code>ApplicationMaster</code> needs to send a heartbeat to the
- * <code>ResourceManager</code> at regular intervals to inform the
- * <code>ResourceManager</code> that it is up and alive. The
- * {@link ApplicationMasterProtocol#allocate} to the <code>ResourceManager</code> from the
- * <code>ApplicationMaster</code> acts as a heartbeat.
- * 
- * <p>
- * For the actual handling of the job, the <code>ApplicationMaster</code> has to
- * request the <code>ResourceManager</code> via {@link AllocateRequest} for the
- * required no. of containers using {@link ResourceRequest} with the necessary
- * resource specifications such as node location, computational
- * (memory/disk/cpu) resource requirements. The <code>ResourceManager</code>
- * responds with an {@link AllocateResponse} that informs the
- * <code>ApplicationMaster</code> of the set of newly allocated containers,
- * completed containers as well as current state of available resources.
- * </p>
- * 
- * <p>
- * For each allocated container, the <code>ApplicationMaster</code> can then set
- * up the necessary launch context via {@link ContainerLaunchContext} to specify
- * the allocated container id, local resources required by the executable, the
- * environment to be setup for the executable, commands to execute, etc. and
- * submit a {@link StartContainerRequest} to the {@link ContainerManagementProtocol} to
- * launch and execute the defined commands on the given allocated container.
- * </p>
- * 
- * <p>
- * The <code>ApplicationMaster</code> can monitor the launched container by
- * either querying the <code>ResourceManager</code> using
- * {@link ApplicationMasterProtocol#allocate} to get updates on completed containers or via
- * the {@link ContainerManagementProtocol} by querying for the status of the allocated
- * container's {@link ContainerId}.
- *
- * <p>
- * After the job has been completed, the <code>ApplicationMaster</code> has to
- * send a {@link FinishApplicationMasterRequest} to the
- * <code>ResourceManager</code> to inform it that the
- * <code>ApplicationMaster</code> has been completed.
- */
-@InterfaceAudience.Public
-@InterfaceStability.Unstable
-public class ApplicationMaster {
-
-  private static final Log LOG = LogFactory.getLog(ApplicationMaster.class);
-
-  // Configuration
-  private Configuration conf;
-
-  // Handle to communicate with the Resource Manager
-  @SuppressWarnings("rawtypes")
-  private AMRMClientAsync amRMClient;
-
-  // Handle to communicate with the Node Manager
-  private NMClientAsync nmClientAsync;
-  // Listen to process the response from the Node Manager
-  private NMCallbackHandler containerListener;
-  
-  // Application Attempt Id ( combination of attemptId and fail count )
-  private ApplicationAttemptId appAttemptID;
-
-  // TODO
-  // For status update for clients - yet to be implemented
-  // Hostname of the container
-  private String appMasterHostname = "";
-  // Port on which the app master listens for status updates from clients
-  private int appMasterRpcPort = -1;
-  // Tracking url to which app master publishes info for clients to monitor
-  private String appMasterTrackingUrl = "";
-
-  // App Master configuration
-  // No. of containers to run shell command on
-  private int numTotalContainers = 1;
-  // Memory to request for the container on which the shell command will run
-  private int containerMemory = 10;
-  // Priority of the request
-  private int requestPriority;
-
-  // Counter for completed containers ( complete denotes successful or failed )
-  private AtomicInteger numCompletedContainers = new AtomicInteger();
-  // Allocated container count so that we know how many containers has the RM
-  // allocated to us
-  private AtomicInteger numAllocatedContainers = new AtomicInteger();
-  // Count of failed containers
-  private AtomicInteger numFailedContainers = new AtomicInteger();
-  // Count of containers already requested from the RM
-  // Needed as once requested, we should not request for containers again.
-  // Only request for more if the original requirement changes.
-  private AtomicInteger numRequestedContainers = new AtomicInteger();
-
-  // Shell command to be executed
-  private String shellCommand = "";
-  // Args to be passed to the shell command
-  private String shellArgs = "";
-  // Env variables to be setup for the shell command
-  private Map<String, String> shellEnv = new HashMap<String, String>();
-
-  // Location of shell script ( obtained from info set in env )
-  // Shell script path in fs
-  private String shellScriptPath = "";
-  // Timestamp needed for creating a local resource
-  private long shellScriptPathTimestamp = 0;
-  // File length needed for local resource
-  private long shellScriptPathLen = 0;
-
-  // Hardcoded path to shell script in launch container's local env
-  private final String ExecShellStringPath = "ExecShellScript.sh";
-
-  private volatile boolean done;
-  private volatile boolean success;
-
-  private ByteBuffer allTokens;
-
-  // Launch threads
-  private List<Thread> launchThreads = new ArrayList<Thread>();
-
-  /**
-   * @param args Command line args
-   */
-  public static void main(String[] args) {
-    boolean result = false;
-    try {
-      ApplicationMaster appMaster = new ApplicationMaster();
-      LOG.info("Initializing ApplicationMaster");
-      LOG.info("classpath:" + System.getProperty("java.class.path"));
-      boolean doRun = appMaster.init(args);
-      if (!doRun) {
-        System.exit(0);
-      }
-      result = appMaster.run();
-    } catch (Throwable t) {
-      LOG.fatal("Error running ApplicationMaster", t);
-      System.exit(1);
-    }
-    if (result) {
-      LOG.info("Application Master completed successfully. exiting");
-      System.exit(0);
-    } else {
-      LOG.info("Application Master failed. exiting");
-      System.exit(2);
-    }
-  }
-
-  /**
-   * Dump out contents of $CWD and the environment to stdout for debugging
-   */
-  private void dumpOutDebugInfo() {
-
-    LOG.info("Dump debug output");
-    Map<String, String> envs = System.getenv();
-    for (Map.Entry<String, String> env : envs.entrySet()) {
-      LOG.info("System env: key=" + env.getKey() + ", val=" + env.getValue());
-      System.out.println("System env: key=" + env.getKey() + ", val="
-          + env.getValue());
-    }
-
-    String cmd = "ls -al";
-    Runtime run = Runtime.getRuntime();
-    Process pr = null;
-    try {
-      pr = run.exec(cmd);
-      pr.waitFor();
-
-      BufferedReader buf = new BufferedReader(new InputStreamReader(
-          pr.getInputStream()));
-      String line = "";
-      while ((line = buf.readLine()) != null) {
-        LOG.info("System CWD content: " + line);
-        System.out.println("System CWD content: " + line);
-      }
-      buf.close();
-    } catch (IOException e) {
-      e.printStackTrace();
-    } catch (InterruptedException e) {
-      e.printStackTrace();
-    }
-  }
-
-  public ApplicationMaster() {
-    // Set up the configuration
-    conf = new YarnConfiguration();
-  }
-
-  /**
-   * Parse command line options
-   *
-   * @param args Command line args
-   * @return Whether init successful and run should be invoked
-   * @throws ParseException
-   * @throws IOException
-   */
-  public boolean init(String[] args) throws ParseException, IOException {
-
-    Options opts = new Options();
-    opts.addOption("app_attempt_id", true,
-        "App Attempt ID. Not to be used unless for testing purposes");
-    opts.addOption("shell_command", true,
-        "Shell command to be executed by the Application Master");
-    opts.addOption("shell_script", true,
-        "Location of the shell script to be executed");
-    opts.addOption("shell_args", true, "Command line args for the shell script");
-    opts.addOption("shell_env", true,
-        "Environment for shell script. Specified as env_key=env_val pairs");
-    opts.addOption("container_memory", true,
-        "Amount of memory in MB to be requested to run the shell command");
-    opts.addOption("num_containers", true,
-        "No. of containers on which the shell command needs to be executed");
-    opts.addOption("priority", true, "Application Priority. Default 0");
-    opts.addOption("debug", false, "Dump out debug information");
-
-    opts.addOption("help", false, "Print usage");
-    CommandLine cliParser = new GnuParser().parse(opts, args);
-
-    if (args.length == 0) {
-      printUsage(opts);
-      throw new IllegalArgumentException(
-          "No args specified for application master to initialize");
-    }
-
-    if (cliParser.hasOption("help")) {
-      printUsage(opts);
-      return false;
-    }
-
-    if (cliParser.hasOption("debug")) {
-      dumpOutDebugInfo();
-    }
-
-    Map<String, String> envs = System.getenv();
-
-    if (!envs.containsKey(Environment.CONTAINER_ID.name())) {
-      if (cliParser.hasOption("app_attempt_id")) {
-        String appIdStr = cliParser.getOptionValue("app_attempt_id", "");
-        appAttemptID = ConverterUtils.toApplicationAttemptId(appIdStr);
-      } else {
-        throw new IllegalArgumentException(
-            "Application Attempt Id not set in the environment");
-      }
-    } else {
-      ContainerId containerId = ConverterUtils.toContainerId(envs
-          .get(Environment.CONTAINER_ID.name()));
-      appAttemptID = containerId.getApplicationAttemptId();
-    }
-
-    if (!envs.containsKey(ApplicationConstants.APP_SUBMIT_TIME_ENV)) {
-      throw new RuntimeException(ApplicationConstants.APP_SUBMIT_TIME_ENV
-          + " not set in the environment");
-    }
-    if (!envs.containsKey(Environment.NM_HOST.name())) {
-      throw new RuntimeException(Environment.NM_HOST.name()
-          + " not set in the environment");
-    }
-    if (!envs.containsKey(Environment.NM_HTTP_PORT.name())) {
-      throw new RuntimeException(Environment.NM_HTTP_PORT
-          + " not set in the environment");
-    }
-    if (!envs.containsKey(Environment.NM_PORT.name())) {
-      throw new RuntimeException(Environment.NM_PORT.name()
-          + " not set in the environment");
-    }
-
-    LOG.info("Application master for app" + ", appId="
-        + appAttemptID.getApplicationId().getId() + ", clustertimestamp="
-        + appAttemptID.getApplicationId().getClusterTimestamp()
-        + ", attemptId=" + appAttemptID.getAttemptId());
-
-    if (!cliParser.hasOption("shell_command")) {
-      throw new IllegalArgumentException(
-          "No shell command specified to be executed by application master");
-    }
-    shellCommand = cliParser.getOptionValue("shell_command");
-
-    if (cliParser.hasOption("shell_args")) {
-      shellArgs = cliParser.getOptionValue("shell_args");
-    }
-    if (cliParser.hasOption("shell_env")) {
-      String shellEnvs[] = cliParser.getOptionValues("shell_env");
-      for (String env : shellEnvs) {
-        env = env.trim();
-        int index = env.indexOf('=');
-        if (index == -1) {
-          shellEnv.put(env, "");
-          continue;
-        }
-        String key = env.substring(0, index);
-        String val = "";
-        if (index < (env.length() - 1)) {
-          val = env.substring(index + 1);
-        }
-        shellEnv.put(key, val);
-      }
-    }
-
-    if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION)) {
-      shellScriptPath = envs.get(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION);
-
-      if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP)) {
-        shellScriptPathTimestamp = Long.valueOf(envs
-            .get(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP));
-      }
-      if (envs.containsKey(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN)) {
-        shellScriptPathLen = Long.valueOf(envs
-            .get(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN));
-      }
-
-      if (!shellScriptPath.isEmpty()
-          && (shellScriptPathTimestamp <= 0 || shellScriptPathLen <= 0)) {
-        LOG.error("Illegal values in env for shell script path" + ", path="
-            + shellScriptPath + ", len=" + shellScriptPathLen + ", timestamp="
-            + shellScriptPathTimestamp);
-        throw new IllegalArgumentException(
-            "Illegal values in env for shell script path");
-      }
-    }
-
-    containerMemory = Integer.parseInt(cliParser.getOptionValue(
-        "container_memory", "10"));
-    numTotalContainers = Integer.parseInt(cliParser.getOptionValue(
-        "num_containers", "1"));
-    if (numTotalContainers == 0) {
-      throw new IllegalArgumentException(
-          "Cannot run distributed shell with no containers");
-    }
-    requestPriority = Integer.parseInt(cliParser
-        .getOptionValue("priority", "0"));
-
-    return true;
-  }
-
-  /**
-   * Helper function to print usage
-   *
-   * @param opts Parsed command line options
-   */
-  private void printUsage(Options opts) {
-    new HelpFormatter().printHelp("ApplicationMaster", opts);
-  }
-
-  /**
-   * Main run function for the application master
-   *
-   * @throws YarnException
-   * @throws IOException
-   */
-  @SuppressWarnings({ "unchecked" })
-  public boolean run() throws YarnException, IOException {
-    LOG.info("Starting ApplicationMaster");
-
-    Credentials credentials =
-        UserGroupInformation.getCurrentUser().getCredentials();
-    DataOutputBuffer dob = new DataOutputBuffer();
-    credentials.writeTokenStorageToStream(dob);
-    // Now remove the AM->RM token so that containers cannot access it.
-    Iterator<Token<?>> iter = credentials.getAllTokens().iterator();
-    while (iter.hasNext()) {
-      Token<?> token = iter.next();
-      if (token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) {
-        iter.remove();
-      }
-    }
-    allTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
-
-    AMRMClientAsync.CallbackHandler allocListener = new RMCallbackHandler();
-    amRMClient = AMRMClientAsync.createAMRMClientAsync(1000, allocListener);
-    amRMClient.init(conf);
-    amRMClient.start();
-
-    containerListener = createNMCallbackHandler();
-    nmClientAsync = new NMClientAsyncImpl(containerListener);
-    nmClientAsync.init(conf);
-    nmClientAsync.start();
-
-    // Setup local RPC Server to accept status requests directly from clients
-    // TODO need to setup a protocol for client to be able to communicate to
-    // the RPC server
-    // TODO use the rpc port info to register with the RM for the client to
-    // send requests to this app master
-
-    // Register self with ResourceManager
-    // This will start heartbeating to the RM
-    appMasterHostname = NetUtils.getHostname();
-    RegisterApplicationMasterResponse response = amRMClient
-        .registerApplicationMaster(appMasterHostname, appMasterRpcPort,
-            appMasterTrackingUrl);
-    // Dump out information about cluster capability as seen by the
-    // resource manager
-    int maxMem = response.getMaximumResourceCapability().getMemory();
-    LOG.info("Max mem capabililty of resources in this cluster " + maxMem);
-
-    // A resource ask cannot exceed the max.
-    if (containerMemory > maxMem) {
-      LOG.info("Container memory specified above max threshold of cluster."
-          + " Using max value." + ", specified=" + containerMemory + ", max="
-          + maxMem);
-      containerMemory = maxMem;
-    }
-
-    // Setup ask for containers from RM
-    // Send request for containers to RM
-    // Until we get our fully allocated quota, we keep on polling RM for
-    // containers
-    // Keep looping until all the containers are launched and shell script
-    // executed on them ( regardless of success/failure).
-    for (int i = 0; i < numTotalContainers; ++i) {
-      ContainerRequest containerAsk = setupContainerAskForRM();
-      amRMClient.addContainerRequest(containerAsk);
-    }
-    numRequestedContainers.set(numTotalContainers);
-
-    while (!done
-        && (numCompletedContainers.get() != numTotalContainers)) {
-      try {
-        Thread.sleep(200);
-      } catch (InterruptedException ex) {}
-    }
-    finish();
-    
-    return success;
-  }
-
-  @VisibleForTesting
-  NMCallbackHandler createNMCallbackHandler() {
-    return new NMCallbackHandler(this);
-  }
-
-  private void finish() {
-    // Join all launched threads
-    // needed for when we time out
-    // and we need to release containers
-    for (Thread launchThread : launchThreads) {
-      try {
-        launchThread.join(10000);
-      } catch (InterruptedException e) {
-        LOG.info("Exception thrown in thread join: " + e.getMessage());
-        e.printStackTrace();
-      }
-    }
-
-    // When the application completes, it should stop all running containers
-    LOG.info("Application completed. Stopping running containers");
-    nmClientAsync.stop();
-
-    // When the application completes, it should send a finish application
-    // signal to the RM
-    LOG.info("Application completed. Signalling finish to RM");
-
-    FinalApplicationStatus appStatus;
-    String appMessage = null;
-    success = true;
-    if (numFailedContainers.get() == 0 && 
-        numCompletedContainers.get() == numTotalContainers) {
-      appStatus = FinalApplicationStatus.SUCCEEDED;
-    } else {
-      appStatus = FinalApplicationStatus.FAILED;
-      appMessage = "Diagnostics." + ", total=" + numTotalContainers
-          + ", completed=" + numCompletedContainers.get() + ", allocated="
-          + numAllocatedContainers.get() + ", failed="
-          + numFailedContainers.get();
-      success = false;
-    }
-    try {
-      amRMClient.unregisterApplicationMaster(appStatus, appMessage, null);
-    } catch (YarnException ex) {
-      LOG.error("Failed to unregister application", ex);
-    } catch (IOException e) {
-      LOG.error("Failed to unregister application", e);
-    }
-    
-    amRMClient.stop();
-  }
-  
-  private class RMCallbackHandler implements AMRMClientAsync.CallbackHandler {
-    @SuppressWarnings("unchecked")
-    @Override
-    public void onContainersCompleted(List<ContainerStatus> completedContainers) {
-      LOG.info("Got response from RM for container ask, completedCnt="
-          + completedContainers.size());
-      for (ContainerStatus containerStatus : completedContainers) {
-        LOG.info("Got container status for containerID="
-            + containerStatus.getContainerId() + ", state="
-            + containerStatus.getState() + ", exitStatus="
-            + containerStatus.getExitStatus() + ", diagnostics="
-            + containerStatus.getDiagnostics());
-
-        // non complete containers should not be here
-        assert (containerStatus.getState() == ContainerState.COMPLETE);
-
-        // increment counters for completed/failed containers
-        int exitStatus = containerStatus.getExitStatus();
-        if (0 != exitStatus) {
-          // container failed
-          if (ContainerExitStatus.ABORTED != exitStatus) {
-            // shell script failed
-            // counts as completed
-            numCompletedContainers.incrementAndGet();
-            numFailedContainers.incrementAndGet();
-          } else {
-            // container was killed by framework, possibly preempted
-            // we should re-try as the container was lost for some reason
-            numAllocatedContainers.decrementAndGet();
-            numRequestedContainers.decrementAndGet();
-            // we do not need to release the container as it would be done
-            // by the RM
-          }
-        } else {
-          // nothing to do
-          // container completed successfully
-          numCompletedContainers.incrementAndGet();
-          LOG.info("Container completed successfully." + ", containerId="
-              + containerStatus.getContainerId());
-        }
-      }
-      
-      // ask for more containers if any failed
-      int askCount = numTotalContainers - numRequestedContainers.get();
-      numRequestedContainers.addAndGet(askCount);
-
-      if (askCount > 0) {
-        for (int i = 0; i < askCount; ++i) {
-          ContainerRequest containerAsk = setupContainerAskForRM();
-          amRMClient.addContainerRequest(containerAsk);
-        }
-      }
-      
-      if (numCompletedContainers.get() == numTotalContainers) {
-        done = true;
-      }
-    }
-
-    @Override
-    public void onContainersAllocated(List<Container> allocatedContainers) {
-      LOG.info("Got response from RM for container ask, allocatedCnt="
-          + allocatedContainers.size());
-      numAllocatedContainers.addAndGet(allocatedContainers.size());
-      for (Container allocatedContainer : allocatedContainers) {
-        LOG.info("Launching shell command on a new container."
-            + ", containerId=" + allocatedContainer.getId()
-            + ", containerNode=" + allocatedContainer.getNodeId().getHost()
-            + ":" + allocatedContainer.getNodeId().getPort()
-            + ", containerNodeURI=" + allocatedContainer.getNodeHttpAddress()
-            + ", containerResourceMemory"
-            + allocatedContainer.getResource().getMemory());
-        // + ", containerToken"
-        // +allocatedContainer.getContainerToken().getIdentifier().toString());
-
-        LaunchContainerRunnable runnableLaunchContainer =
-            new LaunchContainerRunnable(allocatedContainer, containerListener);
-        Thread launchThread = new Thread(runnableLaunchContainer);
-
-        // launch and start the container on a separate thread to keep
-        // the main thread unblocked
-        // as all containers may not be allocated at one go.
-        launchThreads.add(launchThread);
-        launchThread.start();
-      }
-    }
-
-    @Override
-    public void onShutdownRequest() {
-      done = true;
-    }
-
-    @Override
-    public void onNodesUpdated(List<NodeReport> updatedNodes) {}
-
-    @Override
-    public float getProgress() {
-      // set progress to deliver to RM on next heartbeat
-      float progress = (float) numCompletedContainers.get()
-          / numTotalContainers;
-      return progress;
-    }
-
-    @Override
-    public void onError(Throwable e) {
-      done = true;
-      amRMClient.stop();
-    }
-  }
-
-  @VisibleForTesting
-  static class NMCallbackHandler
-    implements NMClientAsync.CallbackHandler {
-
-    private ConcurrentMap<ContainerId, Container> containers =
-        new ConcurrentHashMap<ContainerId, Container>();
-    private final ApplicationMaster applicationMaster;
-
-    public NMCallbackHandler(ApplicationMaster applicationMaster) {
-      this.applicationMaster = applicationMaster;
-    }
-
-    public void addContainer(ContainerId containerId, Container container) {
-      containers.putIfAbsent(containerId, container);
-    }
-
-    @Override
-    public void onContainerStopped(ContainerId containerId) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Succeeded to stop Container " + containerId);
-      }
-      containers.remove(containerId);
-    }
-
-    @Override
-    public void onContainerStatusReceived(ContainerId containerId,
-        ContainerStatus containerStatus) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Container Status: id=" + containerId + ", status=" +
-            containerStatus);
-      }
-    }
-
-    @Override
-    public void onContainerStarted(ContainerId containerId,
-        Map<String, ByteBuffer> allServiceResponse) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Succeeded to start Container " + containerId);
-      }
-      Container container = containers.get(containerId);
-      if (container != null) {
-        applicationMaster.nmClientAsync.getContainerStatusAsync(containerId, container.getNodeId());
-      }
-    }
-
-    @Override
-    public void onStartContainerError(ContainerId containerId, Throwable t) {
-      LOG.error("Failed to start Container " + containerId);
-      containers.remove(containerId);
-      applicationMaster.numCompletedContainers.incrementAndGet();
-      applicationMaster.numFailedContainers.incrementAndGet();
-    }
-
-    @Override
-    public void onGetContainerStatusError(
-        ContainerId containerId, Throwable t) {
-      LOG.error("Failed to query the status of Container " + containerId);
-    }
-
-    @Override
-    public void onStopContainerError(ContainerId containerId, Throwable t) {
-      LOG.error("Failed to stop Container " + containerId);
-      containers.remove(containerId);
-    }
-  }
-
-  /**
-   * Thread to connect to the {@link ContainerManagementProtocol} and launch the container
-   * that will execute the shell command.
-   */
-  private class LaunchContainerRunnable implements Runnable {
-
-    // Allocated container
-    Container container;
-
-    NMCallbackHandler containerListener;
-
-    /**
-     * @param lcontainer Allocated container
-     * @param containerListener Callback handler of the container
-     */
-    public LaunchContainerRunnable(
-        Container lcontainer, NMCallbackHandler containerListener) {
-      this.container = lcontainer;
-      this.containerListener = containerListener;
-    }
-
-    @Override
-    /**
-     * Connects to CM, sets up container launch context 
-     * for shell command and eventually dispatches the container 
-     * start request to the CM. 
-     */
-    public void run() {
-      LOG.info("Setting up container launch container for containerid="
-          + container.getId());
-      ContainerLaunchContext ctx = Records
-          .newRecord(ContainerLaunchContext.class);
-
-      // Set the environment
-      ctx.setEnvironment(shellEnv);
-
-      // Set the local resources
-      Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
-
-      // The container for the eventual shell commands needs its own local
-      // resources too.
-      // In this scenario, if a shell script is specified, we need to have it
-      // copied and made available to the container.
-      if (!shellScriptPath.isEmpty()) {
-        LocalResource shellRsrc = Records.newRecord(LocalResource.class);
-        shellRsrc.setType(LocalResourceType.FILE);
-        shellRsrc.setVisibility(LocalResourceVisibility.APPLICATION);
-        try {
-          shellRsrc.setResource(ConverterUtils.getYarnUrlFromURI(new URI(
-              shellScriptPath)));
-        } catch (URISyntaxException e) {
-          LOG.error("Error when trying to use shell script path specified"
-              + " in env, path=" + shellScriptPath);
-          e.printStackTrace();
-
-          // A failure scenario on bad input such as invalid shell script path
-          // We know we cannot continue launching the container
-          // so we should release it.
-          // TODO
-          numCompletedContainers.incrementAndGet();
-          numFailedContainers.incrementAndGet();
-          return;
-        }
-        shellRsrc.setTimestamp(shellScriptPathTimestamp);
-        shellRsrc.setSize(shellScriptPathLen);
-        localResources.put(ExecShellStringPath, shellRsrc);
-      }
-      ctx.setLocalResources(localResources);
-
-      // Set the necessary command to execute on the allocated container
-      Vector<CharSequence> vargs = new Vector<CharSequence>(5);
-
-      // Set executable command
-      vargs.add(shellCommand);
-      // Set shell script path
-      if (!shellScriptPath.isEmpty()) {
-        vargs.add(ExecShellStringPath);
-      }
-
-      // Set args for the shell command if any
-      vargs.add(shellArgs);
-      // Add log redirect params
-      vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout");
-      vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr");
-
-      // Get final commmand
-      StringBuilder command = new StringBuilder();
-      for (CharSequence str : vargs) {
-        command.append(str).append(" ");
-      }
-
-      List<String> commands = new ArrayList<String>();
-      commands.add(command.toString());
-      ctx.setCommands(commands);
-
-      // Set up tokens for the container too. Today, for normal shell commands,
-      // the container in distribute-shell doesn't need any tokens. We are
-      // populating them mainly for NodeManagers to be able to download any
-      // files in the distributed file-system. The tokens are otherwise also
-      // useful in cases, for e.g., when one is running a "hadoop dfs" command
-      // inside the distributed shell.
-      ctx.setTokens(allTokens.duplicate());
-
-      containerListener.addContainer(container.getId(), container);
-      nmClientAsync.startContainerAsync(container, ctx);
-    }
-  }
-
-  /**
-   * Setup the request that will be sent to the RM for the container ask.
-   *
-   * @return the setup ResourceRequest to be sent to RM
-   */
-  private ContainerRequest setupContainerAskForRM() {
-    // setup requirements for hosts
-    // using * as any host will do for the distributed shell app
-    // set the priority for the request
-    Priority pri = Records.newRecord(Priority.class);
-    // TODO - what is the range for priority? how to decide?
-    pri.setPriority(requestPriority);
-
-    // Set up resource type requirements
-    // For now, only memory is supported so we set memory requirements
-    Resource capability = Records.newRecord(Resource.class);
-    capability.setMemory(containerMemory);
-
-    ContainerRequest request = new ContainerRequest(capability, null, null,
-        pri);
-    LOG.info("Requested container ask: " + request.toString());
-    return request;
-  }
-
-  public static ApplicationMaster getInstance() {
-    return null;
-  }
-}

http://git-wip-us.apache.org/repos/asf/helix/blob/8992aa5a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ApplicationSpec.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ApplicationSpec.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ApplicationSpec.java
deleted file mode 100644
index 285d036..0000000
--- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ApplicationSpec.java
+++ /dev/null
@@ -1,28 +0,0 @@
-package org.apache.helix.provisioning.yarn;
-
-import java.net.URI;
-import java.util.List;
-
-
-public interface ApplicationSpec {
-  /**
-   * Returns the name of the application
-   * @return
-   */
-  String getAppName();
-
-  AppConfig getConfig();
-
-  List<String> getServices();
-
-  URI getAppMasterPackage();
-  
-  URI getServicePackage(String serviceName);
-  
-  String getServiceMainClass(String service);
-
-  ServiceConfig getServiceConfig(String serviceName);
-
-  List<TaskConfig> getTaskConfigs();
-
-}

http://git-wip-us.apache.org/repos/asf/helix/blob/8992aa5a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ApplicationSpecFactory.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ApplicationSpecFactory.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ApplicationSpecFactory.java
deleted file mode 100644
index 352dc0c..0000000
--- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ApplicationSpecFactory.java
+++ /dev/null
@@ -1,9 +0,0 @@
-package org.apache.helix.provisioning.yarn;
-
-import java.io.InputStream;
-
-public interface ApplicationSpecFactory {
-  
-  ApplicationSpec fromYaml(InputStream yamlFile);
-
-}