You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by he...@apache.org on 2014/11/26 02:07:31 UTC

[4/6] incubator-brooklyn git commit: de-dupes mark two - persist uniqueTag

de-dupes mark two - persist uniqueTag

previously uniqueTag was not being persisted so we were getting duplication in some situations,
or were relying on the equality check; now we persist this, and we say something is apparently
equal even if it doesn't have the unique tag (and we transfer the tag across).

note that rebinded adjuncts are added *after* those done in the entity itself (via the rebind() call there);
this seems surprising, but should be okay as ideally we shouldn't be adding new adjuncts at all.
(but we might need some special logic in order to put in place "change my adjuncts on rebind" behaviour at an entity.)


Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/ff0abc34
Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/ff0abc34
Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/ff0abc34

Branch: refs/heads/master
Commit: ff0abc34b317907f79cf26bb4664da6324bb90ee
Parents: 2af3422
Author: Alex Heneveld <al...@cloudsoftcorp.com>
Authored: Fri Nov 21 14:43:24 2014 +0000
Committer: Alex Heneveld <al...@cloudsoftcorp.com>
Committed: Fri Nov 21 14:56:15 2014 +0000

----------------------------------------------------------------------
 api/src/main/java/brooklyn/entity/Feed.java     |   5 +-
 .../main/java/brooklyn/mementos/Memento.java    |   6 +
 .../java/brooklyn/policy/EntityAdjunct.java     |   2 +-
 .../brooklyn/basic/AbstractBrooklynObject.java  |  15 +-
 .../brooklyn/entity/basic/AbstractEntity.java   |  32 +++-
 .../java/brooklyn/entity/basic/Entities.java    |   3 +-
 .../AbstractBrooklynObjectRebindSupport.java    |   6 +
 .../entity/rebind/RebindManagerImpl.java        |  11 +-
 .../entity/rebind/dto/AbstractMemento.java      |  61 ++++---
 .../entity/rebind/dto/BasicLocationMemento.java |   1 -
 .../entity/rebind/dto/MementosGenerators.java   |   8 +-
 .../java/brooklyn/event/feed/AbstractFeed.java  |  56 +++---
 .../java/brooklyn/event/feed/FeedConfig.java    |   6 +
 .../main/java/brooklyn/event/feed/Poller.java   |  26 ++-
 .../event/feed/function/FunctionPollConfig.java |   8 +
 .../internal/BrooklynGarbageCollector.java      |  13 +-
 .../policy/basic/AbstractEntityAdjunct.java     |   8 +-
 .../brooklyn/entity/rebind/RebindFeedTest.java  | 178 +++++++++++++++++--
 .../entity/rebind/RebindTestFixture.java        |  40 ++++-
 .../event/feed/function/FunctionFeedTest.java   |  38 ++--
 .../brooklyn/management/ha/HotStandbyTest.java  |  48 ++++-
 .../java/brooklyn/util/guava/Functionals.java   |  21 +++
 22 files changed, 470 insertions(+), 122 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/ff0abc34/api/src/main/java/brooklyn/entity/Feed.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/brooklyn/entity/Feed.java b/api/src/main/java/brooklyn/entity/Feed.java
