You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ka...@apache.org on 2014/01/14 01:19:24 UTC

git commit: Add some basic checks to the provisioning stage

Updated Branches:
  refs/heads/helix-provisioning 9386a4cbc -> 37dd3cf4b


Add some basic checks to the provisioning stage


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/37dd3cf4
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/37dd3cf4
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/37dd3cf4

Branch: refs/heads/helix-provisioning
Commit: 37dd3cf4bb37dd0d7a7fa55766e5115cecb184c8
Parents: 9386a4c
Author: Kanak Biscuitwala <ka...@apache.org>
Authored: Mon Jan 13 16:19:18 2014 -0800
Committer: Kanak Biscuitwala <ka...@apache.org>
Committed: Mon Jan 13 16:19:18 2014 -0800

----------------------------------------------------------------------
 .../provisioner/ParticipantService.java         |  6 +--
 .../stages/ContainerProvisioningStage.java      | 55 +++++++++++++++-----
 2 files changed, 44 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/37dd3cf4/helix-core/src/main/java/org/apache/helix/controller/provisioner/ParticipantService.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/provisioner/ParticipantService.java b/helix-core/src/main/java/org/apache/helix/controller/provisioner/ParticipantService.java
index 92b5a24..bfcce06 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/provisioner/ParticipantService.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/provisioner/ParticipantService.java
@@ -2,9 +2,9 @@ package org.apache.helix.controller.provisioner;
 
 public interface ParticipantService {
 
-  boolean init(ServiceConfig serviceConfig);
-  
+  // boolean init(ServiceConfig serviceConfig);
+
   boolean start();
-  
+
   boolean stop();
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/37dd3cf4/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java
index 499f904..a17251d 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java
@@ -123,14 +123,15 @@ public class ContainerProvisioningStage extends AbstractBaseStage {
           helixAdmin.addInstance(cluster.getId().toString(), instanceConfig);
 
           ListenableFuture<ContainerId> future = provisioner.allocateContainer(spec);
-          Futures.addCallback(future, new FutureCallback<ContainerId>() {
+          FutureCallback<ContainerId> callback = new FutureCallback<ContainerId>() {
             @Override
             public void onSuccess(ContainerId containerId) {
               InstanceConfig existingInstance =
-                  helixAdmin.getInstanceConfig(cluster.getId().toString(), participantId.toString());
+                  helixAdmin
+                      .getInstanceConfig(cluster.getId().toString(), participantId.toString());
               existingInstance.setContainerId(containerId);
               existingInstance.setContainerState(ContainerState.ACQUIRED);
-              accessor.setProperty(keyBuilder.instanceConfig(participantId.toString()),
+              accessor.updateProperty(keyBuilder.instanceConfig(participantId.toString()),
                   existingInstance);
             }
 
@@ -140,7 +141,8 @@ public class ContainerProvisioningStage extends AbstractBaseStage {
               updateContainerState(helixAdmin, accessor, keyBuilder, cluster, participantId,
                   ContainerState.FAILED);
             }
-          });
+          };
+          safeAddCallback(future, callback);
         }
 
         // start new containers
@@ -151,11 +153,11 @@ public class ContainerProvisioningStage extends AbstractBaseStage {
           final ContainerId containerId = existingInstance.getContainerId();
           existingInstance.setContainerId(containerId);
           existingInstance.setContainerState(ContainerState.CONNECTING);
-          accessor.setProperty(keyBuilder.instanceConfig(participant.getId().toString()),
+          accessor.updateProperty(keyBuilder.instanceConfig(participant.getId().toString()),
               existingInstance);
           // create the helix participant and add it to cluster
           ListenableFuture<Boolean> future = provisioner.startContainer(containerId, participant);
-          Futures.addCallback(future, new FutureCallback<Boolean>() {
+          FutureCallback<Boolean> callback = new FutureCallback<Boolean>() {
             @Override
             public void onSuccess(Boolean result) {
               updateContainerState(helixAdmin, accessor, keyBuilder, cluster, participant.getId(),
@@ -169,7 +171,8 @@ public class ContainerProvisioningStage extends AbstractBaseStage {
               updateContainerState(helixAdmin, accessor, keyBuilder, cluster, participant.getId(),
                   ContainerState.FAILED);
             }
-          });
+          };
+          safeAddCallback(future, callback);
         }
 
         // release containers
@@ -180,11 +183,11 @@ public class ContainerProvisioningStage extends AbstractBaseStage {
                   .toString());
           final ContainerId containerId = existingInstance.getContainerId();
           existingInstance.setContainerState(ContainerState.FINALIZING);
-          accessor.setProperty(keyBuilder.instanceConfig(participant.getId().toString()),
+          accessor.updateProperty(keyBuilder.instanceConfig(participant.getId().toString()),
               existingInstance);
           // remove the participant
           ListenableFuture<Boolean> future = provisioner.deallocateContainer(containerId);
-          Futures.addCallback(future, new FutureCallback<Boolean>() {
+          FutureCallback<Boolean> callback = new FutureCallback<Boolean>() {
             @Override
             public void onSuccess(Boolean result) {
               InstanceConfig existingInstance =
@@ -200,7 +203,8 @@ public class ContainerProvisioningStage extends AbstractBaseStage {
               updateContainerState(helixAdmin, accessor, keyBuilder, cluster, participant.getId(),
                   ContainerState.FAILED);
             }
-          });
+          };
+          safeAddCallback(future, callback);
         }
 
         // stop but don't remove
@@ -212,11 +216,11 @@ public class ContainerProvisioningStage extends AbstractBaseStage {
           final ContainerId containerId = existingInstance.getContainerId();
           existingInstance.setInstanceEnabled(false);
           existingInstance.setContainerState(ContainerState.TEARDOWN);
-          accessor.setProperty(keyBuilder.instanceConfig(participant.getId().toString()),
+          accessor.updateProperty(keyBuilder.instanceConfig(participant.getId().toString()),
               existingInstance);
           // stop the container
           ListenableFuture<Boolean> future = provisioner.stopContainer(containerId);
-          Futures.addCallback(future, new FutureCallback<Boolean>() {
+          FutureCallback<Boolean> callback = new FutureCallback<Boolean>() {
             @Override
             public void onSuccess(Boolean result) {
               updateContainerState(helixAdmin, accessor, keyBuilder, cluster, participant.getId(),
@@ -231,18 +235,41 @@ public class ContainerProvisioningStage extends AbstractBaseStage {
               updateContainerState(helixAdmin, accessor, keyBuilder, cluster, participant.getId(),
                   ContainerState.FAILED);
             }
-          });
+          };
+          safeAddCallback(future, callback);
         }
       }
     }
   }
 
+  /**
+   * Update a participant with a new container state
+   * @param helixAdmin
+   * @param accessor
+   * @param keyBuilder
+   * @param cluster
+   * @param participantId
+   * @param state
+   */
   private void updateContainerState(HelixAdmin helixAdmin, HelixDataAccessor accessor,
       PropertyKey.Builder keyBuilder, Cluster cluster, ParticipantId participantId,
       ContainerState state) {
     InstanceConfig existingInstance =
         helixAdmin.getInstanceConfig(cluster.getId().toString(), participantId.toString());
     existingInstance.setContainerState(state);
-    accessor.setProperty(keyBuilder.instanceConfig(participantId.toString()), existingInstance);
+    accessor.updateProperty(keyBuilder.instanceConfig(participantId.toString()), existingInstance);
+  }
+
+  /**
+   * Add a callback, failing if the add fails
+   * @param future the future to listen on
+   * @param callback the callback to invoke
+   */
+  private <T> void safeAddCallback(ListenableFuture<T> future, FutureCallback<T> callback) {
+    try {
+      Futures.addCallback(future, callback);
+    } catch (Throwable t) {
+      callback.onFailure(t);
+    }
   }
 }