You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by al...@apache.org on 2014/11/03 16:51:47 UTC

[03/29] git commit: BrooklynNode cluster + upgrade effector

BrooklynNode cluster + upgrade effector

Also effectors to:
  * select master in the cluster
  * set HA priority
  * set HA state


Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/6f895aaf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/6f895aaf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/6f895aaf

Branch: refs/heads/master
Commit: 6f895aafe49cc8046cc043c17bb38cbbb4018da4
Parents: 63f29bd
Author: Svetoslav Neykov <sv...@cloudsoftcorp.com>
Authored: Wed Oct 22 11:51:19 2014 +0300
Committer: Alex Heneveld <al...@cloudsoftcorp.com>
Committed: Fri Oct 31 09:36:16 2014 -0500

----------------------------------------------------------------------
 .../brooklyn/entity/basic/EntityPredicates.java |   9 +
 .../entity/brooklynnode/BrooklynCluster.java    |  56 ++++
 .../brooklynnode/BrooklynClusterImpl.java       | 110 ++++++++
 .../brooklynnode/BrooklynEntityMirrorImpl.java  |   2 -
 .../entity/brooklynnode/BrooklynNode.java       |  30 ++-
 .../entity/brooklynnode/BrooklynNodeImpl.java   |  16 +-
 .../effector/SelectMasterEffectorBody.java      | 173 ++++++++++++
 .../effector/SetHAModeEffectorBody.java         |  64 +++++
 .../effector/SetHAPriorityEffectorBody.java     |  55 ++++
 .../effector/UpgradeClusterEffectorBody.java    | 199 ++++++++++++++
 .../effector/CallbackEntityHttpClient.java      |  90 +++++++
 .../effector/SelectMasterEffectorTest.java      | 267 +++++++++++++++++++
 .../brooklynnode/effector/TestHttpEntity.java   |  66 +++++
 .../main/java/brooklyn/rest/api/ServerApi.java  |  23 +-
 .../brooklyn/rest/resources/ServerResource.java |  26 +-
 15 files changed, 1178 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6f895aaf/core/src/main/java/brooklyn/entity/basic/EntityPredicates.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/entity/basic/EntityPredicates.java b/core/src/main/java/brooklyn/entity/basic/EntityPredicates.java
index e0db9e9..b359b1e 100644
--- a/core/src/main/java/brooklyn/entity/basic/EntityPredicates.java
+++ b/core/src/main/java/brooklyn/entity/basic/EntityPredicates.java
@@ -90,6 +90,15 @@ public class EntityPredicates {
         };
     }
     
