You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by he...@apache.org on 2015/06/10 20:07:11 UTC
[1/3] incubator-brooklyn git commit: AutoScaler now takes config keys
for iteration increment and iteration max
Repository: incubator-brooklyn
Updated Branches:
refs/heads/master f7b90474a -> 284b763d2
AutoScaler now takes config keys for iteration increment and iteration max
e.g. to say increase in batches of 10, decrease in batches of 5
Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/c1a5df7d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/c1a5df7d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/c1a5df7d
Branch: refs/heads/master
Commit: c1a5df7dda1bd2215975b2717ad3579ada99bb77
Parents: 54af993
Author: Alex Heneveld <al...@cloudsoftcorp.com>
Authored: Tue May 26 06:26:04 2015 +0100
Committer: Alex Heneveld <al...@cloudsoftcorp.com>
Committed: Tue Jun 2 19:18:09 2015 +0200
----------------------------------------------------------------------
.../policy/basic/AbstractEntityAdjunct.java | 6 +-
.../policy/autoscaling/AutoScalerPolicy.java | 289 +++++++++++++------
.../policy/autoscaling/SizeHistory.java | 12 +-
.../autoscaling/AutoScalerPolicyTest.java | 114 +++++++-
.../autoscaling/LocallyResizableEntity.java | 1 +
5 files changed, 320 insertions(+), 102 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c1a5df7d/core/src/main/java/brooklyn/policy/basic/AbstractEntityAdjunct.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/policy/basic/AbstractEntityAdjunct.java b/core/src/main/java/brooklyn/policy/basic/AbstractEntityAdjunct.java
index 638818e..b14fd3e 100644
--- a/core/src/main/java/brooklyn/policy/basic/AbstractEntityAdjunct.java
+++ b/core/src/main/java/brooklyn/policy/basic/AbstractEntityAdjunct.java
@@ -19,7 +19,6 @@
package brooklyn.policy.basic;
import static brooklyn.util.GroovyJavaMethods.truth;
-import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import java.util.Collection;
@@ -326,6 +325,11 @@ public abstract class AbstractEntityAdjunct extends AbstractBrooklynObject imple
return configsInternal;
}
+ /**
+ * Invoked whenever a config change is applied after management is started.
+ * Default implementation throws an exception to disallow the change.
+ * Can be overridden to return (allowing the change) or to make other changes
+ * (if necessary), and of course it can do this selectively and call the super to disallow any others. */
protected <T> void doReconfigureConfig(ConfigKey<T> key, T val) {
throw new UnsupportedOperationException("reconfiguring "+key+" unsupported for "+this);
}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c1a5df7d/policy/src/main/java/brooklyn/policy/autoscaling/AutoScalerPolicy.java
----------------------------------------------------------------------
diff --git a/policy/src/main/java/brooklyn/policy/autoscaling/AutoScalerPolicy.java b/policy/src/main/java/brooklyn/policy/autoscaling/AutoScalerPolicy.java
index 8727f54..bbf1249 100644
--- a/policy/src/main/java/brooklyn/policy/autoscaling/AutoScalerPolicy.java
+++ b/policy/src/main/java/brooklyn/policy/autoscaling/AutoScalerPolicy.java
@@ -90,6 +90,10 @@ public class AutoScalerPolicy extends AbstractPolicy {
private Number metricLowerBound;
private int minPoolSize = 0;
private int maxPoolSize = Integer.MAX_VALUE;
+ private Integer resizeDownIterationIncrement;
+ private Integer resizeDownIterationMax;
+ private Integer resizeUpIterationIncrement;
+ private Integer resizeUpIterationMax;
private Duration minPeriodBetweenExecs;
private Duration resizeUpStabilizationDelay;
private Duration resizeDownStabilizationDelay;
@@ -135,6 +139,20 @@ public class AutoScalerPolicy extends AbstractPolicy {
maxPoolSize = max;
return this;
}
+
+ public Builder resizeUpIterationIncrement(Integer val) {
+ this.resizeUpIterationIncrement = val; return this;
+ }
+ public Builder resizeUpIterationMax(Integer val) {
+ this.resizeUpIterationMax = val; return this;
+ }
+ public Builder resizeDownIterationIncrement(Integer val) {
+ this.resizeDownIterationIncrement = val; return this;
+ }
+ public Builder resizeDownIterationMax(Integer val) {
+ this.resizeDownIterationMax = val; return this;
+ }
+
/**
* @deprecated since 0.7; use {@link #minPeriodBetweenExecs(Duration)}
*/
@@ -210,6 +228,10 @@ public class AutoScalerPolicy extends AbstractPolicy {
.putIfNotNull("metricLowerBound", metricLowerBound)
.putIfNotNull("minPoolSize", minPoolSize)
.putIfNotNull("maxPoolSize", maxPoolSize)
+ .putIfNotNull("resizeUpIterationMax", resizeUpIterationMax)
+ .putIfNotNull("resizeUpIterationIncrement", resizeUpIterationIncrement)
+ .putIfNotNull("resizeDownIterationMax", resizeDownIterationMax)
+ .putIfNotNull("resizeDownIterationIncrement", resizeDownIterationIncrement)
.putIfNotNull("minPeriodBetweenExecs", minPeriodBetweenExecs)
.putIfNotNull("resizeUpStabilizationDelay", resizeUpStabilizationDelay)
.putIfNotNull("resizeDownStabilizationDelay", resizeDownStabilizationDelay)
@@ -260,6 +282,7 @@ public class AutoScalerPolicy extends AbstractPolicy {
public static final String POOL_LOW_THRESHOLD_KEY = "pool.low.threshold";
public static final String POOL_CURRENT_WORKRATE_KEY = "pool.current.workrate";
+ @SuppressWarnings("serial")
@SetFromFlag("metric")
public static final ConfigKey<AttributeSensor<? extends Number>> METRIC = BasicConfigKey.builder(new TypeToken<AttributeSensor<? extends Number>>() {})
.name("autoscaler.metric")
@@ -282,6 +305,35 @@ public class AutoScalerPolicy extends AbstractPolicy {
.reconfigurable(true)
.build();
+ @SetFromFlag("resizeUpIterationIncrement")
+ public static final ConfigKey<Integer> RESIZE_UP_ITERATION_INCREMENT = BasicConfigKey.builder(Integer.class)
+ .name("autoscaler.resizeUpIterationIncrement")
+ .description("Batch size for resizing up; the size will be increased by a multiple of this value")
+ .defaultValue(1)
+ .reconfigurable(true)
+ .build();
+ @SetFromFlag("resizeUpIterationMax")
+ public static final ConfigKey<Integer> RESIZE_UP_ITERATION_MAX = BasicConfigKey.builder(Integer.class)
+ .name("autoscaler.resizeUpIterationMax")
+ .defaultValue(Integer.MAX_VALUE)
+ .description("Maximum change to the size on a single iteration when scaling up")
+ .reconfigurable(true)
+ .build();
+ @SetFromFlag("resizeDownIterationIncrement")
+ public static final ConfigKey<Integer> RESIZE_DOWN_ITERATION_INCREMENT = BasicConfigKey.builder(Integer.class)
+ .name("autoscaler.resizeDownIterationIncrement")
+ .description("Batch size for resizing down; the size will be decreased by a multiple of this value")
+ .defaultValue(1)
+ .reconfigurable(true)
+ .build();
+ @SetFromFlag("resizeDownIterationMax")
+ public static final ConfigKey<Integer> RESIZE_DOWN_ITERATION_MAX = BasicConfigKey.builder(Integer.class)
+ .name("autoscaler.resizeDownIterationMax")
+ .defaultValue(Integer.MAX_VALUE)
+ .description("Maximum change to the size on a single iteration when scaling down")
+ .reconfigurable(true)
+ .build();
+
@SetFromFlag("minPeriodBetweenExecs")
public static final ConfigKey<Duration> MIN_PERIOD_BETWEEN_EXECS = BasicConfigKey.builder(Duration.class)
.name("autoscaler.minPeriodBetweenExecs")
@@ -325,6 +377,7 @@ public class AutoScalerPolicy extends AbstractPolicy {
}})
.build();
+ @SuppressWarnings("serial")
@SetFromFlag("currentSizeOperator")
public static final ConfigKey<Function<Entity,Integer>> CURRENT_SIZE_OPERATOR = BasicConfigKey.builder(new TypeToken<Function<Entity,Integer>>() {})
.name("autoscaler.currentSizeOperator")
@@ -334,24 +387,28 @@ public class AutoScalerPolicy extends AbstractPolicy {
}})
.build();
+ @SuppressWarnings("serial")
@SetFromFlag("poolHotSensor")
public static final ConfigKey<BasicNotificationSensor<? extends Map>> POOL_HOT_SENSOR = BasicConfigKey.builder(new TypeToken<BasicNotificationSensor<? extends Map>>() {})
.name("autoscaler.poolHotSensor")
.defaultValue(DEFAULT_POOL_HOT_SENSOR)
.build();
+ @SuppressWarnings("serial")
@SetFromFlag("poolColdSensor")
public static final ConfigKey<BasicNotificationSensor<? extends Map>> POOL_COLD_SENSOR = BasicConfigKey.builder(new TypeToken<BasicNotificationSensor<? extends Map>>() {})
.name("autoscaler.poolColdSensor")
.defaultValue(DEFAULT_POOL_COLD_SENSOR)
.build();
+ @SuppressWarnings("serial")
@SetFromFlag("poolOkSensor")
public static final ConfigKey<BasicNotificationSensor<? extends Map>> POOL_OK_SENSOR = BasicConfigKey.builder(new TypeToken<BasicNotificationSensor<? extends Map>>() {})
.name("autoscaler.poolOkSensor")
.defaultValue(DEFAULT_POOL_OK_SENSOR)
.build();
+ @SuppressWarnings("serial")
@SetFromFlag("maxSizeReachedSensor")
public static final ConfigKey<BasicNotificationSensor<? super MaxPoolSizeReachedEvent>> MAX_SIZE_REACHED_SENSOR = BasicConfigKey.builder(new TypeToken<BasicNotificationSensor<? super MaxPoolSizeReachedEvent>>() {})
.name("autoscaler.maxSizeReachedSensor")
@@ -443,6 +500,31 @@ public class AutoScalerPolicy extends AbstractPolicy {
config().set(METRIC_UPPER_BOUND, checkNotNull(val));
}
+ private <T> void setOrDefault(ConfigKey<T> key, T val) {
+ if (val==null) val = key.getDefaultValue();
+ config().set(key, val);
+ }
+ public int getResizeUpIterationIncrement() { return getConfig(RESIZE_UP_ITERATION_INCREMENT); }
+ public void setResizeUpIterationIncrement(Integer val) {
+ if (LOG.isInfoEnabled()) LOG.info("{} changing resizeUpIterationIncrement from {} to {}", new Object[] {this, getResizeUpIterationIncrement(), val});
+ setOrDefault(RESIZE_UP_ITERATION_INCREMENT, val);
+ }
+ public int getResizeDownIterationIncrement() { return getConfig(RESIZE_DOWN_ITERATION_INCREMENT); }
+ public void setResizeDownIterationIncrement(Integer val) {
+ if (LOG.isInfoEnabled()) LOG.info("{} changing resizeDownIterationIncrement from {} to {}", new Object[] {this, getResizeDownIterationIncrement(), val});
+ setOrDefault(RESIZE_DOWN_ITERATION_INCREMENT, val);
+ }
+ public int getResizeUpIterationMax() { return getConfig(RESIZE_UP_ITERATION_MAX); }
+ public void setResizeUpIterationMax(Integer val) {
+ if (LOG.isInfoEnabled()) LOG.info("{} changing resizeUpIterationMax from {} to {}", new Object[] {this, getResizeUpIterationMax(), val});
+ setOrDefault(RESIZE_UP_ITERATION_MAX, val);
+ }
+ public int getResizeDownIterationMax() { return getConfig(RESIZE_DOWN_ITERATION_MAX); }
+ public void setResizeDownIterationMax(Integer val) {
+ if (LOG.isInfoEnabled()) LOG.info("{} changing resizeDownIterationMax from {} to {}", new Object[] {this, getResizeDownIterationMax(), val});
+ setOrDefault(RESIZE_DOWN_ITERATION_MAX, val);
+ }
+
/**
* @deprecated since 0.7.0; use {@link #setMinPeriodBetweenExecs(Duration)}
*/
@@ -566,7 +648,9 @@ public class AutoScalerPolicy extends AbstractPolicy {
// Rely on next metric-change to trigger recalculation;
// and same for those below...
} else if (key.equals(METRIC_UPPER_BOUND)) {
-
+ // see above
+ } else if (key.equals(RESIZE_UP_ITERATION_INCREMENT) || key.equals(RESIZE_UP_ITERATION_MAX) || key.equals(RESIZE_DOWN_ITERATION_INCREMENT) || key.equals(RESIZE_DOWN_ITERATION_MAX)) {
+ // no special actions needed
} else if (key.equals(MIN_POOL_SIZE)) {
int newMin = (Integer) val;
if (newMin > getConfig(MAX_POOL_SIZE)) {
@@ -653,6 +737,29 @@ public class AutoScalerPolicy extends AbstractPolicy {
}});
}
}
+
+ private enum ScalingType { HOT, COLD }
+ private static class ScalingData {
+ ScalingType scalingMode;
+ int currentSize;
+ double currentMetricValue;
+ Double metricUpperBound;
+ Double metricLowerBound;
+
+ public double getCurrentTotalActivity() {
+ return currentMetricValue * currentSize;
+ }
+
+ public boolean isHot() {
+ return ((scalingMode==null || scalingMode==ScalingType.HOT) && isValid(metricUpperBound) && currentMetricValue > metricUpperBound);
+ }
+ public boolean isCold() {
+ return ((scalingMode==null || scalingMode==ScalingType.COLD) && isValid(metricLowerBound) && currentMetricValue < metricLowerBound);
+ }
+ private boolean isValid(Double bound) {
+ return (bound!=null && bound>0);
+ }
+ }
private void onMetricChanged(Number val) {
if (LOG.isTraceEnabled()) LOG.trace("{} recording pool-metric for {}: {}", new Object[] {this, poolEntity, val});
@@ -663,13 +770,38 @@ public class AutoScalerPolicy extends AbstractPolicy {
return;
}
- double currentMetricD = val.doubleValue();
- double metricUpperBoundD = getMetricUpperBound().doubleValue();
- double metricLowerBoundD = getMetricLowerBound().doubleValue();
- int currentSize = getCurrentSizeOperator().apply(entity);
- double currentTotalActivity = currentSize * currentMetricD;
- int unboundedSize;
- int desiredSize;
+ ScalingData data = new ScalingData();
+ data.currentMetricValue = val.doubleValue();
+ data.currentSize = getCurrentSizeOperator().apply(entity);
+ data.metricUpperBound = getMetricUpperBound().doubleValue();
+ data.metricLowerBound = getMetricLowerBound().doubleValue();
+
+ analyze(data, "pool");
+ }
+
+ private void onPoolCold(Map<String, ?> properties) {
+ if (LOG.isTraceEnabled()) LOG.trace("{} recording pool-cold for {}: {}", new Object[] {this, poolEntity, properties});
+ analyzeOnHotOrColdSensor(ScalingType.COLD, "cold pool", properties);
+ }
+
+ private void onPoolHot(Map<String, ?> properties) {
+ if (LOG.isTraceEnabled()) LOG.trace("{} recording pool-hot for {}: {}", new Object[] {this, poolEntity, properties});
+ analyzeOnHotOrColdSensor(ScalingType.HOT, "hot pool", properties);
+ }
+
+ private void analyzeOnHotOrColdSensor(ScalingType scalingMode, String description, Map<String, ?> properties) {
+ ScalingData data = new ScalingData();
+ data.scalingMode = scalingMode;
+ data.currentMetricValue = (Double) properties.get(POOL_CURRENT_WORKRATE_KEY);
+ data.currentSize = (Integer) properties.get(POOL_CURRENT_SIZE_KEY);
+ data.metricUpperBound = (Double) properties.get(POOL_HIGH_THRESHOLD_KEY);
+ data.metricLowerBound = (Double) properties.get(POOL_LOW_THRESHOLD_KEY);
+
+ analyze(data, description);
+ }
+
+ private void analyze(ScalingData data, String description) {
+ int desiredSizeUnconstrained;
/* We always scale out (modulo stabilization delay) if:
* currentTotalActivity > currentSize*metricUpperBound
@@ -685,87 +817,86 @@ public class AutoScalerPolicy extends AbstractPolicy {
* n*metricUpperBound >= currentTotalActivity
* thus n := Math.max ( floor(currentTotalActiviy/metricLowerBound), ceil(currentTotal/metricUpperBound) )
*/
- if (currentMetricD > metricUpperBoundD) {
+ if (data.isHot()) {
// scale out
- unboundedSize = (int)Math.ceil(currentTotalActivity/metricUpperBoundD);
- desiredSize = toBoundedDesiredPoolSize(unboundedSize);
- if (desiredSize > currentSize) {
- if (LOG.isDebugEnabled()) LOG.debug("{} provisionally resizing out pool {} from {} to {} ({} > {})", new Object[] {this, poolEntity, currentSize, desiredSize, currentMetricD, metricUpperBoundD});
- scheduleResize(desiredSize);
- } else {
- if (LOG.isTraceEnabled()) LOG.trace("{} not resizing pool {} from {} ({} > {} > {}, but scale-out blocked eg by bounds/check)", new Object[] {this, poolEntity, currentSize, currentMetricD, metricUpperBoundD, metricLowerBoundD});
- }
- onNewUnboundedPoolSize(unboundedSize);
+ desiredSizeUnconstrained = (int)Math.ceil(data.getCurrentTotalActivity() / data.metricUpperBound);
+ data.scalingMode = ScalingType.HOT;
- } else if (currentMetricD < metricLowerBoundD) {
+ } else if (data.isCold()) {
// scale back
- unboundedSize = (int)Math.floor(currentTotalActivity/metricLowerBoundD);
- desiredSize = toBoundedDesiredPoolSize(unboundedSize);
- if (desiredSize < currentTotalActivity/metricUpperBoundD) {
- // this desired size would cause scale-out on next run, ie thrashing, so tweak
- if (LOG.isTraceEnabled()) LOG.trace("{} resizing back pool {} from {}, tweaking from {} to prevent thrashing", new Object[] {this, poolEntity, currentSize, desiredSize });
- desiredSize = (int)Math.ceil(currentTotalActivity/metricUpperBoundD);
- desiredSize = toBoundedDesiredPoolSize(desiredSize);
+ desiredSizeUnconstrained = (int)Math.floor(data.getCurrentTotalActivity() / data.metricLowerBound);
+ data.scalingMode = ScalingType.COLD;
+
+ } else {
+ if (LOG.isTraceEnabled()) LOG.trace("{} not resizing pool {} from {} ({} within range {}..{})", new Object[] {this, poolEntity, data.currentSize, data.currentMetricValue, data.metricLowerBound, data.metricUpperBound});
+ abortResize(data.currentSize);
+ return; // within the healthy range; no-op
+ }
+
+ if (LOG.isTraceEnabled()) LOG.debug("{} detected unconstrained desired size {}", new Object[] {this, desiredSizeUnconstrained});
+ int desiredSize = applyMinMaxConstraints(desiredSizeUnconstrained);
+
+ if ((data.scalingMode==ScalingType.COLD) && (desiredSize < data.currentSize)) {
+
+ int delta = data.currentSize - desiredSize;
+ int scaleIncrement = getResizeDownIterationIncrement();
+ int scaleMax = getResizeDownIterationMax();
+ if (delta>scaleMax) {
+ delta=scaleMax;
+ } else if (delta % scaleIncrement != 0) {
+ // keep scaling to the increment
+ delta += scaleIncrement - (delta % scaleIncrement);
}
- if (desiredSize < currentSize) {
- if (LOG.isDebugEnabled()) LOG.debug("{} provisionally resizing back pool {} from {} to {} ({} < {})", new Object[] {this, poolEntity, currentSize, desiredSize, currentMetricD, metricLowerBoundD});
- scheduleResize(desiredSize);
- } else {
- if (LOG.isTraceEnabled()) LOG.trace("{} not resizing pool {} from {} ({} < {} < {}, but scale-back blocked eg by bounds/check)", new Object[] {this, poolEntity, currentSize, currentMetricD, metricLowerBoundD, metricUpperBoundD});
+ desiredSize = data.currentSize - delta;
+
+ if (data.metricUpperBound!=null) {
+ // if upper bound supplied, check that this desired scale-back size
+ // is not going to cause scale-out on next run; i.e. anti-thrashing
+ while (desiredSize < data.currentSize && data.getCurrentTotalActivity() > data.metricUpperBound * desiredSize) {
+ if (LOG.isTraceEnabled()) LOG.trace("{} when resizing back pool {} from {}, tweaking from {} to prevent thrashing", new Object[] {this, poolEntity, data.currentSize, desiredSize });
+ desiredSize += scaleIncrement;
+ }
}
- onNewUnboundedPoolSize(unboundedSize);
+ desiredSize = applyMinMaxConstraints(desiredSize);
+ if (desiredSize >= data.currentSize) data.scalingMode = null;
+ } else if ((data.scalingMode==ScalingType.HOT) && (desiredSize > data.currentSize)) {
+
+ int delta = desiredSize - data.currentSize;
+ int scaleIncrement = getResizeUpIterationIncrement();
+ int scaleMax = getResizeUpIterationMax();
+ if (delta>scaleMax) {
+ delta=scaleMax;
+ } else if (delta % scaleIncrement != 0) {
+ // keep scaling to the increment
+ delta += scaleIncrement - (delta % scaleIncrement);
+ }
+ desiredSize = data.currentSize + delta;
+ desiredSize = applyMinMaxConstraints(desiredSize);
+ if (desiredSize <= data.currentSize) data.scalingMode = null;
+
} else {
- if (LOG.isTraceEnabled()) LOG.trace("{} not resizing pool {} from {} ({} within range {}..{})", new Object[] {this, poolEntity, currentSize, currentMetricD, metricLowerBoundD, metricUpperBoundD});
- abortResize(currentSize);
- return; // within a health range; no-op
+ data.scalingMode = null;
}
- }
- private void onPoolCold(Map<String, ?> properties) {
- if (LOG.isTraceEnabled()) LOG.trace("{} recording pool-cold for {}: {}", new Object[] {this, poolEntity, properties});
-
- int poolCurrentSize = (Integer) properties.get(POOL_CURRENT_SIZE_KEY);
- double poolCurrentWorkrate = (Double) properties.get(POOL_CURRENT_WORKRATE_KEY);
- double poolLowThreshold = (Double) properties.get(POOL_LOW_THRESHOLD_KEY);
-
- // Shrink the pool to force its low threshold to fall below the current workrate.
- // NOTE: assumes the pool is homogeneous for now.
- int unboundedPoolSize = (int) Math.ceil(poolCurrentWorkrate / (poolLowThreshold/poolCurrentSize));
- int desiredPoolSize = toBoundedDesiredPoolSize(unboundedPoolSize);
-
- if (desiredPoolSize < poolCurrentSize) {
- if (LOG.isTraceEnabled()) LOG.trace("{} resizing cold pool {} from {} to {}", new Object[] {this, poolEntity, poolCurrentSize, desiredPoolSize});
- scheduleResize(desiredPoolSize);
+ if (data.scalingMode!=null) {
+ if (LOG.isDebugEnabled()) LOG.debug("{} provisionally resizing {} {} from {} to {} ({} < {}; ideal size {})", new Object[] {this, description, poolEntity, data.currentSize, desiredSize, data.currentMetricValue, data.metricLowerBound, desiredSizeUnconstrained});
+ scheduleResize(desiredSize);
} else {
- if (LOG.isTraceEnabled()) LOG.trace("{} not resizing cold pool {} from {} to {}", new Object[] {this, poolEntity, poolCurrentSize, desiredPoolSize});
- abortResize(poolCurrentSize);
+ if (LOG.isTraceEnabled()) LOG.trace("{} not resizing {} {} from {} to {}, {} out of healthy range {}..{} but unconstrained size {} blocked by bounds/check", new Object[] {this, description, poolEntity, data.currentSize, desiredSize, data.currentMetricValue, data.metricLowerBound, data.metricUpperBound, desiredSizeUnconstrained});
+ abortResize(data.currentSize);
+ // but add to the unbounded record for future consideration
}
- onNewUnboundedPoolSize(unboundedPoolSize);
+ onNewUnboundedPoolSize(desiredSizeUnconstrained);
}
-
- private void onPoolHot(Map<String, ?> properties) {
- if (LOG.isTraceEnabled()) LOG.trace("{} recording pool-hot for {}: {}", new Object[] {this, poolEntity, properties});
-
- int poolCurrentSize = (Integer) properties.get(POOL_CURRENT_SIZE_KEY);
- double poolCurrentWorkrate = (Double) properties.get(POOL_CURRENT_WORKRATE_KEY);
- double poolHighThreshold = (Double) properties.get(POOL_HIGH_THRESHOLD_KEY);
-
- // Grow the pool to force its high threshold to rise above the current workrate.
- // FIXME: assumes the pool is homogeneous for now.
- int unboundedPoolSize = (int) Math.ceil(poolCurrentWorkrate / (poolHighThreshold/poolCurrentSize));
- int desiredPoolSize = toBoundedDesiredPoolSize(unboundedPoolSize);
- if (desiredPoolSize > poolCurrentSize) {
- if (LOG.isTraceEnabled()) LOG.trace("{} resizing hot pool {} from {} to {}", new Object[] {this, poolEntity, poolCurrentSize, desiredPoolSize});
- scheduleResize(desiredPoolSize);
- } else {
- if (LOG.isTraceEnabled()) LOG.trace("{} not resizing hot pool {} from {} to {}", new Object[] {this, poolEntity, poolCurrentSize, desiredPoolSize});
- abortResize(poolCurrentSize);
- }
- onNewUnboundedPoolSize(unboundedPoolSize);
+
+ private int applyMinMaxConstraints(int desiredSize) {
+ desiredSize = Math.max(getMinPoolSize(), desiredSize);
+ desiredSize = Math.min(getMaxPoolSize(), desiredSize);
+ return desiredSize;
}
-
+
private void onPoolOk(Map<String, ?> properties) {
if (LOG.isTraceEnabled()) LOG.trace("{} recording pool-ok for {}: {}", new Object[] {this, poolEntity, properties});
@@ -774,12 +905,6 @@ public class AutoScalerPolicy extends AbstractPolicy {
if (LOG.isTraceEnabled()) LOG.trace("{} not resizing ok pool {} from {}", new Object[] {this, poolEntity, poolCurrentSize});
abortResize(poolCurrentSize);
}
-
- private int toBoundedDesiredPoolSize(int size) {
- int result = Math.max(getMinPoolSize(), size);
- result = Math.min(getMaxPoolSize(), result);
- return result;
- }
/**
* Schedules a resize, if there is not already a resize operation queued up. When that resize
@@ -796,7 +921,7 @@ public class AutoScalerPolicy extends AbstractPolicy {
* If a listener is registered to be notified of the max-pool-size cap being reached, then record
* what our unbounded size would be and schedule a check to see if this unbounded size is sustained.
*
- * Piggie backs off the existing scheduleResize execution, which now also checks if the listener
+ * Piggy-backs off the existing scheduleResize execution, which now also checks if the listener
* needs to be called.
*/
private void onNewUnboundedPoolSize(final int val) {
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c1a5df7d/policy/src/main/java/brooklyn/policy/autoscaling/SizeHistory.java
----------------------------------------------------------------------
diff --git a/policy/src/main/java/brooklyn/policy/autoscaling/SizeHistory.java b/policy/src/main/java/brooklyn/policy/autoscaling/SizeHistory.java
index 5b633de..525ba20 100644
--- a/policy/src/main/java/brooklyn/policy/autoscaling/SizeHistory.java
+++ b/policy/src/main/java/brooklyn/policy/autoscaling/SizeHistory.java
@@ -119,7 +119,7 @@ public class SizeHistory {
T valAsNum = val.getValue();
double valAsDouble = (valAsNum != null) ? valAsNum.doubleValue() : 0;
if (result == null && val.getTimestamp() > epoch) {
- result = (T) Integer.valueOf(Integer.MAX_VALUE);
+ result = withDefault(null, Integer.MAX_VALUE);
resultAsDouble = result.doubleValue();
}
if (result == null || (valAsNum != null && valAsDouble > resultAsDouble)) {
@@ -127,7 +127,7 @@ public class SizeHistory {
resultAsDouble = valAsDouble;
}
}
- return (T) (result != null ? result : Integer.MAX_VALUE);
+ return withDefault(result, Integer.MAX_VALUE);
}
/**
@@ -142,7 +142,7 @@ public class SizeHistory {
T valAsNum = val.getValue();
double valAsDouble = (valAsNum != null) ? valAsNum.doubleValue() : 0;
if (result == null && val.getTimestamp() > epoch) {
- result = (T) Integer.valueOf(Integer.MIN_VALUE);
+ result = withDefault(null, Integer.MIN_VALUE);
resultAsDouble = result.doubleValue();
}
if (result == null || (val.getValue() != null && valAsDouble < resultAsDouble)) {
@@ -150,9 +150,13 @@ public class SizeHistory {
resultAsDouble = valAsDouble;
}
}
- return (T) (result != null ? result : Integer.MIN_VALUE);
+ return withDefault(result, Integer.MIN_VALUE);
}
+ @SuppressWarnings("unchecked")
+ private <T> T withDefault(T result, Integer defaultValue) {
+ return result!=null ? result : (T) defaultValue;
+ }
/**
* @return null if empty, or the most recent value
*/
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c1a5df7d/policy/src/test/java/brooklyn/policy/autoscaling/AutoScalerPolicyTest.java
----------------------------------------------------------------------
diff --git a/policy/src/test/java/brooklyn/policy/autoscaling/AutoScalerPolicyTest.java b/policy/src/test/java/brooklyn/policy/autoscaling/AutoScalerPolicyTest.java
index 14f2f6f..1885e9a 100644
--- a/policy/src/test/java/brooklyn/policy/autoscaling/AutoScalerPolicyTest.java
+++ b/policy/src/test/java/brooklyn/policy/autoscaling/AutoScalerPolicyTest.java
@@ -27,10 +27,13 @@ import java.lang.management.MemoryUsage;
import java.lang.management.OperatingSystemMXBean;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
@@ -41,9 +44,11 @@ import brooklyn.entity.basic.Entities;
import brooklyn.entity.proxying.EntitySpec;
import brooklyn.entity.trait.Resizable;
import brooklyn.event.basic.BasicNotificationSensor;
+import brooklyn.policy.PolicySpec;
import brooklyn.test.Asserts;
import brooklyn.test.entity.TestApplication;
import brooklyn.test.entity.TestCluster;
+import brooklyn.util.collections.MutableList;
import brooklyn.util.collections.MutableMap;
import brooklyn.util.time.Duration;
@@ -53,24 +58,38 @@ import com.google.common.collect.ImmutableMap;
public class AutoScalerPolicyTest {
+ private static final Logger log = LoggerFactory.getLogger(AutoScalerPolicyTest.class);
+
private static long TIMEOUT_MS = 10*1000;
private static long SHORT_WAIT_MS = 250;
private static long OVERHEAD_DURATION_MS = 500;
private static long EARLY_RETURN_MS = 10;
+
+ private static final int MANY_TIMES_INVOCATION_COUNT = 10;
AutoScalerPolicy policy;
TestCluster cluster;
LocallyResizableEntity resizable;
TestApplication app;
+ List<Integer> policyResizes = MutableList.of();
@BeforeMethod(alwaysRun=true)
public void setUp() throws Exception {
+ log.info("resetting "+getClass().getSimpleName());
app = TestApplication.Factory.newManagedInstanceForTests();
cluster = app.createAndManageChild(EntitySpec.create(TestCluster.class).configure(TestCluster.INITIAL_SIZE, 1));
resizable = new LocallyResizableEntity(cluster, cluster);
Entities.manage(resizable);
- policy = new AutoScalerPolicy();
- resizable.addPolicy(policy);
+ PolicySpec<AutoScalerPolicy> policySpec = PolicySpec.create(AutoScalerPolicy.class).configure(AutoScalerPolicy.RESIZE_OPERATOR, new ResizeOperator() {
+ @Override
+ public Integer resize(Entity entity, Integer desiredSize) {
+ log.info("resizing to "+desiredSize);
+ policyResizes.add(desiredSize);
+ return ((Resizable)entity).resize(desiredSize);
+ }
+ });
+ policy = resizable.addPolicy(policySpec);
+ policyResizes.clear();
}
@AfterMethod(alwaysRun=true)
@@ -82,19 +101,35 @@ public class AutoScalerPolicyTest {
policy = null;
}
+ public void assertSizeEventually(Integer targetSize) {
+ Asserts.succeedsEventually(ImmutableMap.of("timeout", TIMEOUT_MS), currentSizeAsserter(resizable, targetSize));
+ assertEquals(policyResizes.get(policyResizes.size()-1), targetSize);
+ }
+
@Test
public void testShrinkColdPool() throws Exception {
resizable.resize(4);
- resizable.emit(AutoScalerPolicy.DEFAULT_POOL_COLD_SENSOR, message(4, 30L, 4*10L, 4*20L));
+ // all metrics as per-node here
+ resizable.emit(AutoScalerPolicy.DEFAULT_POOL_COLD_SENSOR, message(30d/4, 10, 20));
+
+ // expect pool to shrink to 3 (i.e. maximum to have >= 10 per container)
+ assertSizeEventually(3);
+ }
+
+ @Test
+ public void testShrinkColdPoolTotals() throws Exception {
+ resizable.resize(4);
+ // all metrics as totals here
+ resizable.emit(AutoScalerPolicy.DEFAULT_POOL_COLD_SENSOR, message(30L, 4*10L, 4*20L));
- // expect pool to shrink to 3 (i.e. maximum to have >= 40 per container)
- Asserts.succeedsEventually(ImmutableMap.of("timeout", TIMEOUT_MS), currentSizeAsserter(resizable, 3));
+ // expect pool to shrink to 3 (i.e. maximum to have >= 10 per container)
+ assertSizeEventually(3);
}
@Test
public void testShrinkColdPoolRoundsUpDesiredNumberOfContainers() throws Exception {
resizable.resize(4);
- resizable.emit(AutoScalerPolicy.DEFAULT_POOL_COLD_SENSOR, message(4, 1L, 4*10L, 4*20L));
+ resizable.emit(AutoScalerPolicy.DEFAULT_POOL_COLD_SENSOR, message(1L, 4*10L, 4*20L));
Asserts.succeedsEventually(ImmutableMap.of("timeout", TIMEOUT_MS), currentSizeAsserter(resizable, 1));
}
@@ -102,10 +137,53 @@ public class AutoScalerPolicyTest {
@Test
public void testGrowHotPool() throws Exception {
resizable.resize(2);
- resizable.emit(AutoScalerPolicy.DEFAULT_POOL_HOT_SENSOR, message(2, 41L, 2*10L, 2*20L));
+ // all metrics as per-node here
+ resizable.emit(AutoScalerPolicy.DEFAULT_POOL_HOT_SENSOR, message(21L, 10L, 20L));
+
+ // expect pool to grow to 3 (i.e. minimum to have <= 20 per container)
+ assertSizeEventually(3);
+ }
+
+ @Test
+ public void testGrowHotPoolTotals() throws Exception {
+ resizable.resize(2);
+ // all metrics as totals here
+ resizable.emit(AutoScalerPolicy.DEFAULT_POOL_HOT_SENSOR, message(41L, 2*10L, 2*20L));
- // expect pool to grow to 3 (i.e. minimum to have <= 80 per container)
- Asserts.succeedsEventually(ImmutableMap.of("timeout", TIMEOUT_MS), currentSizeAsserter(resizable, 3));
+ // expect pool to grow to 3 (i.e. minimum to have <= 20 per container)
+ assertSizeEventually(3);
+ }
+
+ @Test
+ public void testGrowShrinkRespectsResizeIterationIncrementAndResizeIterationMax() throws Exception {
+ resizable.resize(2);
+ policy.config().set(AutoScalerPolicy.RESIZE_UP_ITERATION_INCREMENT, 2);
+ policy.config().set(AutoScalerPolicy.RESIZE_UP_ITERATION_MAX, 4);
+ policy.config().set(AutoScalerPolicy.RESIZE_DOWN_ITERATION_INCREMENT, 3);
+ policy.config().set(AutoScalerPolicy.RESIZE_DOWN_ITERATION_MAX, 3);
+
+ resizable.emit(AutoScalerPolicy.DEFAULT_POOL_HOT_SENSOR, message(42/2, 10, 20));
+ // expect pool to grow to 4 (i.e. to have <= 20 per container we need 3, but increment is 2)
+ assertSizeEventually(4);
+
+ resizable.emit(AutoScalerPolicy.DEFAULT_POOL_HOT_SENSOR, message(200/4, 10, 20));
+ // a single hot message can only make it go to 8
+ assertSizeEventually(8);
+ assertEquals(policyResizes, MutableList.of(4, 8));
+ resizable.emit(AutoScalerPolicy.DEFAULT_POOL_HOT_SENSOR, message(200/8, 10, 20));
+ assertSizeEventually(10);
+ assertEquals(policyResizes, MutableList.of(4, 8, 10));
+
+ // now shrink
+ policyResizes.clear();
+ policy.config().set(AutoScalerPolicy.MIN_POOL_SIZE, 2);
+ resizable.emit(AutoScalerPolicy.DEFAULT_POOL_COLD_SENSOR, message(1, 10, 20));
+ assertSizeEventually(7);
+ resizable.emit(AutoScalerPolicy.DEFAULT_POOL_COLD_SENSOR, message(1, 10, 20));
+ assertSizeEventually(4);
+ resizable.emit(AutoScalerPolicy.DEFAULT_POOL_COLD_SENSOR, message(1, 10, 20));
+ assertSizeEventually(2);
+ assertEquals(policyResizes, MutableList.of(7, 4, 2));
}
@Test
@@ -262,8 +340,11 @@ public class AutoScalerPolicyTest {
public void testUsesCustomSensorOverride() throws Exception {
resizable.removePolicy(policy);
+ @SuppressWarnings("rawtypes")
BasicNotificationSensor<Map> customPoolHotSensor = new BasicNotificationSensor<Map>(Map.class, "custom.hot", "");
+ @SuppressWarnings("rawtypes")
BasicNotificationSensor<Map> customPoolColdSensor = new BasicNotificationSensor<Map>(Map.class, "custom.cold", "");
+ @SuppressWarnings("rawtypes")
BasicNotificationSensor<Map> customPoolOkSensor = new BasicNotificationSensor<Map>(Map.class, "custom.ok", "");
policy = AutoScalerPolicy.builder()
.poolHotSensor(customPoolHotSensor)
@@ -304,15 +385,16 @@ public class AutoScalerPolicyTest {
}});
}
- // FIXME decreased invocationCount from 100, because was failing in jenkins occassionally.
+ // FIXME failing in jenkins occasionally - have put it in the "Acceptance" group for now
+ //
// Error was things like it taking a couple of seconds too long to scale-up. This is *not*
// just caused by a slow GC (running with -verbose:gc shows during a failure several
// incremental GCs that usually don't amount to more than 0.2 of a second at most, often less).
// Doing a thread-dump etc immediately after the too-long delay shows no strange thread usage,
// and shows releng3 system load averages of numbers like 1.73, 2.87 and 1.22.
//
- // Have put it in the "Acceptance" group for now.
- @Test(groups={"Integration", "Acceptance"}, invocationCount=100)
+ // Is healthy on normal machines.
+ @Test(groups={"Integration", "Acceptance"}, invocationCount=MANY_TIMES_INVOCATION_COUNT)
public void testRepeatedResizeUpStabilizationDelayTakesMaxSustainedDesired() throws Throwable {
try {
testResizeUpStabilizationDelayTakesMaxSustainedDesired();
@@ -429,9 +511,8 @@ public class AutoScalerPolicyTest {
}});
}
- // FIXME decreased invocationCount from 100; see comment against testRepeatedResizeUpStabilizationDelayTakesMaxSustainedDesired
- // Have put it in the "Acceptance" group for now.
- @Test(groups={"Integration", "Acceptance"}, invocationCount=100)
+ // FIXME Acceptance -- see comment against testRepeatedResizeUpStabilizationDelayTakesMaxSustainedDesired
+ @Test(groups={"Integration", "Acceptance"}, invocationCount=MANY_TIMES_INVOCATION_COUNT)
public void testRepeatedResizeDownStabilizationDelayTakesMinSustainedDesired() throws Throwable {
try {
testResizeDownStabilizationDelayTakesMinSustainedDesired();
@@ -525,6 +606,9 @@ public class AutoScalerPolicyTest {
assertTrue(resizeDelay >= (resizeDownStabilizationDelay-EARLY_RETURN_MS), "resizeDelay="+resizeDelay);
}
+ Map<String, Object> message(double currentWorkrate, double lowThreshold, double highThreshold) {
+ return message(resizable.getCurrentSize(), currentWorkrate, lowThreshold, highThreshold);
+ }
static Map<String, Object> message(int currentSize, double currentWorkrate, double lowThreshold, double highThreshold) {
return ImmutableMap.<String,Object>of(
AutoScalerPolicy.POOL_CURRENT_SIZE_KEY, currentSize,
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c1a5df7d/policy/src/test/java/brooklyn/policy/autoscaling/LocallyResizableEntity.java
----------------------------------------------------------------------
diff --git a/policy/src/test/java/brooklyn/policy/autoscaling/LocallyResizableEntity.java b/policy/src/test/java/brooklyn/policy/autoscaling/LocallyResizableEntity.java
index 6dea88b..05c6e5b 100644
--- a/policy/src/test/java/brooklyn/policy/autoscaling/LocallyResizableEntity.java
+++ b/policy/src/test/java/brooklyn/policy/autoscaling/LocallyResizableEntity.java
@@ -41,6 +41,7 @@ public class LocallyResizableEntity extends AbstractEntity implements Resizable
public LocallyResizableEntity (TestCluster tc) {
this(null, tc);
}
+ @SuppressWarnings("deprecation")
public LocallyResizableEntity (Entity parent, TestCluster tc) {
super(parent);
this.cluster = tc;
[2/3] incubator-brooklyn git commit: fix spelling
Posted by he...@apache.org.
fix spelling
Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/6498982b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/6498982b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/6498982b
Branch: refs/heads/master
Commit: 6498982bea80d2c3355c467bb66c418ef4b160d0
Parents: c1a5df7
Author: Alex Heneveld <al...@cloudsoftcorp.com>
Authored: Wed Jun 10 19:06:25 2015 +0100
Committer: Alex Heneveld <al...@cloudsoftcorp.com>
Committed: Wed Jun 10 19:06:25 2015 +0100
----------------------------------------------------------------------
.../main/java/brooklyn/policy/autoscaling/AutoScalerPolicy.java | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/6498982b/policy/src/main/java/brooklyn/policy/autoscaling/AutoScalerPolicy.java
----------------------------------------------------------------------
diff --git a/policy/src/main/java/brooklyn/policy/autoscaling/AutoScalerPolicy.java b/policy/src/main/java/brooklyn/policy/autoscaling/AutoScalerPolicy.java
index bbf1249..52bb943 100644
--- a/policy/src/main/java/brooklyn/policy/autoscaling/AutoScalerPolicy.java
+++ b/policy/src/main/java/brooklyn/policy/autoscaling/AutoScalerPolicy.java
@@ -308,7 +308,7 @@ public class AutoScalerPolicy extends AbstractPolicy {
@SetFromFlag("resizeUpIterationIncrement")
public static final ConfigKey<Integer> RESIZE_UP_ITERATION_INCREMENT = BasicConfigKey.builder(Integer.class)
.name("autoscaler.resizeUpIterationIncrement")
- .description("Batch size for resizing up; the size will be increased by a mulitiple of this value")
+ .description("Batch size for resizing up; the size will be increased by a multiple of this value")
.defaultValue(1)
.reconfigurable(true)
.build();
@@ -322,7 +322,7 @@ public class AutoScalerPolicy extends AbstractPolicy {
@SetFromFlag("resizeDownIterationIncrement")
public static final ConfigKey<Integer> RESIZE_DOWN_ITERATION_INCREMENT = BasicConfigKey.builder(Integer.class)
.name("autoscaler.resizeDownIterationIncrement")
- .description("Batch size for resizing down; the size will be decreased by a mulitiple of this value")
+ .description("Batch size for resizing down; the size will be decreased by a multiple of this value")
.defaultValue(1)
.reconfigurable(true)
.build();
[3/3] incubator-brooklyn git commit: This closes #673
Posted by he...@apache.org.
This closes #673
Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/284b763d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/284b763d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/284b763d
Branch: refs/heads/master
Commit: 284b763d2dd5df91379355ebeb3a999d01621211
Parents: f7b9047 6498982
Author: Alex Heneveld <al...@cloudsoftcorp.com>
Authored: Wed Jun 10 19:06:59 2015 +0100
Committer: Alex Heneveld <al...@cloudsoftcorp.com>
Committed: Wed Jun 10 19:06:59 2015 +0100
----------------------------------------------------------------------
.../policy/basic/AbstractEntityAdjunct.java | 6 +-
.../policy/autoscaling/AutoScalerPolicy.java | 289 +++++++++++++------
.../policy/autoscaling/SizeHistory.java | 12 +-
.../autoscaling/AutoScalerPolicyTest.java | 114 +++++++-
.../autoscaling/LocallyResizableEntity.java | 1 +
5 files changed, 320 insertions(+), 102 deletions(-)
----------------------------------------------------------------------