You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ki...@apache.org on 2014/01/12 08:34:17 UTC
git commit: Participants now connect to Helix cluster
Updated Branches:
refs/heads/helix-provisioning f282a3003 -> c9ddde3e7
Participants now connect to Helix cluster
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/c9ddde3e
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/c9ddde3e
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/c9ddde3e
Branch: refs/heads/helix-provisioning
Commit: c9ddde3e719d73a3158a050ebdb4a38cb6c91c68
Parents: f282a30
Author: Kishore Gopalakrishna <g....@gmail.com>
Authored: Sat Jan 11 23:34:07 2014 -0800
Committer: Kishore Gopalakrishna <g....@gmail.com>
Committed: Sat Jan 11 23:34:07 2014 -0800
----------------------------------------------------------------------
.../java/org/apache/helix/api/Participant.java | 1 +
.../provisioner/ContainerProvider.java | 4 +-
.../provisioner/ParticipantContainer.java | 12 -----
.../provisioner/ParticipantService.java | 10 +++++
.../stages/ContainerProvisioningStage.java | 2 +-
.../integration/TestLocalContainerProvider.java | 10 ++---
helix-provisioning/pom.xml | 6 ---
.../provisioning/yarn/ApplicationSpec.java | 19 --------
.../apache/helix/provisioning/yarn/Client.java | 30 +++----------
.../provisioning/yarn/ContainerParticipant.java | 36 ++++++++++++---
.../yarn/HelixYarnApplicationMasterMain.java | 11 +++--
.../provisioning/yarn/ParticipantLauncher.java | 47 ++++++++++++++++++++
.../provisioning/yarn/RMCallbackHandler.java | 5 ++-
.../provisioning/yarn/YarnProvisioner.java | 30 +++++++------
.../yarn/YarnProvisionerConfig.java | 1 -
15 files changed, 130 insertions(+), 94 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/c9ddde3e/helix-core/src/main/java/org/apache/helix/api/Participant.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/Participant.java b/helix-core/src/main/java/org/apache/helix/api/Participant.java
index fc20968..3ed395b 100644
--- a/helix-core/src/main/java/org/apache/helix/api/Participant.java
+++ b/helix-core/src/main/java/org/apache/helix/api/Participant.java
@@ -39,6 +39,7 @@ import com.google.common.collect.ImmutableMap;
*/
public class Participant {
private final ParticipantConfig _config;
+
private final ContainerConfig _containerConfig;
private final RunningInstance _runningInstance;
http://git-wip-us.apache.org/repos/asf/helix/blob/c9ddde3e/helix-core/src/main/java/org/apache/helix/controller/provisioner/ContainerProvider.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/provisioner/ContainerProvider.java b/helix-core/src/main/java/org/apache/helix/controller/provisioner/ContainerProvider.java
index c88733f..a95abe0 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/provisioner/ContainerProvider.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/provisioner/ContainerProvider.java
@@ -1,5 +1,7 @@
package org.apache.helix.controller.provisioner;
+import org.apache.helix.api.Participant;
+
import com.google.common.util.concurrent.ListenableFuture;
/*
@@ -27,7 +29,7 @@ public interface ContainerProvider {
ListenableFuture<Boolean> deallocateContainer(ContainerId containerId);
- ListenableFuture<Boolean> startContainer(ContainerId containerId);
+ ListenableFuture<Boolean> startContainer(ContainerId containerId, Participant participant);
ListenableFuture<Boolean> stopContainer(ContainerId containerId);
http://git-wip-us.apache.org/repos/asf/helix/blob/c9ddde3e/helix-core/src/main/java/org/apache/helix/controller/provisioner/ParticipantContainer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/provisioner/ParticipantContainer.java b/helix-core/src/main/java/org/apache/helix/controller/provisioner/ParticipantContainer.java
deleted file mode 100644
index 7b39aca..0000000
--- a/helix-core/src/main/java/org/apache/helix/controller/provisioner/ParticipantContainer.java
+++ /dev/null
@@ -1,12 +0,0 @@
-package org.apache.helix.controller.provisioner;
-
-public class ParticipantContainer {
-
- /**
- * Id request by the target provider
- */
- String requestId;
-
- String allocatedId;
-
-}
http://git-wip-us.apache.org/repos/asf/helix/blob/c9ddde3e/helix-core/src/main/java/org/apache/helix/controller/provisioner/ParticipantService.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/provisioner/ParticipantService.java b/helix-core/src/main/java/org/apache/helix/controller/provisioner/ParticipantService.java
new file mode 100644
index 0000000..92b5a24
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/provisioner/ParticipantService.java
@@ -0,0 +1,10 @@
+package org.apache.helix.controller.provisioner;
+
+public interface ParticipantService {
+
+ boolean init(ServiceConfig serviceConfig);
+
+ boolean start();
+
+ boolean stop();
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/c9ddde3e/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java
index 97b80b9..499f904 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java
@@ -154,7 +154,7 @@ public class ContainerProvisioningStage extends AbstractBaseStage {
accessor.setProperty(keyBuilder.instanceConfig(participant.getId().toString()),
existingInstance);
// create the helix participant and add it to cluster
- ListenableFuture<Boolean> future = provisioner.startContainer(containerId);
+ ListenableFuture<Boolean> future = provisioner.startContainer(containerId, participant);
Futures.addCallback(future, new FutureCallback<Boolean>() {
@Override
public void onSuccess(Boolean result) {
http://git-wip-us.apache.org/repos/asf/helix/blob/c9ddde3e/helix-core/src/test/java/org/apache/helix/integration/TestLocalContainerProvider.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestLocalContainerProvider.java b/helix-core/src/test/java/org/apache/helix/integration/TestLocalContainerProvider.java
index 7b8a580..2f1d397 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestLocalContainerProvider.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestLocalContainerProvider.java
@@ -242,12 +242,12 @@ public class TestLocalContainerProvider extends ZkUnitTestBase {
}
@Override
- public ListenableFuture<Boolean> startContainer(ContainerId containerId) {
- ParticipantService participant =
+ public ListenableFuture<Boolean> startContainer(ContainerId containerId, Participant participant ) {
+ ParticipantService participantService =
new ParticipantService(_clusterId, _containerParticipants.get(containerId));
- participant.startAsync();
- participant.awaitRunning();
- _participants.put(containerId, participant);
+ participantService.startAsync();
+ participantService.awaitRunning();
+ _participants.put(containerId, participantService);
_states.put(containerId, ContainerState.ACTIVE);
started++;
SettableFuture<Boolean> future = SettableFuture.create();
http://git-wip-us.apache.org/repos/asf/helix/blob/c9ddde3e/helix-provisioning/pom.xml
----------------------------------------------------------------------
diff --git a/helix-provisioning/pom.xml b/helix-provisioning/pom.xml
index d83bbf2..254d420 100644
--- a/helix-provisioning/pom.xml
+++ b/helix-provisioning/pom.xml
@@ -57,12 +57,6 @@ under the License.
<scope>provided</scope>
</dependency>
<dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-yarn-common</artifactId>
- <version>${hadoop.version}</version>
- <scope>provided</scope>
- </dependency>
- <dependency>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
<scope>test</scope>
http://git-wip-us.apache.org/repos/asf/helix/blob/c9ddde3e/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ApplicationSpec.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ApplicationSpec.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ApplicationSpec.java
deleted file mode 100644
index 6671364..0000000
--- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ApplicationSpec.java
+++ /dev/null
@@ -1,19 +0,0 @@
-package org.apache.helix.provisioning.yarn;
-
-public class ApplicationSpec {
-
- int minContainers;
-
- int maxContainers;
-
- String serviceClass;
-
- String targetProvider;
-
- String stateModel;
-
- String taskClass;
-
- int numTasks;
-
-}
http://git-wip-us.apache.org/repos/asf/helix/blob/c9ddde3e/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/Client.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/Client.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/Client.java
index 3caf8f0..a7a119f 100644
--- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/Client.java
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/Client.java
@@ -131,7 +131,7 @@ public class Client {
// Queue for App master
private String amQueue = "";
// Amt. of memory resource to request for to run the App Master
- private int amMemory = 10;
+ private int amMemory = 1024;
// Application master jar file
private String appMasterArchive = "";
@@ -201,19 +201,13 @@ public class Client {
yarnClient = YarnClient.createYarnClient();
yarnClient.init(conf);
opts = new Options();
- opts.addOption("appname", true, "Application Name. Default value - DistributedShell");
+ opts.addOption("appName", true, "Application Name.");
opts.addOption("priority", true, "Application Priority. Default 0");
opts.addOption("queue", true, "RM Queue in which this application is to be submitted");
opts.addOption("timeout", true, "Application timeout in milliseconds");
opts.addOption("master_memory", true,
"Amount of memory in MB to be requested to run the application master");
opts.addOption("archive", true, "Jar file containing the application master");
- opts.addOption("shell_command", true, "Shell command to be executed by the Application Master");
- opts.addOption("shell_script", true, "Location of the shell script to be executed");
- opts.addOption("shell_args", true, "Command line args for the shell script");
- opts.addOption("shell_env", true,
- "Environment for shell script. Specified as env_key=env_val pairs");
- opts.addOption("shell_cmd_priority", true, "Priority for the shell command containers");
opts.addOption("container_memory", true,
"Amount of memory in MB to be requested to run the shell command");
opts.addOption("num_containers", true,
@@ -258,13 +252,12 @@ public class Client {
if (cliParser.hasOption("debug")) {
debugFlag = true;
-
}
- appName = cliParser.getOptionValue("appname", "DistributedShell");
+ appName = cliParser.getOptionValue("appName");
amPriority = Integer.parseInt(cliParser.getOptionValue("priority", "0"));
amQueue = cliParser.getOptionValue("queue", "default");
- amMemory = Integer.parseInt(cliParser.getOptionValue("master_memory", "10"));
+ amMemory = Integer.parseInt(cliParser.getOptionValue("master_memory", "1024"));
if (amMemory < 0) {
throw new IllegalArgumentException(
@@ -278,17 +271,6 @@ public class Client {
appMasterArchive = cliParser.getOptionValue("archive");
- containerMemory = Integer.parseInt(cliParser.getOptionValue("container_memory", "10"));
- numContainers = Integer.parseInt(cliParser.getOptionValue("num_containers", "1"));
-
- if (containerMemory < 0 || numContainers < 1) {
- throw new IllegalArgumentException(
- "Invalid no. of containers or container memory specified, exiting."
- + " Specified containerMemory=" + containerMemory + ", numContainer=" + numContainers);
- }
-
- clientTimeout = Integer.parseInt(cliParser.getOptionValue("timeout", "600000"));
-
log4jPropFile = cliParser.getOptionValue("log_properties", "");
return true;
@@ -460,7 +442,7 @@ public class Client {
env.put("appId", "" + appId.getId());
env.put("CLASSPATH", classPathEnv.toString());
env.put("appClasspath", appClassPathEnv.toString());
- env.put("containerParticipantMainClass", "org.apache.helix.provisioning.yarn.ContainerParticipant");
+ env.put("containerParticipantMainClass", "org.apache.helix.provisioning.yarn.ParticipantLauncher");
amContainer.setEnvironment(env);
// Set the necessary command to execute the application master
@@ -623,7 +605,7 @@ public class Client {
// Response can be ignored as it is non-null on success or
// throws an exception in case of failures
- yarnClient.killApplication(appId);
+ //yarnClient.killApplication(appId);
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/c9ddde3e/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ContainerParticipant.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ContainerParticipant.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ContainerParticipant.java
index b0272e8..9c80d87 100644
--- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ContainerParticipant.java
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ContainerParticipant.java
@@ -1,15 +1,39 @@
package org.apache.helix.provisioning.yarn;
-import java.util.Arrays;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.helix.HelixConnection;
+import org.apache.helix.HelixParticipant;
+import org.apache.helix.api.id.ClusterId;
+import org.apache.helix.api.id.ParticipantId;
+
+import com.google.common.util.concurrent.AbstractService;
-public class ContainerParticipant {
+public class ContainerParticipant extends AbstractService {
private static final Log LOG = LogFactory.getLog(ContainerParticipant.class);
+ private final ClusterId _clusterId;
+ private final ParticipantId _participantId;
+ private HelixParticipant _participant;
+ private HelixConnection _connection;
+
+ public ContainerParticipant(HelixConnection connection, ClusterId clusterId,
+ ParticipantId participantId) {
+ _connection = connection;
+ _clusterId = clusterId;
+ _participantId = participantId;
+ }
+
+ @Override
+ protected void doStart() {
+ _participant = _connection.createParticipant(_clusterId, _participantId);
+ // register statemachine
+ _participant.startAsync();
+ notifyStarted();
+ }
- public static void main(String[] args) throws InterruptedException {
- LOG.info("Starting participant: "+ Arrays.toString(args));
- Thread.currentThread().join();
+ @Override
+ protected void doStop() {
+ _participant.stopAsync();
+ notifyStopped();
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/c9ddde3e/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/HelixYarnApplicationMasterMain.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/HelixYarnApplicationMasterMain.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/HelixYarnApplicationMasterMain.java
index 92930ed..8be4754 100644
--- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/HelixYarnApplicationMasterMain.java
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/HelixYarnApplicationMasterMain.java
@@ -1,10 +1,12 @@
package org.apache.helix.provisioning.yarn;
+import java.io.File;
import java.util.Map;
import org.I0Itec.zkclient.IDefaultNameSpace;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkServer;
+import org.apache.commons.io.FileUtils;
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -41,9 +43,12 @@ public class HelixYarnApplicationMasterMain {
@Override
public void createDefaultNameSpace(ZkClient zkClient) {
-
+
}
};
+ FileUtils.deleteDirectory(new File(dataDir));
+ FileUtils.deleteDirectory(new File(logDir));
+
final ZkServer server = new ZkServer(dataDir, logDir, defaultNameSpace);
server.start();
@@ -62,7 +67,7 @@ public class HelixYarnApplicationMasterMain {
YarnProvisioner.applicationMaster = genericApplicationMaster;
String zkAddress = envs.get(Environment.NM_HOST.name()) + ":2181";
- String clusterName = "testCluster";
+ String clusterName = envs.get("appName");
String resourceName = "testResource";
int NUM_PARTITIONS = 6;
int NUM_REPLICAS = 2;
@@ -91,7 +96,7 @@ public class HelixYarnApplicationMasterMain {
// start controller
ControllerId controllerId = ControllerId.from("controller1");
HelixController controller = connection.createController(clusterId, controllerId);
- controller.startAsync(); // TODO: is this really async?
+ controller.startAsync();
Thread shutdownhook = new Thread(new Runnable() {
@Override
http://git-wip-us.apache.org/repos/asf/helix/blob/c9ddde3e/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ParticipantLauncher.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ParticipantLauncher.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ParticipantLauncher.java
new file mode 100644
index 0000000..58e7a4f
--- /dev/null
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ParticipantLauncher.java
@@ -0,0 +1,47 @@
+package org.apache.helix.provisioning.yarn;
+
+import java.util.Arrays;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.Options;
+import org.apache.helix.HelixConnection;
+import org.apache.helix.api.id.ClusterId;
+import org.apache.helix.api.id.ParticipantId;
+import org.apache.helix.manager.zk.ZkHelixConnection;
+
+public class ParticipantLauncher {
+ public static void main(String[] args) {
+
+ System.out.println("Starting Helix Participant: " + Arrays.toString(args));
+ Options opts;
+ opts = new Options();
+ opts.addOption("cluster", true, "Cluster name, default app name");
+ opts.addOption("participantId", true, "Participant Id");
+ opts.addOption("zkAddress", true, "Zookeeper address");
+ try {
+ CommandLine cliParser = new GnuParser().parse(opts, args);
+ String zkAddress = cliParser.getOptionValue("zkAddress");
+ HelixConnection connection = new ZkHelixConnection(zkAddress);
+ connection.connect();
+ ClusterId clusterId = ClusterId.from(cliParser.getOptionValue("cluster"));
+ ParticipantId participantId = ParticipantId.from(cliParser.getOptionValue("participantId"));
+ ContainerParticipant containerParticipant =
+ new ContainerParticipant(connection, clusterId, participantId);
+ containerParticipant.startAsync();
+ containerParticipant.awaitRunning(60, TimeUnit.SECONDS);
+ Thread.currentThread().join();
+ } catch (Exception e) {
+ e.printStackTrace();
+ System.out.println("Failed to start Helix participant" + e);
+ // System.exit(1);
+ }
+ try {
+ Thread.currentThread().join();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/c9ddde3e/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/RMCallbackHandler.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/RMCallbackHandler.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/RMCallbackHandler.java
index 6c87bd2..50c38b5 100644
--- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/RMCallbackHandler.java
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/RMCallbackHandler.java
@@ -16,7 +16,7 @@ import com.google.common.util.concurrent.SettableFuture;
class RMCallbackHandler implements AMRMClientAsync.CallbackHandler {
private static final Log LOG = LogFactory.getLog(RMCallbackHandler.class);
-
+ long startTime;
/**
*
*/
@@ -27,6 +27,7 @@ class RMCallbackHandler implements AMRMClientAsync.CallbackHandler {
*/
RMCallbackHandler(GenericApplicationMaster genericApplicationMaster) {
_genericApplicationMaster = genericApplicationMaster;
+ startTime = System.currentTimeMillis();
}
@SuppressWarnings("unchecked")
@@ -96,7 +97,7 @@ class RMCallbackHandler implements AMRMClientAsync.CallbackHandler {
@Override
public float getProgress() {
// set progress to deliver to RM on next heartbeat
- return 0.5f;
+ return (System.currentTimeMillis()-startTime) % Integer.MAX_VALUE;
}
@Override
http://git-wip-us.apache.org/repos/asf/helix/blob/c9ddde3e/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisioner.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisioner.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisioner.java
index f74e312..1a903d4 100644
--- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisioner.java
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisioner.java
@@ -65,7 +65,8 @@ public class YarnProvisioner implements Provisioner {
static ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors
.newCachedThreadPool());
Map<ContainerId, Container> allocatedContainersMap = new HashMap<ContainerId, Container>();
- int DEFAULT_CONTAINER = 4;
+ int DEFAULT_CONTAINER = 1;
+ private HelixManager _helixManager;
@Override
public ListenableFuture<ContainerId> allocateContainer(ContainerSpec spec) {
@@ -99,11 +100,11 @@ public class YarnProvisioner implements Provisioner {
}
@Override
- public ListenableFuture<Boolean> startContainer(final ContainerId containerId) {
+ public ListenableFuture<Boolean> startContainer(final ContainerId containerId, Participant participant) {
Container container = allocatedContainersMap.get(containerId);
ContainerLaunchContext launchContext;
try {
- launchContext = createLaunchContext(containerId);
+ launchContext = createLaunchContext(containerId, container, participant);
} catch (Exception e) {
LOG.error("Exception while creating context to launch container:" + containerId, e);
return null;
@@ -118,9 +119,9 @@ public class YarnProvisioner implements Provisioner {
}, service);
}
- private ContainerLaunchContext createLaunchContext(ContainerId containerId) throws Exception {
+ private ContainerLaunchContext createLaunchContext(ContainerId containerId, Container container, Participant participant) throws Exception {
- ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class);
+ ContainerLaunchContext participantContainer = Records.newRecord(ContainerLaunchContext.class);
Map<String, String> envs = System.getenv();
String appName = envs.get("appName");
@@ -158,7 +159,7 @@ public class YarnProvisioner implements Provisioner {
localResources.put("app-pkg", amJarRsrc);
// Set local resource info into app master container launch context
- amContainer.setLocalResources(localResources);
+ participantContainer.setLocalResources(localResources);
// Set the necessary security tokens as needed
// amContainer.setContainerTokens(containerToken);
@@ -166,7 +167,7 @@ public class YarnProvisioner implements Provisioner {
// Set the env variables to be setup in the env where the application master will be run
LOG.info("Set the environment for the application master");
Map<String, String> env = new HashMap<String, String>();
- env.put("app-pkg-path", dst.getName());
+ env.put("app_pkg_path", dst.getName());
// Add AppMaster.jar location to classpath
// At some point we should not be required to add
// the hadoop specific classpaths to the env.
@@ -190,10 +191,9 @@ public class YarnProvisioner implements Provisioner {
classPathEnv.append(':');
classPathEnv.append(System.getProperty("java.class.path"));
}
- System.out.println("classoath" + classPathEnv.toString());
env.put("CLASSPATH", classPathEnv.toString());
- amContainer.setEnvironment(env);
+ participantContainer.setEnvironment(env);
// Set the necessary command to execute the application master
Vector<CharSequence> vargs = new Vector<CharSequence>(30);
@@ -206,8 +206,9 @@ public class YarnProvisioner implements Provisioner {
// Set class name
vargs.add(containerParticipantMainClass);
// Set params for container participant
- vargs.add("--zk_address " + zkAddress);
- vargs.add("--participantId " + containerId.stringify());
+ vargs.add("--zkAddress " + zkAddress);
+ vargs.add("--cluster " + appName);
+ vargs.add("--participantId " + participant.getId().stringify());
vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/ContainerParticipant.stdout");
vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/ContainerParticipant.stderr");
@@ -218,11 +219,11 @@ public class YarnProvisioner implements Provisioner {
command.append(str).append(" ");
}
- LOG.info("Completed setting up app master command " + command.toString());
+ LOG.info("Completed setting up container launch command " + command.toString() + " with arguments \n" + vargs);
List<String> commands = new ArrayList<String>();
commands.add(command.toString());
- amContainer.setCommands(commands);
- return amContainer;
+ participantContainer.setCommands(commands);
+ return participantContainer;
}
@Override
@@ -244,6 +245,7 @@ public class YarnProvisioner implements Provisioner {
@Override
public void init(HelixManager helixManager) {
+ _helixManager = helixManager;
}
http://git-wip-us.apache.org/repos/asf/helix/blob/c9ddde3e/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisionerConfig.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisionerConfig.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisionerConfig.java
index 8427c14..0c1dbda 100644
--- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisionerConfig.java
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisionerConfig.java
@@ -5,7 +5,6 @@ import org.apache.helix.controller.provisioner.ProvisionerConfig;
import org.apache.helix.controller.provisioner.ProvisionerRef;
import org.apache.helix.controller.serializer.DefaultStringSerializer;
import org.apache.helix.controller.serializer.StringSerializer;
-import org.apache.helix.integration.TestLocalContainerProvider.LocalProvisioner;
import org.codehaus.jackson.annotate.JsonProperty;
public class YarnProvisionerConfig implements ProvisionerConfig {