You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by al...@apache.org on 2014/11/03 16:51:54 UTC

[10/29] git commit: Bunch of clean-ups - mainly addressing @aledsage's comments on @neykov's cluster upgrade PR, plus miscellaneous things I've noticed. Needs further testing, though.

Bunch of clean-ups - mainly addressing @aledsage's comments on @neykov's cluster upgrade PR, plus miscellaneous things I've noticed.
Needs further testing, though.


Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/7bae4e66
Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/7bae4e66
Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/7bae4e66

Branch: refs/heads/master
Commit: 7bae4e66342048069b079eb035b492b1c0d000fd
Parents: 634b66b
Author: Alex Heneveld <al...@cloudsoftcorp.com>
Authored: Sat Oct 25 00:32:24 2014 +0100
Committer: Alex Heneveld <al...@cloudsoftcorp.com>
Committed: Fri Oct 31 09:38:19 2014 -0500

----------------------------------------------------------------------
 .../entity/brooklynnode/BrooklynCluster.java    |   4 +-
 .../brooklynnode/BrooklynClusterImpl.java       |  59 ++---
 .../entity/brooklynnode/BrooklynNodeImpl.java   |   3 +-
 .../brooklynnode/BrooklynUpgradeEffector.java   | 214 -------------------
 .../brooklynnode/RemoteEffectorBuilder.java     |  23 +-
 .../BrooklynClusterUpgradeEffectorBody.java     | 203 ++++++++++++++++++
 .../BrooklynNodeUpgradeEffectorBody.java        | 212 ++++++++++++++++++
 .../effector/UpgradeClusterEffectorBody.java    | 199 -----------------
 8 files changed, 460 insertions(+), 457 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/7bae4e66/software/base/src/main/java/brooklyn/entity/brooklynnode/BrooklynCluster.java
----------------------------------------------------------------------
diff --git a/software/base/src/main/java/brooklyn/entity/brooklynnode/BrooklynCluster.java b/software/base/src/main/java/brooklyn/entity/brooklynnode/BrooklynCluster.java
index a3c2157..30a46bd 100644
--- a/software/base/src/main/java/brooklyn/entity/brooklynnode/BrooklynCluster.java
+++ b/software/base/src/main/java/brooklyn/entity/brooklynnode/BrooklynCluster.java
@@ -26,11 +26,11 @@ import brooklyn.entity.effector.Effectors;
 import brooklyn.entity.group.DynamicCluster;
 import brooklyn.entity.proxying.ImplementedBy;
 import brooklyn.event.AttributeSensor;
