You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ki...@apache.org on 2014/01/12 08:34:17 UTC

git commit: Participants now connect to Helix cluster

Updated Branches:
  refs/heads/helix-provisioning f282a3003 -> c9ddde3e7


Participants now connect to Helix cluster


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/c9ddde3e
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/c9ddde3e
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/c9ddde3e

Branch: refs/heads/helix-provisioning
Commit: c9ddde3e719d73a3158a050ebdb4a38cb6c91c68
Parents: f282a30
Author: Kishore Gopalakrishna <g....@gmail.com>
Authored: Sat Jan 11 23:34:07 2014 -0800
Committer: Kishore Gopalakrishna <g....@gmail.com>
Committed: Sat Jan 11 23:34:07 2014 -0800

----------------------------------------------------------------------
 .../java/org/apache/helix/api/Participant.java  |  1 +
 .../provisioner/ContainerProvider.java          |  4 +-
 .../provisioner/ParticipantContainer.java       | 12 -----
 .../provisioner/ParticipantService.java         | 10 +++++
 .../stages/ContainerProvisioningStage.java      |  2 +-
 .../integration/TestLocalContainerProvider.java | 10 ++---
 helix-provisioning/pom.xml                      |  6 ---
 .../provisioning/yarn/ApplicationSpec.java      | 19 --------
 .../apache/helix/provisioning/yarn/Client.java  | 30 +++----------
 .../provisioning/yarn/ContainerParticipant.java | 36 ++++++++++++---
 .../yarn/HelixYarnApplicationMasterMain.java    | 11 +++--
 .../provisioning/yarn/ParticipantLauncher.java  | 47 ++++++++++++++++++++
 .../provisioning/yarn/RMCallbackHandler.java    |  5 ++-
 .../provisioning/yarn/YarnProvisioner.java      | 30 +++++++------
 .../yarn/YarnProvisionerConfig.java             |  1 -
 15 files changed, 130 insertions(+), 94 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/c9ddde3e/helix-core/src/main/java/org/apache/helix/api/Participant.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/Participant.java b/helix-core/src/main/java/org/apache/helix/api/Participant.java
index fc20968..3ed395b 100644
--- a/helix-core/src/main/java/org/apache/helix/api/Participant.java
+++ b/helix-core/src/main/java/org/apache/helix/api/Participant.java
@@ -39,6 +39,7 @@ import com.google.common.collect.ImmutableMap;
  */
 public class Participant {
   private final ParticipantConfig _config;
+  
   private final ContainerConfig _containerConfig;
 
   private final RunningInstance _runningInstance;

http://git-wip-us.apache.org/repos/asf/helix/blob/c9ddde3e/helix-core/src/main/java/org/apache/helix/controller/provisioner/ContainerProvider.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/provisioner/ContainerProvider.java b/helix-core/src/main/java/org/apache/helix/controller/provisioner/ContainerProvider.java
index c88733f..a95abe0 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/provisioner/ContainerProvider.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/provisioner/ContainerProvider.java
@@ -1,5 +1,7 @@
 package org.apache.helix.controller.provisioner;
 
+import org.apache.helix.api.Participant;
+
 import com.google.common.util.concurrent.ListenableFuture;
 
 /*
@@ -27,7 +29,7 @@ public interface ContainerProvider {
 
   ListenableFuture<Boolean> deallocateContainer(ContainerId containerId);
 
-  ListenableFuture<Boolean> startContainer(ContainerId containerId);
+  ListenableFuture<Boolean> startContainer(ContainerId containerId, Participant participant);
 
   ListenableFuture<Boolean> stopContainer(ContainerId containerId);
 

http://git-wip-us.apache.org/repos/asf/helix/blob/c9ddde3e/helix-core/src/main/java/org/apache/helix/controller/provisioner/ParticipantContainer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/provisioner/ParticipantContainer.java b/helix-core/src/main/java/org/apache/helix/controller/provisioner/ParticipantContainer.java
deleted file mode 100644
index 7b39aca..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/provisioner/ParticipantContainer.java
+++ /dev/null
@@ -1,12 +0,0 @@
-package org.apache.helix.controller.provisioner;
-
-public class ParticipantContainer {
-
-  /**
-   * Id request by the target provider
-   */
-  String requestId;
-
-  String allocatedId;
-
-}