index 10d1fbd..029ed78 100644
--- a/api/src/main/java/brooklyn/entity/Feed.java
+++ b/api/src/main/java/brooklyn/entity/Feed.java
@@ -48,8 +48,9 @@ public interface Feed extends EntityAdjunct, Rebindable {
     boolean isActivated();
     
     /** 
-     * @eturn true iff the feed is running
-     */
+     * @return true iff the feed is actually running (like {@link #isActivated()}, but including other items like poll jobs not cancelled)
+     * @deprecated since 0.7.0 use {@link #isRunning()}
+     */ @Deprecated
     boolean isActive();
     
     void start();

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/ff0abc34/api/src/main/java/brooklyn/mementos/Memento.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/brooklyn/mementos/Memento.java b/api/src/main/java/brooklyn/mementos/Memento.java
index bb6f8b1..dfe6d4a 100644
--- a/api/src/main/java/brooklyn/mementos/Memento.java
+++ b/api/src/main/java/brooklyn/mementos/Memento.java
@@ -22,7 +22,9 @@ import java.io.Serializable;
 import java.util.Collection;
 import java.util.Map;
 
+import brooklyn.entity.Entity;
 import brooklyn.entity.rebind.RebindSupport;
+import brooklyn.policy.EntityAdjunct;
 
 /**
  * Represents the internal state of something in brooklyn, so that it can be reconstructed (e.g. after restarting brooklyn).
@@ -74,4 +76,8 @@ public interface Memento extends Serializable {
     public Class<?> getTypeClass();
 
     public Collection<Object> getTags();
+    
+    /** Null for {@link Entity}, but important for adjuncts; see {@link EntityAdjunct#getUniqueTag()} */
+    public String getUniqueTag();
+    
 }

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/ff0abc34/api/src/main/java/brooklyn/policy/EntityAdjunct.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/brooklyn/policy/EntityAdjunct.java b/api/src/main/java/brooklyn/policy/EntityAdjunct.java
index ed42605..4405ea2 100644
--- a/api/src/main/java/brooklyn/policy/EntityAdjunct.java
+++ b/api/src/main/java/brooklyn/policy/EntityAdjunct.java
@@ -45,7 +45,7 @@ public interface EntityAdjunct extends BrooklynObject {
     boolean isDestroyed();
     
     /**
-     * Whether the adjunct is available/active
+     * Whether the adjunct is available/active, ie started and not stopped or interrupted
      */
     boolean isRunning();
     

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/ff0abc34/core/src/main/java/brooklyn/basic/AbstractBrooklynObject.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/basic/AbstractBrooklynObject.java b/core/src/main/java/brooklyn/basic/AbstractBrooklynObject.java
index ff1bbd8..9195b35 100644
--- a/core/src/main/java/brooklyn/basic/AbstractBrooklynObject.java
+++ b/core/src/main/java/brooklyn/basic/AbstractBrooklynObject.java
@@ -26,6 +26,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import brooklyn.basic.internal.ApiObjectsFactory;
+import brooklyn.entity.basic.AbstractEntity;
 import brooklyn.entity.proxying.InternalFactory;
 import brooklyn.entity.rebind.RebindManagerImpl;
 import brooklyn.management.ManagementContext;
@@ -131,10 +132,18 @@ public abstract class AbstractBrooklynObject implements BrooklynObjectInternal {
     }
 
     /**
-     * Called by framework on rebind (in new-style instances),
-     * after configuring but before the instance is managed (or is attached to an entity, depending on its type),
-     * and before a reference to this policy is shared.
+     * Called by framework on rebind (in new-style instances):
+     * <ul>
+     * <li> after configuring, but
+     * <li> before the instance is managed, and
+     * <li> before adjuncts are attached to entities, and
+     * <li> before a reference to an object is shared.
+     * </ul>
      * Note that {@link #init()} will not be called on rebind.
+     * <p>
+     * If you need to intercept behaviour <i>after</i> adjuncts are attached,
+     * consider {@link AbstractEntity#onManagementStarting()} 
+     * (but probably worth raising a discussion on the mailing list!)
      */
     public void rebind() {
         // no-op

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/ff0abc34/core/src/main/java/brooklyn/entity/basic/AbstractEntity.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/entity/basic/AbstractEntity.java b/core/src/main/java/brooklyn/entity/basic/AbstractEntity.java
index 5dfec58..de28236 100644
--- a/core/src/main/java/brooklyn/entity/basic/AbstractEntity.java
+++ b/core/src/main/java/brooklyn/entity/basic/AbstractEntity.java
@@ -81,6 +81,7 @@ import brooklyn.policy.EntityAdjunct;
 import brooklyn.policy.Policy;
 import brooklyn.policy.PolicySpec;
 import brooklyn.policy.basic.AbstractEntityAdjunct;
+import brooklyn.policy.basic.AbstractEntityAdjunct.AdjunctTagSupport;
 import brooklyn.policy.basic.AbstractPolicy;
 import brooklyn.util.BrooklynLanguageExtensions;
 import brooklyn.util.collections.MutableList;
@@ -1226,13 +1227,24 @@ public abstract class AbstractEntity extends AbstractBrooklynObject implements E
     }
     
     private <T extends EntityAdjunct> T findApparentlyEqualAndWarnIfNotSameUniqueTag(Collection<? extends T> items, T newItem) {
-        T oldItem = findApparentlyEqual(items, newItem);
+        T oldItem = findApparentlyEqual(items, newItem, true);
         
         if (oldItem!=null) {
+            String oldItemTag = oldItem.getUniqueTag();
             String newItemTag = newItem.getUniqueTag();
-            if (newItemTag!=null) {
-                return oldItem;
+            if (oldItemTag!=null || newItemTag!=null) {
+                if (Objects.equal(oldItemTag, newItemTag)) {
+                    // if same tag, return old item for replacing without comment
+                    return oldItem;
+                }
+                // if one has a tag bug not the other, and they are apparently equal,
+                // transfer the tag across
+                T tagged = oldItemTag!=null ? oldItem : newItem;
+                T tagless = oldItemTag!=null ? newItem : oldItem;
+                LOG.warn("Apparently equal items "+oldItem+" and "+newItem+"; but one has a unique tag "+tagged.getUniqueTag()+"; applying to the other");
+                ((AdjunctTagSupport)tagless.tags()).setUniqueTag(tagged.getUniqueTag());
             }
+            
             if (isRebinding()) {
                 LOG.warn("Adding to "+this+", "+newItem+" appears identical to existing "+oldItem+"; will replace. "
                     + "Underlying addition should be modified so it is not added twice during rebind or unique tag should be used to indicate it is identical.");
@@ -1246,7 +1258,7 @@ public abstract class AbstractEntity extends AbstractBrooklynObject implements E
             return null;
         }
     }
-    private <T extends EntityAdjunct> T findApparentlyEqual(Collection<? extends T> itemsCopy, T newItem) {
+    private <T extends EntityAdjunct> T findApparentlyEqual(Collection<? extends T> itemsCopy, T newItem, boolean transferUniqueTag) {
         // TODO workaround for issue where enrichers/feeds/policies can get added multiple times on rebind,
         // if it's added in onBecomingManager or connectSensors; 
         // the right fix will be more disciplined about how/where these are added;
@@ -1261,10 +1273,16 @@ public abstract class AbstractEntity extends AbstractBrooklynObject implements E
         
         String newItemTag = newItem.getUniqueTag();
         for (T oldItem: itemsCopy) {
-            if (oldItem.getUniqueTag()!=null) {
-                if ((oldItem.getUniqueTag().equals(newItemTag)))
+            String oldItemTag = oldItem.getUniqueTag();
+            if (oldItemTag!=null && newItemTag!=null) { 
+                if (oldItemTag.equals(newItemTag)) {
                     return oldItem;
-            } else if (newItemTag==null && oldItem.getClass().equals(newItem.getClass())) {
+                } else {
+                    continue;
+                }
+            }
+            // either does not have a unique tag, do deep equality
+            if (oldItem.getClass().equals(newItem.getClass())) {
                 if (EqualsBuilder.reflectionEquals(oldItem, newItem, false,
                         // internal admin in 'beforeEntityAdjunct' should be ignored
                         beforeEntityAdjunct,

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/ff0abc34/core/src/main/java/brooklyn/entity/basic/Entities.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/entity/basic/Entities.java b/core/src/main/java/brooklyn/entity/basic/Entities.java
index 51f86f6..7736d50 100644
--- a/core/src/main/java/brooklyn/entity/basic/Entities.java
+++ b/core/src/main/java/brooklyn/entity/basic/Entities.java
@@ -742,8 +742,9 @@ public class Entities {
         for (Location loc : mgmt.getLocationManager().getLocations()) {
             destroyCatching(loc);
         }
-        if (mgmt instanceof ManagementContextInternal)
+        if (mgmt instanceof ManagementContextInternal) {
             ((ManagementContextInternal)mgmt).terminate();
+        }
         if (error!=null) throw Exceptions.propagate(error);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/ff0abc34/core/src/main/java/brooklyn/entity/rebind/AbstractBrooklynObjectRebindSupport.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/entity/rebind/AbstractBrooklynObjectRebindSupport.java b/core/src/main/java/brooklyn/entity/rebind/AbstractBrooklynObjectRebindSupport.java
index edd70c3..0d6e663 100644
--- a/core/src/main/java/brooklyn/entity/rebind/AbstractBrooklynObjectRebindSupport.java
+++ b/core/src/main/java/brooklyn/entity/rebind/AbstractBrooklynObjectRebindSupport.java
@@ -24,6 +24,9 @@ import org.slf4j.LoggerFactory;
 import brooklyn.basic.AbstractBrooklynObject;
 import brooklyn.entity.rebind.dto.MementosGenerators;
 import brooklyn.mementos.Memento;
+import brooklyn.policy.EntityAdjunct;
+import brooklyn.policy.basic.AbstractEntityAdjunct.AdjunctTagSupport;
+import brooklyn.util.text.Strings;
 
 public abstract class AbstractBrooklynObjectRebindSupport<T extends Memento> implements RebindSupport<T> {
 
@@ -63,6 +66,9 @@ public abstract class AbstractBrooklynObjectRebindSupport<T extends Memento> imp
     protected abstract void addCustoms(RebindContext rebindContext, T memento);
     
     protected void addTags(RebindContext rebindContext, T memento) {
+        if (instance instanceof EntityAdjunct && Strings.isNonBlank(memento.getUniqueTag())) {
+            ((AdjunctTagSupport)(instance.tags())).setUniqueTag(memento.getUniqueTag());
+        }
         for (Object tag : memento.getTags()) {
             instance.tags().addTag(tag);
         }

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/ff0abc34/core/src/main/java/brooklyn/entity/rebind/RebindManagerImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/entity/rebind/RebindManagerImpl.java b/core/src/main/java/brooklyn/entity/rebind/RebindManagerImpl.java
index 2a282b0..f53f93a 100644
--- a/core/src/main/java/brooklyn/entity/rebind/RebindManagerImpl.java
+++ b/core/src/main/java/brooklyn/entity/rebind/RebindManagerImpl.java
@@ -58,8 +58,8 @@ import brooklyn.entity.proxying.InternalLocationFactory;
 import brooklyn.entity.proxying.InternalPolicyFactory;
 import brooklyn.entity.rebind.persister.BrooklynMementoPersisterToObjectStore;
 import brooklyn.entity.rebind.persister.BrooklynPersistenceUtils;
-import brooklyn.entity.rebind.persister.PersistenceActivityMetrics;
 import brooklyn.entity.rebind.persister.BrooklynPersistenceUtils.CreateBackupMode;
+import brooklyn.entity.rebind.persister.PersistenceActivityMetrics;
 import brooklyn.event.feed.AbstractFeed;
 import brooklyn.internal.BrooklynFeatureEnablement;
 import brooklyn.location.Location;
@@ -565,7 +565,7 @@ public class RebindManagerImpl implements RebindManager {
             //  3. instantiate entities+locations so that inter-entity references can subsequently be set during deserialize (and entity config/state is set).
             //  4. deserialize the memento
             //  5. instantiate policies+enricherss+feeds (could perhaps merge this with (3), depending how they are implemented)
-            //  6. reconstruct the entities etc (i.e. calling init on the already-instantiated instances).
+            //  6. reconstruct the entities etc (i.e. calling rebind() on the already-instantiated instances)
             //  7. add policies+enrichers+feeds to all the entities.
             //  8. manage the entities
             
@@ -1473,6 +1473,10 @@ public class RebindManagerImpl implements RebindManager {
         return (readOnlyRebindCount < 5) || (readOnlyRebindCount%1000==0);
     }
 
+    public int getReadOnlyRebindCount() {
+        return readOnlyRebindCount;
+    }
+    
     @Override
     public Map<String, Object> getMetrics() {
         Map<String,Object> result = MutableMap.of();
@@ -1480,6 +1484,9 @@ public class RebindManagerImpl implements RebindManager {
         result.put("rebind", rebindMetrics.asMap());
         result.put("persist", persistMetrics.asMap());
         
+        if (readOnlyRebindCount>=0)
+            result.put("rebindReadOnlyCount", readOnlyRebindCount);
+        
         // include first rebind counts, so we know whether we rebinded or not
         result.put("firstRebindCounts", MutableMap.of(
             "applications", firstRebindAppCount,

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/ff0abc34/core/src/main/java/brooklyn/entity/rebind/dto/AbstractMemento.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/entity/rebind/dto/AbstractMemento.java b/core/src/main/java/brooklyn/entity/rebind/dto/AbstractMemento.java
index c7ec1d9..5603e65 100644
--- a/core/src/main/java/brooklyn/entity/rebind/dto/AbstractMemento.java
+++ b/core/src/main/java/brooklyn/entity/rebind/dto/AbstractMemento.java
@@ -44,13 +44,17 @@ public abstract class AbstractMemento implements Memento, Serializable {
         protected Class<?> typeClass;
         protected String displayName;
         protected String catalogItemId;
-        protected Map<String, Object> fields = Maps.newLinkedHashMap();
+        protected Map<String, Object> customFields = Maps.newLinkedHashMap();
         protected List<Object> tags = Lists.newArrayList();
+        
+        // only supported for EntityAdjuncts
+        protected String uniqueTag;
 
         @SuppressWarnings("unchecked")
         protected B self() {
             return (B) this;
         }
+        @SuppressWarnings("deprecation")
         public B from(Memento other) {
             brooklynVersion = other.getBrooklynVersion();
             id = other.getId();
@@ -58,34 +62,38 @@ public abstract class AbstractMemento implements Memento, Serializable {
             typeClass = other.getTypeClass();
             displayName = other.getDisplayName();
             catalogItemId = other.getCatalogItemId();
-            fields.putAll(other.getCustomFields());
+            customFields.putAll(other.getCustomFields());
             tags.addAll(other.getTags());
+            uniqueTag = other.getUniqueTag();
             return self();
         }
-        public B brooklynVersion(String val) {
-            brooklynVersion = val; return self();
-        }
-        public B id(String val) {
-            id = val; return self();
-        }
-        public B type(String val) {
-            type = val; return self();
-        }
-        public B typeClass(Class<?> val) {
-            typeClass = val; return self();
-        }
-        public B displayName(String val) {
-            displayName = val; return self();
-        }
-        public B catalogItemId(String val) {
-            catalogItemId = val; return self();
-        }
+        // this method set is incomplete; and they are not used, as the protected fields are set directly
+        // kept in case we want to expose this elsewhere, but we should complete the list
+//        public B brooklynVersion(String val) {
+//            brooklynVersion = val; return self();
+//        }
+//        public B id(String val) {
+//            id = val; return self();
+//        }
+//        public B type(String val) {
+//            type = val; return self();
+//        }
+//        public B typeClass(Class<?> val) {
+//            typeClass = val; return self();
+//        }
+//        public B displayName(String val) {
+//            displayName = val; return self();
+//        }
+//        public B catalogItemId(String val) {
+//            catalogItemId = val; return self();
+//        }
+        
         /**
          * @deprecated since 0.7.0; use config/attributes so generic persistence will work, rather than requiring "custom fields"
          */
         @Deprecated
         public B customFields(Map<String,?> vals) {
-            fields.putAll(vals); return self();
+            customFields.putAll(vals); return self();
         }
     }
     
@@ -95,6 +103,9 @@ public abstract class AbstractMemento implements Memento, Serializable {
     private String displayName;
     private String catalogItemId;
     private List<Object> tags;
+    
+    // for EntityAdjuncts; not used for entity
+    private String uniqueTag;
 
     private transient Class<?> typeClass;
 
@@ -110,8 +121,9 @@ public abstract class AbstractMemento implements Memento, Serializable {
         typeClass = builder.typeClass;
         displayName = builder.displayName;
         catalogItemId = builder.catalogItemId;
-        setCustomFields(builder.fields);
+        setCustomFields(builder.customFields);
         tags = toPersistedList(builder.tags);
+        uniqueTag = builder.uniqueTag;
     }
 
     // "fields" is not included as a field here, so that it is serialized after selected subclass fields
@@ -157,6 +169,11 @@ public abstract class AbstractMemento implements Memento, Serializable {
     public List<Object> getTags() {
         return fromPersistedList(tags);
     }
+
+    @Override
+    public String getUniqueTag() {
+        return uniqueTag;
+    }
     
     @Deprecated
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/ff0abc34/core/src/main/java/brooklyn/entity/rebind/dto/BasicLocationMemento.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/entity/rebind/dto/BasicLocationMemento.java b/core/src/main/java/brooklyn/entity/rebind/dto/BasicLocationMemento.java
index 7a0b0bd..12c958c 100644
--- a/core/src/main/java/brooklyn/entity/rebind/dto/BasicLocationMemento.java
+++ b/core/src/main/java/brooklyn/entity/rebind/dto/BasicLocationMemento.java
@@ -55,7 +55,6 @@ public class BasicLocationMemento extends AbstractTreeNodeMemento implements Loc
             locationConfig.putAll(other.getLocationConfig());
             locationConfigUnused.addAll(other.getLocationConfigUnused());
             locationConfigDescription = other.getLocationConfigDescription();
-            fields.putAll(other.getCustomFields());
             return self();
         }
         public LocationMemento build() {

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/ff0abc34/core/src/main/java/brooklyn/entity/rebind/dto/MementosGenerators.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/entity/rebind/dto/MementosGenerators.java b/core/src/main/java/brooklyn/entity/rebind/dto/MementosGenerators.java
index 7e8d598..8ec638a 100644
--- a/core/src/main/java/brooklyn/entity/rebind/dto/MementosGenerators.java
+++ b/core/src/main/java/brooklyn/entity/rebind/dto/MementosGenerators.java
@@ -44,6 +44,7 @@ import brooklyn.location.basic.AbstractLocation;
 import brooklyn.location.basic.LocationInternal;
 import brooklyn.management.ManagementContext;
 import brooklyn.management.Task;
+import brooklyn.management.internal.NonDeploymentManagementContext;
 import brooklyn.mementos.BrooklynMemento;
 import brooklyn.mementos.CatalogItemMemento;
 import brooklyn.mementos.EnricherMemento;
@@ -53,6 +54,7 @@ import brooklyn.mementos.LocationMemento;
 import brooklyn.mementos.Memento;
 import brooklyn.mementos.PolicyMemento;
 import brooklyn.policy.Enricher;
+import brooklyn.policy.EntityAdjunct;
 import brooklyn.policy.Policy;
 import brooklyn.policy.basic.AbstractPolicy;
 import brooklyn.util.collections.MutableMap;
@@ -178,7 +180,7 @@ public class MementosGenerators {
         for (Location location : entity.getLocations()) {
             builder.locations.add(location.getId()); 
         }
-        
+
         for (Entity child : entity.getChildren()) {
             builder.children.add(child.getId()); 
         }
@@ -385,7 +387,9 @@ public class MementosGenerators {
         builder.catalogItemId = instance.getCatalogItemId();
         builder.type = instance.getClass().getName();
         builder.typeClass = instance.getClass();
-        
+        if (instance instanceof EntityAdjunct) {
+            builder.uniqueTag = ((EntityAdjunct)instance).getUniqueTag();
+        }
         for (Object tag : instance.tags().getTags()) {
             builder.tags.add(tag); 
         }

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/ff0abc34/core/src/main/java/brooklyn/event/feed/AbstractFeed.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/event/feed/AbstractFeed.java b/core/src/main/java/brooklyn/event/feed/AbstractFeed.java
index 5303ac7..d487a9a 100644
--- a/core/src/main/java/brooklyn/event/feed/AbstractFeed.java
+++ b/core/src/main/java/brooklyn/event/feed/AbstractFeed.java
@@ -86,15 +86,9 @@ public abstract class AbstractFeed extends AbstractEntityAdjunct implements Feed
 
     protected void initUniqueTag(String uniqueTag, Object ...valsForDefault) {
         if (Strings.isNonBlank(uniqueTag)) this.uniqueTag = uniqueTag;
-        else if (Strings.isBlank(this.uniqueTag)) this.uniqueTag = getDefaultUniqueTag(valsForDefault);
+        else this.uniqueTag = getDefaultUniqueTag(valsForDefault);
     }
 
-    @Override
-    public String getUniqueTag() {
-        if (Strings.isBlank(uniqueTag)) initUniqueTag(null);
-        return super.getUniqueTag();
-    }
-    
     protected String getDefaultUniqueTag(Object ...valsForDefault) {
         StringBuilder sb = new StringBuilder();
         sb.append(JavaClassNames.simpleClassName(this));
@@ -117,29 +111,6 @@ public abstract class AbstractFeed extends AbstractEntityAdjunct implements Feed
     }
 
     @Override
-    public boolean isActivated() {
-        return activated;
-    }
-    
-    @Override
-    public boolean isActive() {
-        return activated && !suspended;
-    }
-    
-    public EntityLocal getEntity() {
-        return entity;
-    }
-    
-    protected boolean isConnected() {
-        // TODO Default impl will result in multiple logs for same error if becomes unreachable
-        // (e.g. if ssh gets NoRouteToHostException, then every AttributePollHandler for that
-        // feed will log.warn - so if polling for 10 sensors/attributes will get 10 log messages).
-        // Would be nice if reduced this logging duplication.
-        // (You can reduce it by providing a better 'isConnected' implementation of course.)
-        return isActivated() && entity!=null && !((EntityInternal)entity).getManagementSupport().isNoLongerManaged();
-    }
-
-    @Override
     public void start() {
         if (log.isDebugEnabled()) log.debug("Starting feed {} for {}", this, entity);
         if (activated) { 
@@ -206,13 +177,36 @@ public abstract class AbstractFeed extends AbstractEntityAdjunct implements Feed
     }
 
     @Override
+    public boolean isActivated() {
+        return activated;
+    }
+    
+    @Override
+    public boolean isActive() {
+        return isRunning();
+    }
+    
+    public EntityLocal getEntity() {
+        return entity;
+    }
+    
+    protected boolean isConnected() {
+        // TODO Default impl will result in multiple logs for same error if becomes unreachable
+        // (e.g. if ssh gets NoRouteToHostException, then every AttributePollHandler for that
+        // feed will log.warn - so if polling for 10 sensors/attributes will get 10 log messages).
+        // Would be nice if reduced this logging duplication.
+        // (You can reduce it by providing a better 'isConnected' implementation of course.)
+        return isRunning() && entity!=null && !((EntityInternal)entity).getManagementSupport().isNoLongerManaged();
+    }
+
+    @Override
     public boolean isSuspended() {
         return suspended;
     }
 
     @Override
     public boolean isRunning() {
-        return !isSuspended() && !isDestroyed();
+        return isActivated() && !isSuspended() && !isDestroyed() && getPoller()!=null && getPoller().isRunning();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/ff0abc34/core/src/main/java/brooklyn/event/feed/FeedConfig.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/event/feed/FeedConfig.java b/core/src/main/java/brooklyn/event/feed/FeedConfig.java
index 54eeea1..64642e1 100644
--- a/core/src/main/java/brooklyn/event/feed/FeedConfig.java
+++ b/core/src/main/java/brooklyn/event/feed/FeedConfig.java
@@ -24,6 +24,7 @@ import brooklyn.event.AttributeSensor;
 import brooklyn.event.basic.Sensors;
 import brooklyn.event.feed.http.HttpPollConfig;
 import brooklyn.util.collections.MutableList;
+import brooklyn.util.guava.Functionals;
 import brooklyn.util.javalang.JavaClassNames;
 import brooklyn.util.text.Strings;
 
@@ -106,6 +107,11 @@ public class FeedConfig<V, T, F extends FeedConfig<V, T, F>> {
     }
     /** as {@link #checkSuccess(Predicate)} */
     public F checkSuccess(final Function<? super V,Boolean> val) {
+        return checkSuccess(Functionals.predicate(val));
+    }
+    @SuppressWarnings("unused")
+    /** @deprecated since 0.7.0, kept for rebind */ @Deprecated
+    private F checkSuccessLegacy(final Function<? super V,Boolean> val) {
         return checkSuccess(new Predicate<V>() {
             @Override
             public boolean apply(V input) {

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/ff0abc34/core/src/main/java/brooklyn/event/feed/Poller.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/event/feed/Poller.java b/core/src/main/java/brooklyn/event/feed/Poller.java
index 8aa8252..6fa9147 100644
--- a/core/src/main/java/brooklyn/event/feed/Poller.java
+++ b/core/src/main/java/brooklyn/event/feed/Poller.java
@@ -56,7 +56,7 @@ public class Poller<V> {
     private final Set<PollJob<V>> pollJobs = new LinkedHashSet<PollJob<V>>();
     private final Set<Task<?>> oneOffTasks = new LinkedHashSet<Task<?>>();
     private final Set<ScheduledTask> tasks = new LinkedHashSet<ScheduledTask>();
-    private volatile boolean running = false;
+    private volatile boolean started = false;
     
     private static class PollJob<V> {
         final PollHandler<? super V> handler;
@@ -106,7 +106,7 @@ public class Poller<V> {
     
     /** Submits a one-off poll job; recommended that callers supply to-String so that task has a decent description */
     public void submit(Callable<?> job) {
-        if (running) {
+        if (started) {
             throw new IllegalStateException("Cannot submit additional tasks after poller has started");
         }
         oneOffJobs.add(job);
@@ -116,7 +116,7 @@ public class Poller<V> {
         scheduleAtFixedRate(job, handler, Duration.millis(period));
     }
     public void scheduleAtFixedRate(Callable<V> job, PollHandler<? super V> handler, Duration period) {
-        if (running) {
+        if (started) {
             throw new IllegalStateException("Cannot schedule additional tasks after poller has started");
         }
         PollJob<V> foo = new PollJob<V>(job, handler, period);
@@ -129,12 +129,12 @@ public class Poller<V> {
         // Is that ok, are can we do better?
         
         if (log.isDebugEnabled()) log.debug("Starting poll for {} (using {})", new Object[] {entity, this});
-        if (running) { 
+        if (started) { 
             throw new IllegalStateException(String.format("Attempt to start poller %s of entity %s when already running", 
                     this, entity));
         }
         
-        running = true;
+        started = true;
         
         for (final Callable<?> oneOffJob : oneOffJobs) {
             Task<?> task = Tasks.builder().dynamic(false).body((Callable<Object>) oneOffJob).name("Poll").description("One-time poll job "+oneOffJob).build();
@@ -168,12 +168,12 @@ public class Poller<V> {
     
     public void stop() {
         if (log.isDebugEnabled()) log.debug("Stopping poll for {} (using {})", new Object[] {entity, this});
-        if (!running) { 
+        if (!started) { 
             throw new IllegalStateException(String.format("Attempt to stop poller %s of entity %s when not running", 
                     this, entity));
         }
         
-        running = false;
+        started = false;
         for (Task<?> task : oneOffTasks) {
             if (task != null) task.cancel(true);
         }
@@ -185,7 +185,17 @@ public class Poller<V> {
     }
 
     public boolean isRunning() {
-        return running;
+        boolean hasActiveTasks = false;
+        for (Task<?> task: tasks) {
+            if (task.isBegun() && !task.isDone()) {
+                hasActiveTasks = true;
+                break;
+            }
+        }
+        if (!started && hasActiveTasks) {
+            log.warn("Poller should not be running, but has active tasks, tasks: "+tasks);
+        }
+        return started && hasActiveTasks;
     }
     
     protected boolean isEmpty() {

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/ff0abc34/core/src/main/java/brooklyn/event/feed/function/FunctionPollConfig.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/event/feed/function/FunctionPollConfig.java b/core/src/main/java/brooklyn/event/feed/function/FunctionPollConfig.java
index 0f12672..8db02b1 100644
--- a/core/src/main/java/brooklyn/event/feed/function/FunctionPollConfig.java
+++ b/core/src/main/java/brooklyn/event/feed/function/FunctionPollConfig.java
@@ -27,6 +27,7 @@ import brooklyn.event.AttributeSensor;
 import brooklyn.event.feed.FeedConfig;
 import brooklyn.event.feed.PollConfig;
 import brooklyn.util.GroovyJavaMethods;
+import brooklyn.util.guava.Functionals;
 import brooklyn.util.javalang.JavaClassNames;
 
 import com.google.common.base.Supplier;
@@ -74,6 +75,13 @@ public class FunctionPollConfig<S, T> extends PollConfig<S, T, FunctionPollConfi
      */
     @SuppressWarnings("unchecked")
     public <newS> FunctionPollConfig<newS, T> supplier(final Supplier<? extends newS> val) {
+        this.callable = Functionals.callable( checkNotNull(val, "supplier") );
+        return (FunctionPollConfig<newS, T>) this;
+    }
+    
+    /** @deprecated since 0.7.0, kept for legacy compatibility when deserializing */
+    @SuppressWarnings({ "unchecked", "unused" })
+    private <newS> FunctionPollConfig<newS, T> supplierLegacy(final Supplier<? extends newS> val) {
         checkNotNull(val, "supplier");
         this.callable = new Callable<newS>() {
             @Override

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/ff0abc34/core/src/main/java/brooklyn/management/internal/BrooklynGarbageCollector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/management/internal/BrooklynGarbageCollector.java b/core/src/main/java/brooklyn/management/internal/BrooklynGarbageCollector.java
index 0eb96e0..402fd4d 100644
--- a/core/src/main/java/brooklyn/management/internal/BrooklynGarbageCollector.java
+++ b/core/src/main/java/brooklyn/management/internal/BrooklynGarbageCollector.java
@@ -216,13 +216,16 @@ public class BrooklynGarbageCollector {
         if (LOG.isDebugEnabled())
             LOG.debug(prefix+" - using "+getUsageString());
     }
-    
-    public String getUsageString() {
-        return
-            Strings.makeSizeString(Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory())+" / "+
+
+    public static String makeBasicUsageString() {
+        return Strings.makeSizeString(Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory())+" / "+
             Strings.makeSizeString(Runtime.getRuntime().totalMemory()) + " memory" +
             " ("+Strings.makeSizeString(MemoryUsageTracker.SOFT_REFERENCES.getBytesUsed()) + " soft); "+
-            Thread.activeCount()+" threads; "+
+            Thread.activeCount()+" threads";
+    }
+    
+    public String getUsageString() {
+        return makeBasicUsageString()+"; "+
             "storage: " + storage.getStorageMetrics() + "; " +
             "tasks: " +
             executionManager.getNumActiveTasks()+" active, "+

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/ff0abc34/core/src/main/java/brooklyn/policy/basic/AbstractEntityAdjunct.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/policy/basic/AbstractEntityAdjunct.java b/core/src/main/java/brooklyn/policy/basic/AbstractEntityAdjunct.java
index 60597b5..a005e34 100644
--- a/core/src/main/java/brooklyn/policy/basic/AbstractEntityAdjunct.java
+++ b/core/src/main/java/brooklyn/policy/basic/AbstractEntityAdjunct.java
@@ -365,13 +365,19 @@ public abstract class AbstractEntityAdjunct extends AbstractBrooklynObject imple
         return new AdjunctTagSupport();
     }
 
-    protected class AdjunctTagSupport extends BasicTagSupport {
+    public class AdjunctTagSupport extends BasicTagSupport {
         @Override
         public Set<Object> getTags() {
             ImmutableSet.Builder<Object> rb = ImmutableSet.builder().addAll(super.getTags());
             if (getUniqueTag()!=null) rb.add(getUniqueTag());
             return rb.build();
         }
+        public String getUniqueTag() {
+            return AbstractEntityAdjunct.this.getUniqueTag();
+        }
+        public void setUniqueTag(String uniqueTag) {
+            AbstractEntityAdjunct.this.uniqueTag = uniqueTag;
+        }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/ff0abc34/core/src/test/java/brooklyn/entity/rebind/RebindFeedTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/brooklyn/entity/rebind/RebindFeedTest.java b/core/src/test/java/brooklyn/entity/rebind/RebindFeedTest.java
index e3d3a2a..18c1edc 100644
--- a/core/src/test/java/brooklyn/entity/rebind/RebindFeedTest.java
+++ b/core/src/test/java/brooklyn/entity/rebind/RebindFeedTest.java
@@ -23,10 +23,10 @@ import static org.testng.Assert.assertEquals;
 import java.net.URL;
 import java.util.Collection;
 import java.util.List;
-import java.util.concurrent.Callable;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
@@ -49,15 +49,20 @@ import brooklyn.event.feed.ssh.SshValueFunctions;
 import brooklyn.location.Location;
 import brooklyn.location.basic.LocalhostMachineProvisioningLocation;
 import brooklyn.location.basic.SshMachineLocation;
-import brooklyn.management.Task;
+import brooklyn.management.internal.BrooklynGarbageCollector;
 import brooklyn.test.EntityTestUtils;
 import brooklyn.test.entity.TestEntity;
 import brooklyn.test.entity.TestEntityImpl.TestEntityWithoutEnrichers;
+import brooklyn.util.collections.MutableList;
 import brooklyn.util.http.BetterMockWebServer;
-import brooklyn.util.repeat.Repeater;
 import brooklyn.util.task.BasicExecutionManager;
+import brooklyn.util.text.Identifiers;
+import brooklyn.util.text.Strings;
 import brooklyn.util.time.Duration;
+import brooklyn.util.time.Time;
 
+import com.google.common.base.Function;
+import com.google.common.base.Supplier;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import com.google.common.util.concurrent.Callables;
@@ -104,6 +109,7 @@ public class RebindFeedTest extends RebindTestFixtureWithApp {
         assertEquals(origEntity.feeds().getFeeds().size(), 1);
 
         final long taskCountBefore = ((BasicExecutionManager)origManagementContext.getExecutionManager()).getNumIncompleteTasks();
+        log.info("Count of incomplete tasks before "+taskCountBefore);
         
         log.info("Tasks before rebind: "+
             ((BasicExecutionManager)origManagementContext.getExecutionManager()).getAllTasks());
@@ -124,21 +130,8 @@ public class RebindFeedTest extends RebindTestFixtureWithApp {
         Entities.unmanage(origApp);
         origApp = null;
         origManagementContext.getRebindManager().stop();
-        Repeater.create().every(Duration.millis(20)).limitTimeTo(Duration.TEN_SECONDS).until(new Callable<Boolean>() {
-            @Override
-            public Boolean call() throws Exception {
-                origManagementContext.getGarbageCollector().gcIteration();
-                long taskCountAfterAtOld = ((BasicExecutionManager)origManagementContext.getExecutionManager()).getNumIncompleteTasks();
-                List<Task<?>> tasks = ((BasicExecutionManager)origManagementContext.getExecutionManager()).getAllTasks();
-                int unendedTasks = 0;
-                for (Task<?> t: tasks) {
-                    if (!t.isDone()) unendedTasks++;
-                }
-                log.info("Incomplete task count from "+taskCountBefore+" to "+taskCountAfterAtOld+", "+unendedTasks+" unended; tasks remembered are: "+
-                    tasks);
-                return taskCountAfterAtOld==0;
-            }
-        }).runRequiringTrue();
+        
+        waitForTaskCountToBecome(origManagementContext, 0);
     }
 
     @Test(groups="Integration", invocationCount=50)
@@ -187,6 +180,78 @@ public class RebindFeedTest extends RebindTestFixtureWithApp {
         EntityTestUtils.assertAttributeEqualsEventually(newEntity, SENSOR_INT, (Integer)0);
     }
 
+    @Test
+    public void testReRebindDedupesCorrectlyBasedOnTagOrContentsPersisted() throws Exception {
+        doReReReRebindDedupesCorrectlyBasedOnTagOrContentsPersisted(-1, 2, false);
+    }
+    
+    @Test(groups="Integration")
+    public void testReReReReRebindDedupesCorrectlyBasedOnTagOrContentsPersisted() throws Exception {
+        doReReReRebindDedupesCorrectlyBasedOnTagOrContentsPersisted(1000*1000, 50, true);
+    }
+    
+    public void doReReReRebindDedupesCorrectlyBasedOnTagOrContentsPersisted(int datasize, int iterations, boolean soakTest) throws Exception {
+        final int SYSTEM_TASK_COUNT = 2;  // normally 1, persistence; but as long as less than 4 (the original) we're fine
+        final int MAX_ALLOWED_LEAK = 50*1000*1000;  // memory can vary wildly; but our test should eventually hit GB if there's a leak so this is fine
+        
+        TestEntity origEntity = origApp.createAndManageChild(EntitySpec.create(TestEntity.class).impl(MyEntityWithNewFeedsEachTimeImpl.class)
+            .configure(MyEntityWithNewFeedsEachTimeImpl.DATA_SIZE, datasize)
+            .configure(MyEntityWithNewFeedsEachTimeImpl.MAKE_NEW, true));
+        origApp.start(ImmutableList.<Location>of());
+
+        List<Feed> knownFeeds = MutableList.of();
+        TestEntity currentEntity = origEntity;
+        Collection<Feed> currentFeeds = currentEntity.feeds().getFeeds();
+        
+        int expectedCount = 4;
+        assertEquals(currentFeeds.size(), expectedCount);
+        knownFeeds.addAll(currentFeeds);
+        assertEquals(countActive(knownFeeds), expectedCount);
+        origEntity.setConfig(MyEntityWithNewFeedsEachTimeImpl.MAKE_NEW, !soakTest);
+        
+        long usedOriginally = -1;
+        
+        for (int i=0; i<iterations; i++) {
+            log.info("rebinding, iteration "+i);
+            newApp = rebind();
+            
+            // should get 2 new ones
+            if (!soakTest) expectedCount += 2;
+            
+            currentEntity = (TestEntity) Iterables.getOnlyElement(newApp.getChildren());
+            currentFeeds = currentEntity.feeds().getFeeds();
+            assertEquals(currentFeeds.size(), expectedCount, "feeds are: "+currentFeeds);
+            knownFeeds.addAll(currentFeeds);
+
+            switchOriginalToNewManagementContext();
+            waitForTaskCountToBecome(origManagementContext, expectedCount + SYSTEM_TASK_COUNT);
+            assertEquals(countActive(knownFeeds), expectedCount);
+            knownFeeds.clear();
+            knownFeeds.addAll(currentFeeds);
+            
+            if (soakTest) {
+                System.gc(); System.gc();
+                if (usedOriginally<0) {
+                    Time.sleep(Duration.millis(200));  // give things time to settle; means this number should be larger than others, if anything
+                    usedOriginally = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory();
+                    log.info("Usage after first rebind: "+BrooklynGarbageCollector.makeBasicUsageString()+" ("+Strings.makeJavaSizeString(usedOriginally)+")");
+                } else {
+                    long usedNow = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory();
+                    log.info("Usage: "+BrooklynGarbageCollector.makeBasicUsageString()+" ("+Strings.makeJavaSizeString(usedNow)+")");
+                    Assert.assertFalse(usedNow-usedOriginally > MAX_ALLOWED_LEAK, "Leaked too much memory: "+Strings.makeJavaSizeString(usedNow)+" now used, was "+Strings.makeJavaSizeString(usedOriginally));
+                }
+            }
+        }
+    }
+    
+    private int countActive(List<Feed> knownFeeds) {
+        int activeCount=0;
+        for (Feed f: knownFeeds) {
+            if (f.isRunning()) activeCount++;
+        }
+        return activeCount;
+    }
+
     public static class MyEntityWithHttpFeedImpl extends TestEntityWithoutEnrichers {
         public static final ConfigKey<URL> BASE_URL = ConfigKeys.newConfigKey(URL.class, "rebindFeedTest.baseUrl");
         
@@ -238,4 +303,81 @@ public class RebindFeedTest extends RebindTestFixtureWithApp {
                     .build());
         }
     }
+    
+    public static class MyEntityWithNewFeedsEachTimeImpl extends TestEntityWithoutEnrichers {
+        public static final ConfigKey<Integer> DATA_SIZE = ConfigKeys.newIntegerConfigKey("datasize", "size of data", -1);
+        public static final ConfigKey<Boolean> MAKE_NEW = ConfigKeys.newBooleanConfigKey("makeNew", "whether to make the 'new' ones each time", true);
+        
+        @Override
+        public void init() {
+            super.init();
+            connectSensors();
+        }
+
+        @Override
+        public void rebind() {
+            super.rebind();
+            connectSensors();
+        }
+        
+        public static class BigStringSupplier implements Supplier<String> {
+            final String prefix;
+            final int size;
+            // just to take up memory/disk space
+            final String sample;
+            public BigStringSupplier(String prefix, int size) {
+                this.prefix = prefix;
+                this.size = size;
+                sample = get();
+            }
+            public String get() {
+                return prefix + (size>=0 ? "-"+Identifiers.makeRandomId(size) : "");
+            }
+            @Override
+            public boolean equals(Object obj) {
+                return (obj instanceof BigStringSupplier) && prefix.equals(((BigStringSupplier)obj).prefix);
+            }
+            @Override
+            public int hashCode() {
+                return prefix.hashCode();
+            }
+        }
+        
+        public void connectSensors() {
+            final Duration PERIOD = Duration.FIVE_SECONDS;
+            int size = getConfig(DATA_SIZE);
+            boolean makeNew = getConfig(MAKE_NEW);
+
+            if (makeNew) addFeed(FunctionFeed.builder().entity(this).period(PERIOD)
+                .poll(FunctionPollConfig.forSensor(SENSOR_STRING)
+                    .supplier(new BigStringSupplier("new-each-time-entity-"+this+"-created-"+System.currentTimeMillis()+"-"+Identifiers.makeRandomId(4), size))
+                    .onResult(new IdentityFunctionLogging())).build());
+
+            addFeed(FunctionFeed.builder().entity(this).period(PERIOD)
+                .poll(FunctionPollConfig.forSensor(SENSOR_STRING)
+                    .supplier(new BigStringSupplier("same-each-time-entity-"+this, size))
+                    .onResult(new IdentityFunctionLogging())).build());
+
+            if (makeNew) addFeed(FunctionFeed.builder().entity(this).period(PERIOD)
+                .uniqueTag("new-each-time-"+Identifiers.makeRandomId(4)+"-"+System.currentTimeMillis())
+                .poll(FunctionPollConfig.forSensor(SENSOR_STRING)
+                    .supplier(new BigStringSupplier("new-each-time-entity-"+this, size))
+                    .onResult(new IdentityFunctionLogging())).build());
+
+            addFeed(FunctionFeed.builder().entity(this).period(PERIOD)
+                .uniqueTag("same-each-time-entity-"+this)
+                .poll(FunctionPollConfig.forSensor(SENSOR_STRING)
+                    .supplier(new BigStringSupplier("same-each-time-entity-"+this, size))
+                    .onResult(new IdentityFunctionLogging())).build());
+        }
+    }
+    
+    public static class IdentityFunctionLogging implements Function<Object,String> {
+        @Override
+        public String apply(Object input) {
+            System.out.println(Strings.maxlen(Strings.toString(input), 80));
+            return Strings.toString(input);
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/ff0abc34/core/src/test/java/brooklyn/entity/rebind/RebindTestFixture.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/brooklyn/entity/rebind/RebindTestFixture.java b/core/src/test/java/brooklyn/entity/rebind/RebindTestFixture.java
index 24bf357..4337c62 100644
--- a/core/src/test/java/brooklyn/entity/rebind/RebindTestFixture.java
+++ b/core/src/test/java/brooklyn/entity/rebind/RebindTestFixture.java
@@ -21,7 +21,9 @@ package brooklyn.entity.rebind;
 import static org.testng.Assert.assertEquals;
 
 import java.io.File;
+import java.util.List;
 import java.util.Set;
+import java.util.concurrent.Callable;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -31,18 +33,23 @@ import org.testng.annotations.BeforeMethod;
 import brooklyn.catalog.BrooklynCatalog;
 import brooklyn.catalog.CatalogItem;
 import brooklyn.catalog.internal.CatalogUtils;
+import brooklyn.entity.Application;
 import brooklyn.entity.basic.Entities;
 import brooklyn.entity.basic.EntityFunctions;
 import brooklyn.entity.basic.StartableApplication;
 import brooklyn.entity.rebind.persister.BrooklynMementoPersisterToObjectStore;
 import brooklyn.entity.rebind.persister.FileBasedObjectStore;
 import brooklyn.entity.rebind.persister.PersistMode;
+import brooklyn.entity.trait.Startable;
 import brooklyn.management.ManagementContext;
+import brooklyn.management.Task;
 import brooklyn.management.ha.HighAvailabilityMode;
 import brooklyn.management.internal.LocalManagementContext;
 import brooklyn.management.internal.ManagementContextInternal;
 import brooklyn.mementos.BrooklynMementoManifest;
 import brooklyn.util.os.Os;
+import brooklyn.util.repeat.Repeater;
+import brooklyn.util.task.BasicExecutionManager;
 import brooklyn.util.text.Identifiers;
 import brooklyn.util.time.Duration;
 
@@ -95,6 +102,37 @@ public abstract class RebindTestFixture<T extends StartableApplication> {
                 .buildUnstarted();
     }
 
+    /** terminates the original management context (not destroying items) and points it at the new one (and same for apps); 
+     * then clears the variables for the new one, so you can re-rebind */
+    protected void switchOriginalToNewManagementContext() {
+        origManagementContext.getRebindManager().stopPersistence();
+        for (Application e: origManagementContext.getApplications()) ((Startable)e).stop();
+        waitForTaskCountToBecome(origManagementContext, 0);
+        origManagementContext.terminate();
+        origManagementContext = (LocalManagementContext) newManagementContext;
+        origApp = newApp;
+        newManagementContext = null;
+        newApp = null;
+    }
+
+    public static void waitForTaskCountToBecome(final ManagementContext mgmt, final int allowedMax) {
+        Repeater.create().every(Duration.millis(20)).limitTimeTo(Duration.TEN_SECONDS).until(new Callable<Boolean>() {
+            @Override
+            public Boolean call() throws Exception {
+                ((LocalManagementContext)mgmt).getGarbageCollector().gcIteration();
+                long taskCountAfterAtOld = ((BasicExecutionManager)mgmt.getExecutionManager()).getNumIncompleteTasks();
+                List<Task<?>> tasks = ((BasicExecutionManager)mgmt.getExecutionManager()).getAllTasks();
+                int unendedTasks = 0;
+                for (Task<?> t: tasks) {
+                    if (!t.isDone()) unendedTasks++;
+                }
+                LOG.info("Count of incomplete tasks now "+taskCountAfterAtOld+", "+unendedTasks+" unended; tasks remembered are: "+
+                    tasks);
+                return taskCountAfterAtOld<=allowedMax;
+            }
+        }).runRequiringTrue();
+    }
+
     protected boolean useLiveManagementContext() {
         return false;
     }
@@ -179,7 +217,7 @@ public abstract class RebindTestFixture<T extends StartableApplication> {
     @SuppressWarnings("unchecked")
     protected T rebind(RebindOptions options) throws Exception {
         if (newApp != null || newManagementContext != null) {
-            throw new IllegalStateException("already rebound");
+            throw new IllegalStateException("already rebound - use switchOriginalToNewManagementContext() if you are trying to rebind multiple times");
         }
         
         options = RebindOptions.create(options);

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/ff0abc34/core/src/test/java/brooklyn/event/feed/function/FunctionFeedTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/brooklyn/event/feed/function/FunctionFeedTest.java b/core/src/test/java/brooklyn/event/feed/function/FunctionFeedTest.java
index f2e71a4..1b67b8b 100644
--- a/core/src/test/java/brooklyn/event/feed/function/FunctionFeedTest.java
+++ b/core/src/test/java/brooklyn/event/feed/function/FunctionFeedTest.java
@@ -36,6 +36,7 @@ import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 import brooklyn.entity.BrooklynAppUnitTestSupport;
+import brooklyn.entity.Feed;
 import brooklyn.entity.basic.EntityInternal;
 import brooklyn.entity.basic.EntityInternal.FeedSupport;
 import brooklyn.entity.basic.EntityLocal;
@@ -55,6 +56,7 @@ import com.google.common.base.Functions;
 import com.google.common.base.Predicates;
 import com.google.common.base.Suppliers;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
 import com.google.common.util.concurrent.Callables;
 
 public class FunctionFeedTest extends BrooklynAppUnitTestSupport {
@@ -104,6 +106,28 @@ public class FunctionFeedTest extends BrooklynAppUnitTestSupport {
     }
     
     @Test
+    public void testFeedDeDupe() throws Exception {
+        testPollsFunctionRepeatedlyToSetAttribute();
+        entity.addFeed(feed);
+        log.info("Feed 0 is: "+feed);
+        Feed feed0 = feed;
+        
+        testPollsFunctionRepeatedlyToSetAttribute();
+        entity.addFeed(feed);
+        log.info("Feed 1 is: "+feed);
+        Feed feed1 = feed;
+        Assert.assertFalse(feed1==feed0);
+
+        FeedSupport feeds = ((EntityInternal)entity).feeds();
+        Assert.assertEquals(feeds.getFeeds().size(), 1, "Wrong feed count: "+feeds.getFeeds());
+
+        // a couple extra checks, compared to the de-dupe test in other *FeedTest classes
+        Feed feedAdded = Iterables.getOnlyElement(feeds.getFeeds());
+        Assert.assertTrue(feedAdded==feed1);
+        Assert.assertFalse(feedAdded==feed0);
+    }
+    
+    @Test
     public void testCallsOnSuccessWithResultOfCallable() throws Exception {
         feed = FunctionFeed.builder()
                 .entity(entity)
@@ -224,20 +248,6 @@ public class FunctionFeedTest extends BrooklynAppUnitTestSupport {
                 .onFailureOrException(Functions.<Integer>constant(null));
     }
     
-    @Test
-    public void testFeedDeDupe() throws Exception {
-        testPollsFunctionRepeatedlyToSetAttribute();
-        entity.addFeed(feed);
-        log.info("Feed 0 is: "+feed);
-        
-        testPollsFunctionRepeatedlyToSetAttribute();
-        log.info("Feed 1 is: "+feed);
-        entity.addFeed(feed);
-                
-        FeedSupport feeds = ((EntityInternal)entity).feeds();
-        Assert.assertEquals(feeds.getFeeds().size(), 1, "Wrong feed count: "+feeds.getFeeds());
-    }
-    
     private static class IncrementingCallable implements Callable<Integer> {
         private final AtomicInteger next = new AtomicInteger(0);
         

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/ff0abc34/core/src/test/java/brooklyn/management/ha/HotStandbyTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/brooklyn/management/ha/HotStandbyTest.java b/core/src/test/java/brooklyn/management/ha/HotStandbyTest.java
index d76faeb..9c36eb8 100644
--- a/core/src/test/java/brooklyn/management/ha/HotStandbyTest.java
+++ b/core/src/test/java/brooklyn/management/ha/HotStandbyTest.java
@@ -27,6 +27,7 @@ import java.util.Date;
 import java.util.Deque;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Callable;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -42,7 +43,9 @@ import brooklyn.entity.basic.Entities;
 import brooklyn.entity.proxying.EntitySpec;
 import brooklyn.entity.rebind.PersistenceExceptionHandlerImpl;
 import brooklyn.entity.rebind.RebindFeedTest.MyEntityWithFunctionFeedImpl;
+import brooklyn.entity.rebind.RebindFeedTest.MyEntityWithNewFeedsEachTimeImpl;
 import brooklyn.entity.rebind.RebindManagerImpl;
+import brooklyn.entity.rebind.RebindTestFixture;
 import brooklyn.entity.rebind.persister.BrooklynMementoPersisterToObjectStore;
 import brooklyn.entity.rebind.persister.InMemoryObjectStore;
 import brooklyn.entity.rebind.persister.ListeningObjectStore;
@@ -60,6 +63,7 @@ import brooklyn.test.entity.TestEntity;
 import brooklyn.util.collections.MutableList;
 import brooklyn.util.collections.MutableMap;
 import brooklyn.util.javalang.JavaClassNames;
+import brooklyn.util.repeat.Repeater;
 import brooklyn.util.text.ByteSizeStrings;
 import brooklyn.util.time.Duration;
 import brooklyn.util.time.Time;
@@ -599,20 +603,58 @@ public class HotStandbyTest {
     }
     
     @Test
-    public void testHotStandbyDoesNoStartFeeds() throws Exception {
+    public void testHotStandbyDoesNotStartFeeds() throws Exception {
         HaMgmtNode n1 = createMaster(Duration.PRACTICALLY_FOREVER);
         TestApplication app = createFirstAppAndPersist(n1);
         TestEntity entity = app.createAndManageChild(EntitySpec.create(TestEntity.class).impl(MyEntityWithFunctionFeedImpl.class));
         forcePersistNow(n1);
+        Assert.assertTrue(entity.feeds().getFeeds().size() > 0, "Feeds: "+entity.feeds().getFeeds());
         for (Feed feed : entity.feeds().getFeeds()) {
-            assertTrue(feed.isActive(), "Feed expected running, but it is non-running");
+            assertTrue(feed.isRunning(), "Feed expected running, but it is non-running");
         }
 
         HaMgmtNode n2 = createHotStandby(Duration.PRACTICALLY_FOREVER);
         TestEntity entityRO = (TestEntity) n2.mgmt.lookup(entity.getId(), Entity.class);
+        Assert.assertTrue(entityRO.feeds().getFeeds().size() > 0, "Feeds: "+entity.feeds().getFeeds());
         for (Feed feedRO : entityRO.feeds().getFeeds()) {
-            assertFalse(feedRO.isActive(), "Feed expected non-active, but it is running");
+            assertFalse(feedRO.isRunning(), "Feed expected non-active, but it is running");
         }
     }
+    
+    @Test(groups="Integration")
+    public void testHotStandbyDoesNotStartFeedsRebindingManyTimes() throws Exception {
+        testHotStandbyDoesNotStartFeeds();
+        final HaMgmtNode hsb = createHotStandby(Duration.millis(10));
+        Repeater.create("until 10 rebinds").every(Duration.millis(100)).until(
+            new Callable<Boolean>() {
+                @Override
+                public Boolean call() throws Exception {
+                    return ((RebindManagerImpl)hsb.mgmt.getRebindManager()).getReadOnlyRebindCount() >= 10;
+                }
+            }).runRequiringTrue();
+        // make sure not too many tasks (allowing 5 for rebind etc; currently just 2)
+        RebindTestFixture.waitForTaskCountToBecome(hsb.mgmt, 5);
+    }
+
+    @Test(groups="Integration")
+    public void testHotStandbyDoesNotStartFeedsRebindingManyTimesWithAnotherFeedGenerator() throws Exception {
+        HaMgmtNode n1 = createMaster(Duration.PRACTICALLY_FOREVER);
+        TestApplication app = createFirstAppAndPersist(n1);
+        TestEntity entity = app.createAndManageChild(EntitySpec.create(TestEntity.class).impl(MyEntityWithNewFeedsEachTimeImpl.class));
+        forcePersistNow(n1);
+        Assert.assertTrue(entity.feeds().getFeeds().size() == 4, "Feeds: "+entity.feeds().getFeeds());
+        
+        final HaMgmtNode hsb = createHotStandby(Duration.millis(10));
+        Repeater.create("until 10 rebinds").every(Duration.millis(100)).until(
+            new Callable<Boolean>() {
+                @Override
+                public Boolean call() throws Exception {
+                    return ((RebindManagerImpl)hsb.mgmt.getRebindManager()).getReadOnlyRebindCount() >= 10;
+                }
+            }).runRequiringTrue();
+        // make sure not too many tasks (allowing 5 for rebind etc; currently just 2)
+        RebindTestFixture.waitForTaskCountToBecome(hsb.mgmt, 5);
+    }
+
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/ff0abc34/utils/common/src/main/java/brooklyn/util/guava/Functionals.java
----------------------------------------------------------------------
diff --git a/utils/common/src/main/java/brooklyn/util/guava/Functionals.java b/utils/common/src/main/java/brooklyn/util/guava/Functionals.java
index dcbd9da..08e41b1 100644
--- a/utils/common/src/main/java/brooklyn/util/guava/Functionals.java
+++ b/utils/common/src/main/java/brooklyn/util/guava/Functionals.java
@@ -90,6 +90,9 @@ public class Functionals {
             @Override public O apply(I input) {
                 return supplier.get();
             }
+            @Override public String toString() {
+                return "function("+supplier+")";
+            }
         }
         return new SupplierAsFunction();
     }
@@ -120,6 +123,10 @@ public class Functionals {
             public T call() {
                 return supplier.get();
             }
+            @Override
+            public String toString() {
+                return "callable("+supplier+")";
+            }
         }
         return new SupplierAsCallable();
     }
@@ -127,4 +134,18 @@ public class Functionals {
         return callable(Suppliers.compose(f, Suppliers.ofInstance(x)));
     }
 
+    public static <T> Predicate<T> predicate(final Function<T,Boolean> f) {
+        class FunctionAsPredicate implements Predicate<T> {
+            @Override
+            public boolean apply(T input) {
+                return f.apply(input);
+            }
+            @Override
+            public String toString() {
+                return "predicate("+f+")";
+            }
+        }
+        return new FunctionAsPredicate();
+    }
+
 }