-import brooklyn.event.basic.BasicAttributeSensor;
+import brooklyn.event.basic.Sensors;
 
 @ImplementedBy(BrooklynClusterImpl.class)
 public interface BrooklynCluster extends DynamicCluster {
-    public static final AttributeSensor<BrooklynNode> MASTER_NODE = new BasicAttributeSensor<BrooklynNode>(
+    public static final AttributeSensor<BrooklynNode> MASTER_NODE = Sensors.newSensor(
             BrooklynNode.class, "brooklyncluster.master", "Pointer to the child node with MASTER state in the cluster");
 
     public interface SelectMasterEffector {

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/7bae4e66/software/base/src/main/java/brooklyn/entity/brooklynnode/BrooklynClusterImpl.java
----------------------------------------------------------------------
diff --git a/software/base/src/main/java/brooklyn/entity/brooklynnode/BrooklynClusterImpl.java b/software/base/src/main/java/brooklyn/entity/brooklynnode/BrooklynClusterImpl.java
index bfaf33c..dc3ee17 100644
--- a/software/base/src/main/java/brooklyn/entity/brooklynnode/BrooklynClusterImpl.java
+++ b/software/base/src/main/java/brooklyn/entity/brooklynnode/BrooklynClusterImpl.java
@@ -26,9 +26,10 @@ import org.slf4j.LoggerFactory;
 
 import brooklyn.entity.Entity;
 import brooklyn.entity.basic.EntityPredicates;
-import brooklyn.entity.basic.ServiceStateLogic.ServiceNotUpLogic;
+import brooklyn.entity.basic.ServiceStateLogic;
+import brooklyn.entity.basic.ServiceStateLogic.ServiceProblemsLogic;
 import brooklyn.entity.brooklynnode.effector.SelectMasterEffectorBody;
-import brooklyn.entity.brooklynnode.effector.UpgradeClusterEffectorBody;
+import brooklyn.entity.brooklynnode.effector.BrooklynClusterUpgradeEffectorBody;
 import brooklyn.entity.group.DynamicClusterImpl;
 import brooklyn.event.feed.function.FunctionFeed;
 import brooklyn.event.feed.function.FunctionPollConfig;
@@ -41,69 +42,69 @@ import com.google.common.collect.Iterables;
 public class BrooklynClusterImpl extends DynamicClusterImpl implements BrooklynCluster {
 
     private static final String MSG_NO_MASTER = "No master node in cluster";
+    private static final String MSG_TOO_MANY_MASTERS = "Too many master nodes in cluster";
 
     private static final Logger LOG = LoggerFactory.getLogger(BrooklynClusterImpl.class);
 
     //TODO set MEMBER_SPEC
 
+    @SuppressWarnings("unused")
     private FunctionFeed scanMaster;
 
     @Override
     public void init() {
         super.init();
         getMutableEntityType().addEffector(SelectMasterEffectorBody.SELECT_MASTER);
-        getMutableEntityType().addEffector(UpgradeClusterEffectorBody.UPGRADE_CLUSTER);
+        getMutableEntityType().addEffector(BrooklynClusterUpgradeEffectorBody.UPGRADE_CLUSTER);
 
-        ServiceNotUpLogic.updateNotUpIndicator(this, MASTER_NODE, MSG_NO_MASTER);
+        ServiceProblemsLogic.updateProblemsIndicator(this, MASTER_NODE, MSG_NO_MASTER);
         scanMaster = FunctionFeed.builder()
                 .entity(this)
                 .poll(new FunctionPollConfig<Object, BrooklynNode>(MASTER_NODE)
                         .period(Duration.ONE_SECOND)
-                        .callable(new Callable<BrooklynNode>() {
-                                @Override
-                                public BrooklynNode call() throws Exception {
-                                    return findMasterChild();
-                                }
-                            }))
+                        .callable(new MasterChildFinder()))
                 .build();
     }
 
+    private final class MasterChildFinder implements Callable<BrooklynNode> {
+        @Override
+        public BrooklynNode call() throws Exception {
+            return findMasterChild();
+        }
+    }
+
     private BrooklynNode findMasterChild() {
         Collection<Entity> masters = FluentIterable.from(getMembers())
                 .filter(EntityPredicates.attributeEqualTo(BrooklynNode.MANAGEMENT_NODE_STATE, ManagementNodeState.MASTER))
                 .toList();
 
         if (masters.size() == 0) {
-            ServiceNotUpLogic.updateNotUpIndicator(this, MASTER_NODE, MSG_NO_MASTER);
+            ServiceProblemsLogic.updateProblemsIndicator(this, MASTER_NODE, MSG_NO_MASTER);
             return null;
+            
         } else if (masters.size() == 1) {
-            ServiceNotUpLogic.clearNotUpIndicator(this, MASTER_NODE);
+            ServiceStateLogic.ServiceProblemsLogic.clearProblemsIndicator(this, MASTER_NODE);
             return (BrooklynNode)Iterables.getOnlyElement(masters);
+            
         } else if (masters.size() == 2) {
-            //Probably hit a window where we have a new master
+            LOG.warn("Two masters detected, probably a handover just occured: " + masters);
+
+            //Don't clearProblemsIndicator - if there were no masters previously why have two now.
+            //But also don't set it. Probably hit a window where we have a new master
             //its BrooklynNode picked it up, but the BrooklynNode
             //for the old master hasn't refreshed its state yet.
             //Just pick one of them, should sort itself out in next update.
-            LOG.warn("Two masters detected, probably a handover just occured: " + masters);
-
-            //Don't clearNotUpIndicator - if there were no masters previously
-            //why have two now.
-
-            return (BrooklynNode)Iterables.getOnlyElement(masters);
+            
+            //TODO Do set such indicator if this continues for an extended period of time
+            
+            return (BrooklynNode)masters.iterator().next();
+            
         } else {
-            //Set on fire?
+            ServiceProblemsLogic.updateProblemsIndicator(this, MASTER_NODE, MSG_TOO_MANY_MASTERS);
             String msg = "Multiple (>=3) master nodes in cluster: " + masters;
             LOG.error(msg);
             throw new IllegalStateException(msg);
-        }
-    }
-
-    @Override
-    public void stop() {
-        super.stop();
-
-        if (scanMaster != null && scanMaster.isActivated()) {
-            scanMaster.stop();
+            
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/7bae4e66/software/base/src/main/java/brooklyn/entity/brooklynnode/BrooklynNodeImpl.java
----------------------------------------------------------------------
diff --git a/software/base/src/main/java/brooklyn/entity/brooklynnode/BrooklynNodeImpl.java b/software/base/src/main/java/brooklyn/entity/brooklynnode/BrooklynNodeImpl.java
index 6aa2420..5bdbe2d 100644
--- a/software/base/src/main/java/brooklyn/entity/brooklynnode/BrooklynNodeImpl.java
+++ b/software/base/src/main/java/brooklyn/entity/brooklynnode/BrooklynNodeImpl.java
@@ -39,6 +39,7 @@ import brooklyn.entity.basic.Lifecycle;
 import brooklyn.entity.basic.ServiceStateLogic;
 import brooklyn.entity.basic.ServiceStateLogic.ServiceNotUpLogic;
 import brooklyn.entity.basic.SoftwareProcessImpl;
+import brooklyn.entity.brooklynnode.effector.BrooklynNodeUpgradeEffectorBody;
 import brooklyn.entity.brooklynnode.effector.SetHAModeEffectorBody;
 import brooklyn.entity.brooklynnode.effector.SetHAPriorityEffectorBody;
 import brooklyn.entity.effector.EffectorBody;
@@ -103,7 +104,7 @@ public class BrooklynNodeImpl extends SoftwareProcessImpl implements BrooklynNod
         getMutableEntityType().addEffector(StopNodeAndKillAppsEffectorBody.STOP_NODE_AND_KILL_APPS);
         getMutableEntityType().addEffector(SetHAPriorityEffectorBody.SET_HA_PRIORITY);
         getMutableEntityType().addEffector(SetHAModeEffectorBody.SET_HA_MODE);
-        getMutableEntityType().addEffector(BrooklynUpgradeEffector.UPGRADE);
+        getMutableEntityType().addEffector(BrooklynNodeUpgradeEffectorBody.UPGRADE);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/7bae4e66/software/base/src/main/java/brooklyn/entity/brooklynnode/BrooklynUpgradeEffector.java
----------------------------------------------------------------------
diff --git a/software/base/src/main/java/brooklyn/entity/brooklynnode/BrooklynUpgradeEffector.java b/software/base/src/main/java/brooklyn/entity/brooklynnode/BrooklynUpgradeEffector.java
deleted file mode 100644
index 3dc2015..0000000
--- a/software/base/src/main/java/brooklyn/entity/brooklynnode/BrooklynUpgradeEffector.java
+++ /dev/null
@@ -1,214 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package brooklyn.entity.brooklynnode;
-
-import java.util.Map;
-import java.util.concurrent.Callable;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import brooklyn.config.ConfigKey;
-import brooklyn.entity.Effector;
-import brooklyn.entity.Entity;
-import brooklyn.entity.basic.Entities;
-import brooklyn.entity.basic.EntityInternal;
-import brooklyn.entity.basic.EntityPredicates;
-import brooklyn.entity.basic.SoftwareProcess;
-import brooklyn.entity.effector.EffectorBody;
-import brooklyn.entity.effector.Effectors;
-import brooklyn.entity.proxying.EntitySpec;
-import brooklyn.entity.software.SshEffectorTasks;
-import brooklyn.event.AttributeSensor;
-import brooklyn.event.basic.MapConfigKey;
-import brooklyn.management.TaskAdaptable;
-import brooklyn.management.ha.HighAvailabilityMode;
-import brooklyn.management.ha.ManagementNodeState;
-import brooklyn.util.config.ConfigBag;
-import brooklyn.util.exceptions.Exceptions;
-import brooklyn.util.exceptions.ReferenceWithError;
-import brooklyn.util.guava.Functionals;
-import brooklyn.util.net.Urls;
-import brooklyn.util.repeat.Repeater;
-import brooklyn.util.task.DynamicTasks;
-import brooklyn.util.task.Tasks;
-import brooklyn.util.text.Identifiers;
-import brooklyn.util.text.Strings;
-import brooklyn.util.time.Duration;
-
-import com.google.common.base.Functions;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Predicate;
-import com.google.common.base.Predicates;
-import com.google.common.reflect.TypeToken;
-
-@SuppressWarnings("serial")
-/** Upgrades a brooklyn node in-place on the box, 
- * by creating a child brooklyn node and ensuring it can rebind in HOT_STANDBY
- * <p>
- * Requires the target node to have persistence enabled. 
- */
-public class BrooklynUpgradeEffector {
-
-    private static final Logger log = LoggerFactory.getLogger(BrooklynUpgradeEffector.class);
-    
-    public static final ConfigKey<String> DOWNLOAD_URL = BrooklynNode.DOWNLOAD_URL.getConfigKey();
-    public static final ConfigKey<Map<String,Object>> EXTRA_CONFIG = MapConfigKey.builder(new TypeToken<Map<String,Object>>() {}).name("extraConfig").description("Additional new config to set on this entity as part of upgrading").build();
-
-    public static final Effector<Void> UPGRADE = Effectors.effector(Void.class, "upgrade")
-        .description("Changes the Brooklyn build used to run this node, by spawning a dry-run node then copying the installed files across. "
-            + "This node must be running for persistence for in-place upgrading to work.")
-        .parameter(BrooklynNode.SUGGESTED_VERSION).parameter(DOWNLOAD_URL).parameter(EXTRA_CONFIG)
-        .impl(new UpgradeImpl()).build();
-    
-    public static class UpgradeImpl extends EffectorBody<Void> {
-        @Override
-        public Void call(ConfigBag parametersO) {
-            if (!isPersistenceModeEnabled(entity())) {
-                // would could try a `forcePersistNow`, but that's sloppy; 
-                // for now, require HA/persistence for upgrading 
-                DynamicTasks.queue( Tasks.warning("Persistence does not appear to be enabled at this node. "
-                    + "In-place upgrade is unlikely to succeed.", null) );
-            }
-            
-            ConfigBag parameters = ConfigBag.newInstanceCopying(parametersO);
-            
-            /*
-             * all parameters are passed to children, apart from EXTRA_CONFIG
-             * whose value (as a map) is so passed; it provides an easy way to set extra config in the gui.
-             * (IOW a key-value mapping can be passed either inside EXTRA_CONFIG or as a sibling to EXTRA_CONFIG)  
-             */
-            if (parameters.containsKey(EXTRA_CONFIG)) {
-                Map<String, Object> extra = parameters.get(EXTRA_CONFIG);
-                parameters.remove(EXTRA_CONFIG);
-                parameters.putAll(extra);
-            }
-            log.debug(this+" upgrading, using "+parameters);
-            
-            // TODO require entity() node state master or hot standby AND require persistence enabled, or a new 'force_attempt_upgrade' parameter to be applied
-            // TODO could have a 'skip_dry_run_upgrade' parameter
-            // TODO could support 'dry_run_only' parameter, with optional resumption tasks (eg new dynamic effector)
-
-            // 1 add new brooklyn version entity as child (so uses same machine), with same config apart from things in parameters
-            final BrooklynNode dryRunChild = entity().addChild(EntitySpec.create(BrooklynNode.class).configure(parameters.getAllConfig())
-                .displayName("Upgraded Version Dry-Run Node")
-                // force dir and label back to their defaults (do not piggy back on what may have already been installed)
-                .configure(BrooklynNode.INSTALL_DIR, BrooklynNode.INSTALL_DIR.getConfigKey().getDefaultValue())
-                .configure(BrooklynNode.INSTALL_UNIQUE_LABEL, "upgrade-tmp-"+Identifiers.makeRandomId(8))
-                .configure(parameters.getAllConfig()));
-            
-            //force this to start as hot-standby
-            String launchParameters = dryRunChild.getConfig(BrooklynNode.EXTRA_LAUNCH_PARAMETERS);
-            if (Strings.isBlank(launchParameters)) launchParameters = "";
-            else launchParameters += " ";
-            launchParameters += "--highAvailability "+HighAvailabilityMode.HOT_STANDBY;
-            ((EntityInternal)dryRunChild).setConfig(BrooklynNode.EXTRA_LAUNCH_PARAMETERS, launchParameters);
-            
-            Entities.manage(dryRunChild);
-            final String dryRunNodeUid = dryRunChild.getId();
-            ((EntityInternal)dryRunChild).setDisplayName("Dry-Run Upgraded Brooklyn Node ("+dryRunNodeUid+")");
-
-            DynamicTasks.queue(Effectors.invocation(dryRunChild, BrooklynNode.START, ConfigBag.EMPTY));
-            
-            // 2 confirm hot standby status
-            DynamicTasks.queue(newWaitForAttributeTask(dryRunChild, BrooklynNode.MANAGEMENT_NODE_STATE, 
-                Predicates.equalTo(ManagementNodeState.HOT_STANDBY), Duration.TWO_MINUTES));
-
-            // 3 stop new version
-            // 4 stop old version
-            DynamicTasks.queue(Tasks.builder().name("shutdown original and transient nodes")
-                .add(Effectors.invocation(dryRunChild, BrooklynNode.SHUTDOWN, ConfigBag.EMPTY))
-                .add(Effectors.invocation(entity(), BrooklynNode.SHUTDOWN, ConfigBag.EMPTY))
-                .build());
-            
-            // 5 move old files, and move new files
-            DynamicTasks.queue(Tasks.builder().name("setup new version").body(new Runnable() {
-                @Override
-                public void run() {
-                    String runDir = entity().getAttribute(SoftwareProcess.RUN_DIR);
-                    String bkDir = Urls.mergePaths(runDir, "..", Urls.getBasename(runDir)+"-backups", dryRunNodeUid);
-                    String dryRunDir = Preconditions.checkNotNull(dryRunChild.getAttribute(SoftwareProcess.RUN_DIR));
-                    log.debug(this+" storing backup of previous version in "+bkDir);
-                    DynamicTasks.queue(SshEffectorTasks.ssh(
-                        "cd "+runDir,
-                        "mkdir -p "+bkDir,
-                        "mv * "+bkDir,
-                        "cd "+dryRunDir,
-                        "mv * "+runDir
-                        ).summary("move files"));
-                }
-            }).build());
-
-            entity().getConfigMap().addToLocalBag(parameters.getAllConfig());
-            
-            // 6 start this entity, running the new version
-            DynamicTasks.queue(Effectors.invocation(entity(), BrooklynNode.START, ConfigBag.EMPTY));
-            
-            DynamicTasks.waitForLast();
-            Entities.unmanage(dryRunChild);
-            
-            return null;
-        }
-
-        private boolean isPersistenceModeEnabled(EntityInternal entity) {
-            // TODO when there are PERSIST* options in BrooklynNode, look at them here!
-            // or, better, have a sensor for persistence
-            String params = entity.getConfig(BrooklynNode.EXTRA_LAUNCH_PARAMETERS);
-            if (params==null) return false;
-            if (params.indexOf("persist")==0) return false;
-            return true;
-        }
-
-    }
-
-    private static class WaitForRepeaterCallable implements Callable<Boolean> {
-        protected Repeater repeater;
-        protected boolean requireTrue;
-
-        public WaitForRepeaterCallable(Repeater repeater, boolean requireTrue) {
-            this.repeater = repeater;
-            this.requireTrue = requireTrue;
-        }
-
-        @Override
-        public Boolean call() {
-            ReferenceWithError<Boolean> result = repeater.runKeepingError();
-            if (Boolean.TRUE.equals(result.getWithoutError()))
-                return true;
-            if (result.hasError()) 
-                throw Exceptions.propagate(result.getError());
-            if (requireTrue)
-                throw new IllegalStateException("timeout - "+repeater.getDescription());
-            return false;
-        }
-    }
-
-    private static <T> TaskAdaptable<Boolean> newWaitForAttributeTask(Entity node, AttributeSensor<T> sensor, Predicate<T> condition, Duration timeout) {
-        return awaiting( Repeater.create("waiting on "+node+" "+sensor.getName()+" "+condition)
-                    .backoff(Duration.millis(10), 1.5, Duration.millis(200))
-                    .limitTimeTo(timeout==null ? Duration.PRACTICALLY_FOREVER : timeout)
-                    .until(Functionals.callable(Functions.forPredicate(EntityPredicates.attributeSatisfies(sensor, condition)), node)),
-                    true);
-    }
-
-    private static TaskAdaptable<Boolean> awaiting(Repeater repeater, boolean requireTrue) {
-        return Tasks.<Boolean>builder().name(repeater.getDescription()).body(new WaitForRepeaterCallable(repeater, requireTrue)).build();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/7bae4e66/software/base/src/main/java/brooklyn/entity/brooklynnode/RemoteEffectorBuilder.java
----------------------------------------------------------------------
diff --git a/software/base/src/main/java/brooklyn/entity/brooklynnode/RemoteEffectorBuilder.java b/software/base/src/main/java/brooklyn/entity/brooklynnode/RemoteEffectorBuilder.java
index eba3c1a..3b1cdc1 100644
--- a/software/base/src/main/java/brooklyn/entity/brooklynnode/RemoteEffectorBuilder.java
+++ b/software/base/src/main/java/brooklyn/entity/brooklynnode/RemoteEffectorBuilder.java
@@ -37,19 +37,28 @@ public class RemoteEffectorBuilder {
             return input.getContentAsString();
         }
     }
+    
 
     public static Collection<Effector<String>> of(Collection<?> cfgEffectors) {
         Collection<Effector<String>> effectors = new ArrayList<Effector<String>>();
         for (Object objEff : cfgEffectors) {
             Map<?, ?> cfgEff = (Map<?, ?>)objEff;
-//            String returnTypeName = (String)cfgEff.get("returnType");
             String effName = (String)cfgEff.get("name");
             String description = (String)cfgEff.get("description");
 
-//            Class<?> returnType = getType(returnTypeName);
             EffectorBuilder<String> eff = Effectors.effector(String.class, effName);
             Collection<?> params = (Collection<?>)cfgEff.get("parameters");
 
+            /* The *return type* should NOT be included in the signature here.
+             * It might be a type known only at the mirrored brooklyn node
+             * (in which case loading it here would fail); or possibly it could
+             * be a different version of the type here, in which case the signature
+             * would look valid here, but deserializing it would fail.
+             * 
+             * Best to just pass the json representation back to the caller.
+             * (They won't be able to tell the difference between that and deserialize-then-serialize!)
+             */
+            
             if (description != null) {
                 eff.description(description);
             }
@@ -65,21 +74,11 @@ public class RemoteEffectorBuilder {
     }
 
     private static void buildParam(EffectorBuilder<String> eff, Map<?, ?> cfgParam) {
-//        String type = (String)cfgParam.get("type");
         String name = (String)cfgParam.get("name");
         String description = (String)cfgParam.get("description");
         String defaultValue = (String)cfgParam.get("defaultValue");
 
-//        Class<?> paramType = getType(type);
         eff.parameter(Object.class, name, description, defaultValue /*TypeCoercions.coerce(defaultValue, paramType)*/);
     }
 
-//    private static Class<?> getType(String type) {
-//        try {
-//            return Class.forName(type);
-//        } catch (ClassNotFoundException e) {
-//            return Object.class;
-//        }
-//    }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/7bae4e66/software/base/src/main/java/brooklyn/entity/brooklynnode/effector/BrooklynClusterUpgradeEffectorBody.java
----------------------------------------------------------------------
diff --git a/software/base/src/main/java/brooklyn/entity/brooklynnode/effector/BrooklynClusterUpgradeEffectorBody.java b/software/base/src/main/java/brooklyn/entity/brooklynnode/effector/BrooklynClusterUpgradeEffectorBody.java
new file mode 100644
index 0000000..8215a0b
--- /dev/null
+++ b/software/base/src/main/java/brooklyn/entity/brooklynnode/effector/BrooklynClusterUpgradeEffectorBody.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package brooklyn.entity.brooklynnode.effector;
+
+import java.io.File;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import brooklyn.config.ConfigKey;
+import brooklyn.entity.Effector;
+import brooklyn.entity.Entity;
+import brooklyn.entity.Group;
+import brooklyn.entity.basic.EntityPredicates;
+import brooklyn.entity.basic.Lifecycle;
+import brooklyn.entity.brooklynnode.BrooklynCluster;
+import brooklyn.entity.brooklynnode.BrooklynCluster.SelectMasterEffector;
+import brooklyn.entity.brooklynnode.BrooklynCluster.UpgradeClusterEffector;
+import brooklyn.entity.brooklynnode.BrooklynNode;
+import brooklyn.entity.brooklynnode.BrooklynNode.SetHAModeEffector;
+import brooklyn.entity.effector.EffectorBody;
+import brooklyn.entity.effector.Effectors;
+import brooklyn.entity.group.DynamicCluster;
+import brooklyn.entity.proxying.EntitySpec;
+import brooklyn.event.AttributeSensor;
+import brooklyn.management.ha.HighAvailabilityMode;
+import brooklyn.management.ha.ManagementNodeState;
+import brooklyn.util.collections.MutableMap;
+import brooklyn.util.config.ConfigBag;
+import brooklyn.util.exceptions.Exceptions;
+import brooklyn.util.net.Urls;
+import brooklyn.util.repeat.Repeater;
+import brooklyn.util.task.DynamicTasks;
+import brooklyn.util.task.Tasks;
+import brooklyn.util.time.Duration;
+
+import com.google.api.client.util.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.collect.Collections2;
+import com.google.common.collect.Iterables;
+
+public class BrooklynClusterUpgradeEffectorBody extends EffectorBody<Void> implements UpgradeClusterEffector {
+    public static final Effector<Void> UPGRADE_CLUSTER = Effectors.effector(UpgradeClusterEffector.UPGRADE_CLUSTER).impl(new BrooklynClusterUpgradeEffectorBody()).build();
+
+    private AtomicBoolean upgradeInProgress = new AtomicBoolean();
+
+    @Override
+    public Void call(ConfigBag parameters) {
+        if (!upgradeInProgress.compareAndSet(false, true)) {
+            throw new IllegalStateException("An upgrade is already in progress.");
+        }
+
+        EntitySpec<?> memberSpec = entity().getConfig(BrooklynCluster.MEMBER_SPEC);
+        Preconditions.checkNotNull(memberSpec, BrooklynCluster.MEMBER_SPEC.getName() + " is required for " + UpgradeClusterEffector.class.getName());
+
+        Map<ConfigKey<?>, Object> specCfg = memberSpec.getConfig();
+        String oldDownloadUrl = (String) specCfg.get(BrooklynNode.DOWNLOAD_URL);
+        String oldUploadUrl = (String) specCfg.get(BrooklynNode.DISTRO_UPLOAD_URL);
+        String newDownloadUrl = parameters.get(BrooklynNode.DOWNLOAD_URL.getConfigKey());
+        String newUploadUrl = inferUploadUrl(newDownloadUrl);
+        try {
+            memberSpec.configure(BrooklynNode.DOWNLOAD_URL, newUploadUrl);
+            memberSpec.configure(BrooklynNode.DISTRO_UPLOAD_URL, newUploadUrl);
+            upgrade(parameters);
+        } catch (Exception e) {
+            memberSpec.configure(BrooklynNode.DOWNLOAD_URL, oldDownloadUrl);
+            memberSpec.configure(BrooklynNode.DISTRO_UPLOAD_URL, oldUploadUrl);
+            throw Exceptions.propagate(e);
+        } finally {
+            upgradeInProgress.set(false);
+        }
+        return null;
+    }
+
+    private String inferUploadUrl(String newDownloadUrl) {
+        boolean isLocal = "file".equals(Urls.getProtocol(newDownloadUrl)) || new File(newDownloadUrl).exists();
+        if (isLocal) {
+            return newDownloadUrl;
+        } else {
+            return null;
+        }
+    }
+
+    /**
+     * Replaces the members present when the call starts with newly created ones.
+     * <p>
+     * One node is created first as a trial; if that succeeds the remaining replacements
+     * are created, {@code SELECT_MASTER} is invoked on the cluster with the trial node's id,
+     * and {@code STOP_NODE_BUT_LEAVE_APPS} is invoked on the members that were present
+     * at the start of the call.
+     *
+     * @param parameters the effector parameters; not read by this method
+     */
+    private void upgrade(ConfigBag parameters) {
+        //TODO might be worth separating each step in a task for better UI
+        //TODO currently this will fight with auto-scaler policies; you should turn them off
+
+        Group cluster = (Group)entity();
+        Collection<Entity> initialMembers = cluster.getMembers();
+        int initialClusterSize = initialMembers.size();
+
+        //1. Initially create a single node to check if it will launch successfully
+        Entity initialNode = Iterables.getOnlyElement(createNodes(1));
+
+        //2. If everything is OK with the first node launch the rest as well.
+        //   The count is clamped at zero: for an initially empty cluster the delta would
+        //   otherwise be -1, which resizeByDelta would presumably treat as a request to
+        //   shrink the cluster straight after the first node was created.
+        Collection<Entity> remainingNodes = createNodes(Math.max(0, initialClusterSize - 1));
+
+        //3. Once we have all nodes running without errors switch master
+        DynamicTasks.queue(Effectors.invocation(cluster, BrooklynCluster.SELECT_MASTER, MutableMap.of(SelectMasterEffector.NEW_MASTER_ID, initialNode.getId()))).asTask().getUnchecked();
+
+        //4. Stop the nodes which were running at the start of the upgrade call, but keep them around.
+        //   Should we create a quarantine-like zone for old stopped version?
+        //   For members that were created meanwhile - they will be using the new version already. If the new version
+        //   isn't good then they will fail to start as well, forcing the policies to retry (and succeed once the
+        //   URL is reverted).
+        //TODO can get into problem state if more old nodes are created; better might be to set the
+        //version on this cluster before the above select-master call, and then delete any which are running the old
+        //version (would require tracking the version number at the entity)
+        HashSet<Entity> oldMembers = new HashSet<Entity>(initialMembers);
+        oldMembers.removeAll(remainingNodes);
+        oldMembers.remove(initialNode);
+        DynamicTasks.queue(Effectors.invocation(BrooklynNode.STOP_NODE_BUT_LEAVE_APPS, Collections.emptyMap(), oldMembers)).asTask().getUnchecked();
+    }
+
+    /**
+     * Resizes the cluster by {@code nodeCnt} and blocks until the nodes returned by the
+     * resize have left {@code STARTING}, have been sent {@code SET_HA_MODE} with
+     * {@code HOT_STANDBY}, and report {@code HOT_STANDBY} as their management node state.
+     *
+     * @param nodeCnt the delta handed to {@code resizeByDelta}
+     * @return the nodes returned by the resize call
+     * @throws IllegalStateException if one of the waits times out, or if any node reports
+     *         {@code SERVICE_UP} as false after reaching {@code HOT_STANDBY}
+     */
+    private Collection<Entity> createNodes(int nodeCnt) {
+        //1. Create the nodes
+        Collection<Entity> created = ((DynamicCluster)entity()).resizeByDelta(nodeCnt);
+
+        //2. Wait for them to leave STARTING (any other state, including a failure state, ends this wait)
+        waitAttributeNotEqualTo(created, BrooklynNode.SERVICE_STATE_ACTUAL, Lifecycle.STARTING);
+
+        //3. Set HOT_STANDBY in case it is not enabled on the command line ...
+        DynamicTasks.queue(
+                Effectors.invocation(
+                        BrooklynNode.SET_HA_MODE,
+                        MutableMap.of(SetHAModeEffector.MODE, HighAvailabilityMode.HOT_STANDBY),
+                        created))
+                .asTask()
+                .getUnchecked();
+
+        //4. ... and wait until all of the nodes change state
+        //TODO if the REST call is blocking this is not needed
+        waitAttributeEqualTo(created, BrooklynNode.MANAGEMENT_NODE_STATE, ManagementNodeState.HOT_STANDBY);
+
+        //5. Just in case check if all of the nodes are SERVICE_UP (which would rule out ON_FIRE as well)
+        Predicate<Entity> reportsDown = EntityPredicates.attributeEqualTo(BrooklynNode.SERVICE_UP, Boolean.FALSE);
+        Collection<Entity> notUp = Collections2.filter(created, reportsDown);
+        if (notUp.isEmpty()) {
+            return created;
+        }
+        throw new IllegalStateException("Nodes " + notUp + " are not " + BrooklynNode.SERVICE_UP + " though successfully in " + ManagementNodeState.HOT_STANDBY);
+    }
+
+    /**
+     * Blocks until {@code sensor} equals {@code value} on every entity in {@code nodes}.
+     * Delegates the polling, and the failure on timeout, to {@code waitPredicate}.
+     */
+    private <T> void waitAttributeEqualTo(Collection<Entity> nodes, AttributeSensor<T> sensor, T value) {
+        Predicate<Entity> hasValue = EntityPredicates.attributeEqualTo(sensor, value);
+        String subject = "nodes " + nodes + ", sensor " + sensor;
+        String blockingMsg = "Waiting for " + subject + " to be " + value;
+        String errorMsg = "Timeout while waiting for " + subject + " to change to " + value;
+        waitPredicate(nodes, hasValue, blockingMsg, errorMsg);
+    }
+
+    /**
+     * Blocks until {@code sensor} differs from {@code value} on every entity in {@code nodes}.
+     * Delegates the polling, and the failure on timeout, to {@code waitPredicate}.
+     */
+    private <T> void waitAttributeNotEqualTo(Collection<Entity> nodes, AttributeSensor<T> sensor, T value) {
+        Predicate<Entity> hasMovedOn = EntityPredicates.attributeNotEqualTo(sensor, value);
+        String tail = "nodes " + nodes + ", sensor " + sensor + " to change from " + value;
+        String blockingMsg = "Waiting for " + tail;
+        String errorMsg = "Timeout while waiting for " + tail;
+        waitPredicate(nodes, hasMovedOn, blockingMsg, errorMsg);
+    }
+
+    /**
+     * Polls until {@code waitPredicate} holds for every element of {@code nodes},
+     * giving up after one hour.
+     * <p>
+     * The current task's blocking details are set to {@code blockingMsg} for the duration
+     * of the wait and are always reset afterwards, including when the repeater throws.
+     *
+     * @param nodes         the entities to test on each poll
+     * @param waitPredicate condition that every node has to satisfy
+     * @param blockingMsg   description used for the repeater and as the blocking details
+     * @param errorMsg      message of the exception raised when the wait does not succeed
+     * @throws IllegalStateException if the repeater finishes without the condition holding
+     */
+    private <T extends Entity> void waitPredicate(Collection<T> nodes, Predicate<T> waitPredicate, String blockingMsg, String errorMsg) {
+        boolean pollSuccess;
+        Tasks.setBlockingDetails(blockingMsg);
+        try {
+            pollSuccess = Repeater.create(blockingMsg)
+                .backoff(Duration.ONE_SECOND, 1.2, Duration.TEN_SECONDS)
+                .limitTimeTo(Duration.ONE_HOUR)
+                .until(nodes, allMatch(waitPredicate))
+                .run();
+        } finally {
+            // reset even on an exception so stale blocking details are not left on the task
+            Tasks.resetBlockingDetails();
+        }
+
+        if (!pollSuccess) {
+            throw new IllegalStateException(errorMsg);
+        }
+    }
+
+    /**
+     * Lifts a predicate on single elements to a predicate on collections.
+     *
+     * @param predicate the condition to apply to each element
+     * @return a predicate that holds for a collection when {@code predicate} holds for
+     *         all of its elements, as decided by {@code Iterables.all}
+     */
+    public static <T> Predicate<Collection<T>> allMatch(final Predicate<T> predicate) {
+        final Predicate<Collection<T>> everyElementMatches = new Predicate<Collection<T>>() {
+            @Override
+            public boolean apply(Collection<T> candidates) {
+                return Iterables.all(candidates, predicate);
+            }
+        };
+        return everyElementMatches;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/7bae4e66/software/base/src/main/java/brooklyn/entity/brooklynnode/effector/BrooklynNodeUpgradeEffectorBody.java
----------------------------------------------------------------------
diff --git a/software/base/src/main/java/brooklyn/entity/brooklynnode/effector/BrooklynNodeUpgradeEffectorBody.java b/software/base/src/main/java/brooklyn/entity/brooklynnode/effector/BrooklynNodeUpgradeEffectorBody.java
new file mode 100644
index 0000000..a6433aa
--- /dev/null
+++ b/software/base/src/main/java/brooklyn/entity/brooklynnode/effector/BrooklynNodeUpgradeEffectorBody.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package brooklyn.entity.brooklynnode.effector;
+
+import java.util.Map;
+import java.util.concurrent.Callable;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import brooklyn.config.ConfigKey;
+import brooklyn.entity.Effector;
+import brooklyn.entity.Entity;
+import brooklyn.entity.basic.Entities;
+import brooklyn.entity.basic.EntityInternal;
+import brooklyn.entity.basic.EntityPredicates;
+import brooklyn.entity.basic.SoftwareProcess;
+import brooklyn.entity.brooklynnode.BrooklynNode;
+import brooklyn.entity.effector.EffectorBody;
+import brooklyn.entity.effector.Effectors;
+import brooklyn.entity.proxying.EntitySpec;
+import brooklyn.entity.software.SshEffectorTasks;
+import brooklyn.event.AttributeSensor;
+import brooklyn.event.basic.MapConfigKey;
+import brooklyn.management.TaskAdaptable;
+import brooklyn.management.ha.HighAvailabilityMode;
+import brooklyn.management.ha.ManagementNodeState;
+import brooklyn.util.config.ConfigBag;
+import brooklyn.util.exceptions.Exceptions;
+import brooklyn.util.exceptions.ReferenceWithError;
+import brooklyn.util.guava.Functionals;
+import brooklyn.util.net.Urls;
+import brooklyn.util.repeat.Repeater;
+import brooklyn.util.task.DynamicTasks;
+import brooklyn.util.task.Tasks;
+import brooklyn.util.text.Identifiers;
+import brooklyn.util.text.Strings;
+import brooklyn.util.time.Duration;
+
+import com.google.common.base.Functions;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
+import com.google.common.reflect.TypeToken;
+
+/** Upgrades a brooklyn node in-place on the box, 
+ * by creating a child brooklyn node and ensuring it can rebind in HOT_STANDBY
+ * <p>
+ * Requires the target node to have persistence enabled. 
+ */
+@SuppressWarnings("serial")
+public class BrooklynNodeUpgradeEffectorBody extends EffectorBody<Void> {
+
+    private static final Logger log = LoggerFactory.getLogger(BrooklynNodeUpgradeEffectorBody.class);
+
+    public static final ConfigKey<String> DOWNLOAD_URL = BrooklynNode.DOWNLOAD_URL.getConfigKey();
+    public static final ConfigKey<Map<String,Object>> EXTRA_CONFIG = MapConfigKey.builder(new TypeToken<Map<String,Object>>() {}).name("extraConfig").description("Additional new config to set on this entity as part of upgrading").build();
+
+    public static final Effector<Void> UPGRADE = Effectors.effector(Void.class, "upgrade")
+        .description("Changes the Brooklyn build used to run this node, by spawning a dry-run node then copying the installed files across. "
+            + "This node must be running for persistence for in-place upgrading to work.")
+        .parameter(BrooklynNode.SUGGESTED_VERSION).parameter(DOWNLOAD_URL).parameter(EXTRA_CONFIG)
+        .impl(new BrooklynNodeUpgradeEffectorBody()).build();
+
+    /**
+     * Runs the in-place upgrade: starts a dry-run child node configured from the parameters,
+     * waits for it to report HOT_STANDBY, shuts down both the child and this node, swaps the
+     * run-directory contents (backing up the old ones), and starts this node again.
+     *
+     * @param parametersO the effector parameters; copied, never modified
+     * @return always {@code null}
+     */
+    @Override
+    public Void call(ConfigBag parametersO) {
+        if (!isPersistenceModeEnabled(entity())) {
+            // we could try a `forcePersistNow`, but that's sloppy; 
+            // for now, require HA/persistence for upgrading 
+            DynamicTasks.queue( Tasks.warning("Persistence does not appear to be enabled at this node. "
+                + "In-place upgrade is unlikely to succeed.", null) );
+        }
+
+        ConfigBag parameters = ConfigBag.newInstanceCopying(parametersO);
+
+        /*
+         * all parameters are passed to children, apart from EXTRA_CONFIG
+         * whose value (as a map) is so passed; it provides an easy way to set extra config in the gui.
+         * (IOW a key-value mapping can be passed either inside EXTRA_CONFIG or as a sibling to EXTRA_CONFIG)  
+         */
+        if (parameters.containsKey(EXTRA_CONFIG)) {
+            Map<String, Object> extra = parameters.get(EXTRA_CONFIG);
+            parameters.remove(EXTRA_CONFIG);
+            // the key may be present with a null value; there is then nothing to merge
+            if (extra != null) {
+                parameters.putAll(extra);
+            }
+        }
+        log.debug(this+" upgrading, using "+parameters);
+
+        // TODO require entity() node state master or hot standby AND require persistence enabled, or a new 'force_attempt_upgrade' parameter to be applied
+        // TODO could have a 'skip_dry_run_upgrade' parameter
+        // TODO could support 'dry_run_only' parameter, with optional resumption tasks (eg new dynamic effector)
+
+        // 1 add new brooklyn version entity as child (so uses same machine), with same config apart from things in parameters
+        final BrooklynNode dryRunChild = entity().addChild(EntitySpec.create(BrooklynNode.class).configure(parameters.getAllConfig())
+            .displayName("Upgraded Version Dry-Run Node")
+            // force dir and label back to their defaults (do not piggy back on what may have already been installed)
+            .configure(BrooklynNode.INSTALL_DIR, BrooklynNode.INSTALL_DIR.getConfigKey().getDefaultValue())
+            .configure(BrooklynNode.INSTALL_UNIQUE_LABEL, "upgrade-tmp-"+Identifiers.makeRandomId(8))
+            .configure(parameters.getAllConfig()));
+
+        //force this to start as hot-standby
+        String launchParameters = dryRunChild.getConfig(BrooklynNode.EXTRA_LAUNCH_PARAMETERS);
+        if (Strings.isBlank(launchParameters)) launchParameters = "";
+        else launchParameters += " ";
+        launchParameters += "--highAvailability "+HighAvailabilityMode.HOT_STANDBY;
+        ((EntityInternal)dryRunChild).setConfig(BrooklynNode.EXTRA_LAUNCH_PARAMETERS, launchParameters);
+
+        Entities.manage(dryRunChild);
+        final String dryRunNodeUid = dryRunChild.getId();
+        ((EntityInternal)dryRunChild).setDisplayName("Dry-Run Upgraded Brooklyn Node ("+dryRunNodeUid+")");
+
+        DynamicTasks.queue(Effectors.invocation(dryRunChild, BrooklynNode.START, ConfigBag.EMPTY));
+
+        // 2 confirm hot standby status
+        DynamicTasks.queue(newWaitForAttributeTask(dryRunChild, BrooklynNode.MANAGEMENT_NODE_STATE, 
+            Predicates.equalTo(ManagementNodeState.HOT_STANDBY), Duration.TWO_MINUTES));
+
+        // 3 stop new version
+        // 4 stop old version
+        DynamicTasks.queue(Tasks.builder().name("shutdown original and transient nodes")
+            .add(Effectors.invocation(dryRunChild, BrooklynNode.SHUTDOWN, ConfigBag.EMPTY))
+            .add(Effectors.invocation(entity(), BrooklynNode.SHUTDOWN, ConfigBag.EMPTY))
+            .build());
+
+        // 5 move old files, and move new files
+        DynamicTasks.queue(Tasks.builder().name("setup new version").body(new Runnable() {
+            @Override
+            public void run() {
+                // fail fast if either run dir is unknown: the commands below do `cd <dir>` then `mv *`,
+                // which must never run against an unintended directory
+                String runDir = Preconditions.checkNotNull(entity().getAttribute(SoftwareProcess.RUN_DIR),
+                    "run dir of the node being upgraded");
+                String bkDir = Urls.mergePaths(runDir, "..", Urls.getBasename(runDir)+"-backups", dryRunNodeUid);
+                String dryRunDir = Preconditions.checkNotNull(dryRunChild.getAttribute(SoftwareProcess.RUN_DIR));
+                // log the effector body rather than this anonymous Runnable
+                log.debug(BrooklynNodeUpgradeEffectorBody.this+" storing backup of previous version in "+bkDir);
+                // NOTE(review): the commands are not chained with &&; confirm the ssh task aborts
+                // on a failed `cd`, otherwise `mv *` would act on whatever directory is current
+                DynamicTasks.queue(SshEffectorTasks.ssh(
+                    "cd "+runDir,
+                    "mkdir -p "+bkDir,
+                    "mv * "+bkDir,
+                    "cd "+dryRunDir,
+                    "mv * "+runDir
+                    ).summary("move files"));
+            }
+        }).build());
+
+        entity().getConfigMap().addToLocalBag(parameters.getAllConfig());
+
+        // 6 start this entity, running the new version
+        DynamicTasks.queue(Effectors.invocation(entity(), BrooklynNode.START, ConfigBag.EMPTY));
+
+        DynamicTasks.waitForLast();
+        Entities.unmanage(dryRunChild);
+
+        return null;
+    }
+
+    /**
+     * Best-effort check for persistence: true only when the entity's extra launch
+     * parameters are set and mention "persist" somewhere.
+     */
+    private boolean isPersistenceModeEnabled(EntityInternal entity) {
+        // TODO when there are PERSIST* options in BrooklynNode, look at them here!
+        // or, better, have a sensor for persistence
+        String params = entity.getConfig(BrooklynNode.EXTRA_LAUNCH_PARAMETERS);
+        if (params==null) return false;
+        // indexOf yields -1 when the text is absent; the previous `==0` test was inverted
+        // (true when absent, false when the parameters started with "persist")
+        if (params.indexOf("persist")<0) return false;
+        return true;
+    }
+
+    /**
+     * Runs a {@link Repeater} and converts its outcome into a task result:
+     * true on success, a rethrown error if the repeater recorded one, and otherwise
+     * either an {@link IllegalStateException} or false depending on {@code requireTrue}.
+     */
+    private static class WaitForRepeaterCallable implements Callable<Boolean> {
+        private final Repeater repeater;
+        private final boolean requireTrue;
+
+        public WaitForRepeaterCallable(Repeater repeater, boolean requireTrue) {
+            this.repeater = repeater;
+            this.requireTrue = requireTrue;
+        }
+
+        @Override
+        public Boolean call() {
+            ReferenceWithError<Boolean> result = repeater.runKeepingError();
+            if (Boolean.TRUE.equals(result.getWithoutError()))
+                return true;
+            if (result.hasError()) 
+                throw Exceptions.propagate(result.getError());
+            if (requireTrue)
+                throw new IllegalStateException("timeout - "+repeater.getDescription());
+            return false;
+        }
+    }
+
+    /**
+     * Builds a task that polls {@code sensor} on {@code node} until {@code condition} holds.
+     * The task fails if the condition is not met within {@code timeout}.
+     *
+     * @param timeout maximum time to wait; {@code null} means {@link Duration#PRACTICALLY_FOREVER}
+     */
+    private static <T> TaskAdaptable<Boolean> newWaitForAttributeTask(Entity node, AttributeSensor<T> sensor, Predicate<T> condition, Duration timeout) {
+        return awaiting( Repeater.create("waiting on "+node+" "+sensor.getName()+" "+condition)
+                    .backoff(Duration.millis(10), 1.5, Duration.millis(200))
+                    .limitTimeTo(timeout==null ? Duration.PRACTICALLY_FOREVER : timeout)
+                    .until(Functionals.callable(Functions.forPredicate(EntityPredicates.attributeSatisfies(sensor, condition)), node)),
+                    true);
+    }
+
+    /** Wraps the repeater in a task named after the repeater's description. */
+    private static TaskAdaptable<Boolean> awaiting(Repeater repeater, boolean requireTrue) {
+        return Tasks.<Boolean>builder().name(repeater.getDescription()).body(new WaitForRepeaterCallable(repeater, requireTrue)).build();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/7bae4e66/software/base/src/main/java/brooklyn/entity/brooklynnode/effector/UpgradeClusterEffectorBody.java
----------------------------------------------------------------------
diff --git a/software/base/src/main/java/brooklyn/entity/brooklynnode/effector/UpgradeClusterEffectorBody.java b/software/base/src/main/java/brooklyn/entity/brooklynnode/effector/UpgradeClusterEffectorBody.java
deleted file mode 100644
index 70b184a..0000000
--- a/software/base/src/main/java/brooklyn/entity/brooklynnode/effector/UpgradeClusterEffectorBody.java
+++ /dev/null
@@ -1,199 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package brooklyn.entity.brooklynnode.effector;
-
-import java.io.File;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import brooklyn.config.ConfigKey;
-import brooklyn.entity.Effector;
-import brooklyn.entity.Entity;
-import brooklyn.entity.Group;
-import brooklyn.entity.basic.EntityPredicates;
-import brooklyn.entity.basic.Lifecycle;
-import brooklyn.entity.brooklynnode.BrooklynCluster;
-import brooklyn.entity.brooklynnode.BrooklynCluster.SelectMasterEffector;
-import brooklyn.entity.brooklynnode.BrooklynCluster.UpgradeClusterEffector;
-import brooklyn.entity.brooklynnode.BrooklynNode;
-import brooklyn.entity.brooklynnode.BrooklynNode.SetHAModeEffector;
-import brooklyn.entity.effector.EffectorBody;
-import brooklyn.entity.effector.Effectors;
-import brooklyn.entity.group.DynamicCluster;
-import brooklyn.entity.proxying.EntitySpec;
-import brooklyn.event.AttributeSensor;
-import brooklyn.management.ha.HighAvailabilityMode;
-import brooklyn.management.ha.ManagementNodeState;
-import brooklyn.util.collections.MutableMap;
-import brooklyn.util.config.ConfigBag;
-import brooklyn.util.exceptions.Exceptions;
-import brooklyn.util.net.Urls;
-import brooklyn.util.repeat.Repeater;
-import brooklyn.util.task.DynamicTasks;
-import brooklyn.util.task.Tasks;
-import brooklyn.util.time.Duration;
-
-import com.google.api.client.util.Preconditions;
-import com.google.common.base.Predicate;
-import com.google.common.collect.Collections2;
-import com.google.common.collect.Iterables;
-
-public class UpgradeClusterEffectorBody extends EffectorBody<Void> implements UpgradeClusterEffector {
-    public static final Effector<Void> UPGRADE_CLUSTER = Effectors.effector(UpgradeClusterEffector.UPGRADE_CLUSTER).impl(new UpgradeClusterEffectorBody()).build();
-
-    private AtomicBoolean upgradeInProgress = new AtomicBoolean();
-
-    @Override
-    public Void call(ConfigBag parameters) {
-        if (!upgradeInProgress.compareAndSet(false, true)) {
-            throw new IllegalStateException("An upgrade is already in progress.");
-        }
-
-        EntitySpec<?> memberSpec = entity().getConfig(BrooklynCluster.MEMBER_SPEC);
-        Preconditions.checkNotNull(memberSpec, BrooklynCluster.MEMBER_SPEC.getName() + " is required for " + UpgradeClusterEffector.class.getName());
-
-        Map<ConfigKey<?>, Object> specCfg = memberSpec.getConfig();
-        String oldDownloadUrl = (String) specCfg.get(BrooklynNode.DOWNLOAD_URL);
-        String oldUploadUrl = (String) specCfg.get(BrooklynNode.DISTRO_UPLOAD_URL);
-        String newDownloadUrl = parameters.get(BrooklynNode.DOWNLOAD_URL.getConfigKey());
-        String newUploadUrl = inferUploadUrl(newDownloadUrl);
-        try {
-            memberSpec.configure(BrooklynNode.DOWNLOAD_URL, newUploadUrl);
-            memberSpec.configure(BrooklynNode.DISTRO_UPLOAD_URL, newUploadUrl);
-            upgrade(parameters);
-        } catch (Exception e) {
-            memberSpec.configure(BrooklynNode.DOWNLOAD_URL, oldDownloadUrl);
-            memberSpec.configure(BrooklynNode.DISTRO_UPLOAD_URL, oldUploadUrl);
-            throw Exceptions.propagate(e);
-        } finally {
-            upgradeInProgress.set(false);
-        }
-        return null;
-    }
-
-    private String inferUploadUrl(String newDownloadUrl) {
-        boolean isLocal = "file".equals(Urls.getProtocol(newDownloadUrl)) || new File(newDownloadUrl).exists();
-        if (isLocal) {
-            return newDownloadUrl;
-        } else {
-            return null;
-        }
-    }
-
-    private void upgrade(ConfigBag parameters) {
-        //TODO might be worth separating each step in a task for better UI
-
-        Group cluster = (Group)entity();
-        Collection<Entity> initialMembers = cluster.getMembers();
-        int initialClusterSize = initialMembers.size();
-
-        //1. Initially create a single node to check if it will launch successfully
-        Entity initialNode = Iterables.getOnlyElement(createNodes(1));
-
-        //2. If everything is OK with the first node launch the rest as well
-        Collection<Entity> remainingNodes = createNodes(initialClusterSize - 1);
-
-        //3. Once we have all nodes running without errors switch master
-        DynamicTasks.queue(Effectors.invocation(cluster, BrooklynCluster.SELECT_MASTER, MutableMap.of(SelectMasterEffector.NEW_MASTER_ID, initialNode.getId()))).asTask().getUnchecked();
-
-        //4. Stop the nodes which were running at the start of the upgrade call, but keep them around.
-        //   Should we create a quarantine-like zone for old stopped version?
-        //   For members that were created meanwhile - they will be using the new version already. If the new version
-        //   isn't good then they will fail to start as well, forcing the policies to retry (and succeed once the
-        //   URL is reverted).
-        HashSet<Entity> oldMembers = new HashSet<Entity>(initialMembers);
-        oldMembers.removeAll(remainingNodes);
-        oldMembers.remove(initialNode);
-        DynamicTasks.queue(Effectors.invocation(BrooklynNode.STOP_NODE_BUT_LEAVE_APPS, Collections.emptyMap(), oldMembers)).asTask().getUnchecked();
-    }
-
-    private Collection<Entity> createNodes(int nodeCnt) {
-        DynamicCluster cluster = (DynamicCluster)entity();
-
-        //1. Create the nodes
-        Collection<Entity> newNodes = cluster.resizeByDelta(nodeCnt);
-
-        //2. Wait for them to be RUNNING
-        waitAttributeNotEqualTo(
-                newNodes,
-                BrooklynNode.SERVICE_STATE_ACTUAL, Lifecycle.STARTING);
-
-        //3. Set HOT_STANDBY in case it is not enabled on the command line ...
-        DynamicTasks.queue(Effectors.invocation(
-                BrooklynNode.SET_HA_MODE,
-                MutableMap.of(SetHAModeEffector.MODE, HighAvailabilityMode.HOT_STANDBY), 
-                newNodes)).asTask().getUnchecked();
-
-        //4. ... and wait until all of the nodes change state
-        //TODO if the REST call is blocking this is not needed
-        waitAttributeEqualTo(
-                newNodes,
-                BrooklynNode.MANAGEMENT_NODE_STATE,
-                ManagementNodeState.HOT_STANDBY);
-
-        //5. Just in case check if all of the nodes are SERVICE_UP (which would rule out ON_FIRE as well)
-        Collection<Entity> failedNodes = Collections2.filter(newNodes, EntityPredicates.attributeEqualTo(BrooklynNode.SERVICE_UP, Boolean.FALSE));
-        if (!failedNodes.isEmpty()) {
-            throw new IllegalStateException("Nodes " + failedNodes + " are not " + BrooklynNode.SERVICE_UP + " though successfully in " + ManagementNodeState.HOT_STANDBY);
-        }
-        return newNodes;
-    }
-
-    private <T> void waitAttributeEqualTo(Collection<Entity> nodes, AttributeSensor<T> sensor, T value) {
-        waitPredicate(
-                nodes, 
-                EntityPredicates.attributeEqualTo(sensor, value),
-                "Waiting for nodes " + nodes + ", sensor " + sensor + " to be " + value,
-                "Timeout while waiting for nodes " + nodes + ", sensor " + sensor + " to change to " + value);
-    }
-
-    private <T> void waitAttributeNotEqualTo(Collection<Entity> nodes, AttributeSensor<T> sensor, T value) {
-        waitPredicate(
-                nodes, 
-                EntityPredicates.attributeNotEqualTo(sensor, value),
-                "Waiting for nodes " + nodes + ", sensor " + sensor + " to change from " + value,
-                "Timeout while waiting for nodes " + nodes + ", sensor " + sensor + " to change from " + value);
-    }
-
-    private <T extends Entity> void waitPredicate(Collection<T> nodes, Predicate<T> waitPredicate, String blockingMsg, String errorMsg) {
-        Tasks.setBlockingDetails(blockingMsg);
-        boolean pollSuccess = Repeater.create(blockingMsg)
-            .backoff(Duration.ONE_SECOND, 1.2, Duration.TEN_SECONDS)
-            .limitTimeTo(Duration.ONE_HOUR)
-            .until(nodes, allMatch(waitPredicate))
-            .run();
-        Tasks.resetBlockingDetails();
-
-        if (!pollSuccess) {
-            throw new IllegalStateException(errorMsg);
-        }
-    }
-
-    public static <T> Predicate<Collection<T>> allMatch(final Predicate<T> predicate) {
-        return new Predicate<Collection<T>>() {
-            @Override
-            public boolean apply(Collection<T> input) {
-                return Iterables.all(input, predicate);
-            }
-        };
-    }
-}