You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by al...@apache.org on 2014/11/03 16:51:47 UTC
[03/29] git commit: BrooklynNode cluster + upgrade effector
BrooklynNode cluster + upgrade effector
Also effectors to:
* select master in the cluster
* set HA priority
* set HA state
Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/6f895aaf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/6f895aaf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/6f895aaf
Branch: refs/heads/master
Commit: 6f895aafe49cc8046cc043c17bb38cbbb4018da4
Parents: 63f29bd
Author: Svetoslav Neykov <sv...@cloudsoftcorp.com>
Authored: Wed Oct 22 11:51:19 2014 +0300
Committer: Alex Heneveld <al...@cloudsoftcorp.com>
Committed: Fri Oct 31 09:36:16 2014 -0500
----------------------------------------------------------------------
.../brooklyn/entity/basic/EntityPredicates.java | 9 +
.../entity/brooklynnode/BrooklynCluster.java | 56 ++++
.../brooklynnode/BrooklynClusterImpl.java | 110 ++++++++
.../brooklynnode/BrooklynEntityMirrorImpl.java | 2 -
.../entity/brooklynnode/BrooklynNode.java | 30 ++-
.../entity/brooklynnode/BrooklynNodeImpl.java | 16 +-
.../effector/SelectMasterEffectorBody.java | 173 ++++++++++++
.../effector/SetHAModeEffectorBody.java | 64 +++++
.../effector/SetHAPriorityEffectorBody.java | 55 ++++
.../effector/UpgradeClusterEffectorBody.java | 199 ++++++++++++++
.../effector/CallbackEntityHttpClient.java | 90 +++++++
.../effector/SelectMasterEffectorTest.java | 267 +++++++++++++++++++
.../brooklynnode/effector/TestHttpEntity.java | 66 +++++
.../main/java/brooklyn/rest/api/ServerApi.java | 23 +-
.../brooklyn/rest/resources/ServerResource.java | 26 +-
15 files changed, 1178 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6f895aaf/core/src/main/java/brooklyn/entity/basic/EntityPredicates.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/entity/basic/EntityPredicates.java b/core/src/main/java/brooklyn/entity/basic/EntityPredicates.java
index e0db9e9..b359b1e 100644
--- a/core/src/main/java/brooklyn/entity/basic/EntityPredicates.java
+++ b/core/src/main/java/brooklyn/entity/basic/EntityPredicates.java
@@ -90,6 +90,15 @@ public class EntityPredicates {
};
}
+ public static <T> Predicate<Entity> attributeNotEqualTo(final AttributeSensor<T> attribute, final T val) {
+ return new SerializablePredicate<Entity>() {
+ @Override
+ public boolean apply(@Nullable Entity input) {
+ return (input != null) && !Objects.equal(input.getAttribute(attribute), val);
+ }
+ };
+ }
+
public static <T> Predicate<Entity> configEqualTo(final ConfigKey<T> configKey, final T val) {
return new SerializablePredicate<Entity>() {
@Override
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6f895aaf/software/base/src/main/java/brooklyn/entity/brooklynnode/BrooklynCluster.java
----------------------------------------------------------------------
diff --git a/software/base/src/main/java/brooklyn/entity/brooklynnode/BrooklynCluster.java b/software/base/src/main/java/brooklyn/entity/brooklynnode/BrooklynCluster.java
new file mode 100644
index 0000000..a3c2157
--- /dev/null
+++ b/software/base/src/main/java/brooklyn/entity/brooklynnode/BrooklynCluster.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package brooklyn.entity.brooklynnode;
+
+import brooklyn.config.ConfigKey;
+import brooklyn.entity.Effector;
+import brooklyn.entity.basic.ConfigKeys;
+import brooklyn.entity.basic.SoftwareProcess;
+import brooklyn.entity.effector.Effectors;
+import brooklyn.entity.group.DynamicCluster;
+import brooklyn.entity.proxying.ImplementedBy;
+import brooklyn.event.AttributeSensor;
+import brooklyn.event.basic.BasicAttributeSensor;
+
+@ImplementedBy(BrooklynClusterImpl.class)
+public interface BrooklynCluster extends DynamicCluster {
+ public static final AttributeSensor<BrooklynNode> MASTER_NODE = new BasicAttributeSensor<BrooklynNode>(
+ BrooklynNode.class, "brooklyncluster.master", "Pointer to the child node with MASTER state in the cluster");
+
+ public interface SelectMasterEffector {
+ ConfigKey<String> NEW_MASTER_ID = ConfigKeys.newStringConfigKey(
+ "brooklyncluster.new_master_id", "The ID of the node to become master", null);
+ Effector<Void> SELECT_MASTER = Effectors.effector(Void.class, "selectMaster")
+ .description("Select a new master in the cluster")
+ .parameter(NEW_MASTER_ID)
+ .buildAbstract();
+ }
+
+ public static final Effector<Void> SELECT_MASTER = SelectMasterEffector.SELECT_MASTER;
+
+ public interface UpgradeClusterEffector {
+ Effector<Void> UPGRADE_CLUSTER = Effectors.effector(Void.class, "upgradeCluster")
+ .description("Upgrade the cluster with new distribution version")
+ .parameter(SoftwareProcess.DOWNLOAD_URL.getConfigKey())
+ .buildAbstract();
+ }
+
+ public static final Effector<Void> UPGRADE_CLUSTER = UpgradeClusterEffector.UPGRADE_CLUSTER;
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6f895aaf/software/base/src/main/java/brooklyn/entity/brooklynnode/BrooklynClusterImpl.java
----------------------------------------------------------------------
diff --git a/software/base/src/main/java/brooklyn/entity/brooklynnode/BrooklynClusterImpl.java b/software/base/src/main/java/brooklyn/entity/brooklynnode/BrooklynClusterImpl.java
new file mode 100644
index 0000000..bfaf33c
--- /dev/null
+++ b/software/base/src/main/java/brooklyn/entity/brooklynnode/BrooklynClusterImpl.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package brooklyn.entity.brooklynnode;
+
+import java.util.Collection;
+import java.util.concurrent.Callable;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import brooklyn.entity.Entity;
+import brooklyn.entity.basic.EntityPredicates;
+import brooklyn.entity.basic.ServiceStateLogic.ServiceNotUpLogic;
+import brooklyn.entity.brooklynnode.effector.SelectMasterEffectorBody;
+import brooklyn.entity.brooklynnode.effector.UpgradeClusterEffectorBody;
+import brooklyn.entity.group.DynamicClusterImpl;
+import brooklyn.event.feed.function.FunctionFeed;
+import brooklyn.event.feed.function.FunctionPollConfig;
+import brooklyn.management.ha.ManagementNodeState;
+import brooklyn.util.time.Duration;
+
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.Iterables;
+
+public class BrooklynClusterImpl extends DynamicClusterImpl implements BrooklynCluster {
+
+ private static final String MSG_NO_MASTER = "No master node in cluster";
+
+ private static final Logger LOG = LoggerFactory.getLogger(BrooklynClusterImpl.class);
+
+ //TODO set MEMBER_SPEC
+
+ private FunctionFeed scanMaster;
+
+ @Override
+ public void init() {
+ super.init();
+ getMutableEntityType().addEffector(SelectMasterEffectorBody.SELECT_MASTER);
+ getMutableEntityType().addEffector(UpgradeClusterEffectorBody.UPGRADE_CLUSTER);
+
+ ServiceNotUpLogic.updateNotUpIndicator(this, MASTER_NODE, MSG_NO_MASTER);
+ scanMaster = FunctionFeed.builder()
+ .entity(this)
+ .poll(new FunctionPollConfig<Object, BrooklynNode>(MASTER_NODE)
+ .period(Duration.ONE_SECOND)
+ .callable(new Callable<BrooklynNode>() {
+ @Override
+ public BrooklynNode call() throws Exception {
+ return findMasterChild();
+ }
+ }))
+ .build();
+ }
+
+ private BrooklynNode findMasterChild() {
+ Collection<Entity> masters = FluentIterable.from(getMembers())
+ .filter(EntityPredicates.attributeEqualTo(BrooklynNode.MANAGEMENT_NODE_STATE, ManagementNodeState.MASTER))
+ .toList();
+
+ if (masters.size() == 0) {
+ ServiceNotUpLogic.updateNotUpIndicator(this, MASTER_NODE, MSG_NO_MASTER);
+ return null;
+ } else if (masters.size() == 1) {
+ ServiceNotUpLogic.clearNotUpIndicator(this, MASTER_NODE);
+ return (BrooklynNode)Iterables.getOnlyElement(masters);
+ } else if (masters.size() == 2) {
+ //Probably hit a window where we have a new master
+ //its BrooklynNode picked it up, but the BrooklynNode
+ //for the old master hasn't refreshed its state yet.
+ //Just pick one of them, should sort itself out in next update.
+ LOG.warn("Two masters detected, probably a handover just occured: " + masters);
+
+ //Don't clearNotUpIndicator - if there were no masters previously
+ //why have two now.
+
+ return (BrooklynNode)Iterables.getOnlyElement(masters);
+ } else {
+ //Set on fire?
+ String msg = "Multiple (>=3) master nodes in cluster: " + masters;
+ LOG.error(msg);
+ throw new IllegalStateException(msg);
+ }
+ }
+
+ @Override
+ public void stop() {
+ super.stop();
+
+ if (scanMaster != null && scanMaster.isActivated()) {
+ scanMaster.stop();
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6f895aaf/software/base/src/main/java/brooklyn/entity/brooklynnode/BrooklynEntityMirrorImpl.java
----------------------------------------------------------------------
diff --git a/software/base/src/main/java/brooklyn/entity/brooklynnode/BrooklynEntityMirrorImpl.java b/software/base/src/main/java/brooklyn/entity/brooklynnode/BrooklynEntityMirrorImpl.java
index a71402f..b0fb728 100644
--- a/software/base/src/main/java/brooklyn/entity/brooklynnode/BrooklynEntityMirrorImpl.java
+++ b/software/base/src/main/java/brooklyn/entity/brooklynnode/BrooklynEntityMirrorImpl.java
@@ -25,8 +25,6 @@ import java.util.concurrent.Callable;
import javax.annotation.Nullable;
import org.apache.http.HttpStatus;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import brooklyn.entity.Effector;
import brooklyn.entity.basic.AbstractEntity;
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6f895aaf/software/base/src/main/java/brooklyn/entity/brooklynnode/BrooklynNode.java
----------------------------------------------------------------------
diff --git a/software/base/src/main/java/brooklyn/entity/brooklynnode/BrooklynNode.java b/software/base/src/main/java/brooklyn/entity/brooklynnode/BrooklynNode.java
index 16b46c6..6208813 100644
--- a/software/base/src/main/java/brooklyn/entity/brooklynnode/BrooklynNode.java
+++ b/software/base/src/main/java/brooklyn/entity/brooklynnode/BrooklynNode.java
@@ -40,6 +40,8 @@ import brooklyn.event.basic.Sensors;
import brooklyn.event.basic.BasicAttributeSensorAndConfigKey.StringAttributeSensorAndConfigKey;
import brooklyn.event.basic.MapConfigKey;
import brooklyn.event.basic.PortAttributeSensorAndConfigKey;
+import brooklyn.management.ha.HighAvailabilityMode;
+import brooklyn.management.ha.ManagementNodeState;
import brooklyn.util.collections.MutableMap;
import brooklyn.util.flags.SetFromFlag;
import brooklyn.util.net.Networking;
@@ -215,8 +217,11 @@ public interface BrooklynNode extends SoftwareProcess, UsesJava {
"brooklynnode.webconsole.portMapper", "Function for mapping private to public ports, for use in inferring the brooklyn URI", Functions.<Integer>identity());
public static final AttributeSensor<URI> WEB_CONSOLE_URI = new BasicAttributeSensor<URI>(
- URI.class, "brooklynnode.webconsole.url", "URL of the brooklyn web-console");
-
+ URI.class, "brooklynnode.webconsole.url", "URL of the brooklyn web-console");
+
+ public static final AttributeSensor<ManagementNodeState> MANAGEMENT_NODE_STATE = new BasicAttributeSensor<ManagementNodeState>(
+ ManagementNodeState.class, "brooklynnode.ha.state", "High-availability state of the management node (MASTER, HOT_STANDBY, etc)");
+
@SetFromFlag("noShutdownOnExit")
public static final ConfigKey<Boolean> NO_SHUTDOWN_ON_EXIT = ConfigKeys.newBooleanConfigKey("brooklynnode.noshutdownonexit",
"Whether to shutdown entities on exit", false);
@@ -275,6 +280,25 @@ public interface BrooklynNode extends SoftwareProcess, UsesJava {
public static final Effector<Void> STOP_NODE_AND_KILL_APPS = StopNodeAndKillAppsEffector.STOP_NODE_AND_KILL_APPS;
- public EntityHttpClient http();
+ public interface SetHAPriorityEffector {
+ ConfigKey<Integer> PRIORITY = ConfigKeys.newIntegerConfigKey("priority", "HA priority");
+ Effector<Integer> SET_HA_PRIORITY = Effectors.effector(Integer.class, "setHAPriotity")
+ .description("Set HA priority on the node, returns the old priority")
+ .parameter(PRIORITY)
+ .buildAbstract();
+ }
+ public static final Effector<Integer> SET_HA_PRIORITY = SetHAPriorityEffector.SET_HA_PRIORITY;
+
+ public interface SetHAModeEffector {
+ ConfigKey<HighAvailabilityMode> MODE = ConfigKeys.newConfigKey(HighAvailabilityMode.class, "mode", "HA mode");
+ Effector<ManagementNodeState> SET_HA_MODE = Effectors.effector(ManagementNodeState.class, "setHAMode")
+ .description("Set HA mode on the node, returns the existing state")
+ .parameter(MODE)
+ .buildAbstract();
+ }
+
+ public static final Effector<ManagementNodeState> SET_HA_MODE = SetHAModeEffector.SET_HA_MODE;
+
+ public EntityHttpClient http();
}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6f895aaf/software/base/src/main/java/brooklyn/entity/brooklynnode/BrooklynNodeImpl.java
----------------------------------------------------------------------
diff --git a/software/base/src/main/java/brooklyn/entity/brooklynnode/BrooklynNodeImpl.java b/software/base/src/main/java/brooklyn/entity/brooklynnode/BrooklynNodeImpl.java
index ddf2b96..630563d 100644
--- a/software/base/src/main/java/brooklyn/entity/brooklynnode/BrooklynNodeImpl.java
+++ b/software/base/src/main/java/brooklyn/entity/brooklynnode/BrooklynNodeImpl.java
@@ -38,13 +38,17 @@ import brooklyn.entity.basic.EntityPredicates;
import brooklyn.entity.basic.Lifecycle;
import brooklyn.entity.basic.ServiceStateLogic.ServiceNotUpLogic;
import brooklyn.entity.basic.SoftwareProcessImpl;
+import brooklyn.entity.brooklynnode.effector.SetHAModeEffectorBody;
+import brooklyn.entity.brooklynnode.effector.SetHAPriorityEffectorBody;
import brooklyn.entity.effector.EffectorBody;
import brooklyn.entity.effector.Effectors;
import brooklyn.event.feed.ConfigToAttributes;
import brooklyn.event.feed.http.HttpFeed;
import brooklyn.event.feed.http.HttpPollConfig;
import brooklyn.event.feed.http.HttpValueFunctions;
+import brooklyn.event.feed.http.JsonFunctions;
import brooklyn.management.TaskAdaptable;
+import brooklyn.management.ha.ManagementNodeState;
import brooklyn.util.collections.Jsonya;
import brooklyn.util.collections.MutableMap;
import brooklyn.util.config.ConfigBag;
@@ -52,6 +56,7 @@ import brooklyn.util.exceptions.Exceptions;
import brooklyn.util.exceptions.PropagatedRuntimeException;
import brooklyn.util.guava.Functionals;
import brooklyn.util.http.HttpToolResponse;
+import brooklyn.util.javalang.Enums;
import brooklyn.util.javalang.JavaClassNames;
import brooklyn.util.repeat.Repeater;
import brooklyn.util.task.DynamicTasks;
@@ -62,6 +67,7 @@ import brooklyn.util.time.Duration;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
import com.google.gson.Gson;
public class BrooklynNodeImpl extends SoftwareProcessImpl implements BrooklynNode {
@@ -94,6 +100,8 @@ public class BrooklynNodeImpl extends SoftwareProcessImpl implements BrooklynNod
getMutableEntityType().addEffector(ShutdownEffectorBody.SHUTDOWN);
getMutableEntityType().addEffector(StopNodeButLeaveAppsEffectorBody.STOP_NODE_BUT_LEAVE_APPS);
getMutableEntityType().addEffector(StopNodeAndKillAppsEffectorBody.STOP_NODE_AND_KILL_APPS);
+ getMutableEntityType().addEffector(SetHAPriorityEffectorBody.SET_HA_PRIORITY);
+ getMutableEntityType().addEffector(SetHAModeEffectorBody.SET_HA_MODE);
}
@Override
@@ -191,7 +199,9 @@ public class BrooklynNodeImpl extends SoftwareProcessImpl implements BrooklynNod
.addIfNotNull("delayForHttpReturn", toNullableString(parameters.get(DELAY_FOR_HTTP_RETURN)));
try {
HttpToolResponse resp = ((BrooklynNode)entity()).http()
- .post("/v1/server/shutdown", MutableMap.<String, String>of(), formParams);
+ .post("/v1/server/shutdown",
+ ImmutableMap.of("Brooklyn-Allow-Non-Master-Access", "true"),
+ formParams);
if (resp.getResponseCode() != HttpStatus.SC_NO_CONTENT) {
throw new IllegalStateException("Response code "+resp.getResponseCode());
}
@@ -341,6 +351,10 @@ public class BrooklynNodeImpl extends SoftwareProcessImpl implements BrooklynNod
.poll(new HttpPollConfig<Boolean>(WEB_CONSOLE_ACCESSIBLE)
.onSuccess(HttpValueFunctions.responseCodeEquals(200))
.setOnFailureOrException(false))
+ .poll(new HttpPollConfig<ManagementNodeState>(MANAGEMENT_NODE_STATE)
+ .suburl("/v1/server/ha/state")
+ .onSuccess(Functionals.chain(Functionals.chain(HttpValueFunctions.jsonContents(), JsonFunctions.cast(String.class)), Enums.fromStringFunction(ManagementNodeState.class)))
+ .setOnFailureOrException(null))
.build();
if (!Lifecycle.RUNNING.equals(getAttribute(SERVICE_STATE_ACTUAL))) {
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6f895aaf/software/base/src/main/java/brooklyn/entity/brooklynnode/effector/SelectMasterEffectorBody.java
----------------------------------------------------------------------
diff --git a/software/base/src/main/java/brooklyn/entity/brooklynnode/effector/SelectMasterEffectorBody.java b/software/base/src/main/java/brooklyn/entity/brooklynnode/effector/SelectMasterEffectorBody.java
new file mode 100644
index 0000000..255b27c
--- /dev/null
+++ b/software/base/src/main/java/brooklyn/entity/brooklynnode/effector/SelectMasterEffectorBody.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package brooklyn.entity.brooklynnode.effector;
+
+import java.util.NoSuchElementException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import brooklyn.entity.Effector;
+import brooklyn.entity.Entity;
+import brooklyn.entity.Group;
+import brooklyn.entity.basic.EntityPredicates;
+import brooklyn.entity.brooklynnode.BrooklynCluster;
+import brooklyn.entity.brooklynnode.BrooklynCluster.SelectMasterEffector;
+import brooklyn.entity.brooklynnode.BrooklynNode;
+import brooklyn.entity.brooklynnode.BrooklynNode.SetHAModeEffector;
+import brooklyn.entity.brooklynnode.BrooklynNode.SetHAPriorityEffector;
+import brooklyn.entity.effector.EffectorBody;
+import brooklyn.entity.effector.Effectors;
+import brooklyn.management.ha.HighAvailabilityMode;
+import brooklyn.management.ha.ManagementNodeState;
+import brooklyn.util.collections.MutableMap;
+import brooklyn.util.config.ConfigBag;
+import brooklyn.util.repeat.Repeater;
+import brooklyn.util.task.DynamicTasks;
+import brooklyn.util.time.Duration;
+
+import com.google.api.client.util.Preconditions;
+import com.google.common.collect.Iterables;
+
+public class SelectMasterEffectorBody extends EffectorBody<Void> implements SelectMasterEffector {
+ public static final Effector<Void> SELECT_MASTER = Effectors.effector(SelectMasterEffector.SELECT_MASTER).impl(new SelectMasterEffectorBody()).build();
+
+ private static final Logger LOG = LoggerFactory.getLogger(SelectMasterEffectorBody.class);
+
+ private static final int HA_STANDBY_PRIORITY = 0;
+ private static final int HA_MASTER_PRIORITY = 1;
+
+ private AtomicBoolean selectMasterInProgress = new AtomicBoolean();
+
+ @Override
+ public Void call(ConfigBag parameters) {
+ if (!selectMasterInProgress.compareAndSet(false, true)) {
+ throw new IllegalStateException("A master change is already in progress.");
+ }
+
+ try {
+ selectMaster(parameters);
+ } finally {
+ selectMasterInProgress.set(false);
+ }
+ return null;
+ }
+
+ private void selectMaster(ConfigBag parameters) {
+ String newMasterId = parameters.get(NEW_MASTER_ID);
+ Preconditions.checkNotNull(newMasterId, NEW_MASTER_ID.getName() + " parameter is required");
+
+ final Entity oldMaster = entity().getAttribute(BrooklynCluster.MASTER_NODE);
+ if (oldMaster != null && oldMaster.getId().equals(newMasterId)) {
+ LOG.info(newMasterId + " is already the current master, no change needed.");
+ return;
+ }
+
+ final Entity newMaster = getMember(newMasterId);
+
+ //1. Increase the priority of the node we wish to become master
+ setNodePriority(newMaster, HA_MASTER_PRIORITY);
+
+ //2. Denote the existing master so a new election takes place
+ try {
+ //If no master was yet selected, at least wait to see
+ //if the new master will be what we expect.
+ if (oldMaster != null) {
+ setNodeState(oldMaster, HighAvailabilityMode.HOT_STANDBY);
+ }
+
+ waitMasterHandover(oldMaster, newMaster);
+ } finally {
+ //3. Revert the priority of the node once it has become master
+ setNodePriority(newMaster, HA_STANDBY_PRIORITY);
+ }
+
+ checkMasterSelected(newMaster);
+ }
+
+ private void waitMasterHandover(final Entity oldMaster, final Entity newMaster) {
+ boolean masterChanged = Repeater.create()
+ .backoff(Duration.millis(500), 1.2, Duration.FIVE_SECONDS)
+ .limitTimeTo(Duration.ONE_MINUTE)
+ .until(new Callable<Boolean>() {
+ @Override
+ public Boolean call() throws Exception {
+ Entity master = getMasterNode();
+ return master != oldMaster && master != null;
+ }
+ })
+ .run();
+ if (!masterChanged) {
+ LOG.warn("Timeout waiting for node to become master: " + newMaster + ".");
+ }
+ }
+
+ private void setNodeState(Entity oldMaster, HighAvailabilityMode mode) {
+ ManagementNodeState oldState = DynamicTasks.queue(
+ Effectors.invocation(
+ oldMaster,
+ BrooklynNode.SET_HA_MODE,
+ MutableMap.of(SetHAModeEffector.MODE, mode))
+ ).asTask().getUnchecked();
+
+ if (oldState != ManagementNodeState.MASTER) {
+ LOG.warn("The previous HA state on node " + oldMaster.getId() + " was " + oldState +
+ ", while the expected value is " + ManagementNodeState.MASTER + ".");
+ }
+ }
+
+ private void setNodePriority(Entity newMaster, int newPriority) {
+ Integer oldPriority = DynamicTasks.queue(
+ Effectors.invocation(
+ newMaster,
+ BrooklynNode.SET_HA_PRIORITY,
+ MutableMap.of(SetHAPriorityEffector.PRIORITY, newPriority))
+ ).asTask().getUnchecked();
+
+ Integer expectedPriority = (newPriority == HA_MASTER_PRIORITY ? HA_STANDBY_PRIORITY : HA_MASTER_PRIORITY);
+ if (oldPriority != expectedPriority) {
+ LOG.warn("The previous HA priority on node " + newMaster.getId() + " was " + oldPriority +
+ ", while the expected value is " + expectedPriority + " (while setting priority " +
+ newPriority + ").");
+ }
+ }
+
+ private void checkMasterSelected(Entity newMaster) {
+ Entity actualMaster = getMasterNode();
+ if (actualMaster != newMaster) {
+ throw new IllegalStateException("Expected node " + newMaster + " to be master, but found that " +
+ "master is " + actualMaster + " instead.");
+ }
+ }
+
+ private Entity getMember(String memberId) {
+ Group cluster = (Group)entity();
+ try {
+ return Iterables.find(cluster.getMembers(), EntityPredicates.idEqualTo(memberId));
+ } catch (NoSuchElementException e) {
+ throw new IllegalStateException(memberId + " is not an ID of brooklyn node in this cluster");
+ }
+ }
+
+ private Entity getMasterNode() {
+ return entity().getAttribute(BrooklynCluster.MASTER_NODE);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6f895aaf/software/base/src/main/java/brooklyn/entity/brooklynnode/effector/SetHAModeEffectorBody.java
----------------------------------------------------------------------
diff --git a/software/base/src/main/java/brooklyn/entity/brooklynnode/effector/SetHAModeEffectorBody.java b/software/base/src/main/java/brooklyn/entity/brooklynnode/effector/SetHAModeEffectorBody.java
new file mode 100644
index 0000000..36a51d0
--- /dev/null
+++ b/software/base/src/main/java/brooklyn/entity/brooklynnode/effector/SetHAModeEffectorBody.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package brooklyn.entity.brooklynnode.effector;
+
+import org.apache.http.HttpStatus;
+
+import brooklyn.entity.Effector;
+import brooklyn.entity.brooklynnode.BrooklynNode;
+import brooklyn.entity.brooklynnode.BrooklynNode.SetHAModeEffector;
+import brooklyn.entity.brooklynnode.EntityHttpClient;
+import brooklyn.entity.effector.EffectorBody;
+import brooklyn.entity.effector.Effectors;
+import brooklyn.event.feed.http.HttpValueFunctions;
+import brooklyn.event.feed.http.JsonFunctions;
+import brooklyn.management.ha.HighAvailabilityMode;
+import brooklyn.management.ha.ManagementNodeState;
+import brooklyn.util.config.ConfigBag;
+import brooklyn.util.guava.Functionals;
+import brooklyn.util.http.HttpToolResponse;
+import brooklyn.util.javalang.Enums;
+
+import com.google.api.client.util.Preconditions;
+import com.google.common.base.Function;
+import com.google.common.collect.ImmutableMap;
+
+public class SetHAModeEffectorBody extends EffectorBody<ManagementNodeState> implements SetHAModeEffector {
+ public static final Effector<ManagementNodeState> SET_HA_MODE = Effectors.effector(SetHAModeEffector.SET_HA_MODE).impl(new SetHAModeEffectorBody()).build();
+
+ @Override
+ public ManagementNodeState call(ConfigBag parameters) {
+ HighAvailabilityMode mode = parameters.get(MODE);
+ Preconditions.checkNotNull(mode, MODE.getName() + " parameter is required");
+
+ EntityHttpClient httpClient = ((BrooklynNode)entity()).http();
+ HttpToolResponse resp = httpClient.post("/v1/server/ha/state",
+ ImmutableMap.of("Brooklyn-Allow-Non-Master-Access", "true"),
+ ImmutableMap.of("mode", mode.toString()));
+
+ if (resp.getResponseCode() == HttpStatus.SC_OK) {
+ Function<HttpToolResponse, ManagementNodeState> parseRespone = Functionals.chain(
+ Functionals.chain(HttpValueFunctions.jsonContents(), JsonFunctions.cast(String.class)),
+ Enums.fromStringFunction(ManagementNodeState.class));
+ return parseRespone.apply(resp);
+ } else {
+ throw new IllegalStateException("Unexpected response code: " + resp.getResponseCode() + "\n" + resp.getContentAsString());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6f895aaf/software/base/src/main/java/brooklyn/entity/brooklynnode/effector/SetHAPriorityEffectorBody.java
----------------------------------------------------------------------
diff --git a/software/base/src/main/java/brooklyn/entity/brooklynnode/effector/SetHAPriorityEffectorBody.java b/software/base/src/main/java/brooklyn/entity/brooklynnode/effector/SetHAPriorityEffectorBody.java
new file mode 100644
index 0000000..94a961c
--- /dev/null
+++ b/software/base/src/main/java/brooklyn/entity/brooklynnode/effector/SetHAPriorityEffectorBody.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package brooklyn.entity.brooklynnode.effector;
+
+import org.apache.http.HttpStatus;
+
+import brooklyn.entity.Effector;
+import brooklyn.entity.brooklynnode.BrooklynNode;
+import brooklyn.entity.brooklynnode.BrooklynNode.SetHAPriorityEffector;
+import brooklyn.entity.brooklynnode.EntityHttpClient;
+import brooklyn.entity.effector.EffectorBody;
+import brooklyn.entity.effector.Effectors;
+import brooklyn.util.config.ConfigBag;
+import brooklyn.util.http.HttpToolResponse;
+
+import com.google.api.client.util.Preconditions;
+import com.google.common.collect.ImmutableMap;
+
+public class SetHAPriorityEffectorBody extends EffectorBody<Integer> implements SetHAPriorityEffector {
+ public static final Effector<Integer> SET_HA_PRIORITY = Effectors.effector(SetHAPriorityEffector.SET_HA_PRIORITY).impl(new SetHAPriorityEffectorBody()).build();
+
+ @Override
+ public Integer call(ConfigBag parameters) {
+ Integer priority = parameters.get(PRIORITY);
+ Preconditions.checkNotNull(priority, PRIORITY.getName() + " parameter is required");
+
+ EntityHttpClient httpClient = ((BrooklynNode)entity()).http();
+ HttpToolResponse resp = httpClient.post("/v1/server/ha/priority",
+ ImmutableMap.of("Brooklyn-Allow-Non-Master-Access", "true"),
+ ImmutableMap.of("priority", priority.toString()));
+
+ if (resp.getResponseCode() == HttpStatus.SC_OK) {
+ return Integer.valueOf(resp.getContentAsString());
+ } else {
+ throw new IllegalStateException("Unexpected response code: " + resp.getResponseCode() + "\n" + resp.getContentAsString());
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6f895aaf/software/base/src/main/java/brooklyn/entity/brooklynnode/effector/UpgradeClusterEffectorBody.java
----------------------------------------------------------------------
diff --git a/software/base/src/main/java/brooklyn/entity/brooklynnode/effector/UpgradeClusterEffectorBody.java b/software/base/src/main/java/brooklyn/entity/brooklynnode/effector/UpgradeClusterEffectorBody.java
new file mode 100644
index 0000000..70b184a
--- /dev/null
+++ b/software/base/src/main/java/brooklyn/entity/brooklynnode/effector/UpgradeClusterEffectorBody.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package brooklyn.entity.brooklynnode.effector;
+
+import java.io.File;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import brooklyn.config.ConfigKey;
+import brooklyn.entity.Effector;
+import brooklyn.entity.Entity;
+import brooklyn.entity.Group;
+import brooklyn.entity.basic.EntityPredicates;
+import brooklyn.entity.basic.Lifecycle;
+import brooklyn.entity.brooklynnode.BrooklynCluster;
+import brooklyn.entity.brooklynnode.BrooklynCluster.SelectMasterEffector;
+import brooklyn.entity.brooklynnode.BrooklynCluster.UpgradeClusterEffector;
+import brooklyn.entity.brooklynnode.BrooklynNode;
+import brooklyn.entity.brooklynnode.BrooklynNode.SetHAModeEffector;
+import brooklyn.entity.effector.EffectorBody;
+import brooklyn.entity.effector.Effectors;
+import brooklyn.entity.group.DynamicCluster;
+import brooklyn.entity.proxying.EntitySpec;
+import brooklyn.event.AttributeSensor;
+import brooklyn.management.ha.HighAvailabilityMode;
+import brooklyn.management.ha.ManagementNodeState;
+import brooklyn.util.collections.MutableMap;
+import brooklyn.util.config.ConfigBag;
+import brooklyn.util.exceptions.Exceptions;
+import brooklyn.util.net.Urls;
+import brooklyn.util.repeat.Repeater;
+import brooklyn.util.task.DynamicTasks;
+import brooklyn.util.task.Tasks;
+import brooklyn.util.time.Duration;
+
+import com.google.api.client.util.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.collect.Collections2;
+import com.google.common.collect.Iterables;
+
+public class UpgradeClusterEffectorBody extends EffectorBody<Void> implements UpgradeClusterEffector {
+ public static final Effector<Void> UPGRADE_CLUSTER = Effectors.effector(UpgradeClusterEffector.UPGRADE_CLUSTER).impl(new UpgradeClusterEffectorBody()).build();
+
+ private AtomicBoolean upgradeInProgress = new AtomicBoolean();
+
+ @Override
+ public Void call(ConfigBag parameters) {
+ if (!upgradeInProgress.compareAndSet(false, true)) {
+ throw new IllegalStateException("An upgrade is already in progress.");
+ }
+
+ EntitySpec<?> memberSpec = entity().getConfig(BrooklynCluster.MEMBER_SPEC);
+ Preconditions.checkNotNull(memberSpec, BrooklynCluster.MEMBER_SPEC.getName() + " is required for " + UpgradeClusterEffector.class.getName());
+
+ Map<ConfigKey<?>, Object> specCfg = memberSpec.getConfig();
+ String oldDownloadUrl = (String) specCfg.get(BrooklynNode.DOWNLOAD_URL);
+ String oldUploadUrl = (String) specCfg.get(BrooklynNode.DISTRO_UPLOAD_URL);
+ String newDownloadUrl = parameters.get(BrooklynNode.DOWNLOAD_URL.getConfigKey());
+ String newUploadUrl = inferUploadUrl(newDownloadUrl);
+ try {
+ memberSpec.configure(BrooklynNode.DOWNLOAD_URL, newUploadUrl);
+ memberSpec.configure(BrooklynNode.DISTRO_UPLOAD_URL, newUploadUrl);
+ upgrade(parameters);
+ } catch (Exception e) {
+ memberSpec.configure(BrooklynNode.DOWNLOAD_URL, oldDownloadUrl);
+ memberSpec.configure(BrooklynNode.DISTRO_UPLOAD_URL, oldUploadUrl);
+ throw Exceptions.propagate(e);
+ } finally {
+ upgradeInProgress.set(false);
+ }
+ return null;
+ }
+
+ private String inferUploadUrl(String newDownloadUrl) {
+ boolean isLocal = "file".equals(Urls.getProtocol(newDownloadUrl)) || new File(newDownloadUrl).exists();
+ if (isLocal) {
+ return newDownloadUrl;
+ } else {
+ return null;
+ }
+ }
+
+ private void upgrade(ConfigBag parameters) {
+ //TODO might be worth separating each step in a task for better UI
+
+ Group cluster = (Group)entity();
+ Collection<Entity> initialMembers = cluster.getMembers();
+ int initialClusterSize = initialMembers.size();
+
+ //1. Initially create a single node to check if it will launch successfully
+ Entity initialNode = Iterables.getOnlyElement(createNodes(1));
+
+ //2. If everything is OK with the first node launch the rest as well
+ Collection<Entity> remainingNodes = createNodes(initialClusterSize - 1);
+
+ //3. Once we have all nodes running without errors switch master
+ DynamicTasks.queue(Effectors.invocation(cluster, BrooklynCluster.SELECT_MASTER, MutableMap.of(SelectMasterEffector.NEW_MASTER_ID, initialNode.getId()))).asTask().getUnchecked();
+
+ //4. Stop the nodes which were running at the start of the upgrade call, but keep them around.
+ // Should we create a quarantine-like zone for old stopped version?
+ // For members that were created meanwhile - they will be using the new version already. If the new version
+ // isn't good then they will fail to start as well, forcing the policies to retry (and succeed once the
+ // URL is reverted).
+ HashSet<Entity> oldMembers = new HashSet<Entity>(initialMembers);
+ oldMembers.removeAll(remainingNodes);
+ oldMembers.remove(initialNode);
+ DynamicTasks.queue(Effectors.invocation(BrooklynNode.STOP_NODE_BUT_LEAVE_APPS, Collections.emptyMap(), oldMembers)).asTask().getUnchecked();
+ }
+
+ private Collection<Entity> createNodes(int nodeCnt) {
+ DynamicCluster cluster = (DynamicCluster)entity();
+
+ //1. Create the nodes
+ Collection<Entity> newNodes = cluster.resizeByDelta(nodeCnt);
+
+ //2. Wait for them to be RUNNING
+ waitAttributeNotEqualTo(
+ newNodes,
+ BrooklynNode.SERVICE_STATE_ACTUAL, Lifecycle.STARTING);
+
+ //3. Set HOT_STANDBY in case it is not enabled on the command line ...
+ DynamicTasks.queue(Effectors.invocation(
+ BrooklynNode.SET_HA_MODE,
+ MutableMap.of(SetHAModeEffector.MODE, HighAvailabilityMode.HOT_STANDBY),
+ newNodes)).asTask().getUnchecked();
+
+ //4. ... and wait until all of the nodes change state
+ //TODO if the REST call is blocking this is not needed
+ waitAttributeEqualTo(
+ newNodes,
+ BrooklynNode.MANAGEMENT_NODE_STATE,
+ ManagementNodeState.HOT_STANDBY);
+
+ //5. Just in case check if all of the nodes are SERVICE_UP (which would rule out ON_FIRE as well)
+ Collection<Entity> failedNodes = Collections2.filter(newNodes, EntityPredicates.attributeEqualTo(BrooklynNode.SERVICE_UP, Boolean.FALSE));
+ if (!failedNodes.isEmpty()) {
+ throw new IllegalStateException("Nodes " + failedNodes + " are not " + BrooklynNode.SERVICE_UP + " though successfully in " + ManagementNodeState.HOT_STANDBY);
+ }
+ return newNodes;
+ }
+
+ private <T> void waitAttributeEqualTo(Collection<Entity> nodes, AttributeSensor<T> sensor, T value) {
+ waitPredicate(
+ nodes,
+ EntityPredicates.attributeEqualTo(sensor, value),
+ "Waiting for nodes " + nodes + ", sensor " + sensor + " to be " + value,
+ "Timeout while waiting for nodes " + nodes + ", sensor " + sensor + " to change to " + value);
+ }
+
+ private <T> void waitAttributeNotEqualTo(Collection<Entity> nodes, AttributeSensor<T> sensor, T value) {
+ waitPredicate(
+ nodes,
+ EntityPredicates.attributeNotEqualTo(sensor, value),
+ "Waiting for nodes " + nodes + ", sensor " + sensor + " to change from " + value,
+ "Timeout while waiting for nodes " + nodes + ", sensor " + sensor + " to change from " + value);
+ }
+
+ private <T extends Entity> void waitPredicate(Collection<T> nodes, Predicate<T> waitPredicate, String blockingMsg, String errorMsg) {
+ Tasks.setBlockingDetails(blockingMsg);
+ boolean pollSuccess = Repeater.create(blockingMsg)
+ .backoff(Duration.ONE_SECOND, 1.2, Duration.TEN_SECONDS)
+ .limitTimeTo(Duration.ONE_HOUR)
+ .until(nodes, allMatch(waitPredicate))
+ .run();
+ Tasks.resetBlockingDetails();
+
+ if (!pollSuccess) {
+ throw new IllegalStateException(errorMsg);
+ }
+ }
+
+ public static <T> Predicate<Collection<T>> allMatch(final Predicate<T> predicate) {
+ return new Predicate<Collection<T>>() {
+ @Override
+ public boolean apply(Collection<T> input) {
+ return Iterables.all(input, predicate);
+ }
+ };
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6f895aaf/software/base/src/test/java/brooklyn/entity/brooklynnode/effector/CallbackEntityHttpClient.java
----------------------------------------------------------------------
diff --git a/software/base/src/test/java/brooklyn/entity/brooklynnode/effector/CallbackEntityHttpClient.java b/software/base/src/test/java/brooklyn/entity/brooklynnode/effector/CallbackEntityHttpClient.java
new file mode 100644
index 0000000..8c8004b
--- /dev/null
+++ b/software/base/src/test/java/brooklyn/entity/brooklynnode/effector/CallbackEntityHttpClient.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package brooklyn.entity.brooklynnode.effector;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.http.HttpStatus;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpPost;
+
+import brooklyn.entity.Entity;
+import brooklyn.entity.brooklynnode.EntityHttpClient;
+import brooklyn.util.http.HttpTool.HttpClientBuilder;
+import brooklyn.util.http.HttpToolResponse;
+
+import com.google.common.base.Function;
+
+public class CallbackEntityHttpClient implements EntityHttpClient {
+ public static class Request {
+ private Entity entity;
+ private String method;
+ private String path;
+ private Map<String, String> params;
+ public Request(Entity entity, String method, String path, Map<String, String> params) {
+ this.entity = entity;
+ this.method = method;
+ this.path = path;
+ this.params = params;
+ }
+ public Entity getEntity() {
+ return entity;
+ }
+ public String getMethod() {
+ return method;
+ }
+ public String getPath() {
+ return path;
+ }
+ public Map<String, String> getParams() {
+ return params;
+ }
+ }
+ private Function<Request, String> callback;
+ private Entity entity;
+
+ public CallbackEntityHttpClient(Entity entity, Function<Request, String> callback) {
+ this.entity = entity;
+ this.callback = callback;
+ }
+
+ @Override
+ public HttpClientBuilder getHttpClientForBrooklynNode() {
+ throw new IllegalStateException("Method call not expected");
+ }
+
+ @Override
+ public HttpToolResponse get(String path) {
+ String result = callback.apply(new Request(entity, HttpGet.METHOD_NAME, path, Collections.<String, String>emptyMap()));
+ return new HttpToolResponse(HttpStatus.SC_OK, null, result.getBytes(), 0, 0, 0);
+ }
+
+ @Override
+ public HttpToolResponse post(String path, Map<String, String> headers, byte[] body) {
+ throw new IllegalStateException("Method call not expected");
+ }
+
+ @Override
+ public HttpToolResponse post(String path, Map<String, String> headers, Map<String, String> formParams) {
+ String result = callback.apply(new Request(entity, HttpPost.METHOD_NAME, path, formParams));
+ return new HttpToolResponse(HttpStatus.SC_OK, Collections.<String, List<String>>emptyMap(), result.getBytes(), 0, 0, 0);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6f895aaf/software/base/src/test/java/brooklyn/entity/brooklynnode/effector/SelectMasterEffectorTest.java
----------------------------------------------------------------------
diff --git a/software/base/src/test/java/brooklyn/entity/brooklynnode/effector/SelectMasterEffectorTest.java b/software/base/src/test/java/brooklyn/entity/brooklynnode/effector/SelectMasterEffectorTest.java
new file mode 100644
index 0000000..6036507
--- /dev/null
+++ b/software/base/src/test/java/brooklyn/entity/brooklynnode/effector/SelectMasterEffectorTest.java
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package brooklyn.entity.brooklynnode.effector;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.Callable;
+
+import org.apache.http.client.methods.HttpPost;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import brooklyn.entity.Entity;
+import brooklyn.entity.Group;
+import brooklyn.entity.basic.ApplicationBuilder;
+import brooklyn.entity.basic.BasicApplication;
+import brooklyn.entity.basic.EntityLocal;
+import brooklyn.entity.brooklynnode.BrooklynCluster;
+import brooklyn.entity.brooklynnode.BrooklynCluster.SelectMasterEffector;
+import brooklyn.entity.brooklynnode.BrooklynClusterImpl;
+import brooklyn.entity.brooklynnode.BrooklynNode;
+import brooklyn.entity.brooklynnode.effector.CallbackEntityHttpClient.Request;
+import brooklyn.entity.effector.Effectors;
+import brooklyn.entity.group.DynamicCluster;
+import brooklyn.entity.proxying.EntitySpec;
+import brooklyn.event.feed.AttributePollHandler;
+import brooklyn.event.feed.DelegatingPollHandler;
+import brooklyn.event.feed.Poller;
+import brooklyn.event.feed.function.FunctionFeed;
+import brooklyn.management.ManagementContext;
+import brooklyn.management.ha.ManagementNodeState;
+import brooklyn.test.EntityTestUtils;
+import brooklyn.test.entity.LocalManagementContextForTests;
+import brooklyn.util.task.BasicExecutionContext;
+import brooklyn.util.task.BasicExecutionManager;
+import brooklyn.util.time.Duration;
+
+import com.google.common.base.Function;
+import com.google.common.base.Objects;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+
+public class SelectMasterEffectorTest {
+ private static final Logger LOG = LoggerFactory.getLogger(BrooklynClusterImpl.class);
+
+ protected ManagementContext mgmt;
+ protected BasicApplication app;
+ protected BasicExecutionContext ec;
+ protected BrooklynCluster cluster;
+ protected FunctionFeed scanMaster;
+ protected Poller<Void> poller;
+
+ @BeforeMethod
+ public void setUp() {
+ mgmt = new LocalManagementContextForTests();
+ EntitySpec<BasicApplication> appSpec = EntitySpec.create(BasicApplication.class)
+ .child(EntitySpec.create(BrooklynCluster.class));
+ app = ApplicationBuilder.newManagedApp(appSpec, mgmt);
+ cluster = (BrooklynCluster)Iterables.getOnlyElement(app.getChildren());
+
+ BasicExecutionManager em = new BasicExecutionManager("mycontext");
+ ec = new BasicExecutionContext(em);
+
+ poller = new Poller<Void>((EntityLocal)app, false);
+ poller.scheduleAtFixedRate(
+ new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ masterFailoverIfNeeded();
+ return null;
+ }
+ },
+ new DelegatingPollHandler<Void>(Collections.<AttributePollHandler<? super Void>>emptyList()),
+ Duration.millis(200));
+ poller.start();
+ }
+
+ @AfterMethod
+ public void tearDown() {
+ poller.stop();
+ }
+
+ @Test
+ public void testInvalidNewMasterIdFails() {
+ try {
+ BrooklynCluster cluster = app.addChild(EntitySpec.create(BrooklynCluster.class));
+ selectMaster(cluster, "1234");
+ fail("Non-existend entity ID provided.");
+ } catch (Exception e) {
+ assertTrue(e.toString().contains("1234 is not an ID of brooklyn node in this cluster"));
+ }
+ }
+
+ @Test
+ public void testSelectMaster() {
+ HttpCallback cb = new HttpCallback();
+ BrooklynNode node1 = cluster.addMemberChild(EntitySpec.create(BrooklynNode.class)
+ .impl(TestHttpEntity.class)
+ .configure(TestHttpEntity.HTTP_CLIENT_CALLBACK, cb));
+ BrooklynNode node2 = cluster.addMemberChild(EntitySpec.create(BrooklynNode.class)
+ .impl(TestHttpEntity.class)
+ .configure(TestHttpEntity.HTTP_CLIENT_CALLBACK, cb));
+
+ cluster.addMemberChild(node1);
+ cluster.addMemberChild(node2);
+
+ setManagementState(node1, ManagementNodeState.MASTER);
+ EntityTestUtils.assertAttributeEqualsEventually(cluster, BrooklynCluster.MASTER_NODE, node1);
+
+ selectMaster(cluster, node2.getId());
+ checkMaster(cluster, node2);
+ }
+
+ @Test(groups="WIP")
+ //after throwing an exception in HttpCallback tasks are no longer executed, why?
+ public void testSelectMasterFailsAtChangeState() {
+ HttpCallback cb = new HttpCallback();
+ cb.setFailAtStateChange(true);
+
+ BrooklynNode node1 = cluster.addMemberChild(EntitySpec.create(BrooklynNode.class)
+ .impl(TestHttpEntity.class)
+ .configure(TestHttpEntity.HTTP_CLIENT_CALLBACK, cb));
+ BrooklynNode node2 = cluster.addMemberChild(EntitySpec.create(BrooklynNode.class)
+ .impl(TestHttpEntity.class)
+ .configure(TestHttpEntity.HTTP_CLIENT_CALLBACK, cb));
+
+ cluster.addMemberChild(node1);
+ cluster.addMemberChild(node2);
+
+ setManagementState(node1, ManagementNodeState.MASTER);
+ EntityTestUtils.assertAttributeEqualsEventually(cluster, BrooklynCluster.MASTER_NODE, node1);
+
+ selectMaster(cluster, node2.getId());
+ checkMaster(cluster, node1);
+ }
+
+ private void checkMaster(Group cluster, Entity node) {
+ assertEquals(node.getAttribute(BrooklynNode.MANAGEMENT_NODE_STATE), ManagementNodeState.MASTER);
+ assertEquals(cluster.getAttribute(BrooklynCluster.MASTER_NODE), node);
+ for (Entity member : cluster.getMembers()) {
+ if (member != node) {
+ assertEquals(member.getAttribute(BrooklynNode.MANAGEMENT_NODE_STATE), ManagementNodeState.HOT_STANDBY);
+ }
+ assertEquals((int)member.getAttribute(TestHttpEntity.HA_PRIORITY), 0);
+ }
+ }
+
+ private static class HttpCallback implements Function<CallbackEntityHttpClient.Request, String> {
+ private enum State {
+ INITIAL,
+ PROMOTED
+ }
+ private State state = State.INITIAL;
+ private boolean failAtStateChange;
+
+ @Override
+ public String apply(Request input) {
+ if ("/v1/server/ha/state".equals(input.getPath())) {
+ if (failAtStateChange) {
+ throw new RuntimeException("Testing failure at chaning node state");
+ }
+
+ checkRequest(input, HttpPost.METHOD_NAME, "/v1/server/ha/state", "mode", "HOT_STANDBY");
+ Entity entity = input.getEntity();
+ EntityTestUtils.assertAttributeEquals(entity, BrooklynNode.MANAGEMENT_NODE_STATE, ManagementNodeState.MASTER);
+ EntityTestUtils.assertAttributeEquals(entity, TestHttpEntity.HA_PRIORITY, 0);
+
+ setManagementState(entity, ManagementNodeState.HOT_STANDBY);
+
+ return "MASTER";
+ } else {
+ switch(state) {
+ case INITIAL:
+ checkRequest(input, HttpPost.METHOD_NAME, "/v1/server/ha/priority", "priority", "1");
+ state = State.PROMOTED;
+ setPriority(input.getEntity(), Integer.parseInt(input.getParams().get("priority")));
+ return "0";
+ case PROMOTED:
+ checkRequest(input, HttpPost.METHOD_NAME, "/v1/server/ha/priority", "priority", "0");
+ state = State.INITIAL;
+ setPriority(input.getEntity(), Integer.parseInt(input.getParams().get("priority")));
+ return "1";
+ default: throw new IllegalStateException("Illegal call at state " + state + ". Request = " + input.getMethod() + " " + input.getPath());
+ }
+ }
+ }
+
+ public void checkRequest(Request input, String methodName, String path, String... keyValue) {
+ if (!input.getMethod().equals(methodName) || !input.getPath().equals(path)) {
+ throw new IllegalStateException("Request doesn't match expected state. Expected = " + input.getMethod() + " " + input.getPath() + ". " +
+ "Actual = " + methodName + " " + path);
+ }
+ for(int i = 0; i < keyValue.length / 2; i++) {
+ String key = keyValue[i];
+ String value = keyValue[i+1];
+ String inputValue = input.getParams().get(key);
+ if(!Objects.equal(value, inputValue)) {
+ throw new IllegalStateException("Request doesn't match expected parameter " + methodName + " " + path + ". Parameter " + key +
+ " expected = " + value + ", actual = " + inputValue);
+ }
+ }
+ }
+
+ public void setFailAtStateChange(boolean failAtStateChange) {
+ this.failAtStateChange = failAtStateChange;
+ }
+
+ }
+
+ private void masterFailoverIfNeeded() {
+ if (cluster.getAttribute(BrooklynCluster.MASTER_NODE) == null) {
+ Collection<Entity> members = cluster.getMembers();
+ if (members.size() > 0) {
+ for (Entity member : members) {
+ if (member.getAttribute(TestHttpEntity.HA_PRIORITY) == 1) {
+ masterFailover(member);
+ return;
+ }
+ }
+ masterFailover(members.iterator().next());
+ }
+ }
+ }
+
+ private void masterFailover(Entity member) {
+ LOG.debug("Master failover to " + member);
+ setManagementState(member, ManagementNodeState.MASTER);
+ EntityTestUtils.assertAttributeEqualsEventually(cluster, BrooklynCluster.MASTER_NODE, (BrooklynNode)member);
+ return;
+ }
+
+ public static void setManagementState(Entity entity, ManagementNodeState state) {
+ ((EntityLocal)entity).setAttribute(BrooklynNode.MANAGEMENT_NODE_STATE, state);
+ }
+
+ public static void setPriority(Entity entity, int priority) {
+ ((EntityLocal)entity).setAttribute(TestHttpEntity.HA_PRIORITY, priority);
+ }
+
+ private void selectMaster(DynamicCluster cluster, String id) {
+ ec.submit(Effectors.invocation(cluster, BrooklynCluster.SELECT_MASTER, ImmutableMap.of(SelectMasterEffector.NEW_MASTER_ID.getName(), id))).asTask().getUnchecked();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6f895aaf/software/base/src/test/java/brooklyn/entity/brooklynnode/effector/TestHttpEntity.java
----------------------------------------------------------------------
diff --git a/software/base/src/test/java/brooklyn/entity/brooklynnode/effector/TestHttpEntity.java b/software/base/src/test/java/brooklyn/entity/brooklynnode/effector/TestHttpEntity.java
new file mode 100644
index 0000000..259a271
--- /dev/null
+++ b/software/base/src/test/java/brooklyn/entity/brooklynnode/effector/TestHttpEntity.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package brooklyn.entity.brooklynnode.effector;
+
+import java.util.Collection;
+
+import brooklyn.config.ConfigKey;
+import brooklyn.entity.basic.AbstractEntity;
+import brooklyn.entity.basic.ConfigKeys;
+import brooklyn.entity.brooklynnode.BrooklynNode;
+import brooklyn.entity.brooklynnode.EntityHttpClient;
+import brooklyn.entity.brooklynnode.effector.CallbackEntityHttpClient.Request;
+import brooklyn.event.AttributeSensor;
+import brooklyn.event.basic.BasicAttributeSensor;
+import brooklyn.location.Location;
+
+import com.google.common.base.Function;
+import com.google.common.reflect.TypeToken;
+
+public class TestHttpEntity extends AbstractEntity implements BrooklynNode {
+ @SuppressWarnings("serial")
+ public static final ConfigKey<Function<Request, String>> HTTP_CLIENT_CALLBACK = ConfigKeys.newConfigKey(new TypeToken<Function<Request, String>>(){}, "httpClientCallback");
+ public static final AttributeSensor<Integer> HA_PRIORITY = new BasicAttributeSensor<Integer>(Integer.class, "priority");
+
+ @Override
+ public void init() {
+ super.init();
+ getMutableEntityType().addEffector(SetHAPriorityEffectorBody.SET_HA_PRIORITY);
+ getMutableEntityType().addEffector(SetHAModeEffectorBody.SET_HA_MODE);
+ setAttribute(HA_PRIORITY, 0);
+ }
+
+ @Override
+ public EntityHttpClient http() {
+ return new CallbackEntityHttpClient(this, getConfig(HTTP_CLIENT_CALLBACK));
+ }
+
+ @Override
+ public void start(Collection<? extends Location> locations) {
+ }
+
+ @Override
+ public void stop() {
+ }
+
+ @Override
+ public void restart() {
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6f895aaf/usage/rest-api/src/main/java/brooklyn/rest/api/ServerApi.java
----------------------------------------------------------------------
diff --git a/usage/rest-api/src/main/java/brooklyn/rest/api/ServerApi.java b/usage/rest-api/src/main/java/brooklyn/rest/api/ServerApi.java
index e0ba7ea..5d1ca9b 100644
--- a/usage/rest-api/src/main/java/brooklyn/rest/api/ServerApi.java
+++ b/usage/rest-api/src/main/java/brooklyn/rest/api/ServerApi.java
@@ -27,6 +27,7 @@ import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
+import brooklyn.management.ha.HighAvailabilityMode;
import brooklyn.management.ha.ManagementNodeState;
import brooklyn.rest.apidoc.Apidoc;
import brooklyn.rest.domain.HighAvailabilitySummary;
@@ -90,14 +91,34 @@ public interface ServerApi {
@ApiOperation(value = "Returns the HA state of this management node")
public ManagementNodeState getHighAvailabilityNodeState();
+ @POST
+ @Path("/ha/state")
+ @ApiOperation(value = "Changes the HA state of this management node")
+ public ManagementNodeState setHighAvailabilityNodeState(
+ @ApiParam(name = "state", value = "The state to change to")
+ @FormParam("mode") HighAvailabilityMode mode);
+
@GET
@Path("/ha/states")
@ApiOperation(value = "Returns the HA states and detail for all nodes in this management plane",
responseClass = "brooklyn.rest.domain.HighAvailabilitySummary")
public HighAvailabilitySummary getHighAvailabilityPlaneStates();
+
+ @GET
+ @Path("/ha/priority")
+ @ApiOperation(value = "Returns the HA node priority for MASTER failover")
+ public long getHighAvailabitlityPriority();
+ @POST
+ @Path("/ha/priority")
+ @ApiOperation(value = "Sets the HA node priority for MASTER failover")
+ public long setHighAvailabilityPriority(
+ @ApiParam(name = "priority", value = "The priority to be set")
+ @FormParam("priority") long priority);
+
@GET
@Path("/user")
@ApiOperation(value = "Return user information for this Brooklyn instance", responseClass = "String", multiValueResponse = false)
- public String getUser();
+ public String getUser();
+
}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6f895aaf/usage/rest-server/src/main/java/brooklyn/rest/resources/ServerResource.java
----------------------------------------------------------------------
diff --git a/usage/rest-server/src/main/java/brooklyn/rest/resources/ServerResource.java b/usage/rest-server/src/main/java/brooklyn/rest/resources/ServerResource.java
index 55fe968..a9cf8a5 100644
--- a/usage/rest-server/src/main/java/brooklyn/rest/resources/ServerResource.java
+++ b/usage/rest-server/src/main/java/brooklyn/rest/resources/ServerResource.java
@@ -38,6 +38,8 @@ import brooklyn.entity.basic.StartableApplication;
import brooklyn.management.Task;
import brooklyn.management.entitlement.EntitlementContext;
import brooklyn.management.entitlement.Entitlements;
+import brooklyn.management.ha.HighAvailabilityManager;
+import brooklyn.management.ha.HighAvailabilityMode;
import brooklyn.management.ha.ManagementNodeState;
import brooklyn.management.ha.ManagementPlaneSyncRecord;
import brooklyn.management.internal.ManagementContextInternal;
@@ -228,7 +230,28 @@ public class ServerResource extends AbstractBrooklynRestResource implements Serv
public ManagementNodeState getHighAvailabilityNodeState() {
return mgmt().getHighAvailabilityManager().getNodeState();
}
-
+
+ @Override
+ public ManagementNodeState setHighAvailabilityNodeState(HighAvailabilityMode mode) {
+ HighAvailabilityManager haMgr = mgmt().getHighAvailabilityManager();
+ ManagementNodeState existingState = haMgr.getNodeState();
+ haMgr.changeMode(mode);
+ return existingState;
+ }
+
+ @Override
+ public long getHighAvailabitlityPriority() {
+ return mgmt().getHighAvailabilityManager().getPriority();
+ }
+
+ @Override
+ public long setHighAvailabilityPriority(long priority) {
+ HighAvailabilityManager haMgr = mgmt().getHighAvailabilityManager();
+ long oldPrio = haMgr.getPriority();
+ haMgr.setPriority(priority);
+ return oldPrio;
+ }
+
@Override
public HighAvailabilitySummary getHighAvailabilityPlaneStates() {
ManagementPlaneSyncRecord memento = mgmt().getHighAvailabilityManager().getManagementPlaneSyncState();
@@ -244,4 +267,5 @@ public class ServerResource extends AbstractBrooklynRestResource implements Serv
return null; //User can be null if no authentication was requested
}
}
+
}