http://git-wip-us.apache.org/repos/asf/helix/blob/c9ddde3e/helix-core/src/main/java/org/apache/helix/controller/provisioner/ParticipantService.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/provisioner/ParticipantService.java b/helix-core/src/main/java/org/apache/helix/controller/provisioner/ParticipantService.java
new file mode 100644
index 0000000..92b5a24
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/provisioner/ParticipantService.java
@@ -0,0 +1,10 @@
+package org.apache.helix.controller.provisioner;
+
+public interface ParticipantService {
+
+  boolean init(ServiceConfig serviceConfig);
+  
+  boolean start();
+  
+  boolean stop();
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/c9ddde3e/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java
index 97b80b9..499f904 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java
@@ -154,7 +154,7 @@ public class ContainerProvisioningStage extends AbstractBaseStage {
           accessor.setProperty(keyBuilder.instanceConfig(participant.getId().toString()),
               existingInstance);
           // create the helix participant and add it to cluster
-          ListenableFuture<Boolean> future = provisioner.startContainer(containerId);
+          ListenableFuture<Boolean> future = provisioner.startContainer(containerId, participant);
           Futures.addCallback(future, new FutureCallback<Boolean>() {
             @Override
             public void onSuccess(Boolean result) {

http://git-wip-us.apache.org/repos/asf/helix/blob/c9ddde3e/helix-core/src/test/java/org/apache/helix/integration/TestLocalContainerProvider.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestLocalContainerProvider.java b/helix-core/src/test/java/org/apache/helix/integration/TestLocalContainerProvider.java
index 7b8a580..2f1d397 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestLocalContainerProvider.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestLocalContainerProvider.java
@@ -242,12 +242,12 @@ public class TestLocalContainerProvider extends ZkUnitTestBase {
     }
 
     @Override
-    public ListenableFuture<Boolean> startContainer(ContainerId containerId) {
-      ParticipantService participant =
+    public ListenableFuture<Boolean> startContainer(ContainerId containerId, Participant participant ) {
+      ParticipantService participantService =
           new ParticipantService(_clusterId, _containerParticipants.get(containerId));
-      participant.startAsync();
-      participant.awaitRunning();
-      _participants.put(containerId, participant);
+      participantService.startAsync();
+      participantService.awaitRunning();
+      _participants.put(containerId, participantService);
       _states.put(containerId, ContainerState.ACTIVE);
       started++;
       SettableFuture<Boolean> future = SettableFuture.create();

http://git-wip-us.apache.org/repos/asf/helix/blob/c9ddde3e/helix-provisioning/pom.xml
----------------------------------------------------------------------
diff --git a/helix-provisioning/pom.xml b/helix-provisioning/pom.xml
index d83bbf2..254d420 100644
--- a/helix-provisioning/pom.xml
+++ b/helix-provisioning/pom.xml
@@ -57,12 +57,6 @@ under the License.
       <scope>provided</scope>
     </dependency>
     <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-yarn-common</artifactId>
-      <version>${hadoop.version}</version>
-      <scope>provided</scope>
-    </dependency>
-    <dependency>
       <groupId>org.testng</groupId>
       <artifactId>testng</artifactId>
       <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/helix/blob/c9ddde3e/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ApplicationSpec.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ApplicationSpec.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ApplicationSpec.java
deleted file mode 100644
index 6671364..0000000
--- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ApplicationSpec.java
+++ /dev/null
@@ -1,19 +0,0 @@
-package org.apache.helix.provisioning.yarn;
-
-public class ApplicationSpec {
-
-  int minContainers;
-
-  int maxContainers;
-
-  String serviceClass;
-    
-  String targetProvider;
-  
-  String stateModel;
-  
-  String taskClass;
-
-  int numTasks;
-  
-}

http://git-wip-us.apache.org/repos/asf/helix/blob/c9ddde3e/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/Client.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/Client.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/Client.java
index 3caf8f0..a7a119f 100644
--- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/Client.java
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/Client.java
@@ -131,7 +131,7 @@ public class Client {
   // Queue for App master
   private String amQueue = "";
   // Amt. of memory resource to request for to run the App Master
-  private int amMemory = 10;
+  private int amMemory = 1024;
 
   // Application master jar file
   private String appMasterArchive = "";
@@ -201,19 +201,13 @@ public class Client {
     yarnClient = YarnClient.createYarnClient();
     yarnClient.init(conf);
     opts = new Options();
-    opts.addOption("appname", true, "Application Name. Default value - DistributedShell");
+    opts.addOption("appName", true, "Application Name.");
     opts.addOption("priority", true, "Application Priority. Default 0");
     opts.addOption("queue", true, "RM Queue in which this application is to be submitted");
     opts.addOption("timeout", true, "Application timeout in milliseconds");
     opts.addOption("master_memory", true,
         "Amount of memory in MB to be requested to run the application master");
     opts.addOption("archive", true, "Jar file containing the application master");
-    opts.addOption("shell_command", true, "Shell command to be executed by the Application Master");
-    opts.addOption("shell_script", true, "Location of the shell script to be executed");
-    opts.addOption("shell_args", true, "Command line args for the shell script");
-    opts.addOption("shell_env", true,
-        "Environment for shell script. Specified as env_key=env_val pairs");
-    opts.addOption("shell_cmd_priority", true, "Priority for the shell command containers");
     opts.addOption("container_memory", true,
         "Amount of memory in MB to be requested to run the shell command");
     opts.addOption("num_containers", true,
@@ -258,13 +252,12 @@ public class Client {
 
     if (cliParser.hasOption("debug")) {
       debugFlag = true;
-
     }
 
-    appName = cliParser.getOptionValue("appname", "DistributedShell");
+    appName = cliParser.getOptionValue("appName");
     amPriority = Integer.parseInt(cliParser.getOptionValue("priority", "0"));
     amQueue = cliParser.getOptionValue("queue", "default");
-    amMemory = Integer.parseInt(cliParser.getOptionValue("master_memory", "10"));
+    amMemory = Integer.parseInt(cliParser.getOptionValue("master_memory", "1024"));
 
     if (amMemory < 0) {
       throw new IllegalArgumentException(
@@ -278,17 +271,6 @@ public class Client {
 
     appMasterArchive = cliParser.getOptionValue("archive");
 
-    containerMemory = Integer.parseInt(cliParser.getOptionValue("container_memory", "10"));
-    numContainers = Integer.parseInt(cliParser.getOptionValue("num_containers", "1"));
-
-    if (containerMemory < 0 || numContainers < 1) {
-      throw new IllegalArgumentException(
-          "Invalid no. of containers or container memory specified, exiting."
-              + " Specified containerMemory=" + containerMemory + ", numContainer=" + numContainers);
-    }
-
-    clientTimeout = Integer.parseInt(cliParser.getOptionValue("timeout", "600000"));
-
     log4jPropFile = cliParser.getOptionValue("log_properties", "");
 
     return true;
@@ -460,7 +442,7 @@ public class Client {
     env.put("appId", "" + appId.getId());
     env.put("CLASSPATH", classPathEnv.toString());
     env.put("appClasspath", appClassPathEnv.toString());
-    env.put("containerParticipantMainClass", "org.apache.helix.provisioning.yarn.ContainerParticipant");
+    env.put("containerParticipantMainClass", "org.apache.helix.provisioning.yarn.ParticipantLauncher");
     amContainer.setEnvironment(env);
 
     // Set the necessary command to execute the application master
@@ -623,7 +605,7 @@ public class Client {
 
     // Response can be ignored as it is non-null on success or
     // throws an exception in case of failures
-    yarnClient.killApplication(appId);
+    //yarnClient.killApplication(appId);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/c9ddde3e/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ContainerParticipant.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ContainerParticipant.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ContainerParticipant.java
index b0272e8..9c80d87 100644
--- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ContainerParticipant.java
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ContainerParticipant.java
@@ -1,15 +1,39 @@
 package org.apache.helix.provisioning.yarn;
 
-import java.util.Arrays;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.helix.HelixConnection;
+import org.apache.helix.HelixParticipant;
+import org.apache.helix.api.id.ClusterId;
+import org.apache.helix.api.id.ParticipantId;
+
+import com.google.common.util.concurrent.AbstractService;
 
-public class ContainerParticipant {
+public class ContainerParticipant extends AbstractService {
   private static final Log LOG = LogFactory.getLog(ContainerParticipant.class);
+  private final ClusterId _clusterId;
+  private final ParticipantId _participantId;
+  private HelixParticipant _participant;
+  private HelixConnection _connection;
+
+  public ContainerParticipant(HelixConnection connection, ClusterId clusterId,
+      ParticipantId participantId) {
+    _connection = connection;
+    _clusterId = clusterId;
+    _participantId = participantId;
+  }
+
+  @Override
+  protected void doStart() {
+    _participant = _connection.createParticipant(_clusterId, _participantId);
+    // register statemachine
+    _participant.startAsync();
+    notifyStarted();
+  }
 
-  public static void main(String[] args) throws InterruptedException {
-    LOG.info("Starting participant: "+ Arrays.toString(args));
-    Thread.currentThread().join();
+  @Override
+  protected void doStop() {
+    _participant.stopAsync();
+    notifyStopped();
   }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/c9ddde3e/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/HelixYarnApplicationMasterMain.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/HelixYarnApplicationMasterMain.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/HelixYarnApplicationMasterMain.java
index 92930ed..8be4754 100644
--- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/HelixYarnApplicationMasterMain.java
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/HelixYarnApplicationMasterMain.java
@@ -1,10 +1,12 @@
 package org.apache.helix.provisioning.yarn;
 
+import java.io.File;
 import java.util.Map;
 
 import org.I0Itec.zkclient.IDefaultNameSpace;
 import org.I0Itec.zkclient.ZkClient;
 import org.I0Itec.zkclient.ZkServer;
+import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -41,9 +43,12 @@ public class HelixYarnApplicationMasterMain {
 
       @Override
       public void createDefaultNameSpace(ZkClient zkClient) {
-
+        
       }
     };
+    FileUtils.deleteDirectory(new File(dataDir));
+    FileUtils.deleteDirectory(new File(logDir));
+    
     final ZkServer server = new ZkServer(dataDir, logDir, defaultNameSpace);
     server.start();
 
@@ -62,7 +67,7 @@ public class HelixYarnApplicationMasterMain {
     YarnProvisioner.applicationMaster = genericApplicationMaster;
 
     String zkAddress = envs.get(Environment.NM_HOST.name()) + ":2181";
-    String clusterName = "testCluster";
+    String clusterName = envs.get("appName");
     String resourceName = "testResource";
     int NUM_PARTITIONS = 6;
     int NUM_REPLICAS = 2;
@@ -91,7 +96,7 @@ public class HelixYarnApplicationMasterMain {
     // start controller
     ControllerId controllerId = ControllerId.from("controller1");
     HelixController controller = connection.createController(clusterId, controllerId);
-    controller.startAsync(); // TODO: is this really async?
+    controller.startAsync(); 
 
     Thread shutdownhook = new Thread(new Runnable() {
       @Override

http://git-wip-us.apache.org/repos/asf/helix/blob/c9ddde3e/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ParticipantLauncher.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ParticipantLauncher.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ParticipantLauncher.java
new file mode 100644
index 0000000..58e7a4f
--- /dev/null
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ParticipantLauncher.java
@@ -0,0 +1,47 @@
+package org.apache.helix.provisioning.yarn;
+
+import java.util.Arrays;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.Options;
+import org.apache.helix.HelixConnection;
+import org.apache.helix.api.id.ClusterId;
+import org.apache.helix.api.id.ParticipantId;
+import org.apache.helix.manager.zk.ZkHelixConnection;
+
+public class ParticipantLauncher {
+  public static void main(String[] args) {
+
+    System.out.println("Starting Helix Participant: " + Arrays.toString(args));
+    Options opts;
+    opts = new Options();
+    opts.addOption("cluster", true, "Cluster name, default app name");
+    opts.addOption("participantId", true, "Participant Id");
+    opts.addOption("zkAddress", true, "Zookeeper address");
+    try {
+      CommandLine cliParser = new GnuParser().parse(opts, args);
+      String zkAddress = cliParser.getOptionValue("zkAddress");
+      HelixConnection connection = new ZkHelixConnection(zkAddress);
+      connection.connect();
+      ClusterId clusterId = ClusterId.from(cliParser.getOptionValue("cluster"));
+      ParticipantId participantId = ParticipantId.from(cliParser.getOptionValue("participantId"));
+      ContainerParticipant containerParticipant =
+          new ContainerParticipant(connection, clusterId, participantId);
+      containerParticipant.startAsync();
+      containerParticipant.awaitRunning(60, TimeUnit.SECONDS);
+      Thread.currentThread().join();
+    } catch (Exception e) {
+      e.printStackTrace();
+      System.out.println("Failed to start Helix participant" + e);
+      // System.exit(1);
+    }
+    try {
+      Thread.currentThread().join();
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    }
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/c9ddde3e/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/RMCallbackHandler.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/RMCallbackHandler.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/RMCallbackHandler.java
index 6c87bd2..50c38b5 100644
--- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/RMCallbackHandler.java
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/RMCallbackHandler.java
@@ -16,7 +16,7 @@ import com.google.common.util.concurrent.SettableFuture;
 
 class RMCallbackHandler implements AMRMClientAsync.CallbackHandler {
   private static final Log LOG = LogFactory.getLog(RMCallbackHandler.class);
-
+  long startTime;
   /**
    * 
    */
@@ -27,6 +27,7 @@ class RMCallbackHandler implements AMRMClientAsync.CallbackHandler {
    */
   RMCallbackHandler(GenericApplicationMaster genericApplicationMaster) {
     _genericApplicationMaster = genericApplicationMaster;
+    startTime = System.currentTimeMillis();
   }
 
   @SuppressWarnings("unchecked")
@@ -96,7 +97,7 @@ class RMCallbackHandler implements AMRMClientAsync.CallbackHandler {
   @Override
   public float getProgress() {
     // set progress to deliver to RM on next heartbeat
-    return 0.5f;
+    return (System.currentTimeMillis()-startTime) % Integer.MAX_VALUE;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/helix/blob/c9ddde3e/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisioner.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisioner.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisioner.java
index f74e312..1a903d4 100644
--- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisioner.java
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisioner.java
@@ -65,7 +65,8 @@ public class YarnProvisioner implements Provisioner {
   static ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors
       .newCachedThreadPool());
   Map<ContainerId, Container> allocatedContainersMap = new HashMap<ContainerId, Container>();
-  int DEFAULT_CONTAINER = 4;
+  int DEFAULT_CONTAINER = 1;
+  private HelixManager _helixManager;
 
   @Override
   public ListenableFuture<ContainerId> allocateContainer(ContainerSpec spec) {
@@ -99,11 +100,11 @@ public class YarnProvisioner implements Provisioner {
   }
 
   @Override
-  public ListenableFuture<Boolean> startContainer(final ContainerId containerId) {
+  public ListenableFuture<Boolean> startContainer(final ContainerId containerId, Participant participant) {
     Container container = allocatedContainersMap.get(containerId);
     ContainerLaunchContext launchContext;
     try {
-      launchContext = createLaunchContext(containerId);
+      launchContext = createLaunchContext(containerId, container, participant);
     } catch (Exception e) {
       LOG.error("Exception while creating context to launch container:" + containerId, e);
       return null;
@@ -118,9 +119,9 @@ public class YarnProvisioner implements Provisioner {
     }, service);
   }
 
-  private ContainerLaunchContext createLaunchContext(ContainerId containerId) throws Exception {
+  private ContainerLaunchContext createLaunchContext(ContainerId containerId, Container container, Participant participant) throws Exception {
 
-    ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class);
+    ContainerLaunchContext participantContainer = Records.newRecord(ContainerLaunchContext.class);
 
     Map<String, String> envs = System.getenv();
     String appName = envs.get("appName");
@@ -158,7 +159,7 @@ public class YarnProvisioner implements Provisioner {
     localResources.put("app-pkg", amJarRsrc);
 
     // Set local resource info into app master container launch context
-    amContainer.setLocalResources(localResources);
+    participantContainer.setLocalResources(localResources);
 
     // Set the necessary security tokens as needed
     // amContainer.setContainerTokens(containerToken);
@@ -166,7 +167,7 @@ public class YarnProvisioner implements Provisioner {
     // Set the env variables to be setup in the env where the application master will be run
     LOG.info("Set the environment for the application master");
     Map<String, String> env = new HashMap<String, String>();
-    env.put("app-pkg-path", dst.getName());
+    env.put("app_pkg_path", dst.getName());
     // Add AppMaster.jar location to classpath
     // At some point we should not be required to add
     // the hadoop specific classpaths to the env.
@@ -190,10 +191,9 @@ public class YarnProvisioner implements Provisioner {
       classPathEnv.append(':');
       classPathEnv.append(System.getProperty("java.class.path"));
     }
-    System.out.println("classoath" + classPathEnv.toString());
     env.put("CLASSPATH", classPathEnv.toString());
 
-    amContainer.setEnvironment(env);
+    participantContainer.setEnvironment(env);
 
     // Set the necessary command to execute the application master
     Vector<CharSequence> vargs = new Vector<CharSequence>(30);
@@ -206,8 +206,9 @@ public class YarnProvisioner implements Provisioner {
     // Set class name
     vargs.add(containerParticipantMainClass);
     // Set params for container participant
-    vargs.add("--zk_address " + zkAddress);
-    vargs.add("--participantId " + containerId.stringify());
+    vargs.add("--zkAddress " + zkAddress);
+    vargs.add("--cluster " + appName);
+    vargs.add("--participantId " + participant.getId().stringify());
 
     vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/ContainerParticipant.stdout");
     vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/ContainerParticipant.stderr");
@@ -218,11 +219,11 @@ public class YarnProvisioner implements Provisioner {
       command.append(str).append(" ");
     }
 
-    LOG.info("Completed setting up app master command " + command.toString());
+    LOG.info("Completed setting up  container launch command " + command.toString() + " with arguments \n" + vargs);
     List<String> commands = new ArrayList<String>();
     commands.add(command.toString());
-    amContainer.setCommands(commands);
-    return amContainer;
+    participantContainer.setCommands(commands);
+    return participantContainer;
   }
 
   @Override
@@ -244,6 +245,7 @@ public class YarnProvisioner implements Provisioner {
 
   @Override
   public void init(HelixManager helixManager) {
+    _helixManager = helixManager;
 
   }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/c9ddde3e/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisionerConfig.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisionerConfig.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisionerConfig.java
index 8427c14..0c1dbda 100644
--- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisionerConfig.java
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisionerConfig.java
@@ -5,7 +5,6 @@ import org.apache.helix.controller.provisioner.ProvisionerConfig;
 import org.apache.helix.controller.provisioner.ProvisionerRef;
 import org.apache.helix.controller.serializer.DefaultStringSerializer;
 import org.apache.helix.controller.serializer.StringSerializer;
-import org.apache.helix.integration.TestLocalContainerProvider.LocalProvisioner;
 import org.codehaus.jackson.annotate.JsonProperty;
 
 public class YarnProvisionerConfig implements ProvisionerConfig {