You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ka...@apache.org on 2014/07/30 19:31:16 UTC
git commit: [HELIX-481] Keep controller cache in sync with container updates
Repository: helix
Updated Branches:
refs/heads/master 961b93090 -> 08371b561
[HELIX-481] Keep controller cache in sync with container updates
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/08371b56
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/08371b56
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/08371b56
Branch: refs/heads/master
Commit: 08371b561353c0b93fb6931f69d3ed01dbb634de
Parents: 961b930
Author: Kanak Biscuitwala <ka...@apache.org>
Authored: Tue Jul 29 17:04:24 2014 -0700
Committer: Kanak Biscuitwala <ka...@apache.org>
Committed: Tue Jul 29 17:04:24 2014 -0700
----------------------------------------------------------------------
.../stages/ContainerProvisioningStage.java | 88 ++++++++------------
.../provisioning/yarn/YarnProvisioner.java | 6 +-
2 files changed, 42 insertions(+), 52 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/08371b56/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java
index f062766..512a480 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java
@@ -21,7 +21,6 @@ package org.apache.helix.controller.stages;
import java.util.Collection;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import java.util.UUID;
@@ -75,9 +74,9 @@ public class ContainerProvisioningStage extends AbstractBaseStage {
@Override
public void process(ClusterEvent event) throws Exception {
final HelixManager helixManager = event.getAttribute("helixmanager");
+ final HelixAdmin helixAdmin = helixManager.getClusterManagmentTool();
final Map<ResourceId, ResourceConfig> resourceMap =
event.getAttribute(AttributeName.RESOURCES.toString());
- final HelixAdmin helixAdmin = helixManager.getClusterManagmentTool();
final HelixDataAccessor accessor = helixManager.getHelixDataAccessor();
final PropertyKey.Builder keyBuilder = accessor.keyBuilder();
for (ResourceId resourceId : resourceMap.keySet()) {
@@ -106,6 +105,7 @@ public class ContainerProvisioningStage extends AbstractBaseStage {
TargetProvider targetProvider = _targetProviderMap.get(resourceId);
ContainerProvider containerProvider = _containerProviderMap.get(resourceId);
final Cluster cluster = event.getAttribute("Cluster");
+ final ClusterDataCache cache = event.getAttribute("ClusterDataCache");
final Collection<Participant> participants = cluster.getParticipantMap().values();
// If a process died, we need to mark it as DISCONNECTED or if the process is ready, mark as
@@ -119,17 +119,17 @@ public class ContainerProvisioningStage extends AbstractBaseStage {
if (!participant.isAlive() && ContainerState.CONNECTED.equals(containerState)) {
// Need to mark as disconnected if process died
LOG.info("Participant " + participantId + " died, marking as DISCONNECTED");
- updateContainerState(helixAdmin, accessor, keyBuilder, cluster, participantId,
+ updateContainerState(cache, accessor, keyBuilder, cluster, null, participantId,
ContainerState.DISCONNECTED);
} else if (participant.isAlive() && ContainerState.CONNECTING.equals(containerState)) {
// Need to mark as connected only when the live instance is visible
LOG.info("Participant " + participantId + " is ready, marking as CONNECTED");
- updateContainerState(helixAdmin, accessor, keyBuilder, cluster, participantId,
+ updateContainerState(cache, accessor, keyBuilder, cluster, null, participantId,
ContainerState.CONNECTED);
} else if (!participant.isAlive() && ContainerState.HALTING.equals(containerState)) {
// Need to mark as connected only when the live instance is visible
LOG.info("Participant " + participantId + " is has been killed, marking as HALTED");
- updateContainerState(helixAdmin, accessor, keyBuilder, cluster, participantId,
+ updateContainerState(cache, accessor, keyBuilder, cluster, null, participantId,
ContainerState.HALTED);
}
}
@@ -152,16 +152,16 @@ public class ContainerProvisioningStage extends AbstractBaseStage {
// allocate new containers
for (final ContainerSpec spec : response.getContainersToAcquire()) {
final ParticipantId participantId = spec.getParticipantId();
- List<String> instancesInCluster =
- helixAdmin.getInstancesInCluster(cluster.getId().stringify());
- if (!instancesInCluster.contains(participantId.stringify())) {
+ if (!cluster.getParticipantMap().containsKey(participantId)) {
// create a new Participant, attach the container spec
InstanceConfig instanceConfig = new InstanceConfig(participantId);
+ instanceConfig.setInstanceEnabled(false);
instanceConfig.setContainerSpec(spec);
// create a helix_participant in ACQUIRING state
instanceConfig.setContainerState(ContainerState.ACQUIRING);
// create the helix participant and add it to cluster
helixAdmin.addInstance(cluster.getId().toString(), instanceConfig);
+ cache.requireFullRefresh();
}
LOG.info("Allocating container for " + participantId);
ListenableFuture<ContainerId> future = containerProvider.allocateContainer(spec);
@@ -169,19 +169,14 @@ public class ContainerProvisioningStage extends AbstractBaseStage {
@Override
public void onSuccess(ContainerId containerId) {
LOG.info("Container " + containerId + " acquired. Marking " + participantId);
- InstanceConfig existingInstance =
- helixAdmin
- .getInstanceConfig(cluster.getId().toString(), participantId.toString());
- existingInstance.setContainerId(containerId);
- existingInstance.setContainerState(ContainerState.ACQUIRED);
- accessor.updateProperty(keyBuilder.instanceConfig(participantId.toString()),
- existingInstance);
+ updateContainerState(cache, accessor, keyBuilder, cluster, containerId,
+ participantId, ContainerState.ACQUIRED);
}
@Override
public void onFailure(Throwable t) {
LOG.error("Could not allocate a container for participant " + participantId, t);
- updateContainerState(helixAdmin, accessor, keyBuilder, cluster, participantId,
+ updateContainerState(cache, accessor, keyBuilder, cluster, null, participantId,
ContainerState.FAILED);
}
};
@@ -190,13 +185,9 @@ public class ContainerProvisioningStage extends AbstractBaseStage {
// start new containers
for (final Participant participant : response.getContainersToStart()) {
- final InstanceConfig existingInstance =
- helixAdmin.getInstanceConfig(cluster.getId().toString(), participant.getId()
- .toString());
- final ContainerId containerId = existingInstance.getContainerId();
- existingInstance.setContainerState(ContainerState.CONNECTING);
- accessor.updateProperty(keyBuilder.instanceConfig(participant.getId().toString()),
- existingInstance);
+ final ContainerId containerId = participant.getInstanceConfig().getContainerId();
+ updateContainerState(cache, accessor, keyBuilder, cluster, null, participant.getId(),
+ ContainerState.CONNECTING);
// create the helix participant and add it to cluster
LOG.info("Starting container " + containerId + " for " + participant.getId());
ListenableFuture<Boolean> future =
@@ -212,7 +203,7 @@ public class ContainerProvisioningStage extends AbstractBaseStage {
public void onFailure(Throwable t) {
LOG.error("Could not start container" + containerId + "for participant "
+ participant.getId(), t);
- updateContainerState(helixAdmin, accessor, keyBuilder, cluster, participant.getId(),
+ updateContainerState(cache, accessor, keyBuilder, cluster, null, participant.getId(),
ContainerState.FAILED);
}
};
@@ -222,13 +213,9 @@ public class ContainerProvisioningStage extends AbstractBaseStage {
// release containers
for (final Participant participant : response.getContainersToRelease()) {
// mark it as finalizing
- final InstanceConfig existingInstance =
- helixAdmin.getInstanceConfig(cluster.getId().toString(), participant.getId()
- .toString());
- final ContainerId containerId = existingInstance.getContainerId();
- existingInstance.setContainerState(ContainerState.FINALIZING);
- accessor.updateProperty(keyBuilder.instanceConfig(participant.getId().toString()),
- existingInstance);
+ final ContainerId containerId = participant.getInstanceConfig().getContainerId();
+ updateContainerState(cache, accessor, keyBuilder, cluster, null, participant.getId(),
+ ContainerState.FINALIZING);
// remove the participant
LOG.info("Deallocating container " + containerId + " for " + participant.getId());
ListenableFuture<Boolean> future = containerProvider.deallocateContainer(containerId);
@@ -240,13 +227,14 @@ public class ContainerProvisioningStage extends AbstractBaseStage {
helixAdmin.getInstanceConfig(cluster.getId().toString(), participant.getId()
.toString());
helixAdmin.dropInstance(cluster.getId().toString(), existingInstance);
+ cache.requireFullRefresh();
}
@Override
public void onFailure(Throwable t) {
LOG.error("Could not deallocate container" + containerId + "for participant "
+ participant.getId(), t);
- updateContainerState(helixAdmin, accessor, keyBuilder, cluster, participant.getId(),
+ updateContainerState(cache, accessor, keyBuilder, cluster, null, participant.getId(),
ContainerState.FAILED);
}
};
@@ -255,15 +243,10 @@ public class ContainerProvisioningStage extends AbstractBaseStage {
// stop but don't remove
for (final Participant participant : response.getContainersToStop()) {
- // disable the node first
- final InstanceConfig existingInstance =
- helixAdmin.getInstanceConfig(cluster.getId().toString(), participant.getId()
- .toString());
- final ContainerId containerId = existingInstance.getContainerId();
- existingInstance.setInstanceEnabled(false);
- existingInstance.setContainerState(ContainerState.HALTING);
- accessor.updateProperty(keyBuilder.instanceConfig(participant.getId().toString()),
- existingInstance);
+ // switch to halting
+ final ContainerId containerId = participant.getInstanceConfig().getContainerId();
+ updateContainerState(cache, accessor, keyBuilder, cluster, null, participant.getId(),
+ ContainerState.HALTING);
// stop the container
LOG.info("Stopping container " + containerId + " for " + participant.getId());
ListenableFuture<Boolean> future = containerProvider.stopContainer(containerId);
@@ -288,7 +271,7 @@ public class ContainerProvisioningStage extends AbstractBaseStage {
LOG.error(
"Could not stop container" + containerId + "for participant "
+ participant.getId(), t);
- updateContainerState(helixAdmin, accessor, keyBuilder, cluster, participant.getId(),
+ updateContainerState(cache, accessor, keyBuilder, cluster, null, participant.getId(),
ContainerState.FAILED);
}
};
@@ -299,7 +282,7 @@ public class ContainerProvisioningStage extends AbstractBaseStage {
}
/**
- * Update a participant with a new container state
+ * Update a participant with a new container state and invalidate cached state
* @param helixAdmin
* @param accessor
* @param keyBuilder
@@ -307,14 +290,17 @@ public class ContainerProvisioningStage extends AbstractBaseStage {
* @param participantId
* @param state
*/
- private void updateContainerState(HelixAdmin helixAdmin, HelixDataAccessor accessor,
- PropertyKey.Builder keyBuilder, Cluster cluster, ParticipantId participantId,
- ContainerState state) {
- InstanceConfig existingInstance =
- helixAdmin.getInstanceConfig(cluster.getId().toString(), participantId.toString());
- existingInstance.setContainerState(state);
- existingInstance.setInstanceEnabled(state.equals(ContainerState.CONNECTED));
- accessor.updateProperty(keyBuilder.instanceConfig(participantId.toString()), existingInstance);
+ private void updateContainerState(ClusterDataCache cache, HelixDataAccessor accessor,
+ PropertyKey.Builder keyBuilder, Cluster cluster, ContainerId containerId,
+ ParticipantId participantId, ContainerState state) {
+ InstanceConfig delta = new InstanceConfig(participantId);
+ delta.setContainerState(state);
+ if (containerId != null) {
+ delta.setContainerId(containerId);
+ }
+ delta.setInstanceEnabled(state.equals(ContainerState.CONNECTED));
+ accessor.updateProperty(keyBuilder.instanceConfig(participantId.toString()), delta);
+ cache.requireFullRefresh();
}
/**
http://git-wip-us.apache.org/repos/asf/helix/blob/08371b56/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisioner.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisioner.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisioner.java
index 96a900d..ad2f5f4 100644
--- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisioner.java
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisioner.java
@@ -358,7 +358,11 @@ public class YarnProvisioner implements Provisioner, TargetProvider, ContainerPr
} else if (!existingContainersIdSet.contains(participantId)) {
// Unallocated containers must be allocated
ContainerSpec containerSpec = new ContainerSpec(participantId);
- containerSpec.setMemory(_resourceConfig.getUserConfig().getIntField("memory", 1024));
+ int mem = 1024;
+ if (_resourceConfig.getUserConfig() != null) {
+ mem = _resourceConfig.getUserConfig().getIntField("memory", 1024);
+ }
+ containerSpec.setMemory(mem);
containersToAcquire.add(containerSpec);
}
}