You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by he...@apache.org on 2015/08/18 13:06:17 UTC
[19/24] incubator-brooklyn git commit: [BROOKLYN-162] Renaming
package policy
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/main/java/brooklyn/policy/autoscaling/SizeHistory.java
----------------------------------------------------------------------
diff --git a/policy/src/main/java/brooklyn/policy/autoscaling/SizeHistory.java b/policy/src/main/java/brooklyn/policy/autoscaling/SizeHistory.java
deleted file mode 100644
index 525ba20..0000000
--- a/policy/src/main/java/brooklyn/policy/autoscaling/SizeHistory.java
+++ /dev/null
@@ -1,166 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package brooklyn.policy.autoscaling;
-
-import java.util.List;
-
-import brooklyn.util.collections.MutableMap;
-import brooklyn.util.collections.TimeWindowedList;
-import brooklyn.util.collections.TimestampedValue;
-import brooklyn.util.time.Duration;
-
-import com.google.common.base.Objects;
-
-/**
- * Using a {@link TimeWindowedList}, tracks the recent history of values to allow a summary of
- * those values to be obtained.
- *
- * @author aled
- */
-public class SizeHistory {
-
- public static class WindowSummary {
- /** The most recent value (or -1 if there has been no value) */
- public final long latest;
-
- /** The minimum value within the given time period */
- public final long min;
-
- /** The maximum value within the given time period */
- public final long max;
-
- /** true if, since that max value, there have not been any higher values */
- public final boolean stableForGrowth;
-
- /** true if, since that low value, there have not been any lower values */
- public final boolean stableForShrinking;
-
- public WindowSummary(long latest, long min, long max, boolean stableForGrowth, boolean stableForShrinking) {
- this.latest = latest;
- this.min = min;
- this.max = max;
- this.stableForGrowth = stableForGrowth;
- this.stableForShrinking = stableForShrinking;
- }
-
- @Override
- public String toString() {
- return Objects.toStringHelper(this).add("latest", latest).add("min", min).add("max", max)
- .add("stableForGrowth", stableForGrowth).add("stableForShrinking", stableForShrinking).toString();
- }
- }
-
- private final TimeWindowedList<Number> recentDesiredResizes;
-
- public SizeHistory(long windowSize) {
- recentDesiredResizes = new TimeWindowedList<Number>(MutableMap.of("timePeriod", windowSize, "minExpiredVals", 1));
- }
-
- public void add(final int val) {
- recentDesiredResizes.add(val);
- }
-
- public void setWindowSize(Duration newWindowSize) {
- recentDesiredResizes.setTimePeriod(newWindowSize);
- }
-
- /**
- * Summarises the history of values in this time window, with a few special things:
- * <ul>
- * <li>If entire time-window is not covered by the given values, then min is Integer.MIN_VALUE and max is Integer.MAX_VALUE
- * <li>If no values, then latest is -1
- * <li>If no recent values, then keeps last-seen value (no matter how old), to use that
- * <li>"stable for growth" means that since that max value, there have not been any higher values
- * <li>"stable for shrinking" means that since that low value, there have not been any lower values
- * </ul>
- */
- public WindowSummary summarizeWindow(Duration windowSize) {
- long now = System.currentTimeMillis();
- List<TimestampedValue<Number>> windowVals = recentDesiredResizes.getValuesInWindow(now, windowSize);
-
- Number latestObj = latestInWindow(windowVals);
- long latest = (latestObj == null) ? -1: latestObj.longValue();
- long max = maxInWindow(windowVals, windowSize).longValue();
- long min = minInWindow(windowVals, windowSize).longValue();
-
- // TODO Could do more sophisticated "stable" check; this is the easiest code - correct but not most efficient
- // in terms of the caller having to schedule additional stability checks.
- boolean stable = (min == max);
-
- return new WindowSummary(latest, min, max, stable, stable);
- }
-
- /**
- * If the entire time-window is not covered by the given values, then returns Integer.MAX_VALUE.
- */
- private <T extends Number> T maxInWindow(List<TimestampedValue<T>> vals, Duration timeWindow) {
- // TODO bad casting from Integer default result to T
- long now = System.currentTimeMillis();
- long epoch = now - timeWindow.toMilliseconds();
- T result = null;
- double resultAsDouble = Integer.MAX_VALUE;
- for (TimestampedValue<T> val : vals) {
- T valAsNum = val.getValue();
- double valAsDouble = (valAsNum != null) ? valAsNum.doubleValue() : 0;
- if (result == null && val.getTimestamp() > epoch) {
- result = withDefault(null, Integer.MAX_VALUE);
- resultAsDouble = result.doubleValue();
- }
- if (result == null || (valAsNum != null && valAsDouble > resultAsDouble)) {
- result = valAsNum;
- resultAsDouble = valAsDouble;
- }
- }
- return withDefault(result, Integer.MAX_VALUE);
- }
-
- /**
- * If the entire time-window is not covered by the given values, then returns Integer.MIN_VALUE
- */
- private <T extends Number> T minInWindow(List<TimestampedValue<T>> vals, Duration timeWindow) {
- long now = System.currentTimeMillis();
- long epoch = now - timeWindow.toMilliseconds();
- T result = null;
- double resultAsDouble = Integer.MIN_VALUE;
- for (TimestampedValue<T> val : vals) {
- T valAsNum = val.getValue();
- double valAsDouble = (valAsNum != null) ? valAsNum.doubleValue() : 0;
- if (result == null && val.getTimestamp() > epoch) {
- result = withDefault(null, Integer.MIN_VALUE);
- resultAsDouble = result.doubleValue();
- }
- if (result == null || (val.getValue() != null && valAsDouble < resultAsDouble)) {
- result = valAsNum;
- resultAsDouble = valAsDouble;
- }
- }
- return withDefault(result, Integer.MIN_VALUE);
- }
-
- @SuppressWarnings("unchecked")
- private <T> T withDefault(T result, Integer defaultValue) {
- return result!=null ? result : (T) defaultValue;
- }
- /**
- * @return null if empty, or the most recent value
- */
- private <T extends Number> T latestInWindow(List<TimestampedValue<T>> vals) {
- return vals.isEmpty() ? null : vals.get(vals.size()-1).getValue();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/main/java/brooklyn/policy/followthesun/DefaultFollowTheSunModel.java
----------------------------------------------------------------------
diff --git a/policy/src/main/java/brooklyn/policy/followthesun/DefaultFollowTheSunModel.java b/policy/src/main/java/brooklyn/policy/followthesun/DefaultFollowTheSunModel.java
deleted file mode 100644
index 922b33d..0000000
--- a/policy/src/main/java/brooklyn/policy/followthesun/DefaultFollowTheSunModel.java
+++ /dev/null
@@ -1,328 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package brooklyn.policy.followthesun;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import java.io.ByteArrayOutputStream;
-import java.io.PrintStream;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.brooklyn.api.location.Location;
-import org.apache.brooklyn.location.basic.AbstractLocation;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Maps;
-
-public class DefaultFollowTheSunModel<ContainerType, ItemType> implements FollowTheSunModel<ContainerType, ItemType> {
-
- private static final Logger LOG = LoggerFactory.getLogger(DefaultFollowTheSunModel.class);
-
- // Concurrent maps cannot have null value; use this to represent when no container is supplied for an item
- private static final String NULL = "null-val";
- private static final Location NULL_LOCATION = new AbstractLocation(newHashMap("name","null-location")) {};
-
- private final String name;
- private final Set<ContainerType> containers = Collections.newSetFromMap(new ConcurrentHashMap<ContainerType,Boolean>());
- private final Map<ItemType, ContainerType> itemToContainer = new ConcurrentHashMap<ItemType, ContainerType>();
- private final Map<ContainerType, Location> containerToLocation = new ConcurrentHashMap<ContainerType, Location>();
- private final Map<ItemType, Location> itemToLocation = new ConcurrentHashMap<ItemType, Location>();
- private final Map<ItemType, Map<? extends ItemType, Double>> itemUsage = new ConcurrentHashMap<ItemType, Map<? extends ItemType,Double>>();
- private final Set<ItemType> immovableItems = Collections.newSetFromMap(new ConcurrentHashMap<ItemType, Boolean>());
-
- public DefaultFollowTheSunModel(String name) {
- this.name = name;
- }
-
- @Override
- public Set<ItemType> getItems() {
- return itemToContainer.keySet();
- }
-
- @Override
- public ContainerType getItemContainer(ItemType item) {
- ContainerType result = itemToContainer.get(item);
- return (isNull(result) ? null : result);
- }
-
- @Override
- public Location getItemLocation(ItemType item) {
- Location result = itemToLocation.get(item);
- return (isNull(result) ? null : result);
- }
-
- @Override
- public Location getContainerLocation(ContainerType container) {
- Location result = containerToLocation.get(container);
- return (isNull(result) ? null : result);
- }
-
- // Provider methods.
-
- @Override public String getName() {
- return name;
- }
-
- // TODO: delete?
- @Override public String getName(ItemType item) {
- return item.toString();
- }
-
- @Override public boolean isItemMoveable(ItemType item) {
- // If don't know about item, then assume not movable; otherwise has this item been explicitly flagged as immovable?
- return hasItem(item) && !immovableItems.contains(item);
- }
-
- @Override public boolean isItemAllowedIn(ItemType item, Location location) {
- return true; // TODO?
- }
-
- @Override public boolean hasActiveMigration(ItemType item) {
- return false; // TODO?
- }
-
- /**
-  * Builds, for every target item with a non-empty usage map, the total usage it receives
-  * grouped by the location of each sending item.
-  * <p>
-  * Senders whose location is unknown are skipped, as are messages an item sends to itself.
-  * A null usage value for a sender is counted as 0.
-  *
-  * @return a new map from target item to (source location -> summed usage); a target whose
-  *         senders were all skipped still appears, with an empty inner map
-  */
- @Override
- // FIXME Too expensive to compute; store in a different data structure?
- public Map<ItemType, Map<Location, Double>> getDirectSendsToItemByLocation() {
-     Map<ItemType, Map<Location, Double>> result = new LinkedHashMap<ItemType, Map<Location,Double>>(getNumItems());
-
-     for (Map.Entry<ItemType, Map<? extends ItemType, Double>> entry : itemUsage.entrySet()) {
-         ItemType targetItem = entry.getKey();
-         Map<? extends ItemType, Double> sources = entry.getValue();
-         if (sources.isEmpty()) continue; // no-one talking to us
-
-         Map<Location, Double> targetUsageByLocation = new LinkedHashMap<Location, Double>();
-         result.put(targetItem, targetUsageByLocation);
-
-         for (Map.Entry<? extends ItemType, Double> entry2 : sources.entrySet()) {
-             ItemType sourceItem = entry2.getKey();
-             Location sourceLocation = getItemLocation(sourceItem);
-             // Guard the per-sender value itself: the outer entry's value is the sources map, which
-             // was already dereferenced above, so checking it cannot protect the unboxing here.
-             Double rawUsageVal = entry2.getValue();
-             double usageVal = (rawUsageVal != null) ? rawUsageVal : 0d;
-             if (sourceLocation == null) continue; // don't know where to attribute this load; e.g. item may have just terminated
-             if (sourceItem.equals(targetItem)) continue; // ignore msgs to self
-
-             Double usageValTotal = targetUsageByLocation.get(sourceLocation);
-             double newUsageValTotal = (usageValTotal != null ? usageValTotal : 0d) + usageVal;
-             targetUsageByLocation.put(sourceLocation, newUsageValTotal);
-         }
-     }
-
-     return result;
- }
-
- @Override
- public Set<ContainerType> getAvailableContainersFor(ItemType item, Location location) {
- checkNotNull(location);
- return getContainersInLocation(location);
- }
-
-
- // Mutators.
-
- /**
-  * Records that {@code item} now lives on {@code newContainer}, updating both the
-  * item-to-container and the item-to-location maps.
-  *
-  * @param item the item that moved
-  * @param newContainer the item's new container; if null, the null sentinels are stored for
-  *        both its container and its location
-  */
- @Override
- public void onItemMoved(ItemType item, ContainerType newContainer) {
-     // idempotent, as may be called multiple times
-     Location newLocation = (newContainer != null) ? containerToLocation.get(newContainer) : null;
-     // Concurrent maps cannot hold null values, so substitute the sentinel objects before storing.
-     itemToContainer.put(item, toNonNullContainer(newContainer));
-     itemToLocation.put(item, toNonNullLocation(newLocation));
- }
-
- @Override
- public void onContainerAdded(ContainerType container, Location location) {
- Location locationNonNull = toNonNullLocation(location);
- containers.add(container);
- containerToLocation.put(container, locationNonNull);
- for (ItemType item : getItemsOnContainer(container)) {
- itemToLocation.put(item, locationNonNull);
- }
- }
-
- @Override
- public void onContainerRemoved(ContainerType container) {
- containers.remove(container);
- containerToLocation.remove(container);
- }
-
- public void onContainerLocationUpdated(ContainerType container, Location location) {
- if (!containers.contains(container)) {
- // unknown container; probably just stopped?
- // If this overtook onContainerAdded, then assume we'll lookup the location and get it right in onContainerAdded
- if (LOG.isDebugEnabled()) LOG.debug("Ignoring setting of location for unknown container {}, to {}", container, location);
- return;
- }
- Location locationNonNull = toNonNullLocation(location);
- containerToLocation.put(container, locationNonNull);
- for (ItemType item : getItemsOnContainer(container)) {
- itemToLocation.put(item, locationNonNull);
- }
- }
-
- /**
-  * Registers {@code item} as living on {@code container}, optionally flagging it as immovable.
-  *
-  * @param item the item being added
-  * @param container the container holding the item; if null, the null sentinels are stored for
-  *        both its container and its location
-  * @param immovable true to add the item to the immovable set; false leaves that set untouched
-  */
- @Override
- public void onItemAdded(ItemType item, ContainerType container, boolean immovable) {
-     // idempotent, as may be called multiple times
-
-     if (immovable) {
-         immovableItems.add(item);
-     }
-     Location location = (container != null) ? containerToLocation.get(container) : null;
-     // Concurrent maps cannot hold null values, so substitute the sentinel objects before storing.
-     itemToContainer.put(item, toNonNullContainer(container));
-     itemToLocation.put(item, toNonNullLocation(location));
- }
-
- @Override
- public void onItemRemoved(ItemType item) {
- itemToContainer.remove(item);
- itemToLocation.remove(item);
- itemUsage.remove(item);
- immovableItems.remove(item);
- }
-
- @Override
- public void onItemUsageUpdated(ItemType item, Map<? extends ItemType, Double> newValue) {
- if (hasItem(item)) {
- itemUsage.put(item, newValue);
- } else {
- // Can happen when item removed - get notification of removal and workrate from group and item
- // respectively, so can overtake each other
- if (LOG.isDebugEnabled()) LOG.debug("Ignoring setting of usage for unknown item {}, to {}", item, newValue);
- }
- }
-
-
- // Additional methods for tests.
-
- /**
- * Warning: this can be an expensive (time and memory) operation if there are a lot of items/containers.
- */
- @VisibleForTesting
- public String itemDistributionToString() {
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- dumpItemDistribution(new PrintStream(baos));
- return new String(baos.toByteArray());
- }
-
- @VisibleForTesting
- public void dumpItemDistribution() {
- dumpItemDistribution(System.out);
- }
-
- @VisibleForTesting
- public void dumpItemDistribution(PrintStream out) {
- Map<ItemType, Map<Location, Double>> directSendsToItemByLocation = getDirectSendsToItemByLocation();
-
- out.println("Follow-The-Sun dump: ");
- for (Location location: getLocations()) {
- out.println("\t"+"Location "+location);
- for (ContainerType container : getContainersInLocation(location)) {
- out.println("\t\t"+"Container "+container);
- for (ItemType item : getItemsOnContainer(container)) {
- Map<Location, Double> inboundUsage = directSendsToItemByLocation.get(item);
- Map<? extends ItemType, Double> outboundUsage = itemUsage.get(item);
- double totalInboundByLocation = (inboundUsage != null) ? sum(inboundUsage.values()) : 0d;
- double totalInboundByActor = (outboundUsage != null) ? sum(outboundUsage.values()) : 0d;
- out.println("\t\t\t"+"Item "+item);
- out.println("\t\t\t\t"+"Inbound-by-location: "+totalInboundByLocation+": "+inboundUsage);
- out.println("\t\t\t\t"+"Inbound-by-actor: "+totalInboundByActor+": "+outboundUsage);
- }
- }
- }
- out.flush();
- }
-
- private boolean hasItem(ItemType item) {
- return itemToContainer.containsKey(item);
- }
-
- private Set<Location> getLocations() {
- return ImmutableSet.copyOf(containerToLocation.values());
- }
-
- private Set<ContainerType> getContainersInLocation(Location location) {
- Set<ContainerType> result = new LinkedHashSet<ContainerType>();
- for (Map.Entry<ContainerType, Location> entry : containerToLocation.entrySet()) {
- if (location.equals(entry.getValue())) {
- result.add(entry.getKey());
- }
- }
- return result;
- }
-
- private Set<ItemType> getItemsOnContainer(ContainerType container) {
- Set<ItemType> result = new LinkedHashSet<ItemType>();
- for (Map.Entry<ItemType, ContainerType> entry : itemToContainer.entrySet()) {
- if (container.equals(entry.getValue())) {
- result.add(entry.getKey());
- }
- }
- return result;
- }
-
- private int getNumItems() {
- return itemToContainer.size();
- }
-
- @SuppressWarnings("unchecked")
- private ContainerType nullContainer() {
- return (ContainerType) NULL; // relies on erasure
- }
-
- private Location nullLocation() {
- return NULL_LOCATION;
- }
-
- private ContainerType toNonNullContainer(ContainerType val) {
- return (val != null) ? val : nullContainer();
- }
-
- private Location toNonNullLocation(Location val) {
- return (val != null) ? val : nullLocation();
- }
-
- private boolean isNull(Object val) {
- return val == NULL || val == NULL_LOCATION;
- }
-
- // TODO Move to utils; or stop AbstractLocation from removing things from the map!
- public static <K,V> Map<K,V> newHashMap(K k, V v) {
- Map<K,V> result = Maps.newLinkedHashMap();
- result.put(k, v);
- return result;
- }
-
- public static double sum(Collection<? extends Number> values) {
- double total = 0;
- for (Number d : values) {
- total += d.doubleValue();
- }
- return total;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/main/java/brooklyn/policy/followthesun/FollowTheSunModel.java
----------------------------------------------------------------------
diff --git a/policy/src/main/java/brooklyn/policy/followthesun/FollowTheSunModel.java b/policy/src/main/java/brooklyn/policy/followthesun/FollowTheSunModel.java
deleted file mode 100644
index a181ae4..0000000
--- a/policy/src/main/java/brooklyn/policy/followthesun/FollowTheSunModel.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package brooklyn.policy.followthesun;
-
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.brooklyn.api.location.Location;
-
-/**
- * Captures the state of items, containers and locations for the purpose of moving items around
- * to minimise latency. For consumption by a {@link FollowTheSunStrategy}.
- */
-public interface FollowTheSunModel<ContainerType, ItemType> {
-
- // Attributes of the pool.
- public String getName();
-
- // Attributes of containers and items.
- public String getName(ItemType item);
- public Set<ItemType> getItems();
- public Map<ItemType, Map<Location, Double>> getDirectSendsToItemByLocation();
- public Location getItemLocation(ItemType item);
- public ContainerType getItemContainer(ItemType item);
- public Location getContainerLocation(ContainerType container);
- public boolean hasActiveMigration(ItemType item);
- public Set<ContainerType> getAvailableContainersFor(ItemType item, Location location);
- public boolean isItemMoveable(ItemType item);
- public boolean isItemAllowedIn(ItemType item, Location location);
-
- // Mutators for keeping the model in-sync with the observed world
- public void onContainerAdded(ContainerType container, Location location);
- public void onContainerRemoved(ContainerType container);
- public void onContainerLocationUpdated(ContainerType container, Location location);
-
- public void onItemAdded(ItemType item, ContainerType parentContainer, boolean immovable);
- public void onItemRemoved(ItemType item);
- public void onItemUsageUpdated(ItemType item, Map<? extends ItemType, Double> newValues);
- public void onItemMoved(ItemType item, ContainerType newContainer);
-}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/main/java/brooklyn/policy/followthesun/FollowTheSunParameters.java
----------------------------------------------------------------------
diff --git a/policy/src/main/java/brooklyn/policy/followthesun/FollowTheSunParameters.java b/policy/src/main/java/brooklyn/policy/followthesun/FollowTheSunParameters.java
deleted file mode 100644
index 2b2d9af..0000000
--- a/policy/src/main/java/brooklyn/policy/followthesun/FollowTheSunParameters.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package brooklyn.policy.followthesun;
-
-import java.util.LinkedHashSet;
-import java.util.Set;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.brooklyn.api.location.Location;
-
-public class FollowTheSunParameters {
-
- private static final Logger LOG = LoggerFactory.getLogger(FollowTheSunParameters.class);
-
- private FollowTheSunParameters() {}
-
- /** trigger for moving segment X from geo A to geo B:
- * where x is total number of requests submitted in X across the CDM network,
- * and x_A is number of reqs from geo A, with A the most prolific geography
- * (arbitrarily chosen in case of ties so recommended to choose at least a small percent_majority or delta_above_percent_majority, in addition to this field);
- * this parameter T defines a number such that x_A > T*x in order for X to be migrated to A
- * (but see also triggerDeltaAbovePercentTotal, below) */
- public double triggerPercentTotal = 0.3;
- /** fields as above, and T as above,
- * this parameter T' defines a number such that x_A > T*x + T' in order for X to be migrated to A */
- public double triggerDeltaAbovePercentTotal = 0;
- /** fields as above,
- * this parameter T defines a number such that x_A > T in order for X to be migrated to A */
- public double triggerAbsoluteTotal = 2;
-
- /** fields as above, with x_B the number from a different geography B,
- * where A and B are the two most prolific requesters of X, and x_A >= x_B;
- * this parameter T defines a number such that x_A-x_B > T*x in order for X to be migrated to A */
- public double triggerPercentMajority = 0.2;
- /** as corresponding majority and total fields, with x_A-x_B on the LHS of inequality */
- public double triggerDeltaAbovePercentMajority = 1;
- /** as corresponding majority and total fields, with x_A-x_B on the LHS of inequality */
- public double triggerAbsoluteMajority = 4;
-
- /** a list of excluded locations */
- public Set<Location> excludedLocations = new LinkedHashSet<Location>();
-
- public static FollowTheSunParameters newDefault() {
- return new FollowTheSunParameters();
- }
-
- private static double parseDouble(String text, double defaultValue) {
- try {
- double d = Double.parseDouble(text);
- if (!Double.isNaN(d)) return d;
- } catch (Exception e) {
- LOG.warn("Illegal double value '"+text+"', using default "+defaultValue+": "+e, e);
- }
- return defaultValue;
- }
-
- private static String[] parseCommaSeparatedList(String csv) {
- if (csv==null || csv.trim().length()==0) return new String[0];
- return csv.split(",");
- }
-
- public boolean isTriggered(double highest, double total, double nextHighest, double current) {
- if (highest <= current) return false;
- if (highest < total*triggerPercentTotal + triggerDeltaAbovePercentTotal) return false;
- if (highest < triggerAbsoluteTotal) return false;
- //TODO more params about nextHighest vs current
- if (highest-current < total*triggerPercentMajority + triggerDeltaAbovePercentMajority) return false;
- if (highest-current < triggerAbsoluteMajority) return false;
- return true;
- }
-
- /**
-  * Returns a one-line summary of the six trigger thresholds. The excluded locations are not included.
-  */
- @Override
- public String toString() {
-     return "Inter-geography policy params: percentTotal="+triggerPercentTotal+"; deltaAbovePercentTotal="+triggerDeltaAbovePercentTotal+
-             "; absoluteTotal="+triggerAbsoluteTotal+"; percentMajority="+triggerPercentMajority+
-             "; deltaAbovePercentMajority="+triggerDeltaAbovePercentMajority+"; absoluteMajority="+triggerAbsoluteMajority;
- }
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/main/java/brooklyn/policy/followthesun/FollowTheSunPolicy.java
----------------------------------------------------------------------
diff --git a/policy/src/main/java/brooklyn/policy/followthesun/FollowTheSunPolicy.java b/policy/src/main/java/brooklyn/policy/followthesun/FollowTheSunPolicy.java
deleted file mode 100644
index b76682c..0000000
--- a/policy/src/main/java/brooklyn/policy/followthesun/FollowTheSunPolicy.java
+++ /dev/null
@@ -1,280 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package brooklyn.policy.followthesun;
-
-import static brooklyn.util.JavaGroovyEquivalents.elvis;
-import static brooklyn.util.JavaGroovyEquivalents.groovyTruth;
-import static com.google.common.base.Preconditions.checkArgument;
-
-import java.util.Collection;
-import java.util.Map;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.brooklyn.api.entity.Entity;
-import org.apache.brooklyn.api.entity.basic.EntityLocal;
-import org.apache.brooklyn.api.event.AttributeSensor;
-import org.apache.brooklyn.api.event.Sensor;
-import org.apache.brooklyn.api.event.SensorEvent;
-import org.apache.brooklyn.api.event.SensorEventListener;
-import org.apache.brooklyn.api.location.Location;
-import org.apache.brooklyn.api.location.MachineProvisioningLocation;
-import org.apache.brooklyn.core.policy.basic.AbstractPolicy;
-import org.apache.brooklyn.core.util.flags.SetFromFlag;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import brooklyn.entity.basic.Attributes;
-import brooklyn.policy.followthesun.FollowTheSunPool.ContainerItemPair;
-import brooklyn.policy.loadbalancing.Movable;
-import brooklyn.util.collections.MutableMap;
-
-import com.google.common.base.Function;
-import com.google.common.collect.Iterables;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
- // removed from catalog because it cannot currently be configured via catalog mechanisms -
- // PolicySpec.create fails due to no no-arg constructor
- // TODO make model and parameters things which can be initialized from config then reinstate in catalog
-//@Catalog(name="Follow the Sun", description="Policy for moving \"work\" around to follow the demand; "
-// + "the work can be any \"Movable\" entity")
-public class FollowTheSunPolicy extends AbstractPolicy {
-
- private static final Logger LOG = LoggerFactory.getLogger(FollowTheSunPolicy.class);
-
- public static final String NAME = "Follow the Sun (Inter-Geography Latency Optimization)";
-
- @SetFromFlag(defaultVal="100")
- private long minPeriodBetweenExecs;
-
- @SetFromFlag
- private Function<Entity, Location> locationFinder;
-
- private final AttributeSensor<Map<? extends Movable, Double>> itemUsageMetric;
- private final FollowTheSunModel<Entity, Movable> model;
- private final FollowTheSunStrategy<Entity, Movable> strategy;
- private final FollowTheSunParameters parameters;
-
- private FollowTheSunPool poolEntity;
-
- private volatile ScheduledExecutorService executor;
- private final AtomicBoolean executorQueued = new AtomicBoolean(false);
- private volatile long executorTime = 0;
- private boolean loggedConstraintsIgnored = false;
-
- private final Function<Entity, Location> defaultLocationFinder = new Function<Entity, Location>() {
- public Location apply(Entity e) {
- Collection<Location> locs = e.getLocations();
- if (locs.isEmpty()) return null;
- Location contender = Iterables.get(locs, 0);
- while (contender.getParent() != null && !(contender instanceof MachineProvisioningLocation)) {
- contender = contender.getParent();
- }
- return contender;
- }
- };
-
- private final SensorEventListener<Object> eventHandler = new SensorEventListener<Object>() {
- @Override
- public void onEvent(SensorEvent<Object> event) {
- if (LOG.isTraceEnabled()) LOG.trace("{} received event {}", FollowTheSunPolicy.this, event);
- Entity source = event.getSource();
- Object value = event.getValue();
- Sensor<?> sensor = event.getSensor();
-
- if (sensor.equals(itemUsageMetric)) {
- onItemMetricUpdated((Movable)source, (Map<? extends Movable, Double>) value, true);
- } else if (sensor.equals(Attributes.LOCATION_CHANGED)) {
- onContainerLocationUpdated(source, true);
- } else if (sensor.equals(FollowTheSunPool.CONTAINER_ADDED)) {
- onContainerAdded((Entity) value, true);
- } else if (sensor.equals(FollowTheSunPool.CONTAINER_REMOVED)) {
- onContainerRemoved((Entity) value, true);
- } else if (sensor.equals(FollowTheSunPool.ITEM_ADDED)) {
- onItemAdded((Movable) value, true);
- } else if (sensor.equals(FollowTheSunPool.ITEM_REMOVED)) {
- onItemRemoved((Movable) value, true);
- } else if (sensor.equals(FollowTheSunPool.ITEM_MOVED)) {
- ContainerItemPair pair = (ContainerItemPair) value;
- onItemMoved((Movable)pair.item, pair.container, true);
- }
- }
- };
-
- // FIXME parameters: use a more groovy way of doing it, that's consistent with other policies/entities?
- public FollowTheSunPolicy(AttributeSensor itemUsageMetric,
- FollowTheSunModel<Entity, Movable> model, FollowTheSunParameters parameters) {
- this(MutableMap.of(), itemUsageMetric, model, parameters);
- }
-
- public FollowTheSunPolicy(Map props, AttributeSensor itemUsageMetric,
- FollowTheSunModel<Entity, Movable> model, FollowTheSunParameters parameters) {
- super(props);
- this.itemUsageMetric = itemUsageMetric;
- this.model = model;
- this.parameters = parameters;
- this.strategy = new FollowTheSunStrategy<Entity, Movable>(model, parameters); // TODO: extract interface, inject impl
- this.locationFinder = elvis(locationFinder, defaultLocationFinder);
-
- // TODO Should re-use the execution manager's thread pool, somehow
- executor = Executors.newSingleThreadScheduledExecutor(newThreadFactory());
- }
-
- @Override
- public void setEntity(EntityLocal entity) {
- checkArgument(entity instanceof FollowTheSunPool, "Provided entity must be a FollowTheSunPool");
- super.setEntity(entity);
- this.poolEntity = (FollowTheSunPool) entity;
-
- // Detect when containers are added to or removed from the pool.
- subscribe(poolEntity, FollowTheSunPool.CONTAINER_ADDED, eventHandler);
- subscribe(poolEntity, FollowTheSunPool.CONTAINER_REMOVED, eventHandler);
- subscribe(poolEntity, FollowTheSunPool.ITEM_ADDED, eventHandler);
- subscribe(poolEntity, FollowTheSunPool.ITEM_REMOVED, eventHandler);
- subscribe(poolEntity, FollowTheSunPool.ITEM_MOVED, eventHandler);
-
- // Take heed of any extant containers.
- for (Entity container : poolEntity.getContainerGroup().getMembers()) {
- onContainerAdded(container, false);
- }
- for (Entity item : poolEntity.getItemGroup().getMembers()) {
- onItemAdded((Movable)item, false);
- }
-
- scheduleLatencyReductionJig();
- }
-
- @Override
- public void suspend() {
- // TODO unsubscribe from everything? And resubscribe on resume?
- super.suspend();
- if (executor != null) executor.shutdownNow();
- executorQueued.set(false);
- }
-
- @Override
- public void resume() {
- super.resume();
- executor = Executors.newSingleThreadScheduledExecutor(newThreadFactory());
- executorTime = 0;
- executorQueued.set(false);
- }
-
- private ThreadFactory newThreadFactory() {
- return new ThreadFactoryBuilder()
- .setNameFormat("brooklyn-followthesunpolicy-%d")
- .build();
- }
-
- private void scheduleLatencyReductionJig() {
- if (isRunning() && executorQueued.compareAndSet(false, true)) {
- long now = System.currentTimeMillis();
- long delay = Math.max(0, (executorTime + minPeriodBetweenExecs) - now);
-
- executor.schedule(new Runnable() {
- public void run() {
- try {
- executorTime = System.currentTimeMillis();
- executorQueued.set(false);
-
- if (LOG.isTraceEnabled()) LOG.trace("{} executing follow-the-sun migration-strategy", this);
- strategy.rebalance();
-
- } catch (RuntimeException e) {
- if (isRunning()) {
- LOG.error("Error during latency-reduction-jig", e);
- } else {
- LOG.debug("Error during latency-reduction-jig, but no longer running", e);
- }
- }
- }},
- delay,
- TimeUnit.MILLISECONDS);
- }
- }
-
- private void onContainerAdded(Entity container, boolean rebalanceNow) {
- subscribe(container, Attributes.LOCATION_CHANGED, eventHandler);
- Location location = locationFinder.apply(container);
-
- if (LOG.isTraceEnabled()) LOG.trace("{} recording addition of container {} in location {}", new Object[] {this, container, location});
- model.onContainerAdded(container, location);
-
- if (rebalanceNow) scheduleLatencyReductionJig();
- }
-
- private void onContainerRemoved(Entity container, boolean rebalanceNow) {
- if (LOG.isTraceEnabled()) LOG.trace("{} recording removal of container {}", this, container);
- model.onContainerRemoved(container);
- if (rebalanceNow) scheduleLatencyReductionJig();
- }
-
- private void onItemAdded(Movable item, boolean rebalanceNow) {
- Entity parentContainer = (Entity) item.getAttribute(Movable.CONTAINER);
-
- if (LOG.isTraceEnabled()) LOG.trace("{} recording addition of item {} in container {}", new Object[] {this, item, parentContainer});
-
- subscribe(item, itemUsageMetric, eventHandler);
-
- // Update the model, including the current metric value (if any).
- Map<? extends Movable, Double> currentValue = item.getAttribute(itemUsageMetric);
- boolean immovable = (Boolean)elvis(item.getConfig(Movable.IMMOVABLE), false);
- model.onItemAdded(item, parentContainer, immovable);
-
- if (currentValue != null) {
- model.onItemUsageUpdated(item, currentValue);
- }
-
- if (rebalanceNow) scheduleLatencyReductionJig();
- }
-
- private void onItemRemoved(Movable item, boolean rebalanceNow) {
- if (LOG.isTraceEnabled()) LOG.trace("{} recording removal of item {}", this, item);
- unsubscribe(item);
- model.onItemRemoved(item);
- if (rebalanceNow) scheduleLatencyReductionJig();
- }
-
- private void onItemMoved(Movable item, Entity parentContainer, boolean rebalanceNow) {
- if (LOG.isTraceEnabled()) LOG.trace("{} recording moving of item {} to {}", new Object[] {this, item, parentContainer});
- model.onItemMoved(item, parentContainer);
- if (rebalanceNow) scheduleLatencyReductionJig();
- }
-
- private void onContainerLocationUpdated(Entity container, boolean rebalanceNow) {
- Location location = locationFinder.apply(container);
- if (LOG.isTraceEnabled()) LOG.trace("{} recording location for container {}, new value {}", new Object[] {this, container, location});
- model.onContainerLocationUpdated(container, location);
- if (rebalanceNow) scheduleLatencyReductionJig();
- }
-
- private void onItemMetricUpdated(Movable item, Map<? extends Movable, Double> newValues, boolean rebalanceNow) {
- if (LOG.isTraceEnabled()) LOG.trace("{} recording usage update for item {}, new value {}", new Object[] {this, item, newValues});
- model.onItemUsageUpdated(item, newValues);
- if (rebalanceNow) scheduleLatencyReductionJig();
- }
-
- @Override
- public String toString() {
- return getClass().getSimpleName() + (groovyTruth(name) ? "("+name+")" : "");
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/main/java/brooklyn/policy/followthesun/FollowTheSunPool.java
----------------------------------------------------------------------
diff --git a/policy/src/main/java/brooklyn/policy/followthesun/FollowTheSunPool.java b/policy/src/main/java/brooklyn/policy/followthesun/FollowTheSunPool.java
deleted file mode 100644
index 402eeef..0000000
--- a/policy/src/main/java/brooklyn/policy/followthesun/FollowTheSunPool.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package brooklyn.policy.followthesun;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import java.io.Serializable;
-
-import org.apache.brooklyn.api.entity.Entity;
-import org.apache.brooklyn.api.entity.Group;
-import org.apache.brooklyn.api.entity.proxying.ImplementedBy;
-
-import brooklyn.entity.trait.Resizable;
-import brooklyn.event.basic.BasicNotificationSensor;
-
-@ImplementedBy(FollowTheSunPoolImpl.class)
-public interface FollowTheSunPool extends Entity, Resizable {
-
- // FIXME Remove duplication from BalanceableWorkerPool?
-
- // FIXME Asymmetry between loadbalancing and followTheSun: ITEM_ADDED and ITEM_REMOVED in loadbalancing
- // are of type ContainerItemPair, but in followTheSun it is just the `Entity item`.
-
- /** Encapsulates an item and a container; emitted by sensors.
- */
- public static class ContainerItemPair implements Serializable {
- private static final long serialVersionUID = 1L;
- public final Entity container;
- public final Entity item;
-
- public ContainerItemPair(Entity container, Entity item) {
- this.container = container;
- this.item = checkNotNull(item);
- }
-
- @Override
- public String toString() {
- return ""+item+" @ "+container;
- }
- }
-
- // Pool constituent notifications.
- public static BasicNotificationSensor<Entity> CONTAINER_ADDED = new BasicNotificationSensor<Entity>(
- Entity.class, "followthesun.container.added", "Container added");
- public static BasicNotificationSensor<Entity> CONTAINER_REMOVED = new BasicNotificationSensor<Entity>(
- Entity.class, "followthesun.container.removed", "Container removed");
- public static BasicNotificationSensor<Entity> ITEM_ADDED = new BasicNotificationSensor<Entity>(
- Entity.class, "followthesun.item.added", "Item added");
- public static BasicNotificationSensor<Entity> ITEM_REMOVED = new BasicNotificationSensor<Entity>(
- Entity.class, "followthesun.item.removed", "Item removed");
- public static BasicNotificationSensor<ContainerItemPair> ITEM_MOVED = new BasicNotificationSensor<ContainerItemPair>(
- ContainerItemPair.class, "followthesun.item.moved", "Item moved to the given container");
-
- public void setContents(Group containerGroup, Group itemGroup);
-
- public Group getContainerGroup();
-
- public Group getItemGroup();
-}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/main/java/brooklyn/policy/followthesun/FollowTheSunPoolImpl.java
----------------------------------------------------------------------
diff --git a/policy/src/main/java/brooklyn/policy/followthesun/FollowTheSunPoolImpl.java b/policy/src/main/java/brooklyn/policy/followthesun/FollowTheSunPoolImpl.java
deleted file mode 100644
index 4d74441..0000000
--- a/policy/src/main/java/brooklyn/policy/followthesun/FollowTheSunPoolImpl.java
+++ /dev/null
@@ -1,178 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package brooklyn.policy.followthesun;
-
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Set;
-
-import org.apache.brooklyn.api.entity.Entity;
-import org.apache.brooklyn.api.entity.Group;
-import org.apache.brooklyn.api.event.Sensor;
-import org.apache.brooklyn.api.event.SensorEvent;
-import org.apache.brooklyn.api.event.SensorEventListener;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import brooklyn.entity.basic.AbstractEntity;
-import brooklyn.entity.basic.AbstractGroup;
-import brooklyn.entity.trait.Resizable;
-import brooklyn.entity.trait.Startable;
-import brooklyn.policy.loadbalancing.Movable;
-
-public class FollowTheSunPoolImpl extends AbstractEntity implements FollowTheSunPool {
-
- // FIXME Remove duplication from BalanceableWorkerPool?
-
- // FIXME Asymmetry between loadbalancing and followTheSun: ITEM_ADDED and ITEM_REMOVED in loadbalancing
- // are of type ContainerItemPair, but in followTheSun it is just the `Entity item`.
-
- private static final Logger LOG = LoggerFactory.getLogger(FollowTheSunPool.class);
-
- private Group containerGroup;
- private Group itemGroup;
-
- private final Set<Entity> containers = Collections.synchronizedSet(new HashSet<Entity>());
- private final Set<Entity> items = Collections.synchronizedSet(new HashSet<Entity>());
-
- private final SensorEventListener<Object> eventHandler = new SensorEventListener<Object>() {
- @Override
- public void onEvent(SensorEvent<Object> event) {
- if (LOG.isTraceEnabled()) LOG.trace("{} received event {}", FollowTheSunPoolImpl.this, event);
- Entity source = event.getSource();
- Object value = event.getValue();
- Sensor sensor = event.getSensor();
-
- if (sensor.equals(AbstractGroup.MEMBER_ADDED)) {
- if (source.equals(containerGroup)) {
- onContainerAdded((Entity) value);
- } else if (source.equals(itemGroup)) {
- onItemAdded((Entity)value);
- } else {
- throw new IllegalStateException("unexpected event source="+source);
- }
- } else if (sensor.equals(AbstractGroup.MEMBER_REMOVED)) {
- if (source.equals(containerGroup)) {
- onContainerRemoved((Entity) value);
- } else if (source.equals(itemGroup)) {
- onItemRemoved((Entity) value);
- } else {
- throw new IllegalStateException("unexpected event source="+source);
- }
- } else if (sensor.equals(Startable.SERVICE_UP)) {
- // TODO What if start has failed? Is there a sensor to indicate that?
- if ((Boolean)value) {
- onContainerUp(source);
- } else {
- onContainerDown(source);
- }
- } else if (sensor.equals(Movable.CONTAINER)) {
- onItemMoved(source, (Entity) value);
- } else {
- throw new IllegalStateException("Unhandled event type "+sensor+": "+event);
- }
- }
- };
-
- public FollowTheSunPoolImpl() {
- }
-
- @Override
- public void setContents(Group containerGroup, Group itemGroup) {
- this.containerGroup = containerGroup;
- this.itemGroup = itemGroup;
- subscribe(containerGroup, AbstractGroup.MEMBER_ADDED, eventHandler);
- subscribe(containerGroup, AbstractGroup.MEMBER_REMOVED, eventHandler);
- subscribe(itemGroup, AbstractGroup.MEMBER_ADDED, eventHandler);
- subscribe(itemGroup, AbstractGroup.MEMBER_REMOVED, eventHandler);
-
- // Process extant containers and items
- for (Entity existingContainer : containerGroup.getMembers()) {
- onContainerAdded(existingContainer);
- }
- for (Entity existingItem : itemGroup.getMembers()) {
- onItemAdded((Entity)existingItem);
- }
- }
-
- @Override
- public Group getContainerGroup() {
- return containerGroup;
- }
-
- @Override
- public Group getItemGroup() {
- return itemGroup;
- }
-
- @Override
- public Integer getCurrentSize() {
- return containerGroup.getCurrentSize();
- }
-
- @Override
- public Integer resize(Integer desiredSize) {
- if (containerGroup instanceof Resizable) return ((Resizable) containerGroup).resize(desiredSize);
-
- throw new UnsupportedOperationException("Container group is not resizable");
- }
-
-
- private void onContainerAdded(Entity newContainer) {
- subscribe(newContainer, Startable.SERVICE_UP, eventHandler);
- if (!(newContainer instanceof Startable) || Boolean.TRUE.equals(newContainer.getAttribute(Startable.SERVICE_UP))) {
- onContainerUp(newContainer);
- }
- }
-
- private void onContainerUp(Entity newContainer) {
- if (containers.add(newContainer)) {
- emit(CONTAINER_ADDED, newContainer);
- }
- }
-
- private void onContainerDown(Entity oldContainer) {
- if (containers.remove(oldContainer)) {
- emit(CONTAINER_REMOVED, oldContainer);
- }
- }
-
- private void onContainerRemoved(Entity oldContainer) {
- unsubscribe(oldContainer);
- onContainerDown(oldContainer);
- }
-
- private void onItemAdded(Entity item) {
- if (items.add(item)) {
- subscribe(item, Movable.CONTAINER, eventHandler);
- emit(ITEM_ADDED, item);
- }
- }
-
- private void onItemRemoved(Entity item) {
- if (items.remove(item)) {
- unsubscribe(item);
- emit(ITEM_REMOVED, item);
- }
- }
-
- private void onItemMoved(Entity item, Entity container) {
- emit(ITEM_MOVED, new ContainerItemPair(container, item));
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/main/java/brooklyn/policy/followthesun/FollowTheSunStrategy.java
----------------------------------------------------------------------
diff --git a/policy/src/main/java/brooklyn/policy/followthesun/FollowTheSunStrategy.java b/policy/src/main/java/brooklyn/policy/followthesun/FollowTheSunStrategy.java
deleted file mode 100644
index e8ca636..0000000
--- a/policy/src/main/java/brooklyn/policy/followthesun/FollowTheSunStrategy.java
+++ /dev/null
@@ -1,161 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package brooklyn.policy.followthesun;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.brooklyn.api.entity.Entity;
-import org.apache.brooklyn.api.location.Location;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import brooklyn.policy.loadbalancing.Movable;
-
-import com.google.common.collect.Iterables;
-
-// TODO: extract interface
-public class FollowTheSunStrategy<ContainerType extends Entity, ItemType extends Movable> {
-
- // This is a modified version of the InterGeographyLatencyPolicy (aka Follow-The-Sun) policy from Monterey v3.
-
- // TODO location constraints
-
- private static final Logger LOG = LoggerFactory.getLogger(FollowTheSunStrategy.class);
-
- private final FollowTheSunParameters parameters;
- private final FollowTheSunModel<ContainerType,ItemType> model;
- private final String name;
-
- public FollowTheSunStrategy(FollowTheSunModel<ContainerType,ItemType> model, FollowTheSunParameters parameters) {
- this.model = model;
- this.parameters = parameters;
- this.name = model.getName();
- }
-
- public void rebalance() {
- try {
- Set<ItemType> items = model.getItems();
- Map<ItemType, Map<Location, Double>> directSendsToItemByLocation = model.getDirectSendsToItemByLocation();
-
- for (ItemType item : items) {
- String itemName = model.getName(item);
- Location activeLocation = model.getItemLocation(item);
- ContainerType activeContainer = model.getItemContainer(item);
- Map<Location, Double> sendsByLocation = directSendsToItemByLocation.get(item);
- if (sendsByLocation == null) sendsByLocation = Collections.emptyMap();
-
- if (parameters.excludedLocations.contains(activeLocation)) {
- if (LOG.isTraceEnabled()) LOG.trace("Ignoring segment {} as it is in {}", itemName, activeLocation);
- continue;
- }
- if (!model.isItemMoveable(item)) {
- if (LOG.isDebugEnabled()) LOG.debug("POLICY {} skipping any migration of {}, it is not moveable", name, itemName);
- continue;
- }
- if (model.hasActiveMigration(item)) {
- LOG.info("POLICY {} skipping any migration of {}, it is involved in an active migration already", name, itemName);
- continue;
- }
-
- double total = DefaultFollowTheSunModel.sum(sendsByLocation.values());
-
- if (LOG.isTraceEnabled()) LOG.trace("POLICY {} detected {} msgs/sec in {}, split up as: {}", new Object[] {name, total, itemName, sendsByLocation});
-
- Double current = sendsByLocation.get(activeLocation);
- if (current == null) current=0d;
- List<WeightedObject<Location>> locationsWtd = new ArrayList<WeightedObject<Location>>();
- if (total > 0) {
- for (Map.Entry<Location, Double> entry : sendsByLocation.entrySet()) {
- Location l = entry.getKey();
- Double d = entry.getValue();
- if (d > current) locationsWtd.add(new WeightedObject<Location>(l, d));
- }
- }
- Collections.sort(locationsWtd);
- Collections.reverse(locationsWtd);
-
- double highestMsgRate = -1;
- Location highestLocation = null;
- ContainerType optimalContainerInHighest = null;
- while (!locationsWtd.isEmpty()) {
- WeightedObject<Location> weightedObject = locationsWtd.remove(0);
- highestMsgRate = weightedObject.getWeight();
- highestLocation = weightedObject.getObject();
- optimalContainerInHighest = findOptimal(model.getAvailableContainersFor(item, highestLocation));
- if (optimalContainerInHighest != null) {
- break;
- }
- }
- if (optimalContainerInHighest == null) {
- if (LOG.isDebugEnabled()) LOG.debug("POLICY {} detected {} is already in optimal permitted location ({} of {} msgs/sec)", new Object[] {name, itemName, highestMsgRate, total});
- continue;
- }
-
- double nextHighestMsgRate = -1;
- ContainerType optimalContainerInNextHighest = null;
- while (!locationsWtd.isEmpty()) {
- WeightedObject<Location> weightedObject = locationsWtd.remove(0);
- nextHighestMsgRate = weightedObject.getWeight();
- Location nextHighestLocation = weightedObject.getObject();
- optimalContainerInNextHighest = findOptimal(model.getAvailableContainersFor(item, nextHighestLocation));
- if (optimalContainerInNextHighest != null) {
- break;
- }
- }
- if (optimalContainerInNextHighest == null) {
- nextHighestMsgRate = current;
- }
-
- if (parameters.isTriggered(highestMsgRate, total, nextHighestMsgRate, current)) {
- LOG.info("POLICY "+name+" detected "+itemName+" should be in location "+highestLocation+" on "+optimalContainerInHighest+" ("+highestMsgRate+" of "+total+" msgs/sec), migrating");
- try {
- if (activeContainer.equals(optimalContainerInHighest)) {
- //shouldn't happen
- LOG.warn("POLICY "+name+" detected "+itemName+" should move to "+optimalContainerInHighest+" ("+highestMsgRate+" of "+total+" msgs/sec) but it is already there with "+current+" msgs/sec");
- } else {
- item.move(optimalContainerInHighest);
- model.onItemMoved(item, optimalContainerInHighest);
- }
- } catch (Exception e) {
- LOG.warn("POLICY "+name+" detected "+itemName+" should be on "+optimalContainerInHighest+", but can't move it: "+e, e);
- }
- } else {
- if (LOG.isTraceEnabled()) LOG.trace("POLICY "+name+" detected "+itemName+" need not move to "+optimalContainerInHighest+" ("+highestMsgRate+" of "+total+" msgs/sec not much better than "+current+" at "+activeContainer+")");
- }
- }
- } catch (Exception e) {
- LOG.warn("Error in policy "+name+" (ignoring): "+e, e);
- }
- }
-
- private ContainerType findOptimal(Collection<ContainerType> contenders) {
- /*
- * TODO should choose the least loaded mediator. Currently chooses first available, and relies
- * on a load-balancer to move it again; would be good if these could share decision code so move
- * it to the right place immediately. e.g.
- * policyUtil.findLeastLoadedMediator(nodesInLocation);
- */
- return (contenders.isEmpty() ? null : Iterables.get(contenders, 0));
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/main/java/brooklyn/policy/followthesun/WeightedObject.java
----------------------------------------------------------------------
diff --git a/policy/src/main/java/brooklyn/policy/followthesun/WeightedObject.java b/policy/src/main/java/brooklyn/policy/followthesun/WeightedObject.java
deleted file mode 100644
index b1d506d..0000000
--- a/policy/src/main/java/brooklyn/policy/followthesun/WeightedObject.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package brooklyn.policy.followthesun;
-
-public class WeightedObject<T> implements Comparable<WeightedObject<T>>{
-
- final T object;
- final double weight;
-
- public WeightedObject(T obj, double weight) {
- this.object = obj;
- this.weight = weight;
- }
-
- public T getObject() {
- return object;
- }
-
- public double getWeight() {
- return weight;
- }
-
- /**
- * Note that equals and compareTo are not consistent: x.compareTo(y)==0 iff x.equals(y) is
- * highly recommended in Java, but is not required. This can make TreeSet etc behave poorly...
- */
- public int compareTo(WeightedObject<T> o) {
- double diff = o.getWeight() - weight;
- if (diff>0.0000000000000001) return -1;
- if (diff<-0.0000000000000001) return 1;
- return 0;
- }
-
- @Override
- /** true irrespective of weight */
- public boolean equals(Object obj) {
- if (!(obj instanceof WeightedObject<?>)) return false;
- if (getObject()==null) {
- return ((WeightedObject<?>)obj).getObject() == null;
- } else {
- return getObject().equals( ((WeightedObject<?>)obj).getObject() );
- }
- }
-
- @Override
- public int hashCode() {
- if (getObject()==null) return 234519078;
- return getObject().hashCode();
- }
-
- @Override
- public String toString() {
- return ""+getObject()+"["+getWeight()+"]";
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/main/java/brooklyn/policy/ha/AbstractFailureDetector.java
----------------------------------------------------------------------
diff --git a/policy/src/main/java/brooklyn/policy/ha/AbstractFailureDetector.java b/policy/src/main/java/brooklyn/policy/ha/AbstractFailureDetector.java
deleted file mode 100644
index 9d8c58f..0000000
--- a/policy/src/main/java/brooklyn/policy/ha/AbstractFailureDetector.java
+++ /dev/null
@@ -1,361 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package brooklyn.policy.ha;
-
-import static brooklyn.util.time.Time.makeTimeStringRounded;
-
-import java.util.concurrent.Callable;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-
-import org.apache.brooklyn.api.entity.basic.EntityLocal;
-import org.apache.brooklyn.api.event.Sensor;
-import org.apache.brooklyn.api.management.Task;
-import org.apache.brooklyn.core.policy.basic.AbstractPolicy;
-import org.apache.brooklyn.core.util.flags.SetFromFlag;
-import org.apache.brooklyn.core.util.task.BasicTask;
-import org.apache.brooklyn.core.util.task.ScheduledTask;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import brooklyn.config.ConfigKey;
-import brooklyn.entity.basic.BrooklynTaskTags;
-import brooklyn.entity.basic.ConfigKeys;
-import brooklyn.entity.basic.EntityInternal;
-import brooklyn.policy.ha.HASensors.FailureDescriptor;
-import brooklyn.util.collections.MutableMap;
-import brooklyn.util.exceptions.Exceptions;
-import brooklyn.util.time.Duration;
-import brooklyn.util.time.Time;
-
-import com.google.common.reflect.TypeToken;
-
-public abstract class AbstractFailureDetector extends AbstractPolicy {
-
- // TODO Remove duplication from ServiceFailureDetector, particularly for the stabilisation delays.
-
- private static final Logger LOG = LoggerFactory.getLogger(AbstractFailureDetector.class);
-
- // Minimum gap enforced by schedulePublish between the start of the previous publish job
- // (recorded in executorTime) and the next one.
- private static final long MIN_PERIOD_BETWEEN_EXECS_MILLIS = 100;
-
- // Period passed to the scheduled polling task created in doStartPolling; defaults to one second.
- public static final ConfigKey<Duration> POLL_PERIOD = ConfigKeys.newDurationConfigKey(
- "failureDetector.pollPeriod", "", Duration.ONE_SECOND);
-
- // How long the check must keep failing before publishNow emits the "failed" sensor; defaults to zero.
- // NOTE(review): the description text below contains a typo ("faled"); it is a runtime string and is left as-is here.
- @SetFromFlag("failedStabilizationDelay")
- public static final ConfigKey<Duration> FAILED_STABILIZATION_DELAY = ConfigKeys.newDurationConfigKey(
- "failureDetector.serviceFailedStabilizationDelay",
- "Time period for which the health check consistently fails "
- + "(e.g. doesn't report failed-ok-faled) before concluding failure.",
- Duration.ZERO);
-
- // How long the check must keep succeeding before publishNow emits the "recovered" sensor; defaults to zero.
- // NOTE(review): the description text below contains a typo ("continiually"); it is a runtime string and is left as-is here.
- @SetFromFlag("recoveredStabilizationDelay")
- public static final ConfigKey<Duration> RECOVERED_STABILIZATION_DELAY = ConfigKeys.newDurationConfigKey(
- "failureDetector.serviceRecoveredStabilizationDelay",
- "Time period for which the health check succeeds continiually " +
- "(e.g. doesn't report ok-failed-ok) before concluding recovered",
- Duration.ZERO);
-
- // Sensor emitted by publishNow when failure is published; defaults to HASensors.ENTITY_FAILED.
- @SuppressWarnings("serial")
- public static final ConfigKey<Sensor<FailureDescriptor>> SENSOR_FAILED = ConfigKeys.newConfigKey(new TypeToken<Sensor<FailureDescriptor>>() {},
- "failureDetector.sensor.fail", "A sensor which will indicate failure when set", HASensors.ENTITY_FAILED);
-
- // Sensor emitted by publishNow when recovery is published; defaults to HASensors.ENTITY_RECOVERED.
- @SuppressWarnings("serial")
- public static final ConfigKey<Sensor<FailureDescriptor>> SENSOR_RECOVERED = ConfigKeys.newConfigKey(new TypeToken<Sensor<FailureDescriptor>>() {},
- "failureDetector.sensor.recover", "A sensor which will indicate recovery from failure when set", HASensors.ENTITY_RECOVERED);
-
- /** Result of a single health evaluation, as returned by {@link #calculateStatus()}. */
- public interface CalculatedStatus {
- /** @return true if the evaluation found the subject healthy */
- boolean isHealthy();
- /** @return text describing the evaluation; included in log messages and emitted failure descriptors */
- String getDescription();
- }
-
- private final class PublishJob implements Runnable {
- @Override public void run() {
- try {
- executorTime = System.currentTimeMillis();
- executorQueued.set(false);
-
- publishNow();
-
- } catch (Exception e) {
- if (isRunning()) {
- LOG.error("Problem resizing: "+e, e);
- } else {
- if (LOG.isDebugEnabled()) LOG.debug("Problem resizing, but no longer running: "+e, e);
- }
- } catch (Throwable t) {
- LOG.error("Problem in service-failure-detector: "+t, t);
- throw Exceptions.propagate(t);
- }
- }
- }
-
- /** Runnable executed on each poll; simply delegates to {@link #checkHealth()}. */
- private final class HealthPoller implements Runnable {
- @Override
- public void run() {
- checkHealth();
- }
- }
-
- /**
- * Factory handed to the {@link ScheduledTask} built in {@link #doStartPolling()}.
- * Each call creates a new task wrapping a fresh {@link HealthPoller}.
- */
- private final class HealthPollingTaskFactory implements Callable<Task<?>> {
- @Override
- public Task<?> call() {
- BasicTask<Void> task = new BasicTask<Void>(new HealthPoller());
- // tag the per-poll task as transient via BrooklynTaskTags
- BrooklynTaskTags.setTransient(task);
- return task;
- }
- }
-
- /** Simple {@link CalculatedStatus} that returns the values supplied to its constructor. */
- protected static class BasicCalculatedStatus implements CalculatedStatus {
- private boolean healthy;
- private String description;
-
- /**
- * @param healthy value to return from {@link #isHealthy()}
- * @param description value to return from {@link #getDescription()}
- */
- public BasicCalculatedStatus(boolean healthy, String description) {
- this.healthy = healthy;
- this.description = description;
- }
-
- @Override
- public boolean isHealthy() {
- return healthy;
- }
-
- @Override
- public String getDescription() {
- return description;
- }
- }
-
- /**
- * Which sensor, if any, was most recently emitted by {@link #publishNow()}.
- * Starts as {@link #NONE} and is reset to {@link #NONE} by {@link #resume()}.
- */
- public enum LastPublished {
- NONE,
- FAILED,
- RECOVERED;
- }
-
- // Timestamps (millis) of the most recent poll that was healthy / unhealthy; null until the first such poll.
- protected final AtomicReference<Long> stateLastGood = new AtomicReference<Long>();
- protected final AtomicReference<Long> stateLastFail = new AtomicReference<Long>();
-
- // Time (millis) at which checkHealth first observed the current not-yet-published failure / recovery;
- // null when no such transition is in progress. Both are cleared by resume().
- protected Long currentFailureStartTime = null;
- protected Long currentRecoveryStartTime = null;
-
- // Last sensor emitted by publishNow; see LastPublished.
- protected LastPublished lastPublished = LastPublished.NONE;
-
- // True from the moment schedulePublish queues a PublishJob until that job starts running.
- private final AtomicBoolean executorQueued = new AtomicBoolean(false);
- // Start time (millis) of the most recent PublishJob run; 0 if none has run since construction or resume().
- private volatile long executorTime = 0;
-
- // Creates the per-poll task used by the scheduled polling task.
- private Callable<Task<?>> pollingTaskFactory = new HealthPollingTaskFactory();
-
- // Handle to the submitted polling task; null until doStartPolling has submitted one.
- private Task<?> scheduledTask;
-
- /**
- * Evaluates current health. Called on every poll (from checkHealth) and again from publishNow
- * immediately before deciding whether to emit a sensor.
- */
- protected abstract CalculatedStatus calculateStatus();
-
- /** Attaches the policy to the entity and, if the policy is running, starts polling. */
- @Override
- public void setEntity(EntityLocal entity) {
- super.setEntity(entity);
-
- if (isRunning()) {
- doStartPolling();
- }
- }
-
- /**
-  * Cancels the polling task, if one has been started, and then suspends the policy.
-  */
- @Override
- public void suspend() {
-     // scheduledTask is only assigned in doStartPolling(); without this guard, suspending a
-     // policy that has never started polling throws a NullPointerException.
-     if (scheduledTask != null) {
-         scheduledTask.cancel(true);
-     }
-     super.suspend();
- }
-
- /**
- * Resets the in-progress failure/recovery tracking, the last-published marker and the
- * publish-scheduling state, then resumes the policy and (re)starts polling.
- * The last-good / last-fail timestamps are not reset here.
- */
- @Override
- public void resume() {
- currentFailureStartTime = null;
- currentRecoveryStartTime = null;
- lastPublished = LastPublished.NONE;
- executorQueued.set(false);
- executorTime = 0;
-
- super.resume();
- doStartPolling();
- }
-
- /**
- * Submits a {@link ScheduledTask} that runs the polling task factory with the configured poll
- * period, unless a previously submitted polling task exists and is not yet done.
- */
- @SuppressWarnings("unchecked")
- protected void doStartPolling() {
- if (scheduledTask == null || scheduledTask.isDone()) {
- ScheduledTask task = new ScheduledTask(MutableMap.of("period", getPollPeriod(), "displayName", getTaskName()), pollingTaskFactory);
- scheduledTask = ((EntityInternal)entity).getExecutionContext().submit(task);
- }
- }
-
- /** @return the display name given to the polling task; this policy's display name */
- private String getTaskName() {
- return getDisplayName();
- }
-
- /** @return the configured {@link #POLL_PERIOD} */
- protected Duration getPollPeriod() {
- return getConfig(POLL_PERIOD);
- }
-
- /** @return the configured {@link #FAILED_STABILIZATION_DELAY} */
- protected Duration getFailedStabilizationDelay() {
- return getConfig(FAILED_STABILIZATION_DELAY);
- }
-
- /** @return the configured {@link #RECOVERED_STABILIZATION_DELAY} */
- protected Duration getRecoveredStabilizationDelay() {
- return getConfig(RECOVERED_STABILIZATION_DELAY);
- }
-
- /** @return the configured {@link #SENSOR_FAILED}, emitted when failure is published */
- protected Sensor<FailureDescriptor> getSensorFailed() {
- return getConfig(SENSOR_FAILED);
- }
-
- /** @return the configured {@link #SENSOR_RECOVERED}, emitted when recovery is published */
- protected Sensor<FailureDescriptor> getSensorRecovered() {
- return getConfig(SENSOR_RECOVERED);
- }
-
- /**
- * Runs one poll: evaluates {@link #calculateStatus()}, records the time of the observation in
- * stateLastGood or stateLastFail, and tracks transitions relative to what was last published.
- * <p>
- * A publish is scheduled only when a new transition starts (first healthy poll after FAILED was
- * published, or first unhealthy poll when FAILED is not the last thing published). If the status
- * flips back before anything was published, the pending transition start time is cleared.
- * This method never emits a sensor itself; that is done by {@link #publishNow()}.
- */
- private synchronized void checkHealth() {
- CalculatedStatus status = calculateStatus();
- boolean healthy = status.isHealthy();
- long now = System.currentTimeMillis();
-
- if (healthy) {
- stateLastGood.set(now);
- if (lastPublished == LastPublished.FAILED) {
- // failure has been published, so a healthy poll means a recovery is (or already was) under way
- if (currentRecoveryStartTime == null) {
- LOG.info("{} check for {}, now recovering: {}", new Object[] {this, entity, getDescription(status)});
- currentRecoveryStartTime = now;
- schedulePublish();
- } else {
- if (LOG.isTraceEnabled()) LOG.trace("{} check for {}, continuing recovering: {}", new Object[] {this, entity, getDescription(status)});
- }
- } else {
- // nothing published as failed; abandon any failure that was pending publication
- if (currentFailureStartTime != null) {
- LOG.info("{} check for {}, now healthy: {}", new Object[] {this, entity, getDescription(status)});
- currentFailureStartTime = null;
- } else {
- if (LOG.isTraceEnabled()) LOG.trace("{} check for {}, still healthy: {}", new Object[] {this, entity, getDescription(status)});
- }
- }
- } else {
- stateLastFail.set(now);
- if (lastPublished != LastPublished.FAILED) {
- // failure not yet published, so an unhealthy poll means a failure is (or already was) under way
- if (currentFailureStartTime == null) {
- LOG.info("{} check for {}, now failing: {}", new Object[] {this, entity, getDescription(status)});
- currentFailureStartTime = now;
- schedulePublish();
- } else {
- if (LOG.isTraceEnabled()) LOG.trace("{} check for {}, continuing failing: {}", new Object[] {this, entity, getDescription(status)});
- }
- } else {
- // already published as failed; abandon any recovery that was pending publication
- if (currentRecoveryStartTime != null) {
- LOG.info("{} check for {}, now failing: {}", new Object[] {this, entity, getDescription(status)});
- currentRecoveryStartTime = null;
- } else {
- if (LOG.isTraceEnabled()) LOG.trace("{} check for {}, still failed: {}", new Object[] {this, entity, getDescription(status)});
- }
- }
- }
- }
-
- /** Schedules a publish with no requested delay; see {@link #schedulePublish(long)}. */
- protected void schedulePublish() {
- schedulePublish(0);
- }
-
- /**
- * Queues a {@link PublishJob} to run after at least {@code delay} milliseconds.
- * Does nothing if the policy is not running or a publish job is already queued.
- *
- * @param delay requested delay in milliseconds; raised if necessary so that the job starts no
- * sooner than MIN_PERIOD_BETWEEN_EXECS_MILLIS after the previous job started, and never negative
- */
- @SuppressWarnings("unchecked")
- protected void schedulePublish(long delay) {
- // compareAndSet ensures only one job is queued at a time; PublishJob clears the flag when it starts
- if (isRunning() && executorQueued.compareAndSet(false, true)) {
- long now = System.currentTimeMillis();
- delay = Math.max(0, Math.max(delay, (executorTime + MIN_PERIOD_BETWEEN_EXECS_MILLIS) - now));
- if (LOG.isTraceEnabled()) LOG.trace("{} scheduling publish in {}ms", this, delay);
-
- Runnable job = new PublishJob();
-
- ScheduledTask task = new ScheduledTask(MutableMap.of("delay", Duration.of(delay, TimeUnit.MILLISECONDS)), new BasicTask<Void>(job));
- ((EntityInternal)entity).getExecutionContext().submit(task);
- }
- }
-
- /**
- * Re-evaluates {@link #calculateStatus()} and, if the status has been stable for longer than the
- * relevant stabilization delay, emits the recovered or failed sensor and updates lastPublished.
- * If the status has not yet been stable for long enough, another publish is scheduled for the
- * remaining time. Does nothing when the policy is not running, or when the current status
- * already matches what was last published.
- */
- private synchronized void publishNow() {
- if (!isRunning()) return;
-
- CalculatedStatus calculatedStatus = calculateStatus();
- boolean healthy = calculatedStatus.isHealthy();
-
- Long lastUpTime = stateLastGood.get();
- Long lastDownTime = stateLastFail.get();
- long serviceFailedStabilizationDelay = getFailedStabilizationDelay().toMilliseconds();
- long serviceRecoveredStabilizationDelay = getRecoveredStabilizationDelay().toMilliseconds();
- long now = System.currentTimeMillis();
-
- if (healthy) {
- if (lastPublished == LastPublished.FAILED) {
- // only publish if consistently up for serviceRecoveredStabilizationDelay
- long currentRecoveryPeriod = getTimeDiff(now, currentRecoveryStartTime);
- long sinceLastDownPeriod = getTimeDiff(now, lastDownTime);
- if (currentRecoveryPeriod > serviceRecoveredStabilizationDelay && sinceLastDownPeriod > serviceRecoveredStabilizationDelay) {
- String description = getDescription(calculatedStatus);
- LOG.warn("{} check for {}, publishing recovered: {}", new Object[] {this, entity, description});
- entity.emit(getSensorRecovered(), new HASensors.FailureDescriptor(entity, description));
- lastPublished = LastPublished.RECOVERED;
- currentFailureStartTime = null;
- } else {
- // not stable for long enough yet: retry once the longer of the two remaining waits has elapsed
- long nextAttemptTime = Math.max(serviceRecoveredStabilizationDelay - currentRecoveryPeriod, serviceRecoveredStabilizationDelay - sinceLastDownPeriod);
- schedulePublish(nextAttemptTime);
- }
- }
- } else {
- if (lastPublished != LastPublished.FAILED) {
- // only publish if consistently down for serviceFailedStabilizationDelay
- long currentFailurePeriod = getTimeDiff(now, currentFailureStartTime);
- long sinceLastUpPeriod = getTimeDiff(now, lastUpTime);
- if (currentFailurePeriod > serviceFailedStabilizationDelay && sinceLastUpPeriod > serviceFailedStabilizationDelay) {
- String description = getDescription(calculatedStatus);
- LOG.warn("{} connectivity-check for {}, publishing failed: {}", new Object[] {this, entity, description});
- entity.emit(getSensorFailed(), new HASensors.FailureDescriptor(entity, description));
- lastPublished = LastPublished.FAILED;
- currentRecoveryStartTime = null;
- } else {
- // not stable for long enough yet: retry once the longer of the two remaining waits has elapsed
- long nextAttemptTime = Math.max(serviceFailedStabilizationDelay - currentFailurePeriod, serviceFailedStabilizationDelay - sinceLastUpPeriod);
- schedulePublish(nextAttemptTime);
- }
- }
- }
- }
-
- /**
-  * Builds the diagnostic text used in log messages and in emitted failure descriptors.
-  * <p>
-  * The text contains the status's own description and health flag, the current time, the times
-  * of the last healthy and last unhealthy polls ("&lt;never&gt;" if none), what was last
-  * published, and how long the current failure / recovery has lasted ("&lt;none&gt;" if not in
-  * progress) together with the corresponding stabilization delay.
-  *
-  * @param status the status to describe
-  * @return a single-line, semicolon-separated description
-  */
- protected String getDescription(CalculatedStatus status) {
-     Long lastUpTime = stateLastGood.get();
-     // Was read from stateLastGood, which made "lastDown" always repeat the last-up time.
-     Long lastDownTime = stateLastFail.get();
-     Duration serviceFailedStabilizationDelay = getFailedStabilizationDelay();
-     Duration serviceRecoveredStabilizationDelay = getRecoveredStabilizationDelay();
-
-     return String.format("%s; healthy=%s; timeNow=%s; lastUp=%s; lastDown=%s; lastPublished=%s; "+
-             "currentFailurePeriod=%s; currentRecoveryPeriod=%s",
-             status.getDescription(),
-             status.isHealthy(),
-             Time.makeDateString(System.currentTimeMillis()),
-             (lastUpTime != null ? Time.makeDateString(lastUpTime) : "<never>"),
-             (lastDownTime != null ? Time.makeDateString(lastDownTime) : "<never>"),
-             lastPublished,
-             (currentFailureStartTime != null ? getTimeStringSince(currentFailureStartTime) : "<none>") + " (stabilization "+makeTimeStringRounded(serviceFailedStabilizationDelay) + ")",
-             (currentRecoveryStartTime != null ? getTimeStringSince(currentRecoveryStartTime) : "<none>") + " (stabilization "+makeTimeStringRounded(serviceRecoveredStabilizationDelay) + ")");
- }
-
- /**
- * @return {@code recent - previous}, or {@code recent} itself when {@code previous} is null
- * (so a missing earlier timestamp yields a very large elapsed period when {@code recent}
- * is a current-time value, as it is for the callers in publishNow)
- */
- private long getTimeDiff(Long recent, Long previous) {
- return (previous == null) ? recent : (recent - previous);
- }
-
- /** @return the rounded time string for the interval from {@code time} until now, or null if {@code time} is null */
- private String getTimeStringSince(Long time) {
- return time == null ? null : Time.makeTimeStringRounded(System.currentTimeMillis() - time);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/main/java/brooklyn/policy/ha/ConditionalSuspendPolicy.java
----------------------------------------------------------------------
diff --git a/policy/src/main/java/brooklyn/policy/ha/ConditionalSuspendPolicy.java b/policy/src/main/java/brooklyn/policy/ha/ConditionalSuspendPolicy.java
deleted file mode 100644
index b347949..0000000
--- a/policy/src/main/java/brooklyn/policy/ha/ConditionalSuspendPolicy.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package brooklyn.policy.ha;
-
-import org.apache.brooklyn.api.entity.basic.EntityLocal;
-import org.apache.brooklyn.api.event.Sensor;
-import org.apache.brooklyn.api.event.SensorEvent;
-import org.apache.brooklyn.api.event.SensorEventListener;
-import org.apache.brooklyn.api.policy.Policy;
-import org.apache.brooklyn.core.policy.basic.AbstractPolicy;
-import org.apache.brooklyn.core.util.flags.SetFromFlag;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import brooklyn.config.ConfigKey;
-import brooklyn.entity.basic.ConfigKeys;
-import brooklyn.util.javalang.JavaClassNames;
-
-import com.google.common.base.Preconditions;
-
-/**
- * Policy that suspends another ("target") policy when one sensor event is received on the
- * entity, and resumes it when another sensor event is received.
- */
-public class ConditionalSuspendPolicy extends AbstractPolicy {
- private static final Logger LOG = LoggerFactory.getLogger(ConditionalSuspendPolicy.class);
-
- // Sensor whose events cause the target policy to be suspended; defaults to HASensors.CONNECTION_FAILED.
- @SetFromFlag("suppressSensor")
- @SuppressWarnings({ "rawtypes", "unchecked" })
- public static final ConfigKey<Sensor<?>> SUSPEND_SENSOR = (ConfigKey) ConfigKeys.newConfigKey(Sensor.class,
- "suppressSensor", "Sensor which will suppress the target policy", HASensors.CONNECTION_FAILED);
-
- // Sensor whose events cause the target policy to be resumed; defaults to HASensors.CONNECTION_RECOVERED.
- @SetFromFlag("resetSensor")
- @SuppressWarnings({ "rawtypes", "unchecked" })
- public static final ConfigKey<Sensor<?>> RESUME_SENSOR = (ConfigKey) ConfigKeys.newConfigKey(Sensor.class,
- "resetSensor", "Resume target policy when this sensor is observed", HASensors.CONNECTION_RECOVERED);
-
- // The policy to suspend/resume: either a Policy instance or a String resolved by getTargetPolicy().
- // NOTE(review): the description mentions a "suspendTarget config", but getTargetPolicy() matches a
- // String against each policy's display name or class name - confirm which is intended.
- @SetFromFlag("target")
- public static final ConfigKey<Object> SUSPEND_TARGET = ConfigKeys.newConfigKey(Object.class,
- "target", "The target policy to suspend. Either direct reference or the value of the suspendTarget config on a policy from the same entity.");
-
- /**
- * Attaches the policy to the entity, verifies that a target is configured and can be resolved,
- * subscribes to the suspend and resume sensors, and sets the unique tag from the class name and
- * the two sensor names.
- */
- @Override
- public void setEntity(EntityLocal entity) {
- super.setEntity(entity);
- Object target = config().get(SUSPEND_TARGET);
- Preconditions.checkNotNull(target, "Suspend target required");
- Preconditions.checkNotNull(getTargetPolicy(), "Can't find target policy set in " + SUSPEND_TARGET.getName() + ": " + target);
- subscribe();
- uniqueTag = JavaClassNames.simpleClassName(getClass())+":"+getConfig(SUSPEND_SENSOR).getName()+":"+getConfig(RESUME_SENSOR).getName();
- }
-
- /**
- * Registers two listeners on the entity: one suspends the target policy on each event of the
- * suspend sensor, the other resumes it on each event of the resume sensor. Both listeners
- * ignore events while this policy is not running, and resolve the target anew on every event.
- */
- private void subscribe() {
- subscribe(entity, getConfig(SUSPEND_SENSOR), new SensorEventListener<Object>() {
- @Override public void onEvent(final SensorEvent<Object> event) {
- if (isRunning()) {
- Policy target = getTargetPolicy();
- target.suspend();
- LOG.debug("Suspended policy " + target + ", triggered by " + event.getSensor() + " = " + event.getValue());
- }
- }
-
- });
- subscribe(entity, getConfig(RESUME_SENSOR), new SensorEventListener<Object>() {
- @Override public void onEvent(final SensorEvent<Object> event) {
- if (isRunning()) {
- Policy target = getTargetPolicy();
- target.resume();
- LOG.debug("Resumed policy " + target + ", triggered by " + event.getSensor() + " = " + event.getValue());
- }
- }
- });
- }
-
- /**
- * Resolves the configured target.
- *
- * @return the configured value itself if it is a {@link Policy}; if it is a String, the first
- * policy on the entity whose display name or class name equals it, or null if none matches
- * @throws IllegalStateException if the configured value is neither a Policy nor a String
- */
- private Policy getTargetPolicy() {
- Object target = config().get(SUSPEND_TARGET);
- if (target instanceof Policy) {
- return (Policy)target;
- } else if (target instanceof String) {
- for (Policy policy : entity.getPolicies()) {
- // No way to set config values for keys NOT declared in the policy,
- // so must use displayName as a generally available config value.
- if (target.equals(policy.getDisplayName()) || target.equals(policy.getClass().getName())) {
- return policy;
- }
- }
- } else {
- throw new IllegalStateException("Unexpected type " + target.getClass() + " for target " + target);
- }
- return null;
- }
-}