You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by he...@apache.org on 2015/08/18 13:06:14 UTC
[16/24] incubator-brooklyn git commit: [BROOKLYN-162] Renaming
package policy
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/main/java/org/apache/brooklyn/policy/autoscaling/AutoScalerPolicy.java
----------------------------------------------------------------------
diff --git a/policy/src/main/java/org/apache/brooklyn/policy/autoscaling/AutoScalerPolicy.java b/policy/src/main/java/org/apache/brooklyn/policy/autoscaling/AutoScalerPolicy.java
new file mode 100644
index 0000000..98d814f
--- /dev/null
+++ b/policy/src/main/java/org/apache/brooklyn/policy/autoscaling/AutoScalerPolicy.java
@@ -0,0 +1,1092 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.policy.autoscaling;
+
+import static brooklyn.util.JavaGroovyEquivalents.groovyTruth;
+import static com.google.common.base.Preconditions.checkNotNull;
+import groovy.lang.Closure;
+
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.brooklyn.api.catalog.Catalog;
+import org.apache.brooklyn.api.entity.Entity;
+import org.apache.brooklyn.api.entity.basic.EntityLocal;
+import org.apache.brooklyn.api.event.AttributeSensor;
+import org.apache.brooklyn.api.event.Sensor;
+import org.apache.brooklyn.api.event.SensorEvent;
+import org.apache.brooklyn.api.event.SensorEventListener;
+import org.apache.brooklyn.api.policy.PolicySpec;
+import org.apache.brooklyn.core.policy.basic.AbstractPolicy;
+import org.apache.brooklyn.core.util.flags.SetFromFlag;
+import org.apache.brooklyn.core.util.flags.TypeCoercions;
+import org.apache.brooklyn.core.util.task.Tasks;
+
+import brooklyn.config.ConfigKey;
+import brooklyn.entity.basic.BrooklynTaskTags;
+import brooklyn.entity.basic.Entities;
+import brooklyn.entity.trait.Resizable;
+import brooklyn.entity.trait.Startable;
+import brooklyn.event.basic.BasicConfigKey;
+import brooklyn.event.basic.BasicNotificationSensor;
+
+import org.apache.brooklyn.policy.autoscaling.SizeHistory.WindowSummary;
+import org.apache.brooklyn.policy.loadbalancing.LoadBalancingPolicy;
+
+import brooklyn.util.collections.MutableMap;
+import brooklyn.util.time.Duration;
+
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import com.google.common.reflect.TypeToken;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+
+/**
+ * Policy that is attached to a {@link Resizable} entity and dynamically adjusts its size in response to
+ * emitted {@code POOL_COLD} and {@code POOL_HOT} events. Alternatively, the policy can be configured to
+ * keep a given metric within a required range.
+ * <p>
+ * This policy does not itself determine whether the pool is hot or cold, but instead relies on these
+ * events being emitted by the monitored entity itself, or by another policy that is attached to it; see,
+ * for example, {@link LoadBalancingPolicy}.
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+@Catalog(name="Auto-scaler", description="Policy that is attached to a Resizable entity and dynamically "
+ + "adjusts its size in response to either keep a metric within a given range, or in response to "
+ + "POOL_COLD and POOL_HOT events")
+public class AutoScalerPolicy extends AbstractPolicy {
+
+ private static final Logger LOG = LoggerFactory.getLogger(AutoScalerPolicy.class);
+
+    /**
+     * Creates a fresh {@link Builder} for assembling an {@link AutoScalerPolicy}
+     * (or a {@link PolicySpec} for one) from individually supplied settings.
+     *
+     * @return a new builder instance
+     */
+    public static Builder builder() {
+        final Builder fresh = new Builder();
+        return fresh;
+    }
+
+    /**
+     * Fluent builder that collects the settings of an {@link AutoScalerPolicy} and turns them into
+     * either a policy instance ({@link #build()}) or a {@link PolicySpec} ({@link #buildSpec()}).
+     * <p>
+     * Values are passed on as a flag map keyed by the flag names used in the {@code @SetFromFlag}
+     * annotations of the policy's config keys; values left {@code null} are omitted from that map.
+     */
+    public static class Builder {
+        private String id;
+        private String name;
+        private AttributeSensor<? extends Number> metric;
+        private Entity entityWithMetric;
+        private Number metricUpperBound;
+        private Number metricLowerBound;
+        private int minPoolSize = 1;
+        private int maxPoolSize = Integer.MAX_VALUE;
+        private Integer resizeDownIterationIncrement;
+        private Integer resizeDownIterationMax;
+        private Integer resizeUpIterationIncrement;
+        private Integer resizeUpIterationMax;
+        private Duration minPeriodBetweenExecs;
+        private Duration resizeUpStabilizationDelay;
+        private Duration resizeDownStabilizationDelay;
+        private ResizeOperator resizeOperator;
+        private Function<Entity,Integer> currentSizeOperator;
+        private BasicNotificationSensor<?> poolHotSensor;
+        private BasicNotificationSensor<?> poolColdSensor;
+        private BasicNotificationSensor<?> poolOkSensor;
+        private BasicNotificationSensor<? super MaxPoolSizeReachedEvent> maxSizeReachedSensor;
+        private Duration maxReachedNotificationDelay;
+
+        public Builder id(String val) {
+            this.id = val; return this;
+        }
+        public Builder name(String val) {
+            this.name = val; return this;
+        }
+        public Builder metric(AttributeSensor<? extends Number> val) {
+            this.metric = val; return this;
+        }
+        public Builder entityWithMetric(Entity val) {
+            this.entityWithMetric = val; return this;
+        }
+        public Builder metricLowerBound(Number val) {
+            this.metricLowerBound = val; return this;
+        }
+        public Builder metricUpperBound(Number val) {
+            this.metricUpperBound = val; return this;
+        }
+        /**
+         * Sets both metric bounds at once.
+         *
+         * @throws NullPointerException if either bound is null
+         */
+        public Builder metricRange(Number min, Number max) {
+            metricLowerBound = checkNotNull(min);
+            metricUpperBound = checkNotNull(max);
+            return this;
+        }
+        public Builder minPoolSize(int val) {
+            this.minPoolSize = val; return this;
+        }
+        public Builder maxPoolSize(int val) {
+            this.maxPoolSize = val; return this;
+        }
+        /** Sets both pool size limits at once. */
+        public Builder sizeRange(int min, int max) {
+            minPoolSize = min;
+            maxPoolSize = max;
+            return this;
+        }
+
+        public Builder resizeUpIterationIncrement(Integer val) {
+            this.resizeUpIterationIncrement = val; return this;
+        }
+        public Builder resizeUpIterationMax(Integer val) {
+            this.resizeUpIterationMax = val; return this;
+        }
+        // The two scale-down setters must write the *Down* fields; writing the *Up* fields would
+        // drop the scale-down setting and clobber whatever scale-up value had been supplied.
+        public Builder resizeDownIterationIncrement(Integer val) {
+            this.resizeDownIterationIncrement = val; return this;
+        }
+        public Builder resizeDownIterationMax(Integer val) {
+            this.resizeDownIterationMax = val; return this;
+        }
+
+        public Builder minPeriodBetweenExecs(Duration val) {
+            this.minPeriodBetweenExecs = val; return this;
+        }
+        public Builder resizeUpStabilizationDelay(Duration val) {
+            this.resizeUpStabilizationDelay = val; return this;
+        }
+        public Builder resizeDownStabilizationDelay(Duration val) {
+            this.resizeDownStabilizationDelay = val; return this;
+        }
+        public Builder resizeOperator(ResizeOperator val) {
+            this.resizeOperator = val; return this;
+        }
+        public Builder currentSizeOperator(Function<Entity, Integer> val) {
+            this.currentSizeOperator = val; return this;
+        }
+        public Builder poolHotSensor(BasicNotificationSensor<?> val) {
+            this.poolHotSensor = val; return this;
+        }
+        public Builder poolColdSensor(BasicNotificationSensor<?> val) {
+            this.poolColdSensor = val; return this;
+        }
+        public Builder poolOkSensor(BasicNotificationSensor<?> val) {
+            this.poolOkSensor = val; return this;
+        }
+        public Builder maxSizeReachedSensor(BasicNotificationSensor<? super MaxPoolSizeReachedEvent> val) {
+            this.maxSizeReachedSensor = val; return this;
+        }
+        public Builder maxReachedNotificationDelay(Duration val) {
+            this.maxReachedNotificationDelay = val; return this;
+        }
+
+        /** Constructs a policy instance directly from the collected settings. */
+        public AutoScalerPolicy build() {
+            return new AutoScalerPolicy(toFlags());
+        }
+        /** Produces a {@link PolicySpec} configured with the collected settings. */
+        public PolicySpec<AutoScalerPolicy> buildSpec() {
+            return PolicySpec.create(AutoScalerPolicy.class)
+                    .configure(toFlags());
+        }
+        /**
+         * Converts the collected settings into a flag map. Only non-null values are included;
+         * the primitive pool-size limits are therefore always present.
+         */
+        private Map<String,?> toFlags() {
+            return MutableMap.<String,Object>builder()
+                    .putIfNotNull("id", id)
+                    .putIfNotNull("name", name)
+                    .putIfNotNull("metric", metric)
+                    .putIfNotNull("entityWithMetric", entityWithMetric)
+                    .putIfNotNull("metricUpperBound", metricUpperBound)
+                    .putIfNotNull("metricLowerBound", metricLowerBound)
+                    .putIfNotNull("minPoolSize", minPoolSize)
+                    .putIfNotNull("maxPoolSize", maxPoolSize)
+                    .putIfNotNull("resizeUpIterationMax", resizeUpIterationMax)
+                    .putIfNotNull("resizeUpIterationIncrement", resizeUpIterationIncrement)
+                    .putIfNotNull("resizeDownIterationMax", resizeDownIterationMax)
+                    .putIfNotNull("resizeDownIterationIncrement", resizeDownIterationIncrement)
+                    .putIfNotNull("minPeriodBetweenExecs", minPeriodBetweenExecs)
+                    .putIfNotNull("resizeUpStabilizationDelay", resizeUpStabilizationDelay)
+                    .putIfNotNull("resizeDownStabilizationDelay", resizeDownStabilizationDelay)
+                    .putIfNotNull("resizeOperator", resizeOperator)
+                    .putIfNotNull("currentSizeOperator", currentSizeOperator)
+                    .putIfNotNull("poolHotSensor", poolHotSensor)
+                    .putIfNotNull("poolColdSensor", poolColdSensor)
+                    .putIfNotNull("poolOkSensor", poolOkSensor)
+                    .putIfNotNull("maxSizeReachedSensor", maxSizeReachedSensor)
+                    .putIfNotNull("maxReachedNotificationDelay", maxReachedNotificationDelay)
+                    .build();
+        }
+    }
+
+ // TODO Is there a nicer pattern for registering such type-coercions?
+ // Can't put it in the ResizeOperator interface, nor in core TypeCoercions class because interface is defined in policy/.
+ static {
+ TypeCoercions.registerAdapter(Closure.class, ResizeOperator.class, new Function<Closure,ResizeOperator>() {
+ @Override
+ public ResizeOperator apply(final Closure closure) {
+ return new ResizeOperator() {
+ @Override public Integer resize(Entity entity, Integer input) {
+ return (Integer) closure.call(entity, input);
+ }
+ };
+ }
+ });
+ }
+
+    // Pool workrate notifications. These are the default values of the POOL_HOT_SENSOR,
+    // POOL_COLD_SENSOR and POOL_OK_SENSOR config keys; the policy's utilization event handler
+    // dispatches on which of these sensors an incoming event belongs to.
+    public static BasicNotificationSensor<Map> DEFAULT_POOL_HOT_SENSOR = new BasicNotificationSensor<Map>(
+            Map.class, "resizablepool.hot", "Pool is over-utilized; it has insufficient resource for current workload");
+    public static BasicNotificationSensor<Map> DEFAULT_POOL_COLD_SENSOR = new BasicNotificationSensor<Map>(
+            Map.class, "resizablepool.cold", "Pool is under-utilized; it has too much resource for current workload");
+    // The "ok" sensor needs its own name: sharing "resizablepool.cold" with the cold sensor makes the two
+    // indistinguishable by name (NOTE(review): presumably sensor equality is name-based, in which case
+    // "ok" events were being handled as "cold" events - confirm against BasicNotificationSensor.equals).
+    public static BasicNotificationSensor<Map> DEFAULT_POOL_OK_SENSOR = new BasicNotificationSensor<Map>(
+            Map.class, "resizablepool.ok", "Pool utilization is ok; the available resources are fine for the current workload");
+
+ /**
+ * A convenience for policies that want to register a {@code builder.maxSizeReachedSensor(sensor)}.
+ * Note that this "default" is not set automatically; the default is for no sensor to be used (so
+ * no events emitted).
+ */
+ public static BasicNotificationSensor<MaxPoolSizeReachedEvent> DEFAULT_MAX_SIZE_REACHED_SENSOR = new BasicNotificationSensor<MaxPoolSizeReachedEvent>(
+ MaxPoolSizeReachedEvent.class, "resizablepool.maxSizeReached", "Consistently wanted to resize the pool above the max allowed size");
+
+ // Keys looked up in the property map carried by pool-hot / pool-cold events
+ // (read in analyzeOnHotOrColdSensor): the pool's current size, the upper and lower
+ // thresholds for the workrate, and the current workrate itself.
+ public static final String POOL_CURRENT_SIZE_KEY = "pool.current.size";
+ public static final String POOL_HIGH_THRESHOLD_KEY = "pool.high.threshold";
+ public static final String POOL_LOW_THRESHOLD_KEY = "pool.low.threshold";
+ public static final String POOL_CURRENT_WORKRATE_KEY = "pool.current.workrate";
+
+ /**
+ * Sensor whose value drives metric-based scaling. When set, {@code setEntity} subscribes to it and
+ * each new value is compared against the metric lower/upper bounds. No default is declared.
+ */
+ @SuppressWarnings("serial")
+ @SetFromFlag("metric")
+ public static final ConfigKey<AttributeSensor<? extends Number>> METRIC = BasicConfigKey.builder(new TypeToken<AttributeSensor<? extends Number>>() {})
+ .name("autoscaler.metric")
+ .build();
+
+ /**
+ * Entity on which the {@link #METRIC} sensor is observed; when unset, {@code setEntity} subscribes
+ * on the entity the policy is attached to instead.
+ */
+ @SetFromFlag("entityWithMetric")
+ public static final ConfigKey<Entity> ENTITY_WITH_METRIC = BasicConfigKey.builder(Entity.class)
+ .name("autoscaler.entityWithMetric")
+ .build();
+
+ /**
+ * Lower bound for the metric: a metric value below it marks the pool as cold (candidate for
+ * scaling back). Declared reconfigurable; no default is declared.
+ */
+ @SetFromFlag("metricLowerBound")
+ public static final ConfigKey<Number> METRIC_LOWER_BOUND = BasicConfigKey.builder(Number.class)
+ .name("autoscaler.metricLowerBound")
+ .reconfigurable(true)
+ .build();
+
+ /**
+ * Upper bound for the metric: a metric value above it marks the pool as hot (candidate for
+ * scaling out). Declared reconfigurable; no default is declared.
+ */
+ @SetFromFlag("metricUpperBound")
+ public static final ConfigKey<Number> METRIC_UPPER_BOUND = BasicConfigKey.builder(Number.class)
+ .name("autoscaler.metricUpperBound")
+ .reconfigurable(true)
+ .build();
+
+ // Per-iteration resize limits. All four keys are declared reconfigurable; the increments default
+ // to 1 and the maxima default to Integer.MAX_VALUE.
+
+ /** Batch size used when scaling up; defaults to 1. */
+ @SetFromFlag("resizeUpIterationIncrement")
+ public static final ConfigKey<Integer> RESIZE_UP_ITERATION_INCREMENT = BasicConfigKey.builder(Integer.class)
+ .name("autoscaler.resizeUpIterationIncrement")
+ .description("Batch size for resizing up; the size will be increased by a multiple of this value")
+ .defaultValue(1)
+ .reconfigurable(true)
+ .build();
+ /** Largest size change permitted in one scale-up iteration; defaults to Integer.MAX_VALUE. */
+ @SetFromFlag("resizeUpIterationMax")
+ public static final ConfigKey<Integer> RESIZE_UP_ITERATION_MAX = BasicConfigKey.builder(Integer.class)
+ .name("autoscaler.resizeUpIterationMax")
+ .defaultValue(Integer.MAX_VALUE)
+ .description("Maximum change to the size on a single iteration when scaling up")
+ .reconfigurable(true)
+ .build();
+ /** Batch size used when scaling down; defaults to 1. */
+ @SetFromFlag("resizeDownIterationIncrement")
+ public static final ConfigKey<Integer> RESIZE_DOWN_ITERATION_INCREMENT = BasicConfigKey.builder(Integer.class)
+ .name("autoscaler.resizeDownIterationIncrement")
+ .description("Batch size for resizing down; the size will be decreased by a multiple of this value")
+ .defaultValue(1)
+ .reconfigurable(true)
+ .build();
+ /** Largest size change permitted in one scale-down iteration; defaults to Integer.MAX_VALUE. */
+ @SetFromFlag("resizeDownIterationMax")
+ public static final ConfigKey<Integer> RESIZE_DOWN_ITERATION_MAX = BasicConfigKey.builder(Integer.class)
+ .name("autoscaler.resizeDownIterationMax")
+ .defaultValue(Integer.MAX_VALUE)
+ .description("Maximum change to the size on a single iteration when scaling down")
+ .reconfigurable(true)
+ .build();
+
+ /**
+ * Defaults to 100 milliseconds. Unlike the stabilization delays below, this key is not declared
+ * reconfigurable. NOTE(review): presumably the minimum gap between successive resize executions,
+ * going by the name; the code that consumes it is not in this part of the file.
+ */
+ @SetFromFlag("minPeriodBetweenExecs")
+ public static final ConfigKey<Duration> MIN_PERIOD_BETWEEN_EXECS = BasicConfigKey.builder(Duration.class)
+ .name("autoscaler.minPeriodBetweenExecs")
+ .defaultValue(Duration.millis(100))
+ .build();
+
+ /**
+ * Stabilization delay for scaling up; defaults to zero and is reconfigurable. The larger of this
+ * and the scale-down delay is used as the window size of the desired-resize history.
+ */
+ @SetFromFlag("resizeUpStabilizationDelay")
+ public static final ConfigKey<Duration> RESIZE_UP_STABILIZATION_DELAY = BasicConfigKey.builder(Duration.class)
+ .name("autoscaler.resizeUpStabilizationDelay")
+ .defaultValue(Duration.ZERO)
+ .reconfigurable(true)
+ .build();
+
+ /**
+ * Stabilization delay for scaling down; defaults to zero and is reconfigurable. The larger of this
+ * and the scale-up delay is used as the window size of the desired-resize history.
+ */
+ @SetFromFlag("resizeDownStabilizationDelay")
+ public static final ConfigKey<Duration> RESIZE_DOWN_STABILIZATION_DELAY = BasicConfigKey.builder(Duration.class)
+ .name("autoscaler.resizeDownStabilizationDelay")
+ .defaultValue(Duration.ZERO)
+ .reconfigurable(true)
+ .build();
+
+ /**
+ * Smallest permitted pool size; defaults to 1. Reconfiguring it to a value above the current
+ * maximum is rejected by {@code doReconfigureConfig}.
+ */
+ @SetFromFlag("minPoolSize")
+ public static final ConfigKey<Integer> MIN_POOL_SIZE = BasicConfigKey.builder(Integer.class)
+ .name("autoscaler.minPoolSize")
+ .defaultValue(1)
+ .reconfigurable(true)
+ .build();
+
+ /**
+ * Largest permitted pool size; defaults to Integer.MAX_VALUE. Reconfiguring it to a value below
+ * the current minimum is rejected by {@code doReconfigureConfig}.
+ */
+ @SetFromFlag("maxPoolSize")
+ public static final ConfigKey<Integer> MAX_POOL_SIZE = BasicConfigKey.builder(Integer.class)
+ .name("autoscaler.maxPoolSize")
+ .defaultValue(Integer.MAX_VALUE)
+ .reconfigurable(true)
+ .build();
+
+ /**
+ * Operation used to resize the pool. The default casts the entity to {@link Resizable} and calls
+ * its {@code resize}; {@code setEntity} therefore insists on a Resizable entity unless a custom
+ * operator has been configured.
+ */
+ @SetFromFlag("resizeOperator")
+ public static final ConfigKey<ResizeOperator> RESIZE_OPERATOR = BasicConfigKey.builder(ResizeOperator.class)
+ .name("autoscaler.resizeOperator")
+ .defaultValue(new ResizeOperator() {
+ public Integer resize(Entity entity, Integer desiredSize) {
+ return ((Resizable)entity).resize(desiredSize);
+ }})
+ .build();
+
+ /**
+ * Function used to read the pool's current size. The default casts the entity to
+ * {@link Resizable} and calls its {@code getCurrentSize}.
+ */
+ @SuppressWarnings("serial")
+ @SetFromFlag("currentSizeOperator")
+ public static final ConfigKey<Function<Entity,Integer>> CURRENT_SIZE_OPERATOR = BasicConfigKey.builder(new TypeToken<Function<Entity,Integer>>() {})
+ .name("autoscaler.currentSizeOperator")
+ .defaultValue(new Function<Entity,Integer>() {
+ public Integer apply(Entity entity) {
+ return ((Resizable)entity).getCurrentSize();
+ }})
+ .build();
+
+ /** Sensor whose events are handled as "pool hot"; defaults to DEFAULT_POOL_HOT_SENSOR. */
+ @SuppressWarnings("serial")
+ @SetFromFlag("poolHotSensor")
+ public static final ConfigKey<BasicNotificationSensor<? extends Map>> POOL_HOT_SENSOR = BasicConfigKey.builder(new TypeToken<BasicNotificationSensor<? extends Map>>() {})
+ .name("autoscaler.poolHotSensor")
+ .defaultValue(DEFAULT_POOL_HOT_SENSOR)
+ .build();
+
+ /** Sensor whose events are handled as "pool cold"; defaults to DEFAULT_POOL_COLD_SENSOR. */
+ @SuppressWarnings("serial")
+ @SetFromFlag("poolColdSensor")
+ public static final ConfigKey<BasicNotificationSensor<? extends Map>> POOL_COLD_SENSOR = BasicConfigKey.builder(new TypeToken<BasicNotificationSensor<? extends Map>>() {})
+ .name("autoscaler.poolColdSensor")
+ .defaultValue(DEFAULT_POOL_COLD_SENSOR)
+ .build();
+
+ /** Sensor whose events are handled as "pool ok"; defaults to DEFAULT_POOL_OK_SENSOR. */
+ @SuppressWarnings("serial")
+ @SetFromFlag("poolOkSensor")
+ public static final ConfigKey<BasicNotificationSensor<? extends Map>> POOL_OK_SENSOR = BasicConfigKey.builder(new TypeToken<BasicNotificationSensor<? extends Map>>() {})
+ .name("autoscaler.poolOkSensor")
+ .defaultValue(DEFAULT_POOL_OK_SENSOR)
+ .build();
+
+ /**
+ * Optional sensor for "max pool size reached" notifications. No default is declared, so unless
+ * one is configured there is no sensor to emit on.
+ */
+ @SuppressWarnings("serial")
+ @SetFromFlag("maxSizeReachedSensor")
+ public static final ConfigKey<BasicNotificationSensor<? super MaxPoolSizeReachedEvent>> MAX_SIZE_REACHED_SENSOR = BasicConfigKey.builder(new TypeToken<BasicNotificationSensor<? super MaxPoolSizeReachedEvent>>() {})
+ .name("autoscaler.maxSizeReachedSensor")
+ .description("Sensor for which a notification will be emitted (on the associated entity) when " +
+ "we consistently wanted to resize the pool above the max allowed size, for " +
+ "maxReachedNotificationDelay milliseconds")
+ .build();
+
+ /**
+ * Defaults to zero. Its millisecond value is used in {@code doInit} as the window size of the
+ * history of unbounded resize requests.
+ */
+ @SetFromFlag("maxReachedNotificationDelay")
+ public static final ConfigKey<Duration> MAX_REACHED_NOTIFICATION_DELAY = BasicConfigKey.builder(Duration.class)
+ .name("autoscaler.maxReachedNotificationDelay")
+ .description("Time that we consistently wanted to go above the maxPoolSize for, after which the " +
+ "maxSizeReachedSensor (if any) will be emitted")
+ .defaultValue(Duration.ZERO)
+ .build();
+
+ // The entity this policy is attached to; assigned in setEntity.
+ private Entity poolEntity;
+
+ // NOTE(review): executorQueued / executorTime are not used in this part of the file; presumably
+ // they track whether a resize task is already queued and when the last one ran - confirm.
+ private final AtomicBoolean executorQueued = new AtomicBoolean(false);
+ private volatile long executorTime = 0;
+ // Single-threaded scheduler created in doInit() and resume(), shut down in suspend();
+ // resize work (e.g. on pool-size-limit changes) is submitted to it.
+ private volatile ScheduledExecutorService executor;
+
+ // History created in doInit() with a window of maxReachedNotificationDelay milliseconds.
+ private SizeHistory recentUnboundedResizes;
+
+ // History created in doInit() with a window equal to the larger of the resize-up and
+ // resize-down stabilization delays; the window is adjusted when either delay is reconfigured.
+ private SizeHistory recentDesiredResizes;
+
+ // NOTE(review): not used in this part of the file; presumably the time at which the
+ // max-size-reached notification was last emitted - confirm.
+ private long maxReachedLastNotifiedTime;
+
+    /**
+     * Listener for pool utilization events. Routes each event to the cold / hot / ok handler
+     * according to which configured sensor it came from (checked in that order), and rejects
+     * events from any other sensor.
+     */
+    private final SensorEventListener<Map> utilizationEventHandler = new SensorEventListener<Map>() {
+        public void onEvent(SensorEvent<Map> event) {
+            final Map<String, ?> payload = (Map<String, ?>) event.getValue();
+            final Sensor<?> source = event.getSensor();
+
+            if (source.equals(getPoolColdSensor())) {
+                onPoolCold(payload);
+                return;
+            }
+            if (source.equals(getPoolHotSensor())) {
+                onPoolHot(payload);
+                return;
+            }
+            if (source.equals(getPoolOkSensor())) {
+                onPoolOk(payload);
+                return;
+            }
+            throw new IllegalStateException("Unexpected sensor type: "+source+"; event="+event);
+        }
+    };
+
+    /**
+     * Listener for changes of the configured metric; hands each new value to
+     * {@code onMetricChanged}. With assertions enabled, also checks the event's sensor is the
+     * configured metric.
+     */
+    private final SensorEventListener<Number> metricEventHandler = new SensorEventListener<Number>() {
+        public void onEvent(SensorEvent<Number> event) {
+            assert event.getSensor().equals(getMetric());
+            final Number latest = event.getValue();
+            onMetricChanged(latest);
+        }
+    };
+
+ /** Creates a policy with no flags supplied (delegates to the map constructor with an empty map). */
+ public AutoScalerPolicy() {
+ this(MutableMap.<String,Object>of());
+ }
+
+ /**
+ * Creates a policy from a flag map, which is handed unchanged to the superclass constructor.
+ *
+ * @param props flags for the policy, e.g. as produced by {@link Builder}
+ */
+ public AutoScalerPolicy(Map<String,?> props) {
+ super(props);
+ }
+
+ /** Initialises the policy's histories and executor via {@link #doInit()}. */
+ @Override
+ public void init() {
+ doInit();
+ }
+
+ /** Performs the same initialisation as {@link #init()}, via {@link #doInit()}. */
+ @Override
+ public void rebind() {
+ doInit();
+ }
+
+ protected void doInit() {
+ long maxReachedNotificationDelay = getMaxReachedNotificationDelay().toMilliseconds();
+ recentUnboundedResizes = new SizeHistory(maxReachedNotificationDelay);
+
+ long maxResizeStabilizationDelay = Math.max(getResizeUpStabilizationDelay().toMilliseconds(), getResizeDownStabilizationDelay().toMilliseconds());
+ recentDesiredResizes = new SizeHistory(maxResizeStabilizationDelay);
+
+ // TODO Should re-use the execution manager's thread pool, somehow
+ executor = Executors.newSingleThreadScheduledExecutor(newThreadFactory());
+ }
+
+ public void setMetricLowerBound(Number val) {
+ if (LOG.isInfoEnabled()) LOG.info("{} changing metricLowerBound from {} to {}", new Object[] {this, getMetricLowerBound(), val});
+ config().set(METRIC_LOWER_BOUND, checkNotNull(val));
+ }
+
+ public void setMetricUpperBound(Number val) {
+ if (LOG.isInfoEnabled()) LOG.info("{} changing metricUpperBound from {} to {}", new Object[] {this, getMetricUpperBound(), val});
+ config().set(METRIC_UPPER_BOUND, checkNotNull(val));
+ }
+
+ private <T> void setOrDefault(ConfigKey<T> key, T val) {
+ if (val==null) val = key.getDefaultValue();
+ config().set(key, val);
+ }
+ public int getResizeUpIterationIncrement() { return getConfig(RESIZE_UP_ITERATION_INCREMENT); }
+ public void setResizeUpIterationIncrement(Integer val) {
+ if (LOG.isInfoEnabled()) LOG.info("{} changing resizeUpIterationIncrement from {} to {}", new Object[] {this, getResizeUpIterationIncrement(), val});
+ setOrDefault(RESIZE_UP_ITERATION_INCREMENT, val);
+ }
+ public int getResizeDownIterationIncrement() { return getConfig(RESIZE_DOWN_ITERATION_INCREMENT); }
+ public void setResizeDownIterationIncrement(Integer val) {
+ if (LOG.isInfoEnabled()) LOG.info("{} changing resizeDownIterationIncrement from {} to {}", new Object[] {this, getResizeDownIterationIncrement(), val});
+ setOrDefault(RESIZE_DOWN_ITERATION_INCREMENT, val);
+ }
+ public int getResizeUpIterationMax() { return getConfig(RESIZE_UP_ITERATION_MAX); }
+ public void setResizeUpIterationMax(Integer val) {
+ if (LOG.isInfoEnabled()) LOG.info("{} changing resizeUpIterationMax from {} to {}", new Object[] {this, getResizeUpIterationMax(), val});
+ setOrDefault(RESIZE_UP_ITERATION_MAX, val);
+ }
+ public int getResizeDownIterationMax() { return getConfig(RESIZE_DOWN_ITERATION_MAX); }
+ public void setResizeDownIterationMax(Integer val) {
+ if (LOG.isInfoEnabled()) LOG.info("{} changing resizeDownIterationMax from {} to {}", new Object[] {this, getResizeDownIterationMax(), val});
+ setOrDefault(RESIZE_DOWN_ITERATION_MAX, val);
+ }
+
+ public void setMinPeriodBetweenExecs(Duration val) {
+ if (LOG.isInfoEnabled()) LOG.info("{} changing minPeriodBetweenExecs from {} to {}", new Object[] {this, getMinPeriodBetweenExecs(), val});
+ config().set(MIN_PERIOD_BETWEEN_EXECS, val);
+ }
+
+ public void setResizeUpStabilizationDelay(Duration val) {
+ if (LOG.isInfoEnabled()) LOG.info("{} changing resizeUpStabilizationDelay from {} to {}", new Object[] {this, getResizeUpStabilizationDelay(), val});
+ config().set(RESIZE_UP_STABILIZATION_DELAY, val);
+ }
+
+ public void setResizeDownStabilizationDelay(Duration val) {
+ if (LOG.isInfoEnabled()) LOG.info("{} changing resizeDownStabilizationDelay from {} to {}", new Object[] {this, getResizeDownStabilizationDelay(), val});
+ config().set(RESIZE_DOWN_STABILIZATION_DELAY, val);
+ }
+
+ public void setMinPoolSize(int val) {
+ if (LOG.isInfoEnabled()) LOG.info("{} changing minPoolSize from {} to {}", new Object[] {this, getMinPoolSize(), val});
+ config().set(MIN_POOL_SIZE, val);
+ }
+
+ public void setMaxPoolSize(int val) {
+ if (LOG.isInfoEnabled()) LOG.info("{} changing maxPoolSize from {} to {}", new Object[] {this, getMaxPoolSize(), val});
+ config().set(MAX_POOL_SIZE, val);
+ }
+
+ // Private accessors: each one simply reads the corresponding config key via getConfig.
+
+ // No default is declared for METRIC, so callers check the result for null (see setEntity).
+ private AttributeSensor<? extends Number> getMetric() {
+ return getConfig(METRIC);
+ }
+
+ // No default is declared; setEntity falls back to the attached entity when this is null.
+ private Entity getEntityWithMetric() {
+ return getConfig(ENTITY_WITH_METRIC);
+ }
+
+ // No default is declared for the metric bounds.
+ private Number getMetricLowerBound() {
+ return getConfig(METRIC_LOWER_BOUND);
+ }
+
+ private Number getMetricUpperBound() {
+ return getConfig(METRIC_UPPER_BOUND);
+ }
+
+ private Duration getMinPeriodBetweenExecs() {
+ return getConfig(MIN_PERIOD_BETWEEN_EXECS);
+ }
+
+ private Duration getResizeUpStabilizationDelay() {
+ return getConfig(RESIZE_UP_STABILIZATION_DELAY);
+ }
+
+ private Duration getResizeDownStabilizationDelay() {
+ return getConfig(RESIZE_DOWN_STABILIZATION_DELAY);
+ }
+
+ // The Integer config value is auto-unboxed to int on return.
+ private int getMinPoolSize() {
+ return getConfig(MIN_POOL_SIZE);
+ }
+
+ // The Integer config value is auto-unboxed to int on return.
+ private int getMaxPoolSize() {
+ return getConfig(MAX_POOL_SIZE);
+ }
+
+ private ResizeOperator getResizeOperator() {
+ return getConfig(RESIZE_OPERATOR);
+ }
+
+ private Function<Entity,Integer> getCurrentSizeOperator() {
+ return getConfig(CURRENT_SIZE_OPERATOR);
+ }
+
+ private BasicNotificationSensor<? extends Map> getPoolHotSensor() {
+ return getConfig(POOL_HOT_SENSOR);
+ }
+
+ private BasicNotificationSensor<? extends Map> getPoolColdSensor() {
+ return getConfig(POOL_COLD_SENSOR);
+ }
+
+ private BasicNotificationSensor<? extends Map> getPoolOkSensor() {
+ return getConfig(POOL_OK_SENSOR);
+ }
+
+ // No default is declared for MAX_SIZE_REACHED_SENSOR.
+ private BasicNotificationSensor<? super MaxPoolSizeReachedEvent> getMaxSizeReachedSensor() {
+ return getConfig(MAX_SIZE_REACHED_SENSOR);
+ }
+
+ private Duration getMaxReachedNotificationDelay() {
+ return getConfig(MAX_REACHED_NOTIFICATION_DELAY);
+ }
+
+    /**
+     * Reacts to a change of one of the reconfigurable config keys.
+     * <ul>
+     * <li>Stabilization delays: the desired-resize history window is set to the larger of the new
+     *     value and the other (unchanged) delay.
+     * <li>Metric bounds and per-iteration limits: nothing to do here.
+     * <li>Pool size limits: validated against the opposite limit, then
+     *     {@code onPoolSizeLimitsChanged} is invoked with the new pair.
+     * </ul>
+     *
+     * @param key the key being reconfigured
+     * @param val the new value for that key
+     * @throws IllegalArgumentException if the new min/max pool size would leave min greater than max
+     * @throws UnsupportedOperationException if the key is not one of those handled here
+     */
+    @Override
+    protected <T> void doReconfigureConfig(ConfigKey<T> key, T val) {
+        if (key.equals(RESIZE_UP_STABILIZATION_DELAY)) {
+            Duration maxResizeStabilizationDelay = Duration.max((Duration)val, getResizeDownStabilizationDelay());
+            recentDesiredResizes.setWindowSize(maxResizeStabilizationDelay);
+        } else if (key.equals(RESIZE_DOWN_STABILIZATION_DELAY)) {
+            Duration maxResizeStabilizationDelay = Duration.max((Duration)val, getResizeUpStabilizationDelay());
+            recentDesiredResizes.setWindowSize(maxResizeStabilizationDelay);
+        } else if (key.equals(METRIC_LOWER_BOUND)) {
+            // TODO If recorded what last metric value was then we could recalculate immediately
+            // Rely on next metric-change to trigger recalculation;
+            // and same for those below...
+        } else if (key.equals(METRIC_UPPER_BOUND)) {
+            // see above
+        } else if (key.equals(RESIZE_UP_ITERATION_INCREMENT) || key.equals(RESIZE_UP_ITERATION_MAX) || key.equals(RESIZE_DOWN_ITERATION_INCREMENT) || key.equals(RESIZE_DOWN_ITERATION_MAX)) {
+            // no special actions needed
+        } else if (key.equals(MIN_POOL_SIZE)) {
+            int newMin = (Integer) val;
+            int currentMax = getConfig(MAX_POOL_SIZE);
+            if (newMin > currentMax) {
+                throw new IllegalArgumentException("Min pool size "+newMin+" must not be greater than max pool size "+currentMax);
+            }
+            onPoolSizeLimitsChanged(newMin, currentMax);
+        } else if (key.equals(MAX_POOL_SIZE)) {
+            int newMax = (Integer) val;
+            int currentMin = getConfig(MIN_POOL_SIZE);
+            if (newMax < currentMin) {
+                // report the values that actually conflict: the existing min and the proposed max
+                throw new IllegalArgumentException("Min pool size "+currentMin+" must not be greater than max pool size "+newMax);
+            }
+            onPoolSizeLimitsChanged(currentMin, newMax);
+        } else {
+            throw new UnsupportedOperationException("reconfiguring "+key+" unsupported for "+this);
+        }
+    }
+
+    /** Suspends the policy and, if an executor exists, shuts it down immediately. */
+    @Override
+    public void suspend() {
+        super.suspend();
+        // TODO unsubscribe from everything? And resubscribe on resume?
+        if (executor == null) {
+            return;
+        }
+        executor.shutdownNow();
+    }
+
+ @Override
+ public void resume() {
+ super.resume();
+ executor = Executors.newSingleThreadScheduledExecutor(newThreadFactory());
+ }
+
+    /**
+     * Attaches the policy to the given entity and sets up its subscriptions: to the configured
+     * metric (if any), on the configured metric entity or else on the given entity, and to the
+     * pool cold / hot / ok sensors on the given entity.
+     *
+     * @throws IllegalArgumentException if no custom resize operator is configured and the entity
+     *         is not {@link Resizable}
+     */
+    @Override
+    public void setEntity(EntityLocal entity) {
+        final boolean customResizerSupplied = config().getRaw(RESIZE_OPERATOR).isPresentAndNonNull();
+        if (!customResizerSupplied) {
+            Preconditions.checkArgument(entity instanceof Resizable, "Provided entity "+entity+" must be an instance of Resizable, because no custom-resizer operator supplied");
+        }
+        super.setEntity(entity);
+        this.poolEntity = entity;
+
+        if (getMetric() != null) {
+            Entity metricSource = getEntityWithMetric();
+            if (metricSource == null) {
+                metricSource = entity;
+            }
+            subscribe(metricSource, getMetric(), metricEventHandler);
+        }
+        subscribe(poolEntity, getPoolColdSensor(), utilizationEventHandler);
+        subscribe(poolEntity, getPoolHotSensor(), utilizationEventHandler);
+        subscribe(poolEntity, getPoolOkSensor(), utilizationEventHandler);
+    }
+
+ private ThreadFactory newThreadFactory() {
+ return new ThreadFactoryBuilder()
+ .setNameFormat("brooklyn-autoscalerpolicy-%d")
+ .build();
+ }
+
+    /**
+     * Forces an immediate resize (without waiting for stabilization etc) if the current size is
+     * not within the min and max limits. We schedule this so that all resize operations are done
+     * by the same thread.
+     * <p>
+     * Nothing is scheduled unless the policy is running and the entity is up.
+     *
+     * @param min the new lower limit for the pool size
+     * @param max the new upper limit for the pool size
+     */
+    private void onPoolSizeLimitsChanged(final int min, final int max) {
+        if (LOG.isTraceEnabled()) LOG.trace("{} checking pool size on limits changed for {} (between {} and {})", new Object[] {this, poolEntity, min, max});
+
+        if (isRunning() && isEntityUp()) {
+            executor.submit(new Runnable() {
+                @Override public void run() {
+                    try {
+                        int currentSize = getCurrentSizeOperator().apply(entity);
+                        // clamp the current size into [min, max]
+                        int desiredSize = Math.min(max, Math.max(min, currentSize));
+
+                        if (currentSize != desiredSize) {
+                            // a bare "this" here would be the anonymous Runnable, so name the policy explicitly
+                            if (LOG.isInfoEnabled()) LOG.info("{} resizing pool {} immediateley from {} to {} (due to new pool size limits)", new Object[] {AutoScalerPolicy.this, poolEntity, currentSize, desiredSize});
+                            getResizeOperator().resize(poolEntity, desiredSize);
+                        }
+
+                    } catch (Exception e) {
+                        if (isRunning()) {
+                            LOG.error("Error resizing: "+e, e);
+                        } else {
+                            // failures after the policy has stopped are expected noise; log quietly
+                            if (LOG.isDebugEnabled()) LOG.debug("Error resizing, but no longer running: "+e, e);
+                        }
+                    } catch (Throwable t) {
+                        LOG.error("Error resizing: "+t, t);
+                        throw Throwables.propagate(t);
+                    }
+                }});
+        }
+    }
+
+    /** Direction in which a scaling decision is being considered. */
+    private enum ScalingType { HOT, COLD }
+
+    /**
+     * Snapshot of the values a scaling decision is based on: the pool size, the per-member metric
+     * value and the (optional) bounds for that metric. A null {@code scalingMode} allows either
+     * direction; a bound that is null or not greater than zero is ignored.
+     */
+    private static class ScalingData {
+        ScalingType scalingMode;
+        int currentSize;
+        double currentMetricValue;
+        Double metricUpperBound;
+        Double metricLowerBound;
+
+        /** Metric value multiplied by the pool size. */
+        public double getCurrentTotalActivity() {
+            return currentMetricValue * currentSize;
+        }
+
+        /** True if scaling out is allowed by the mode and the metric exceeds a usable upper bound. */
+        public boolean isHot() {
+            if (scalingMode != null && scalingMode != ScalingType.HOT) {
+                return false;
+            }
+            if (!isValid(metricUpperBound)) {
+                return false;
+            }
+            return currentMetricValue > metricUpperBound;
+        }
+
+        /** True if scaling back is allowed by the mode and the metric is below a usable lower bound. */
+        public boolean isCold() {
+            if (scalingMode != null && scalingMode != ScalingType.COLD) {
+                return false;
+            }
+            if (!isValid(metricLowerBound)) {
+                return false;
+            }
+            return currentMetricValue < metricLowerBound;
+        }
+
+        private boolean isValid(Double bound) {
+            if (bound == null) {
+                return false;
+            }
+            return bound > 0;
+        }
+    }
+
+    /**
+     * Handles a new value of the configured metric: captures the current pool size and metric
+     * bounds and passes them on for analysis. A null value is ignored.
+     * <p>
+     * Either bound may be unconfigured (the config keys declare no default); it is then passed on
+     * as null, which {@link ScalingData} treats as "no bound in that direction".
+     *
+     * @param val the new metric value, possibly null
+     */
+    private void onMetricChanged(Number val) {
+        if (LOG.isTraceEnabled()) LOG.trace("{} recording pool-metric for {}: {}", new Object[] {this, poolEntity, val});
+
+        if (val==null) {
+            // occurs e.g. if using an aggregating enricher who returns null when empty, the sensor has gone away
+            if (LOG.isTraceEnabled()) LOG.trace("{} not resizing pool {}, inbound metric is null", new Object[] {this, poolEntity});
+            return;
+        }
+
+        Number upperBound = getMetricUpperBound();
+        Number lowerBound = getMetricLowerBound();
+
+        ScalingData data = new ScalingData();
+        data.currentMetricValue = val.doubleValue();
+        data.currentSize = getCurrentSizeOperator().apply(entity);
+        // guard against unset bounds instead of failing with a NullPointerException
+        data.metricUpperBound = (upperBound == null) ? null : Double.valueOf(upperBound.doubleValue());
+        data.metricLowerBound = (lowerBound == null) ? null : Double.valueOf(lowerBound.doubleValue());
+
+        analyze(data, "pool");
+    }
+
+    /** Handles a "pool cold" event: traces it, then analyses its properties in COLD mode. */
+    private void onPoolCold(Map<String, ?> properties) {
+        if (LOG.isTraceEnabled()) {
+            Object[] logArgs = new Object[] {this, poolEntity, properties};
+            LOG.trace("{} recording pool-cold for {}: {}", logArgs);
+        }
+        analyzeOnHotOrColdSensor(ScalingType.COLD, "cold pool", properties);
+    }
+
+    /** Handles a "pool hot" event: traces it, then analyses its properties in HOT mode. */
+    private void onPoolHot(Map<String, ?> properties) {
+        if (LOG.isTraceEnabled()) {
+            Object[] logArgs = new Object[] {this, poolEntity, properties};
+            LOG.trace("{} recording pool-hot for {}: {}", logArgs);
+        }
+        analyzeOnHotOrColdSensor(ScalingType.HOT, "hot pool", properties);
+    }
+
+    /**
+     * Builds the scaling data from the property map of a pool hot/cold event and analyses it.
+     * <p>
+     * The workrate and current size are required; the two thresholds are optional and passed on as
+     * null when absent. Values are accepted as any {@link Number} (not only {@code Double} /
+     * {@code Integer}), so an emitter publishing e.g. an {@code Integer} workrate or a {@code Long}
+     * size no longer triggers a {@link ClassCastException}.
+     *
+     * @param scalingMode the direction the event asks about
+     * @param description short text describing the trigger, passed on to the analysis
+     * @param properties the event's property map, keyed by the {@code POOL_*_KEY} constants
+     * @throws NullPointerException if the workrate or current size is missing
+     */
+    private void analyzeOnHotOrColdSensor(ScalingType scalingMode, String description, Map<String, ?> properties) {
+        Number workrate = (Number) checkNotNull(properties.get(POOL_CURRENT_WORKRATE_KEY), POOL_CURRENT_WORKRATE_KEY);
+        Number size = (Number) checkNotNull(properties.get(POOL_CURRENT_SIZE_KEY), POOL_CURRENT_SIZE_KEY);
+        Number highThreshold = (Number) properties.get(POOL_HIGH_THRESHOLD_KEY);
+        Number lowThreshold = (Number) properties.get(POOL_LOW_THRESHOLD_KEY);
+
+        ScalingData data = new ScalingData();
+        data.scalingMode = scalingMode;
+        data.currentMetricValue = workrate.doubleValue();
+        data.currentSize = size.intValue();
+        data.metricUpperBound = (highThreshold == null) ? null : Double.valueOf(highThreshold.doubleValue());
+        data.metricLowerBound = (lowThreshold == null) ? null : Double.valueOf(lowThreshold.doubleValue());
+
+        analyze(data, description);
+    }
+
+ /**
+  * Works out the pool size implied by the given metric snapshot and either schedules a resize
+  * towards it or records that no resize is wanted.
+  * <p>
+  * The ideal ("unconstrained") size is first clamped by {@link #applyMinMaxConstraints(int)}; the
+  * step from the current size is then limited by the per-iteration increment/max settings. Whenever
+  * the metric is outside the healthy range, the unconstrained size is also passed to
+  * {@link #onNewUnboundedPoolSize(int)}.
+  *
+  * @param data        metric value, pool size and bounds to analyze; its {@code scalingMode} is overwritten here
+  * @param description label for the kind of event being analyzed; used only in log messages
+  */
+ private void analyze(ScalingData data, String description) {
+     int desiredSizeUnconstrained;
+
+     /* We always scale out (modulo stabilization delay) if:
+      *      currentTotalActivity > currentSize*metricUpperBound
+      * With newDesiredSize the smallest n such that   n*metricUpperBound >= currentTotalActivity
+      * ie  n >= currentTotalActivity/metricUpperBound, thus n := Math.ceil(currentTotalActivity/metricUpperBound)
+      *
+      * Else consider scale back if:
+      *      currentTotalActivity < currentSize*metricLowerBound
+      * With newDesiredSize normally the largest n such that:
+      *      n*metricLowerBound <= currentTotalActivity
+      * BUT with an absolute requirement which trumps the above computation
+      * that the newDesiredSize doesn't cause immediate scale out:
+      *      n*metricUpperBound >= currentTotalActivity
+      * thus n := Math.max ( floor(currentTotalActivity/metricLowerBound), ceil(currentTotalActivity/metricUpperBound) )
+      */
+     if (data.isHot()) {
+         // scale out
+         desiredSizeUnconstrained = (int)Math.ceil(data.getCurrentTotalActivity() / data.metricUpperBound);
+         data.scalingMode = ScalingType.HOT;
+
+     } else if (data.isCold()) {
+         // scale back; only the floor(...) term is computed here, the "must not cause immediate
+         // scale out" requirement is enforced by the anti-thrashing loop further down
+         desiredSizeUnconstrained = (int)Math.floor(data.getCurrentTotalActivity() / data.metricLowerBound);
+         data.scalingMode = ScalingType.COLD;
+
+     } else {
+         if (LOG.isTraceEnabled()) LOG.trace("{} not resizing pool {} from {} ({} within range {}..{})", new Object[] {this, poolEntity, data.currentSize, data.currentMetricValue, data.metricLowerBound, data.metricUpperBound});
+         abortResize(data.currentSize);
+         return; // within the healthy range; no-op
+     }
+
+     // logged at trace so that the level matches the guard (previously a debug call behind a trace guard)
+     if (LOG.isTraceEnabled()) LOG.trace("{} detected unconstrained desired size {}", new Object[] {this, desiredSizeUnconstrained});
+     int desiredSize = applyMinMaxConstraints(desiredSizeUnconstrained);
+
+     if ((data.scalingMode==ScalingType.COLD) && (desiredSize < data.currentSize)) {
+
+         int delta = data.currentSize - desiredSize;
+         int scaleIncrement = getResizeDownIterationIncrement();
+         int scaleMax = getResizeDownIterationMax();
+         if (delta>scaleMax) {
+             delta=scaleMax;
+         } else if (delta % scaleIncrement != 0) {
+             // keep scaling to the increment
+             delta += scaleIncrement - (delta % scaleIncrement);
+         }
+         desiredSize = data.currentSize - delta;
+
+         if (data.metricUpperBound!=null) {
+             // if upper bound supplied, check that this desired scale-back size
+             // is not going to cause scale-out on next run; i.e. anti-thrashing
+             // NOTE(review): this loop only terminates if scaleIncrement is positive - confirm the config enforces that
+             while (desiredSize < data.currentSize && data.getCurrentTotalActivity() > data.metricUpperBound * desiredSize) {
+                 if (LOG.isTraceEnabled()) LOG.trace("{} when resizing back pool {} from {}, tweaking from {} to prevent thrashing", new Object[] {this, poolEntity, data.currentSize, desiredSize });
+                 desiredSize += scaleIncrement;
+             }
+         }
+         desiredSize = applyMinMaxConstraints(desiredSize);
+         // rounding / anti-thrashing / bounds may have cancelled the shrink entirely
+         if (desiredSize >= data.currentSize) data.scalingMode = null;
+
+     } else if ((data.scalingMode==ScalingType.HOT) && (desiredSize > data.currentSize)) {
+
+         int delta = desiredSize - data.currentSize;
+         int scaleIncrement = getResizeUpIterationIncrement();
+         int scaleMax = getResizeUpIterationMax();
+         if (delta>scaleMax) {
+             delta=scaleMax;
+         } else if (delta % scaleIncrement != 0) {
+             // keep scaling to the increment
+             delta += scaleIncrement - (delta % scaleIncrement);
+         }
+         desiredSize = data.currentSize + delta;
+         desiredSize = applyMinMaxConstraints(desiredSize);
+         // the bounds may have cancelled the growth entirely
+         if (desiredSize <= data.currentSize) data.scalingMode = null;
+
+     } else {
+         // the min/max bounds already rule out moving in the wanted direction
+         data.scalingMode = null;
+     }
+
+     if (data.scalingMode!=null) {
+         if (LOG.isDebugEnabled()) {
+             // report the bound that was actually crossed: the upper bound when hot, the lower bound when cold
+             boolean hot = (data.scalingMode==ScalingType.HOT);
+             LOG.debug("{} provisionally resizing {} {} from {} to {} ({} "+(hot ? ">" : "<")+" {}; ideal size {})", new Object[] {this, description, poolEntity, data.currentSize, desiredSize, data.currentMetricValue, (hot ? data.metricUpperBound : data.metricLowerBound), desiredSizeUnconstrained});
+         }
+         scheduleResize(desiredSize);
+     } else {
+         if (LOG.isTraceEnabled()) LOG.trace("{} not resizing {} {} from {} to {}, {} out of healthy range {}..{} but unconstrained size {} blocked by bounds/check", new Object[] {this, description, poolEntity, data.currentSize, desiredSize, data.currentMetricValue, data.metricLowerBound, data.metricUpperBound, desiredSizeUnconstrained});
+         abortResize(data.currentSize);
+     }
+
+     // in both cases, add to the unbounded record for future consideration
+     onNewUnboundedPoolSize(desiredSizeUnconstrained);
+ }
+
+ private int applyMinMaxConstraints(int desiredSize) {
+ desiredSize = Math.max(getMinPoolSize(), desiredSize);
+ desiredSize = Math.min(getMaxPoolSize(), desiredSize);
+ return desiredSize;
+ }
+
+ private void onPoolOk(Map<String, ?> properties) {
+ if (LOG.isTraceEnabled()) LOG.trace("{} recording pool-ok for {}: {}", new Object[] {this, poolEntity, properties});
+
+ int poolCurrentSize = (Integer) properties.get(POOL_CURRENT_SIZE_KEY);
+
+ if (LOG.isTraceEnabled()) LOG.trace("{} not resizing ok pool {} from {}", new Object[] {this, poolEntity, poolCurrentSize});
+ abortResize(poolCurrentSize);
+ }
+
+ /**
+  * Records {@code newSize} in the history of desired sizes and then asks for a resize job to be
+  * queued (a no-op if one is already queued). When the job executes it resizes to whatever the
+  * latest value is at that time, rather than to the value given when the job was queued.
+  */
+ private void scheduleResize(int newSize) {
+     recentDesiredResizes.add(newSize);
+     scheduleResize();
+ }
+
+ /**
+  * Records what the pool size would be if unbounded, and schedules a check of whether that size
+  * is sustained; does nothing when no max-size-reached sensor is configured.
+  *
+  * Piggy-backs off the existing scheduleResize execution, which also checks whether the
+  * max-size-reached notification needs to be emitted.
+  */
+ private void onNewUnboundedPoolSize(int val) {
+     if (getMaxSizeReachedSensor() == null) {
+         return;
+     }
+     recentUnboundedResizes.add(val);
+     scheduleResize();
+ }
+
+ /**
+  * Records the current size as the latest entry in both the desired-size and unbounded-size
+  * histories. No resize job is scheduled here.
+  */
+ private void abortResize(int currentSize) {
+     recentDesiredResizes.add(currentSize);
+     recentUnboundedResizes.add(currentSize);
+ }
+
+ /**
+  * @return false if there is no entity; otherwise, if the entity type declares
+  *         {@code Startable.SERVICE_UP}, whether that attribute is currently {@code TRUE};
+  *         and true for entities that do not declare the sensor at all
+  */
+ private boolean isEntityUp() {
+     if (entity == null) {
+         return false;
+     }
+     boolean declaresServiceUp = entity.getEntityType().getSensors().contains(Startable.SERVICE_UP);
+     if (!declaresServiceUp) {
+         return true;
+     }
+     return Boolean.TRUE.equals(entity.getAttribute(Startable.SERVICE_UP));
+ }
+
+ /**
+  * Queues a single resize job on the executor, unless the policy is not running, the entity is
+  * not up, or a job is already queued. The job is delayed so that consecutive executions are at
+  * least {@code getMinPeriodBetweenExecs()} apart.
+  */
+ private void scheduleResize() {
+     // TODO Make scale-out calls concurrent, rather than waiting for first resize to entirely
+     // finish. On ec2 for example, this can cause us to grow very slowly if first request is for
+     // just one new VM to be provisioned.
+
+     // the compare-and-set is evaluated last, so the "queued" flag is only claimed when we will really schedule
+     if (!isRunning() || !isEntityUp() || !executorQueued.compareAndSet(false, true)) {
+         return;
+     }
+
+     long now = System.currentTimeMillis();
+     long earliestNextExec = executorTime + getMinPeriodBetweenExecs().toMilliseconds();
+     long delay = Math.max(0, earliestNextExec - now);
+     if (LOG.isTraceEnabled()) LOG.trace("{} scheduling resize in {}ms", this, delay);
+
+     Runnable resizeJob = new Runnable() {
+         @Override public void run() {
+             try {
+                 // release the "queued" flag before doing the work, so that new requests can queue a follow-up job
+                 executorTime = System.currentTimeMillis();
+                 executorQueued.set(false);
+
+                 resizeNow();
+                 notifyMaxReachedIfRequiredNow();
+
+             } catch (Exception e) {
+                 if (isRunning()) {
+                     LOG.error("Error resizing: "+e, e);
+                 } else {
+                     if (LOG.isDebugEnabled()) LOG.debug("Error resizing, but no longer running: "+e, e);
+                 }
+             } catch (Throwable t) {
+                 LOG.error("Error resizing: "+t, t);
+                 throw Throwables.propagate(t);
+             }
+         }
+     };
+     executor.schedule(resizeJob, delay, TimeUnit.MILLISECONDS);
+ }
+
+ /**
+ * Looks at the values for "unbounded pool size" (i.e. if we ignore caps of minSize and maxSize) to report what
+ * those values have been within a time window. The time window used is the "maxReachedNotificationDelay",
+ * which determines how many milliseconds after being consistently above the max-size will it take before
+ * we emit the sensor event (if any).
+ * <p>
+ * Does nothing if no max-size-reached sensor is configured. Within this method the event is emitted at most
+ * once: after emitting, {@code maxReachedLastNotifiedTime} is set and this method never resets it.
+ */
+ private void notifyMaxReachedIfRequiredNow() {
+ BasicNotificationSensor<? super MaxPoolSizeReachedEvent> maxSizeReachedSensor = getMaxSizeReachedSensor();
+ if (maxSizeReachedSensor == null) {
+ return;
+ }
+
+ // summarize the unbounded sizes recorded over the notification-delay window
+ WindowSummary valsSummary = recentUnboundedResizes.summarizeWindow(getMaxReachedNotificationDelay());
+ long timeWindowSize = getMaxReachedNotificationDelay().toMilliseconds();
+ long currentPoolSize = getCurrentSizeOperator().apply(poolEntity);
+ int maxAllowedPoolSize = getMaxPoolSize();
+ long unboundedSustainedMaxPoolSize = valsSummary.min; // The sustained maximum (i.e. the smallest it's dropped down to)
+ long unboundedCurrentPoolSize = valsSummary.latest;
+
+ if (maxReachedLastNotifiedTime > 0) {
+ // already notified the listener; don't do it again
+ // TODO Could have max period for notifications, or a step increment to warn when exceeded by ever bigger amounts
+
+ } else if (unboundedSustainedMaxPoolSize > maxAllowedPoolSize) {
+ // We have consistently wanted to be bigger than the max allowed; tell the listener
+ if (LOG.isDebugEnabled()) LOG.debug("{} notifying listener of max pool size reached; current {}, max {}, unbounded current {}, unbounded max {}",
+ new Object[] {this, currentPoolSize, maxAllowedPoolSize, unboundedCurrentPoolSize, unboundedSustainedMaxPoolSize});
+
+ // record the notification time first; a non-zero value suppresses any further notification (see first branch)
+ maxReachedLastNotifiedTime = System.currentTimeMillis();
+ MaxPoolSizeReachedEvent event = MaxPoolSizeReachedEvent.builder()
+ .currentPoolSize(currentPoolSize)
+ .maxAllowed(maxAllowedPoolSize)
+ .currentUnbounded(unboundedCurrentPoolSize)
+ .maxUnbounded(unboundedSustainedMaxPoolSize)
+ .timeWindow(timeWindowSize)
+ .build();
+ entity.emit(maxSizeReachedSensor, event);
+
+ } else if (valsSummary.max > maxAllowedPoolSize) {
+ // We temporarily wanted to be bigger than the max allowed; check back later to see if consistent
+ // TODO Could check if there has been anything bigger than "min" since min happened (would be more efficient)
+ if (LOG.isTraceEnabled()) LOG.trace("{} re-scheduling max-reached check for {}, as unbounded size not stable (min {}, max {}, latest {})",
+ new Object[] {this, poolEntity, valsSummary.min, valsSummary.max, valsSummary.latest});
+ // this re-runs the whole resize job, which calls this method again at its end
+ scheduleResize();
+
+ } else {
+ // nothing to write home about; continually below maxAllowed
+ }
+ }
+
+ /**
+  * Calculates the desired pool size from the recent history and, if it differs from the current
+  * size, submits an "Auto-scaler" task that calls the resize operator, blocking until that task
+  * has ended. If the desired size is not yet stable, another check is scheduled first.
+  */
+ private void resizeNow() {
+     final long currentPoolSize = getCurrentSizeOperator().apply(poolEntity);
+     final CalculatedDesiredPoolSize calculated = calculateDesiredPoolSize(currentPoolSize);
+     final long desiredPoolSize = calculated.size;
+
+     if (!calculated.stable) {
+         // the desired size fluctuations are not stable; ensure we check again later (due to time-window)
+         // even if no additional events have been received
+         // (note we continue now with as "good" a resize as we can given the instability)
+         if (LOG.isTraceEnabled()) {
+             LOG.trace("{} re-scheduling resize check for {}, as desired size not stable (current {}, desired {}); continuing with resize...",
+                     new Object[] {this, poolEntity, currentPoolSize, desiredPoolSize});
+         }
+         scheduleResize();
+     }
+
+     if (currentPoolSize == desiredPoolSize) {
+         if (LOG.isTraceEnabled()) {
+             LOG.trace("{} not resizing pool {} from {} to {}",
+                     new Object[] {this, poolEntity, currentPoolSize, desiredPoolSize});
+         }
+         return;
+     }
+
+     if (LOG.isDebugEnabled()) {
+         LOG.debug("{} requesting resize to {}; current {}, min {}, max {}",
+                 new Object[] {this, desiredPoolSize, currentPoolSize, getMinPoolSize(), getMaxPoolSize()});
+     }
+
+     Callable<Void> resizeCall = new Callable<Void>() {
+         @Override
+         public Void call() throws Exception {
+             // TODO Should we use int throughout, rather than casting here?
+             getResizeOperator().resize(poolEntity, (int) desiredPoolSize);
+             return null;
+         }
+     };
+
+     Entities.submit(entity, Tasks.<Void>builder().name("Auto-scaler")
+             .description("Auto-scaler recommending resize from "+currentPoolSize+" to "+desiredPoolSize)
+             .tag(BrooklynTaskTags.NON_TRANSIENT_TASK_TAG)
+             .body(resizeCall).build())
+         .blockUntilEnded();
+ }
+
+ /**
+ * Complicated logic for stabilization-delay...
+ * Only grow if we have consistently been asked to grow for the resizeUpStabilizationDelay period;
+ * Only shrink if we have consistently been asked to shrink for the resizeDownStabilizationDelay period.
+ * <p>
+ * Both summaries are taken from the same history of desired sizes, but over the two different
+ * stabilization windows.
+ *
+ * @param currentPoolSize the size the pool has right now; returned unchanged when neither a grow nor a shrink is valid
+ * @return tuple of desired pool size, and whether this is "stable" (i.e. if we receive no more events
+ * will this continue to be the desired pool size)
+ */
+ private CalculatedDesiredPoolSize calculateDesiredPoolSize(long currentPoolSize) {
+ // "now" is only used in the trace message below
+ long now = System.currentTimeMillis();
+ WindowSummary downsizeSummary = recentDesiredResizes.summarizeWindow(getResizeDownStabilizationDelay());
+ WindowSummary upsizeSummary = recentDesiredResizes.summarizeWindow(getResizeUpStabilizationDelay());
+
+ // this is the _sustained_ growth value; the smallest size that has been requested in the "stable-for-growing" period
+ long maxDesiredPoolSize = upsizeSummary.min;
+ boolean stableForGrowing = upsizeSummary.stableForGrowth;
+
+ // this is the _sustained_ shrink value; largest size that has been requested in the "stable-for-shrinking" period:
+ long minDesiredPoolSize = downsizeSummary.max;
+ boolean stableForShrinking = downsizeSummary.stableForShrinking;
+
+ // (it is a logical consequence of the above that minDesired >= maxDesired -- this is correct, if confusing:
+ // think of minDesired as the minimum size we are allowed to resize to, and similarly for maxDesired;
+ // if min > max we can scale to max if current < max, or scale to min if current > min)
+
+ long desiredPoolSize;
+
+ boolean stable;
+
+ if (currentPoolSize < maxDesiredPoolSize) {
+ // we have valid request to grow
+ // (we'll never have a valid request to grow and a valid to shrink simultaneously, btw)
+ desiredPoolSize = maxDesiredPoolSize;
+ stable = stableForGrowing;
+ } else if (currentPoolSize > minDesiredPoolSize) {
+ // we have valid request to shrink
+ desiredPoolSize = minDesiredPoolSize;
+ stable = stableForShrinking;
+ } else {
+ // neither growing nor shrinking is valid; stay put, and only call it stable if both windows agree
+ desiredPoolSize = currentPoolSize;
+ stable = stableForGrowing && stableForShrinking;
+ }
+
+ if (LOG.isTraceEnabled()) LOG.trace("{} calculated desired pool size: from {} to {}; minDesired {}, maxDesired {}; " +
+ "stable {}; now {}; downsizeHistory {}; upsizeHistory {}",
+ new Object[] {this, currentPoolSize, desiredPoolSize, minDesiredPoolSize, maxDesiredPoolSize, stable, now, downsizeSummary, upsizeSummary});
+
+ return new CalculatedDesiredPoolSize(desiredPoolSize, stable);
+ }
+
+ /** Immutable pair of a desired pool size and whether that size is considered stable. */
+ private static class CalculatedDesiredPoolSize {
+     /** The pool size to resize to. */
+     final long size;
+     /** Whether the size will remain the desired one if no further events arrive. */
+     final boolean stable;
+
+     CalculatedDesiredPoolSize(long desiredSize, boolean isStable) {
+         size = desiredSize;
+         stable = isStable;
+     }
+ }
+
+ @Override
+ public String toString() {
+ return getClass().getSimpleName() + (groovyTruth(name) ? "("+name+")" : "");
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/main/java/org/apache/brooklyn/policy/autoscaling/MaxPoolSizeReachedEvent.java
----------------------------------------------------------------------
diff --git a/policy/src/main/java/org/apache/brooklyn/policy/autoscaling/MaxPoolSizeReachedEvent.java b/policy/src/main/java/org/apache/brooklyn/policy/autoscaling/MaxPoolSizeReachedEvent.java
new file mode 100644
index 0000000..6e97771
--- /dev/null
+++ b/policy/src/main/java/org/apache/brooklyn/policy/autoscaling/MaxPoolSizeReachedEvent.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.policy.autoscaling;
+
+import java.io.Serializable;
+
+import com.google.common.base.Objects;
+
+public class MaxPoolSizeReachedEvent implements Serializable {
+ private static final long serialVersionUID = 1602627701360505190L;
+
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ public static class Builder {
+ protected long maxAllowed;
+ protected long currentPoolSize;
+ protected long currentUnbounded;
+ protected long maxUnbounded;
+ protected long timeWindow;
+
+ public Builder maxAllowed(long val) {
+ this.maxAllowed = val; return this;
+ }
+
+ public Builder currentPoolSize(long val) {
+ this.currentPoolSize = val; return this;
+ }
+
+ public Builder currentUnbounded(long val) {
+ this.currentUnbounded = val; return this;
+ }
+
+ public Builder maxUnbounded(long val) {
+ this.maxUnbounded = val; return this;
+ }
+
+ public Builder timeWindow(long val) {
+ this.timeWindow = val; return this;
+ }
+ public MaxPoolSizeReachedEvent build() {
+ return new MaxPoolSizeReachedEvent(this);
+ }
+ }
+
+ private final long maxAllowed;
+ private final long currentPoolSize;
+ private final long currentUnbounded;
+ private final long maxUnbounded;
+ private final long timeWindow;
+
+ protected MaxPoolSizeReachedEvent(Builder builder) {
+ maxAllowed = builder.maxAllowed;
+ currentPoolSize = builder.currentPoolSize;
+ currentUnbounded = builder.currentUnbounded;
+ maxUnbounded = builder.maxUnbounded;
+ timeWindow = builder.timeWindow;
+ }
+
+ public long getMaxAllowed() {
+ return maxAllowed;
+ }
+
+ public long getCurrentPoolSize() {
+ return currentPoolSize;
+ }
+
+ public long getCurrentUnbounded() {
+ return currentUnbounded;
+ }
+
+ public long getMaxUnbounded() {
+ return maxUnbounded;
+ }
+
+ public long getTimeWindow() {
+ return timeWindow;
+ }
+
+ @Override
+ public String toString() {
+ return Objects.toStringHelper(this).add("maxAllowed", maxAllowed).add("currentPoolSize", currentPoolSize)
+ .add("currentUnbounded", currentUnbounded).add("maxUnbounded", maxUnbounded)
+ .add("timeWindow", timeWindow).toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/main/java/org/apache/brooklyn/policy/autoscaling/ResizeOperator.java
----------------------------------------------------------------------
diff --git a/policy/src/main/java/org/apache/brooklyn/policy/autoscaling/ResizeOperator.java b/policy/src/main/java/org/apache/brooklyn/policy/autoscaling/ResizeOperator.java
new file mode 100644
index 0000000..4f4fbb0
--- /dev/null
+++ b/policy/src/main/java/org/apache/brooklyn/policy/autoscaling/ResizeOperator.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.policy.autoscaling;
+
+import org.apache.brooklyn.api.entity.Entity;
+
+/** Strategy for changing the size of an entity. */
+public interface ResizeOperator {
+
+    /**
+     * Resizes the given entity to the desired size, if possible.
+     *
+     * @param entity      the entity to resize
+     * @param desiredSize the size the entity should be resized to
+     * @return the new size of the entity
+     */
+    Integer resize(Entity entity, Integer desiredSize);
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/main/java/org/apache/brooklyn/policy/autoscaling/SizeHistory.java
----------------------------------------------------------------------
diff --git a/policy/src/main/java/org/apache/brooklyn/policy/autoscaling/SizeHistory.java b/policy/src/main/java/org/apache/brooklyn/policy/autoscaling/SizeHistory.java
new file mode 100644
index 0000000..0aa8801
--- /dev/null
+++ b/policy/src/main/java/org/apache/brooklyn/policy/autoscaling/SizeHistory.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.policy.autoscaling;
+
+import java.util.List;
+
+import brooklyn.util.collections.MutableMap;
+import brooklyn.util.collections.TimeWindowedList;
+import brooklyn.util.collections.TimestampedValue;
+import brooklyn.util.time.Duration;
+
+import com.google.common.base.Objects;
+
+/**
+ * Using a {@link TimeWindowedList}, tracks the recent history of values to allow a summary of
+ * those values to be obtained.
+ *
+ * @author aled
+ */
+public class SizeHistory {
+
+    /** Immutable summary of the values in a time window, as produced by {@link SizeHistory#summarizeWindow(Duration)}. */
+    public static class WindowSummary {
+        /** The most recent value (or -1 if there has been no value) */
+        public final long latest;
+
+        /** The minimum value within the given time period */
+        public final long min;
+
+        /** The maximum value within the given time period */
+        public final long max;
+
+        /** true if, since that max value, there have not been any higher values */
+        public final boolean stableForGrowth;
+
+        /** true if, since that low value, there have not been any lower values */
+        public final boolean stableForShrinking;
+
+        public WindowSummary(long latest, long min, long max, boolean stableForGrowth, boolean stableForShrinking) {
+            this.latest = latest;
+            this.min = min;
+            this.max = max;
+            this.stableForGrowth = stableForGrowth;
+            this.stableForShrinking = stableForShrinking;
+        }
+
+        @Override
+        public String toString() {
+            return Objects.toStringHelper(this).add("latest", latest).add("min", min).add("max", max)
+                    .add("stableForGrowth", stableForGrowth).add("stableForShrinking", stableForShrinking).toString();
+        }
+    }
+
+    private final TimeWindowedList<Number> recentDesiredResizes;
+
+    /**
+     * @param windowSize the time period passed to the underlying {@link TimeWindowedList}
+     *                   (presumably milliseconds - TODO confirm against TimeWindowedList)
+     */
+    public SizeHistory(long windowSize) {
+        recentDesiredResizes = new TimeWindowedList<Number>(MutableMap.of("timePeriod", windowSize, "minExpiredVals", 1));
+    }
+
+    /** Appends a value to the history. */
+    public void add(final int val) {
+        recentDesiredResizes.add(val);
+    }
+
+    /** Changes the time period of the underlying list. */
+    public void setWindowSize(Duration newWindowSize) {
+        recentDesiredResizes.setTimePeriod(newWindowSize);
+    }
+
+    /**
+     * Summarises the history of values in this time window, with a few special things:
+     * <ul>
+     * <li>If entire time-window is not covered by the given values, then min is Integer.MIN_VALUE and max is Integer.MAX_VALUE
+     * <li>If no values, then latest is -1
+     * <li>If no recent values, then keeps last-seen value (no matter how old), to use that
+     * <li>"stable for growth" means that since that max value, there have not been any higher values
+     * <li>"stable for shrinking" means that since that low value, there have not been any lower values
+     * </ul>
+     * Both stability flags are currently computed as {@code min == max}.
+     */
+    public WindowSummary summarizeWindow(Duration windowSize) {
+        // read the clock once, so that fetching the values and deciding whether they cover the
+        // window both use exactly the same window start
+        long now = System.currentTimeMillis();
+        long epoch = now - windowSize.toMilliseconds();
+        List<TimestampedValue<Number>> windowVals = recentDesiredResizes.getValuesInWindow(now, windowSize);
+
+        Number latestObj = latestInWindow(windowVals);
+        long latest = (latestObj == null) ? -1: latestObj.longValue();
+        long max = maxInWindow(windowVals, epoch).longValue();
+        long min = minInWindow(windowVals, epoch).longValue();
+
+        // TODO Could do more sophisticated "stable" check; this is the easiest code - correct but not most efficient
+        // in terms of the caller having to schedule additional stability checks.
+        boolean stable = (min == max);
+
+        return new WindowSummary(latest, min, max, stable, stable);
+    }
+
+    /**
+     * If the entire time-window is not covered by the given values, then returns Integer.MAX_VALUE.
+     *
+     * @param vals  the values to inspect; assumed oldest first, as {@link #latestInWindow(List)} treats the last as most recent
+     * @param epoch the start of the time window, in milliseconds; a first value newer than this means the window is not covered
+     */
+    private <T extends Number> T maxInWindow(List<TimestampedValue<T>> vals, long epoch) {
+        // TODO bad casting from Integer default result to T
+        T result = null;
+        double resultAsDouble = Integer.MAX_VALUE;
+        for (TimestampedValue<T> val : vals) {
+            T valAsNum = val.getValue();
+            double valAsDouble = (valAsNum != null) ? valAsNum.doubleValue() : 0;
+            if (result == null && val.getTimestamp() > epoch) {
+                // nothing recorded before the window started: fall back to the "unknown" default
+                result = withDefault(null, Integer.MAX_VALUE);
+                resultAsDouble = result.doubleValue();
+            }
+            if (result == null || (valAsNum != null && valAsDouble > resultAsDouble)) {
+                result = valAsNum;
+                resultAsDouble = valAsDouble;
+            }
+        }
+        return withDefault(result, Integer.MAX_VALUE);
+    }
+
+    /**
+     * If the entire time-window is not covered by the given values, then returns Integer.MIN_VALUE
+     *
+     * @param vals  the values to inspect; assumed oldest first, as {@link #latestInWindow(List)} treats the last as most recent
+     * @param epoch the start of the time window, in milliseconds; a first value newer than this means the window is not covered
+     */
+    private <T extends Number> T minInWindow(List<TimestampedValue<T>> vals, long epoch) {
+        T result = null;
+        double resultAsDouble = Integer.MIN_VALUE;
+        for (TimestampedValue<T> val : vals) {
+            T valAsNum = val.getValue();
+            double valAsDouble = (valAsNum != null) ? valAsNum.doubleValue() : 0;
+            if (result == null && val.getTimestamp() > epoch) {
+                // nothing recorded before the window started: fall back to the "unknown" default
+                result = withDefault(null, Integer.MIN_VALUE);
+                resultAsDouble = result.doubleValue();
+            }
+            if (result == null || (val.getValue() != null && valAsDouble < resultAsDouble)) {
+                result = valAsNum;
+                resultAsDouble = valAsDouble;
+            }
+        }
+        return withDefault(result, Integer.MIN_VALUE);
+    }
+
+    /** @return {@code result} if non-null, otherwise the default (unchecked cast of the Integer to T) */
+    @SuppressWarnings("unchecked")
+    private <T> T withDefault(T result, Integer defaultValue) {
+        return result!=null ? result : (T) defaultValue;
+    }
+
+    /**
+     * @return null if empty, or the most recent value
+     */
+    private <T extends Number> T latestInWindow(List<TimestampedValue<T>> vals) {
+        return vals.isEmpty() ? null : vals.get(vals.size()-1).getValue();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/main/java/org/apache/brooklyn/policy/followthesun/DefaultFollowTheSunModel.java
----------------------------------------------------------------------
diff --git a/policy/src/main/java/org/apache/brooklyn/policy/followthesun/DefaultFollowTheSunModel.java b/policy/src/main/java/org/apache/brooklyn/policy/followthesun/DefaultFollowTheSunModel.java
new file mode 100644
index 0000000..fef3d7f
--- /dev/null
+++ b/policy/src/main/java/org/apache/brooklyn/policy/followthesun/DefaultFollowTheSunModel.java
@@ -0,0 +1,328 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.policy.followthesun;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.brooklyn.api.location.Location;
+import org.apache.brooklyn.location.basic.AbstractLocation;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
+
+ /**
+  * Default in-memory {@link FollowTheSunModel}. It records which container each item is on,
+  * the location of containers and items, and the usage reported for each item by other items.
+  * <p>
+  * State is held in {@link ConcurrentHashMap}-backed collections. As those cannot store
+  * {@code null}, sentinel objects stand in for "no container" and "no location"; the
+  * single-value getters translate the sentinels back to {@code null}.
+  *
+  * @param <ContainerType> type of the containers that hold items
+  * @param <ItemType> type of the items being tracked
+  */
+ public class DefaultFollowTheSunModel<ContainerType, ItemType> implements FollowTheSunModel<ContainerType, ItemType> {
+
+     private static final Logger LOG = LoggerFactory.getLogger(DefaultFollowTheSunModel.class);
+
+     // Concurrent maps cannot have null value; use these to represent when no container/location is supplied for an item
+     private static final String NULL = "null-val";
+     private static final Location NULL_LOCATION = new AbstractLocation(newHashMap("name","null-location")) {};
+
+     private final String name;
+     private final Set<ContainerType> containers = Collections.newSetFromMap(new ConcurrentHashMap<ContainerType,Boolean>());
+     private final Map<ItemType, ContainerType> itemToContainer = new ConcurrentHashMap<ItemType, ContainerType>();
+     private final Map<ContainerType, Location> containerToLocation = new ConcurrentHashMap<ContainerType, Location>();
+     private final Map<ItemType, Location> itemToLocation = new ConcurrentHashMap<ItemType, Location>();
+     // Keyed by target item; the value maps each source item to the usage it places on the target
+     private final Map<ItemType, Map<? extends ItemType, Double>> itemUsage = new ConcurrentHashMap<ItemType, Map<? extends ItemType,Double>>();
+     private final Set<ItemType> immovableItems = Collections.newSetFromMap(new ConcurrentHashMap<ItemType, Boolean>());
+
+     public DefaultFollowTheSunModel(String name) {
+         this.name = name;
+     }
+
+     /** Returns the known items, as the key-set view of the internal item-to-container map. */
+     @Override
+     public Set<ItemType> getItems() {
+         return itemToContainer.keySet();
+     }
+
+     /** Returns the item's container, or null if the item is unknown or has no container. */
+     @Override
+     public ContainerType getItemContainer(ItemType item) {
+         ContainerType result = itemToContainer.get(item);
+         return (isNull(result) ? null : result);
+     }
+
+     /** Returns the item's location, or null if the item is unknown or has no location. */
+     @Override
+     public Location getItemLocation(ItemType item) {
+         Location result = itemToLocation.get(item);
+         return (isNull(result) ? null : result);
+     }
+
+     /** Returns the container's location, or null if the container is unknown or has no location. */
+     @Override
+     public Location getContainerLocation(ContainerType container) {
+         Location result = containerToLocation.get(container);
+         return (isNull(result) ? null : result);
+     }
+
+     // Provider methods.
+
+     @Override
+     public String getName() {
+         return name;
+     }
+
+     // TODO: delete?
+     @Override
+     public String getName(ItemType item) {
+         return item.toString();
+     }
+
+     @Override
+     public boolean isItemMoveable(ItemType item) {
+         // If we don't know about the item, assume it is not movable; otherwise check whether it
+         // has been explicitly flagged as immovable
+         return hasItem(item) && !immovableItems.contains(item);
+     }
+
+     @Override
+     public boolean isItemAllowedIn(ItemType item, Location location) {
+         return true; // TODO?
+     }
+
+     @Override
+     public boolean hasActiveMigration(ItemType item) {
+         return false; // TODO?
+     }
+
+     /**
+      * For each target item that has at least one recorded source, totals the usage it receives
+      * per source location. Sources whose location is unknown, and usage an item reports against
+      * itself, are left out. A null usage figure is counted as zero.
+      */
+     @Override
+     // FIXME Too expensive to compute; store in a different data structure?
+     public Map<ItemType, Map<Location, Double>> getDirectSendsToItemByLocation() {
+         Map<ItemType, Map<Location, Double>> result = new LinkedHashMap<ItemType, Map<Location,Double>>(getNumItems());
+
+         for (Map.Entry<ItemType, Map<? extends ItemType, Double>> targetEntry : itemUsage.entrySet()) {
+             ItemType targetItem = targetEntry.getKey();
+             Map<? extends ItemType, Double> sources = targetEntry.getValue();
+             if (sources.isEmpty()) {
+                 continue; // no-one talking to us
+             }
+
+             Map<Location, Double> targetUsageByLocation = new LinkedHashMap<Location, Double>();
+             result.put(targetItem, targetUsageByLocation);
+
+             for (Map.Entry<? extends ItemType, Double> sourceEntry : sources.entrySet()) {
+                 ItemType sourceItem = sourceEntry.getKey();
+                 Location sourceLocation = getItemLocation(sourceItem);
+                 // Guard the per-source figure itself (not the enclosing map) before unboxing
+                 Double reportedUsage = sourceEntry.getValue();
+                 double usageVal = (reportedUsage != null) ? reportedUsage : 0d;
+                 if (sourceLocation == null) {
+                     continue; // don't know where to attribute this load; e.g. item may have just terminated
+                 }
+                 if (sourceItem.equals(targetItem)) {
+                     continue; // ignore msgs to self
+                 }
+
+                 Double usageValTotal = targetUsageByLocation.get(sourceLocation);
+                 double newUsageValTotal = (usageValTotal != null ? usageValTotal : 0d) + usageVal;
+                 targetUsageByLocation.put(sourceLocation, newUsageValTotal);
+             }
+         }
+
+         return result;
+     }
+
+     /** Returns every container recorded at the given (non-null) location; the item is not consulted. */
+     @Override
+     public Set<ContainerType> getAvailableContainersFor(ItemType item, Location location) {
+         checkNotNull(location);
+         return getContainersInLocation(location);
+     }
+
+
+     // Mutators.
+
+     @Override
+     public void onItemMoved(ItemType item, ContainerType newContainer) {
+         // idempotent, as may be called multiple times
+         recordItemPlacement(item, newContainer);
+     }
+
+     @Override
+     public void onContainerAdded(ContainerType container, Location location) {
+         containers.add(container);
+         recordContainerLocation(container, location);
+     }
+
+     @Override
+     public void onContainerRemoved(ContainerType container) {
+         containers.remove(container);
+         containerToLocation.remove(container);
+     }
+
+     @Override
+     public void onContainerLocationUpdated(ContainerType container, Location location) {
+         if (!containers.contains(container)) {
+             // unknown container; probably just stopped?
+             // If this overtook onContainerAdded, then assume we'll lookup the location and get it right in onContainerAdded
+             if (LOG.isDebugEnabled()) LOG.debug("Ignoring setting of location for unknown container {}, to {}", container, location);
+             return;
+         }
+         recordContainerLocation(container, location);
+     }
+
+     @Override
+     public void onItemAdded(ItemType item, ContainerType container, boolean immovable) {
+         // idempotent, as may be called multiple times
+         if (immovable) {
+             immovableItems.add(item);
+         }
+         recordItemPlacement(item, container);
+     }
+
+     @Override
+     public void onItemRemoved(ItemType item) {
+         itemToContainer.remove(item);
+         itemToLocation.remove(item);
+         itemUsage.remove(item);
+         immovableItems.remove(item);
+     }
+
+     @Override
+     public void onItemUsageUpdated(ItemType item, Map<? extends ItemType, Double> newValue) {
+         if (hasItem(item)) {
+             itemUsage.put(item, newValue);
+         } else {
+             // Can happen when item removed - get notification of removal and workrate from group and item
+             // respectively, so can overtake each other
+             if (LOG.isDebugEnabled()) LOG.debug("Ignoring setting of usage for unknown item {}, to {}", item, newValue);
+         }
+     }
+
+
+     // Additional methods for tests.
+
+     /**
+      * Warning: this can be an expensive (time and memory) operation if there are a lot of items/containers.
+      */
+     @VisibleForTesting
+     public String itemDistributionToString() {
+         ByteArrayOutputStream baos = new ByteArrayOutputStream();
+         dumpItemDistribution(new PrintStream(baos));
+         return new String(baos.toByteArray());
+     }
+
+     @VisibleForTesting
+     public void dumpItemDistribution() {
+         dumpItemDistribution(System.out);
+     }
+
+     /** Prints, per location and container, each item together with the usage it receives. */
+     @VisibleForTesting
+     public void dumpItemDistribution(PrintStream out) {
+         Map<ItemType, Map<Location, Double>> directSendsToItemByLocation = getDirectSendsToItemByLocation();
+
+         out.println("Follow-The-Sun dump: ");
+         for (Location location: getLocations()) {
+             out.println("\t"+"Location "+location);
+             for (ContainerType container : getContainersInLocation(location)) {
+                 out.println("\t\t"+"Container "+container);
+                 for (ItemType item : getItemsOnContainer(container)) {
+                     Map<Location, Double> inboundByLocation = directSendsToItemByLocation.get(item);
+                     Map<? extends ItemType, Double> inboundByActor = itemUsage.get(item);
+                     double totalInboundByLocation = (inboundByLocation != null) ? sum(inboundByLocation.values()) : 0d;
+                     double totalInboundByActor = (inboundByActor != null) ? sum(inboundByActor.values()) : 0d;
+                     out.println("\t\t\t"+"Item "+item);
+                     out.println("\t\t\t\t"+"Inbound-by-location: "+totalInboundByLocation+": "+inboundByLocation);
+                     out.println("\t\t\t\t"+"Inbound-by-actor: "+totalInboundByActor+": "+inboundByActor);
+                 }
+             }
+         }
+         out.flush();
+     }
+
+     /** Stores the item's container and that container's currently recorded location (sentinels when absent). */
+     private void recordItemPlacement(ItemType item, ContainerType container) {
+         Location location = (container != null) ? containerToLocation.get(container) : null;
+         itemToContainer.put(item, toNonNullContainer(container));
+         itemToLocation.put(item, toNonNullLocation(location));
+     }
+
+     /** Stores the container's location and propagates it to every item currently on that container. */
+     private void recordContainerLocation(ContainerType container, Location location) {
+         Location locationNonNull = toNonNullLocation(location);
+         containerToLocation.put(container, locationNonNull);
+         for (ItemType item : getItemsOnContainer(container)) {
+             itemToLocation.put(item, locationNonNull);
+         }
+     }
+
+     private boolean hasItem(ItemType item) {
+         return itemToContainer.containsKey(item);
+     }
+
+     private Set<Location> getLocations() {
+         return ImmutableSet.copyOf(containerToLocation.values());
+     }
+
+     private Set<ContainerType> getContainersInLocation(Location location) {
+         Set<ContainerType> result = new LinkedHashSet<ContainerType>();
+         for (Map.Entry<ContainerType, Location> entry : containerToLocation.entrySet()) {
+             if (location.equals(entry.getValue())) {
+                 result.add(entry.getKey());
+             }
+         }
+         return result;
+     }
+
+     private Set<ItemType> getItemsOnContainer(ContainerType container) {
+         Set<ItemType> result = new LinkedHashSet<ItemType>();
+         for (Map.Entry<ItemType, ContainerType> entry : itemToContainer.entrySet()) {
+             if (container.equals(entry.getValue())) {
+                 result.add(entry.getKey());
+             }
+         }
+         return result;
+     }
+
+     private int getNumItems() {
+         return itemToContainer.size();
+     }
+
+     @SuppressWarnings("unchecked")
+     private ContainerType nullContainer() {
+         return (ContainerType) NULL; // relies on erasure
+     }
+
+     private Location nullLocation() {
+         return NULL_LOCATION;
+     }
+
+     private ContainerType toNonNullContainer(ContainerType val) {
+         return (val != null) ? val : nullContainer();
+     }
+
+     private Location toNonNullLocation(Location val) {
+         return (val != null) ? val : nullLocation();
+     }
+
+     /** True only for the sentinel objects (compared by identity); a real null is not a sentinel. */
+     private boolean isNull(Object val) {
+         return val == NULL || val == NULL_LOCATION;
+     }
+
+     // TODO Move to utils; or stop AbstractLocation from removing things from the map!
+     public static <K,V> Map<K,V> newHashMap(K k, V v) {
+         Map<K,V> result = Maps.newLinkedHashMap();
+         result.put(k, v);
+         return result;
+     }
+
+     /** Totals the given numbers as doubles; null elements are counted as zero. */
+     public static double sum(Collection<? extends Number> values) {
+         double total = 0;
+         for (Number value : values) {
+             if (value != null) {
+                 total += value.doubleValue();
+             }
+         }
+         return total;
+     }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/main/java/org/apache/brooklyn/policy/followthesun/FollowTheSunModel.java
----------------------------------------------------------------------
diff --git a/policy/src/main/java/org/apache/brooklyn/policy/followthesun/FollowTheSunModel.java b/policy/src/main/java/org/apache/brooklyn/policy/followthesun/FollowTheSunModel.java
new file mode 100644
index 0000000..07c6ed0
--- /dev/null
+++ b/policy/src/main/java/org/apache/brooklyn/policy/followthesun/FollowTheSunModel.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.policy.followthesun;
+
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.brooklyn.api.location.Location;
+
+/**
+ * Captures the state of items, containers and locations for the purpose of moving items around
+ * to minimise latency. For consumption by a {@link FollowTheSunStrategy}.
+ */
+public interface FollowTheSunModel<ContainerType, ItemType> {
+
+ // Attributes of the pool.
+ public String getName();
+
+ // Attributes of containers and items.
+ public String getName(ItemType item);
+ public Set<ItemType> getItems();
+ public Map<ItemType, Map<Location, Double>> getDirectSendsToItemByLocation();
+ public Location getItemLocation(ItemType item);
+ public ContainerType getItemContainer(ItemType item);
+ public Location getContainerLocation(ContainerType container);
+ public boolean hasActiveMigration(ItemType item);
+ public Set<ContainerType> getAvailableContainersFor(ItemType item, Location location);
+ public boolean isItemMoveable(ItemType item);
+ public boolean isItemAllowedIn(ItemType item, Location location);
+
+ // Mutators for keeping the model in-sync with the observed world
+ public void onContainerAdded(ContainerType container, Location location);
+ public void onContainerRemoved(ContainerType container);
+ public void onContainerLocationUpdated(ContainerType container, Location location);
+
+ public void onItemAdded(ItemType item, ContainerType parentContainer, boolean immovable);
+ public void onItemRemoved(ItemType item);
+ public void onItemUsageUpdated(ItemType item, Map<? extends ItemType, Double> newValues);
+ public void onItemMoved(ItemType item, ContainerType newContainer);
+}