+    /** Matches any non-null entity whose value for {@code attribute} is not equal to {@code val}. */
+    public static <T> Predicate<Entity> attributeNotEqualTo(final AttributeSensor<T> attribute, final T val) {
+        return new SerializablePredicate<Entity>() {
+            @Override public boolean apply(@Nullable Entity candidate) {
+                return !(candidate == null || Objects.equal(candidate.getAttribute(attribute), val));
+            }
+        };
+    }
+    
     public static <T> Predicate<Entity> configEqualTo(final ConfigKey<T> configKey, final T val) {
         return new SerializablePredicate<Entity>() {
             @Override

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6f895aaf/software/base/src/main/java/brooklyn/entity/brooklynnode/BrooklynCluster.java
----------------------------------------------------------------------
diff --git a/software/base/src/main/java/brooklyn/entity/brooklynnode/BrooklynCluster.java b/software/base/src/main/java/brooklyn/entity/brooklynnode/BrooklynCluster.java
new file mode 100644
index 0000000..a3c2157
--- /dev/null
+++ b/software/base/src/main/java/brooklyn/entity/brooklynnode/BrooklynCluster.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package brooklyn.entity.brooklynnode;
+
+import brooklyn.config.ConfigKey;
+import brooklyn.entity.Effector;
+import brooklyn.entity.basic.ConfigKeys;
+import brooklyn.entity.basic.SoftwareProcess;
+import brooklyn.entity.effector.Effectors;
+import brooklyn.entity.group.DynamicCluster;
+import brooklyn.entity.proxying.ImplementedBy;
+import brooklyn.event.AttributeSensor;
+import brooklyn.event.basic.BasicAttributeSensor;
+
+@ImplementedBy(BrooklynClusterImpl.class)
+public interface BrooklynCluster extends DynamicCluster {
+    public static final AttributeSensor<BrooklynNode> MASTER_NODE = new BasicAttributeSensor<BrooklynNode>(
+            BrooklynNode.class, "brooklyncluster.master", "Pointer to the child node with MASTER state in the cluster");
+    /** Declares the abstract "selectMaster" effector and its NEW_MASTER_ID parameter; no implementation is attached here. */
+    public interface SelectMasterEffector {
+        ConfigKey<String> NEW_MASTER_ID = ConfigKeys.newStringConfigKey(
+                "brooklyncluster.new_master_id", "The ID of the node to become master", null);
+        Effector<Void> SELECT_MASTER = Effectors.effector(Void.class, "selectMaster")
+                .description("Select a new master in the cluster")
+                .parameter(NEW_MASTER_ID)
+                .buildAbstract();
+    }
+    /** Alias that exposes SelectMasterEffector.SELECT_MASTER directly on BrooklynCluster. */
+    public static final Effector<Void> SELECT_MASTER = SelectMasterEffector.SELECT_MASTER;
+    /** Declares the abstract "upgradeCluster" effector, parameterised by the config key of SoftwareProcess.DOWNLOAD_URL. */
+    public interface UpgradeClusterEffector {
+        Effector<Void> UPGRADE_CLUSTER = Effectors.effector(Void.class, "upgradeCluster")
+                .description("Upgrade the cluster with new distribution version")
+                .parameter(SoftwareProcess.DOWNLOAD_URL.getConfigKey())
+                .buildAbstract();
+    }
+    /** Alias that exposes UpgradeClusterEffector.UPGRADE_CLUSTER directly on BrooklynCluster. */
+    public static final Effector<Void> UPGRADE_CLUSTER = UpgradeClusterEffector.UPGRADE_CLUSTER;
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6f895aaf/software/base/src/main/java/brooklyn/entity/brooklynnode/BrooklynClusterImpl.java
----------------------------------------------------------------------
diff --git a/software/base/src/main/java/brooklyn/entity/brooklynnode/BrooklynClusterImpl.java b/software/base/src/main/java/brooklyn/entity/brooklynnode/BrooklynClusterImpl.java
new file mode 100644
index 0000000..bfaf33c
--- /dev/null
+++ b/software/base/src/main/java/brooklyn/entity/brooklynnode/BrooklynClusterImpl.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package brooklyn.entity.brooklynnode;
+
+import java.util.Collection;
+import java.util.concurrent.Callable;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import brooklyn.entity.Entity;
+import brooklyn.entity.basic.EntityPredicates;
+import brooklyn.entity.basic.ServiceStateLogic.ServiceNotUpLogic;
+import brooklyn.entity.brooklynnode.effector.SelectMasterEffectorBody;
+import brooklyn.entity.brooklynnode.effector.UpgradeClusterEffectorBody;
+import brooklyn.entity.group.DynamicClusterImpl;
+import brooklyn.event.feed.function.FunctionFeed;
+import brooklyn.event.feed.function.FunctionPollConfig;
+import brooklyn.management.ha.ManagementNodeState;
+import brooklyn.util.time.Duration;
+
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.Iterables;
+
+public class BrooklynClusterImpl extends DynamicClusterImpl implements BrooklynCluster {
+
+    private static final String MSG_NO_MASTER = "No master node in cluster";
+
+    private static final Logger LOG = LoggerFactory.getLogger(BrooklynClusterImpl.class);
+
+    //TODO set MEMBER_SPEC
+
+    private FunctionFeed scanMaster;
+
+    /** Adds the cluster effectors, flags the entity as not-up (no master yet) and starts the MASTER_NODE poll. */
+    @Override
+    public void init() {
+        super.init();
+        getMutableEntityType().addEffector(SelectMasterEffectorBody.SELECT_MASTER);
+        getMutableEntityType().addEffector(UpgradeClusterEffectorBody.UPGRADE_CLUSTER);
+        ServiceNotUpLogic.updateNotUpIndicator(this, MASTER_NODE, MSG_NO_MASTER);
+
+        Callable<BrooklynNode> masterLookup = new Callable<BrooklynNode>() {
+            @Override
+            public BrooklynNode call() throws Exception {
+                return findMasterChild();
+            }
+        };
+        scanMaster = FunctionFeed.builder()
+                .entity(this)
+                .poll(new FunctionPollConfig<Object, BrooklynNode>(MASTER_NODE).period(Duration.ONE_SECOND).callable(masterLookup))
+                .build();
+    }
+
+    /** Finds the member reporting MASTER: null if none, IllegalStateException if three or more; updates the not-up indicator. */
+    private BrooklynNode findMasterChild() {
+        Collection<Entity> masters = FluentIterable.from(getMembers())
+                .filter(EntityPredicates.attributeEqualTo(BrooklynNode.MANAGEMENT_NODE_STATE, ManagementNodeState.MASTER))
+                .toList();
+        if (masters.size() == 0) {
+            ServiceNotUpLogic.updateNotUpIndicator(this, MASTER_NODE, MSG_NO_MASTER);
+            return null;
+        } else if (masters.size() == 1) {
+            ServiceNotUpLogic.clearNotUpIndicator(this, MASTER_NODE);
+            return (BrooklynNode)Iterables.getOnlyElement(masters);
+        } else if (masters.size() == 2) {
+            //Probably hit a window where we have a new master
+            //its BrooklynNode picked it up, but the BrooklynNode
+            //for the old master hasn't refreshed its state yet.
+            //Just pick one of them, should sort itself out in next update.
+            LOG.warn("Two masters detected, probably a handover just occured: " + masters);
+
+            //Don't clearNotUpIndicator - if there were no masters previously
+            //why have two now.
+            //Take the first entry: getOnlyElement would throw IllegalArgumentException for two elements.
+            return (BrooklynNode)masters.iterator().next();
+        } else {
+            //Set on fire?
+            String msg = "Multiple (>=3) master nodes in cluster: " + masters;
+            LOG.error(msg);
+            throw new IllegalStateException(msg);
+        }
+    }
+
+    /** Stops the cluster first, then halts the master-scanning feed if it was created and is still activated. */
+    @Override
+    public void stop() {
+        super.stop();
+        boolean feedActive = (scanMaster != null) && scanMaster.isActivated();
+        if (feedActive) {
+            scanMaster.stop();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6f895aaf/software/base/src/main/java/brooklyn/entity/brooklynnode/BrooklynEntityMirrorImpl.java
----------------------------------------------------------------------
diff --git a/software/base/src/main/java/brooklyn/entity/brooklynnode/BrooklynEntityMirrorImpl.java b/software/base/src/main/java/brooklyn/entity/brooklynnode/BrooklynEntityMirrorImpl.java
index a71402f..b0fb728 100644
--- a/software/base/src/main/java/brooklyn/entity/brooklynnode/BrooklynEntityMirrorImpl.java
+++ b/software/base/src/main/java/brooklyn/entity/brooklynnode/BrooklynEntityMirrorImpl.java
@@ -25,8 +25,6 @@ import java.util.concurrent.Callable;
 import javax.annotation.Nullable;
 
 import org.apache.http.HttpStatus;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import brooklyn.entity.Effector;
 import brooklyn.entity.basic.AbstractEntity;

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6f895aaf/software/base/src/main/java/brooklyn/entity/brooklynnode/BrooklynNode.java
----------------------------------------------------------------------
diff --git a/software/base/src/main/java/brooklyn/entity/brooklynnode/BrooklynNode.java b/software/base/src/main/java/brooklyn/entity/brooklynnode/BrooklynNode.java
index 16b46c6..6208813 100644
--- a/software/base/src/main/java/brooklyn/entity/brooklynnode/BrooklynNode.java
+++ b/software/base/src/main/java/brooklyn/entity/brooklynnode/BrooklynNode.java
@@ -40,6 +40,8 @@ import brooklyn.event.basic.Sensors;
 import brooklyn.event.basic.BasicAttributeSensorAndConfigKey.StringAttributeSensorAndConfigKey;
 import brooklyn.event.basic.MapConfigKey;
 import brooklyn.event.basic.PortAttributeSensorAndConfigKey;
+import brooklyn.management.ha.HighAvailabilityMode;
+import brooklyn.management.ha.ManagementNodeState;
 import brooklyn.util.collections.MutableMap;
 import brooklyn.util.flags.SetFromFlag;
 import brooklyn.util.net.Networking;
@@ -215,8 +217,11 @@ public interface BrooklynNode extends SoftwareProcess, UsesJava {
             "brooklynnode.webconsole.portMapper", "Function for mapping private to public ports, for use in inferring the brooklyn URI", Functions.<Integer>identity());
 
     public static final AttributeSensor<URI> WEB_CONSOLE_URI = new BasicAttributeSensor<URI>(
-            URI.class, "brooklynnode.webconsole.url", "URL of the brooklyn web-console");
-    
+        URI.class, "brooklynnode.webconsole.url", "URL of the brooklyn web-console");
+
+    public static final AttributeSensor<ManagementNodeState> MANAGEMENT_NODE_STATE = new BasicAttributeSensor<ManagementNodeState>(
+        ManagementNodeState.class, "brooklynnode.ha.state", "High-availability state of the management node (MASTER, HOT_STANDBY, etc)");
+
     @SetFromFlag("noShutdownOnExit")
     public static final ConfigKey<Boolean> NO_SHUTDOWN_ON_EXIT = ConfigKeys.newBooleanConfigKey("brooklynnode.noshutdownonexit", 
         "Whether to shutdown entities on exit", false);
@@ -275,6 +280,25 @@ public interface BrooklynNode extends SoftwareProcess, UsesJava {
 
     public static final Effector<Void> STOP_NODE_AND_KILL_APPS = StopNodeAndKillAppsEffector.STOP_NODE_AND_KILL_APPS;
 
-    public EntityHttpClient http();
+    /** Declares the abstract effector that sets a node's HA priority, together with its "priority" parameter. */
+    public interface SetHAPriorityEffector {
+        ConfigKey<Integer> PRIORITY = ConfigKeys.newIntegerConfigKey("priority", "HA priority");
+        Effector<Integer> SET_HA_PRIORITY = Effectors.effector(Integer.class, "setHAPriority")
+                .description("Set HA priority on the node, returns the old priority")
+                .parameter(PRIORITY).buildAbstract();
+    }
 
+    public static final Effector<Integer> SET_HA_PRIORITY = SetHAPriorityEffector.SET_HA_PRIORITY;
+
+    /** Declares the abstract "setHAMode" effector, which takes a HighAvailabilityMode as its "mode" parameter. */
+    public interface SetHAModeEffector {
+        ConfigKey<HighAvailabilityMode> MODE = ConfigKeys.newConfigKey(HighAvailabilityMode.class, "mode", "HA mode");
+        Effector<ManagementNodeState> SET_HA_MODE = Effectors.effector(ManagementNodeState.class, "setHAMode")
+                .description("Set HA mode on the node, returns the existing state")
+                .parameter(MODE).buildAbstract();
+    }
+
+    public static final Effector<ManagementNodeState> SET_HA_MODE = SetHAModeEffector.SET_HA_MODE;
+
+    public EntityHttpClient http();
 }

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6f895aaf/software/base/src/main/java/brooklyn/entity/brooklynnode/BrooklynNodeImpl.java
----------------------------------------------------------------------
diff --git a/software/base/src/main/java/brooklyn/entity/brooklynnode/BrooklynNodeImpl.java b/software/base/src/main/java/brooklyn/entity/brooklynnode/BrooklynNodeImpl.java
index ddf2b96..630563d 100644
--- a/software/base/src/main/java/brooklyn/entity/brooklynnode/BrooklynNodeImpl.java
+++ b/software/base/src/main/java/brooklyn/entity/brooklynnode/BrooklynNodeImpl.java
@@ -38,13 +38,17 @@ import brooklyn.entity.basic.EntityPredicates;
 import brooklyn.entity.basic.Lifecycle;
 import brooklyn.entity.basic.ServiceStateLogic.ServiceNotUpLogic;
 import brooklyn.entity.basic.SoftwareProcessImpl;
+import brooklyn.entity.brooklynnode.effector.SetHAModeEffectorBody;
+import brooklyn.entity.brooklynnode.effector.SetHAPriorityEffectorBody;
 import brooklyn.entity.effector.EffectorBody;
 import brooklyn.entity.effector.Effectors;
 import brooklyn.event.feed.ConfigToAttributes;
 import brooklyn.event.feed.http.HttpFeed;
 import brooklyn.event.feed.http.HttpPollConfig;
 import brooklyn.event.feed.http.HttpValueFunctions;
+import brooklyn.event.feed.http.JsonFunctions;
 import brooklyn.management.TaskAdaptable;
+import brooklyn.management.ha.ManagementNodeState;
 import brooklyn.util.collections.Jsonya;
 import brooklyn.util.collections.MutableMap;
 import brooklyn.util.config.ConfigBag;
@@ -52,6 +56,7 @@ import brooklyn.util.exceptions.Exceptions;
 import brooklyn.util.exceptions.PropagatedRuntimeException;
 import brooklyn.util.guava.Functionals;
 import brooklyn.util.http.HttpToolResponse;
+import brooklyn.util.javalang.Enums;
 import brooklyn.util.javalang.JavaClassNames;
 import brooklyn.util.repeat.Repeater;
 import brooklyn.util.task.DynamicTasks;
@@ -62,6 +67,7 @@ import brooklyn.util.time.Duration;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
 import com.google.gson.Gson;
 
 public class BrooklynNodeImpl extends SoftwareProcessImpl implements BrooklynNode {
@@ -94,6 +100,8 @@ public class BrooklynNodeImpl extends SoftwareProcessImpl implements BrooklynNod
         getMutableEntityType().addEffector(ShutdownEffectorBody.SHUTDOWN);
         getMutableEntityType().addEffector(StopNodeButLeaveAppsEffectorBody.STOP_NODE_BUT_LEAVE_APPS);
         getMutableEntityType().addEffector(StopNodeAndKillAppsEffectorBody.STOP_NODE_AND_KILL_APPS);
+        getMutableEntityType().addEffector(SetHAPriorityEffectorBody.SET_HA_PRIORITY);
+        getMutableEntityType().addEffector(SetHAModeEffectorBody.SET_HA_MODE);
     }
 
     @Override
@@ -191,7 +199,9 @@ public class BrooklynNodeImpl extends SoftwareProcessImpl implements BrooklynNod
                     .addIfNotNull("delayForHttpReturn", toNullableString(parameters.get(DELAY_FOR_HTTP_RETURN)));
             try {
                 HttpToolResponse resp = ((BrooklynNode)entity()).http()
-                    .post("/v1/server/shutdown", MutableMap.<String, String>of(), formParams);
+                    .post("/v1/server/shutdown",
+                        ImmutableMap.of("Brooklyn-Allow-Non-Master-Access", "true"),
+                        formParams);
                 if (resp.getResponseCode() != HttpStatus.SC_NO_CONTENT) {
                     throw new IllegalStateException("Response code "+resp.getResponseCode());
                 }
@@ -341,6 +351,10 @@ public class BrooklynNodeImpl extends SoftwareProcessImpl implements BrooklynNod
                     .poll(new HttpPollConfig<Boolean>(WEB_CONSOLE_ACCESSIBLE)
                             .onSuccess(HttpValueFunctions.responseCodeEquals(200))
                             .setOnFailureOrException(false))
+                    .poll(new HttpPollConfig<ManagementNodeState>(MANAGEMENT_NODE_STATE)
+                            .suburl("/v1/server/ha/state")
+                            .onSuccess(Functionals.chain(Functionals.chain(HttpValueFunctions.jsonContents(), JsonFunctions.cast(String.class)), Enums.fromStringFunction(ManagementNodeState.class)))
+                            .setOnFailureOrException(null))
                     .build();
 
             if (!Lifecycle.RUNNING.equals(getAttribute(SERVICE_STATE_ACTUAL))) {

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6f895aaf/software/base/src/main/java/brooklyn/entity/brooklynnode/effector/SelectMasterEffectorBody.java
----------------------------------------------------------------------
diff --git a/software/base/src/main/java/brooklyn/entity/brooklynnode/effector/SelectMasterEffectorBody.java b/software/base/src/main/java/brooklyn/entity/brooklynnode/effector/SelectMasterEffectorBody.java
new file mode 100644
index 0000000..255b27c
--- /dev/null
+++ b/software/base/src/main/java/brooklyn/entity/brooklynnode/effector/SelectMasterEffectorBody.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package brooklyn.entity.brooklynnode.effector;
+
+import java.util.NoSuchElementException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import brooklyn.entity.Effector;
+import brooklyn.entity.Entity;
+import brooklyn.entity.Group;
+import brooklyn.entity.basic.EntityPredicates;
+import brooklyn.entity.brooklynnode.BrooklynCluster;
+import brooklyn.entity.brooklynnode.BrooklynCluster.SelectMasterEffector;
+import brooklyn.entity.brooklynnode.BrooklynNode;
+import brooklyn.entity.brooklynnode.BrooklynNode.SetHAModeEffector;
+import brooklyn.entity.brooklynnode.BrooklynNode.SetHAPriorityEffector;
+import brooklyn.entity.effector.EffectorBody;
+import brooklyn.entity.effector.Effectors;
+import brooklyn.management.ha.HighAvailabilityMode;
+import brooklyn.management.ha.ManagementNodeState;
+import brooklyn.util.collections.MutableMap;
+import brooklyn.util.config.ConfigBag;
+import brooklyn.util.repeat.Repeater;
+import brooklyn.util.task.DynamicTasks;
+import brooklyn.util.time.Duration;
+
+import com.google.api.client.util.Preconditions;
+import com.google.common.collect.Iterables;
+
+public class SelectMasterEffectorBody extends EffectorBody<Void> implements SelectMasterEffector {
+    public static final Effector<Void> SELECT_MASTER = Effectors.effector(SelectMasterEffector.SELECT_MASTER).impl(new SelectMasterEffectorBody()).build();
+    
+    private static final Logger LOG = LoggerFactory.getLogger(SelectMasterEffectorBody.class);
+
+    private static final int HA_STANDBY_PRIORITY = 0;
+    private static final int HA_MASTER_PRIORITY = 1;
+
+    private AtomicBoolean selectMasterInProgress = new AtomicBoolean();
+
+    /** Runs one master switch at a time per body instance; an overlapping call fails with IllegalStateException. */
+    @Override
+    public Void call(ConfigBag parameters) {
+        if (!selectMasterInProgress.compareAndSet(false, true)) {
+            throw new IllegalStateException("A master change is already in progress.");
+        }
+        try {
+            selectMaster(parameters);
+            return null;
+        } finally {
+            selectMasterInProgress.set(false);
+        }
+    }
+
+    /** Moves mastership to the member named by NEW_MASTER_ID: raise its priority, demote the old master, wait, verify. */
+    private void selectMaster(ConfigBag parameters) {
+        String newMasterId = parameters.get(NEW_MASTER_ID);
+        Preconditions.checkNotNull(newMasterId, NEW_MASTER_ID.getName() + " parameter is required");
+
+        final Entity oldMaster = entity().getAttribute(BrooklynCluster.MASTER_NODE);
+        if (oldMaster != null && oldMaster.getId().equals(newMasterId)) {
+            LOG.info(newMasterId + " is already the current master, no change needed.");
+            return;
+        }
+
+        final Entity newMaster = getMember(newMasterId);
+
+        //1. Increase the priority of the node we wish to become master
+        setNodePriority(newMaster, HA_MASTER_PRIORITY);
+
+        //2. Demote the existing master so a new election takes place
+        try {
+            //If no master was yet selected, at least wait to see
+            //if the new master will be what we expect.
+            if (oldMaster != null) {
+                setNodeState(oldMaster, HighAvailabilityMode.HOT_STANDBY);
+            }
+            waitMasterHandover(oldMaster, newMaster);
+        } finally {
+            //3. Always revert the priority afterwards, whether or not the handover succeeded
+            setNodePriority(newMaster, HA_STANDBY_PRIORITY);
+        }
+
+        checkMasterSelected(newMaster);
+    }
+
+    /** Waits up to one minute for MASTER_NODE to become a non-null node other than oldMaster; warns on timeout. */
+    private void waitMasterHandover(final Entity oldMaster, final Entity newMaster) {
+        Callable<Boolean> masterHasChanged = new Callable<Boolean>() {
+            @Override
+            public Boolean call() throws Exception {
+                Entity current = getMasterNode();
+                return current != null && current != oldMaster;
+            }
+        };
+        boolean masterChanged = Repeater.create()
+            .backoff(Duration.millis(500), 1.2, Duration.FIVE_SECONDS).limitTimeTo(Duration.ONE_MINUTE)
+            .until(masterHasChanged).run();
+        if (!masterChanged) {
+            LOG.warn("Timeout waiting for node to become master: " + newMaster + ".");
+        }
+    }
+
+    /** Invokes SET_HA_MODE on {@code oldMaster}; warns when the state returned by the effector was not MASTER. */
+    private void setNodeState(Entity oldMaster, HighAvailabilityMode mode) {
+        ManagementNodeState previousState = DynamicTasks.queue(
+                Effectors.invocation(
+                        oldMaster, BrooklynNode.SET_HA_MODE, MutableMap.of(SetHAModeEffector.MODE, mode))
+            ).asTask().getUnchecked();
+
+        if (previousState == ManagementNodeState.MASTER) {
+            return;
+        }
+        LOG.warn("The previous HA state on node " + oldMaster.getId() + " was " + previousState +
+                ", while the expected value is " + ManagementNodeState.MASTER + ".");
+    }
+
+    /** Invokes SET_HA_PRIORITY on the node and warns if the priority it returns is not the one this switch expects. */
+    private void setNodePriority(Entity newMaster, int newPriority) {
+        Integer oldPriority = DynamicTasks.queue(
+                Effectors.invocation(
+                    newMaster, BrooklynNode.SET_HA_PRIORITY,
+                    MutableMap.of(SetHAPriorityEffector.PRIORITY, newPriority))
+            ).asTask().getUnchecked();
+        Integer expectedPriority = (newPriority == HA_MASTER_PRIORITY ? HA_STANDBY_PRIORITY : HA_MASTER_PRIORITY);
+        //Compare by value: != on two Integer objects tests identity, not the number they hold.
+        if (!expectedPriority.equals(oldPriority)) {
+            LOG.warn("The previous HA priority on node " + newMaster.getId() + " was " + oldPriority +
+                    ", while the expected value is " + expectedPriority + " (while setting priority " +
+                    newPriority + ").");
+        }
+    }
+
+    /** Throws IllegalStateException unless MASTER_NODE currently refers to {@code newMaster}. */
+    private void checkMasterSelected(Entity newMaster) {
+        Entity currentMaster = getMasterNode();
+        if (newMaster != currentMaster) {
+            throw new IllegalStateException("Expected node " + newMaster + " to be master, but found that " + "master is " + currentMaster + " instead.");
+        }
+    }
+
+    /** Returns the cluster member with the given ID, or throws IllegalStateException when there is none. */
+    private Entity getMember(String memberId) {
+        Entity match = Iterables.find(((Group)entity()).getMembers(), EntityPredicates.idEqualTo(memberId), null);
+        if (match == null) {
+            throw new IllegalStateException(memberId + " is not an ID of brooklyn node in this cluster");
+        }
+        return match;
+    }
+
+    private Entity getMasterNode() {
+        return entity().getAttribute(BrooklynCluster.MASTER_NODE); //may be null; waitMasterHandover null-checks it
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6f895aaf/software/base/src/main/java/brooklyn/entity/brooklynnode/effector/SetHAModeEffectorBody.java
----------------------------------------------------------------------
diff --git a/software/base/src/main/java/brooklyn/entity/brooklynnode/effector/SetHAModeEffectorBody.java b/software/base/src/main/java/brooklyn/entity/brooklynnode/effector/SetHAModeEffectorBody.java
new file mode 100644
index 0000000..36a51d0
--- /dev/null
+++ b/software/base/src/main/java/brooklyn/entity/brooklynnode/effector/SetHAModeEffectorBody.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package brooklyn.entity.brooklynnode.effector;
+
+import org.apache.http.HttpStatus;
+
+import brooklyn.entity.Effector;
+import brooklyn.entity.brooklynnode.BrooklynNode;
+import brooklyn.entity.brooklynnode.BrooklynNode.SetHAModeEffector;
+import brooklyn.entity.brooklynnode.EntityHttpClient;
+import brooklyn.entity.effector.EffectorBody;
+import brooklyn.entity.effector.Effectors;
+import brooklyn.event.feed.http.HttpValueFunctions;
+import brooklyn.event.feed.http.JsonFunctions;
+import brooklyn.management.ha.HighAvailabilityMode;
+import brooklyn.management.ha.ManagementNodeState;
+import brooklyn.util.config.ConfigBag;
+import brooklyn.util.guava.Functionals;
+import brooklyn.util.http.HttpToolResponse;
+import brooklyn.util.javalang.Enums;
+
+import com.google.api.client.util.Preconditions;
+import com.google.common.base.Function;
+import com.google.common.collect.ImmutableMap;
+
+public class SetHAModeEffectorBody extends EffectorBody<ManagementNodeState> implements SetHAModeEffector {
+    public static final Effector<ManagementNodeState> SET_HA_MODE = Effectors.effector(SetHAModeEffector.SET_HA_MODE).impl(new SetHAModeEffectorBody()).build();
+
+    /** POSTs the requested HA mode to the node's /v1/server/ha/state and returns the state parsed from the reply. */
+    @Override
+    public ManagementNodeState call(ConfigBag parameters) {
+        HighAvailabilityMode mode = parameters.get(MODE);
+        Preconditions.checkNotNull(mode, MODE.getName() + " parameter is required");
+
+        EntityHttpClient httpClient = ((BrooklynNode)entity()).http();
+        HttpToolResponse resp = httpClient.post("/v1/server/ha/state",
+                ImmutableMap.of("Brooklyn-Allow-Non-Master-Access", "true"),
+                ImmutableMap.of("mode", mode.toString()));
+        if (resp.getResponseCode() != HttpStatus.SC_OK) {
+            throw new IllegalStateException("Unexpected response code: " + resp.getResponseCode() + "\n" + resp.getContentAsString());
+        }
+        //Read the JSON body as a String, then map it to the ManagementNodeState constant
+        Function<HttpToolResponse, ManagementNodeState> parseResponse = Functionals.chain(
+                Functionals.chain(HttpValueFunctions.jsonContents(), JsonFunctions.cast(String.class)),
+                Enums.fromStringFunction(ManagementNodeState.class));
+        return parseResponse.apply(resp);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6f895aaf/software/base/src/main/java/brooklyn/entity/brooklynnode/effector/SetHAPriorityEffectorBody.java
----------------------------------------------------------------------
diff --git a/software/base/src/main/java/brooklyn/entity/brooklynnode/effector/SetHAPriorityEffectorBody.java b/software/base/src/main/java/brooklyn/entity/brooklynnode/effector/SetHAPriorityEffectorBody.java
new file mode 100644
index 0000000..94a961c
--- /dev/null
+++ b/software/base/src/main/java/brooklyn/entity/brooklynnode/effector/SetHAPriorityEffectorBody.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package brooklyn.entity.brooklynnode.effector;
+
+import org.apache.http.HttpStatus;
+
+import brooklyn.entity.Effector;
+import brooklyn.entity.brooklynnode.BrooklynNode;
+import brooklyn.entity.brooklynnode.BrooklynNode.SetHAPriorityEffector;
+import brooklyn.entity.brooklynnode.EntityHttpClient;
+import brooklyn.entity.effector.EffectorBody;
+import brooklyn.entity.effector.Effectors;
+import brooklyn.util.config.ConfigBag;
+import brooklyn.util.http.HttpToolResponse;
+
+import com.google.api.client.util.Preconditions;
+import com.google.common.collect.ImmutableMap;
+
+/**
+ * Effector body which sets the high-availability priority of a Brooklyn node through its REST API.
+ */
+public class SetHAPriorityEffectorBody extends EffectorBody<Integer> implements SetHAPriorityEffector {
+    public static final Effector<Integer> SET_HA_PRIORITY = Effectors.effector(SetHAPriorityEffector.SET_HA_PRIORITY).impl(new SetHAPriorityEffectorBody()).build();
+
+    /**
+     * Posts the requested priority to {@code /v1/server/ha/priority}.
+     *
+     * @param parameters effector parameters; {@code PRIORITY} must be present
+     * @return the integer parsed from the response body
+     * @throws IllegalStateException if the response code is not 200
+     */
+    @Override
+    public Integer call(ConfigBag parameters) {
+        Integer requestedPriority = parameters.get(PRIORITY);
+        Preconditions.checkNotNull(requestedPriority, PRIORITY.getName() + " parameter is required");
+
+        EntityHttpClient client = ((BrooklynNode)entity()).http();
+        ImmutableMap<String, String> headers = ImmutableMap.of("Brooklyn-Allow-Non-Master-Access", "true");
+        ImmutableMap<String, String> formParams = ImmutableMap.of("priority", requestedPriority.toString());
+        HttpToolResponse response = client.post("/v1/server/ha/priority", headers, formParams);
+
+        // Anything but 200 is reported together with the response body.
+        if (response.getResponseCode() != HttpStatus.SC_OK) {
+            throw new IllegalStateException("Unexpected response code: " + response.getResponseCode() + "\n" + response.getContentAsString());
+        }
+        return Integer.valueOf(response.getContentAsString());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6f895aaf/software/base/src/main/java/brooklyn/entity/brooklynnode/effector/UpgradeClusterEffectorBody.java
----------------------------------------------------------------------
diff --git a/software/base/src/main/java/brooklyn/entity/brooklynnode/effector/UpgradeClusterEffectorBody.java b/software/base/src/main/java/brooklyn/entity/brooklynnode/effector/UpgradeClusterEffectorBody.java
new file mode 100644
index 0000000..70b184a
--- /dev/null
+++ b/software/base/src/main/java/brooklyn/entity/brooklynnode/effector/UpgradeClusterEffectorBody.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package brooklyn.entity.brooklynnode.effector;
+
+import java.io.File;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import brooklyn.config.ConfigKey;
+import brooklyn.entity.Effector;
+import brooklyn.entity.Entity;
+import brooklyn.entity.Group;
+import brooklyn.entity.basic.EntityPredicates;
+import brooklyn.entity.basic.Lifecycle;
+import brooklyn.entity.brooklynnode.BrooklynCluster;
+import brooklyn.entity.brooklynnode.BrooklynCluster.SelectMasterEffector;
+import brooklyn.entity.brooklynnode.BrooklynCluster.UpgradeClusterEffector;
+import brooklyn.entity.brooklynnode.BrooklynNode;
+import brooklyn.entity.brooklynnode.BrooklynNode.SetHAModeEffector;
+import brooklyn.entity.effector.EffectorBody;
+import brooklyn.entity.effector.Effectors;
+import brooklyn.entity.group.DynamicCluster;
+import brooklyn.entity.proxying.EntitySpec;
+import brooklyn.event.AttributeSensor;
+import brooklyn.management.ha.HighAvailabilityMode;
+import brooklyn.management.ha.ManagementNodeState;
+import brooklyn.util.collections.MutableMap;
+import brooklyn.util.config.ConfigBag;
+import brooklyn.util.exceptions.Exceptions;
+import brooklyn.util.net.Urls;
+import brooklyn.util.repeat.Repeater;
+import brooklyn.util.task.DynamicTasks;
+import brooklyn.util.task.Tasks;
+import brooklyn.util.time.Duration;
+
+import com.google.api.client.util.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.collect.Collections2;
+import com.google.common.collect.Iterables;
+
+public class UpgradeClusterEffectorBody extends EffectorBody<Void> implements UpgradeClusterEffector {
+    public static final Effector<Void> UPGRADE_CLUSTER = Effectors.effector(UpgradeClusterEffector.UPGRADE_CLUSTER).impl(new UpgradeClusterEffectorBody()).build();
+
+    private AtomicBoolean upgradeInProgress = new AtomicBoolean();
+
+    /**
+     * Upgrades the cluster to the distribution named by the download URL parameter.
+     * <p>
+     * The member spec is pointed at the new distribution before {@code upgrade(parameters)} runs.
+     * If anything fails after that, the previous download and upload URLs are written back to the
+     * spec and the failure is propagated.
+     *
+     * @param parameters effector parameters; read for the {@code BrooklynNode.DOWNLOAD_URL} config key
+     * @return always {@code null}
+     * @throws IllegalStateException if an upgrade started through this effector body is still running
+     */
+    @Override
+    public Void call(ConfigBag parameters) {
+        if (!upgradeInProgress.compareAndSet(false, true)) {
+            throw new IllegalStateException("An upgrade is already in progress.");
+        }
+
+        // Once the flag is taken everything runs under try/finally, so a failed precondition
+        // cannot leave the flag set and block every later upgrade.
+        try {
+            EntitySpec<?> memberSpec = entity().getConfig(BrooklynCluster.MEMBER_SPEC);
+            Preconditions.checkNotNull(memberSpec, BrooklynCluster.MEMBER_SPEC.getName() + " is required for " + UpgradeClusterEffector.class.getName());
+
+            Map<ConfigKey<?>, Object> specCfg = memberSpec.getConfig();
+            // The map is keyed by ConfigKey, so look the download URL up by its config key
+            // (the same key used below to read the effector parameter).
+            String oldDownloadUrl = (String) specCfg.get(BrooklynNode.DOWNLOAD_URL.getConfigKey());
+            String oldUploadUrl = (String) specCfg.get(BrooklynNode.DISTRO_UPLOAD_URL);
+            String newDownloadUrl = parameters.get(BrooklynNode.DOWNLOAD_URL.getConfigKey());
+            String newUploadUrl = inferUploadUrl(newDownloadUrl);
+            try {
+                // The download URL must be the requested one; the inferred upload URL is null
+                // for anything that is not a local file.
+                memberSpec.configure(BrooklynNode.DOWNLOAD_URL, newDownloadUrl);
+                memberSpec.configure(BrooklynNode.DISTRO_UPLOAD_URL, newUploadUrl);
+                upgrade(parameters);
+            } catch (Exception e) {
+                // Revert the spec so members created later use the previous distribution.
+                memberSpec.configure(BrooklynNode.DOWNLOAD_URL, oldDownloadUrl);
+                memberSpec.configure(BrooklynNode.DISTRO_UPLOAD_URL, oldUploadUrl);
+                throw Exceptions.propagate(e);
+            }
+        } finally {
+            upgradeInProgress.set(false);
+        }
+        return null;
+    }
+
+    /**
+     * Picks the upload URL that goes with a download URL: the URL itself when it uses the
+     * {@code file} protocol or names an existing local file, otherwise {@code null}.
+     */
+    private String inferUploadUrl(String newDownloadUrl) {
+        if ("file".equals(Urls.getProtocol(newDownloadUrl))) {
+            return newDownloadUrl;
+        }
+        return new File(newDownloadUrl).exists() ? newDownloadUrl : null;
+    }
+
+    /**
+     * Performs the upgrade steps in order: create one new node, create as many further nodes as
+     * are needed to match the initial member count, select the first new node as master, then
+     * stop the members that were present at the start.
+     * <p>
+     * The {@code parameters} argument is not read by this method.
+     */
+    private void upgrade(ConfigBag parameters) {
+        //TODO might be worth separating each step in a task for better UI
+
+        Group cluster = (Group)entity();
+        Collection<Entity> initialMembers = cluster.getMembers();
+        // Size is captured before any node is added.
+        int initialClusterSize = initialMembers.size();
+
+        //1. Initially create a single node to check if it will launch successfully
+        Entity initialNode = Iterables.getOnlyElement(createNodes(1));
+
+        //2. If everything is OK with the first node launch the rest as well
+        Collection<Entity> remainingNodes = createNodes(initialClusterSize - 1);
+
+        //3. Once we have all nodes running without errors switch master
+        DynamicTasks.queue(Effectors.invocation(cluster, BrooklynCluster.SELECT_MASTER, MutableMap.of(SelectMasterEffector.NEW_MASTER_ID, initialNode.getId()))).asTask().getUnchecked();
+
+        //4. Stop the nodes which were running at the start of the upgrade call, but keep them around.
+        //   Should we create a quarantine-like zone for old stopped version?
+        //   For members that were created meanwhile - they will be using the new version already. If the new version
+        //   isn't good then they will fail to start as well, forcing the policies to retry (and succeed once the
+        //   URL is reverted).
+        // The new nodes are removed from the copy explicitly, in case initialMembers also contains them.
+        HashSet<Entity> oldMembers = new HashSet<Entity>(initialMembers);
+        oldMembers.removeAll(remainingNodes);
+        oldMembers.remove(initialNode);
+        DynamicTasks.queue(Effectors.invocation(BrooklynNode.STOP_NODE_BUT_LEAVE_APPS, Collections.emptyMap(), oldMembers)).asTask().getUnchecked();
+    }
+
+    /**
+     * Resizes the cluster by {@code nodeCnt}, waits for the returned nodes to leave the STARTING
+     * state, switches them to HOT_STANDBY and verifies that none of them reports
+     * {@code SERVICE_UP} as false.
+     *
+     * @param nodeCnt delta passed to {@code resizeByDelta}
+     * @return the entities returned by the resize
+     * @throws IllegalStateException if a wait times out or a node has {@code SERVICE_UP} equal to false
+     */
+    private Collection<Entity> createNodes(int nodeCnt) {
+        DynamicCluster cluster = (DynamicCluster)entity();
+
+        //1. Create the nodes
+        Collection<Entity> newNodes = cluster.resizeByDelta(nodeCnt);
+
+        //2. Wait for them to leave STARTING (the check is "state is not STARTING", not "state is RUNNING")
+        waitAttributeNotEqualTo(
+                newNodes,
+                BrooklynNode.SERVICE_STATE_ACTUAL, Lifecycle.STARTING);
+
+        //3. Set HOT_STANDBY in case it is not enabled on the command line ...
+        DynamicTasks.queue(Effectors.invocation(
+                BrooklynNode.SET_HA_MODE,
+                MutableMap.of(SetHAModeEffector.MODE, HighAvailabilityMode.HOT_STANDBY), 
+                newNodes)).asTask().getUnchecked();
+
+        //4. ... and wait until all of the nodes change state
+        //TODO if the REST call is blocking this is not needed
+        waitAttributeEqualTo(
+                newNodes,
+                BrooklynNode.MANAGEMENT_NODE_STATE,
+                ManagementNodeState.HOT_STANDBY);
+
+        //5. Just in case check if all of the nodes are SERVICE_UP (which would rule out ON_FIRE as well)
+        // NOTE(review): only an explicit FALSE is matched here; presumably a null SERVICE_UP passes - confirm.
+        Collection<Entity> failedNodes = Collections2.filter(newNodes, EntityPredicates.attributeEqualTo(BrooklynNode.SERVICE_UP, Boolean.FALSE));
+        if (!failedNodes.isEmpty()) {
+            throw new IllegalStateException("Nodes " + failedNodes + " are not " + BrooklynNode.SERVICE_UP + " though successfully in " + ManagementNodeState.HOT_STANDBY);
+        }
+        return newNodes;
+    }
+
+    /** Blocks until {@code sensor} equals {@code value} on every node, or fails on timeout. */
+    private <T> void waitAttributeEqualTo(Collection<Entity> nodes, AttributeSensor<T> sensor, T value) {
+        Predicate<Entity> hasValue = EntityPredicates.attributeEqualTo(sensor, value);
+        String waitingMsg = "Waiting for nodes " + nodes + ", sensor " + sensor + " to be " + value;
+        String timeoutMsg = "Timeout while waiting for nodes " + nodes + ", sensor " + sensor + " to change to " + value;
+        waitPredicate(nodes, hasValue, waitingMsg, timeoutMsg);
+    }
+
+    /** Blocks until {@code sensor} differs from {@code value} on every node, or fails on timeout. */
+    private <T> void waitAttributeNotEqualTo(Collection<Entity> nodes, AttributeSensor<T> sensor, T value) {
+        Predicate<Entity> hasOtherValue = EntityPredicates.attributeNotEqualTo(sensor, value);
+        String waitingMsg = "Waiting for nodes " + nodes + ", sensor " + sensor + " to change from " + value;
+        String timeoutMsg = "Timeout while waiting for nodes " + nodes + ", sensor " + sensor + " to change from " + value;
+        waitPredicate(nodes, hasOtherValue, waitingMsg, timeoutMsg);
+    }
+
+    /**
+     * Polls until {@code waitPredicate} holds for every element of {@code nodes}.
+     * <p>
+     * Polling starts at one second, backs off by a factor of 1.2 up to ten seconds, and gives up
+     * after one hour.
+     *
+     * @param nodes entities to check on each poll
+     * @param waitPredicate condition every node has to satisfy
+     * @param blockingMsg description used for the repeater and as the task's blocking details
+     * @param errorMsg message of the exception thrown when polling does not succeed
+     * @throws IllegalStateException if the repeater finishes without the condition being met
+     */
+    private <T extends Entity> void waitPredicate(Collection<T> nodes, Predicate<T> waitPredicate, String blockingMsg, String errorMsg) {
+        boolean pollSuccess;
+        Tasks.setBlockingDetails(blockingMsg);
+        try {
+            pollSuccess = Repeater.create(blockingMsg)
+                .backoff(Duration.ONE_SECOND, 1.2, Duration.TEN_SECONDS)
+                .limitTimeTo(Duration.ONE_HOUR)
+                .until(nodes, allMatch(waitPredicate))
+                .run();
+        } finally {
+            // Always clear the blocking details, even when polling ends with an exception.
+            Tasks.resetBlockingDetails();
+        }
+
+        if (!pollSuccess) {
+            throw new IllegalStateException(errorMsg);
+        }
+    }
+
+    /**
+     * Lifts an element predicate to a collection predicate.
+     *
+     * @param predicate condition applied to each element
+     * @return a predicate that is true when every element of the collection satisfies {@code predicate}
+     */
+    public static <T> Predicate<Collection<T>> allMatch(final Predicate<T> predicate) {
+        return new Predicate<Collection<T>>() {
+            @Override
+            public boolean apply(Collection<T> candidates) {
+                return Iterables.all(candidates, predicate);
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6f895aaf/software/base/src/test/java/brooklyn/entity/brooklynnode/effector/CallbackEntityHttpClient.java
----------------------------------------------------------------------
diff --git a/software/base/src/test/java/brooklyn/entity/brooklynnode/effector/CallbackEntityHttpClient.java b/software/base/src/test/java/brooklyn/entity/brooklynnode/effector/CallbackEntityHttpClient.java
new file mode 100644
index 0000000..8c8004b
--- /dev/null
+++ b/software/base/src/test/java/brooklyn/entity/brooklynnode/effector/CallbackEntityHttpClient.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package brooklyn.entity.brooklynnode.effector;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.http.HttpStatus;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpPost;
+
+import brooklyn.entity.Entity;
+import brooklyn.entity.brooklynnode.EntityHttpClient;
+import brooklyn.util.http.HttpTool.HttpClientBuilder;
+import brooklyn.util.http.HttpToolResponse;
+
+import com.google.common.base.Function;
+
+/**
+ * Test double for {@link EntityHttpClient}: instead of performing HTTP calls it hands a
+ * {@link Request} describing the call to a callback and wraps the returned string in a
+ * 200 response.
+ */
+public class CallbackEntityHttpClient implements EntityHttpClient {
+    /** Immutable description of a call made through the client. */
+    public static class Request {
+        private final Entity entity;
+        private final String method;
+        private final String path;
+        private final Map<String, String> params;
+        public Request(Entity entity, String method, String path, Map<String, String> params) {
+            this.entity = entity;
+            this.method = method;
+            this.path = path;
+            this.params = params;
+        }
+        public Entity getEntity() {
+            return entity;
+        }
+        public String getMethod() {
+            return method;
+        }
+        public String getPath() {
+            return path;
+        }
+        public Map<String, String> getParams() {
+            return params;
+        }
+    }
+    private final Function<Request, String> callback;
+    private final Entity entity;
+
+    /**
+     * @param entity entity reported in every {@link Request}
+     * @param callback produces the response body for each request
+     */
+    public CallbackEntityHttpClient(Entity entity, Function<Request, String> callback) {
+        this.entity = entity;
+        this.callback = callback;
+    }
+
+    /** Not supported by this test client. */
+    @Override
+    public HttpClientBuilder getHttpClientForBrooklynNode() {
+        throw new IllegalStateException("Method call not expected");
+    }
+
+    /** Passes a GET request with no parameters to the callback and returns its result with status 200. */
+    @Override
+    public HttpToolResponse get(String path) {
+        String result = callback.apply(new Request(entity, HttpGet.METHOD_NAME, path, Collections.<String, String>emptyMap()));
+        // Use an empty header map, as the form post below does, rather than null.
+        return new HttpToolResponse(HttpStatus.SC_OK, Collections.<String, List<String>>emptyMap(), result.getBytes(), 0, 0, 0);
+    }
+
+    /** Not supported by this test client. */
+    @Override
+    public HttpToolResponse post(String path, Map<String, String> headers, byte[] body) {
+        throw new IllegalStateException("Method call not expected");
+    }
+
+    /** Passes a POST request with the form parameters to the callback; {@code headers} are ignored. */
+    @Override
+    public HttpToolResponse post(String path, Map<String, String> headers, Map<String, String> formParams) {
+        String result = callback.apply(new Request(entity, HttpPost.METHOD_NAME, path, formParams));
+        return new HttpToolResponse(HttpStatus.SC_OK, Collections.<String, List<String>>emptyMap(), result.getBytes(), 0, 0, 0);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6f895aaf/software/base/src/test/java/brooklyn/entity/brooklynnode/effector/SelectMasterEffectorTest.java
----------------------------------------------------------------------
diff --git a/software/base/src/test/java/brooklyn/entity/brooklynnode/effector/SelectMasterEffectorTest.java b/software/base/src/test/java/brooklyn/entity/brooklynnode/effector/SelectMasterEffectorTest.java
new file mode 100644
index 0000000..6036507
--- /dev/null
+++ b/software/base/src/test/java/brooklyn/entity/brooklynnode/effector/SelectMasterEffectorTest.java
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package brooklyn.entity.brooklynnode.effector;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.Callable;
+
+import org.apache.http.client.methods.HttpPost;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import brooklyn.entity.Entity;
+import brooklyn.entity.Group;
+import brooklyn.entity.basic.ApplicationBuilder;
+import brooklyn.entity.basic.BasicApplication;
+import brooklyn.entity.basic.EntityLocal;
+import brooklyn.entity.brooklynnode.BrooklynCluster;
+import brooklyn.entity.brooklynnode.BrooklynCluster.SelectMasterEffector;
+import brooklyn.entity.brooklynnode.BrooklynClusterImpl;
+import brooklyn.entity.brooklynnode.BrooklynNode;
+import brooklyn.entity.brooklynnode.effector.CallbackEntityHttpClient.Request;
+import brooklyn.entity.effector.Effectors;
+import brooklyn.entity.group.DynamicCluster;
+import brooklyn.entity.proxying.EntitySpec;
+import brooklyn.event.feed.AttributePollHandler;
+import brooklyn.event.feed.DelegatingPollHandler;
+import brooklyn.event.feed.Poller;
+import brooklyn.event.feed.function.FunctionFeed;
+import brooklyn.management.ManagementContext;
+import brooklyn.management.ha.ManagementNodeState;
+import brooklyn.test.EntityTestUtils;
+import brooklyn.test.entity.LocalManagementContextForTests;
+import brooklyn.util.task.BasicExecutionContext;
+import brooklyn.util.task.BasicExecutionManager;
+import brooklyn.util.time.Duration;
+
+import com.google.common.base.Function;
+import com.google.common.base.Objects;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+
+public class SelectMasterEffectorTest {
+    // Log under this test's own category rather than the cluster implementation's.
+    private static final Logger LOG = LoggerFactory.getLogger(SelectMasterEffectorTest.class);
+
+    protected ManagementContext mgmt;
+    protected BasicApplication app;
+    protected BasicExecutionContext ec;
+    protected BrooklynCluster cluster;
+    protected FunctionFeed scanMaster;
+    protected Poller<Void> poller; 
+
+    /**
+     * Creates a managed application with a single {@link BrooklynCluster} child, a separate
+     * execution context used to invoke effectors, and a poller which calls
+     * {@code masterFailoverIfNeeded()} every 200 milliseconds.
+     */
+    @BeforeMethod
+    public void setUp() {
+        mgmt = new LocalManagementContextForTests();
+        EntitySpec<BasicApplication> appSpec = EntitySpec.create(BasicApplication.class)
+                .child(EntitySpec.create(BrooklynCluster.class));
+        app = ApplicationBuilder.newManagedApp(appSpec, mgmt);
+        cluster = (BrooklynCluster)Iterables.getOnlyElement(app.getChildren());
+
+        BasicExecutionManager em = new BasicExecutionManager("mycontext");
+        ec = new BasicExecutionContext(em);
+
+        // The poll handler list is empty: the poll result is not published anywhere,
+        // the callable is run only for its side effects.
+        poller = new Poller<Void>((EntityLocal)app, false);
+        poller.scheduleAtFixedRate(
+            new Callable<Void>() {
+                @Override
+                public Void call() throws Exception {
+                    masterFailoverIfNeeded();
+                    return null;
+                }
+            },
+            new DelegatingPollHandler<Void>(Collections.<AttributePollHandler<? super Void>>emptyList()),
+            Duration.millis(200));
+        poller.start();
+    }
+
+    /**
+     * Stops the failover poller.
+     */
+    // NOTE(review): the management context and execution manager created in setUp are not
+    // shut down here - confirm whether that is intentional.
+    @AfterMethod
+    public void tearDown() {
+        poller.stop();
+    }
+
+    /** Selecting a master by an ID which is not a member of the cluster must fail. */
+    @Test
+    public void testInvalidNewMasterIdFails() {
+        try {
+            BrooklynCluster cluster = app.addChild(EntitySpec.create(BrooklynCluster.class));
+            selectMaster(cluster, "1234");
+            // fail() raises an AssertionError, which the catch below (Exception only) does not swallow.
+            fail("Non-existend entity ID provided.");
+        } catch (Exception e) {
+            assertTrue(e.toString().contains("1234 is not an ID of brooklyn node in this cluster"));
+        }
+    }
+
+    /**
+     * With two nodes where node1 is master, selecting node2 must end with node2 as master
+     * (see {@code checkMaster}).
+     */
+    @Test
+    public void testSelectMaster() {
+        // Both nodes share one callback instance, so the callback's state covers the whole cluster.
+        HttpCallback cb = new HttpCallback();
+        BrooklynNode node1 = cluster.addMemberChild(EntitySpec.create(BrooklynNode.class)
+                .impl(TestHttpEntity.class)
+                .configure(TestHttpEntity.HTTP_CLIENT_CALLBACK, cb));
+        BrooklynNode node2 = cluster.addMemberChild(EntitySpec.create(BrooklynNode.class)
+                .impl(TestHttpEntity.class)
+                .configure(TestHttpEntity.HTTP_CLIENT_CALLBACK, cb));
+
+        cluster.addMemberChild(node1);
+        cluster.addMemberChild(node2);
+
+        setManagementState(node1, ManagementNodeState.MASTER);
+        EntityTestUtils.assertAttributeEqualsEventually(cluster, BrooklynCluster.MASTER_NODE, node1);
+
+        selectMaster(cluster, node2.getId());
+        checkMaster(cluster, node2);
+    }
+
+    /**
+     * Work in progress: when the state-change request fails, node1 is expected to remain master.
+     */
+    @Test(groups="WIP")
+    //after throwing an exception in HttpCallback tasks are no longer executed, why?
+    public void testSelectMasterFailsAtChangeState() {
+        HttpCallback cb = new HttpCallback();
+        // Makes the callback throw on every request to /v1/server/ha/state.
+        cb.setFailAtStateChange(true);
+
+        BrooklynNode node1 = cluster.addMemberChild(EntitySpec.create(BrooklynNode.class)
+                .impl(TestHttpEntity.class)
+                .configure(TestHttpEntity.HTTP_CLIENT_CALLBACK, cb));
+        BrooklynNode node2 = cluster.addMemberChild(EntitySpec.create(BrooklynNode.class)
+                .impl(TestHttpEntity.class)
+                .configure(TestHttpEntity.HTTP_CLIENT_CALLBACK, cb));
+
+        cluster.addMemberChild(node1);
+        cluster.addMemberChild(node2);
+
+        setManagementState(node1, ManagementNodeState.MASTER);
+        EntityTestUtils.assertAttributeEqualsEventually(cluster, BrooklynCluster.MASTER_NODE, node1);
+
+        selectMaster(cluster, node2.getId());
+        checkMaster(cluster, node1);
+    }
+
+    /**
+     * Asserts that {@code node} is the master of {@code cluster}, that every other member is in
+     * HOT_STANDBY, and that every member's priority sensor is 0.
+     */
+    private void checkMaster(Group cluster, Entity node) {
+        assertEquals(node.getAttribute(BrooklynNode.MANAGEMENT_NODE_STATE), ManagementNodeState.MASTER);
+        assertEquals(cluster.getAttribute(BrooklynCluster.MASTER_NODE), node);
+        for (Entity member : cluster.getMembers()) {
+            if (member != node) {
+                assertEquals(member.getAttribute(BrooklynNode.MANAGEMENT_NODE_STATE), ManagementNodeState.HOT_STANDBY);
+            }
+            // The priority check applies to the master as well.
+            assertEquals((int)member.getAttribute(TestHttpEntity.HA_PRIORITY), 0);
+        }
+    }
+
+    /**
+     * Fake server side for the test nodes. State-change requests demote the calling node to
+     * HOT_STANDBY; priority requests are expected to alternate between setting 1 and setting 0,
+     * tracked by {@link State}.
+     */
+    private static class HttpCallback implements Function<CallbackEntityHttpClient.Request, String> {
+        private enum State {
+            INITIAL,
+            PROMOTED
+        }
+        private State state = State.INITIAL;
+        private boolean failAtStateChange;
+
+        /**
+         * Validates the request against the expected sequence, updates the entity's sensors and
+         * returns the response body.
+         */
+        @Override
+        public String apply(Request input) {
+            if ("/v1/server/ha/state".equals(input.getPath())) {
+                if (failAtStateChange) {
+                    throw new RuntimeException("Testing failure at chaning node state");
+                }
+
+                checkRequest(input, HttpPost.METHOD_NAME, "/v1/server/ha/state", "mode", "HOT_STANDBY");
+                Entity entity = input.getEntity();
+                EntityTestUtils.assertAttributeEquals(entity, BrooklynNode.MANAGEMENT_NODE_STATE, ManagementNodeState.MASTER);
+                EntityTestUtils.assertAttributeEquals(entity, TestHttpEntity.HA_PRIORITY, 0);
+
+                setManagementState(entity, ManagementNodeState.HOT_STANDBY);
+
+                return "MASTER";
+            } else {
+                switch(state) {
+                case INITIAL:
+                    checkRequest(input, HttpPost.METHOD_NAME, "/v1/server/ha/priority", "priority", "1");
+                    state = State.PROMOTED;
+                    setPriority(input.getEntity(), Integer.parseInt(input.getParams().get("priority")));
+                    return "0";
+                case PROMOTED:
+                    checkRequest(input, HttpPost.METHOD_NAME, "/v1/server/ha/priority", "priority", "0");
+                    state = State.INITIAL;
+                    setPriority(input.getEntity(), Integer.parseInt(input.getParams().get("priority")));
+                    return "1";
+                default: throw new IllegalStateException("Illegal call at state " + state + ". Request = " + input.getMethod() + " " + input.getPath());
+                }
+            }
+        }
+
+        /**
+         * Verifies that a request has the expected method, path and parameters.
+         *
+         * @param input the request actually received
+         * @param methodName expected HTTP method
+         * @param path expected request path
+         * @param keyValue expected parameters as alternating key, value entries; a trailing key
+         *        without a value is ignored
+         * @throws IllegalStateException on the first mismatch
+         */
+        public void checkRequest(Request input, String methodName, String path, String... keyValue) {
+            if (!input.getMethod().equals(methodName) || !input.getPath().equals(path)) {
+                throw new IllegalStateException("Request doesn't match expected state. Expected = " + methodName + " " + path + ". " +
+                        "Actual = " + input.getMethod() + " " + input.getPath());
+            }
+            // Step through the varargs two at a time: [key0, value0, key1, value1, ...]
+            for(int i = 0; i + 1 < keyValue.length; i += 2) {
+                String key = keyValue[i];
+                String value = keyValue[i+1];
+                String inputValue = input.getParams().get(key);
+                if(!Objects.equal(value, inputValue)) {
+                    throw new IllegalStateException("Request doesn't match expected parameter " + methodName + " " + path + ". Parameter " + key + 
+                            " expected = " + value + ", actual = " + inputValue);
+                }
+            }
+        }
+
+        /** When set, every state-change request makes {@link #apply(Request)} throw. */
+        public void setFailAtStateChange(boolean failAtStateChange) {
+            this.failAtStateChange = failAtStateChange;
+        }
+
+    }
+
+    /**
+     * When the cluster has no master, promotes the first member whose priority sensor is 1, or
+     * the first member of the cluster if none has that priority. Does nothing when a master is
+     * already known or the cluster has no members.
+     */
+    private void masterFailoverIfNeeded() {
+        if (cluster.getAttribute(BrooklynCluster.MASTER_NODE) != null) {
+            return;
+        }
+        Collection<Entity> candidates = cluster.getMembers();
+        if (candidates.isEmpty()) {
+            return;
+        }
+
+        Entity chosen = null;
+        for (Entity candidate : candidates) {
+            if (candidate.getAttribute(TestHttpEntity.HA_PRIORITY) == 1) {
+                chosen = candidate;
+                break;
+            }
+        }
+        if (chosen == null) {
+            chosen = candidates.iterator().next();
+        }
+        masterFailover(chosen);
+    }
+
+    /** Marks {@code member} as MASTER and waits until the cluster reports it as its master node. */
+    private void masterFailover(Entity member) {
+        LOG.debug("Master failover to " + member);
+        setManagementState(member, ManagementNodeState.MASTER);
+        BrooklynNode expectedMaster = (BrooklynNode)member;
+        EntityTestUtils.assertAttributeEqualsEventually(cluster, BrooklynCluster.MASTER_NODE, expectedMaster);
+    }
+
+    /** Sets the {@code MANAGEMENT_NODE_STATE} sensor of {@code entity} directly. */
+    public static void setManagementState(Entity entity, ManagementNodeState state) {
+        ((EntityLocal)entity).setAttribute(BrooklynNode.MANAGEMENT_NODE_STATE, state);
+    }
+
+    /** Sets the test-only {@code HA_PRIORITY} sensor of {@code entity} directly. */
+    public static void setPriority(Entity entity, int priority) {
+        ((EntityLocal)entity).setAttribute(TestHttpEntity.HA_PRIORITY, priority);
+    }
+
+    /**
+     * Invokes the select-master effector on {@code cluster} with {@code id} as the new master ID,
+     * using the test's own execution context, and blocks until the invocation finishes.
+     */
+    private void selectMaster(DynamicCluster cluster, String id) {
+        ec.submit(Effectors.invocation(cluster, BrooklynCluster.SELECT_MASTER, ImmutableMap.of(SelectMasterEffector.NEW_MASTER_ID.getName(), id))).asTask().getUnchecked();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6f895aaf/software/base/src/test/java/brooklyn/entity/brooklynnode/effector/TestHttpEntity.java
----------------------------------------------------------------------
diff --git a/software/base/src/test/java/brooklyn/entity/brooklynnode/effector/TestHttpEntity.java b/software/base/src/test/java/brooklyn/entity/brooklynnode/effector/TestHttpEntity.java
new file mode 100644
index 0000000..259a271
--- /dev/null
+++ b/software/base/src/test/java/brooklyn/entity/brooklynnode/effector/TestHttpEntity.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package brooklyn.entity.brooklynnode.effector;
+
+import java.util.Collection;
+
+import brooklyn.config.ConfigKey;
+import brooklyn.entity.basic.AbstractEntity;
+import brooklyn.entity.basic.ConfigKeys;
+import brooklyn.entity.brooklynnode.BrooklynNode;
+import brooklyn.entity.brooklynnode.EntityHttpClient;
+import brooklyn.entity.brooklynnode.effector.CallbackEntityHttpClient.Request;
+import brooklyn.event.AttributeSensor;
+import brooklyn.event.basic.BasicAttributeSensor;
+import brooklyn.location.Location;
+
+import com.google.common.base.Function;
+import com.google.common.reflect.TypeToken;
+
+/**
+ * Minimal {@link BrooklynNode} for tests: its HTTP client forwards every request to a
+ * configurable callback, and its lifecycle methods do nothing.
+ */
+public class TestHttpEntity extends AbstractEntity implements BrooklynNode {
+    /** Callback that receives every request made through {@link #http()}. */
+    @SuppressWarnings("serial")
+    public static final ConfigKey<Function<Request, String>> HTTP_CLIENT_CALLBACK = ConfigKeys.newConfigKey(new TypeToken<Function<Request, String>>(){}, "httpClientCallback");
+    /** Sensor used by the tests to record the priority set on this entity. */
+    public static final AttributeSensor<Integer> HA_PRIORITY = new BasicAttributeSensor<Integer>(Integer.class, "priority");
+    
+    /** Registers the set-priority and set-mode effectors and initialises the priority sensor to 0. */
+    @Override
+    public void init() {
+        super.init();
+        getMutableEntityType().addEffector(SetHAPriorityEffectorBody.SET_HA_PRIORITY);
+        getMutableEntityType().addEffector(SetHAModeEffectorBody.SET_HA_MODE);
+        setAttribute(HA_PRIORITY, 0);
+    }
+
+    /** Returns a new callback-backed client on every call, bound to the configured callback. */
+    @Override
+    public EntityHttpClient http() {
+        return new CallbackEntityHttpClient(this, getConfig(HTTP_CLIENT_CALLBACK));
+    }
+
+    /** No-op. */
+    @Override
+    public void start(Collection<? extends Location> locations) {
+    }
+
+    /** No-op. */
+    @Override
+    public void stop() {
+    }
+
+    /** No-op. */
+    @Override
+    public void restart() {
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6f895aaf/usage/rest-api/src/main/java/brooklyn/rest/api/ServerApi.java
----------------------------------------------------------------------
diff --git a/usage/rest-api/src/main/java/brooklyn/rest/api/ServerApi.java b/usage/rest-api/src/main/java/brooklyn/rest/api/ServerApi.java
index e0ba7ea..5d1ca9b 100644
--- a/usage/rest-api/src/main/java/brooklyn/rest/api/ServerApi.java
+++ b/usage/rest-api/src/main/java/brooklyn/rest/api/ServerApi.java
@@ -27,6 +27,7 @@ import javax.ws.rs.Path;
 import javax.ws.rs.Produces;
 import javax.ws.rs.core.MediaType;
 
+import brooklyn.management.ha.HighAvailabilityMode;
 import brooklyn.management.ha.ManagementNodeState;
 import brooklyn.rest.apidoc.Apidoc;
 import brooklyn.rest.domain.HighAvailabilitySummary;
@@ -90,14 +91,34 @@ public interface ServerApi {
     @ApiOperation(value = "Returns the HA state of this management node")
     public ManagementNodeState getHighAvailabilityNodeState();
     
+    @POST
+    @Path("/ha/state")
+    @ApiOperation(value = "Changes the HA state of this management node")
+    public ManagementNodeState setHighAvailabilityNodeState( // ServerResource's implementation returns the node state read before the change
+            @ApiParam(name = "mode", value = "The state to change to") // documented name now matches the form field that is actually bound
+            @FormParam("mode") HighAvailabilityMode mode);
+
     @GET
     @Path("/ha/states")
     @ApiOperation(value = "Returns the HA states and detail for all nodes in this management plane",
         responseClass = "brooklyn.rest.domain.HighAvailabilitySummary")
     public HighAvailabilitySummary getHighAvailabilityPlaneStates();
+
+    @GET
+    @Path("/ha/priority")
+    @ApiOperation(value = "Returns the HA node priority for MASTER failover")
+    public long getHighAvailabitlityPriority(); // NOTE(review): "Availabitlity" is misspelled; ServerResource overrides this exact name, so a rename must change both
     
+    @POST
+    @Path("/ha/priority")
+    @ApiOperation(value = "Sets the HA node priority for MASTER failover")
+    public long setHighAvailabilityPriority( // in ServerResource's implementation the returned long is the priority that was replaced
+            @ApiParam(name = "priority", value = "The priority to be set")
+            @FormParam("priority") long priority);
+
     @GET
     @Path("/user")
     @ApiOperation(value = "Return user information for this Brooklyn instance", responseClass = "String", multiValueResponse = false)
-    public String getUser();
+    public String getUser(); 
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6f895aaf/usage/rest-server/src/main/java/brooklyn/rest/resources/ServerResource.java
----------------------------------------------------------------------
diff --git a/usage/rest-server/src/main/java/brooklyn/rest/resources/ServerResource.java b/usage/rest-server/src/main/java/brooklyn/rest/resources/ServerResource.java
index 55fe968..a9cf8a5 100644
--- a/usage/rest-server/src/main/java/brooklyn/rest/resources/ServerResource.java
+++ b/usage/rest-server/src/main/java/brooklyn/rest/resources/ServerResource.java
@@ -38,6 +38,8 @@ import brooklyn.entity.basic.StartableApplication;
 import brooklyn.management.Task;
 import brooklyn.management.entitlement.EntitlementContext;
 import brooklyn.management.entitlement.Entitlements;
+import brooklyn.management.ha.HighAvailabilityManager;
+import brooklyn.management.ha.HighAvailabilityMode;
 import brooklyn.management.ha.ManagementNodeState;
 import brooklyn.management.ha.ManagementPlaneSyncRecord;
 import brooklyn.management.internal.ManagementContextInternal;
@@ -228,7 +230,28 @@ public class ServerResource extends AbstractBrooklynRestResource implements Serv
     public ManagementNodeState getHighAvailabilityNodeState() {
         return mgmt().getHighAvailabilityManager().getNodeState();
     }
-    
+
+    @Override
+    public ManagementNodeState setHighAvailabilityNodeState(HighAvailabilityMode mode) { // switches the HA mode and reports the state seen beforehand
+        HighAvailabilityManager haMgr = mgmt().getHighAvailabilityManager();
+        ManagementNodeState existingState = haMgr.getNodeState(); // captured before changeMode, so the caller receives the prior state
+        haMgr.changeMode(mode); // NOTE(review): mode is passed through unchecked -- confirm changeMode copes with null from a missing form field
+        return existingState;
+    }
+
+    @Override
+    public long getHighAvailabitlityPriority() { // the misspelled name has to match the interface method this overrides
+        return mgmt().getHighAvailabilityManager().getPriority(); // straight delegation to the HA manager
+    }
+
+    @Override
+    public long setHighAvailabilityPriority(long priority) { // stores the new priority and returns the one it replaced
+        HighAvailabilityManager haMgr = mgmt().getHighAvailabilityManager();
+        long oldPrio = haMgr.getPriority(); // read before the update so the previous value can be returned
+        haMgr.setPriority(priority);
+        return oldPrio;
+    }
+
     @Override
     public HighAvailabilitySummary getHighAvailabilityPlaneStates() {
         ManagementPlaneSyncRecord memento = mgmt().getHighAvailabilityManager().getManagementPlaneSyncState();
@@ -244,4 +267,5 @@ public class ServerResource extends AbstractBrooklynRestResource implements Serv
             return null; //User can be null if no authentication was requested
         }
     }
+
 }