You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by he...@apache.org on 2015/08/19 13:09:46 UTC
[28/72] [abbrv] incubator-brooklyn git commit: BROOKLYN-162 - apply
org.apache package prefix to software-base, tidying package names,
and moving a few sensory things to core
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/64c2b2e5/software/base/src/main/java/brooklyn/entity/software/winrm/WindowsPerformanceCounterSensors.java
----------------------------------------------------------------------
diff --git a/software/base/src/main/java/brooklyn/entity/software/winrm/WindowsPerformanceCounterSensors.java b/software/base/src/main/java/brooklyn/entity/software/winrm/WindowsPerformanceCounterSensors.java
deleted file mode 100644
index 05a0482..0000000
--- a/software/base/src/main/java/brooklyn/entity/software/winrm/WindowsPerformanceCounterSensors.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package brooklyn.entity.software.winrm;
-
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.brooklyn.api.entity.EntityInitializer;
-import org.apache.brooklyn.api.internal.EntityLocal;
-import org.apache.brooklyn.config.ConfigKey;
-import org.apache.brooklyn.core.config.ConfigKeys;
-import org.apache.brooklyn.entity.core.EntityInternal;
-import org.apache.brooklyn.sensor.core.Sensors;
-import org.apache.brooklyn.sensor.feed.windows.WindowsPerformanceCounterFeed;
-import org.apache.brooklyn.util.core.config.ConfigBag;
-import org.apache.brooklyn.util.text.Strings;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.reflect.TypeToken;
-
-public class WindowsPerformanceCounterSensors implements EntityInitializer {
-
- private static final Logger LOG = LoggerFactory.getLogger(WindowsPerformanceCounterSensors.class);
-
- public final static ConfigKey<Set<Map<String, String>>> PERFORMANCE_COUNTERS = ConfigKeys.newConfigKey(new TypeToken<Set<Map<String, String>>>(){}, "performance.counters");
-
- protected final Set<Map<String, String>> sensors;
-
- public WindowsPerformanceCounterSensors(ConfigBag params) {
- sensors = params.get(PERFORMANCE_COUNTERS);
- }
-
- public WindowsPerformanceCounterSensors(Map<String, String> params) {
- this(ConfigBag.newInstance(params));
- }
-
- @Override
- public void apply(EntityLocal entity) {
- WindowsPerformanceCounterFeed.Builder builder = WindowsPerformanceCounterFeed.builder()
- .entity(entity);
- for (Map<String, String> sensorConfig : sensors) {
- String name = sensorConfig.get("name");
- String sensorType = sensorConfig.get("sensorType");
- Class<?> clazz;
- try {
- clazz = Strings.isNonEmpty(sensorType)
- ? ((EntityInternal)entity).getManagementContext().getCatalog().getRootClassLoader().loadClass(sensorType)
- : String.class;
- } catch (ClassNotFoundException e) {
- throw new IllegalStateException("Could not load type "+sensorType+" for sensor "+name, e);
- }
- builder.addSensor(sensorConfig.get("counter"), Sensors.newSensor(clazz, name, sensorConfig.get("description")));
- }
- builder.build();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/64c2b2e5/software/base/src/main/java/brooklyn/event/feed/jmx/JmxAttributePollConfig.java
----------------------------------------------------------------------
diff --git a/software/base/src/main/java/brooklyn/event/feed/jmx/JmxAttributePollConfig.java b/software/base/src/main/java/brooklyn/event/feed/jmx/JmxAttributePollConfig.java
deleted file mode 100644
index ac8a27d..0000000
--- a/software/base/src/main/java/brooklyn/event/feed/jmx/JmxAttributePollConfig.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package brooklyn.event.feed.jmx;
-
-import javax.management.MalformedObjectNameException;
-import javax.management.ObjectName;
-
-import org.apache.brooklyn.api.sensor.AttributeSensor;
-import org.apache.brooklyn.sensor.feed.PollConfig;
-
-import com.google.common.base.Function;
-import com.google.common.base.Functions;
-
-public class JmxAttributePollConfig<T> extends PollConfig<Object, T, JmxAttributePollConfig<T>>{
-
- private ObjectName objectName;
- private String attributeName;
-
- @SuppressWarnings({ "unchecked", "rawtypes" })
- public JmxAttributePollConfig(AttributeSensor<T> sensor) {
- super(sensor);
- onSuccess((Function)Functions.identity());
- }
-
- public JmxAttributePollConfig(JmxAttributePollConfig<T> other) {
- super(other);
- this.objectName = other.objectName;
- this.attributeName = other.attributeName;
- }
-
- public ObjectName getObjectName() {
- return objectName;
- }
-
- public String getAttributeName() {
- return attributeName;
- }
-
- public JmxAttributePollConfig<T> objectName(ObjectName val) {
- this.objectName = val; return this;
- }
-
- public JmxAttributePollConfig<T> objectName(String val) {
- try {
- return objectName(new ObjectName(val));
- } catch (MalformedObjectNameException e) {
- throw new IllegalArgumentException("Invalid object name ("+val+")", e);
- }
- }
-
- public JmxAttributePollConfig<T> attributeName(String val) {
- this.attributeName = val; return this;
- }
-
- @Override protected String toStringBaseName() { return "jmx"; }
- @Override protected String toStringPollSource() { return objectName+":"+attributeName; }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/64c2b2e5/software/base/src/main/java/brooklyn/event/feed/jmx/JmxFeed.java
----------------------------------------------------------------------
diff --git a/software/base/src/main/java/brooklyn/event/feed/jmx/JmxFeed.java b/software/base/src/main/java/brooklyn/event/feed/jmx/JmxFeed.java
deleted file mode 100644
index 82e953b..0000000
--- a/software/base/src/main/java/brooklyn/event/feed/jmx/JmxFeed.java
+++ /dev/null
@@ -1,437 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package brooklyn.event.feed.jmx;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.Callable;
-import java.util.concurrent.TimeUnit;
-
-import javax.management.Notification;
-import javax.management.NotificationFilter;
-import javax.management.NotificationListener;
-import javax.management.ObjectName;
-
-import org.apache.brooklyn.api.internal.EntityLocal;
-import org.apache.brooklyn.config.ConfigKey;
-import org.apache.brooklyn.core.config.ConfigKeys;
-import org.apache.brooklyn.sensor.feed.AbstractFeed;
-import org.apache.brooklyn.sensor.feed.AttributePollHandler;
-import org.apache.brooklyn.sensor.feed.DelegatingPollHandler;
-import org.apache.brooklyn.sensor.feed.PollHandler;
-import org.apache.brooklyn.sensor.feed.Poller;
-import org.apache.brooklyn.util.time.Duration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import brooklyn.entity.basic.SoftwareProcessImpl;
-
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import com.google.common.collect.SetMultimap;
-import com.google.common.collect.Sets;
-import com.google.common.reflect.TypeToken;
-
-
-/**
- * Provides a feed of attribute values, by polling or subscribing over jmx.
- *
- * Example usage (e.g. in an entity that extends {@link SoftwareProcessImpl}):
- * <pre>
- * {@code
- * private JmxFeed feed;
- *
- * //@Override
- * protected void connectSensors() {
- * super.connectSensors();
- *
- * feed = JmxFeed.builder()
- * .entity(this)
- * .period(500, TimeUnit.MILLISECONDS)
- * .pollAttribute(new JmxAttributePollConfig<Integer>(ERROR_COUNT)
- * .objectName(requestProcessorMbeanName)
- * .attributeName("errorCount"))
- * .pollAttribute(new JmxAttributePollConfig<Boolean>(SERVICE_UP)
- * .objectName(serverMbeanName)
- * .attributeName("Started")
- * .onError(Functions.constant(false)))
- * .build();
- * }
- *
- * {@literal @}Override
- * protected void disconnectSensors() {
- * super.disconnectSensors();
- * if (feed != null) feed.stop();
- * }
- * }
- * </pre>
- *
- * @author aled
- */
-public class JmxFeed extends AbstractFeed {
-
- public static final Logger log = LoggerFactory.getLogger(JmxFeed.class);
-
- public static final long JMX_CONNECTION_TIMEOUT_MS = 120*1000;
-
- public static final ConfigKey<JmxHelper> HELPER = ConfigKeys.newConfigKey(JmxHelper.class, "helper");
- public static final ConfigKey<Boolean> OWN_HELPER = ConfigKeys.newBooleanConfigKey("ownHelper");
- public static final ConfigKey<String> JMX_URI = ConfigKeys.newStringConfigKey("jmxUri");
- public static final ConfigKey<Long> JMX_CONNECTION_TIMEOUT = ConfigKeys.newLongConfigKey("jmxConnectionTimeout");
-
- @SuppressWarnings("serial")
- public static final ConfigKey<SetMultimap<String, JmxAttributePollConfig<?>>> ATTRIBUTE_POLLS = ConfigKeys.newConfigKey(
- new TypeToken<SetMultimap<String, JmxAttributePollConfig<?>>>() {},
- "attributePolls");
-
- @SuppressWarnings("serial")
- public static final ConfigKey<SetMultimap<List<?>, JmxOperationPollConfig<?>>> OPERATION_POLLS = ConfigKeys.newConfigKey(
- new TypeToken<SetMultimap<List<?>, JmxOperationPollConfig<?>>>() {},
- "operationPolls");
-
- @SuppressWarnings("serial")
- public static final ConfigKey<SetMultimap<NotificationFilter, JmxNotificationSubscriptionConfig<?>>> NOTIFICATION_SUBSCRIPTIONS = ConfigKeys.newConfigKey(
- new TypeToken<SetMultimap<NotificationFilter, JmxNotificationSubscriptionConfig<?>>>() {},
- "notificationPolls");
-
- public static Builder builder() {
- return new Builder();
- }
-
- public static class Builder {
- private EntityLocal entity;
- private JmxHelper helper;
- private long jmxConnectionTimeout = JMX_CONNECTION_TIMEOUT_MS;
- private long period = 500;
- private TimeUnit periodUnits = TimeUnit.MILLISECONDS;
- private List<JmxAttributePollConfig<?>> attributePolls = Lists.newArrayList();
- private List<JmxOperationPollConfig<?>> operationPolls = Lists.newArrayList();
- private List<JmxNotificationSubscriptionConfig<?>> notificationSubscriptions = Lists.newArrayList();
- private String uniqueTag;
- private volatile boolean built;
-
- public Builder entity(EntityLocal val) {
- this.entity = val;
- return this;
- }
- public Builder helper(JmxHelper val) {
- this.helper = val;
- return this;
- }
- public Builder period(Duration duration) {
- return period(duration.toMilliseconds(), TimeUnit.MILLISECONDS);
- }
- public Builder period(long millis) {
- return period(millis, TimeUnit.MILLISECONDS);
- }
- public Builder period(long val, TimeUnit units) {
- this.period = val;
- this.periodUnits = units;
- return this;
- }
- public Builder pollAttribute(JmxAttributePollConfig<?> config) {
- attributePolls.add(config);
- return this;
- }
- public Builder pollOperation(JmxOperationPollConfig<?> config) {
- operationPolls.add(config);
- return this;
- }
- public Builder subscribeToNotification(JmxNotificationSubscriptionConfig<?> config) {
- notificationSubscriptions.add(config);
- return this;
- }
- public Builder uniqueTag(String uniqueTag) {
- this.uniqueTag = uniqueTag;
- return this;
- }
- public JmxFeed build() {
- built = true;
- JmxFeed result = new JmxFeed(this);
- result.setEntity(checkNotNull(entity, "entity"));
- result.start();
- return result;
- }
- @Override
- protected void finalize() {
- if (!built) log.warn("JmxFeed.Builder created, but build() never called");
- }
- }
-
- private final SetMultimap<ObjectName, NotificationListener> notificationListeners = HashMultimap.create();
-
- /**
- * For rebind; do not call directly; use builder
- */
- public JmxFeed() {
- }
-
- protected JmxFeed(Builder builder) {
- super();
- if (builder.helper != null) {
- JmxHelper helper = builder.helper;
- setConfig(HELPER, helper);
- setConfig(OWN_HELPER, false);
- setConfig(JMX_URI, helper.getUrl());
- }
- setConfig(JMX_CONNECTION_TIMEOUT, builder.jmxConnectionTimeout);
-
- SetMultimap<String, JmxAttributePollConfig<?>> attributePolls = HashMultimap.<String,JmxAttributePollConfig<?>>create();
- for (JmxAttributePollConfig<?> config : builder.attributePolls) {
- if (!config.isEnabled()) continue;
- @SuppressWarnings({ "rawtypes", "unchecked" })
- JmxAttributePollConfig<?> configCopy = new JmxAttributePollConfig(config);
- if (configCopy.getPeriod() < 0) configCopy.period(builder.period, builder.periodUnits);
- attributePolls.put(configCopy.getObjectName().getCanonicalName() + configCopy.getAttributeName(), configCopy);
- }
- setConfig(ATTRIBUTE_POLLS, attributePolls);
-
- SetMultimap<List<?>, JmxOperationPollConfig<?>> operationPolls = HashMultimap.<List<?>,JmxOperationPollConfig<?>>create();
- for (JmxOperationPollConfig<?> config : builder.operationPolls) {
- if (!config.isEnabled()) continue;
- @SuppressWarnings({ "rawtypes", "unchecked" })
- JmxOperationPollConfig<?> configCopy = new JmxOperationPollConfig(config);
- if (configCopy.getPeriod() < 0) configCopy.period(builder.period, builder.periodUnits);
- operationPolls.put(configCopy.buildOperationIdentity(), configCopy);
- }
- setConfig(OPERATION_POLLS, operationPolls);
-
- SetMultimap<NotificationFilter, JmxNotificationSubscriptionConfig<?>> notificationSubscriptions = HashMultimap.create();
- for (JmxNotificationSubscriptionConfig<?> config : builder.notificationSubscriptions) {
- if (!config.isEnabled()) continue;
- notificationSubscriptions.put(config.getNotificationFilter(), config);
- }
- setConfig(NOTIFICATION_SUBSCRIPTIONS, notificationSubscriptions);
- initUniqueTag(builder.uniqueTag, attributePolls, operationPolls, notificationSubscriptions);
- }
-
- @Override
- public void setEntity(EntityLocal entity) {
- if (getConfig(HELPER) == null) {
- JmxHelper helper = new JmxHelper(entity);
- setConfig(HELPER, helper);
- setConfig(OWN_HELPER, true);
- setConfig(JMX_URI, helper.getUrl());
- }
- super.setEntity(entity);
- }
-
- public String getJmxUri() {
- return getConfig(JMX_URI);
- }
-
- protected JmxHelper getHelper() {
- return getConfig(HELPER);
- }
-
- @SuppressWarnings("unchecked")
- protected Poller<Object> getPoller() {
- return (Poller<Object>) super.getPoller();
- }
-
- @Override
- protected boolean isConnected() {
- return super.isConnected() && getHelper().isConnected();
- }
-
- @Override
- protected void preStart() {
- /*
- * All actions on the JmxHelper are done async (through the poller's threading) so we don't
- * block on start for a long time (e.g. if the entity is not contactable and doing a rebind
- * on restart of brooklyn). Without that, one gets a 120 second pause with it stuck in a
- * stack trace like:
- *
- * at brooklyn.event.feed.jmx.JmxHelper.sleep(JmxHelper.java:640)
- * at brooklyn.event.feed.jmx.JmxHelper.connect(JmxHelper.java:320)
- * at brooklyn.event.feed.jmx.JmxFeed.preStart(JmxFeed.java:172)
- * at brooklyn.event.feed.AbstractFeed.start(AbstractFeed.java:68)
- * at brooklyn.event.feed.jmx.JmxFeed$Builder.build(JmxFeed.java:119)
- * at brooklyn.entity.java.JavaAppUtils.connectMXBeanSensors(JavaAppUtils.java:109)
- * at brooklyn.entity.java.VanillaJavaApp.connectSensors(VanillaJavaApp.java:97)
- * at brooklyn.entity.basic.SoftwareProcessImpl.callRebindHooks(SoftwareProcessImpl.java:189)
- * at brooklyn.entity.basic.SoftwareProcessImpl.rebind(SoftwareProcessImpl.java:235)
- * ...
- * at brooklyn.entity.rebind.RebindManagerImpl.rebind(RebindManagerImpl.java:184)
- */
- final SetMultimap<NotificationFilter, JmxNotificationSubscriptionConfig<?>> notificationSubscriptions = getConfig(NOTIFICATION_SUBSCRIPTIONS);
- final SetMultimap<List<?>, JmxOperationPollConfig<?>> operationPolls = getConfig(OPERATION_POLLS);
- final SetMultimap<String, JmxAttributePollConfig<?>> attributePolls = getConfig(ATTRIBUTE_POLLS);
-
- getPoller().submit(new Callable<Void>() {
- public Void call() {
- getHelper().connect(getConfig(JMX_CONNECTION_TIMEOUT));
- return null;
- }
- @Override public String toString() { return "Connect JMX "+getHelper().getUrl(); }
- });
-
- for (final NotificationFilter filter : notificationSubscriptions.keySet()) {
- getPoller().submit(new Callable<Void>() {
- public Void call() {
- // TODO Could config.getObjectName have wildcards? Is this code safe?
- Set<JmxNotificationSubscriptionConfig<?>> configs = notificationSubscriptions.get(filter);
- NotificationListener listener = registerNotificationListener(configs);
- ObjectName objectName = Iterables.get(configs, 0).getObjectName();
- notificationListeners.put(objectName, listener);
- return null;
- }
- @Override public String toString() { return "Register JMX notifications: "+notificationSubscriptions.get(filter); }
- });
- }
-
- // Setup polling of sensors
- for (final String jmxAttributeName : attributePolls.keys()) {
- registerAttributePoller(attributePolls.get(jmxAttributeName));
- }
-
- // Setup polling of operations
- for (final List<?> operationIdentifier : operationPolls.keys()) {
- registerOperationPoller(operationPolls.get(operationIdentifier));
- }
- }
-
- @Override
- protected void preStop() {
- super.preStop();
-
- for (Map.Entry<ObjectName, NotificationListener> entry : notificationListeners.entries()) {
- unregisterNotificationListener(entry.getKey(), entry.getValue());
- }
- notificationListeners.clear();
- }
-
- @Override
- protected void postStop() {
- super.postStop();
- JmxHelper helper = getHelper();
- Boolean ownHelper = getConfig(OWN_HELPER);
- if (helper != null && ownHelper) helper.terminate();
- }
-
- /**
- * Registers to poll a jmx-operation for an ObjectName, where all the given configs are for the same ObjectName + operation + parameters.
- */
- private void registerOperationPoller(Set<JmxOperationPollConfig<?>> configs) {
- Set<AttributePollHandler<? super Object>> handlers = Sets.newLinkedHashSet();
- long minPeriod = Integer.MAX_VALUE;
-
- final ObjectName objectName = Iterables.get(configs, 0).getObjectName();
- final String operationName = Iterables.get(configs, 0).getOperationName();
- final List<String> signature = Iterables.get(configs, 0).getSignature();
- final List<?> params = Iterables.get(configs, 0).getParams();
-
- for (JmxOperationPollConfig<?> config : configs) {
- handlers.add(new AttributePollHandler<Object>(config, getEntity(), this));
- if (config.getPeriod() > 0) minPeriod = Math.min(minPeriod, config.getPeriod());
- }
-
- getPoller().scheduleAtFixedRate(
- new Callable<Object>() {
- public Object call() throws Exception {
- if (log.isDebugEnabled()) log.debug("jmx operation polling for {} sensors at {} -> {}", new Object[] {getEntity(), getJmxUri(), operationName});
- if (signature.size() == params.size()) {
- return getHelper().operation(objectName, operationName, signature, params);
- } else {
- return getHelper().operation(objectName, operationName, params.toArray());
- }
- }
- },
- new DelegatingPollHandler<Object>(handlers), minPeriod);
- }
-
- /**
- * Registers to poll a jmx-attribute for an ObjectName, where all the given configs are for that same ObjectName + attribute.
- */
- private void registerAttributePoller(Set<JmxAttributePollConfig<?>> configs) {
- Set<AttributePollHandler<? super Object>> handlers = Sets.newLinkedHashSet();
- long minPeriod = Integer.MAX_VALUE;
-
- final ObjectName objectName = Iterables.get(configs, 0).getObjectName();
- final String jmxAttributeName = Iterables.get(configs, 0).getAttributeName();
-
- for (JmxAttributePollConfig<?> config : configs) {
- handlers.add(new AttributePollHandler<Object>(config, getEntity(), this));
- if (config.getPeriod() > 0) minPeriod = Math.min(minPeriod, config.getPeriod());
- }
-
- // TODO Not good calling this holding the synchronization lock
- getPoller().scheduleAtFixedRate(
- new Callable<Object>() {
- public Object call() throws Exception {
- if (log.isTraceEnabled()) log.trace("jmx attribute polling for {} sensors at {} -> {}", new Object[] {getEntity(), getJmxUri(), jmxAttributeName});
- return getHelper().getAttribute(objectName, jmxAttributeName);
- }
- },
- new DelegatingPollHandler<Object>(handlers), minPeriod);
- }
-
- /**
- * Registers to subscribe to notifications for an ObjectName, where all the given configs are for that same ObjectName + filter.
- */
- private NotificationListener registerNotificationListener(Set<JmxNotificationSubscriptionConfig<?>> configs) {
- final List<AttributePollHandler<? super javax.management.Notification>> handlers = Lists.newArrayList();
-
- final ObjectName objectName = Iterables.get(configs, 0).getObjectName();
- final NotificationFilter filter = Iterables.get(configs, 0).getNotificationFilter();
-
- for (final JmxNotificationSubscriptionConfig<?> config : configs) {
- AttributePollHandler<javax.management.Notification> handler = new AttributePollHandler<javax.management.Notification>(config, getEntity(), this) {
- @Override protected Object transformValueOnSuccess(javax.management.Notification val) {
- if (config.getOnNotification() != null) {
- return config.getOnNotification().apply(val);
- } else {
- Object result = super.transformValueOnSuccess(val);
- if (result instanceof javax.management.Notification)
- return ((javax.management.Notification)result).getUserData();
- return result;
- }
- }
- };
- handlers.add(handler);
- }
- final PollHandler<javax.management.Notification> compoundHandler = new DelegatingPollHandler<javax.management.Notification>(handlers);
-
- NotificationListener listener = new NotificationListener() {
- @Override public void handleNotification(Notification notification, Object handback) {
- compoundHandler.onSuccess(notification);
- }
- };
- getHelper().addNotificationListener(objectName, listener, filter);
-
- return listener;
- }
-
- private void unregisterNotificationListener(ObjectName objectName, NotificationListener listener) {
- try {
- getHelper().removeNotificationListener(objectName, listener);
- } catch (RuntimeException e) {
- log.warn("Failed to unregister listener: "+objectName+", "+listener+"; continuing...", e);
- }
- }
-
- @Override
- public String toString() {
- return "JmxFeed["+(getManagementContext()!=null&&getManagementContext().isRunning()?getJmxUri():"mgmt-not-running")+"]";
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/64c2b2e5/software/base/src/main/java/brooklyn/event/feed/jmx/JmxHelper.java
----------------------------------------------------------------------
diff --git a/software/base/src/main/java/brooklyn/event/feed/jmx/JmxHelper.java b/software/base/src/main/java/brooklyn/event/feed/jmx/JmxHelper.java
deleted file mode 100644
index cfc405e..0000000
--- a/software/base/src/main/java/brooklyn/event/feed/jmx/JmxHelper.java
+++ /dev/null
@@ -1,725 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package brooklyn.event.feed.jmx;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-import static org.apache.brooklyn.util.JavaGroovyEquivalents.groovyTruth;
-import groovy.time.TimeDuration;
-
-import java.io.IOException;
-import java.security.KeyStore;
-import java.security.PrivateKey;
-import java.security.cert.Certificate;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.LinkedHashMap;
-import java.util.Map;
-import java.util.Set;
-import java.util.WeakHashMap;
-import java.util.concurrent.Callable;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-
-import javax.management.AttributeNotFoundException;
-import javax.management.InstanceAlreadyExistsException;
-import javax.management.InstanceNotFoundException;
-import javax.management.InvalidAttributeValueException;
-import javax.management.JMX;
-import javax.management.ListenerNotFoundException;
-import javax.management.MBeanServerConnection;
-import javax.management.MalformedObjectNameException;
-import javax.management.NotCompliantMBeanException;
-import javax.management.NotificationFilter;
-import javax.management.NotificationListener;
-import javax.management.ObjectInstance;
-import javax.management.ObjectName;
-import javax.management.openmbean.CompositeData;
-import javax.management.openmbean.TabularData;
-import javax.management.remote.JMXConnector;
-import javax.management.remote.JMXConnectorFactory;
-import javax.management.remote.JMXServiceURL;
-import javax.net.ssl.KeyManagerFactory;
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.SSLSocketFactory;
-import javax.net.ssl.TrustManager;
-
-import org.apache.brooklyn.api.internal.EntityLocal;
-import org.apache.brooklyn.util.core.crypto.SecureKeys;
-import org.apache.brooklyn.util.crypto.SslTrustUtils;
-import org.apache.brooklyn.util.exceptions.Exceptions;
-import org.apache.brooklyn.util.exceptions.RuntimeInterruptedException;
-import org.apache.brooklyn.util.jmx.jmxmp.JmxmpAgent;
-import org.apache.brooklyn.util.repeat.Repeater;
-import org.apache.brooklyn.util.time.Duration;
-import org.apache.brooklyn.util.time.Time;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import brooklyn.entity.java.JmxSupport;
-import brooklyn.entity.java.UsesJmx;
-
-import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
-
-public class JmxHelper {
-
- private static final Logger LOG = LoggerFactory.getLogger(JmxHelper.class);
-
- public static final String JMX_URL_FORMAT = "service:jmx:rmi:///jndi/rmi://%s:%d/%s";
- // first host:port may be ignored, so above is sufficient, but not sure
- public static final String RMI_JMX_URL_FORMAT = "service:jmx:rmi://%s:%d/jndi/rmi://%s:%d/%s";
- // jmxmp
- public static final String JMXMP_URL_FORMAT = "service:jmx:jmxmp://%s:%d";
-
- // Tracks the MBeans we have failed to find, with a set keyed off the url
- private static final Map<String, Set<ObjectName>> notFoundMBeansByUrl = Collections.synchronizedMap(new WeakHashMap<String, Set<ObjectName>>());
-
- public static final Map<String, String> CLASSES = ImmutableMap.<String,String>builder()
- .put("Integer", Integer.TYPE.getName())
- .put("Long", Long.TYPE.getName())
- .put("Boolean", Boolean.TYPE.getName())
- .put("Byte", Byte.TYPE.getName())
- .put("Character", Character.TYPE.getName())
- .put("Double", Double.TYPE.getName())
- .put("Float", Float.TYPE.getName())
- .put("GStringImpl", String.class.getName())
- .put("LinkedHashMap", Map.class.getName())
- .put("TreeMap", Map.class.getName())
- .put("HashMap", Map.class.getName())
- .put("ConcurrentHashMap", Map.class.getName())
- .put("TabularDataSupport", TabularData.class.getName())
- .put("CompositeDataSupport", CompositeData.class.getName())
- .build();
-
- /** constructs a JMX URL suitable for connecting to the given entity, being smart about JMX/RMI vs JMXMP */
- public static String toJmxUrl(EntityLocal entity) {
- String url = entity.getAttribute(UsesJmx.JMX_URL);
- if (url != null) {
- return url;
- } else {
- new JmxSupport(entity, null).setJmxUrl();
- url = entity.getAttribute(UsesJmx.JMX_URL);
- return Preconditions.checkNotNull(url, "Could not find URL for "+entity);
- }
- }
-
- /** constructs an RMI/JMX URL with the given inputs
- * (where the RMI Registry Port should be non-null, and at least one must be non-null) */
- public static String toRmiJmxUrl(String host, Integer jmxRmiServerPort, Integer rmiRegistryPort, String context) {
- if (rmiRegistryPort != null && rmiRegistryPort > 0) {
- if (jmxRmiServerPort!=null && jmxRmiServerPort > 0 && jmxRmiServerPort!=rmiRegistryPort) {
- // we have an explicit known JMX RMI server port (e.g. because we are using the agent),
- // distinct from the RMI registry port
- // (if the ports are the same, it is a short-hand, and don't use this syntax!)
- return String.format(RMI_JMX_URL_FORMAT, host, jmxRmiServerPort, host, rmiRegistryPort, context);
- }
- return String.format(JMX_URL_FORMAT, host, rmiRegistryPort, context);
- } else if (jmxRmiServerPort!=null && jmxRmiServerPort > 0) {
- LOG.warn("No RMI registry port set for "+host+"; attempting to use JMX port for RMI lookup");
- return String.format(JMX_URL_FORMAT, host, jmxRmiServerPort, context);
- } else {
- LOG.warn("No RMI/JMX details set for "+host+"; returning null");
- return null;
- }
- }
-
- /** constructs a JMXMP URL for connecting to the given host and port */
- public static String toJmxmpUrl(String host, Integer jmxmpPort) {
- return "service:jmx:jmxmp://"+host+(jmxmpPort!=null ? ":"+jmxmpPort : "");
- }
-
- final EntityLocal entity;
- final String url;
- final String user;
- final String password;
-
- private volatile transient JMXConnector connector;
- private volatile transient MBeanServerConnection connection;
- private transient boolean triedConnecting;
- private transient boolean failedReconnecting;
- private transient long failedReconnectingTime;
- private int minTimeBetweenReconnectAttempts = 1000;
- private final AtomicBoolean terminated = new AtomicBoolean();
-
- // Tracks the MBeans we have failed to find for this JmxHelper's connection URL (so can log just once for each)
- private final Set<ObjectName> notFoundMBeans;
-
- public JmxHelper(EntityLocal entity) {
- this(toJmxUrl(entity), entity, entity.getAttribute(UsesJmx.JMX_USER), entity.getAttribute(UsesJmx.JMX_PASSWORD));
-
- if (entity.getAttribute(UsesJmx.JMX_URL) == null) {
- entity.setAttribute(UsesJmx.JMX_URL, url);
- }
- }
-
- // TODO split this in to two classes, one for entities, and one entity-neutral
- // (simplifying set of constructors below)
-
- public JmxHelper(String url) {
- this(url, null, null);
- }
-
- public JmxHelper(String url, String user, String password) {
- this(url, null, user, password);
- }
-
- public JmxHelper(String url, EntityLocal entity, String user, String password) {
- this.url = url;
- this.entity = entity;
- this.user = user;
- this.password = password;
-
- synchronized (notFoundMBeansByUrl) {
- Set<ObjectName> set = notFoundMBeansByUrl.get(url);
- if (set == null) {
- set = Collections.synchronizedSet(Collections.newSetFromMap(new WeakHashMap<ObjectName, Boolean>()));
- notFoundMBeansByUrl.put(url, set);
- }
- notFoundMBeans = set;
- }
- }
-
- public void setMinTimeBetweenReconnectAttempts(int val) {
- minTimeBetweenReconnectAttempts = val;
- }
-
- public String getUrl(){
- return url;
- }
-
- // ============== connection related calls =======================
-
- //for tesing purposes
- protected MBeanServerConnection getConnection() {
- return connection;
- }
-
- /**
- * Checks if the JmxHelper is connected. Returned value could be stale as soon
- * as it is received.
- *
- * This method is thread safe.
- *
- * @return true if connected, false otherwise.
- */
- public boolean isConnected() {
- return connection!=null;
- }
-
- /**
- * Reconnects. If it already is connected, it disconnects first.
- *
- * @throws IOException
- */
- public synchronized void reconnectWithRetryDampened() throws IOException {
- // If we've already tried reconnecting very recently, don't try again immediately
- if (failedReconnecting) {
- long timeSince = (System.currentTimeMillis() - failedReconnectingTime);
- if (timeSince < minTimeBetweenReconnectAttempts) {
- String msg = "Not reconnecting to JMX at "+url+" because attempt failed "+Time.makeTimeStringRounded(timeSince)+" ago";
- throw new IllegalStateException(msg);
- }
- }
-
- reconnect();
- }
-
- public synchronized void reconnect() throws IOException {
- disconnect();
-
- try {
- connect();
- failedReconnecting = false;
- } catch (Exception e) {
- if (failedReconnecting) {
- if (LOG.isDebugEnabled()) LOG.debug("unable to re-connect to JMX url (repeated failure): {}: {}", url, e);
- } else {
- LOG.debug("unable to re-connect to JMX url {} (rethrowing): {}", url, e);
- failedReconnecting = true;
- }
- failedReconnectingTime = System.currentTimeMillis();
- throw Throwables.propagate(e);
- }
- }
-
- /** attempts to connect immediately */
- @SuppressWarnings({ "rawtypes", "unchecked" })
- public synchronized void connect() throws IOException {
- if (terminated.get()) throw new IllegalStateException("JMX Helper "+this+" already terminated");
- if (connection != null) return;
-
- triedConnecting = true;
- if (connector != null) connector.close();
- JMXServiceURL serviceUrl = new JMXServiceURL(url);
- Map env = getConnectionEnvVars();
- try {
- connector = JMXConnectorFactory.connect(serviceUrl, env);
- } catch (NullPointerException npe) {
- //some software -- eg WSO2 -- will throw an NPE exception if the JMX connection can't be created, instead of an IOException.
- //this is a break of contract with the JMXConnectorFactory.connect method, so this code verifies if the NPE is
- //thrown by a known offender (wso2) and if so replaces the bad exception by a new IOException.
- //ideally WSO2 will fix this bug and we can remove this code.
- boolean thrownByWso2 = npe.getStackTrace()[0].toString().contains("org.wso2.carbon.core.security.CarbonJMXAuthenticator.authenticate");
- if (thrownByWso2) {
- throw new IOException("Failed to connect to url "+url+". NullPointerException is thrown, but replaced by an IOException to fix a WSO2 JMX problem", npe);
- } else {
- throw npe;
- }
- } catch (IOException e) {
- Exceptions.propagateIfFatal(e);
- if (terminated.get()) {
- throw new IllegalStateException("JMX Helper "+this+" already terminated", e);
- } else {
- throw e;
- }
- }
- connection = connector.getMBeanServerConnection();
-
- if (terminated.get()) {
- disconnectNow();
- throw new IllegalStateException("JMX Helper "+this+" already terminated");
- }
- }
-
- @SuppressWarnings({ "rawtypes", "unchecked" })
- public Map getConnectionEnvVars() {
- Map env = new LinkedHashMap();
-
- if (groovyTruth(user) && groovyTruth(password)) {
- String[] creds = new String[] {user, password};
- env.put(JMXConnector.CREDENTIALS, creds);
- }
-
- if (entity!=null && groovyTruth(entity.getConfig(UsesJmx.JMX_SSL_ENABLED))) {
- env.put("jmx.remote.profiles", JmxmpAgent.TLS_JMX_REMOTE_PROFILES);
-
- PrivateKey key = entity.getConfig(UsesJmx.JMX_SSL_ACCESS_KEY);
- Certificate cert = entity.getConfig(UsesJmx.JMX_SSL_ACCESS_CERT);
- KeyStore ks = SecureKeys.newKeyStore();
- try {
- KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
- if (key!=null) {
- ks.setKeyEntry("brooklyn-jmx-access", key, "".toCharArray(), new Certificate[] { cert });
- }
- kmf.init(ks, "".toCharArray());
-
- TrustManager tms =
- // TODO use root cert for trusting server
- //trustStore!=null ? SecureKeys.getTrustManager(trustStore) :
- SslTrustUtils.TRUST_ALL;
-
- SSLContext ctx = SSLContext.getInstance("TLSv1");
- ctx.init(kmf.getKeyManagers(), new TrustManager[] { tms }, null);
- SSLSocketFactory ssf = ctx.getSocketFactory();
- env.put(JmxmpAgent.TLS_SOCKET_FACTORY_PROPERTY, ssf);
-
- } catch (Exception e) {
- LOG.warn("Error setting key "+key+" for "+entity+": "+e, e);
- }
- }
-
- return env;
- }
-
- /**
- * Continuously attempts to connect for at least the indicated amount of time; or indefinitely if -1. This method
- * is useful when you are not sure if the system you are trying to connect to already is up and running.
- *
- * This method doesn't throw an Exception, but returns true on success, false otherwise.
- *
- * TODO: What happens if already connected?
- *
- * @param timeoutMs
- * @return
- */
- public boolean connect(long timeoutMs) {
- if (LOG.isDebugEnabled()) LOG.debug("Connecting to JMX URL: {} ({})", url, ((timeoutMs == -1) ? "indefinitely" : timeoutMs+"ms timeout"));
- long startMs = System.currentTimeMillis();
- long endMs = (timeoutMs == -1) ? Long.MAX_VALUE : (startMs + timeoutMs);
- long currentTime = startMs;
- Throwable lastError = null;
- int attempt = 0;
- while (currentTime <= endMs) {
- currentTime = System.currentTimeMillis();
- if (attempt != 0) sleep(100); //sleep 100 to prevent thrashing and facilitate interruption
- if (LOG.isTraceEnabled()) LOG.trace("trying connection to {} at time {}", url, currentTime);
-
- try {
- connect();
- return true;
- } catch (Exception e) {
- Exceptions.propagateIfFatal(e);
- if (!terminated.get() && shouldRetryOn(e)) {
- if (LOG.isDebugEnabled()) LOG.debug("Attempt {} failed connecting to {} ({})", new Object[] {attempt + 1, url, e.getMessage()});
- lastError = e;
- } else {
- throw Exceptions.propagate(e);
- }
- }
- attempt++;
- }
- LOG.warn("unable to connect to JMX url: "+url, lastError);
- return false;
- }
-
- private boolean shouldRetryOn(Exception e) {
- // Expect SecurityException, IOException, etc.
- // But can also see things like javax.naming.ServiceUnavailableException with WSO2 app-servers.
- // So let's not try to second guess strange behaviours that future entities will exhibit.
- //
- // However, if it was our request that was invalid then not worth retrying.
-
- if (e instanceof AttributeNotFoundException) return false;
- if (e instanceof InstanceAlreadyExistsException) return false;
- if (e instanceof InstanceNotFoundException) return false;
- if (e instanceof InvalidAttributeValueException) return false;
- if (e instanceof ListenerNotFoundException) return false;
- if (e instanceof MalformedObjectNameException) return false;
- if (e instanceof NotCompliantMBeanException) return false;
- if (e instanceof InterruptedException) return false;
- if (e instanceof RuntimeInterruptedException) return false;
-
- return true;
- }
-
- /**
- * A thread-safe version of {@link #disconnectNow()}.
- *
- * This method is threadsafe.
- */
- public synchronized void disconnect() {
- disconnectNow();
- }
-
- /**
- * Disconnects, preventing subsequent connections to be made. Method doesn't throw an exception.
- *
- * Can safely be called if already disconnected.
- *
- * This method is not threadsafe, but will thus not block if
- * another thread is taking a long time for connections to timeout.
- *
- * Any concurrent requests will likely get an IOException - see
- * {@linkplain http://docs.oracle.com/javase/7/docs/api/javax/management/remote/JMXConnector.html#close()}.
- *
- */
- public void terminate() {
- terminated.set(true);
- disconnectNow();
- }
-
- protected void disconnectNow() {
- triedConnecting = false;
- if (connector != null) {
- if (LOG.isDebugEnabled()) LOG.debug("Disconnecting from JMX URL {}", url);
- try {
- connector.close();
- } catch (Exception e) {
- // close attempts to connect to close cleanly; and if it can't, it throws;
- // often we disconnect as part of shutdown, even if the other side has already stopped --
- // so swallow exceptions (no situations known where we need a clean closure on the remote side)
- if (LOG.isDebugEnabled()) LOG.debug("Caught exception disconnecting from JMX at {} ({})", url, e.getMessage());
- if (LOG.isTraceEnabled()) LOG.trace("Details for exception disconnecting JMX", e);
- } finally {
- connector = null;
- connection = null;
- }
- }
- }
-
- /**
- * Gets a usable MBeanServerConnection.
- *
- * Method is threadsafe.
- *
- * @returns the MBeanServerConnection
- * @throws IllegalStateException if not connected.
- */
- private synchronized MBeanServerConnection getConnectionOrFail() {
- if (isConnected())
- return getConnection();
-
- if (triedConnecting) {
- throw new IllegalStateException("Failed to connect to JMX at "+url);
- } else {
- String msg = "Not connected (and not attempted to connect) to JMX at "+url+
- (failedReconnecting ? (" (last reconnect failure at "+ Time.makeDateString(failedReconnectingTime) + ")") : "");
- throw new IllegalStateException(msg);
- }
- }
-
- private <T> T invokeWithReconnect(Callable<T> task) {
- try {
- return task.call();
- } catch (Exception e) {
- if (shouldRetryOn(e)) {
- try {
- reconnectWithRetryDampened();
- return task.call();
- } catch (Exception e2) {
- throw Throwables.propagate(e2);
- }
- } else {
- throw Throwables.propagate(e);
- }
- }
- }
-
- // ====================== query related calls =======================================
-
- /**
- * Converts from an object name pattern to a real object name, by querying with findMBean;
- * if no matching MBean can be found (or if more than one match found) then returns null.
- * If the supplied object name is not a pattern then just returns that. If the
- */
- public ObjectName toLiteralObjectName(ObjectName objectName) {
- if (checkNotNull(objectName, "objectName").isPattern()) {
- ObjectInstance bean = findMBean(objectName);
- return (bean != null) ? bean.getObjectName() : null;
- } else {
- return objectName;
- }
- }
-
- public Set<ObjectInstance> findMBeans(final ObjectName objectName) {
- return invokeWithReconnect(new Callable<Set<ObjectInstance>>() {
- public Set<ObjectInstance> call() throws Exception {
- return getConnectionOrFail().queryMBeans(objectName, null);
- }});
- }
-
- public ObjectInstance findMBean(ObjectName objectName) {
- Set<ObjectInstance> beans = findMBeans(objectName);
- if (beans.size() == 1) {
- notFoundMBeans.remove(objectName);
- return Iterables.getOnlyElement(beans);
- } else {
- boolean changed = notFoundMBeans.add(objectName);
-
- if (beans.size() > 1) {
- if (changed) {
- LOG.warn("JMX object name query returned {} values for {} at {}; ignoring all",
- new Object[] {beans.size(), objectName.getCanonicalName(), url});
- } else {
- if (LOG.isDebugEnabled()) LOG.debug("JMX object name query returned {} values for {} at {} (repeating); ignoring all",
- new Object[] {beans.size(), objectName.getCanonicalName(), url});
- }
- } else {
- if (changed) {
- LOG.warn("JMX object {} not found at {}", objectName.getCanonicalName(), url);
- } else {
- if (LOG.isDebugEnabled()) LOG.debug("JMX object {} not found at {} (repeating)", objectName.getCanonicalName(), url);
- }
- }
- return null;
- }
- }
-
- public Set<ObjectInstance> doesMBeanExistsEventually(final ObjectName objectName, Duration timeout) {
- return doesMBeanExistsEventually(objectName, timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
- }
- public Set<ObjectInstance> doesMBeanExistsEventually(final ObjectName objectName, TimeDuration timeout) {
- return doesMBeanExistsEventually(objectName, timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
- }
-
- public Set<ObjectInstance> doesMBeanExistsEventually(final ObjectName objectName, long timeoutMillis) {
- return doesMBeanExistsEventually(objectName, timeoutMillis, TimeUnit.MILLISECONDS);
- }
-
- public Set<ObjectInstance> doesMBeanExistsEventually(String objectName, Duration timeout) {
- return doesMBeanExistsEventually(createObjectName(objectName), timeout);
- }
- public Set<ObjectInstance> doesMBeanExistsEventually(String objectName, TimeDuration timeout) {
- return doesMBeanExistsEventually(createObjectName(objectName), timeout);
- }
-
- public Set<ObjectInstance> doesMBeanExistsEventually(String objectName, long timeout, TimeUnit timeUnit) {
- return doesMBeanExistsEventually(createObjectName(objectName), timeout, timeUnit);
- }
-
- /** returns set of beans found, with retry, empty set if none after timeout */
- public Set<ObjectInstance> doesMBeanExistsEventually(final ObjectName objectName, long timeout, TimeUnit timeUnit) {
- final long timeoutMillis = timeUnit.toMillis(timeout);
- final AtomicReference<Set<ObjectInstance>> beans = new AtomicReference<Set<ObjectInstance>>(ImmutableSet.<ObjectInstance>of());
- try {
- Repeater.create("Wait for "+objectName)
- .limitTimeTo(timeout, timeUnit)
- .every(500, TimeUnit.MILLISECONDS)
- .until(new Callable<Boolean>() {
- public Boolean call() {
- connect(timeoutMillis);
- beans.set(findMBeans(objectName));
- return !beans.get().isEmpty();
- }})
- .rethrowException()
- .run();
- return beans.get();
- } catch (Exception e) {
- throw Exceptions.propagate(e);
- }
- }
-
- public void assertMBeanExistsEventually(ObjectName objectName, Duration timeout) {
- assertMBeanExistsEventually(objectName, timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
- }
- public void assertMBeanExistsEventually(ObjectName objectName, TimeDuration timeout) {
- assertMBeanExistsEventually(objectName, timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
- }
-
- public void assertMBeanExistsEventually(ObjectName objectName, long timeoutMillis) {
- assertMBeanExistsEventually(objectName, timeoutMillis, TimeUnit.MILLISECONDS);
- }
-
- public void assertMBeanExistsEventually(ObjectName objectName, long timeout, TimeUnit timeUnit) {
- Set<ObjectInstance> beans = doesMBeanExistsEventually(objectName, timeout, timeUnit);
- if (beans.size() != 1) {
- throw new IllegalStateException("MBean "+objectName+" not found within "+timeout+
- (beans.size() > 1 ? "; found multiple matches: "+beans : ""));
- }
- }
-
- /**
- * Returns a specific attribute for a JMX {@link ObjectName}.
- */
- public Object getAttribute(ObjectName objectName, final String attribute) {
- final ObjectName realObjectName = toLiteralObjectName(objectName);
-
- if (realObjectName != null) {
- Object result = invokeWithReconnect(new Callable<Object>() {
- public Object call() throws Exception {
- return getConnectionOrFail().getAttribute(realObjectName, attribute);
- }});
-
- if (LOG.isTraceEnabled()) LOG.trace("From {}, for jmx attribute {}.{}, got value {}", new Object[] {url, objectName.getCanonicalName(), attribute, result});
- return result;
- } else {
- return null;
- }
- }
-
- public void setAttribute(String objectName, String attribute, Object val) {
- setAttribute(createObjectName(objectName), attribute, val);
- }
-
- public void setAttribute(ObjectName objectName, final String attribute, final Object val) {
- final ObjectName realObjectName = toLiteralObjectName(objectName);
-
- if (realObjectName != null) {
- invokeWithReconnect(new Callable<Void>() {
- public Void call() throws Exception {
- getConnectionOrFail().setAttribute(realObjectName, new javax.management.Attribute(attribute, val));
- return null;
- }});
- if (LOG.isTraceEnabled()) LOG.trace("From {}, for jmx attribute {}.{}, set value {}", new Object[] {url, objectName.getCanonicalName(), attribute, val});
- } else {
- if (LOG.isDebugEnabled()) LOG.debug("From {}, cannot set attribute {}.{}, because mbean not found", new Object[] {url, objectName.getCanonicalName(), attribute});
- }
- }
-
- /** @see #operation(ObjectName, String, Object ...) */
- public Object operation(String objectName, String method, Object... arguments) {
- return operation(createObjectName(objectName), method, arguments);
- }
-
- /**
- * Executes an operation on a JMX {@link ObjectName}.
- */
- public Object operation(ObjectName objectName, final String method, final Object... arguments) {
- final ObjectName realObjectName = toLiteralObjectName(objectName);
- final String[] signature = new String[arguments.length];
- for (int i = 0; i < arguments.length; i++) {
- Class<?> clazz = arguments[i].getClass();
- signature[i] = (CLASSES.containsKey(clazz.getSimpleName()) ? CLASSES.get(clazz.getSimpleName()) : clazz.getName());
- }
-
- Object result = invokeWithReconnect(new Callable<Object>() {
- public Object call() throws Exception {
- return getConnectionOrFail().invoke(realObjectName, method, arguments, signature);
- }});
-
- if (LOG.isTraceEnabled()) LOG.trace("From {}, for jmx operation {}.{}({}), got value {}", new Object[] {url, realObjectName.getCanonicalName(), method, Arrays.asList(arguments),
- result});
- return result;
- }
-
- public void addNotificationListener(String objectName, NotificationListener listener) {
- addNotificationListener(createObjectName(objectName), listener, null);
- }
-
- public void addNotificationListener(String objectName, NotificationListener listener, NotificationFilter filter) {
- addNotificationListener(createObjectName(objectName), listener, filter);
- }
-
- public void addNotificationListener(ObjectName objectName, NotificationListener listener) {
- addNotificationListener(objectName, listener, null);
- }
-
- public void addNotificationListener(final ObjectName objectName, final NotificationListener listener, final NotificationFilter filter) {
- invokeWithReconnect(new Callable<Void>() {
- public Void call() throws Exception {
- getConnectionOrFail().addNotificationListener(objectName, listener, filter, null);
- return null;
- }});
- }
-
- public void removeNotificationListener(String objectName, NotificationListener listener) {
- removeNotificationListener(createObjectName(objectName), listener);
- }
-
- public void removeNotificationListener(final ObjectName objectName, final NotificationListener listener) {
- removeNotificationListener(objectName, listener, null);
- }
-
- public void removeNotificationListener(final ObjectName objectName, final NotificationListener listener, final NotificationFilter filter) {
- if (isConnected()) invokeWithReconnect(new Callable<Void>() {
- public Void call() throws Exception {
- getConnectionOrFail().removeNotificationListener(objectName, listener, filter, null);
- return null;
- }});
- }
-
- public <M> M getProxyObject(String objectName, Class<M> mbeanInterface) {
- return getProxyObject(createObjectName(objectName), mbeanInterface);
- }
-
- public <M> M getProxyObject(ObjectName objectName, Class<M> mbeanInterface) {
- MBeanServerConnection connection = getConnectionOrFail();
- return JMX.newMBeanProxy(connection, objectName, mbeanInterface, false);
- }
-
- public static ObjectName createObjectName(String name) {
- try {
- return new ObjectName(name);
- } catch (MalformedObjectNameException e) {
- throw Throwables.propagate(e);
- }
- }
-
- private static void sleep(long sleepTimeMillis) {
- try {
- Thread.sleep(sleepTimeMillis);
- } catch (InterruptedException e) {
- throw new RuntimeInterruptedException(e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/64c2b2e5/software/base/src/main/java/brooklyn/event/feed/jmx/JmxNotificationFilters.java
----------------------------------------------------------------------
diff --git a/software/base/src/main/java/brooklyn/event/feed/jmx/JmxNotificationFilters.java b/software/base/src/main/java/brooklyn/event/feed/jmx/JmxNotificationFilters.java
deleted file mode 100644
index 8cf5d62..0000000
--- a/software/base/src/main/java/brooklyn/event/feed/jmx/JmxNotificationFilters.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package brooklyn.event.feed.jmx;
-
-import javax.management.Notification;
-import javax.management.NotificationFilter;
-import javax.management.NotificationFilterSupport;
-
-public class JmxNotificationFilters {
-
- private JmxNotificationFilters() {} // instead use static utility methods
-
- /**
- * Matches the given notification type.
- * @see {@link NotificationFilterSupport#enableType(String)}
- */
- public static NotificationFilter matchesType(String type) {
- return matchesTypes(type);
- }
-
- /**
- * Matches any of the given notification types.
- * @see {@link NotificationFilterSupport#enableType(String)}
- */
- public static NotificationFilter matchesTypes(String... types) {
- NotificationFilterSupport result = new NotificationFilterSupport();
- for (String type : types) {
- result.enableType(type);
- }
- return result;
- }
-
- /**
- * @deprecated since 0.6.0;
- * only works if this brooklyn class is on the classpath of the JVM that your
- * subscribing to notifications on (because it tries to push the filter instance
- * to that JVM). So of very limited use in real-world java processes to be managed.
- * Therefore this will be deleted to avoid people hitting this surprising behaviour.
- */
- @SuppressWarnings("serial")
- public static NotificationFilter matchesTypeRegex(final String typeRegex) {
- return new NotificationFilter() {
- @Override public boolean isNotificationEnabled(Notification notif) {
- return notif.getType().matches(typeRegex);
- }
- };
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/64c2b2e5/software/base/src/main/java/brooklyn/event/feed/jmx/JmxNotificationSubscriptionConfig.java
----------------------------------------------------------------------
diff --git a/software/base/src/main/java/brooklyn/event/feed/jmx/JmxNotificationSubscriptionConfig.java b/software/base/src/main/java/brooklyn/event/feed/jmx/JmxNotificationSubscriptionConfig.java
deleted file mode 100644
index b27ffef..0000000
--- a/software/base/src/main/java/brooklyn/event/feed/jmx/JmxNotificationSubscriptionConfig.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package brooklyn.event.feed.jmx;
-
-import javax.management.MalformedObjectNameException;
-import javax.management.Notification;
-import javax.management.NotificationFilter;
-import javax.management.ObjectName;
-
-import org.apache.brooklyn.api.sensor.AttributeSensor;
-import org.apache.brooklyn.sensor.feed.FeedConfig;
-import org.apache.brooklyn.util.collections.MutableList;
-
-import com.google.common.base.Function;
-import com.google.common.base.Functions;
-
-public class JmxNotificationSubscriptionConfig<T> extends FeedConfig<javax.management.Notification, T, JmxNotificationSubscriptionConfig<T>>{
-
- private ObjectName objectName;
- private NotificationFilter notificationFilter;
- private Function<Notification, T> onNotification;
-
- @SuppressWarnings({ "unchecked", "rawtypes" })
- public JmxNotificationSubscriptionConfig(AttributeSensor<T> sensor) {
- super(sensor);
- onSuccess((Function)Functions.identity());
- }
-
- public JmxNotificationSubscriptionConfig(JmxNotificationSubscriptionConfig<T> other) {
- super(other);
- this.objectName = other.objectName;
- this.notificationFilter = other.notificationFilter;
- this.onNotification = other.onNotification;
- }
-
- public ObjectName getObjectName() {
- return objectName;
- }
-
- public NotificationFilter getNotificationFilter() {
- return notificationFilter;
- }
-
- public Function<Notification, T> getOnNotification() {
- return onNotification;
- }
-
- public JmxNotificationSubscriptionConfig<T> objectName(ObjectName val) {
- this.objectName = val; return this;
- }
-
- public JmxNotificationSubscriptionConfig<T> objectName(String val) {
- try {
- return objectName(new ObjectName(val));
- } catch (MalformedObjectNameException e) {
- throw new IllegalArgumentException("Invalid object name ("+val+")", e);
- }
- }
-
- public JmxNotificationSubscriptionConfig<T> notificationFilter(NotificationFilter val) {
- this.notificationFilter = val; return this;
- }
-
- public JmxNotificationSubscriptionConfig<T> onNotification(Function<Notification,T> val) {
- this.onNotification = val; return this;
- }
-
- @Override
- protected Object toStringPollSource() {
- return objectName;
- }
-
- @Override
- protected MutableList<Object> equalsFields() {
- return super.equalsFields()
- .appendIfNotNull(notificationFilter).appendIfNotNull(onNotification);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/64c2b2e5/software/base/src/main/java/brooklyn/event/feed/jmx/JmxOperationPollConfig.java
----------------------------------------------------------------------
diff --git a/software/base/src/main/java/brooklyn/event/feed/jmx/JmxOperationPollConfig.java b/software/base/src/main/java/brooklyn/event/feed/jmx/JmxOperationPollConfig.java
deleted file mode 100644
index 169f330..0000000
--- a/software/base/src/main/java/brooklyn/event/feed/jmx/JmxOperationPollConfig.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package brooklyn.event.feed.jmx;
-
-import java.util.Collections;
-import java.util.List;
-
-import javax.management.MalformedObjectNameException;
-import javax.management.ObjectName;
-
-import org.apache.brooklyn.api.sensor.AttributeSensor;
-import org.apache.brooklyn.sensor.feed.PollConfig;
-
-import com.google.common.base.Function;
-import com.google.common.base.Functions;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-
-public class JmxOperationPollConfig<T> extends PollConfig<Object, T, JmxOperationPollConfig<T>>{
-
- private ObjectName objectName;
- private String operationName;
- private List<String> signature = Collections.emptyList();
- private List<?> params = Collections.emptyList();
-
- @SuppressWarnings({ "unchecked", "rawtypes" })
- public JmxOperationPollConfig(AttributeSensor<T> sensor) {
- super(sensor);
- onSuccess((Function)Functions.identity());
- }
-
- public JmxOperationPollConfig(JmxOperationPollConfig<T> other) {
- super(other);
- this.objectName = other.objectName;
- this.operationName = other.operationName;
- this.signature = other.signature != null ? ImmutableList.copyOf(other.signature) : null;
- this.params = other.params != null ? ImmutableList.copyOf(other.params) : null;
- }
-
- public ObjectName getObjectName() {
- return objectName;
- }
-
- public String getOperationName() {
- return operationName;
- }
-
- public List<String> getSignature() {
- return signature;
- }
-
- public List<?> getParams() {
- return params;
- }
-
- public JmxOperationPollConfig<T> objectName(ObjectName val) {
- this.objectName = val; return this;
- }
-
- public JmxOperationPollConfig<T> objectName(String val) {
- try {
- return objectName(new ObjectName(val));
- } catch (MalformedObjectNameException e) {
- throw new IllegalArgumentException("Invalid object name ("+val+")", e);
- }
- }
-
- public JmxOperationPollConfig<T> operationName(String val) {
- this.operationName = val; return this;
- }
-
- public JmxOperationPollConfig<T> operationSignature(List<String> val) {
- this.signature = val; return this;
- }
-
- public JmxOperationPollConfig<T> operationParams(List<?> val) {
- this.params = val; return this;
- }
-
- public List<?> buildOperationIdentity() {
- // FIXME Have a build() method for ensuring signature is set, and making class subsequently immutable?
- return ImmutableList.of(operationName, buildSignature(), params);
- }
-
- private List<String> buildSignature() {
- if (signature != null && signature.size() == params.size()) {
- return signature;
- } else {
- List<String> derivedSignature = Lists.newLinkedList();
- for (Object param : params) {
- Class<?> clazz = (param != null) ? param.getClass() : null;
- String clazzName = (clazz != null) ?
- (JmxHelper.CLASSES.containsKey(clazz.getSimpleName()) ?
- JmxHelper.CLASSES.get(clazz.getSimpleName()) : clazz.getName()) :
- Object.class.getName();
- derivedSignature.add(clazzName);
- }
- return derivedSignature;
- }
- }
-
- @Override protected String toStringBaseName() { return "jmx"; }
- @Override protected String toStringPollSource() { return objectName+":"+operationName+(params!=null ? params : "[]"); }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/64c2b2e5/software/base/src/main/java/brooklyn/event/feed/jmx/JmxValueFunctions.java
----------------------------------------------------------------------
diff --git a/software/base/src/main/java/brooklyn/event/feed/jmx/JmxValueFunctions.java b/software/base/src/main/java/brooklyn/event/feed/jmx/JmxValueFunctions.java
deleted file mode 100644
index 0b61b42..0000000
--- a/software/base/src/main/java/brooklyn/event/feed/jmx/JmxValueFunctions.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package brooklyn.event.feed.jmx;
-
-import java.util.List;
-import java.util.Map;
-
-import javax.management.openmbean.CompositeData;
-import javax.management.openmbean.TabularData;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Function;
-import com.google.common.collect.Maps;
-
-public class JmxValueFunctions { // static converters from JMX open-type data (TabularData, CompositeData) to Maps, plus Guava Function wrappers around them
-
- private static final Logger log = LoggerFactory.getLogger(JmxValueFunctions.class);
-
- /**
- * @return a function that flattens a {@link TabularData} into a single map by delegating to {@link #tabularDataToMap(TabularData)}.
- */
- public static Function<TabularData, Map> tabularDataToMap() {
- return new Function<TabularData, Map>() {
- @Override public Map apply(TabularData input) {
- return tabularDataToMap(input);
- }};
- }
-
- public static Function<TabularData, Map> tabularDataToMapOfMaps() { // Function wrapper delegating to tabularDataToMapOfMaps(TabularData)
- return new Function<TabularData, Map>() {
- @Override public Map apply(TabularData input) {
- return tabularDataToMapOfMaps(input);
- }};
- }
-
- public static Function<CompositeData,Map> compositeDataToMap() { // Function wrapper delegating to compositeDataToMap(CompositeData)
- return new Function<CompositeData, Map>() {
- @Override public Map apply(CompositeData input) {
- return compositeDataToMap(input);
- }};
- }
-
- public static Map tabularDataToMap(TabularData table) { // merges the items of every row into one flat map; on a key clash the later row's value wins
- Map<String, Object> result = Maps.newLinkedHashMap();
- for (Object entry : table.values()) { // each value is cast to CompositeData below
- CompositeData data = (CompositeData) entry; //.getValue()
- for (String key : data.getCompositeType().keySet()) {
- Object old = result.put(key, data.get(key));
- if (old != null) { // only a non-null previous value is reported; replacing a null value goes unlogged
- log.warn("tablularDataToMap has overwritten key {}", key);
- }
- }
- }
- return result;
- }
-
- public static Map<List<?>, Map<String, Object>> tabularDataToMapOfMaps(TabularData table) { // maps each row's key list to that row converted by compositeDataToMap(CompositeData)
- Map<List<?>, Map<String, Object>> result = Maps.newLinkedHashMap();
- for (Object k : table.keySet()) { // each key is treated as a List of index values
- final Object[] kValues = ((List<?>)k).toArray(); // the row lookup below takes the key values as an Object[]
- CompositeData v = (CompositeData) table.get(kValues);
- result.put((List<?>)k, compositeDataToMap(v));
- }
- return result;
- }
-
- public static Map<String, Object> compositeDataToMap(CompositeData data) { // copies every item named by the composite type's key set into a new map
- Map<String, Object> result = Maps.newLinkedHashMap();
- for (String key : data.getCompositeType().keySet()) {
- Object old = result.put(key, data.get(key));
- if (old != null) { // not expected to trigger: the keys come from a set and the map starts out empty
- log.warn("compositeDataToMap has overwritten key {}", key);
- }
- }
- return result;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/64c2b2e5/software/base/src/main/java/org/apache/brooklyn/entity/brooklynnode/BrooklynCluster.java
----------------------------------------------------------------------
diff --git a/software/base/src/main/java/org/apache/brooklyn/entity/brooklynnode/BrooklynCluster.java b/software/base/src/main/java/org/apache/brooklyn/entity/brooklynnode/BrooklynCluster.java
new file mode 100644
index 0000000..daf7369
--- /dev/null
+++ b/software/base/src/main/java/org/apache/brooklyn/entity/brooklynnode/BrooklynCluster.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.brooklynnode;
+
+import java.util.Map;
+
+import org.apache.brooklyn.api.effector.Effector;
+import org.apache.brooklyn.api.entity.EntitySpec;
+import org.apache.brooklyn.api.entity.ImplementedBy;
+import org.apache.brooklyn.api.sensor.AttributeSensor;
+import org.apache.brooklyn.config.ConfigKey;
+import org.apache.brooklyn.core.config.ConfigKeys;
+import org.apache.brooklyn.effector.core.Effectors;
+import org.apache.brooklyn.entity.brooklynnode.effector.BrooklynNodeUpgradeEffectorBody;
+import org.apache.brooklyn.entity.group.DynamicCluster;
+import org.apache.brooklyn.sensor.core.Sensors;
+
+@ImplementedBy(BrooklynClusterImpl.class)
+public interface BrooklynCluster extends DynamicCluster { // DynamicCluster of BrooklynNode members; declares the master-node sensor and the selectMaster / upgradeCluster effectors
+
+ ConfigKey<EntitySpec<?>> MEMBER_SPEC = ConfigKeys.newConfigKeyWithDefault(DynamicCluster.MEMBER_SPEC, // redeclares the DynamicCluster key with a BrooklynNode spec as its default
+ EntitySpec.create(BrooklynNode.class));
+
+ AttributeSensor<BrooklynNode> MASTER_NODE = Sensors.newSensor(
+ BrooklynNode.class, "brooklyncluster.master", "Pointer to the child node with MASTER state in the cluster");
+
+ interface SelectMasterEffector { // groups the selectMaster effector definition with its parameter key
+ ConfigKey<String> NEW_MASTER_ID = ConfigKeys.newStringConfigKey(
+ "brooklyncluster.new_master_id", "The ID of the node to become master", null);
+ Effector<Void> SELECT_MASTER = Effectors.effector(Void.class, "selectMaster")
+ .description("Select a new master in the cluster")
+ .parameter(NEW_MASTER_ID)
+ .buildAbstract(); // NOTE(review): presumably body-less here; BrooklynClusterImpl.init() adds SelectMasterEffectorBody.SELECT_MASTER
+ }
+
+ Effector<Void> SELECT_MASTER = SelectMasterEffector.SELECT_MASTER; // alias exposing the nested constant directly on BrooklynCluster
+
+ interface UpgradeClusterEffector { // groups the upgradeCluster effector definition with its parameter keys
+ ConfigKey<String> DOWNLOAD_URL = BrooklynNode.DOWNLOAD_URL.getConfigKey(); // reuses the config key behind BrooklynNode.DOWNLOAD_URL
+ ConfigKey<Map<String,Object>> EXTRA_CONFIG = BrooklynNodeUpgradeEffectorBody.EXTRA_CONFIG; // same key object as declared on BrooklynNodeUpgradeEffectorBody
+
+ Effector<Void> UPGRADE_CLUSTER = Effectors.effector(Void.class, "upgradeCluster")
+ .description("Upgrade the cluster with new distribution version, "
+ + "by provisioning new nodes with the new version, failing over, "
+ + "and then deprovisioning the original nodes")
+ .parameter(BrooklynNode.SUGGESTED_VERSION)
+ .parameter(DOWNLOAD_URL)
+ .parameter(EXTRA_CONFIG)
+ .buildAbstract(); // NOTE(review): presumably body-less here; BrooklynClusterImpl.init() adds BrooklynClusterUpgradeEffectorBody.UPGRADE_CLUSTER
+ }
+
+ Effector<Void> UPGRADE_CLUSTER = UpgradeClusterEffector.UPGRADE_CLUSTER; // alias exposing the nested constant directly on BrooklynCluster
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/64c2b2e5/software/base/src/main/java/org/apache/brooklyn/entity/brooklynnode/BrooklynClusterImpl.java
----------------------------------------------------------------------
diff --git a/software/base/src/main/java/org/apache/brooklyn/entity/brooklynnode/BrooklynClusterImpl.java b/software/base/src/main/java/org/apache/brooklyn/entity/brooklynnode/BrooklynClusterImpl.java
new file mode 100644
index 0000000..61b0ba5
--- /dev/null
+++ b/software/base/src/main/java/org/apache/brooklyn/entity/brooklynnode/BrooklynClusterImpl.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.brooklynnode;
+
+import java.util.Collection;
+import java.util.concurrent.Callable;
+
+import org.apache.brooklyn.api.entity.Entity;
+import org.apache.brooklyn.api.mgmt.ha.ManagementNodeState;
+import org.apache.brooklyn.entity.brooklynnode.effector.BrooklynClusterUpgradeEffectorBody;
+import org.apache.brooklyn.entity.brooklynnode.effector.SelectMasterEffectorBody;
+import org.apache.brooklyn.entity.core.EntityFunctions;
+import org.apache.brooklyn.entity.core.EntityPredicates;
+import org.apache.brooklyn.entity.group.DynamicClusterImpl;
+import org.apache.brooklyn.entity.lifecycle.ServiceStateLogic;
+import org.apache.brooklyn.entity.lifecycle.ServiceStateLogic.ServiceProblemsLogic;
+import org.apache.brooklyn.sensor.enricher.Enrichers;
+import org.apache.brooklyn.sensor.feed.function.FunctionFeed;
+import org.apache.brooklyn.sensor.feed.function.FunctionPollConfig;
+import org.apache.brooklyn.util.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.Iterables;
+
+public class BrooklynClusterImpl extends DynamicClusterImpl implements BrooklynCluster { // cluster implementation that polls its members to publish which one reports MASTER state
+
+ private static final String MSG_NO_MASTER = "No master node in cluster";
+ private static final String MSG_TOO_MANY_MASTERS = "Too many master nodes in cluster";
+
+ private static final Logger LOG = LoggerFactory.getLogger(BrooklynClusterImpl.class);
+
+ // TODO should we set a default MEMBER_SPEC ? difficult though because we'd need to set a password
+
+ @Override
+ public void init() { // adds the two effectors, the MASTER_NODE polling feed and the WEB_CONSOLE_URI enricher
+ super.init();
+ getMutableEntityType().addEffector(SelectMasterEffectorBody.SELECT_MASTER);
+ getMutableEntityType().addEffector(BrooklynClusterUpgradeEffectorBody.UPGRADE_CLUSTER);
+
+ ServiceProblemsLogic.updateProblemsIndicator(this, MASTER_NODE, MSG_NO_MASTER); // start flagged as "no master"; findMasterChild() clears this once exactly one master is seen
+ addFeed(FunctionFeed.builder()
+ .entity(this)
+ .poll(new FunctionPollConfig<Object, BrooklynNode>(MASTER_NODE)
+ .period(Duration.ONE_SECOND) // findMasterChild() is polled with a one-second period
+ .callable(new MasterChildFinder()))
+ .build());
+
+ addEnricher( Enrichers.builder().transforming(MASTER_NODE) // derives this entity's WEB_CONSOLE_URI from that attribute of the current MASTER_NODE value
+ .uniqueTag("master-node-web-uri")
+ .publishing(BrooklynNode.WEB_CONSOLE_URI)
+ .computing(EntityFunctions.attribute(BrooklynNode.WEB_CONSOLE_URI))
+ .build() );
+ }
+
+ private final class MasterChildFinder implements Callable<BrooklynNode> { // Callable adapter so the feed can invoke findMasterChild()
+ @Override
+ public BrooklynNode call() throws Exception {
+ return findMasterChild();
+ }
+ }
+
+ BrooklynNode findMasterChild() { // returns the member whose MANAGEMENT_NODE_STATE is MASTER; null if none; throws IllegalStateException if three or more
+ Collection<Entity> masters = FluentIterable.from(getMembers())
+ .filter(EntityPredicates.attributeEqualTo(BrooklynNode.MANAGEMENT_NODE_STATE, ManagementNodeState.MASTER))
+ .toList();
+
+ if (masters.size() == 0) {
+ ServiceProblemsLogic.updateProblemsIndicator(this, MASTER_NODE, MSG_NO_MASTER);
+ return null;
+
+ } else if (masters.size() == 1) {
+ ServiceStateLogic.ServiceProblemsLogic.clearProblemsIndicator(this, MASTER_NODE);
+ return (BrooklynNode)Iterables.getOnlyElement(masters);
+
+ } else if (masters.size() == 2) {
+ LOG.warn("Two masters detected, probably a handover just occured: " + masters);
+
+ // Deliberately leave the problems indicator untouched: don't clear it (if there was no master before, why are there two now?)
+ // and don't set it either. We have probably hit a window where a new master has taken over and
+ // its BrooklynNode has picked that up, while the BrooklynNode
+ // for the old master has not refreshed its state yet.
+ // Just pick one of them; this should sort itself out on the next update.
+
+ // TODO Do set the indicator if this situation continues for an extended period of time
+
+ return (BrooklynNode)masters.iterator().next(); // first match in the filtered list's iteration order
+
+ } else {
+ ServiceProblemsLogic.updateProblemsIndicator(this, MASTER_NODE, MSG_TOO_MANY_MASTERS);
+ String msg = "Multiple (>=3) master nodes in cluster: " + masters;
+ LOG.error(msg);
+ throw new IllegalStateException(msg); // NOTE(review): thrown from inside the feed's callable; how the feed reacts to it is not visible here
+
+ }
+ }
+
+}