You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ka...@apache.org on 2014/07/30 19:31:16 UTC

git commit: [HELIX-481] Keep controller cache in sync with container updates

Repository: helix
Updated Branches:
  refs/heads/master 961b93090 -> 08371b561


[HELIX-481] Keep controller cache in sync with container updates


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/08371b56
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/08371b56
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/08371b56

Branch: refs/heads/master
Commit: 08371b561353c0b93fb6931f69d3ed01dbb634de
Parents: 961b930
Author: Kanak Biscuitwala <ka...@apache.org>
Authored: Tue Jul 29 17:04:24 2014 -0700
Committer: Kanak Biscuitwala <ka...@apache.org>
Committed: Tue Jul 29 17:04:24 2014 -0700

----------------------------------------------------------------------
 .../stages/ContainerProvisioningStage.java      | 88 ++++++++------------
 .../provisioning/yarn/YarnProvisioner.java      |  6 +-
 2 files changed, 42 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/08371b56/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java
index f062766..512a480 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java
@@ -21,7 +21,6 @@ package org.apache.helix.controller.stages;
 
 import java.util.Collection;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 
@@ -75,9 +74,9 @@ public class ContainerProvisioningStage extends AbstractBaseStage {
   @Override
   public void process(ClusterEvent event) throws Exception {
     final HelixManager helixManager = event.getAttribute("helixmanager");
+    final HelixAdmin helixAdmin = helixManager.getClusterManagmentTool();
     final Map<ResourceId, ResourceConfig> resourceMap =
         event.getAttribute(AttributeName.RESOURCES.toString());
-    final HelixAdmin helixAdmin = helixManager.getClusterManagmentTool();
     final HelixDataAccessor accessor = helixManager.getHelixDataAccessor();
     final PropertyKey.Builder keyBuilder = accessor.keyBuilder();
     for (ResourceId resourceId : resourceMap.keySet()) {
@@ -106,6 +105,7 @@ public class ContainerProvisioningStage extends AbstractBaseStage {
         TargetProvider targetProvider = _targetProviderMap.get(resourceId);
         ContainerProvider containerProvider = _containerProviderMap.get(resourceId);
         final Cluster cluster = event.getAttribute("Cluster");
+        final ClusterDataCache cache = event.getAttribute("ClusterDataCache");
         final Collection<Participant> participants = cluster.getParticipantMap().values();
 
         // If a process died, we need to mark it as DISCONNECTED or if the process is ready, mark as
@@ -119,17 +119,17 @@ public class ContainerProvisioningStage extends AbstractBaseStage {
             if (!participant.isAlive() && ContainerState.CONNECTED.equals(containerState)) {
               // Need to mark as disconnected if process died
               LOG.info("Participant " + participantId + " died, marking as DISCONNECTED");
-              updateContainerState(helixAdmin, accessor, keyBuilder, cluster, participantId,
+              updateContainerState(cache, accessor, keyBuilder, cluster, null, participantId,
                   ContainerState.DISCONNECTED);
             } else if (participant.isAlive() && ContainerState.CONNECTING.equals(containerState)) {
               // Need to mark as connected only when the live instance is visible
               LOG.info("Participant " + participantId + " is ready, marking as CONNECTED");
-              updateContainerState(helixAdmin, accessor, keyBuilder, cluster, participantId,
+              updateContainerState(cache, accessor, keyBuilder, cluster, null, participantId,
                   ContainerState.CONNECTED);
             } else if (!participant.isAlive() && ContainerState.HALTING.equals(containerState)) {
               // Need to mark as connected only when the live instance is visible
               LOG.info("Participant " + participantId + " is has been killed, marking as HALTED");
-              updateContainerState(helixAdmin, accessor, keyBuilder, cluster, participantId,
+              updateContainerState(cache, accessor, keyBuilder, cluster, null, participantId,
                   ContainerState.HALTED);
             }
           }
@@ -152,16 +152,16 @@ public class ContainerProvisioningStage extends AbstractBaseStage {
         // allocate new containers
         for (final ContainerSpec spec : response.getContainersToAcquire()) {
           final ParticipantId participantId = spec.getParticipantId();
-          List<String> instancesInCluster =
-              helixAdmin.getInstancesInCluster(cluster.getId().stringify());
-          if (!instancesInCluster.contains(participantId.stringify())) {
+          if (!cluster.getParticipantMap().containsKey(participantId)) {
             // create a new Participant, attach the container spec
             InstanceConfig instanceConfig = new InstanceConfig(participantId);
+            instanceConfig.setInstanceEnabled(false);
             instanceConfig.setContainerSpec(spec);
             // create a helix_participant in ACQUIRING state
             instanceConfig.setContainerState(ContainerState.ACQUIRING);
             // create the helix participant and add it to cluster
             helixAdmin.addInstance(cluster.getId().toString(), instanceConfig);
+            cache.requireFullRefresh();
           }
           LOG.info("Allocating container for " + participantId);
           ListenableFuture<ContainerId> future = containerProvider.allocateContainer(spec);
@@ -169,19 +169,14 @@ public class ContainerProvisioningStage extends AbstractBaseStage {
             @Override
             public void onSuccess(ContainerId containerId) {
               LOG.info("Container " + containerId + " acquired. Marking " + participantId);
-              InstanceConfig existingInstance =
-                  helixAdmin
-                      .getInstanceConfig(cluster.getId().toString(), participantId.toString());
-              existingInstance.setContainerId(containerId);
-              existingInstance.setContainerState(ContainerState.ACQUIRED);
-              accessor.updateProperty(keyBuilder.instanceConfig(participantId.toString()),
-                  existingInstance);
+              updateContainerState(cache, accessor, keyBuilder, cluster, containerId,
+                  participantId, ContainerState.ACQUIRED);
             }
 
             @Override
             public void onFailure(Throwable t) {
               LOG.error("Could not allocate a container for participant " + participantId, t);
-              updateContainerState(helixAdmin, accessor, keyBuilder, cluster, participantId,
+              updateContainerState(cache, accessor, keyBuilder, cluster, null, participantId,
                   ContainerState.FAILED);
             }
           };
@@ -190,13 +185,9 @@ public class ContainerProvisioningStage extends AbstractBaseStage {
 
         // start new containers
         for (final Participant participant : response.getContainersToStart()) {
-          final InstanceConfig existingInstance =
-              helixAdmin.getInstanceConfig(cluster.getId().toString(), participant.getId()
-                  .toString());
-          final ContainerId containerId = existingInstance.getContainerId();
-          existingInstance.setContainerState(ContainerState.CONNECTING);
-          accessor.updateProperty(keyBuilder.instanceConfig(participant.getId().toString()),
-              existingInstance);
+          final ContainerId containerId = participant.getInstanceConfig().getContainerId();
+          updateContainerState(cache, accessor, keyBuilder, cluster, null, participant.getId(),
+              ContainerState.CONNECTING);
           // create the helix participant and add it to cluster
           LOG.info("Starting container " + containerId + " for " + participant.getId());
           ListenableFuture<Boolean> future =
@@ -212,7 +203,7 @@ public class ContainerProvisioningStage extends AbstractBaseStage {
             public void onFailure(Throwable t) {
               LOG.error("Could not start container" + containerId + "for participant "
                   + participant.getId(), t);
-              updateContainerState(helixAdmin, accessor, keyBuilder, cluster, participant.getId(),
+              updateContainerState(cache, accessor, keyBuilder, cluster, null, participant.getId(),
                   ContainerState.FAILED);
             }
           };
@@ -222,13 +213,9 @@ public class ContainerProvisioningStage extends AbstractBaseStage {
         // release containers
         for (final Participant participant : response.getContainersToRelease()) {
           // mark it as finalizing
-          final InstanceConfig existingInstance =
-              helixAdmin.getInstanceConfig(cluster.getId().toString(), participant.getId()
-                  .toString());
-          final ContainerId containerId = existingInstance.getContainerId();
-          existingInstance.setContainerState(ContainerState.FINALIZING);
-          accessor.updateProperty(keyBuilder.instanceConfig(participant.getId().toString()),
-              existingInstance);
+          final ContainerId containerId = participant.getInstanceConfig().getContainerId();
+          updateContainerState(cache, accessor, keyBuilder, cluster, null, participant.getId(),
+              ContainerState.FINALIZING);
           // remove the participant
           LOG.info("Deallocating container " + containerId + " for " + participant.getId());
           ListenableFuture<Boolean> future = containerProvider.deallocateContainer(containerId);
@@ -240,13 +227,14 @@ public class ContainerProvisioningStage extends AbstractBaseStage {
                   helixAdmin.getInstanceConfig(cluster.getId().toString(), participant.getId()
                       .toString());
               helixAdmin.dropInstance(cluster.getId().toString(), existingInstance);
+              cache.requireFullRefresh();
             }
 
             @Override
             public void onFailure(Throwable t) {
               LOG.error("Could not deallocate container" + containerId + "for participant "
                   + participant.getId(), t);
-              updateContainerState(helixAdmin, accessor, keyBuilder, cluster, participant.getId(),
+              updateContainerState(cache, accessor, keyBuilder, cluster, null, participant.getId(),
                   ContainerState.FAILED);
             }
           };
@@ -255,15 +243,10 @@ public class ContainerProvisioningStage extends AbstractBaseStage {
 
         // stop but don't remove
         for (final Participant participant : response.getContainersToStop()) {
-          // disable the node first
-          final InstanceConfig existingInstance =
-              helixAdmin.getInstanceConfig(cluster.getId().toString(), participant.getId()
-                  .toString());
-          final ContainerId containerId = existingInstance.getContainerId();
-          existingInstance.setInstanceEnabled(false);
-          existingInstance.setContainerState(ContainerState.HALTING);
-          accessor.updateProperty(keyBuilder.instanceConfig(participant.getId().toString()),
-              existingInstance);
+          // switch to halting
+          final ContainerId containerId = participant.getInstanceConfig().getContainerId();
+          updateContainerState(cache, accessor, keyBuilder, cluster, null, participant.getId(),
+              ContainerState.HALTING);
           // stop the container
           LOG.info("Stopping container " + containerId + " for " + participant.getId());
           ListenableFuture<Boolean> future = containerProvider.stopContainer(containerId);
@@ -288,7 +271,7 @@ public class ContainerProvisioningStage extends AbstractBaseStage {
               LOG.error(
                   "Could not stop container" + containerId + "for participant "
                       + participant.getId(), t);
-              updateContainerState(helixAdmin, accessor, keyBuilder, cluster, participant.getId(),
+              updateContainerState(cache, accessor, keyBuilder, cluster, null, participant.getId(),
                   ContainerState.FAILED);
             }
           };
@@ -299,7 +282,7 @@ public class ContainerProvisioningStage extends AbstractBaseStage {
   }
 
   /**
-   * Update a participant with a new container state
+   * Update a participant with a new container state and invalidate cached state
    * @param helixAdmin
    * @param accessor
    * @param keyBuilder
@@ -307,14 +290,17 @@ public class ContainerProvisioningStage extends AbstractBaseStage {
    * @param participantId
    * @param state
    */
-  private void updateContainerState(HelixAdmin helixAdmin, HelixDataAccessor accessor,
-      PropertyKey.Builder keyBuilder, Cluster cluster, ParticipantId participantId,
-      ContainerState state) {
-    InstanceConfig existingInstance =
-        helixAdmin.getInstanceConfig(cluster.getId().toString(), participantId.toString());
-    existingInstance.setContainerState(state);
-    existingInstance.setInstanceEnabled(state.equals(ContainerState.CONNECTED));
-    accessor.updateProperty(keyBuilder.instanceConfig(participantId.toString()), existingInstance);
+  private void updateContainerState(ClusterDataCache cache, HelixDataAccessor accessor,
+      PropertyKey.Builder keyBuilder, Cluster cluster, ContainerId containerId,
+      ParticipantId participantId, ContainerState state) {
+    InstanceConfig delta = new InstanceConfig(participantId);
+    delta.setContainerState(state);
+    if (containerId != null) {
+      delta.setContainerId(containerId);
+    }
+    delta.setInstanceEnabled(state.equals(ContainerState.CONNECTED));
+    accessor.updateProperty(keyBuilder.instanceConfig(participantId.toString()), delta);
+    cache.requireFullRefresh();
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/helix/blob/08371b56/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisioner.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisioner.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisioner.java
index 96a900d..ad2f5f4 100644
--- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisioner.java
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/YarnProvisioner.java
@@ -358,7 +358,11 @@ public class YarnProvisioner implements Provisioner, TargetProvider, ContainerPr
       } else if (!existingContainersIdSet.contains(participantId)) {
         // Unallocated containers must be allocated
         ContainerSpec containerSpec = new ContainerSpec(participantId);
-        containerSpec.setMemory(_resourceConfig.getUserConfig().getIntField("memory", 1024));
+        int mem = 1024;
+        if (_resourceConfig.getUserConfig() != null) {
+          mem = _resourceConfig.getUserConfig().getIntField("memory", 1024);
+        }
+        containerSpec.setMemory(mem);
         containersToAcquire.add(containerSpec);
       }
     }