You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by he...@apache.org on 2015/08/18 13:06:16 UTC
[18/24] incubator-brooklyn git commit: [BROOKLYN-162] Renaming
package policy
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/main/java/brooklyn/policy/ha/ConnectionFailureDetector.java
----------------------------------------------------------------------
diff --git a/policy/src/main/java/brooklyn/policy/ha/ConnectionFailureDetector.java b/policy/src/main/java/brooklyn/policy/ha/ConnectionFailureDetector.java
deleted file mode 100644
index ff5d60e..0000000
--- a/policy/src/main/java/brooklyn/policy/ha/ConnectionFailureDetector.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package brooklyn.policy.ha;
-
-import org.apache.brooklyn.api.catalog.Catalog;
-import org.apache.brooklyn.api.event.Sensor;
-import org.apache.brooklyn.core.util.flags.SetFromFlag;
-
-import brooklyn.config.ConfigKey;
-import brooklyn.entity.basic.ConfigKeys;
-import brooklyn.event.basic.BasicConfigKey;
-import brooklyn.event.basic.BasicNotificationSensor;
-import brooklyn.policy.ha.HASensors.FailureDescriptor;
-import brooklyn.util.guava.Maybe;
-import brooklyn.util.net.Networking;
-import brooklyn.util.time.Duration;
-
-import com.google.common.net.HostAndPort;
-
-/**
- * Failure detector that periodically checks whether a configured {@link HostAndPort} is reachable,
- * emitting HASensors.CONNECTION_FAILED when the connection is lost and HASensors.CONNECTION_RECOVERED
- * when it is restored (unless other sensors are configured via SENSOR_FAILED / SENSOR_RECOVERED).
- */
-@Catalog(name="Connection Failure Detector", description="HA policy for monitoring a host:port, "
-        + "emitting an event if the connection is lost/restored")
-public class ConnectionFailureDetector extends AbstractFailureDetector {
-
-    public static final ConfigKey<HostAndPort> ENDPOINT = ConfigKeys.newConfigKey(HostAndPort.class, "connectionFailureDetector.endpoint");
-
-    public static final ConfigKey<Duration> POLL_PERIOD = ConfigKeys.newConfigKey(Duration.class, "connectionFailureDetector.pollPeriod", "", Duration.ONE_SECOND);
-
-    public static final BasicNotificationSensor<FailureDescriptor> CONNECTION_FAILED = HASensors.CONNECTION_FAILED;
-
-    public static final BasicNotificationSensor<FailureDescriptor> CONNECTION_RECOVERED = HASensors.CONNECTION_RECOVERED;
-
-    @SetFromFlag("connectionFailedStabilizationDelay")
-    public static final ConfigKey<Duration> CONNECTION_FAILED_STABILIZATION_DELAY = BasicConfigKey.builder(Duration.class)
-            .name("connectionFailureDetector.serviceFailedStabilizationDelay")
-            .description("Time period for which the connection must be consistently down for "
-                    + "(e.g. doesn't report down-up-down) before concluding failure. "
-                    + "Note that long TCP timeouts mean there can be long (e.g. 70 second) "
-                    + "delays in noticing a connection refused condition.")
-            .defaultValue(Duration.ZERO)
-            .build();
-
-    @SetFromFlag("connectionRecoveredStabilizationDelay")
-    public static final ConfigKey<Duration> CONNECTION_RECOVERED_STABILIZATION_DELAY = BasicConfigKey.builder(Duration.class)
-            .name("connectionFailureDetector.serviceRecoveredStabilizationDelay")
-            .description("For a failed connection, time period for which the connection must be consistently up for (e.g. doesn't report up-down-up) before concluding recovered")
-            .defaultValue(Duration.ZERO)
-            .build();
-
-    /**
-     * Validates that an endpoint is configured (failing fast otherwise), then defaults the
-     * failed/recovered sensors to the connection-specific ones if the user has not set them.
-     */
-    @Override
-    public void init() {
-        super.init();
-        getRequiredConfig(ENDPOINT);
-
-        boolean failedSensorUnset = config().getRaw(SENSOR_FAILED).isAbsent();
-        if (failedSensorUnset) config().set(SENSOR_FAILED, CONNECTION_FAILED);
-
-        boolean recoveredSensorUnset = config().getRaw(SENSOR_RECOVERED).isAbsent();
-        if (recoveredSensorUnset) config().set(SENSOR_RECOVERED, CONNECTION_RECOVERED);
-    }
-
-    /** Healthy iff the configured endpoint is currently reachable. */
-    @Override
-    protected CalculatedStatus calculateStatus() {
-        HostAndPort target = getConfig(ENDPOINT);
-        boolean reachable = Networking.isReachable(target);
-        return new BasicCalculatedStatus(reachable, "endpoint=" + target);
-    }
-
-    // The overrides below read this class's own config keys, for persistence compatibility.
-
-    @Override
-    protected Duration getPollPeriod() {
-        return getConfig(POLL_PERIOD);
-    }
-
-    @Override
-    protected Duration getFailedStabilizationDelay() {
-        return getConfig(CONNECTION_FAILED_STABILIZATION_DELAY);
-    }
-
-    @Override
-    protected Duration getRecoveredStabilizationDelay() {
-        return getConfig(CONNECTION_RECOVERED_STABILIZATION_DELAY);
-    }
-
-    /** The explicitly configured failure sensor, or {@link #CONNECTION_FAILED} if none was set. */
-    @SuppressWarnings("unchecked")
-    @Override
-    protected Sensor<FailureDescriptor> getSensorFailed() {
-        Maybe<Object> configured = config().getRaw(SENSOR_FAILED);
-        return configured.isPresent() ? (Sensor<FailureDescriptor>) configured.get() : CONNECTION_FAILED;
-    }
-
-    /** The explicitly configured recovery sensor, or {@link #CONNECTION_RECOVERED} if none was set. */
-    @SuppressWarnings("unchecked")
-    @Override
-    protected Sensor<FailureDescriptor> getSensorRecovered() {
-        Maybe<Object> configured = config().getRaw(SENSOR_RECOVERED);
-        return configured.isPresent() ? (Sensor<FailureDescriptor>) configured.get() : CONNECTION_RECOVERED;
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/main/java/brooklyn/policy/ha/HASensors.java
----------------------------------------------------------------------
diff --git a/policy/src/main/java/brooklyn/policy/ha/HASensors.java b/policy/src/main/java/brooklyn/policy/ha/HASensors.java
deleted file mode 100644
index b940aa0..0000000
--- a/policy/src/main/java/brooklyn/policy/ha/HASensors.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package brooklyn.policy.ha;
-
-import brooklyn.event.basic.BasicNotificationSensor;
-
-import com.google.common.base.Objects;
-
-public class HASensors {
-
- public static final BasicNotificationSensor<FailureDescriptor> ENTITY_FAILED = new BasicNotificationSensor<FailureDescriptor>(
- FailureDescriptor.class, "ha.entityFailed", "Indicates that an entity has failed");
-
- public static final BasicNotificationSensor<FailureDescriptor> ENTITY_RECOVERED = new BasicNotificationSensor<FailureDescriptor>(
- FailureDescriptor.class, "ha.entityRecovered", "Indicates that a previously failed entity has recovered");
-
- public static final BasicNotificationSensor<FailureDescriptor> CONNECTION_FAILED = new BasicNotificationSensor<FailureDescriptor>(
- FailureDescriptor.class, "ha.connectionFailed", "Indicates that a connection has failed");
-
- public static final BasicNotificationSensor<FailureDescriptor> CONNECTION_RECOVERED = new BasicNotificationSensor<FailureDescriptor>(
- FailureDescriptor.class, "ha.connectionRecovered", "Indicates that a previously failed connection has recovered");
-
- // TODO How to make this serializable with the entity reference
- public static class FailureDescriptor {
- private final Object component;
- private final String description;
-
- public FailureDescriptor(Object component, String description) {
- this.component = component;
- this.description = description;
- }
-
- public Object getComponent() {
- return component;
- }
-
- public String getDescription() {
- return description;
- }
-
- @Override
- public String toString() {
- return Objects.toStringHelper(this).add("component", component).add("description", description).toString();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/main/java/brooklyn/policy/ha/ServiceFailureDetector.java
----------------------------------------------------------------------
diff --git a/policy/src/main/java/brooklyn/policy/ha/ServiceFailureDetector.java b/policy/src/main/java/brooklyn/policy/ha/ServiceFailureDetector.java
deleted file mode 100644
index 2e4b719..0000000
--- a/policy/src/main/java/brooklyn/policy/ha/ServiceFailureDetector.java
+++ /dev/null
@@ -1,340 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package brooklyn.policy.ha;
-
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.brooklyn.api.event.SensorEvent;
-import org.apache.brooklyn.core.util.config.ConfigBag;
-import org.apache.brooklyn.core.util.flags.SetFromFlag;
-import org.apache.brooklyn.core.util.task.BasicTask;
-import org.apache.brooklyn.core.util.task.ScheduledTask;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import brooklyn.config.ConfigKey;
-import brooklyn.entity.basic.Attributes;
-import brooklyn.entity.basic.ConfigKeys;
-import brooklyn.entity.basic.EntityInternal;
-import brooklyn.entity.basic.Lifecycle;
-import brooklyn.entity.basic.ServiceStateLogic;
-import brooklyn.entity.basic.ServiceStateLogic.ComputeServiceState;
-import brooklyn.event.basic.BasicConfigKey;
-import brooklyn.event.basic.BasicNotificationSensor;
-import brooklyn.policy.ha.HASensors.FailureDescriptor;
-import brooklyn.util.collections.MutableMap;
-import brooklyn.util.exceptions.Exceptions;
-import brooklyn.util.time.Duration;
-import brooklyn.util.time.Time;
-
-/**
- * Emits {@link HASensors#ENTITY_FAILED} whenever the parent's default logic ({@link ComputeServiceState}) would detect a problem,
- * and similarly {@link HASensors#ENTITY_RECOVERED} when recovered.
- * <p>
- * Gives more control over suppressing {@link Lifecycle#ON_FIRE} for some period of time
- * (or until another process manually sets {@link Attributes#SERVICE_STATE_ACTUAL} to {@link Lifecycle#ON_FIRE},
- * which this enricher will not clear until all problems have gone away).
- * <p>
- * Timing is controlled by {@link #ENTITY_FAILED_STABILIZATION_DELAY}, {@link #ENTITY_RECOVERED_STABILIZATION_DELAY},
- * {@link #SERVICE_ON_FIRE_STABILIZATION_DELAY} and, optionally, {@link #ENTITY_FAILED_REPUBLISH_TIME}. Whenever one of
- * those deadlines lies in the future, a re-evaluation is scheduled via {@link #recomputeAfterDelay(long)}.
- */
-//@Catalog(name="Service Failure Detector", description="HA policy for detecting failure of a service")
-public class ServiceFailureDetector extends ServiceStateLogic.ComputeServiceState {
-
-    // TODO Remove duplication between this and MemberFailureDetectionPolicy.
-    // The latter could be re-written to use this. Or could even be deprecated
-    // in favour of this.
-
-    /** Which notification, if any, this detector most recently emitted. */
-    public enum LastPublished {
-        NONE,
-        FAILED,
-        RECOVERED;
-    }
-
-    private static final Logger LOG = LoggerFactory.getLogger(ServiceFailureDetector.class);
-
-    /** Lower bound on the spacing between successive scheduled re-evaluations. */
-    private static final long MIN_PERIOD_BETWEEN_EXECS_MILLIS = 100;
-
-    public static final BasicNotificationSensor<FailureDescriptor> ENTITY_FAILED = HASensors.ENTITY_FAILED;
-
-    @SetFromFlag("onlyReportIfPreviouslyUp")
-    public static final ConfigKey<Boolean> ENTITY_FAILED_ONLY_IF_PREVIOUSLY_UP = ConfigKeys.newBooleanConfigKey("onlyReportIfPreviouslyUp",
-            "Prevents the policy from emitting ENTITY_FAILED if the entity fails on startup (ie has never been up)", true);
-
-    public static final ConfigKey<Boolean> MONITOR_SERVICE_PROBLEMS = ConfigKeys.newBooleanConfigKey("monitorServiceProblems",
-            "Whether to monitor service problems, and emit on failures there (if set to false, this monitors only service up)", true);
-
-    @SetFromFlag("serviceOnFireStabilizationDelay")
-    public static final ConfigKey<Duration> SERVICE_ON_FIRE_STABILIZATION_DELAY = BasicConfigKey.builder(Duration.class)
-            .name("serviceOnFire.stabilizationDelay")
-            .description("Time period for which the service must be consistently down for (e.g. doesn't report down-up-down) before concluding ON_FIRE")
-            .defaultValue(Duration.ZERO)
-            .build();
-
-    @SetFromFlag("entityFailedStabilizationDelay")
-    public static final ConfigKey<Duration> ENTITY_FAILED_STABILIZATION_DELAY = BasicConfigKey.builder(Duration.class)
-            .name("entityFailed.stabilizationDelay")
-            .description("Time period for which the service must be consistently down for (e.g. doesn't report down-up-down) before emitting ENTITY_FAILED")
-            .defaultValue(Duration.ZERO)
-            .build();
-
-    @SetFromFlag("entityRecoveredStabilizationDelay")
-    public static final ConfigKey<Duration> ENTITY_RECOVERED_STABILIZATION_DELAY = BasicConfigKey.builder(Duration.class)
-            .name("entityRecovered.stabilizationDelay")
-            .description("For a failed entity, time period for which the service must be consistently up for (e.g. doesn't report up-down-up) before emitting ENTITY_RECOVERED")
-            .defaultValue(Duration.ZERO)
-            .build();
-
-    @SetFromFlag("entityFailedRepublishTime")
-    public static final ConfigKey<Duration> ENTITY_FAILED_REPUBLISH_TIME = BasicConfigKey.builder(Duration.class)
-            .name("entityFailed.republishTime")
-            .description("Publish failed state periodically at the specified intervals, null to disable.")
-            .build();
-
-    /** Time (epoch millis) at which SERVICE_UP was first seen true; null until then. */
-    protected Long firstUpTime;
-
-    /** Start (epoch millis) of the current failure period; null when not currently failing. */
-    protected Long currentFailureStartTime = null;
-    /** Start (epoch millis) of the current recovery period; null when not currently recovering. */
-    protected Long currentRecoveryStartTime = null;
-
-    // Deadlines (epoch millis) for pending actions; null when the action is not pending.
-    protected Long publishEntityFailedTime = null;
-    protected Long publishEntityRecoveredTime = null;
-    protected Long setEntityOnFireTime = null;
-
-    protected LastPublished lastPublished = LastPublished.NONE;
-
-    // Guards against queueing more than one pending re-evaluation at a time.
-    private final AtomicBoolean executorQueued = new AtomicBoolean(false);
-    private volatile long executorTime = 0;
-
-    /**
-     * TODO Really don't want this mutex!
-     * ServiceStateLogic.setExpectedState() will call into `onEvent(null)`, so could get concurrent calls.
-     * How to handle that? I don't think `ServiceStateLogic.setExpectedState` should be making the call, but
-     * presumably that is there to remove a race condition so it is set before method returns. Caller shouldn't
-     * rely on that though.
-     * e.g. see `ServiceFailureDetectorTest.testNotifiedOfFailureOnStateOnFire`, where we get two notifications.
-     */
-    private final Object mutex = new Object();
-
-    public ServiceFailureDetector() {
-        this(new ConfigBag());
-    }
-
-    public ServiceFailureDetector(Map<String,?> flags) {
-        this(new ConfigBag().putAll(flags));
-    }
-
-    public ServiceFailureDetector(ConfigBag configBag) {
-        // TODO hierarchy should use ConfigBag, and not change flags
-        super(configBag.getAllConfigMutable());
-    }
-
-    /**
-     * Records when the entity is first seen up, then delegates to the parent logic.
-     * A null event is a "synthetic" re-evaluation (e.g. from {@link #recomputeAfterDelay(long)}).
-     */
-    @Override
-    public void onEvent(SensorEvent<Object> event) {
-        if (firstUpTime==null) {
-            if (event!=null && Attributes.SERVICE_UP.equals(event.getSensor()) && Boolean.TRUE.equals(event.getValue())) {
-                firstUpTime = event.getTimestamp();
-            } else if (event == null && Boolean.TRUE.equals(entity.getAttribute(Attributes.SERVICE_UP))) {
-                // If this enricher is registered after the entity is up, then we'll get a "synthetic" onEvent(null)
-                firstUpTime = System.currentTimeMillis();
-            }
-        }
-
-        super.onEvent(event);
-    }
-
-    /**
-     * Intercepts the state computed by the parent logic. It tracks failure/recovery periods, emits
-     * ENTITY_FAILED / ENTITY_RECOVERED once their stabilization deadlines pass, and defers setting ON_FIRE
-     * until the on-fire stabilization deadline. It schedules a re-evaluation for the earliest pending deadline.
-     */
-    @Override
-    protected void setActualState(Lifecycle state) {
-        long now = System.currentTimeMillis();
-
-        synchronized (mutex) {
-            if (state==Lifecycle.ON_FIRE) {
-                if (lastPublished == LastPublished.FAILED) {
-                    if (currentRecoveryStartTime != null) {
-                        if (LOG.isDebugEnabled()) LOG.debug("{} health-check for {}, component was recovering, now failing: {}", new Object[] {this, entity, getExplanation(state)});
-                        currentRecoveryStartTime = null;
-                        publishEntityRecoveredTime = null;
-                    } else {
-                        if (LOG.isTraceEnabled()) LOG.trace("{} health-check for {}, component still failed: {}", new Object[] {this, entity, getExplanation(state)});
-                    }
-                } else {
-                    // Boolean.TRUE.equals avoids an unboxing NPE if the key was explicitly set to null
-                    if (firstUpTime == null && Boolean.TRUE.equals(getConfig(ENTITY_FAILED_ONLY_IF_PREVIOUSLY_UP))) {
-                        // suppress; entity has never been up, so won't publish
-                    } else if (currentFailureStartTime == null) {
-                        if (LOG.isDebugEnabled()) LOG.debug("{} health-check for {}, component now failing: {}", new Object[] {this, entity, getExplanation(state)});
-                        currentFailureStartTime = now;
-                        publishEntityFailedTime = currentFailureStartTime + getConfig(ENTITY_FAILED_STABILIZATION_DELAY).toMilliseconds();
-                    } else {
-                        if (LOG.isTraceEnabled()) LOG.trace("{} health-check for {}, component continuing failing: {}", new Object[] {this, entity, getExplanation(state)});
-                    }
-                }
-                if (setEntityOnFireTime == null) {
-                    setEntityOnFireTime = now + getConfig(SERVICE_ON_FIRE_STABILIZATION_DELAY).toMilliseconds();
-                }
-                currentRecoveryStartTime = null;
-                publishEntityRecoveredTime = null;
-
-            } else if (state == Lifecycle.RUNNING) {
-                if (lastPublished == LastPublished.FAILED) {
-                    if (currentRecoveryStartTime == null) {
-                        if (LOG.isDebugEnabled()) LOG.debug("{} health-check for {}, component now recovering: {}", new Object[] {this, entity, getExplanation(state)});
-                        currentRecoveryStartTime = now;
-                        publishEntityRecoveredTime = currentRecoveryStartTime + getConfig(ENTITY_RECOVERED_STABILIZATION_DELAY).toMilliseconds();
-                    } else {
-                        if (LOG.isTraceEnabled()) LOG.trace("{} health-check for {}, component continuing recovering: {}", new Object[] {this, entity, getExplanation(state)});
-                    }
-                } else {
-                    if (currentFailureStartTime != null) {
-                        if (LOG.isDebugEnabled()) LOG.debug("{} health-check for {}, component was failing, now healthy: {}", new Object[] {this, entity, getExplanation(state)});
-                    } else {
-                        if (LOG.isTraceEnabled()) LOG.trace("{} health-check for {}, component still healthy: {}", new Object[] {this, entity, getExplanation(state)});
-                    }
-                }
-                currentFailureStartTime = null;
-                publishEntityFailedTime = null;
-                setEntityOnFireTime = null;
-
-            } else {
-                if (LOG.isTraceEnabled()) LOG.trace("{} health-check for {}, in unconfirmed state: {}", new Object[] {this, entity, getExplanation(state)});
-            }
-
-            long recomputeIn = Long.MAX_VALUE; // For whether to call recomputeAfterDelay
-
-            if (publishEntityFailedTime != null) {
-                long delayBeforeCheck = publishEntityFailedTime - now;
-                if (delayBeforeCheck<=0) {
-                    if (LOG.isDebugEnabled()) LOG.debug("{} publishing failed (state={}; currentFailureStartTime={}; now={})",
-                            new Object[] {this, state, Time.makeDateString(currentFailureStartTime), Time.makeDateString(now)});
-                    Duration republishDelay = getConfig(ENTITY_FAILED_REPUBLISH_TIME);
-                    if (republishDelay == null) {
-                        publishEntityFailedTime = null;
-                    } else {
-                        publishEntityFailedTime = now + republishDelay.toMilliseconds();
-                        recomputeIn = Math.min(recomputeIn, republishDelay.toMilliseconds());
-                    }
-                    lastPublished = LastPublished.FAILED;
-                    entity.emit(HASensors.ENTITY_FAILED, new HASensors.FailureDescriptor(entity, getFailureDescription(now)));
-                } else {
-                    recomputeIn = Math.min(recomputeIn, delayBeforeCheck);
-                }
-            } else if (publishEntityRecoveredTime != null) {
-                long delayBeforeCheck = publishEntityRecoveredTime - now;
-                if (delayBeforeCheck<=0) {
-                    if (LOG.isDebugEnabled()) LOG.debug("{} publishing recovered (state={}; currentRecoveryStartTime={}; now={})",
-                            new Object[] {this, state, Time.makeDateString(currentRecoveryStartTime), Time.makeDateString(now)});
-                    publishEntityRecoveredTime = null;
-                    lastPublished = LastPublished.RECOVERED;
-                    entity.emit(HASensors.ENTITY_RECOVERED, new HASensors.FailureDescriptor(entity, null));
-                } else {
-                    recomputeIn = Math.min(recomputeIn, delayBeforeCheck);
-                }
-            }
-
-            // Setting ON_FIRE is deferred until its stabilization deadline; other states pass straight through.
-            if (setEntityOnFireTime != null) {
-                long delayBeforeCheck = setEntityOnFireTime - now;
-                if (delayBeforeCheck<=0) {
-                    if (LOG.isDebugEnabled()) LOG.debug("{} setting on-fire, now that deferred period has passed (state={})",
-                            new Object[] {this, state});
-                    setEntityOnFireTime = null;
-                    super.setActualState(state);
-                } else {
-                    recomputeIn = Math.min(recomputeIn, delayBeforeCheck);
-                }
-            } else {
-                super.setActualState(state);
-            }
-
-            if (recomputeIn < Long.MAX_VALUE) {
-                recomputeAfterDelay(recomputeIn);
-            }
-        }
-    }
-
-    /** Builds a human-readable summary of the detector's current state, for log messages. */
-    protected String getExplanation(Lifecycle state) {
-        Duration serviceFailedStabilizationDelay = getConfig(ENTITY_FAILED_STABILIZATION_DELAY);
-        Duration serviceRecoveredStabilizationDelay = getConfig(ENTITY_RECOVERED_STABILIZATION_DELAY);
-
-        return String.format("location=%s; status=%s; lastPublished=%s; timeNow=%s; "+
-                "currentFailurePeriod=%s; currentRecoveryPeriod=%s",
-                entity.getLocations(),
-                (state != null ? state : "<unreported>"),
-                lastPublished,
-                Time.makeDateString(System.currentTimeMillis()),
-                (currentFailureStartTime != null ? getTimeStringSince(currentFailureStartTime) : "<none>") + " (stabilization "+Time.makeTimeStringRounded(serviceFailedStabilizationDelay) + ")",
-                (currentRecoveryStartTime != null ? getTimeStringSince(currentRecoveryStartTime) : "<none>") + " (stabilization "+Time.makeTimeStringRounded(serviceRecoveredStabilizationDelay) + ")");
-    }
-
-    /**
-     * Describes the failure for the ENTITY_FAILED payload. It names the (first) service problem if any,
-     * otherwise "service not up" or a generic message. If a stabilization delay applied, it appends how long
-     * the failure has been stable.
-     */
-    private String getFailureDescription(long now) {
-        String description = null;
-        Map<String, Object> serviceProblems = entity.getAttribute(Attributes.SERVICE_PROBLEMS);
-        if (serviceProblems!=null && !serviceProblems.isEmpty()) {
-            Entry<String, Object> problem = serviceProblems.entrySet().iterator().next();
-            description = problem.getKey()+": "+problem.getValue();
-            if (serviceProblems.size()>1) {
-                description = serviceProblems.size()+" service problems, including "+description;
-            } else {
-                description = "service problem: "+description;
-            }
-        } else if (Boolean.FALSE.equals(entity.getAttribute(Attributes.SERVICE_UP))) {
-            description = "service not up";
-        } else {
-            description = "service failure detected";
-        }
-        // Append (rather than overwrite) so the problem description computed above is preserved.
-        if (publishEntityFailedTime!=null && currentFailureStartTime!=null && publishEntityFailedTime > currentFailureStartTime)
-            description += " (stabilized for "+Duration.of(now - currentFailureStartTime, TimeUnit.MILLISECONDS)+")";
-        return description;
-    }
-
-    /**
-     * Schedules a synthetic {@code onEvent(null)} after roughly {@code delay} ms. At most one is queued at a time,
-     * and runs are spaced by at least {@link #MIN_PERIOD_BETWEEN_EXECS_MILLIS}.
-     */
-    @SuppressWarnings({ "unchecked", "rawtypes" })
-    protected void recomputeAfterDelay(long delay) {
-        if (isRunning() && executorQueued.compareAndSet(false, true)) {
-            long now = System.currentTimeMillis();
-            delay = Math.max(0, Math.max(delay, (executorTime + MIN_PERIOD_BETWEEN_EXECS_MILLIS) - now));
-            if (LOG.isTraceEnabled()) LOG.trace("{} scheduling publish in {}ms", this, delay);
-
-            Runnable job = new Runnable() {
-                @Override public void run() {
-                    try {
-                        executorTime = System.currentTimeMillis();
-                        executorQueued.set(false);
-
-                        onEvent(null);
-
-                    } catch (Exception e) {
-                        // Qualified `this`: a bare `this` here would be the anonymous Runnable, not the enricher.
-                        if (isRunning()) {
-                            LOG.error("Error in enricher "+ServiceFailureDetector.this+": "+e, e);
-                        } else {
-                            if (LOG.isDebugEnabled()) LOG.debug("Error in enricher "+ServiceFailureDetector.this+" (but no longer running): "+e, e);
-                        }
-                    } catch (Throwable t) {
-                        LOG.error("Error in enricher "+ServiceFailureDetector.this+": "+t, t);
-                        throw Exceptions.propagate(t);
-                    }
-                }
-            };
-
-            ScheduledTask task = new ScheduledTask(MutableMap.of("delay", Duration.of(delay, TimeUnit.MILLISECONDS)), new BasicTask(job));
-            ((EntityInternal)entity).getExecutionContext().submit(task);
-        }
-    }
-
-    /** Rounded elapsed time since {@code time}, or null if {@code time} is null. */
-    private String getTimeStringSince(Long time) {
-        return time == null ? null : Time.makeTimeStringRounded(System.currentTimeMillis() - time);
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/main/java/brooklyn/policy/ha/ServiceReplacer.java
----------------------------------------------------------------------
diff --git a/policy/src/main/java/brooklyn/policy/ha/ServiceReplacer.java b/policy/src/main/java/brooklyn/policy/ha/ServiceReplacer.java
deleted file mode 100644
index 6bea1d4..0000000
--- a/policy/src/main/java/brooklyn/policy/ha/ServiceReplacer.java
+++ /dev/null
@@ -1,214 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package brooklyn.policy.ha;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.brooklyn.api.catalog.Catalog;
-import org.apache.brooklyn.api.entity.Entity;
-import org.apache.brooklyn.api.entity.Group;
-import org.apache.brooklyn.api.entity.basic.EntityLocal;
-import org.apache.brooklyn.api.event.Sensor;
-import org.apache.brooklyn.api.event.SensorEvent;
-import org.apache.brooklyn.api.event.SensorEventListener;
-import org.apache.brooklyn.core.policy.basic.AbstractPolicy;
-import org.apache.brooklyn.core.util.config.ConfigBag;
-import org.apache.brooklyn.core.util.flags.SetFromFlag;
-
-import brooklyn.config.ConfigKey;
-import brooklyn.entity.basic.ConfigKeys;
-import brooklyn.entity.basic.Entities;
-import brooklyn.entity.basic.EntityInternal;
-import brooklyn.entity.basic.ServiceStateLogic.ServiceProblemsLogic;
-import brooklyn.entity.group.StopFailedRuntimeException;
-import brooklyn.entity.trait.MemberReplaceable;
-import brooklyn.event.basic.BasicConfigKey;
-import brooklyn.event.basic.BasicNotificationSensor;
-import brooklyn.policy.ha.HASensors.FailureDescriptor;
-import brooklyn.util.collections.MutableMap;
-import brooklyn.util.exceptions.Exceptions;
-
-import com.google.common.base.Ticker;
-import com.google.common.collect.Lists;
-
-/** attaches to a DynamicCluster and replaces a failed member in response to HASensors.ENTITY_FAILED or other sensor;
- * if this fails, it sets the Cluster state to on-fire */
-@Catalog(name="Service Replacer", description="HA policy for replacing a failed member of a group")
-public class ServiceReplacer extends AbstractPolicy {
-
- private static final Logger LOG = LoggerFactory.getLogger(ServiceReplacer.class);
-
- // TODO if there are multiple failures perhaps we should abort quickly
-
- public static final BasicNotificationSensor<FailureDescriptor> ENTITY_REPLACEMENT_FAILED = new BasicNotificationSensor<FailureDescriptor>(
- FailureDescriptor.class, "ha.entityFailed.replacement", "Indicates that an entity replacement attempt has failed");
-
- @SetFromFlag("setOnFireOnFailure")
- public static final ConfigKey<Boolean> SET_ON_FIRE_ON_FAILURE = ConfigKeys.newBooleanConfigKey("setOnFireOnFailure", "", true);
-
- /** monitors this sensor, by default ENTITY_RESTART_FAILED */
- @SetFromFlag("failureSensorToMonitor")
- @SuppressWarnings("rawtypes")
- public static final ConfigKey<Sensor> FAILURE_SENSOR_TO_MONITOR = new BasicConfigKey<Sensor>(Sensor.class, "failureSensorToMonitor", "", ServiceRestarter.ENTITY_RESTART_FAILED);
-
- /** skips replace if replacement has failed this many times failure re-occurs within this time interval */
- @SetFromFlag("failOnRecurringFailuresInThisDuration")
- public static final ConfigKey<Long> FAIL_ON_RECURRING_FAILURES_IN_THIS_DURATION = ConfigKeys.newLongConfigKey(
- "failOnRecurringFailuresInThisDuration",
- "abandon replace if replacement has failed many times within this time interval",
- 5*60*1000L);
-
- /** skips replace if replacement has failed this many times failure re-occurs within this time interval */
- @SetFromFlag("failOnNumRecurringFailures")
- public static final ConfigKey<Integer> FAIL_ON_NUM_RECURRING_FAILURES = ConfigKeys.newIntegerConfigKey(
- "failOnNumRecurringFailures",
- "abandon replace if replacement has failed this many times (100% of attempts) within the time interval",
- 5);
-
- @SetFromFlag("ticker")
- public static final ConfigKey<Ticker> TICKER = ConfigKeys.newConfigKey(Ticker.class,
- "ticker",
- "A time source (defaults to system-clock, which is almost certainly what's wanted, except in tests)",
- null);
-
- protected final List<Long> consecutiveReplacementFailureTimes = Lists.newCopyOnWriteArrayList();
-
- public ServiceReplacer() {
- this(new ConfigBag());
- }
-
- public ServiceReplacer(Map<String,?> flags) {
- this(new ConfigBag().putAll(flags));
- }
-
- public ServiceReplacer(ConfigBag configBag) {
- // TODO hierarchy should use ConfigBag, and not change flags
- super(configBag.getAllConfigMutable());
- }
-
- public ServiceReplacer(Sensor<?> failureSensorToMonitor) {
- this(new ConfigBag().configure(FAILURE_SENSOR_TO_MONITOR, failureSensorToMonitor));
- }
-
- @Override
- public void setEntity(final EntityLocal entity) {
- checkArgument(entity instanceof MemberReplaceable, "ServiceReplacer must take a MemberReplaceable, not %s", entity);
- Sensor<?> failureSensorToMonitor = checkNotNull(getConfig(FAILURE_SENSOR_TO_MONITOR), "failureSensorToMonitor");
-
- super.setEntity(entity);
-
- subscribeToMembers((Group)entity, failureSensorToMonitor, new SensorEventListener<Object>() {
- @Override public void onEvent(final SensorEvent<Object> event) {
- // Must execute in another thread - if we called entity.replaceMember in the event-listener's thread
- // then we'd block all other events being delivered to this entity's other subscribers.
- // Relies on synchronization of `onDetectedFailure`.
- // See same pattern used in ServiceRestarter.
-
- // TODO Could use BasicExecutionManager.setTaskSchedulerForTag to prevent race of two
- // events being received in rapid succession, and onDetectedFailure being executed out-of-order
- // for them; or could write events to a blocking queue and have onDetectedFailure read from that.
-
- if (isRunning()) {
- LOG.warn("ServiceReplacer notified; dispatching job for "+entity+" ("+event.getValue()+")");
- ((EntityInternal)entity).getExecutionContext().submit(MutableMap.of(), new Runnable() {
- @Override public void run() {
- onDetectedFailure(event);
- }});
- } else {
- LOG.warn("ServiceReplacer not running, so not acting on failure detected at "+entity+" ("+event.getValue()+", child of "+entity+")");
- }
- }
- });
- }
-
- // TODO semaphores would be better to allow at-most-one-blocking behaviour
- protected synchronized void onDetectedFailure(SensorEvent<Object> event) {
- final Entity failedEntity = event.getSource();
- final Object reason = event.getValue();
-
- if (isSuspended()) {
- LOG.warn("ServiceReplacer suspended, so not acting on failure detected at "+failedEntity+" ("+reason+", child of "+entity+")");
- return;
- }
-
- if (isRepeatedlyFailingTooMuch()) {
- LOG.error("ServiceReplacer not acting on failure detected at "+failedEntity+" ("+reason+", child of "+entity+"), because too many recent replacement failures");
- return;
- }
-
- LOG.warn("ServiceReplacer acting on failure detected at "+failedEntity+" ("+reason+", child of "+entity+")");
- ((EntityInternal)entity).getManagementSupport().getExecutionContext().submit(MutableMap.of(), new Runnable() {
-
- @Override
- public void run() {
- try {
- Entities.invokeEffectorWithArgs(entity, entity, MemberReplaceable.REPLACE_MEMBER, failedEntity.getId()).get();
- consecutiveReplacementFailureTimes.clear();
- } catch (Exception e) {
- if (Exceptions.getFirstThrowableOfType(e, StopFailedRuntimeException.class) != null) {
- LOG.info("ServiceReplacer: ignoring error reported from stopping failed node "+failedEntity);
- return;
- }
- onReplacementFailed("Replace failure ("+Exceptions.collapseText(e)+") at "+entity+": "+reason);
- }
- }
- });
- }
-
- private boolean isRepeatedlyFailingTooMuch() {
- Integer failOnNumRecurringFailures = getConfig(FAIL_ON_NUM_RECURRING_FAILURES);
- long failOnRecurringFailuresInThisDuration = getConfig(FAIL_ON_RECURRING_FAILURES_IN_THIS_DURATION);
- long oldestPermitted = currentTimeMillis() - failOnRecurringFailuresInThisDuration;
-
- // trim old ones
- for (Iterator<Long> iter = consecutiveReplacementFailureTimes.iterator(); iter.hasNext();) {
- Long timestamp = iter.next();
- if (timestamp < oldestPermitted) {
- iter.remove();
- } else {
- break;
- }
- }
-
- return (consecutiveReplacementFailureTimes.size() >= failOnNumRecurringFailures);
- }
-
- protected long currentTimeMillis() {
- Ticker ticker = getConfig(TICKER);
- return (ticker == null) ? System.currentTimeMillis() : TimeUnit.NANOSECONDS.toMillis(ticker.read());
- }
-
- protected void onReplacementFailed(String msg) {
- LOG.warn("ServiceReplacer failed for "+entity+": "+msg);
- consecutiveReplacementFailureTimes.add(currentTimeMillis());
-
- if (getConfig(SET_ON_FIRE_ON_FAILURE)) {
- ServiceProblemsLogic.updateProblemsIndicator(entity, "ServiceReplacer", "replacement failed: "+msg);
- }
- entity.emit(ENTITY_REPLACEMENT_FAILED, new FailureDescriptor(entity, msg));
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/main/java/brooklyn/policy/ha/ServiceRestarter.java
----------------------------------------------------------------------
diff --git a/policy/src/main/java/brooklyn/policy/ha/ServiceRestarter.java b/policy/src/main/java/brooklyn/policy/ha/ServiceRestarter.java
deleted file mode 100644
index ab1359d..0000000
--- a/policy/src/main/java/brooklyn/policy/ha/ServiceRestarter.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package brooklyn.policy.ha;
-
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicReference;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.brooklyn.api.catalog.Catalog;
-import org.apache.brooklyn.api.entity.basic.EntityLocal;
-import org.apache.brooklyn.api.event.Sensor;
-import org.apache.brooklyn.api.event.SensorEvent;
-import org.apache.brooklyn.api.event.SensorEventListener;
-import org.apache.brooklyn.core.policy.basic.AbstractPolicy;
-import org.apache.brooklyn.core.util.config.ConfigBag;
-import org.apache.brooklyn.core.util.flags.SetFromFlag;
-
-import brooklyn.config.ConfigKey;
-import brooklyn.entity.basic.ConfigKeys;
-import brooklyn.entity.basic.Entities;
-import brooklyn.entity.basic.EntityInternal;
-import brooklyn.entity.basic.Lifecycle;
-import brooklyn.entity.basic.ServiceStateLogic;
-import brooklyn.entity.trait.Startable;
-import brooklyn.event.basic.BasicNotificationSensor;
-import brooklyn.policy.ha.HASensors.FailureDescriptor;
-import brooklyn.util.collections.MutableMap;
-import brooklyn.util.javalang.JavaClassNames;
-import brooklyn.util.time.Duration;
-import brooklyn.util.time.Time;
-
-import com.google.common.base.Preconditions;
-
-/**
- * Attaches to a SoftwareProcess (or anything {@link Startable}) and invokes restart on failure.
- * <p>
- * Failures are detected by subscribing to the sensor configured in {@link #FAILURE_SENSOR_TO_MONITOR},
- * which defaults to {@link HASensors#ENTITY_FAILED}. The policy gives up and emits
- * {@link #ENTITY_RESTART_FAILED} in two cases:
- * <ul>
- *   <li>a subsequent failure arrives within {@link #FAIL_ON_RECURRING_FAILURES_IN_THIS_DURATION} of
- *       the previous one;</li>
- *   <li>the restart itself fails.</li>
- * </ul>
- */
-@Catalog(name="Service Restarter", description="HA policy for restarting a service automatically, "
-        + "and for emitting an event if the service repeatedly fails")
-public class ServiceRestarter extends AbstractPolicy {
-
-    private static final Logger LOG = LoggerFactory.getLogger(ServiceRestarter.class);
-
-    /** Emitted on the entity when a restart is abandoned (recurring failure) or the restart itself fails. */
-    public static final BasicNotificationSensor<FailureDescriptor> ENTITY_RESTART_FAILED = new BasicNotificationSensor<FailureDescriptor>(
-            FailureDescriptor.class, "ha.entityFailed.restart", "Indicates that an entity restart attempt has failed");
-
-    /** Skips the restart, and reports failure instead, if a failure re-occurs within this time interval. */
-    @SetFromFlag("failOnRecurringFailuresInThisDuration")
-    public static final ConfigKey<Duration> FAIL_ON_RECURRING_FAILURES_IN_THIS_DURATION = ConfigKeys.newConfigKey(
-            Duration.class,
-            "failOnRecurringFailuresInThisDuration",
-            "Reports entity as failed if it fails two or more times in this time window",
-            Duration.minutes(3));
-
-    /** If true (the default), sets the entity's expected state to ON_FIRE when a restart is abandoned. */
-    @SetFromFlag("setOnFireOnFailure")
-    public static final ConfigKey<Boolean> SET_ON_FIRE_ON_FAILURE = ConfigKeys.newBooleanConfigKey("setOnFireOnFailure", "", true);
-
-    /** The sensor whose events trigger a restart; by default {@link HASensors#ENTITY_FAILED}. */
-    @SetFromFlag("failureSensorToMonitor")
-    @SuppressWarnings({ "rawtypes", "unchecked" })
-    public static final ConfigKey<Sensor<?>> FAILURE_SENSOR_TO_MONITOR = (ConfigKey) ConfigKeys.newConfigKey(Sensor.class, "failureSensorToMonitor", "", HASensors.ENTITY_FAILED);
-
-    /** Time (epoch millis) of the most recently handled failure; null until the first failure. */
-    protected final AtomicReference<Long> lastFailureTime = new AtomicReference<Long>();
-
-    public ServiceRestarter() {
-        this(new ConfigBag());
-    }
-
-    public ServiceRestarter(Map<String,?> flags) {
-        this(new ConfigBag().putAll(flags));
-    }
-
-    public ServiceRestarter(ConfigBag configBag) {
-        // TODO hierarchy should use ConfigBag, and not change flags
-        super(configBag.getAllConfigMutable());
-        // The tag includes the monitored sensor's name, presumably so restarters watching different
-        // sensors can coexist on one entity (confirm).
-        uniqueTag = JavaClassNames.simpleClassName(getClass())+":"+getConfig(FAILURE_SENSOR_TO_MONITOR).getName();
-    }
-
-    public ServiceRestarter(Sensor<?> failureSensorToMonitor) {
-        this(new ConfigBag().configure(FAILURE_SENSOR_TO_MONITOR, failureSensorToMonitor));
-    }
-
-    /**
-     * Attaches to the entity and subscribes to the configured failure sensor.
-     *
-     * @throws IllegalArgumentException if the entity is not {@link Startable}
-     */
-    @Override
-    public void setEntity(final EntityLocal entity) {
-        Preconditions.checkArgument(entity instanceof Startable, "Restarter must take a Startable, not "+entity);
-
-        super.setEntity(entity);
-
-        subscribe(entity, getConfig(FAILURE_SENSOR_TO_MONITOR), new SensorEventListener<Object>() {
-            @Override public void onEvent(final SensorEvent<Object> event) {
-                // Must execute in another thread - if we called entity.restart in the event-listener's thread
-                // then we'd block all other events being delivered to this entity's other subscribers.
-                // Relies on synchronization of `onDetectedFailure`.
-                // See same pattern used in ServiceReplacer.
-
-                // TODO Could use BasicExecutionManager.setTaskSchedulerForTag to prevent race of two
-                // events being received in rapid succession, and onDetectedFailure being executed out-of-order
-                // for them; or could write events to a blocking queue and have onDetectedFailure read from that.
-
-                if (isRunning()) {
-                    LOG.info("ServiceRestarter notified; dispatching job for "+entity+" ("+event.getValue()+")");
-                    ((EntityInternal)entity).getExecutionContext().submit(MutableMap.of(), new Runnable() {
-                        @Override public void run() {
-                            onDetectedFailure(event);
-                        }});
-                } else {
-                    LOG.warn("ServiceRestarter not running, so not acting on failure detected at "+entity+" ("+event.getValue()+")");
-                }
-            }
-        });
-    }
-
-    /**
-     * Handles one failure event.
-     * <p>
-     * Does nothing (beyond logging) while the policy is suspended. If the previous failure was handled
-     * within {@link #FAIL_ON_RECURRING_FAILURES_IN_THIS_DURATION}, it reports a restart failure instead
-     * of restarting. Otherwise it sets the expected state to STARTING and blocks until the
-     * {@code restart} effector completes, reporting a restart failure if the effector throws.
-     * <p>
-     * This runs in a task submitted from the subscription listener (see {@link #setEntity}), so blocking
-     * here does not hold up event delivery. It is {@code synchronized} so failures are handled one at a
-     * time.
-     */
-    // TODO semaphores would be better to allow at-most-one-blocking behaviour
-    protected synchronized void onDetectedFailure(SensorEvent<Object> event) {
-        if (isSuspended()) {
-            LOG.warn("ServiceRestarter suspended, so not acting on failure detected at "+entity+" ("+event.getValue()+")");
-            return;
-        }
-
-        LOG.warn("ServiceRestarter acting on failure detected at "+entity+" ("+event.getValue()+")");
-        long current = System.currentTimeMillis();
-        Long last = lastFailureTime.getAndSet(current);
-        long elapsed = last==null ? -1 : current-last;
-        if (elapsed>=0 && elapsed <= getConfig(FAIL_ON_RECURRING_FAILURES_IN_THIS_DURATION).toMilliseconds()) {
-            onRestartFailed("Restart failure (failed again after "+Time.makeTimeStringRounded(elapsed)+") at "+entity+": "+event.getValue());
-            return;
-        }
-        try {
-            ServiceStateLogic.setExpectedState(entity, Lifecycle.STARTING);
-            Entities.invokeEffector(entity, entity, Startable.RESTART).get();
-        } catch (Exception e) {
-            if (e instanceof InterruptedException) {
-                // Restore the interrupt so the owner of this thread (e.g. a cancelled task) can see it.
-                // Still report the failure below: expected state was already set to STARTING and
-                // would otherwise be left there.
-                Thread.currentThread().interrupt();
-            }
-            onRestartFailed("Restart failure (error "+e+") at "+entity+": "+event.getValue());
-        }
-    }
-
-    /**
-     * Reports that the restart was abandoned or failed.
-     * <p>
-     * Logs a warning. When {@link #SET_ON_FIRE_ON_FAILURE} is true, it sets the expected state to
-     * ON_FIRE. It then emits {@link #ENTITY_RESTART_FAILED}.
-     *
-     * @param msg description of the failure, used in the log and the emitted event
-     */
-    protected void onRestartFailed(String msg) {
-        LOG.warn("ServiceRestarter failed for "+entity+": "+msg);
-        if (getConfig(SET_ON_FIRE_ON_FAILURE)) {
-            ServiceStateLogic.setExpectedState(entity, Lifecycle.ON_FIRE);
-        }
-        entity.emit(ENTITY_RESTART_FAILED, new FailureDescriptor(entity, msg));
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/main/java/brooklyn/policy/ha/SshMachineFailureDetector.java
----------------------------------------------------------------------
diff --git a/policy/src/main/java/brooklyn/policy/ha/SshMachineFailureDetector.java b/policy/src/main/java/brooklyn/policy/ha/SshMachineFailureDetector.java
deleted file mode 100644
index 1fa9982..0000000
--- a/policy/src/main/java/brooklyn/policy/ha/SshMachineFailureDetector.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package brooklyn.policy.ha;
-
-import java.util.Map;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.brooklyn.api.catalog.Catalog;
-import org.apache.brooklyn.core.util.internal.ssh.SshTool;
-
-import brooklyn.config.ConfigKey;
-import brooklyn.entity.basic.ConfigKeys;
-import brooklyn.event.basic.BasicNotificationSensor;
-
-import org.apache.brooklyn.location.basic.Machines;
-import org.apache.brooklyn.location.basic.SshMachineLocation;
-
-import brooklyn.policy.ha.HASensors.FailureDescriptor;
-import brooklyn.util.exceptions.Exceptions;
-import brooklyn.util.guava.Maybe;
-import brooklyn.util.time.Duration;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-
-@Catalog(name="Ssh Connectivity Failure Detector", description="HA policy for monitoring an SshMachine, "
- + "emitting an event if the connection is lost/restored")
-public class SshMachineFailureDetector extends AbstractFailureDetector {
- private static final Logger LOG = LoggerFactory.getLogger(SshMachineFailureDetector.class);
- public static final String DEFAULT_UNIQUE_TAG = "failureDetector.sshMachine.tag";
-
- public static final BasicNotificationSensor<FailureDescriptor> CONNECTION_FAILED = HASensors.CONNECTION_FAILED;
-
- public static final BasicNotificationSensor<FailureDescriptor> CONNECTION_RECOVERED = HASensors.CONNECTION_RECOVERED;
-
- public static final ConfigKey<Duration> CONNECT_TIMEOUT = ConfigKeys.newDurationConfigKey(
- "ha.sshConnection.timeout", "How long to wait for conneciton before declaring failure", Duration.TEN_SECONDS);
-
- @Override
- public void init() {
- super.init();
- if (config().getRaw(SENSOR_FAILED).isAbsent()) {
- config().set(SENSOR_FAILED, CONNECTION_FAILED);
- }
- if (config().getRaw(SENSOR_RECOVERED).isAbsent()) {
- config().set(SENSOR_RECOVERED, CONNECTION_RECOVERED);
- }
- if (config().getRaw(POLL_PERIOD).isAbsent()) {
- config().set(POLL_PERIOD, Duration.ONE_MINUTE);
- }
- uniqueTag = DEFAULT_UNIQUE_TAG;
- }
-
- @Override
- protected CalculatedStatus calculateStatus() {
- Maybe<SshMachineLocation> sshMachineOption = Machines.findUniqueSshMachineLocation(entity.getLocations());
- if (sshMachineOption.isPresent()) {
- SshMachineLocation sshMachine = sshMachineOption.get();
- try {
- Duration timeout = config().get(CONNECT_TIMEOUT);
- Map<String, ?> flags = ImmutableMap.of(
- SshTool.PROP_CONNECT_TIMEOUT.getName(), timeout.toMilliseconds(),
- SshTool.PROP_SESSION_TIMEOUT.getName(), timeout.toMilliseconds(),
- SshTool.PROP_SSH_TRIES.getName(), 1);
- int exitCode = sshMachine.execCommands(flags, SshMachineFailureDetector.class.getName(), ImmutableList.of("exit"));
- return new BasicCalculatedStatus(exitCode == 0, sshMachine.toString());
- } catch (Exception e) {
- Exceptions.propagateIfFatal(e);
- boolean isFirstFailure = lastPublished != LastPublished.FAILED && currentFailureStartTime == null;
- if (isFirstFailure) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Failed connecting to machine " + sshMachine, e);
- }
- } else {
- if (LOG.isTraceEnabled()) {
- LOG.trace("Failed connecting to machine " + sshMachine, e);
- }
- }
- return new BasicCalculatedStatus(false, e.getMessage());
- }
- } else {
- return new BasicCalculatedStatus(true, "no machine started, not complaining");
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/main/java/brooklyn/policy/loadbalancing/BalanceableContainer.java
----------------------------------------------------------------------
diff --git a/policy/src/main/java/brooklyn/policy/loadbalancing/BalanceableContainer.java b/policy/src/main/java/brooklyn/policy/loadbalancing/BalanceableContainer.java
deleted file mode 100644
index 0039685..0000000
--- a/policy/src/main/java/brooklyn/policy/loadbalancing/BalanceableContainer.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package brooklyn.policy.loadbalancing;
-
-import java.util.Set;
-
-import org.apache.brooklyn.api.entity.Entity;
-
-import brooklyn.config.ConfigKey;
-import brooklyn.entity.basic.AbstractGroup;
-import brooklyn.entity.basic.ConfigKeys;
-import brooklyn.event.basic.BasicNotificationSensor;
-import brooklyn.util.collections.QuorumCheck;
-import brooklyn.util.collections.QuorumCheck.QuorumChecks;
-
-/**
- * An entity holding {@link Movable} worker items that can be moved between this container and others
- * to effect load balancing.
- * <p>
- * Membership of a balanceable container does not imply a parent-child relationship in the Brooklyn
- * management sense.
- *
- * @param <ItemType> the type of movable item held by this container
- */
-public interface BalanceableContainer<ItemType extends Movable> extends Entity, AbstractGroup {
-
-    /** Notification that a movable item was added to this container. */
-    BasicNotificationSensor<Entity> ITEM_ADDED = new BasicNotificationSensor<Entity>(
-            Entity.class, "balanceablecontainer.item.added", "Movable item added to balanceable container");
-
-    /** Notification that a movable item was removed from this container. */
-    BasicNotificationSensor<Entity> ITEM_REMOVED = new BasicNotificationSensor<Entity>(
-            Entity.class, "balanceablecontainer.item.removed", "Movable item removed from balanceable container");
-
-    /**
-     * Replaces the default of {@link AbstractGroup#UP_QUORUM_CHECK} with a check that always passes.
-     * An empty container, or one with failed members, therefore does not block service-up.
-     */
-    ConfigKey<QuorumCheck> UP_QUORUM_CHECK = ConfigKeys.newConfigKeyWithDefault(AbstractGroup.UP_QUORUM_CHECK,
-            "Up check from members; default one for container overrides usual check to always return true, "
-            + "i.e. not block service up simply because the container is empty or something in the container has failed",
-            QuorumChecks.alwaysTrue());
-
-    /** @return the items currently held in this container */
-    Set<ItemType> getBalanceableItems();
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/main/java/brooklyn/policy/loadbalancing/BalanceablePoolModel.java
----------------------------------------------------------------------
diff --git a/policy/src/main/java/brooklyn/policy/loadbalancing/BalanceablePoolModel.java b/policy/src/main/java/brooklyn/policy/loadbalancing/BalanceablePoolModel.java
deleted file mode 100644
index 33f9e0b..0000000
--- a/policy/src/main/java/brooklyn/policy/loadbalancing/BalanceablePoolModel.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package brooklyn.policy.loadbalancing;
-
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.brooklyn.api.location.Location;
-
-/**
- * Captures the state of a balanceable cluster of containers and all their constituent items,
- * including workrates, for consumption by a {@link BalancingStrategy}.
- *
- * @param <ContainerType> the type identifying a container in the pool
- * @param <ItemType> the type identifying an item hosted by a container
- */
-public interface BalanceablePoolModel<ContainerType, ItemType> {
-
-    // ---- Attributes of the pool as a whole ----
-
-    String getName();
-    int getPoolSize();
-    Set<ContainerType> getPoolContents();
-    double getPoolLowThreshold();
-    double getPoolHighThreshold();
-    double getCurrentPoolWorkrate();
-    boolean isHot();
-    boolean isCold();
-
-    // ---- Attributes of individual containers and items ----
-
-    String getName(ContainerType container);
-    Location getLocation(ContainerType container);
-    /** @return the container's low threshold, or -1 if not known / invalid */
-    double getLowThreshold(ContainerType container);
-    /** @return the container's high threshold, or -1 if not known / invalid */
-    double getHighThreshold(ContainerType container);
-    /** @return the container's total workrate, or -1 if not known / invalid */
-    double getTotalWorkrate(ContainerType container);
-    /** @return the workrate of each container; contains -1 for entries whose workrate is unknown */
-    Map<ContainerType, Double> getContainerWorkrates();
-    /**
-     * @return the workrate of each item in the container. Contains -1 instead of the actual workrate
-     *         for items which cannot be moved. May be null if the node is prevented from reporting
-     *         and/or being adjusted, or has no data yet.
-     */
-    Map<ItemType, Double> getItemWorkrates(ContainerType container);
-    boolean isItemMoveable(ItemType item);
-    boolean isItemAllowedIn(ItemType item, Location location);
-
-    // ---- Mutators keeping the model in sync with the observed world ----
-
-    void onContainerAdded(ContainerType newContainer, double lowThreshold, double highThreshold);
-    void onContainerRemoved(ContainerType oldContainer);
-    void onItemAdded(ItemType item, ContainerType parentContainer);
-    void onItemAdded(ItemType item, ContainerType parentContainer, boolean immovable);
-    void onItemRemoved(ItemType item);
-    void onItemWorkrateUpdated(ItemType item, double newValue);
-    void onItemMoved(ItemType item, ContainerType targetContainer);
-}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/main/java/brooklyn/policy/loadbalancing/BalanceableWorkerPool.java
----------------------------------------------------------------------
diff --git a/policy/src/main/java/brooklyn/policy/loadbalancing/BalanceableWorkerPool.java b/policy/src/main/java/brooklyn/policy/loadbalancing/BalanceableWorkerPool.java
deleted file mode 100644
index c8d3a8b..0000000
--- a/policy/src/main/java/brooklyn/policy/loadbalancing/BalanceableWorkerPool.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package brooklyn.policy.loadbalancing;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import java.io.Serializable;
-
-import org.apache.brooklyn.api.entity.Entity;
-import org.apache.brooklyn.api.entity.Group;
-import org.apache.brooklyn.api.entity.proxying.ImplementedBy;
-
-import brooklyn.entity.trait.Resizable;
-import brooklyn.event.basic.BasicNotificationSensor;
-
-/**
- * Represents an elastic group of "container" entities, each of which can host "item" entities.
- * Items perform work and consume the container's available resources (e.g. CPU or bandwidth).
- * Auto-scaling and load-balancing policies can be attached to this pool to provide dynamic
- * elasticity based on the workrates reported by the individual items.
- * <p>
- * Containers must be "up" in order to receive work. They therefore must NOT follow the default
- * enricher pattern for groups, which requires the group itself to be up to receive work.
- */
-@ImplementedBy(BalanceableWorkerPoolImpl.class)
-public interface BalanceableWorkerPool extends Entity, Resizable {
-
-    // FIXME Asymmetry between loadbalancing and followTheSun: ITEM_ADDED and ITEM_REMOVED in loadbalancing
-    // are of type ContainerItemPair, but in followTheSun it is just the `Entity item`.
-
-    /**
-     * Pairs an item with a container. This is the payload of the {@code ITEM_ADDED},
-     * {@code ITEM_REMOVED} and {@code ITEM_MOVED} sensors. The item must be non-null; the container
-     * is not checked and may be null.
-     */
-    class ContainerItemPair implements Serializable {
-        private static final long serialVersionUID = 1L;
-        public final BalanceableContainer<?> container;
-        public final Entity item;
-
-        public ContainerItemPair(BalanceableContainer<?> container, Entity item) {
-            this.container = container;
-            this.item = checkNotNull(item);
-        }
-
-        @Override
-        public String toString() {
-            return String.valueOf(item) + " @ " + container;
-        }
-    }
-
-    // ---- Pool constituent notifications ----
-
-    /** A container joined the pool. */
-    BasicNotificationSensor<Entity> CONTAINER_ADDED = new BasicNotificationSensor<Entity>(
-            Entity.class, "balanceablepool.container.added", "Container added to balanceable pool");
-    /** A container left the pool. */
-    BasicNotificationSensor<Entity> CONTAINER_REMOVED = new BasicNotificationSensor<Entity>(
-            Entity.class, "balanceablepool.container.removed", "Container removed from balanceable pool");
-    /** An item joined the pool. */
-    BasicNotificationSensor<ContainerItemPair> ITEM_ADDED = new BasicNotificationSensor<ContainerItemPair>(
-            ContainerItemPair.class, "balanceablepool.item.added", "Item added to balanceable pool");
-    /** An item left the pool. */
-    BasicNotificationSensor<ContainerItemPair> ITEM_REMOVED = new BasicNotificationSensor<ContainerItemPair>(
-            ContainerItemPair.class, "balanceablepool.item.removed", "Item removed from balanceable pool");
-    /** An item moved to the container given in the payload. */
-    BasicNotificationSensor<ContainerItemPair> ITEM_MOVED = new BasicNotificationSensor<ContainerItemPair>(
-            ContainerItemPair.class, "balanceablepool.item.moved", "Item moved in balanceable pool to the given container");
-
-    /** Sets the entity used to resize the pool. */
-    void setResizable(Resizable resizable);
-
-    /** Sets the groups whose members are the pool's containers and items respectively. */
-    void setContents(Group containerGroup, Group itemGroup);
-
-    Group getContainerGroup();
-
-    Group getItemGroup();
-}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d30ff597/policy/src/main/java/brooklyn/policy/loadbalancing/BalanceableWorkerPoolImpl.java
----------------------------------------------------------------------
diff --git a/policy/src/main/java/brooklyn/policy/loadbalancing/BalanceableWorkerPoolImpl.java b/policy/src/main/java/brooklyn/policy/loadbalancing/BalanceableWorkerPoolImpl.java
deleted file mode 100644
index b3f9633..0000000
--- a/policy/src/main/java/brooklyn/policy/loadbalancing/BalanceableWorkerPoolImpl.java
+++ /dev/null
@@ -1,185 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package brooklyn.policy.loadbalancing;
-
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Set;
-
-import org.apache.brooklyn.api.entity.Entity;
-import org.apache.brooklyn.api.entity.Group;
-import org.apache.brooklyn.api.event.Sensor;
-import org.apache.brooklyn.api.event.SensorEvent;
-import org.apache.brooklyn.api.event.SensorEventListener;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import brooklyn.entity.basic.AbstractEntity;
-import brooklyn.entity.basic.AbstractGroup;
-import brooklyn.entity.trait.Resizable;
-import brooklyn.entity.trait.Startable;
-
-/**
- * Default implementation of {@link BalanceableWorkerPool}.
- * <p>
- * Tracks the members of the container and item groups and re-emits their changes as pool
- * notifications. A container is only advertised (via {@code CONTAINER_ADDED}) once it is up.
- *
- * @see BalanceableWorkerPool
- */
-public class BalanceableWorkerPoolImpl extends AbstractEntity implements BalanceableWorkerPool {
-
-    // FIXME Asymmetry between loadbalancing and followTheSun: ITEM_ADDED and ITEM_REMOVED in loadbalancing
-    // are of type ContainerItemPair, but in followTheSun it is just the `Entity item`.
-
-    private static final Logger LOG = LoggerFactory.getLogger(BalanceableWorkerPool.class);
-
-    private Group containerGroup;
-    private Group itemGroup;
-    private Resizable resizable;
-
-    /** Containers currently considered up, i.e. those for which CONTAINER_ADDED has been emitted. */
-    private final Set<Entity> containers = Collections.synchronizedSet(new HashSet<Entity>());
-    /** Items currently tracked, i.e. those for which ITEM_ADDED has been emitted. */
-    private final Set<Entity> items = Collections.synchronizedSet(new HashSet<Entity>());
-
-    /**
-     * Single listener for every subscription this entity makes. It handles membership changes of the
-     * container and item groups, each container's SERVICE_UP and each item's Movable.CONTAINER.
-     */
-    private final SensorEventListener<Object> eventHandler = new SensorEventListener<Object>() {
-        @Override
-        public void onEvent(SensorEvent<Object> event) {
-            if (LOG.isTraceEnabled()) LOG.trace("{} received event {}", BalanceableWorkerPoolImpl.this, event);
-            Entity source = event.getSource();
-            Object value = event.getValue();
-            Sensor<?> sensor = event.getSensor();
-
-            if (sensor.equals(AbstractGroup.MEMBER_ADDED)) {
-                if (source.equals(containerGroup)) {
-                    onContainerAdded((BalanceableContainer<?>) value);
-                } else if (source.equals(itemGroup)) {
-                    onItemAdded((Entity)value);
-                } else {
-                    throw new IllegalStateException("unexpected event source="+source);
-                }
-            } else if (sensor.equals(AbstractGroup.MEMBER_REMOVED)) {
-                if (source.equals(containerGroup)) {
-                    onContainerRemoved((BalanceableContainer<?>) value);
-                } else if (source.equals(itemGroup)) {
-                    onItemRemoved((Entity) value);
-                } else {
-                    throw new IllegalStateException("unexpected event source="+source);
-                }
-            } else if (sensor.equals(Startable.SERVICE_UP)) {
-                // TODO What if start has failed? Is there a sensor to indicate that?
-                // Treat a null (unknown/cleared) value as "not up", as onContainerAdded does,
-                // instead of failing with a NullPointerException when unboxing.
-                if (Boolean.TRUE.equals(value)) {
-                    onContainerUp((BalanceableContainer<?>) source);
-                } else {
-                    onContainerDown((BalanceableContainer<?>) source);
-                }
-            } else if (sensor.equals(Movable.CONTAINER)) {
-                onItemMoved(source, (BalanceableContainer<?>) value);
-            } else {
-                throw new IllegalStateException("Unhandled event type "+sensor+": "+event);
-            }
-        }
-    };
-
-    public BalanceableWorkerPoolImpl() {
-    }
-
-    @Override
-    public void setResizable(Resizable resizable) {
-        this.resizable = resizable;
-    }
-
-    /**
-     * Sets the container and item groups, subscribes to their membership changes and processes their
-     * existing members. If no resizable has been set and the container group is {@link Resizable},
-     * the container group is used for resizing.
-     */
-    @Override
-    public void setContents(Group containerGroup, Group itemGroup) {
-        this.containerGroup = containerGroup;
-        this.itemGroup = itemGroup;
-        if (resizable == null && containerGroup instanceof Resizable) resizable = (Resizable) containerGroup;
-
-        subscribe(containerGroup, AbstractGroup.MEMBER_ADDED, eventHandler);
-        subscribe(containerGroup, AbstractGroup.MEMBER_REMOVED, eventHandler);
-        subscribe(itemGroup, AbstractGroup.MEMBER_ADDED, eventHandler);
-        subscribe(itemGroup, AbstractGroup.MEMBER_REMOVED, eventHandler);
-
-        // Process extant containers and items; the sets above make repeat notifications harmless.
-        for (Entity existingContainer : containerGroup.getMembers()) {
-            onContainerAdded((BalanceableContainer<?>)existingContainer);
-        }
-        for (Entity existingItem : itemGroup.getMembers()) {
-            onItemAdded(existingItem);
-        }
-    }
-
-    @Override
-    public Group getContainerGroup() {
-        return containerGroup;
-    }
-
-    @Override
-    public Group getItemGroup() {
-        return itemGroup;
-    }
-
-    @Override
-    public Integer getCurrentSize() {
-        return containerGroup.getCurrentSize();
-    }
-
-    /**
-     * Delegates to the configured {@link Resizable}.
-     *
-     * @throws UnsupportedOperationException if no resizable was supplied and the container group is not resizable
-     */
-    @Override
-    public Integer resize(Integer desiredSize) {
-        if (resizable != null) return resizable.resize(desiredSize);
-
-        throw new UnsupportedOperationException("Container group is not resizable, and no resizable supplied: "+containerGroup+" of type "+(containerGroup != null ? containerGroup.getClass().getCanonicalName() : null));
-    }
-
-    /** Watches the new container's SERVICE_UP; advertises it now if it is not Startable or is already up. */
-    private void onContainerAdded(BalanceableContainer<?> newContainer) {
-        subscribe(newContainer, Startable.SERVICE_UP, eventHandler);
-        if (!(newContainer instanceof Startable) || Boolean.TRUE.equals(newContainer.getAttribute(Startable.SERVICE_UP))) {
-            onContainerUp(newContainer);
-        }
-    }
-
-    private void onContainerUp(BalanceableContainer<?> newContainer) {
-        if (containers.add(newContainer)) {
-            emit(CONTAINER_ADDED, newContainer);
-        }
-    }
-
-    private void onContainerDown(BalanceableContainer<?> oldContainer) {
-        if (containers.remove(oldContainer)) {
-            emit(CONTAINER_REMOVED, oldContainer);
-        }
-    }
-
-    private void onContainerRemoved(BalanceableContainer<?> oldContainer) {
-        unsubscribe(oldContainer);
-        onContainerDown(oldContainer);
-    }
-
-    /** Starts tracking the item's container and emits ITEM_ADDED with its current container (may be null). */
-    private void onItemAdded(Entity item) {
-        if (items.add(item)) {
-            subscribe(item, Movable.CONTAINER, eventHandler);
-            emit(ITEM_ADDED, new ContainerItemPair(item.getAttribute(Movable.CONTAINER), item));
-        }
-    }
-
-    private void onItemRemoved(Entity item) {
-        if (items.remove(item)) {
-            unsubscribe(item);
-            emit(ITEM_REMOVED, new ContainerItemPair(null, item));
-        }
-    }
-
-    private void onItemMoved(Entity item, BalanceableContainer<?> container) {
-        emit(ITEM_MOVED, new ContainerItemPair(container, item));
-    }
-}