You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ka...@apache.org on 2014/07/10 19:04:49 UTC
[06/50] [abbrv] git commit: Almost complete working example of
Helloworld
Almost complete working example of Helloworld
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/8b19cfc7
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/8b19cfc7
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/8b19cfc7
Branch: refs/heads/master
Commit: 8b19cfc77b0ddd6bc90dcb034cfbd9b983ff2932
Parents: 57b4b18
Author: Kishore Gopalakrishna <g....@gmail.com>
Authored: Thu Feb 20 22:08:18 2014 -0800
Committer: Kishore Gopalakrishna <g....@gmail.com>
Committed: Thu Feb 20 22:08:18 2014 -0800
----------------------------------------------------------------------
.../controller/provisioner/ContainerSpec.java | 19 ++++--
.../stages/ContainerProvisioningStage.java | 23 ++++---
.../manager/zk/AbstractParticipantService.java | 68 ++++++++++++++-----
.../integration/TestLocalContainerProvider.java | 4 +-
.../provisioning/yarn/ApplicationSpec.java | 4 +-
.../yarn/HelixYarnApplicationMasterMain.java | 40 ++++++-----
.../helix/provisioning/yarn/ServiceConfig.java | 14 ++--
.../yarn/YamlApplicationSpecFactory.java | 70 --------------------
.../provisioning/yarn/YarnProvisioner.java | 53 ++++++---------
.../yarn/example/HelloWorldService.java | 40 +++++++----
.../yarn/example/HelloworldAppSpec.java | 23 +++----
.../main/resources/hello_world_app_spec.yaml | 3 +-
12 files changed, 177 insertions(+), 184 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/8b19cfc7/helix-core/src/main/java/org/apache/helix/controller/provisioner/ContainerSpec.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/provisioner/ContainerSpec.java b/helix-core/src/main/java/org/apache/helix/controller/provisioner/ContainerSpec.java
index 4d3a521..ab3c46a 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/provisioner/ContainerSpec.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/provisioner/ContainerSpec.java
@@ -1,5 +1,7 @@
package org.apache.helix.controller.provisioner;
+import org.apache.helix.api.id.ParticipantId;
+
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -27,8 +29,10 @@ public class ContainerSpec {
int _memory;
- public ContainerSpec(ContainerId containerId) {
- this._containerId = containerId;
+ private ParticipantId _participantId;
+
+ public ContainerSpec(ParticipantId _participantId) {
+ this._participantId = _participantId;
}
public ContainerId getContainerId() {
@@ -37,7 +41,7 @@ public class ContainerSpec {
@Override
public String toString() {
- return _containerId.toString();
+ return _participantId.toString();
}
public void setMemory(int memory){
@@ -49,6 +53,13 @@ public class ContainerSpec {
}
public static ContainerSpec from(String serialized) {
- return new ContainerSpec(ContainerId.from(serialized));
+ //todo
+ return null;
+ //return new ContainerSpec(ContainerId.from(serialized));
}
+
+ public ParticipantId getParticipantId() {
+ return _participantId;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/helix/blob/8b19cfc7/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java
index 42c8218..f7105d1 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java
@@ -21,6 +21,7 @@ package org.apache.helix.controller.stages;
import java.util.Collection;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import org.apache.helix.HelixAdmin;
@@ -121,16 +122,17 @@ public class ContainerProvisioningStage extends AbstractBaseStage {
// allocate new containers
for (final ContainerSpec spec : response.getContainersToAcquire()) {
- // random participant id
- final ParticipantId participantId = ParticipantId.from(spec.getContainerId().stringify());
- // create a new Participant, attach the container spec
- InstanceConfig instanceConfig = new InstanceConfig(participantId);
- instanceConfig.setContainerSpec(spec);
- // create a helix_participant in ACQUIRING state
- instanceConfig.setContainerState(ContainerState.ACQUIRING);
- // create the helix participant and add it to cluster
- helixAdmin.addInstance(cluster.getId().toString(), instanceConfig);
-
+ final ParticipantId participantId = spec.getParticipantId();
+ List<String> instancesInCluster = helixAdmin.getInstancesInCluster(cluster.getId().stringify());
+ if (!instancesInCluster.contains(participantId.stringify())) {
+ // create a new Participant, attach the container spec
+ InstanceConfig instanceConfig = new InstanceConfig(participantId);
+ instanceConfig.setContainerSpec(spec);
+ // create a helix_participant in ACQUIRING state
+ instanceConfig.setContainerState(ContainerState.ACQUIRING);
+ // create the helix participant and add it to cluster
+ helixAdmin.addInstance(cluster.getId().toString(), instanceConfig);
+ }
ListenableFuture<ContainerId> future = containerProvider.allocateContainer(spec);
FutureCallback<ContainerId> callback = new FutureCallback<ContainerId>() {
@Override
@@ -160,7 +162,6 @@ public class ContainerProvisioningStage extends AbstractBaseStage {
helixAdmin.getInstanceConfig(cluster.getId().toString(), participant.getId()
.toString());
final ContainerId containerId = existingInstance.getContainerId();
- existingInstance.setContainerId(containerId);
existingInstance.setContainerState(ContainerState.CONNECTING);
accessor.updateProperty(keyBuilder.instanceConfig(participant.getId().toString()),
existingInstance);
http://git-wip-us.apache.org/repos/asf/helix/blob/8b19cfc7/helix-core/src/main/java/org/apache/helix/manager/zk/AbstractParticipantService.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/AbstractParticipantService.java b/helix-core/src/main/java/org/apache/helix/manager/zk/AbstractParticipantService.java
index 2e5eafa..f515092 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/AbstractParticipantService.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/AbstractParticipantService.java
@@ -35,6 +35,7 @@ public abstract class AbstractParticipantService extends AbstractService {
private final ParticipantId _participantId;
private HelixParticipant _participant;
private HelixConnection _connection;
+ boolean initialized;
/**
* Initialize the service.
@@ -50,20 +51,22 @@ public abstract class AbstractParticipantService extends AbstractService {
}
@Override
- protected void doStart() {
+ protected final void doStart() {
_participant = _connection.createParticipant(_clusterId, _participantId);
// add a preconnect callback
_participant.addPreConnectCallback(new PreConnectCallback() {
@Override
public void onPreConnect() {
- onPreJoinCluster();
+ if (initialized) {
+ onReconnect();
+ } else {
+ init();
+ initialized = true;
+ }
}
});
- // register state machine and other initialization
- init();
-
// start and notify
if (!_connection.isConnected()) {
_connection.connect();
@@ -73,34 +76,67 @@ public abstract class AbstractParticipantService extends AbstractService {
}
@Override
- protected void doStop() {
+ protected final void doStop() {
_participant.stop();
notifyStopped();
}
/**
- * Initialize the participant. For example, here is where you can register a state machine: <br/>
+ * Invoked when connection is re-established to zookeeper. Typical scenario this is invoked is
+ * when there is a long GC pause that causes the node to disconnect from the cluster and
+ * reconnects. NOTE: When the service disconnects all its states are reset to initial state.
+ */
+ protected void onReconnect() {
+ // default implementation does nothing.
+ }
+
+ /**
+ * Initialize the participant. For example, here is where you can
+ * <ul>
+ * <li>Read configuration of the cluster,resource, node</li>
+ * <li>Read configuration of the cluster,resource, node register a state machine: <br/>
* <br/>
* <code>
* HelixParticipant participant = getParticipant();
* participant.getStateMachineEngine().registerStateModelFactory(stateModelDefId, factory);
* </code><br/>
* <br/>
- * This code is called prior to starting the participant.
+ * </li>
+ * </ul>
+ * This code is called after connecting to zookeeper but before creating the liveinstance.
*/
- public abstract void init();
-
- /**
- * Complete any tasks that require a live Helix connection. This function is called before the
- * participant declares itself ready to receive state transitions.
- */
- public abstract void onPreJoinCluster();
+ protected abstract void init();
/**
* Get an instantiated participant instance.
* @return HelixParticipant
*/
- public HelixParticipant getParticipant() {
+ protected HelixParticipant getParticipant() {
return _participant;
}
+
+ /**
+ * @return ClusterId
+ * @see {@link ClusterId}
+ */
+ public ClusterId getClusterId() {
+ return _clusterId;
+ }
+
+ /**
+ * @see {@link ParticipantId}
+ * @return
+ */
+ public ParticipantId getParticipantId() {
+ return _participantId;
+ }
+
+ /**
+ * @see {@link HelixConnection}
+ * @return HelixConnection
+ */
+ public HelixConnection getConnection() {
+ return _connection;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/helix/blob/8b19cfc7/helix-core/src/test/java/org/apache/helix/integration/TestLocalContainerProvider.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestLocalContainerProvider.java b/helix-core/src/test/java/org/apache/helix/integration/TestLocalContainerProvider.java
index 0f7be64..f4153cc 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/TestLocalContainerProvider.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/TestLocalContainerProvider.java
@@ -277,8 +277,8 @@ public class TestLocalContainerProvider extends ZkUnitTestBase {
List<ContainerSpec> containersToAcquire = Lists.newArrayList();
boolean asked = false;
if (_askCount < MAX_PARTICIPANTS) {
- containersToAcquire.add(new ContainerSpec(ContainerId.from("container" + _askCount)));
- containersToAcquire.add(new ContainerSpec(ContainerId.from("container" + (_askCount + 1))));
+ containersToAcquire.add(new ContainerSpec(ParticipantId.from("container" + _askCount)));
+ containersToAcquire.add(new ContainerSpec(ParticipantId.from("container" + (_askCount + 1))));
asked = true;
}
List<Participant> containersToStart = Lists.newArrayList();
http://git-wip-us.apache.org/repos/asf/helix/blob/8b19cfc7/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ApplicationSpec.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ApplicationSpec.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ApplicationSpec.java
index e104578..285d036 100644
--- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ApplicationSpec.java
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ApplicationSpec.java
@@ -3,8 +3,6 @@ package org.apache.helix.provisioning.yarn;
import java.net.URI;
import java.util.List;
-import org.apache.helix.api.config.ParticipantConfig;
-import org.apache.helix.api.id.ParticipantId;
public interface ApplicationSpec {
/**
@@ -23,7 +21,7 @@ public interface ApplicationSpec {
String getServiceMainClass(String service);
- ParticipantConfig getParticipantConfig(String serviceName, ParticipantId participantId);
+ ServiceConfig getServiceConfig(String serviceName);
List<TaskConfig> getTaskConfigs();
http://git-wip-us.apache.org/repos/asf/helix/blob/8b19cfc7/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/HelixYarnApplicationMasterMain.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/HelixYarnApplicationMasterMain.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/HelixYarnApplicationMasterMain.java
index 058b384..33183c7 100644
--- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/HelixYarnApplicationMasterMain.java
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/HelixYarnApplicationMasterMain.java
@@ -46,7 +46,7 @@ public class HelixYarnApplicationMasterMain {
public static void main(String[] args) throws Exception {
Map<String, String> env = System.getenv();
LOG.info("Starting app master with the following environment variables");
- for(String key: env.keySet()){
+ for (String key : env.keySet()) {
LOG.info(key + "\t\t=" + env.get(key));
}
int numContainers = 1;
@@ -93,11 +93,11 @@ public class HelixYarnApplicationMasterMain {
YarnProvisioner.applicationMaster = genericApplicationMaster;
YarnProvisioner.applicationMasterConfig = appMasterConfig;
- YarnProvisioner.applicationSpec = factory.fromYaml(new FileInputStream(configFile));
+ ApplicationSpec applicationSpec = factory.fromYaml(new FileInputStream(configFile));
+ YarnProvisioner.applicationSpec = applicationSpec;
String zkAddress = appMasterConfig.getZKAddress();
String clusterName = appMasterConfig.getAppName();
-
- String resourceName = "HelloWorld";
+
// CREATE CLUSTER and setup the resources
// connect
ZkHelixConnection connection = new ZkHelixConnection(zkAddress);
@@ -110,17 +110,27 @@ public class HelixYarnApplicationMasterMain {
new StateModelDefinition(StateModelConfigGenerator.generateConfigForStatelessService());
clusterAccessor.createCluster(new ClusterConfig.Builder(clusterId).addStateModelDefinition(
statelessService).build());
-
- // add the resource with the local provisioner
- ResourceId resourceId = ResourceId.from(resourceName);
- YarnProvisionerConfig provisionerConfig = new YarnProvisionerConfig(resourceId);
- provisionerConfig.setNumContainers(numContainers);
- RebalancerConfig rebalancerConfig =
- new FullAutoRebalancerConfig.Builder(resourceId).stateModelDefId(
- statelessService.getStateModelDefId()).build();
- clusterAccessor.addResourceToCluster(new ResourceConfig.Builder(ResourceId.from(resourceName))
- .provisionerConfig(provisionerConfig).rebalancerConfig(rebalancerConfig).build());
-
+ for (String service : applicationSpec.getServices()) {
+ String resourceName = service;
+ // add the resource with the local provisioner
+ ResourceId resourceId = ResourceId.from(resourceName);
+ YarnProvisionerConfig provisionerConfig = new YarnProvisionerConfig(resourceId);
+ ServiceConfig serviceConfig = applicationSpec.getServiceConfig(resourceName);
+ provisionerConfig.setNumContainers(serviceConfig.getIntField("num_containers", 1));
+ serviceConfig.setSimpleField("service_name", service);
+ FullAutoRebalancerConfig.Builder rebalancerConfigBuilder =
+ new FullAutoRebalancerConfig.Builder(resourceId);
+ RebalancerConfig rebalancerConfig =
+ rebalancerConfigBuilder.stateModelDefId(statelessService.getStateModelDefId())//
+ .build();
+ ResourceConfig.Builder resourceConfigBuilder =
+ new ResourceConfig.Builder(ResourceId.from(resourceName));
+ ResourceConfig resourceConfig = resourceConfigBuilder.provisionerConfig(provisionerConfig) //
+ .rebalancerConfig(rebalancerConfig) //
+ .userConfig(serviceConfig) //
+ .build();
+ clusterAccessor.addResourceToCluster(resourceConfig);
+ }
// start controller
ControllerId controllerId = ControllerId.from("controller1");
HelixController controller = connection.createController(clusterId, controllerId);
http://git-wip-us.apache.org/repos/asf/helix/blob/8b19cfc7/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ServiceConfig.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ServiceConfig.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ServiceConfig.java
index 4d9173e..87b5f12 100644
--- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ServiceConfig.java
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/ServiceConfig.java
@@ -3,11 +3,15 @@ package org.apache.helix.provisioning.yarn;
import java.util.HashMap;
import java.util.Map;
-public class ServiceConfig {
+import org.apache.helix.api.Scope;
+import org.apache.helix.api.config.UserConfig;
+import org.apache.helix.api.id.ResourceId;
+
+public class ServiceConfig extends UserConfig{
public Map<String, String> config = new HashMap<String, String>();
- public String getValue(String key) {
- return (config != null ? config.get(key) : null);
- }
-
+ public ServiceConfig(Scope<ResourceId> scope) {
+ super(scope);
+ }
+
}
http://git-wip-us.apache.org/repos/asf/helix/blob/8b19cfc7/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YamlApplicationSpecFactory.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YamlApplicationSpecFactory.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YamlApplicationSpecFactory.java
deleted file mode 100644
index e87a5c2..0000000
--- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YamlApplicationSpecFactory.java
+++ /dev/null
@@ -1,70 +0,0 @@
-package org.apache.helix.provisioning.yarn;
-
-import java.io.InputStream;
-import java.net.URI;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.helix.api.config.ParticipantConfig;
-import org.apache.helix.api.id.ParticipantId;
-import org.yaml.snakeyaml.Yaml;
-
-class DefaultApplicationSpec implements ApplicationSpec {
- public String appName;
- public Integer minContainers;
- public Integer maxContainers;
-
- public AppConfig appConfig;
-
- public List<String> services;
- public Map<String, ServiceConfig> serviceConfigMap;
-
- @Override
- public String getAppName() {
- return appName;
- }
-
- @Override
- public AppConfig getConfig() {
- return appConfig;
- }
-
- @Override
- public List<String> getServices() {
- return services;
- }
-
- @Override
- public URI getServicePackage(String serviceName) {
- return null;
- }
-
- @Override
- public ParticipantConfig getParticipantConfig(String serviceName, ParticipantId participantId) {
- return null;
- }
-
- @Override
- public List<TaskConfig> getTaskConfigs() {
- return null;
- }
-
- @Override
- public URI getAppMasterPackage() {
- // TODO Auto-generated method stub
- return null;
- }
-
- @Override
- public String getServiceMainClass(String service) {
- // TODO Auto-generated method stub
- return null;
- }
-}
-
-public class YamlApplicationSpecFactory {
- ApplicationSpec fromYaml(InputStream input) {
- Yaml yaml = new Yaml();
- return yaml.loadAs(input, DefaultApplicationSpec.class);
- }
-}
http://git-wip-us.apache.org/repos/asf/helix/blob/8b19cfc7/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisioner.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisioner.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisioner.java
index daac87b..8fd308e 100644
--- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisioner.java
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisioner.java
@@ -210,8 +210,7 @@ public class YarnProvisioner implements Provisioner, TargetProvider, ContainerPr
vargs.add("--cluster " + appName);
vargs.add("--participantId " + participant.getId().stringify());
vargs.add("--participantClass " + mainClass);
- ;
-
+
vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/ContainerParticipant.stdout");
vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/ContainerParticipant.stderr");
@@ -262,13 +261,13 @@ public class YarnProvisioner implements Provisioner, TargetProvider, ContainerPr
int targetNumContainers = provisionerConfig.getNumContainers();
// Any container that is in a state should be put in this set
- Set<ContainerId> existingContainersIdSet = new HashSet<ContainerId>();
+ Set<ParticipantId> existingContainersIdSet = new HashSet<ParticipantId>();
// Cache halted containers to determine which to restart and which to release
- Map<ContainerId, Participant> excessHaltedContainers = Maps.newHashMap();
+ Map<ParticipantId, Participant> excessHaltedContainers = Maps.newHashMap();
// Cache participants to ensure that excess participants are stopped
- Map<ContainerId, Participant> excessActiveContainers = Maps.newHashMap();
+ Map<ParticipantId, Participant> excessActiveContainers = Maps.newHashMap();
for (Participant participant : participants) {
ContainerConfig containerConfig = participant.getContainerConfig();
@@ -276,35 +275,35 @@ public class YarnProvisioner implements Provisioner, TargetProvider, ContainerPr
ContainerState state = containerConfig.getState();
switch (state) {
case ACQUIRING:
- existingContainersIdSet.add(containerConfig.getId());
+ existingContainersIdSet.add(participant.getId());
break;
case ACQUIRED:
// acquired containers are ready to start
- existingContainersIdSet.add(containerConfig.getId());
+ existingContainersIdSet.add(participant.getId());
containersToStart.add(participant);
break;
case CONNECTING:
- existingContainersIdSet.add(containerConfig.getId());
+ existingContainersIdSet.add(participant.getId());
break;
case CONNECTED:
// active containers can be stopped or kept active
- existingContainersIdSet.add(containerConfig.getId());
- excessActiveContainers.put(containerConfig.getId(), participant);
+ existingContainersIdSet.add(participant.getId());
+ excessActiveContainers.put(participant.getId(), participant);
break;
case DISCONNECTED:
// disconnected containers must be stopped
- existingContainersIdSet.add(containerConfig.getId());
+ existingContainersIdSet.add(participant.getId());
containersToStop.add(participant);
case HALTING:
- existingContainersIdSet.add(containerConfig.getId());
+ existingContainersIdSet.add(participant.getId());
break;
case HALTED:
// halted containers can be released or restarted
- existingContainersIdSet.add(containerConfig.getId());
- excessHaltedContainers.put(containerConfig.getId(), participant);
+ existingContainersIdSet.add(participant.getId());
+ excessHaltedContainers.put(participant.getId(), participant);
break;
case FINALIZING:
- existingContainersIdSet.add(containerConfig.getId());
+ existingContainersIdSet.add(participant.getId());
break;
case FINALIZED:
break;
@@ -316,29 +315,21 @@ public class YarnProvisioner implements Provisioner, TargetProvider, ContainerPr
default:
break;
}
- ContainerId containerId = containerConfig.getId();
- if (containerId != null) {
- // _containerParticipants.put(containerId, participant.getId());
- // _states.put(containerId, state);
- }
}
}
for (int i = 0; i < targetNumContainers; i++) {
- ContainerId containerId = ContainerId.from(resourceId + "_container_" + (i));
- excessActiveContainers.remove(containerId); // don't stop this container if active
- if (excessHaltedContainers.containsKey(containerId)) {
+ ParticipantId participantId = ParticipantId.from(resourceId + "_container_" + (i));
+ excessActiveContainers.remove(participantId); // don't stop this container if active
+ if (excessHaltedContainers.containsKey(participantId)) {
// Halted containers can be restarted if necessary
- Participant participant = excessHaltedContainers.get(containerId);
+ Participant participant = excessHaltedContainers.get(participantId);
containersToStart.add(participant);
- excessHaltedContainers.remove(containerId); // don't release this container
- } else if (!existingContainersIdSet.contains(containerId)) {
+ excessHaltedContainers.remove(participantId); // don't release this container
+ } else if (!existingContainersIdSet.contains(participantId)) {
// Unallocated containers must be allocated
- ContainerSpec containerSpec = new ContainerSpec(containerId);
- ParticipantId participantId = ParticipantId.from(containerId.stringify());
- ParticipantConfig participantConfig =
- applicationSpec.getParticipantConfig(resourceId.stringify(), participantId);
- containerSpec.setMemory(participantConfig.getUserConfig().getIntField("memory", 1024));
+ ContainerSpec containerSpec = new ContainerSpec(participantId);
+ containerSpec.setMemory(_resourceConfig.getUserConfig().getIntField("memory", 1024));
containersToAcquire.add(containerSpec);
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/8b19cfc7/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/example/HelloWorldService.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/example/HelloWorldService.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/example/HelloWorldService.java
index 614be36..f65fd5d 100644
--- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/example/HelloWorldService.java
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/example/HelloWorldService.java
@@ -1,28 +1,40 @@
package org.apache.helix.provisioning.yarn.example;
import org.apache.helix.HelixConnection;
+import org.apache.helix.api.accessor.ResourceAccessor;
+import org.apache.helix.api.config.UserConfig;
import org.apache.helix.api.id.ClusterId;
import org.apache.helix.api.id.ParticipantId;
+import org.apache.helix.api.id.ResourceId;
import org.apache.helix.api.id.StateModelDefId;
import org.apache.helix.manager.zk.AbstractParticipantService;
-
+import org.apache.log4j.Logger;
public class HelloWorldService extends AbstractParticipantService {
- public HelloWorldService(HelixConnection connection, ClusterId clusterId,
- ParticipantId participantId) {
- super(connection, clusterId, participantId);
- }
-
- @Override
- public void init() {
- HelloWorldStateModelFactory stateModelFactory = new HelloWorldStateModelFactory();
- getParticipant().getStateMachineEngine().registerStateModelFactory(StateModelDefId.from("StatelessService"), stateModelFactory);
- }
+ private static Logger LOG = Logger.getLogger(AbstractParticipantService.class);
+
+ static String SERVICE_NAME = "HelloWorld";
+
+ public HelloWorldService(HelixConnection connection, ClusterId clusterId,
+ ParticipantId participantId) {
+ super(connection, clusterId, participantId);
+ }
+ /**
+ * init method to setup appropriate call back handlers.
+ */
@Override
- public void onPreJoinCluster() {
- //this will be invoked prior to
+ public void init() {
+ ClusterId clusterId = getClusterId();
+ ResourceAccessor resourceAccessor = getConnection().createResourceAccessor(clusterId);
+ UserConfig serviceConfig = resourceAccessor.readUserConfig(ResourceId.from(SERVICE_NAME));
+ LOG.info("Starting service:" + SERVICE_NAME + " with configuration:" + serviceConfig);
+
+ HelloWorldStateModelFactory stateModelFactory = new HelloWorldStateModelFactory();
+ getParticipant().getStateMachineEngine().registerStateModelFactory(
+ StateModelDefId.from("StatelessService"), stateModelFactory);
+
}
-}
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/8b19cfc7/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/example/HelloworldAppSpec.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/example/HelloworldAppSpec.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/example/HelloworldAppSpec.java
index 2e4cd75..e22c7b2 100644
--- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/example/HelloworldAppSpec.java
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/example/HelloworldAppSpec.java
@@ -7,27 +7,31 @@ import java.util.Map;
import org.apache.helix.api.Scope;
import org.apache.helix.api.config.ParticipantConfig;
+import org.apache.helix.api.config.ResourceConfig;
+import org.apache.helix.api.config.ResourceConfig.Builder;
import org.apache.helix.api.config.UserConfig;
import org.apache.helix.api.id.ParticipantId;
+import org.apache.helix.api.id.ResourceId;
import org.apache.helix.provisioning.yarn.AppConfig;
import org.apache.helix.provisioning.yarn.ApplicationSpec;
+import org.apache.helix.provisioning.yarn.ServiceConfig;
import org.apache.helix.provisioning.yarn.TaskConfig;
public class HelloworldAppSpec implements ApplicationSpec {
- private String _appName;
+ public String _appName;
- private AppConfig _appConfig;
+ public AppConfig _appConfig;
- private List<String> _services;
+ public List<String> _services;
private String _appMasterPackageUri;
-
+
private Map<String, String> _servicePackageURIMap;
private Map<String, String> _serviceMainClassMap;
- private Map<String,Map<String,String>> _serviceConfigMap;
+ private Map<String, Map<String, String>> _serviceConfigMap;
private List<TaskConfig> _taskConfigs;
@@ -122,13 +126,8 @@ public class HelloworldAppSpec implements ApplicationSpec {
}
@Override
- public ParticipantConfig getParticipantConfig(String serviceName, ParticipantId participantId) {
- ParticipantConfig.Builder builder = new ParticipantConfig.Builder(participantId);
- Scope<ParticipantId> scope = Scope.participant(participantId);
- UserConfig userConfig = new UserConfig(scope);
- Map<String, String> map = _serviceConfigMap.get(serviceName);
- userConfig.setSimpleFields(map);
- return builder.addTag(serviceName).userConfig(userConfig ).build();
+ public ServiceConfig getServiceConfig(String serviceName) {
+ return new ServiceConfig(Scope.resource(ResourceId.from(serviceName)));
}
@Override
http://git-wip-us.apache.org/repos/asf/helix/blob/8b19cfc7/helix-provisioning/src/main/resources/hello_world_app_spec.yaml
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/resources/hello_world_app_spec.yaml b/helix-provisioning/src/main/resources/hello_world_app_spec.yaml
index 648104a..1d4f1b7 100644
--- a/helix-provisioning/src/main/resources/hello_world_app_spec.yaml
+++ b/helix-provisioning/src/main/resources/hello_world_app_spec.yaml
@@ -7,7 +7,8 @@ appMasterPackageUri: 'file:///Users/kgopalak/Documents/projects/incubator-helix/
appName: testApp
serviceConfigMap:
HelloWorld: {
- k1: v1
+ num_containers: 3,
+ memory: 1024
}
serviceMainClassMap: {
HelloWorld: org.apache.helix.provisioning.yarn.example.HelloWorldService