You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ka...@apache.org on 2014/01/14 01:19:24 UTC
git commit: Add some basic checks to the provisioning stage
Updated Branches:
refs/heads/helix-provisioning 9386a4cbc -> 37dd3cf4b
Add some basic checks to the provisioning stage
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/37dd3cf4
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/37dd3cf4
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/37dd3cf4
Branch: refs/heads/helix-provisioning
Commit: 37dd3cf4bb37dd0d7a7fa55766e5115cecb184c8
Parents: 9386a4c
Author: Kanak Biscuitwala <ka...@apache.org>
Authored: Mon Jan 13 16:19:18 2014 -0800
Committer: Kanak Biscuitwala <ka...@apache.org>
Committed: Mon Jan 13 16:19:18 2014 -0800
----------------------------------------------------------------------
.../provisioner/ParticipantService.java | 6 +--
.../stages/ContainerProvisioningStage.java | 55 +++++++++++++++-----
2 files changed, 44 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/37dd3cf4/helix-core/src/main/java/org/apache/helix/controller/provisioner/ParticipantService.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/provisioner/ParticipantService.java b/helix-core/src/main/java/org/apache/helix/controller/provisioner/ParticipantService.java
index 92b5a24..bfcce06 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/provisioner/ParticipantService.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/provisioner/ParticipantService.java
@@ -2,9 +2,9 @@ package org.apache.helix.controller.provisioner;
public interface ParticipantService {
- boolean init(ServiceConfig serviceConfig);
-
+ // boolean init(ServiceConfig serviceConfig);
+
boolean start();
-
+
boolean stop();
}
http://git-wip-us.apache.org/repos/asf/helix/blob/37dd3cf4/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java
index 499f904..a17251d 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java
@@ -123,14 +123,15 @@ public class ContainerProvisioningStage extends AbstractBaseStage {
helixAdmin.addInstance(cluster.getId().toString(), instanceConfig);
ListenableFuture<ContainerId> future = provisioner.allocateContainer(spec);
- Futures.addCallback(future, new FutureCallback<ContainerId>() {
+ FutureCallback<ContainerId> callback = new FutureCallback<ContainerId>() {
@Override
public void onSuccess(ContainerId containerId) {
InstanceConfig existingInstance =
- helixAdmin.getInstanceConfig(cluster.getId().toString(), participantId.toString());
+ helixAdmin
+ .getInstanceConfig(cluster.getId().toString(), participantId.toString());
existingInstance.setContainerId(containerId);
existingInstance.setContainerState(ContainerState.ACQUIRED);
- accessor.setProperty(keyBuilder.instanceConfig(participantId.toString()),
+ accessor.updateProperty(keyBuilder.instanceConfig(participantId.toString()),
existingInstance);
}
@@ -140,7 +141,8 @@ public class ContainerProvisioningStage extends AbstractBaseStage {
updateContainerState(helixAdmin, accessor, keyBuilder, cluster, participantId,
ContainerState.FAILED);
}
- });
+ };
+ safeAddCallback(future, callback);
}
// start new containers
@@ -151,11 +153,11 @@ public class ContainerProvisioningStage extends AbstractBaseStage {
final ContainerId containerId = existingInstance.getContainerId();
existingInstance.setContainerId(containerId);
existingInstance.setContainerState(ContainerState.CONNECTING);
- accessor.setProperty(keyBuilder.instanceConfig(participant.getId().toString()),
+ accessor.updateProperty(keyBuilder.instanceConfig(participant.getId().toString()),
existingInstance);
// create the helix participant and add it to cluster
ListenableFuture<Boolean> future = provisioner.startContainer(containerId, participant);
- Futures.addCallback(future, new FutureCallback<Boolean>() {
+ FutureCallback<Boolean> callback = new FutureCallback<Boolean>() {
@Override
public void onSuccess(Boolean result) {
updateContainerState(helixAdmin, accessor, keyBuilder, cluster, participant.getId(),
@@ -169,7 +171,8 @@ public class ContainerProvisioningStage extends AbstractBaseStage {
updateContainerState(helixAdmin, accessor, keyBuilder, cluster, participant.getId(),
ContainerState.FAILED);
}
- });
+ };
+ safeAddCallback(future, callback);
}
// release containers
@@ -180,11 +183,11 @@ public class ContainerProvisioningStage extends AbstractBaseStage {
.toString());
final ContainerId containerId = existingInstance.getContainerId();
existingInstance.setContainerState(ContainerState.FINALIZING);
- accessor.setProperty(keyBuilder.instanceConfig(participant.getId().toString()),
+ accessor.updateProperty(keyBuilder.instanceConfig(participant.getId().toString()),
existingInstance);
// remove the participant
ListenableFuture<Boolean> future = provisioner.deallocateContainer(containerId);
- Futures.addCallback(future, new FutureCallback<Boolean>() {
+ FutureCallback<Boolean> callback = new FutureCallback<Boolean>() {
@Override
public void onSuccess(Boolean result) {
InstanceConfig existingInstance =
@@ -200,7 +203,8 @@ public class ContainerProvisioningStage extends AbstractBaseStage {
updateContainerState(helixAdmin, accessor, keyBuilder, cluster, participant.getId(),
ContainerState.FAILED);
}
- });
+ };
+ safeAddCallback(future, callback);
}
// stop but don't remove
@@ -212,11 +216,11 @@ public class ContainerProvisioningStage extends AbstractBaseStage {
final ContainerId containerId = existingInstance.getContainerId();
existingInstance.setInstanceEnabled(false);
existingInstance.setContainerState(ContainerState.TEARDOWN);
- accessor.setProperty(keyBuilder.instanceConfig(participant.getId().toString()),
+ accessor.updateProperty(keyBuilder.instanceConfig(participant.getId().toString()),
existingInstance);
// stop the container
ListenableFuture<Boolean> future = provisioner.stopContainer(containerId);
- Futures.addCallback(future, new FutureCallback<Boolean>() {
+ FutureCallback<Boolean> callback = new FutureCallback<Boolean>() {
@Override
public void onSuccess(Boolean result) {
updateContainerState(helixAdmin, accessor, keyBuilder, cluster, participant.getId(),
@@ -231,18 +235,41 @@ public class ContainerProvisioningStage extends AbstractBaseStage {
updateContainerState(helixAdmin, accessor, keyBuilder, cluster, participant.getId(),
ContainerState.FAILED);
}
- });
+ };
+ safeAddCallback(future, callback);
}
}
}
}
+ /**
+ * Update a participant with a new container state
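+ * @param helixAdmin admin client used to read the participant's current instance config
+ * @param accessor data accessor used to write the updated instance config
+ * @param keyBuilder builder for the instance config property key
+ * @param cluster the cluster whose id is used to look up the participant
+ * @param participantId the participant whose container state is being changed
+ * @param state the container state to record on the participant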
+ */
private void updateContainerState(HelixAdmin helixAdmin, HelixDataAccessor accessor,
PropertyKey.Builder keyBuilder, Cluster cluster, ParticipantId participantId,
ContainerState state) {
InstanceConfig existingInstance =
helixAdmin.getInstanceConfig(cluster.getId().toString(), participantId.toString());
existingInstance.setContainerState(state);
- accessor.setProperty(keyBuilder.instanceConfig(participantId.toString()), existingInstance);
+ accessor.updateProperty(keyBuilder.instanceConfig(participantId.toString()), existingInstance);
+ }
+
+ /**
+ * Add a callback; if adding it throws, pass the error to the callback's onFailure
+ * @param future the future to listen on
+ * @param callback the callback to invoke
+ */
+ private <T> void safeAddCallback(ListenableFuture<T> future, FutureCallback<T> callback) {
+ try {
+ Futures.addCallback(future, callback);
+ } catch (Throwable t) {
+ callback.onFailure(t);
+ }
}
}