You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by al...@apache.org on 2014/11/03 16:51:54 UTC
[10/29] git commit: bunch of clean ups - mainly addressing
@aledsage's comments on @neykov's cluster upgrade PR,
plus misc things i've noticed. needs further testing though.
bunch of clean ups - mainly addressing @aledsage's comments on @neykov's cluster upgrade PR, plus misc things i've noticed.
needs further testing though.
Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/7bae4e66
Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/7bae4e66
Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/7bae4e66
Branch: refs/heads/master
Commit: 7bae4e66342048069b079eb035b492b1c0d000fd
Parents: 634b66b
Author: Alex Heneveld <al...@cloudsoftcorp.com>
Authored: Sat Oct 25 00:32:24 2014 +0100
Committer: Alex Heneveld <al...@cloudsoftcorp.com>
Committed: Fri Oct 31 09:38:19 2014 -0500
----------------------------------------------------------------------
.../entity/brooklynnode/BrooklynCluster.java | 4 +-
.../brooklynnode/BrooklynClusterImpl.java | 59 ++---
.../entity/brooklynnode/BrooklynNodeImpl.java | 3 +-
.../brooklynnode/BrooklynUpgradeEffector.java | 214 -------------------
.../brooklynnode/RemoteEffectorBuilder.java | 23 +-
.../BrooklynClusterUpgradeEffectorBody.java | 203 ++++++++++++++++++
.../BrooklynNodeUpgradeEffectorBody.java | 212 ++++++++++++++++++
.../effector/UpgradeClusterEffectorBody.java | 199 -----------------
8 files changed, 460 insertions(+), 457 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/7bae4e66/software/base/src/main/java/brooklyn/entity/brooklynnode/BrooklynCluster.java
----------------------------------------------------------------------
diff --git a/software/base/src/main/java/brooklyn/entity/brooklynnode/BrooklynCluster.java b/software/base/src/main/java/brooklyn/entity/brooklynnode/BrooklynCluster.java
index a3c2157..30a46bd 100644
--- a/software/base/src/main/java/brooklyn/entity/brooklynnode/BrooklynCluster.java
+++ b/software/base/src/main/java/brooklyn/entity/brooklynnode/BrooklynCluster.java
@@ -26,11 +26,11 @@ import brooklyn.entity.effector.Effectors;
import brooklyn.entity.group.DynamicCluster;
import brooklyn.entity.proxying.ImplementedBy;
import brooklyn.event.AttributeSensor;
-import brooklyn.event.basic.BasicAttributeSensor;
+import brooklyn.event.basic.Sensors;
@ImplementedBy(BrooklynClusterImpl.class)
public interface BrooklynCluster extends DynamicCluster {
- public static final AttributeSensor<BrooklynNode> MASTER_NODE = new BasicAttributeSensor<BrooklynNode>(
+ public static final AttributeSensor<BrooklynNode> MASTER_NODE = Sensors.newSensor(
BrooklynNode.class, "brooklyncluster.master", "Pointer to the child node with MASTER state in the cluster");
public interface SelectMasterEffector {
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/7bae4e66/software/base/src/main/java/brooklyn/entity/brooklynnode/BrooklynClusterImpl.java
----------------------------------------------------------------------
diff --git a/software/base/src/main/java/brooklyn/entity/brooklynnode/BrooklynClusterImpl.java b/software/base/src/main/java/brooklyn/entity/brooklynnode/BrooklynClusterImpl.java
index bfaf33c..dc3ee17 100644
--- a/software/base/src/main/java/brooklyn/entity/brooklynnode/BrooklynClusterImpl.java
+++ b/software/base/src/main/java/brooklyn/entity/brooklynnode/BrooklynClusterImpl.java
@@ -26,9 +26,10 @@ import org.slf4j.LoggerFactory;
import brooklyn.entity.Entity;
import brooklyn.entity.basic.EntityPredicates;
-import brooklyn.entity.basic.ServiceStateLogic.ServiceNotUpLogic;
+import brooklyn.entity.basic.ServiceStateLogic;
+import brooklyn.entity.basic.ServiceStateLogic.ServiceProblemsLogic;
import brooklyn.entity.brooklynnode.effector.SelectMasterEffectorBody;
-import brooklyn.entity.brooklynnode.effector.UpgradeClusterEffectorBody;
+import brooklyn.entity.brooklynnode.effector.BrooklynClusterUpgradeEffectorBody;
import brooklyn.entity.group.DynamicClusterImpl;
import brooklyn.event.feed.function.FunctionFeed;
import brooklyn.event.feed.function.FunctionPollConfig;
@@ -41,69 +42,69 @@ import com.google.common.collect.Iterables;
public class BrooklynClusterImpl extends DynamicClusterImpl implements BrooklynCluster {
private static final String MSG_NO_MASTER = "No master node in cluster";
+ private static final String MSG_TOO_MANY_MASTERS = "Too many master nodes in cluster";
private static final Logger LOG = LoggerFactory.getLogger(BrooklynClusterImpl.class);
//TODO set MEMBER_SPEC
+ @SuppressWarnings("unused")
private FunctionFeed scanMaster;
@Override
public void init() {
super.init();
getMutableEntityType().addEffector(SelectMasterEffectorBody.SELECT_MASTER);
- getMutableEntityType().addEffector(UpgradeClusterEffectorBody.UPGRADE_CLUSTER);
+ getMutableEntityType().addEffector(BrooklynClusterUpgradeEffectorBody.UPGRADE_CLUSTER);
- ServiceNotUpLogic.updateNotUpIndicator(this, MASTER_NODE, MSG_NO_MASTER);
+ ServiceProblemsLogic.updateProblemsIndicator(this, MASTER_NODE, MSG_NO_MASTER);
scanMaster = FunctionFeed.builder()
.entity(this)
.poll(new FunctionPollConfig<Object, BrooklynNode>(MASTER_NODE)
.period(Duration.ONE_SECOND)
- .callable(new Callable<BrooklynNode>() {
- @Override
- public BrooklynNode call() throws Exception {
- return findMasterChild();
- }
- }))
+ .callable(new MasterChildFinder()))
.build();
}
+ private final class MasterChildFinder implements Callable<BrooklynNode> {
+ @Override
+ public BrooklynNode call() throws Exception {
+ return findMasterChild();
+ }
+ }
+
private BrooklynNode findMasterChild() {
Collection<Entity> masters = FluentIterable.from(getMembers())
.filter(EntityPredicates.attributeEqualTo(BrooklynNode.MANAGEMENT_NODE_STATE, ManagementNodeState.MASTER))
.toList();
if (masters.size() == 0) {
- ServiceNotUpLogic.updateNotUpIndicator(this, MASTER_NODE, MSG_NO_MASTER);
+ ServiceProblemsLogic.updateProblemsIndicator(this, MASTER_NODE, MSG_NO_MASTER);
return null;
+
} else if (masters.size() == 1) {
- ServiceNotUpLogic.clearNotUpIndicator(this, MASTER_NODE);
+ ServiceStateLogic.ServiceProblemsLogic.clearProblemsIndicator(this, MASTER_NODE);
return (BrooklynNode)Iterables.getOnlyElement(masters);
+
} else if (masters.size() == 2) {
- //Probably hit a window where we have a new master
+ LOG.warn("Two masters detected, probably a handover just occured: " + masters);
+
+ //Don't clearProblemsIndicator - if there were no masters previously why have two now.
+ //But also don't set it. Probably hit a window where we have a new master
//its BrooklynNode picked it up, but the BrooklynNode
//for the old master hasn't refreshed its state yet.
//Just pick one of them, should sort itself out in next update.
- LOG.warn("Two masters detected, probably a handover just occured: " + masters);
-
- //Don't clearNotUpIndicator - if there were no masters previously
- //why have two now.
-
- return (BrooklynNode)Iterables.getOnlyElement(masters);
+
+ //TODO Do set such indicator if this continues for an extended period of time
+
+ return (BrooklynNode)masters.iterator().next();
+
} else {
- //Set on fire?
+ ServiceProblemsLogic.updateProblemsIndicator(this, MASTER_NODE, MSG_TOO_MANY_MASTERS);
String msg = "Multiple (>=3) master nodes in cluster: " + masters;
LOG.error(msg);
throw new IllegalStateException(msg);
- }
- }
-
- @Override
- public void stop() {
- super.stop();
-
- if (scanMaster != null && scanMaster.isActivated()) {
- scanMaster.stop();
+
}
}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/7bae4e66/software/base/src/main/java/brooklyn/entity/brooklynnode/BrooklynNodeImpl.java
----------------------------------------------------------------------
diff --git a/software/base/src/main/java/brooklyn/entity/brooklynnode/BrooklynNodeImpl.java b/software/base/src/main/java/brooklyn/entity/brooklynnode/BrooklynNodeImpl.java
index 6aa2420..5bdbe2d 100644
--- a/software/base/src/main/java/brooklyn/entity/brooklynnode/BrooklynNodeImpl.java
+++ b/software/base/src/main/java/brooklyn/entity/brooklynnode/BrooklynNodeImpl.java
@@ -39,6 +39,7 @@ import brooklyn.entity.basic.Lifecycle;
import brooklyn.entity.basic.ServiceStateLogic;
import brooklyn.entity.basic.ServiceStateLogic.ServiceNotUpLogic;
import brooklyn.entity.basic.SoftwareProcessImpl;
+import brooklyn.entity.brooklynnode.effector.BrooklynNodeUpgradeEffectorBody;
import brooklyn.entity.brooklynnode.effector.SetHAModeEffectorBody;
import brooklyn.entity.brooklynnode.effector.SetHAPriorityEffectorBody;
import brooklyn.entity.effector.EffectorBody;
@@ -103,7 +104,7 @@ public class BrooklynNodeImpl extends SoftwareProcessImpl implements BrooklynNod
getMutableEntityType().addEffector(StopNodeAndKillAppsEffectorBody.STOP_NODE_AND_KILL_APPS);
getMutableEntityType().addEffector(SetHAPriorityEffectorBody.SET_HA_PRIORITY);
getMutableEntityType().addEffector(SetHAModeEffectorBody.SET_HA_MODE);
- getMutableEntityType().addEffector(BrooklynUpgradeEffector.UPGRADE);
+ getMutableEntityType().addEffector(BrooklynNodeUpgradeEffectorBody.UPGRADE);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/7bae4e66/software/base/src/main/java/brooklyn/entity/brooklynnode/BrooklynUpgradeEffector.java
----------------------------------------------------------------------
diff --git a/software/base/src/main/java/brooklyn/entity/brooklynnode/BrooklynUpgradeEffector.java b/software/base/src/main/java/brooklyn/entity/brooklynnode/BrooklynUpgradeEffector.java
deleted file mode 100644
index 3dc2015..0000000
--- a/software/base/src/main/java/brooklyn/entity/brooklynnode/BrooklynUpgradeEffector.java
+++ /dev/null
@@ -1,214 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package brooklyn.entity.brooklynnode;
-
-import java.util.Map;
-import java.util.concurrent.Callable;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import brooklyn.config.ConfigKey;
-import brooklyn.entity.Effector;
-import brooklyn.entity.Entity;
-import brooklyn.entity.basic.Entities;
-import brooklyn.entity.basic.EntityInternal;
-import brooklyn.entity.basic.EntityPredicates;
-import brooklyn.entity.basic.SoftwareProcess;
-import brooklyn.entity.effector.EffectorBody;
-import brooklyn.entity.effector.Effectors;
-import brooklyn.entity.proxying.EntitySpec;
-import brooklyn.entity.software.SshEffectorTasks;
-import brooklyn.event.AttributeSensor;
-import brooklyn.event.basic.MapConfigKey;
-import brooklyn.management.TaskAdaptable;
-import brooklyn.management.ha.HighAvailabilityMode;
-import brooklyn.management.ha.ManagementNodeState;
-import brooklyn.util.config.ConfigBag;
-import brooklyn.util.exceptions.Exceptions;
-import brooklyn.util.exceptions.ReferenceWithError;
-import brooklyn.util.guava.Functionals;
-import brooklyn.util.net.Urls;
-import brooklyn.util.repeat.Repeater;
-import brooklyn.util.task.DynamicTasks;
-import brooklyn.util.task.Tasks;
-import brooklyn.util.text.Identifiers;
-import brooklyn.util.text.Strings;
-import brooklyn.util.time.Duration;
-
-import com.google.common.base.Functions;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Predicate;
-import com.google.common.base.Predicates;
-import com.google.common.reflect.TypeToken;
-
-@SuppressWarnings("serial")
-/** Upgrades a brooklyn node in-place on the box,
- * by creating a child brooklyn node and ensuring it can rebind in HOT_STANDBY
- * <p>
- * Requires the target node to have persistence enabled.
- */
-public class BrooklynUpgradeEffector {
-
- private static final Logger log = LoggerFactory.getLogger(BrooklynUpgradeEffector.class);
-
- public static final ConfigKey<String> DOWNLOAD_URL = BrooklynNode.DOWNLOAD_URL.getConfigKey();
- public static final ConfigKey<Map<String,Object>> EXTRA_CONFIG = MapConfigKey.builder(new TypeToken<Map<String,Object>>() {}).name("extraConfig").description("Additional new config to set on this entity as part of upgrading").build();
-
- public static final Effector<Void> UPGRADE = Effectors.effector(Void.class, "upgrade")
- .description("Changes the Brooklyn build used to run this node, by spawning a dry-run node then copying the installed files across. "
- + "This node must be running for persistence for in-place upgrading to work.")
- .parameter(BrooklynNode.SUGGESTED_VERSION).parameter(DOWNLOAD_URL).parameter(EXTRA_CONFIG)
- .impl(new UpgradeImpl()).build();
-
- public static class UpgradeImpl extends EffectorBody<Void> {
- @Override
- public Void call(ConfigBag parametersO) {
- if (!isPersistenceModeEnabled(entity())) {
- // would could try a `forcePersistNow`, but that's sloppy;
- // for now, require HA/persistence for upgrading
- DynamicTasks.queue( Tasks.warning("Persistence does not appear to be enabled at this node. "
- + "In-place upgrade is unlikely to succeed.", null) );
- }
-
- ConfigBag parameters = ConfigBag.newInstanceCopying(parametersO);
-
- /*
- * all parameters are passed to children, apart from EXTRA_CONFIG
- * whose value (as a map) is so passed; it provides an easy way to set extra config in the gui.
- * (IOW a key-value mapping can be passed either inside EXTRA_CONFIG or as a sibling to EXTRA_CONFIG)
- */
- if (parameters.containsKey(EXTRA_CONFIG)) {
- Map<String, Object> extra = parameters.get(EXTRA_CONFIG);
- parameters.remove(EXTRA_CONFIG);
- parameters.putAll(extra);
- }
- log.debug(this+" upgrading, using "+parameters);
-
- // TODO require entity() node state master or hot standby AND require persistence enabled, or a new 'force_attempt_upgrade' parameter to be applied
- // TODO could have a 'skip_dry_run_upgrade' parameter
- // TODO could support 'dry_run_only' parameter, with optional resumption tasks (eg new dynamic effector)
-
- // 1 add new brooklyn version entity as child (so uses same machine), with same config apart from things in parameters
- final BrooklynNode dryRunChild = entity().addChild(EntitySpec.create(BrooklynNode.class).configure(parameters.getAllConfig())
- .displayName("Upgraded Version Dry-Run Node")
- // force dir and label back to their defaults (do not piggy back on what may have already been installed)
- .configure(BrooklynNode.INSTALL_DIR, BrooklynNode.INSTALL_DIR.getConfigKey().getDefaultValue())
- .configure(BrooklynNode.INSTALL_UNIQUE_LABEL, "upgrade-tmp-"+Identifiers.makeRandomId(8))
- .configure(parameters.getAllConfig()));
-
- //force this to start as hot-standby
- String launchParameters = dryRunChild.getConfig(BrooklynNode.EXTRA_LAUNCH_PARAMETERS);
- if (Strings.isBlank(launchParameters)) launchParameters = "";
- else launchParameters += " ";
- launchParameters += "--highAvailability "+HighAvailabilityMode.HOT_STANDBY;
- ((EntityInternal)dryRunChild).setConfig(BrooklynNode.EXTRA_LAUNCH_PARAMETERS, launchParameters);
-
- Entities.manage(dryRunChild);
- final String dryRunNodeUid = dryRunChild.getId();
- ((EntityInternal)dryRunChild).setDisplayName("Dry-Run Upgraded Brooklyn Node ("+dryRunNodeUid+")");
-
- DynamicTasks.queue(Effectors.invocation(dryRunChild, BrooklynNode.START, ConfigBag.EMPTY));
-
- // 2 confirm hot standby status
- DynamicTasks.queue(newWaitForAttributeTask(dryRunChild, BrooklynNode.MANAGEMENT_NODE_STATE,
- Predicates.equalTo(ManagementNodeState.HOT_STANDBY), Duration.TWO_MINUTES));
-
- // 3 stop new version
- // 4 stop old version
- DynamicTasks.queue(Tasks.builder().name("shutdown original and transient nodes")
- .add(Effectors.invocation(dryRunChild, BrooklynNode.SHUTDOWN, ConfigBag.EMPTY))
- .add(Effectors.invocation(entity(), BrooklynNode.SHUTDOWN, ConfigBag.EMPTY))
- .build());
-
- // 5 move old files, and move new files
- DynamicTasks.queue(Tasks.builder().name("setup new version").body(new Runnable() {
- @Override
- public void run() {
- String runDir = entity().getAttribute(SoftwareProcess.RUN_DIR);
- String bkDir = Urls.mergePaths(runDir, "..", Urls.getBasename(runDir)+"-backups", dryRunNodeUid);
- String dryRunDir = Preconditions.checkNotNull(dryRunChild.getAttribute(SoftwareProcess.RUN_DIR));
- log.debug(this+" storing backup of previous version in "+bkDir);
- DynamicTasks.queue(SshEffectorTasks.ssh(
- "cd "+runDir,
- "mkdir -p "+bkDir,
- "mv * "+bkDir,
- "cd "+dryRunDir,
- "mv * "+runDir
- ).summary("move files"));
- }
- }).build());
-
- entity().getConfigMap().addToLocalBag(parameters.getAllConfig());
-
- // 6 start this entity, running the new version
- DynamicTasks.queue(Effectors.invocation(entity(), BrooklynNode.START, ConfigBag.EMPTY));
-
- DynamicTasks.waitForLast();
- Entities.unmanage(dryRunChild);
-
- return null;
- }
-
- private boolean isPersistenceModeEnabled(EntityInternal entity) {
- // TODO when there are PERSIST* options in BrooklynNode, look at them here!
- // or, better, have a sensor for persistence
- String params = entity.getConfig(BrooklynNode.EXTRA_LAUNCH_PARAMETERS);
- if (params==null) return false;
- if (params.indexOf("persist")==0) return false;
- return true;
- }
-
- }
-
- private static class WaitForRepeaterCallable implements Callable<Boolean> {
- protected Repeater repeater;
- protected boolean requireTrue;
-
- public WaitForRepeaterCallable(Repeater repeater, boolean requireTrue) {
- this.repeater = repeater;
- this.requireTrue = requireTrue;
- }
-
- @Override
- public Boolean call() {
- ReferenceWithError<Boolean> result = repeater.runKeepingError();
- if (Boolean.TRUE.equals(result.getWithoutError()))
- return true;
- if (result.hasError())
- throw Exceptions.propagate(result.getError());
- if (requireTrue)
- throw new IllegalStateException("timeout - "+repeater.getDescription());
- return false;
- }
- }
-
- private static <T> TaskAdaptable<Boolean> newWaitForAttributeTask(Entity node, AttributeSensor<T> sensor, Predicate<T> condition, Duration timeout) {
- return awaiting( Repeater.create("waiting on "+node+" "+sensor.getName()+" "+condition)
- .backoff(Duration.millis(10), 1.5, Duration.millis(200))
- .limitTimeTo(timeout==null ? Duration.PRACTICALLY_FOREVER : timeout)
- .until(Functionals.callable(Functions.forPredicate(EntityPredicates.attributeSatisfies(sensor, condition)), node)),
- true);
- }
-
- private static TaskAdaptable<Boolean> awaiting(Repeater repeater, boolean requireTrue) {
- return Tasks.<Boolean>builder().name(repeater.getDescription()).body(new WaitForRepeaterCallable(repeater, requireTrue)).build();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/7bae4e66/software/base/src/main/java/brooklyn/entity/brooklynnode/RemoteEffectorBuilder.java
----------------------------------------------------------------------
diff --git a/software/base/src/main/java/brooklyn/entity/brooklynnode/RemoteEffectorBuilder.java b/software/base/src/main/java/brooklyn/entity/brooklynnode/RemoteEffectorBuilder.java
index eba3c1a..3b1cdc1 100644
--- a/software/base/src/main/java/brooklyn/entity/brooklynnode/RemoteEffectorBuilder.java
+++ b/software/base/src/main/java/brooklyn/entity/brooklynnode/RemoteEffectorBuilder.java
@@ -37,19 +37,28 @@ public class RemoteEffectorBuilder {
return input.getContentAsString();
}
}
+
public static Collection<Effector<String>> of(Collection<?> cfgEffectors) {
Collection<Effector<String>> effectors = new ArrayList<Effector<String>>();
for (Object objEff : cfgEffectors) {
Map<?, ?> cfgEff = (Map<?, ?>)objEff;
-// String returnTypeName = (String)cfgEff.get("returnType");
String effName = (String)cfgEff.get("name");
String description = (String)cfgEff.get("description");
-// Class<?> returnType = getType(returnTypeName);
EffectorBuilder<String> eff = Effectors.effector(String.class, effName);
Collection<?> params = (Collection<?>)cfgEff.get("parameters");
+ /* The *return type* should NOT be included in the signature here.
+ * It might be a type known only at the mirrored brooklyn node
+ * (in which case loading it here would fail); or possibly it could
+ * be a different version of the type here, in which case the signature
+ * would look valid here, but deserializing it would fail.
+ *
+ * Best to just pass the json representation back to the caller.
+ * (They won't be able to tell the difference between that and deserialize-then-serialize!)
+ */
+
if (description != null) {
eff.description(description);
}
@@ -65,21 +74,11 @@ public class RemoteEffectorBuilder {
}
private static void buildParam(EffectorBuilder<String> eff, Map<?, ?> cfgParam) {
-// String type = (String)cfgParam.get("type");
String name = (String)cfgParam.get("name");
String description = (String)cfgParam.get("description");
String defaultValue = (String)cfgParam.get("defaultValue");
-// Class<?> paramType = getType(type);
eff.parameter(Object.class, name, description, defaultValue /*TypeCoercions.coerce(defaultValue, paramType)*/);
}
-// private static Class<?> getType(String type) {
-// try {
-// return Class.forName(type);
-// } catch (ClassNotFoundException e) {
-// return Object.class;
-// }
-// }
-
}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/7bae4e66/software/base/src/main/java/brooklyn/entity/brooklynnode/effector/BrooklynClusterUpgradeEffectorBody.java
----------------------------------------------------------------------
diff --git a/software/base/src/main/java/brooklyn/entity/brooklynnode/effector/BrooklynClusterUpgradeEffectorBody.java b/software/base/src/main/java/brooklyn/entity/brooklynnode/effector/BrooklynClusterUpgradeEffectorBody.java
new file mode 100644
index 0000000..8215a0b
--- /dev/null
+++ b/software/base/src/main/java/brooklyn/entity/brooklynnode/effector/BrooklynClusterUpgradeEffectorBody.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package brooklyn.entity.brooklynnode.effector;
+
+import java.io.File;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import brooklyn.config.ConfigKey;
+import brooklyn.entity.Effector;
+import brooklyn.entity.Entity;
+import brooklyn.entity.Group;
+import brooklyn.entity.basic.EntityPredicates;
+import brooklyn.entity.basic.Lifecycle;
+import brooklyn.entity.brooklynnode.BrooklynCluster;
+import brooklyn.entity.brooklynnode.BrooklynCluster.SelectMasterEffector;
+import brooklyn.entity.brooklynnode.BrooklynCluster.UpgradeClusterEffector;
+import brooklyn.entity.brooklynnode.BrooklynNode;
+import brooklyn.entity.brooklynnode.BrooklynNode.SetHAModeEffector;
+import brooklyn.entity.effector.EffectorBody;
+import brooklyn.entity.effector.Effectors;
+import brooklyn.entity.group.DynamicCluster;
+import brooklyn.entity.proxying.EntitySpec;
+import brooklyn.event.AttributeSensor;
+import brooklyn.management.ha.HighAvailabilityMode;
+import brooklyn.management.ha.ManagementNodeState;
+import brooklyn.util.collections.MutableMap;
+import brooklyn.util.config.ConfigBag;
+import brooklyn.util.exceptions.Exceptions;
+import brooklyn.util.net.Urls;
+import brooklyn.util.repeat.Repeater;
+import brooklyn.util.task.DynamicTasks;
+import brooklyn.util.task.Tasks;
+import brooklyn.util.time.Duration;
+
+import com.google.api.client.util.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.collect.Collections2;
+import com.google.common.collect.Iterables;
+
+public class BrooklynClusterUpgradeEffectorBody extends EffectorBody<Void> implements UpgradeClusterEffector {
+ public static final Effector<Void> UPGRADE_CLUSTER = Effectors.effector(UpgradeClusterEffector.UPGRADE_CLUSTER).impl(new BrooklynClusterUpgradeEffectorBody()).build();
+
+ private AtomicBoolean upgradeInProgress = new AtomicBoolean();
+
+ @Override
+ public Void call(ConfigBag parameters) {
+ if (!upgradeInProgress.compareAndSet(false, true)) {
+ throw new IllegalStateException("An upgrade is already in progress.");
+ }
+
+ EntitySpec<?> memberSpec = entity().getConfig(BrooklynCluster.MEMBER_SPEC);
+ Preconditions.checkNotNull(memberSpec, BrooklynCluster.MEMBER_SPEC.getName() + " is required for " + UpgradeClusterEffector.class.getName());
+
+ Map<ConfigKey<?>, Object> specCfg = memberSpec.getConfig();
+ String oldDownloadUrl = (String) specCfg.get(BrooklynNode.DOWNLOAD_URL);
+ String oldUploadUrl = (String) specCfg.get(BrooklynNode.DISTRO_UPLOAD_URL);
+ String newDownloadUrl = parameters.get(BrooklynNode.DOWNLOAD_URL.getConfigKey());
+ String newUploadUrl = inferUploadUrl(newDownloadUrl);
+ try {
+ memberSpec.configure(BrooklynNode.DOWNLOAD_URL, newUploadUrl);
+ memberSpec.configure(BrooklynNode.DISTRO_UPLOAD_URL, newUploadUrl);
+ upgrade(parameters);
+ } catch (Exception e) {
+ memberSpec.configure(BrooklynNode.DOWNLOAD_URL, oldDownloadUrl);
+ memberSpec.configure(BrooklynNode.DISTRO_UPLOAD_URL, oldUploadUrl);
+ throw Exceptions.propagate(e);
+ } finally {
+ upgradeInProgress.set(false);
+ }
+ return null;
+ }
+
+ private String inferUploadUrl(String newDownloadUrl) {
+ boolean isLocal = "file".equals(Urls.getProtocol(newDownloadUrl)) || new File(newDownloadUrl).exists();
+ if (isLocal) {
+ return newDownloadUrl;
+ } else {
+ return null;
+ }
+ }
+
+ private void upgrade(ConfigBag parameters) {
+ //TODO might be worth separating each step in a task for better UI
+ //TODO currently this will fight with auto-scaler policies; you should turn them off
+
+ Group cluster = (Group)entity();
+ Collection<Entity> initialMembers = cluster.getMembers();
+ int initialClusterSize = initialMembers.size();
+
+ //1. Initially create a single node to check if it will launch successfully
+ Entity initialNode = Iterables.getOnlyElement(createNodes(1));
+
+ //2. If everything is OK with the first node launch the rest as well
+ Collection<Entity> remainingNodes = createNodes(initialClusterSize - 1);
+
+ //3. Once we have all nodes running without errors switch master
+ DynamicTasks.queue(Effectors.invocation(cluster, BrooklynCluster.SELECT_MASTER, MutableMap.of(SelectMasterEffector.NEW_MASTER_ID, initialNode.getId()))).asTask().getUnchecked();
+
+ //4. Stop the nodes which were running at the start of the upgrade call, but keep them around.
+ // Should we create a quarantine-like zone for old stopped version?
+ // For members that were created meanwhile - they will be using the new version already. If the new version
+ // isn't good then they will fail to start as well, forcing the policies to retry (and succeed once the
+ // URL is reverted).
+ //TODO can get into problem state if more old nodes are created; better might be to set the
+ //version on this cluster before the above select-master call, and then delete any which are running the old
+ //version (would require tracking the version number at the entity)
+ HashSet<Entity> oldMembers = new HashSet<Entity>(initialMembers);
+ oldMembers.removeAll(remainingNodes);
+ oldMembers.remove(initialNode);
+ DynamicTasks.queue(Effectors.invocation(BrooklynNode.STOP_NODE_BUT_LEAVE_APPS, Collections.emptyMap(), oldMembers)).asTask().getUnchecked();
+ }
+
+ private Collection<Entity> createNodes(int nodeCnt) {
+ DynamicCluster cluster = (DynamicCluster)entity();
+
+ //1. Create the nodes
+ Collection<Entity> newNodes = cluster.resizeByDelta(nodeCnt);
+
+ //2. Wait for them to be RUNNING
+ waitAttributeNotEqualTo(
+ newNodes,
+ BrooklynNode.SERVICE_STATE_ACTUAL, Lifecycle.STARTING);
+
+ //3. Set HOT_STANDBY in case it is not enabled on the command line ...
+ DynamicTasks.queue(Effectors.invocation(
+ BrooklynNode.SET_HA_MODE,
+ MutableMap.of(SetHAModeEffector.MODE, HighAvailabilityMode.HOT_STANDBY),
+ newNodes)).asTask().getUnchecked();
+
+ //4. ... and wait until all of the nodes change state
+ //TODO if the REST call is blocking this is not needed
+ waitAttributeEqualTo(
+ newNodes,
+ BrooklynNode.MANAGEMENT_NODE_STATE,
+ ManagementNodeState.HOT_STANDBY);
+
+ //5. Just in case check if all of the nodes are SERVICE_UP (which would rule out ON_FIRE as well)
+ Collection<Entity> failedNodes = Collections2.filter(newNodes, EntityPredicates.attributeEqualTo(BrooklynNode.SERVICE_UP, Boolean.FALSE));
+ if (!failedNodes.isEmpty()) {
+ throw new IllegalStateException("Nodes " + failedNodes + " are not " + BrooklynNode.SERVICE_UP + " though successfully in " + ManagementNodeState.HOT_STANDBY);
+ }
+ return newNodes;
+ }
+
+ private <T> void waitAttributeEqualTo(Collection<Entity> nodes, AttributeSensor<T> sensor, T value) {
+ waitPredicate(
+ nodes,
+ EntityPredicates.attributeEqualTo(sensor, value),
+ "Waiting for nodes " + nodes + ", sensor " + sensor + " to be " + value,
+ "Timeout while waiting for nodes " + nodes + ", sensor " + sensor + " to change to " + value);
+ }
+
+ private <T> void waitAttributeNotEqualTo(Collection<Entity> nodes, AttributeSensor<T> sensor, T value) {
+ waitPredicate(
+ nodes,
+ EntityPredicates.attributeNotEqualTo(sensor, value),
+ "Waiting for nodes " + nodes + ", sensor " + sensor + " to change from " + value,
+ "Timeout while waiting for nodes " + nodes + ", sensor " + sensor + " to change from " + value);
+ }
+
+ private <T extends Entity> void waitPredicate(Collection<T> nodes, Predicate<T> waitPredicate, String blockingMsg, String errorMsg) {
+ Tasks.setBlockingDetails(blockingMsg);
+ boolean pollSuccess = Repeater.create(blockingMsg)
+ .backoff(Duration.ONE_SECOND, 1.2, Duration.TEN_SECONDS)
+ .limitTimeTo(Duration.ONE_HOUR)
+ .until(nodes, allMatch(waitPredicate))
+ .run();
+ Tasks.resetBlockingDetails();
+
+ if (!pollSuccess) {
+ throw new IllegalStateException(errorMsg);
+ }
+ }
+
+ public static <T> Predicate<Collection<T>> allMatch(final Predicate<T> predicate) {
+ return new Predicate<Collection<T>>() {
+ @Override
+ public boolean apply(Collection<T> input) {
+ return Iterables.all(input, predicate);
+ }
+ };
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/7bae4e66/software/base/src/main/java/brooklyn/entity/brooklynnode/effector/BrooklynNodeUpgradeEffectorBody.java
----------------------------------------------------------------------
diff --git a/software/base/src/main/java/brooklyn/entity/brooklynnode/effector/BrooklynNodeUpgradeEffectorBody.java b/software/base/src/main/java/brooklyn/entity/brooklynnode/effector/BrooklynNodeUpgradeEffectorBody.java
new file mode 100644
index 0000000..a6433aa
--- /dev/null
+++ b/software/base/src/main/java/brooklyn/entity/brooklynnode/effector/BrooklynNodeUpgradeEffectorBody.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package brooklyn.entity.brooklynnode.effector;
+
+import java.util.Map;
+import java.util.concurrent.Callable;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import brooklyn.config.ConfigKey;
+import brooklyn.entity.Effector;
+import brooklyn.entity.Entity;
+import brooklyn.entity.basic.Entities;
+import brooklyn.entity.basic.EntityInternal;
+import brooklyn.entity.basic.EntityPredicates;
+import brooklyn.entity.basic.SoftwareProcess;
+import brooklyn.entity.brooklynnode.BrooklynNode;
+import brooklyn.entity.effector.EffectorBody;
+import brooklyn.entity.effector.Effectors;
+import brooklyn.entity.proxying.EntitySpec;
+import brooklyn.entity.software.SshEffectorTasks;
+import brooklyn.event.AttributeSensor;
+import brooklyn.event.basic.MapConfigKey;
+import brooklyn.management.TaskAdaptable;
+import brooklyn.management.ha.HighAvailabilityMode;
+import brooklyn.management.ha.ManagementNodeState;
+import brooklyn.util.config.ConfigBag;
+import brooklyn.util.exceptions.Exceptions;
+import brooklyn.util.exceptions.ReferenceWithError;
+import brooklyn.util.guava.Functionals;
+import brooklyn.util.net.Urls;
+import brooklyn.util.repeat.Repeater;
+import brooklyn.util.task.DynamicTasks;
+import brooklyn.util.task.Tasks;
+import brooklyn.util.text.Identifiers;
+import brooklyn.util.text.Strings;
+import brooklyn.util.time.Duration;
+
+import com.google.common.base.Functions;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
+import com.google.common.reflect.TypeToken;
+
+@SuppressWarnings("serial")
+/** Upgrades a brooklyn node in-place on the box,
+ * by creating a child brooklyn node and ensuring it can rebind in HOT_STANDBY
+ * <p>
+ * Requires the target node to have persistence enabled.
+ */
+public class BrooklynNodeUpgradeEffectorBody extends EffectorBody<Void> {
+
+ private static final Logger log = LoggerFactory.getLogger(BrooklynNodeUpgradeEffectorBody.class);
+
+ public static final ConfigKey<String> DOWNLOAD_URL = BrooklynNode.DOWNLOAD_URL.getConfigKey();
+ public static final ConfigKey<Map<String,Object>> EXTRA_CONFIG = MapConfigKey.builder(new TypeToken<Map<String,Object>>() {}).name("extraConfig").description("Additional new config to set on this entity as part of upgrading").build();
+
+ public static final Effector<Void> UPGRADE = Effectors.effector(Void.class, "upgrade")
+ .description("Changes the Brooklyn build used to run this node, by spawning a dry-run node then copying the installed files across. "
+ + "This node must be running for persistence for in-place upgrading to work.")
+ .parameter(BrooklynNode.SUGGESTED_VERSION).parameter(DOWNLOAD_URL).parameter(EXTRA_CONFIG)
+ .impl(new BrooklynNodeUpgradeEffectorBody()).build();
+
+ @Override
+ public Void call(ConfigBag parametersO) {
+ if (!isPersistenceModeEnabled(entity())) {
+ // would could try a `forcePersistNow`, but that's sloppy;
+ // for now, require HA/persistence for upgrading
+ DynamicTasks.queue( Tasks.warning("Persistence does not appear to be enabled at this node. "
+ + "In-place upgrade is unlikely to succeed.", null) );
+ }
+
+ ConfigBag parameters = ConfigBag.newInstanceCopying(parametersO);
+
+ /*
+ * all parameters are passed to children, apart from EXTRA_CONFIG
+ * whose value (as a map) is so passed; it provides an easy way to set extra config in the gui.
+ * (IOW a key-value mapping can be passed either inside EXTRA_CONFIG or as a sibling to EXTRA_CONFIG)
+ */
+ if (parameters.containsKey(EXTRA_CONFIG)) {
+ Map<String, Object> extra = parameters.get(EXTRA_CONFIG);
+ parameters.remove(EXTRA_CONFIG);
+ parameters.putAll(extra);
+ }
+ log.debug(this+" upgrading, using "+parameters);
+
+ // TODO require entity() node state master or hot standby AND require persistence enabled, or a new 'force_attempt_upgrade' parameter to be applied
+ // TODO could have a 'skip_dry_run_upgrade' parameter
+ // TODO could support 'dry_run_only' parameter, with optional resumption tasks (eg new dynamic effector)
+
+ // 1 add new brooklyn version entity as child (so uses same machine), with same config apart from things in parameters
+ final BrooklynNode dryRunChild = entity().addChild(EntitySpec.create(BrooklynNode.class).configure(parameters.getAllConfig())
+ .displayName("Upgraded Version Dry-Run Node")
+ // force dir and label back to their defaults (do not piggy back on what may have already been installed)
+ .configure(BrooklynNode.INSTALL_DIR, BrooklynNode.INSTALL_DIR.getConfigKey().getDefaultValue())
+ .configure(BrooklynNode.INSTALL_UNIQUE_LABEL, "upgrade-tmp-"+Identifiers.makeRandomId(8))
+ .configure(parameters.getAllConfig()));
+
+ //force this to start as hot-standby
+ String launchParameters = dryRunChild.getConfig(BrooklynNode.EXTRA_LAUNCH_PARAMETERS);
+ if (Strings.isBlank(launchParameters)) launchParameters = "";
+ else launchParameters += " ";
+ launchParameters += "--highAvailability "+HighAvailabilityMode.HOT_STANDBY;
+ ((EntityInternal)dryRunChild).setConfig(BrooklynNode.EXTRA_LAUNCH_PARAMETERS, launchParameters);
+
+ Entities.manage(dryRunChild);
+ final String dryRunNodeUid = dryRunChild.getId();
+ ((EntityInternal)dryRunChild).setDisplayName("Dry-Run Upgraded Brooklyn Node ("+dryRunNodeUid+")");
+
+ DynamicTasks.queue(Effectors.invocation(dryRunChild, BrooklynNode.START, ConfigBag.EMPTY));
+
+ // 2 confirm hot standby status
+ DynamicTasks.queue(newWaitForAttributeTask(dryRunChild, BrooklynNode.MANAGEMENT_NODE_STATE,
+ Predicates.equalTo(ManagementNodeState.HOT_STANDBY), Duration.TWO_MINUTES));
+
+ // 3 stop new version
+ // 4 stop old version
+ DynamicTasks.queue(Tasks.builder().name("shutdown original and transient nodes")
+ .add(Effectors.invocation(dryRunChild, BrooklynNode.SHUTDOWN, ConfigBag.EMPTY))
+ .add(Effectors.invocation(entity(), BrooklynNode.SHUTDOWN, ConfigBag.EMPTY))
+ .build());
+
+ // 5 move old files, and move new files
+ DynamicTasks.queue(Tasks.builder().name("setup new version").body(new Runnable() {
+ @Override
+ public void run() {
+ String runDir = entity().getAttribute(SoftwareProcess.RUN_DIR);
+ String bkDir = Urls.mergePaths(runDir, "..", Urls.getBasename(runDir)+"-backups", dryRunNodeUid);
+ String dryRunDir = Preconditions.checkNotNull(dryRunChild.getAttribute(SoftwareProcess.RUN_DIR));
+ log.debug(this+" storing backup of previous version in "+bkDir);
+ DynamicTasks.queue(SshEffectorTasks.ssh(
+ "cd "+runDir,
+ "mkdir -p "+bkDir,
+ "mv * "+bkDir,
+ "cd "+dryRunDir,
+ "mv * "+runDir
+ ).summary("move files"));
+ }
+ }).build());
+
+ entity().getConfigMap().addToLocalBag(parameters.getAllConfig());
+
+ // 6 start this entity, running the new version
+ DynamicTasks.queue(Effectors.invocation(entity(), BrooklynNode.START, ConfigBag.EMPTY));
+
+ DynamicTasks.waitForLast();
+ Entities.unmanage(dryRunChild);
+
+ return null;
+ }
+
+ private boolean isPersistenceModeEnabled(EntityInternal entity) {
+ // TODO when there are PERSIST* options in BrooklynNode, look at them here!
+ // or, better, have a sensor for persistence
+ String params = entity.getConfig(BrooklynNode.EXTRA_LAUNCH_PARAMETERS);
+ if (params==null) return false;
+ if (params.indexOf("persist")==0) return false;
+ return true;
+ }
+
+ private static class WaitForRepeaterCallable implements Callable<Boolean> {
+ protected Repeater repeater;
+ protected boolean requireTrue;
+
+ public WaitForRepeaterCallable(Repeater repeater, boolean requireTrue) {
+ this.repeater = repeater;
+ this.requireTrue = requireTrue;
+ }
+
+ @Override
+ public Boolean call() {
+ ReferenceWithError<Boolean> result = repeater.runKeepingError();
+ if (Boolean.TRUE.equals(result.getWithoutError()))
+ return true;
+ if (result.hasError())
+ throw Exceptions.propagate(result.getError());
+ if (requireTrue)
+ throw new IllegalStateException("timeout - "+repeater.getDescription());
+ return false;
+ }
+ }
+
+ private static <T> TaskAdaptable<Boolean> newWaitForAttributeTask(Entity node, AttributeSensor<T> sensor, Predicate<T> condition, Duration timeout) {
+ return awaiting( Repeater.create("waiting on "+node+" "+sensor.getName()+" "+condition)
+ .backoff(Duration.millis(10), 1.5, Duration.millis(200))
+ .limitTimeTo(timeout==null ? Duration.PRACTICALLY_FOREVER : timeout)
+ .until(Functionals.callable(Functions.forPredicate(EntityPredicates.attributeSatisfies(sensor, condition)), node)),
+ true);
+ }
+
+ private static TaskAdaptable<Boolean> awaiting(Repeater repeater, boolean requireTrue) {
+ return Tasks.<Boolean>builder().name(repeater.getDescription()).body(new WaitForRepeaterCallable(repeater, requireTrue)).build();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/7bae4e66/software/base/src/main/java/brooklyn/entity/brooklynnode/effector/UpgradeClusterEffectorBody.java
----------------------------------------------------------------------
diff --git a/software/base/src/main/java/brooklyn/entity/brooklynnode/effector/UpgradeClusterEffectorBody.java b/software/base/src/main/java/brooklyn/entity/brooklynnode/effector/UpgradeClusterEffectorBody.java
deleted file mode 100644
index 70b184a..0000000
--- a/software/base/src/main/java/brooklyn/entity/brooklynnode/effector/UpgradeClusterEffectorBody.java
+++ /dev/null
@@ -1,199 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package brooklyn.entity.brooklynnode.effector;
-
-import java.io.File;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import brooklyn.config.ConfigKey;
-import brooklyn.entity.Effector;
-import brooklyn.entity.Entity;
-import brooklyn.entity.Group;
-import brooklyn.entity.basic.EntityPredicates;
-import brooklyn.entity.basic.Lifecycle;
-import brooklyn.entity.brooklynnode.BrooklynCluster;
-import brooklyn.entity.brooklynnode.BrooklynCluster.SelectMasterEffector;
-import brooklyn.entity.brooklynnode.BrooklynCluster.UpgradeClusterEffector;
-import brooklyn.entity.brooklynnode.BrooklynNode;
-import brooklyn.entity.brooklynnode.BrooklynNode.SetHAModeEffector;
-import brooklyn.entity.effector.EffectorBody;
-import brooklyn.entity.effector.Effectors;
-import brooklyn.entity.group.DynamicCluster;
-import brooklyn.entity.proxying.EntitySpec;
-import brooklyn.event.AttributeSensor;
-import brooklyn.management.ha.HighAvailabilityMode;
-import brooklyn.management.ha.ManagementNodeState;
-import brooklyn.util.collections.MutableMap;
-import brooklyn.util.config.ConfigBag;
-import brooklyn.util.exceptions.Exceptions;
-import brooklyn.util.net.Urls;
-import brooklyn.util.repeat.Repeater;
-import brooklyn.util.task.DynamicTasks;
-import brooklyn.util.task.Tasks;
-import brooklyn.util.time.Duration;
-
-import com.google.api.client.util.Preconditions;
-import com.google.common.base.Predicate;
-import com.google.common.collect.Collections2;
-import com.google.common.collect.Iterables;
-
-public class UpgradeClusterEffectorBody extends EffectorBody<Void> implements UpgradeClusterEffector {
- public static final Effector<Void> UPGRADE_CLUSTER = Effectors.effector(UpgradeClusterEffector.UPGRADE_CLUSTER).impl(new UpgradeClusterEffectorBody()).build();
-
- private AtomicBoolean upgradeInProgress = new AtomicBoolean();
-
- @Override
- public Void call(ConfigBag parameters) {
- if (!upgradeInProgress.compareAndSet(false, true)) {
- throw new IllegalStateException("An upgrade is already in progress.");
- }
-
- EntitySpec<?> memberSpec = entity().getConfig(BrooklynCluster.MEMBER_SPEC);
- Preconditions.checkNotNull(memberSpec, BrooklynCluster.MEMBER_SPEC.getName() + " is required for " + UpgradeClusterEffector.class.getName());
-
- Map<ConfigKey<?>, Object> specCfg = memberSpec.getConfig();
- String oldDownloadUrl = (String) specCfg.get(BrooklynNode.DOWNLOAD_URL);
- String oldUploadUrl = (String) specCfg.get(BrooklynNode.DISTRO_UPLOAD_URL);
- String newDownloadUrl = parameters.get(BrooklynNode.DOWNLOAD_URL.getConfigKey());
- String newUploadUrl = inferUploadUrl(newDownloadUrl);
- try {
- memberSpec.configure(BrooklynNode.DOWNLOAD_URL, newUploadUrl);
- memberSpec.configure(BrooklynNode.DISTRO_UPLOAD_URL, newUploadUrl);
- upgrade(parameters);
- } catch (Exception e) {
- memberSpec.configure(BrooklynNode.DOWNLOAD_URL, oldDownloadUrl);
- memberSpec.configure(BrooklynNode.DISTRO_UPLOAD_URL, oldUploadUrl);
- throw Exceptions.propagate(e);
- } finally {
- upgradeInProgress.set(false);
- }
- return null;
- }
-
- private String inferUploadUrl(String newDownloadUrl) {
- boolean isLocal = "file".equals(Urls.getProtocol(newDownloadUrl)) || new File(newDownloadUrl).exists();
- if (isLocal) {
- return newDownloadUrl;
- } else {
- return null;
- }
- }
-
- private void upgrade(ConfigBag parameters) {
- //TODO might be worth separating each step in a task for better UI
-
- Group cluster = (Group)entity();
- Collection<Entity> initialMembers = cluster.getMembers();
- int initialClusterSize = initialMembers.size();
-
- //1. Initially create a single node to check if it will launch successfully
- Entity initialNode = Iterables.getOnlyElement(createNodes(1));
-
- //2. If everything is OK with the first node launch the rest as well
- Collection<Entity> remainingNodes = createNodes(initialClusterSize - 1);
-
- //3. Once we have all nodes running without errors switch master
- DynamicTasks.queue(Effectors.invocation(cluster, BrooklynCluster.SELECT_MASTER, MutableMap.of(SelectMasterEffector.NEW_MASTER_ID, initialNode.getId()))).asTask().getUnchecked();
-
- //4. Stop the nodes which were running at the start of the upgrade call, but keep them around.
- // Should we create a quarantine-like zone for old stopped version?
- // For members that were created meanwhile - they will be using the new version already. If the new version
- // isn't good then they will fail to start as well, forcing the policies to retry (and succeed once the
- // URL is reverted).
- HashSet<Entity> oldMembers = new HashSet<Entity>(initialMembers);
- oldMembers.removeAll(remainingNodes);
- oldMembers.remove(initialNode);
- DynamicTasks.queue(Effectors.invocation(BrooklynNode.STOP_NODE_BUT_LEAVE_APPS, Collections.emptyMap(), oldMembers)).asTask().getUnchecked();
- }
-
- private Collection<Entity> createNodes(int nodeCnt) {
- DynamicCluster cluster = (DynamicCluster)entity();
-
- //1. Create the nodes
- Collection<Entity> newNodes = cluster.resizeByDelta(nodeCnt);
-
- //2. Wait for them to be RUNNING
- waitAttributeNotEqualTo(
- newNodes,
- BrooklynNode.SERVICE_STATE_ACTUAL, Lifecycle.STARTING);
-
- //3. Set HOT_STANDBY in case it is not enabled on the command line ...
- DynamicTasks.queue(Effectors.invocation(
- BrooklynNode.SET_HA_MODE,
- MutableMap.of(SetHAModeEffector.MODE, HighAvailabilityMode.HOT_STANDBY),
- newNodes)).asTask().getUnchecked();
-
- //4. ... and wait until all of the nodes change state
- //TODO if the REST call is blocking this is not needed
- waitAttributeEqualTo(
- newNodes,
- BrooklynNode.MANAGEMENT_NODE_STATE,
- ManagementNodeState.HOT_STANDBY);
-
- //5. Just in case check if all of the nodes are SERVICE_UP (which would rule out ON_FIRE as well)
- Collection<Entity> failedNodes = Collections2.filter(newNodes, EntityPredicates.attributeEqualTo(BrooklynNode.SERVICE_UP, Boolean.FALSE));
- if (!failedNodes.isEmpty()) {
- throw new IllegalStateException("Nodes " + failedNodes + " are not " + BrooklynNode.SERVICE_UP + " though successfully in " + ManagementNodeState.HOT_STANDBY);
- }
- return newNodes;
- }
-
- private <T> void waitAttributeEqualTo(Collection<Entity> nodes, AttributeSensor<T> sensor, T value) {
- waitPredicate(
- nodes,
- EntityPredicates.attributeEqualTo(sensor, value),
- "Waiting for nodes " + nodes + ", sensor " + sensor + " to be " + value,
- "Timeout while waiting for nodes " + nodes + ", sensor " + sensor + " to change to " + value);
- }
-
- private <T> void waitAttributeNotEqualTo(Collection<Entity> nodes, AttributeSensor<T> sensor, T value) {
- waitPredicate(
- nodes,
- EntityPredicates.attributeNotEqualTo(sensor, value),
- "Waiting for nodes " + nodes + ", sensor " + sensor + " to change from " + value,
- "Timeout while waiting for nodes " + nodes + ", sensor " + sensor + " to change from " + value);
- }
-
- private <T extends Entity> void waitPredicate(Collection<T> nodes, Predicate<T> waitPredicate, String blockingMsg, String errorMsg) {
- Tasks.setBlockingDetails(blockingMsg);
- boolean pollSuccess = Repeater.create(blockingMsg)
- .backoff(Duration.ONE_SECOND, 1.2, Duration.TEN_SECONDS)
- .limitTimeTo(Duration.ONE_HOUR)
- .until(nodes, allMatch(waitPredicate))
- .run();
- Tasks.resetBlockingDetails();
-
- if (!pollSuccess) {
- throw new IllegalStateException(errorMsg);
- }
- }
-
- public static <T> Predicate<Collection<T>> allMatch(final Predicate<T> predicate) {
- return new Predicate<Collection<T>>() {
- @Override
- public boolean apply(Collection<T> input) {
- return Iterables.all(input, predicate);
- }
- };
- }
-}