You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by he...@apache.org on 2015/08/19 13:09:37 UTC
[19/72] [abbrv] incubator-brooklyn git commit: BROOKLYN-162 - apply
org.apache package prefix to software-base, tidying package names,
and moving a few sensory things to core
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/64c2b2e5/software/base/src/main/java/org/apache/brooklyn/entity/system_service/SystemServiceEnricher.java
----------------------------------------------------------------------
diff --git a/software/base/src/main/java/org/apache/brooklyn/entity/system_service/SystemServiceEnricher.java b/software/base/src/main/java/org/apache/brooklyn/entity/system_service/SystemServiceEnricher.java
new file mode 100644
index 0000000..0bc98d1
--- /dev/null
+++ b/software/base/src/main/java/org/apache/brooklyn/entity/system_service/SystemServiceEnricher.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.system_service;
+
+import java.util.Set;
+
+import org.apache.brooklyn.api.entity.Entity;
+import org.apache.brooklyn.api.internal.EntityLocal;
+import org.apache.brooklyn.api.mgmt.ExecutionContext;
+import org.apache.brooklyn.api.mgmt.Task;
+import org.apache.brooklyn.api.sensor.Enricher;
+import org.apache.brooklyn.config.ConfigKey;
+import org.apache.brooklyn.core.config.ConfigKeys;
+import org.apache.brooklyn.core.mgmt.BrooklynTaskTags;
+import org.apache.brooklyn.core.mgmt.BrooklynTaskTags.WrappedStream;
+import org.apache.brooklyn.effector.core.EffectorTasks;
+import org.apache.brooklyn.entity.core.Attributes;
+import org.apache.brooklyn.entity.core.Entities;
+import org.apache.brooklyn.entity.software.base.SoftwareProcess;
+import org.apache.brooklyn.location.basic.SshMachineLocation;
+import org.apache.brooklyn.sensor.enricher.AbstractEnricher;
+import org.apache.brooklyn.util.core.task.BasicExecutionManager;
+import org.apache.brooklyn.util.core.task.DynamicTasks;
+import org.apache.brooklyn.util.core.task.TaskBuilder;
+import org.apache.brooklyn.util.core.task.ssh.SshPutTaskWrapper;
+import org.apache.brooklyn.util.core.task.ssh.SshTasks;
+import org.apache.brooklyn.util.core.task.system.ProcessTaskFactory;
+import org.apache.brooklyn.util.core.task.system.ProcessTaskWrapper;
+import org.apache.brooklyn.util.net.Urls;
+
+import com.google.common.collect.ImmutableSet;
+
+/**
+ * Enricher that maintains a launch script for the entity in its run directory and makes
+ * sure the entity is installed as a system service.
+ * <p>
+ * On {@link #setEntity(EntityLocal)} it subscribes an {@code EntityLaunchListener} to the
+ * entity's {@link Attributes#SERVICE_STATE_ACTUAL} sensor. When {@link #onLaunched(Task)} is
+ * invoked with a launch task, the stdin and env streams recorded on that task are combined
+ * into a script which is copied to the machine and made executable.
+ */
+public class SystemServiceEnricher extends AbstractEnricher implements Enricher {
+    public static final String DEFAULT_ENRICHER_UNIQUE_TAG = "systemService.tag";
+    protected static final Set<String> LAUNCH_EFFECTOR_NAMES = ImmutableSet.of("start", "restart");
+    public static final ConfigKey<String> LAUNCH_SCRIPT_NAME = ConfigKeys.newStringConfigKey(
+            "service.script_name", "The name of the launch script to be created in the runtime directory of the entity.", "service-launch.sh");
+    public static final ConfigKey<String> SERVICE_NAME = ConfigKeys.newStringConfigKey(
+            "service.name", "The name of the system service. Can use ${entity_name} and ${id} variables to template the value.", "${entity_name}-${id}");
+
+    @Override
+    public void setEntity(EntityLocal entity) {
+        super.setEntity(entity);
+        subscribeLaunch();
+        uniqueTag = DEFAULT_ENRICHER_UNIQUE_TAG;
+    }
+
+    /** Subscribes the launch listener to service state changes of the entity. */
+    private void subscribeLaunch() {
+        subscribe(entity, Attributes.SERVICE_STATE_ACTUAL, new EntityLaunchListener(this));
+    }
+
+    /**
+     * Rebuilds the launch script from the streams attached to the given task and submits a
+     * task which installs the system service (if needed) and uploads the script.
+     * <p>
+     * Does nothing when the task has no stdin stream. A missing env stream is treated as an
+     * empty environment.
+     *
+     * @param task the task whose stdin/env streams describe how the entity was launched
+     */
+    public void onLaunched(Task<?> task) {
+        WrappedStream streamStdin = BrooklynTaskTags.stream(task, BrooklynTaskTags.STREAM_STDIN);
+        if (streamStdin == null) return;
+
+        WrappedStream streamEnv = BrooklynTaskTags.stream(task, BrooklynTaskTags.STREAM_ENV);
+        String stdin = streamStdin.streamContents.get();
+        // The stream lookup can return null (as guarded for stdin above); do not fail with
+        // a NullPointerException just because no environment was recorded on the task.
+        String env = (streamEnv == null) ? "" : streamEnv.streamContents.get();
+
+        final SshMachineLocation sshMachine = EffectorTasks.getSshMachine(entity);
+        final String launchScriptPath = Urls.mergePaths(getRunDir(), getStartScriptName());
+
+        Task<Void> installerTask = TaskBuilder.<Void>builder()
+                .name("install (service)")
+                .description("Install as a system service")
+                .body(new Runnable() {
+                    @Override
+                    public void run() {
+                        // The presence of the launch script is used as the "already installed" marker.
+                        ProcessTaskFactory<Integer> taskFactory = SshTasks.newSshExecTaskFactory(sshMachine, "[ -e '" + launchScriptPath + "' ]")
+                                .summary("check installed")
+                                .allowingNonZeroExitCode();
+                        boolean isInstalled = DynamicTasks.queue(taskFactory).get() == 0;
+                        if (!isInstalled) {
+                            Task<?> serviceInstallTask = SystemServiceInstallerFactory.of(entity, SystemServiceEnricher.this).getServiceInstallTask();
+                            DynamicTasks.queue(serviceInstallTask);
+                        }
+                    }
+                })
+                .build();
+
+        SshPutTaskWrapper updateLaunchScriptTask = SshTasks.newSshPutTaskFactory(sshMachine, launchScriptPath).contents(getLaunchScript(stdin, env)).newTask();
+        // Quote the path the same way the "check installed" command does, so that a run
+        // directory containing spaces does not split the chmod argument.
+        ProcessTaskWrapper<Integer> makeExecutableTask = SshTasks.newSshExecTaskFactory(sshMachine, "chmod +x '" + launchScriptPath + "'")
+                .requiringExitCodeZero()
+                .newTask();
+        Task<Void> updateTask = TaskBuilder.<Void>builder()
+                .name("update-launch")
+                .description("Update launch script used by the system service")
+                .add(updateLaunchScriptTask)
+                .add(makeExecutableTask)
+                .build();
+
+        Task<Void> updateService = TaskBuilder.<Void>builder()
+                .name("update-system-service")
+                .description("Update system service")
+                .add(installerTask)
+                .add(updateTask)
+                .tag(BrooklynTaskTags.tagForContextEntity(entity))
+                .tag(BrooklynTaskTags.NON_TRANSIENT_TASK_TAG)
+                .build();
+
+        submitTopLevel(updateService);
+    }
+
+    /**
+     * Submits the task against the entity while the per-thread "current task" is cleared,
+     * restoring the previous value afterwards.
+     * NOTE(review): presumably this keeps the submitted task from being attached to the
+     * task that is running on this thread - confirm against BasicExecutionManager.
+     */
+    private void submitTopLevel(Task<Void> updateService) {
+        Task<?> currentTask = BasicExecutionManager.getPerThreadCurrentTask().get();
+        BasicExecutionManager.getPerThreadCurrentTask().set(null);
+        try {
+            Entities.submit(entity, updateService);
+        } finally {
+            BasicExecutionManager.getPerThreadCurrentTask().set(currentTask);
+        }
+    }
+
+    /**
+     * Builds the launch script: every line of {@code env} prefixed with {@code export},
+     * followed by the original {@code stdin} script.
+     *
+     * @param stdin the script that was fed to the launch command
+     * @param env the environment lines; may be null or empty, in which case the script is
+     *        just {@code stdin}
+     */
+    private String getLaunchScript(String stdin, String env) {
+        if (env == null || env.isEmpty()) {
+            // Prefixing an empty environment would leave a bare "export " line, which in
+            // POSIX shells prints every exported variable each time the script runs.
+            return stdin;
+        }
+        // (?m) - multiline enable
+        // insert export at beginning of each line
+        return env.replaceAll("(?m)^", "export ") + "\n" + stdin;
+    }
+
+    /** Returns the entity's {@link SoftwareProcess#RUN_DIR} attribute value. */
+    private String getRunDir() {
+        return entity.getAttribute(SoftwareProcess.RUN_DIR);
+    }
+
+    /** Returns the configured {@link #LAUNCH_SCRIPT_NAME}. */
+    private String getStartScriptName() {
+        return config().get(LAUNCH_SCRIPT_NAME);
+    }
+
+    /** Returns the execution context of the entity this enricher is attached to. */
+    ExecutionContext getEntityExecutionContext() {
+        return getManagementContext().getExecutionContext(entity);
+    }
+
+    /** Returns the entity this enricher is attached to. */
+    protected Entity getEntity() {
+        return entity;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/64c2b2e5/software/base/src/main/java/org/apache/brooklyn/entity/system_service/SystemServiceInstaller.java
----------------------------------------------------------------------
diff --git a/software/base/src/main/java/org/apache/brooklyn/entity/system_service/SystemServiceInstaller.java b/software/base/src/main/java/org/apache/brooklyn/entity/system_service/SystemServiceInstaller.java
new file mode 100644
index 0000000..ee4f242
--- /dev/null
+++ b/software/base/src/main/java/org/apache/brooklyn/entity/system_service/SystemServiceInstaller.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.system_service;
+
+import org.apache.brooklyn.api.mgmt.Task;
+
+/**
+ * Supplies the task that installs an entity as a system service.
+ */
+public interface SystemServiceInstaller {
+ /**
+ * Returns the installation task. The task is only returned here, not submitted;
+ * in this package SystemServiceEnricher queues it itself when the launch script
+ * is not yet present on the machine.
+ *
+ * @return the task that performs the service installation
+ */
+ Task<?> getServiceInstallTask();
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/64c2b2e5/software/base/src/main/java/org/apache/brooklyn/entity/system_service/SystemServiceInstallerFactory.java
----------------------------------------------------------------------
diff --git a/software/base/src/main/java/org/apache/brooklyn/entity/system_service/SystemServiceInstallerFactory.java b/software/base/src/main/java/org/apache/brooklyn/entity/system_service/SystemServiceInstallerFactory.java
new file mode 100644
index 0000000..20edfd2
--- /dev/null
+++ b/software/base/src/main/java/org/apache/brooklyn/entity/system_service/SystemServiceInstallerFactory.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.entity.system_service;
+
+import org.apache.brooklyn.api.entity.Entity;
+import org.apache.brooklyn.api.sensor.Enricher;
+
+/**
+ * Static factory for {@link SystemServiceInstaller} instances.
+ */
+public class SystemServiceInstallerFactory {
+    /**
+     * Creates the installer for the given entity and enricher. This implementation always
+     * hands back an {@code InitdServiceInstaller}.
+     *
+     * @param entity the entity to install as a system service
+     * @param systemServiceEnricher the enricher requesting the installation
+     * @return a new installer bound to the entity and enricher
+     */
+    public static SystemServiceInstaller of(Entity entity, Enricher systemServiceEnricher) {
+        SystemServiceInstaller installer = new InitdServiceInstaller(entity, systemServiceEnricher);
+        return installer;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/64c2b2e5/software/base/src/main/java/org/apache/brooklyn/sensor/feed/jmx/JmxAttributePollConfig.java
----------------------------------------------------------------------
diff --git a/software/base/src/main/java/org/apache/brooklyn/sensor/feed/jmx/JmxAttributePollConfig.java b/software/base/src/main/java/org/apache/brooklyn/sensor/feed/jmx/JmxAttributePollConfig.java
new file mode 100644
index 0000000..02bbbeb
--- /dev/null
+++ b/software/base/src/main/java/org/apache/brooklyn/sensor/feed/jmx/JmxAttributePollConfig.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.sensor.feed.jmx;
+
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+
+import org.apache.brooklyn.api.sensor.AttributeSensor;
+import org.apache.brooklyn.sensor.feed.PollConfig;
+
+import com.google.common.base.Function;
+import com.google.common.base.Functions;
+
+/**
+ * Poll configuration describing a single JMX attribute (an MBean object name plus an
+ * attribute name) whose value is fed into a sensor.
+ *
+ * @param <T> the type of the sensor being populated
+ */
+public class JmxAttributePollConfig<T> extends PollConfig<Object, T, JmxAttributePollConfig<T>>{
+
+    private ObjectName objectName;
+    private String attributeName;
+
+    /**
+     * Creates a config for the given sensor; by default the polled value is passed
+     * through unchanged (identity function on success).
+     */
+    @SuppressWarnings({ "unchecked", "rawtypes" })
+    public JmxAttributePollConfig(AttributeSensor<T> sensor) {
+        super(sensor);
+        Function passThrough = Functions.identity();
+        onSuccess(passThrough);
+    }
+
+    /** Copy constructor: duplicates the base config plus the object and attribute names. */
+    public JmxAttributePollConfig(JmxAttributePollConfig<T> other) {
+        super(other);
+        this.attributeName = other.attributeName;
+        this.objectName = other.objectName;
+    }
+
+    /** @return the MBean object name to read from, or null if not yet set */
+    public ObjectName getObjectName() {
+        return this.objectName;
+    }
+
+    /** @return the name of the MBean attribute to read, or null if not yet set */
+    public String getAttributeName() {
+        return this.attributeName;
+    }
+
+    /** Sets the MBean object name; returns this config for chaining. */
+    public JmxAttributePollConfig<T> objectName(ObjectName val) {
+        this.objectName = val;
+        return this;
+    }
+
+    /**
+     * Sets the MBean object name from its string form; returns this config for chaining.
+     *
+     * @throws IllegalArgumentException if the string is not a well-formed object name
+     */
+    public JmxAttributePollConfig<T> objectName(String val) {
+        final ObjectName parsed;
+        try {
+            parsed = new ObjectName(val);
+        } catch (MalformedObjectNameException e) {
+            throw new IllegalArgumentException("Invalid object name ("+val+")", e);
+        }
+        return objectName(parsed);
+    }
+
+    /** Sets the attribute name; returns this config for chaining. */
+    public JmxAttributePollConfig<T> attributeName(String val) {
+        this.attributeName = val;
+        return this;
+    }
+
+    @Override
+    protected String toStringBaseName() {
+        return "jmx";
+    }
+
+    @Override
+    protected String toStringPollSource() {
+        return objectName + ":" + attributeName;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/64c2b2e5/software/base/src/main/java/org/apache/brooklyn/sensor/feed/jmx/JmxFeed.java
----------------------------------------------------------------------
diff --git a/software/base/src/main/java/org/apache/brooklyn/sensor/feed/jmx/JmxFeed.java b/software/base/src/main/java/org/apache/brooklyn/sensor/feed/jmx/JmxFeed.java
new file mode 100644
index 0000000..3c1897e
--- /dev/null
+++ b/software/base/src/main/java/org/apache/brooklyn/sensor/feed/jmx/JmxFeed.java
@@ -0,0 +1,436 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.sensor.feed.jmx;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+
+import javax.management.Notification;
+import javax.management.NotificationFilter;
+import javax.management.NotificationListener;
+import javax.management.ObjectName;
+
+import org.apache.brooklyn.api.internal.EntityLocal;
+import org.apache.brooklyn.config.ConfigKey;
+import org.apache.brooklyn.core.config.ConfigKeys;
+import org.apache.brooklyn.entity.software.base.SoftwareProcessImpl;
+import org.apache.brooklyn.sensor.feed.AbstractFeed;
+import org.apache.brooklyn.sensor.feed.AttributePollHandler;
+import org.apache.brooklyn.sensor.feed.DelegatingPollHandler;
+import org.apache.brooklyn.sensor.feed.PollHandler;
+import org.apache.brooklyn.sensor.feed.Poller;
+import org.apache.brooklyn.util.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.SetMultimap;
+import com.google.common.collect.Sets;
+import com.google.common.reflect.TypeToken;
+
+
+/**
+ * Provides a feed of attribute values, by polling or subscribing over jmx.
+ *
+ * Example usage (e.g. in an entity that extends {@link SoftwareProcessImpl}):
+ * <pre>
+ * {@code
+ * private JmxFeed feed;
+ *
+ * //@Override
+ * protected void connectSensors() {
+ *   super.connectSensors();
+ *
+ *   feed = JmxFeed.builder()
+ *       .entity(this)
+ *       .period(500, TimeUnit.MILLISECONDS)
+ *       .pollAttribute(new JmxAttributePollConfig<Integer>(ERROR_COUNT)
+ *           .objectName(requestProcessorMbeanName)
+ *           .attributeName("errorCount"))
+ *       .pollAttribute(new JmxAttributePollConfig<Boolean>(SERVICE_UP)
+ *           .objectName(serverMbeanName)
+ *           .attributeName("Started")
+ *           .onError(Functions.constant(false)))
+ *       .build();
+ * }
+ *
+ * {@literal @}Override
+ * protected void disconnectSensors() {
+ *   super.disconnectSensors();
+ *   if (feed != null) feed.stop();
+ * }
+ * }
+ * </pre>
+ *
+ * @author aled
+ */
+public class JmxFeed extends AbstractFeed {
+
+    public static final Logger log = LoggerFactory.getLogger(JmxFeed.class);
+
+    public static final long JMX_CONNECTION_TIMEOUT_MS = 120*1000;
+
+    public static final ConfigKey<JmxHelper> HELPER = ConfigKeys.newConfigKey(JmxHelper.class, "helper");
+    public static final ConfigKey<Boolean> OWN_HELPER = ConfigKeys.newBooleanConfigKey("ownHelper");
+    public static final ConfigKey<String> JMX_URI = ConfigKeys.newStringConfigKey("jmxUri");
+    public static final ConfigKey<Long> JMX_CONNECTION_TIMEOUT = ConfigKeys.newLongConfigKey("jmxConnectionTimeout");
+
+    @SuppressWarnings("serial")
+    public static final ConfigKey<SetMultimap<String, JmxAttributePollConfig<?>>> ATTRIBUTE_POLLS = ConfigKeys.newConfigKey(
+            new TypeToken<SetMultimap<String, JmxAttributePollConfig<?>>>() {},
+            "attributePolls");
+
+    @SuppressWarnings("serial")
+    public static final ConfigKey<SetMultimap<List<?>, JmxOperationPollConfig<?>>> OPERATION_POLLS = ConfigKeys.newConfigKey(
+            new TypeToken<SetMultimap<List<?>, JmxOperationPollConfig<?>>>() {},
+            "operationPolls");
+
+    @SuppressWarnings("serial")
+    public static final ConfigKey<SetMultimap<NotificationFilter, JmxNotificationSubscriptionConfig<?>>> NOTIFICATION_SUBSCRIPTIONS = ConfigKeys.newConfigKey(
+            new TypeToken<SetMultimap<NotificationFilter, JmxNotificationSubscriptionConfig<?>>>() {},
+            "notificationPolls");
+
+    /** Returns a new {@link Builder} for configuring and starting a feed. */
+    public static Builder builder() {
+        return new Builder();
+    }
+
+    /**
+     * Fluent builder for {@link JmxFeed}. {@link #build()} creates the feed, attaches it to
+     * the entity and starts it.
+     */
+    public static class Builder {
+        private EntityLocal entity;
+        private JmxHelper helper;
+        private long jmxConnectionTimeout = JMX_CONNECTION_TIMEOUT_MS;
+        private long period = 500;
+        private TimeUnit periodUnits = TimeUnit.MILLISECONDS;
+        private List<JmxAttributePollConfig<?>> attributePolls = Lists.newArrayList();
+        private List<JmxOperationPollConfig<?>> operationPolls = Lists.newArrayList();
+        private List<JmxNotificationSubscriptionConfig<?>> notificationSubscriptions = Lists.newArrayList();
+        private String uniqueTag;
+        private volatile boolean built;
+
+        public Builder entity(EntityLocal val) {
+            this.entity = val;
+            return this;
+        }
+        /** Supplies an externally owned helper; the feed will not terminate it on stop. */
+        public Builder helper(JmxHelper val) {
+            this.helper = val;
+            return this;
+        }
+        public Builder period(Duration duration) {
+            return period(duration.toMilliseconds(), TimeUnit.MILLISECONDS);
+        }
+        public Builder period(long millis) {
+            return period(millis, TimeUnit.MILLISECONDS);
+        }
+        /** Default period, applied to every poll config whose own period is negative. */
+        public Builder period(long val, TimeUnit units) {
+            this.period = val;
+            this.periodUnits = units;
+            return this;
+        }
+        public Builder pollAttribute(JmxAttributePollConfig<?> config) {
+            attributePolls.add(config);
+            return this;
+        }
+        public Builder pollOperation(JmxOperationPollConfig<?> config) {
+            operationPolls.add(config);
+            return this;
+        }
+        public Builder subscribeToNotification(JmxNotificationSubscriptionConfig<?> config) {
+            notificationSubscriptions.add(config);
+            return this;
+        }
+        public Builder uniqueTag(String uniqueTag) {
+            this.uniqueTag = uniqueTag;
+            return this;
+        }
+        /**
+         * Creates the feed from this builder, sets its entity and starts it.
+         *
+         * @throws NullPointerException if no entity was supplied
+         */
+        public JmxFeed build() {
+            built = true;
+            JmxFeed result = new JmxFeed(this);
+            result.setEntity(checkNotNull(entity, "entity"));
+            result.start();
+            return result;
+        }
+        @Override
+        protected void finalize() {
+            if (!built) log.warn("JmxFeed.Builder created, but build() never called");
+        }
+    }
+
+    /** Listeners registered in {@link #preStart()}, kept so {@link #preStop()} can remove them. */
+    private final SetMultimap<ObjectName, NotificationListener> notificationListeners = HashMultimap.create();
+
+    /**
+     * For rebind; do not call directly; use builder
+     */
+    public JmxFeed() {
+    }
+
+    /**
+     * Copies the builder's settings into this feed's config. Disabled poll/subscription
+     * configs are dropped; poll configs with a negative period inherit the builder's period.
+     */
+    protected JmxFeed(Builder builder) {
+        super();
+        if (builder.helper != null) {
+            JmxHelper helper = builder.helper;
+            setConfig(HELPER, helper);
+            setConfig(OWN_HELPER, false);
+            setConfig(JMX_URI, helper.getUrl());
+        }
+        setConfig(JMX_CONNECTION_TIMEOUT, builder.jmxConnectionTimeout);
+
+        // Attribute polls are grouped by (canonical object name + attribute name) so that
+        // configs reading the same attribute can share a single poll.
+        SetMultimap<String, JmxAttributePollConfig<?>> attributePolls = HashMultimap.<String,JmxAttributePollConfig<?>>create();
+        for (JmxAttributePollConfig<?> config : builder.attributePolls) {
+            if (!config.isEnabled()) continue;
+            @SuppressWarnings({ "rawtypes", "unchecked" })
+            JmxAttributePollConfig<?> configCopy = new JmxAttributePollConfig(config);
+            if (configCopy.getPeriod() < 0) configCopy.period(builder.period, builder.periodUnits);
+            attributePolls.put(configCopy.getObjectName().getCanonicalName() + configCopy.getAttributeName(), configCopy);
+        }
+        setConfig(ATTRIBUTE_POLLS, attributePolls);
+
+        // Operation polls are grouped by the identity the config builds for its operation.
+        SetMultimap<List<?>, JmxOperationPollConfig<?>> operationPolls = HashMultimap.<List<?>,JmxOperationPollConfig<?>>create();
+        for (JmxOperationPollConfig<?> config : builder.operationPolls) {
+            if (!config.isEnabled()) continue;
+            @SuppressWarnings({ "rawtypes", "unchecked" })
+            JmxOperationPollConfig<?> configCopy = new JmxOperationPollConfig(config);
+            if (configCopy.getPeriod() < 0) configCopy.period(builder.period, builder.periodUnits);
+            operationPolls.put(configCopy.buildOperationIdentity(), configCopy);
+        }
+        setConfig(OPERATION_POLLS, operationPolls);
+
+        SetMultimap<NotificationFilter, JmxNotificationSubscriptionConfig<?>> notificationSubscriptions = HashMultimap.create();
+        for (JmxNotificationSubscriptionConfig<?> config : builder.notificationSubscriptions) {
+            if (!config.isEnabled()) continue;
+            notificationSubscriptions.put(config.getNotificationFilter(), config);
+        }
+        setConfig(NOTIFICATION_SUBSCRIPTIONS, notificationSubscriptions);
+        initUniqueTag(builder.uniqueTag, attributePolls, operationPolls, notificationSubscriptions);
+    }
+
+    /**
+     * Attaches the feed to the entity. If no helper has been configured, one is created
+     * for the entity and marked as owned by this feed.
+     */
+    @Override
+    public void setEntity(EntityLocal entity) {
+        if (getConfig(HELPER) == null) {
+            JmxHelper helper = new JmxHelper(entity);
+            setConfig(HELPER, helper);
+            setConfig(OWN_HELPER, true);
+            setConfig(JMX_URI, helper.getUrl());
+        }
+        super.setEntity(entity);
+    }
+
+    public String getJmxUri() {
+        return getConfig(JMX_URI);
+    }
+
+    protected JmxHelper getHelper() {
+        return getConfig(HELPER);
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    protected Poller<Object> getPoller() {
+        return (Poller<Object>) super.getPoller();
+    }
+
+    @Override
+    protected boolean isConnected() {
+        return super.isConnected() && getHelper().isConnected();
+    }
+
+    @Override
+    protected void preStart() {
+        /*
+         * All actions on the JmxHelper are done async (through the poller's threading) so we don't
+         * block on start for a long time (e.g. if the entity is not contactable and doing a rebind
+         * on restart of brooklyn). Without that, one gets a 120 second pause with it stuck in a
+         * stack trace like:
+         *
+         *      at brooklyn.event.feed.jmx.JmxHelper.sleep(JmxHelper.java:640)
+         *      at brooklyn.event.feed.jmx.JmxHelper.connect(JmxHelper.java:320)
+         *      at brooklyn.event.feed.jmx.JmxFeed.preStart(JmxFeed.java:172)
+         *      at brooklyn.event.feed.AbstractFeed.start(AbstractFeed.java:68)
+         *      at brooklyn.event.feed.jmx.JmxFeed$Builder.build(JmxFeed.java:119)
+         *      at brooklyn.entity.java.JavaAppUtils.connectMXBeanSensors(JavaAppUtils.java:109)
+         *      at brooklyn.entity.java.VanillaJavaApp.connectSensors(VanillaJavaApp.java:97)
+         *      at brooklyn.entity.basic.SoftwareProcessImpl.callRebindHooks(SoftwareProcessImpl.java:189)
+         *      at brooklyn.entity.basic.SoftwareProcessImpl.rebind(SoftwareProcessImpl.java:235)
+         *      ...
+         *      at brooklyn.entity.rebind.RebindManagerImpl.rebind(RebindManagerImpl.java:184)
+         */
+        final SetMultimap<NotificationFilter, JmxNotificationSubscriptionConfig<?>> notificationSubscriptions = getConfig(NOTIFICATION_SUBSCRIPTIONS);
+        final SetMultimap<List<?>, JmxOperationPollConfig<?>> operationPolls = getConfig(OPERATION_POLLS);
+        final SetMultimap<String, JmxAttributePollConfig<?>> attributePolls = getConfig(ATTRIBUTE_POLLS);
+
+        getPoller().submit(new Callable<Void>() {
+               @Override
+               public Void call() {
+                   getHelper().connect(getConfig(JMX_CONNECTION_TIMEOUT));
+                   return null;
+               }
+               @Override public String toString() { return "Connect JMX "+getHelper().getUrl(); }
+           });
+
+        for (final NotificationFilter filter : notificationSubscriptions.keySet()) {
+            getPoller().submit(new Callable<Void>() {
+                @Override
+                public Void call() {
+                    // TODO Could config.getObjectName have wildcards? Is this code safe?
+                    Set<JmxNotificationSubscriptionConfig<?>> configs = notificationSubscriptions.get(filter);
+                    NotificationListener listener = registerNotificationListener(configs);
+                    ObjectName objectName = Iterables.get(configs, 0).getObjectName();
+                    notificationListeners.put(objectName, listener);
+                    return null;
+                }
+                @Override public String toString() { return "Register JMX notifications: "+notificationSubscriptions.get(filter); }
+            });
+        }
+
+        // Setup polling of sensors.
+        // Iterate keySet(), not keys(): Multimap.keys() repeats a key once per value, which
+        // would register the same group of configs several times when they share a key.
+        for (final String jmxAttributeName : attributePolls.keySet()) {
+            registerAttributePoller(attributePolls.get(jmxAttributeName));
+        }
+
+        // Setup polling of operations (keySet() for the same reason as above)
+        for (final List<?> operationIdentifier : operationPolls.keySet()) {
+            registerOperationPoller(operationPolls.get(operationIdentifier));
+        }
+    }
+
+    /** Removes every notification listener that was registered by {@link #preStart()}. */
+    @Override
+    protected void preStop() {
+        super.preStop();
+
+        for (Map.Entry<ObjectName, NotificationListener> entry : notificationListeners.entries()) {
+            unregisterNotificationListener(entry.getKey(), entry.getValue());
+        }
+        notificationListeners.clear();
+    }
+
+    /** Terminates the helper, but only if this feed created it itself. */
+    @Override
+    protected void postStop() {
+        super.postStop();
+        JmxHelper helper = getHelper();
+        Boolean ownHelper = getConfig(OWN_HELPER);
+        // OWN_HELPER may never have been set; compare null-safely instead of auto-unboxing.
+        if (helper != null && Boolean.TRUE.equals(ownHelper)) helper.terminate();
+    }
+
+    /**
+     * Registers to poll a jmx-operation for an ObjectName, where all the given configs are for the same ObjectName + operation + parameters.
+     */
+    private void registerOperationPoller(Set<JmxOperationPollConfig<?>> configs) {
+        Set<AttributePollHandler<? super Object>> handlers = Sets.newLinkedHashSet();
+        long minPeriod = Integer.MAX_VALUE;
+
+        // All configs in the set describe the same operation, so the first one is representative.
+        final ObjectName objectName = Iterables.get(configs, 0).getObjectName();
+        final String operationName = Iterables.get(configs, 0).getOperationName();
+        final List<String> signature = Iterables.get(configs, 0).getSignature();
+        final List<?> params = Iterables.get(configs, 0).getParams();
+
+        // One shared poll at the shortest positive period requested by any of the configs.
+        for (JmxOperationPollConfig<?> config : configs) {
+            handlers.add(new AttributePollHandler<Object>(config, getEntity(), this));
+            if (config.getPeriod() > 0) minPeriod = Math.min(minPeriod, config.getPeriod());
+        }
+
+        getPoller().scheduleAtFixedRate(
+                new Callable<Object>() {
+                    @Override
+                    public Object call() throws Exception {
+                        if (log.isDebugEnabled()) log.debug("jmx operation polling for {} sensors at {} -> {}", new Object[] {getEntity(), getJmxUri(), operationName});
+                        // Use the explicit signature only when it lines up with the parameters.
+                        if (signature.size() == params.size()) {
+                            return getHelper().operation(objectName, operationName, signature, params);
+                        } else {
+                            return getHelper().operation(objectName, operationName, params.toArray());
+                        }
+                    }
+                },
+                new DelegatingPollHandler<Object>(handlers), minPeriod);
+    }
+
+    /**
+     * Registers to poll a jmx-attribute for an ObjectName, where all the given configs are for that same ObjectName + attribute.
+     */
+    private void registerAttributePoller(Set<JmxAttributePollConfig<?>> configs) {
+        Set<AttributePollHandler<? super Object>> handlers = Sets.newLinkedHashSet();
+        long minPeriod = Integer.MAX_VALUE;
+
+        // All configs in the set describe the same attribute, so the first one is representative.
+        final ObjectName objectName = Iterables.get(configs, 0).getObjectName();
+        final String jmxAttributeName = Iterables.get(configs, 0).getAttributeName();
+
+        // One shared poll at the shortest positive period requested by any of the configs.
+        for (JmxAttributePollConfig<?> config : configs) {
+            handlers.add(new AttributePollHandler<Object>(config, getEntity(), this));
+            if (config.getPeriod() > 0) minPeriod = Math.min(minPeriod, config.getPeriod());
+        }
+
+        // TODO Not good calling this holding the synchronization lock
+        getPoller().scheduleAtFixedRate(
+                new Callable<Object>() {
+                    @Override
+                    public Object call() throws Exception {
+                        if (log.isTraceEnabled()) log.trace("jmx attribute polling for {} sensors at {} -> {}", new Object[] {getEntity(), getJmxUri(), jmxAttributeName});
+                        return getHelper().getAttribute(objectName, jmxAttributeName);
+                    }
+                },
+                new DelegatingPollHandler<Object>(handlers), minPeriod);
+    }
+
+    /**
+     * Registers to subscribe to notifications for an ObjectName, where all the given configs are for that same ObjectName + filter.
+     */
+    private NotificationListener registerNotificationListener(Set<JmxNotificationSubscriptionConfig<?>> configs) {
+        final List<AttributePollHandler<? super javax.management.Notification>> handlers = Lists.newArrayList();
+
+        final ObjectName objectName = Iterables.get(configs, 0).getObjectName();
+        final NotificationFilter filter = Iterables.get(configs, 0).getNotificationFilter();
+
+        for (final JmxNotificationSubscriptionConfig<?> config : configs) {
+            AttributePollHandler<javax.management.Notification> handler = new AttributePollHandler<javax.management.Notification>(config, getEntity(), this) {
+                @Override protected Object transformValueOnSuccess(javax.management.Notification val) {
+                    if (config.getOnNotification() != null) {
+                        return config.getOnNotification().apply(val);
+                    } else {
+                        // No explicit notification handler: fall back to the default transform,
+                        // unwrapping the user data if a Notification comes back.
+                        Object result = super.transformValueOnSuccess(val);
+                        if (result instanceof javax.management.Notification)
+                            return ((javax.management.Notification)result).getUserData();
+                        return result;
+                    }
+                }
+            };
+            handlers.add(handler);
+        }
+        final PollHandler<javax.management.Notification> compoundHandler = new DelegatingPollHandler<javax.management.Notification>(handlers);
+
+        NotificationListener listener = new NotificationListener() {
+            @Override public void handleNotification(Notification notification, Object handback) {
+                compoundHandler.onSuccess(notification);
+            }
+        };
+        getHelper().addNotificationListener(objectName, listener, filter);
+
+        return listener;
+    }
+
+    /** Best-effort removal of a listener; failures are logged and otherwise ignored. */
+    private void unregisterNotificationListener(ObjectName objectName, NotificationListener listener) {
+        try {
+            getHelper().removeNotificationListener(objectName, listener);
+        } catch (RuntimeException e) {
+            log.warn("Failed to unregister listener: "+objectName+", "+listener+"; continuing...", e);
+        }
+    }
+
+    @Override
+    public String toString() {
+        return "JmxFeed["+(getManagementContext()!=null&&getManagementContext().isRunning()?getJmxUri():"mgmt-not-running")+"]";
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/64c2b2e5/software/base/src/main/java/org/apache/brooklyn/sensor/feed/jmx/JmxHelper.java
----------------------------------------------------------------------
diff --git a/software/base/src/main/java/org/apache/brooklyn/sensor/feed/jmx/JmxHelper.java b/software/base/src/main/java/org/apache/brooklyn/sensor/feed/jmx/JmxHelper.java
new file mode 100644
index 0000000..9599be1
--- /dev/null
+++ b/software/base/src/main/java/org/apache/brooklyn/sensor/feed/jmx/JmxHelper.java
@@ -0,0 +1,724 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.sensor.feed.jmx;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.brooklyn.util.JavaGroovyEquivalents.groovyTruth;
+import groovy.time.TimeDuration;
+
+import java.io.IOException;
+import java.security.KeyStore;
+import java.security.PrivateKey;
+import java.security.cert.Certificate;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.WeakHashMap;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.management.AttributeNotFoundException;
+import javax.management.InstanceAlreadyExistsException;
+import javax.management.InstanceNotFoundException;
+import javax.management.InvalidAttributeValueException;
+import javax.management.JMX;
+import javax.management.ListenerNotFoundException;
+import javax.management.MBeanServerConnection;
+import javax.management.MalformedObjectNameException;
+import javax.management.NotCompliantMBeanException;
+import javax.management.NotificationFilter;
+import javax.management.NotificationListener;
+import javax.management.ObjectInstance;
+import javax.management.ObjectName;
+import javax.management.openmbean.CompositeData;
+import javax.management.openmbean.TabularData;
+import javax.management.remote.JMXConnector;
+import javax.management.remote.JMXConnectorFactory;
+import javax.management.remote.JMXServiceURL;
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLSocketFactory;
+import javax.net.ssl.TrustManager;
+
+import org.apache.brooklyn.api.internal.EntityLocal;
+import org.apache.brooklyn.entity.java.JmxSupport;
+import org.apache.brooklyn.entity.java.UsesJmx;
+import org.apache.brooklyn.util.core.crypto.SecureKeys;
+import org.apache.brooklyn.util.crypto.SslTrustUtils;
+import org.apache.brooklyn.util.exceptions.Exceptions;
+import org.apache.brooklyn.util.exceptions.RuntimeInterruptedException;
+import org.apache.brooklyn.util.jmx.jmxmp.JmxmpAgent;
+import org.apache.brooklyn.util.repeat.Repeater;
+import org.apache.brooklyn.util.time.Duration;
+import org.apache.brooklyn.util.time.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+
+public class JmxHelper {
+
+ /** Logger shared by all instances of this class. */
+ private static final Logger LOG = LoggerFactory.getLogger(JmxHelper.class);
+
+ /** URL template taking (host, port, context); used by {@code toRmiJmxUrl} when a single port is used. */
+ public static final String JMX_URL_FORMAT = "service:jmx:rmi:///jndi/rmi://%s:%d/%s";
+ // first host:port may be ignored, so above is sufficient, but not sure
+ /** URL template taking (host, JMX RMI server port, host, RMI registry port, context); used by {@code toRmiJmxUrl} when both ports are known and differ. */
+ public static final String RMI_JMX_URL_FORMAT = "service:jmx:rmi://%s:%d/jndi/rmi://%s:%d/%s";
+ // jmxmp
+ /** URL template taking (host, port) for the JMXMP protocol. */
+ public static final String JMXMP_URL_FORMAT = "service:jmx:jmxmp://%s:%d";
+
+ // Tracks the MBeans we have failed to find, with a set keyed off the url
+ // (synchronized wrapper around a WeakHashMap; each constructor call looks up or creates the set for its url)
+ private static final Map<String, Set<ObjectName>> notFoundMBeansByUrl = Collections.synchronizedMap(new WeakHashMap<String, Set<ObjectName>>());
+
+ /**
+  * Maps a class's simple name to the type name placed in the signature array when
+  * invoking a JMX operation; argument classes not listed here use their own class name.
+  * Boxed primitives map to the primitive type name, common map implementations to
+  * {@link Map}, and the open-MBean support classes to their interfaces.
+  */
+ public static final Map<String, String> CLASSES = ImmutableMap.<String,String>builder()
+ .put("Integer", Integer.TYPE.getName())
+ .put("Long", Long.TYPE.getName())
+ .put("Boolean", Boolean.TYPE.getName())
+ .put("Byte", Byte.TYPE.getName())
+ .put("Character", Character.TYPE.getName())
+ .put("Double", Double.TYPE.getName())
+ .put("Float", Float.TYPE.getName())
+ .put("GStringImpl", String.class.getName())
+ .put("LinkedHashMap", Map.class.getName())
+ .put("TreeMap", Map.class.getName())
+ .put("HashMap", Map.class.getName())
+ .put("ConcurrentHashMap", Map.class.getName())
+ .put("TabularDataSupport", TabularData.class.getName())
+ .put("CompositeDataSupport", CompositeData.class.getName())
+ .build();
+
+ /**
+  * Returns the JMX URL to use for connecting to the given entity.
+  * <p>
+  * If the entity already has a {@link UsesJmx#JMX_URL} attribute value, that is returned.
+  * Otherwise {@link JmxSupport#setJmxUrl()} is invoked for the entity and the attribute is
+  * read again (presumably this is where the JMX/RMI vs JMXMP choice is made).
+  *
+  * @throws NullPointerException if the attribute is still unset afterwards
+  */
+ public static String toJmxUrl(EntityLocal entity) {
+     String existing = entity.getAttribute(UsesJmx.JMX_URL);
+     if (existing != null) {
+         return existing;
+     }
+     new JmxSupport(entity, null).setJmxUrl();
+     String computed = entity.getAttribute(UsesJmx.JMX_URL);
+     return Preconditions.checkNotNull(computed, "Could not find URL for "+entity);
+ }
+
+ /** constructs an RMI/JMX URL with the given inputs
+ * (where the RMI Registry Port should be non-null, and at least one must be non-null) */
+ public static String toRmiJmxUrl(String host, Integer jmxRmiServerPort, Integer rmiRegistryPort, String context) {
+ if (rmiRegistryPort != null && rmiRegistryPort > 0) {
+ if (jmxRmiServerPort!=null && jmxRmiServerPort > 0 && jmxRmiServerPort!=rmiRegistryPort) {
+ // we have an explicit known JMX RMI server port (e.g. because we are using the agent),
+ // distinct from the RMI registry port
+ // (if the ports are the same, it is a short-hand, and don't use this syntax!)
+ return String.format(RMI_JMX_URL_FORMAT, host, jmxRmiServerPort, host, rmiRegistryPort, context);
+ }
+ return String.format(JMX_URL_FORMAT, host, rmiRegistryPort, context);
+ } else if (jmxRmiServerPort!=null && jmxRmiServerPort > 0) {
+ LOG.warn("No RMI registry port set for "+host+"; attempting to use JMX port for RMI lookup");
+ return String.format(JMX_URL_FORMAT, host, jmxRmiServerPort, context);
+ } else {
+ LOG.warn("No RMI/JMX details set for "+host+"; returning null");
+ return null;
+ }
+ }
+
+ /** constructs a JMXMP URL for connecting to the given host and port */
+ public static String toJmxmpUrl(String host, Integer jmxmpPort) {
+ return "service:jmx:jmxmp://"+host+(jmxmpPort!=null ? ":"+jmxmpPort : "");
+ }
+
+ // Entity this helper was created for; null when constructed from a URL only.
+ final EntityLocal entity;
+ // JMX service URL to connect to.
+ final String url;
+ // Credentials; only sent when both are set (see getConnectionEnvVars).
+ final String user;
+ final String password;
+
+ // Set by connect() and cleared by disconnectNow().
+ private volatile transient JMXConnector connector;
+ private volatile transient MBeanServerConnection connection;
+ // True once connect() has been attempted; reset by disconnectNow().
+ private transient boolean triedConnecting;
+ // State of the last reconnect() attempt, and when it last failed (epoch millis).
+ private transient boolean failedReconnecting;
+ private transient long failedReconnectingTime;
+ // Minimum gap, in milliseconds, enforced by reconnectWithRetryDampened() after a failed reconnect.
+ private int minTimeBetweenReconnectAttempts = 1000;
+ // Set by terminate(); once true, connect() refuses to connect.
+ private final AtomicBoolean terminated = new AtomicBoolean();
+
+ // Tracks the MBeans we have failed to find for this JmxHelper's connection URL (so can log just once for each)
+ private final Set<ObjectName> notFoundMBeans;
+
+ /**
+  * Creates a helper for the given entity, taking the URL from {@link #toJmxUrl(EntityLocal)}
+  * and the credentials from the entity's JMX user/password attributes. If the entity has no
+  * {@link UsesJmx#JMX_URL} attribute value afterwards, it is set to the URL in use.
+  */
+ public JmxHelper(EntityLocal entity) {
+     this(toJmxUrl(entity), entity, entity.getAttribute(UsesJmx.JMX_USER), entity.getAttribute(UsesJmx.JMX_PASSWORD));
+
+     boolean urlAlreadyPublished = entity.getAttribute(UsesJmx.JMX_URL) != null;
+     if (!urlAlreadyPublished) {
+         entity.setAttribute(UsesJmx.JMX_URL, url);
+     }
+ }
+
+ // TODO split this into two classes, one for entities, and one entity-neutral
+ // (simplifying the set of constructors below)
+
+ /** Creates an entity-neutral helper for the given URL, without credentials. */
+ public JmxHelper(String url) {
+     this(url, (String) null, (String) null);
+ }
+
+ /** Creates an entity-neutral helper for the given URL and credentials. */
+ public JmxHelper(String url, String user, String password) {
+     this(url, (EntityLocal) null, user, password);
+ }
+
+ /**
+  * Creates a helper for the given URL, optional entity and optional credentials.
+  * Helpers created for the same URL share one set of "not found" MBean names,
+  * looked up (or created) in the static per-URL registry.
+  */
+ public JmxHelper(String url, EntityLocal entity, String user, String password) {
+     this.url = url;
+     this.entity = entity;
+     this.user = user;
+     this.password = password;
+
+     Set<ObjectName> misses;
+     synchronized (notFoundMBeansByUrl) {
+         misses = notFoundMBeansByUrl.get(url);
+         if (misses == null) {
+             misses = Collections.synchronizedSet(Collections.newSetFromMap(new WeakHashMap<ObjectName, Boolean>()));
+             notFoundMBeansByUrl.put(url, misses);
+         }
+     }
+     this.notFoundMBeans = misses;
+ }
+
+ /**
+  * Sets the minimum time, in milliseconds, that must pass after a failed reconnect
+  * before {@link #reconnectWithRetryDampened()} will try again.
+  */
+ public void setMinTimeBetweenReconnectAttempts(int val) {
+     this.minTimeBetweenReconnectAttempts = val;
+ }
+
+ /** Returns the JMX service URL this helper was created with. */
+ public String getUrl() {
+     return this.url;
+ }
+
+ // ============== connection related calls =======================
+
+ /** Returns the current connection, or null if not connected; exposed for testing purposes. */
+ protected MBeanServerConnection getConnection() {
+     return this.connection;
+ }
+
+ /**
+ * Checks if the JmxHelper is connected. Returned value could be stale as soon
+ * as it is received.
+ *
+ * This method is thread safe.
+ *
+ * @return true if connected, false otherwise.
+ */
+ public boolean isConnected() {
+ return connection!=null;
+ }
+
+ /**
+  * Reconnects (disconnecting first if connected), unless the previous reconnect attempt
+  * failed less than the configured minimum time ago.
+  *
+  * @throws IllegalStateException if the last reconnect failure was too recent
+  * @throws IOException declared for callers; see {@link #reconnect()}
+  */
+ public synchronized void reconnectWithRetryDampened() throws IOException {
+     if (failedReconnecting) {
+         // If we've already tried reconnecting very recently, don't try again immediately
+         long elapsedMs = System.currentTimeMillis() - failedReconnectingTime;
+         boolean tooSoon = elapsedMs < minTimeBetweenReconnectAttempts;
+         if (tooSoon) {
+             throw new IllegalStateException(
+                     "Not reconnecting to JMX at "+url+" because attempt failed "+Time.makeTimeStringRounded(elapsedMs)+" ago");
+         }
+     }
+
+     reconnect();
+ }
+
+ public synchronized void reconnect() throws IOException {
+ disconnect();
+
+ try {
+ connect();
+ failedReconnecting = false;
+ } catch (Exception e) {
+ if (failedReconnecting) {
+ if (LOG.isDebugEnabled()) LOG.debug("unable to re-connect to JMX url (repeated failure): {}: {}", url, e);
+ } else {
+ LOG.debug("unable to re-connect to JMX url {} (rethrowing): {}", url, e);
+ failedReconnecting = true;
+ }
+ failedReconnectingTime = System.currentTimeMillis();
+ throw Throwables.propagate(e);
+ }
+ }
+
+ /**
+  * Attempts to connect immediately. Does nothing if a connection is already held.
+  *
+  * @throws IllegalStateException if this helper has been terminated (before, during or just after connecting)
+  * @throws IOException if the connection cannot be established
+  */
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ public synchronized void connect() throws IOException {
+     if (terminated.get()) throw new IllegalStateException("JMX Helper "+this+" already terminated");
+     if (connection != null) return;
+
+     triedConnecting = true;
+     if (connector != null) connector.close();
+     JMXServiceURL serviceUrl = new JMXServiceURL(url);
+     Map env = getConnectionEnvVars();
+     try {
+         connector = JMXConnectorFactory.connect(serviceUrl, env);
+     } catch (NullPointerException npe) {
+         //some software -- eg WSO2 -- will throw an NPE exception if the JMX connection can't be created, instead of an IOException.
+         //this is a break of contract with the JMXConnectorFactory.connect method, so this code verifies if the NPE is
+         //thrown by a known offender (wso2) and if so replaces the bad exception by a new IOException.
+         //ideally WSO2 will fix this bug and we can remove this code.
+         // A JVM is allowed to return an empty stack trace, so guard the index to avoid
+         // masking the NPE with an ArrayIndexOutOfBoundsException.
+         StackTraceElement[] trace = npe.getStackTrace();
+         boolean thrownByWso2 = trace.length > 0
+                 && trace[0].toString().contains("org.wso2.carbon.core.security.CarbonJMXAuthenticator.authenticate");
+         if (thrownByWso2) {
+             throw new IOException("Failed to connect to url "+url+". NullPointerException is thrown, but replaced by an IOException to fix a WSO2 JMX problem", npe);
+         } else {
+             throw npe;
+         }
+     } catch (IOException e) {
+         Exceptions.propagateIfFatal(e);
+         if (terminated.get()) {
+             throw new IllegalStateException("JMX Helper "+this+" already terminated", e);
+         } else {
+             throw e;
+         }
+     }
+     connection = connector.getMBeanServerConnection();
+
+     // terminate() is not synchronized, so it may have run while we were connecting
+     if (terminated.get()) {
+         disconnectNow();
+         throw new IllegalStateException("JMX Helper "+this+" already terminated");
+     }
+ }
+
+ /**
+  * Builds the environment map passed to the JMX connector factory.
+  * <p>
+  * Credentials are added only when both user and password are set. When an entity is present
+  * and has JMX SSL enabled, the TLS profile and an SSL socket factory are added; the socket
+  * factory uses the entity's access key and certificate (if a key is configured).
+  * Failures while setting up SSL are logged at WARN and the map is returned without the
+  * socket factory.
+  */
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ public Map getConnectionEnvVars() {
+     Map env = new LinkedHashMap();
+
+     if (groovyTruth(user) && groovyTruth(password)) {
+         String[] creds = new String[] {user, password};
+         env.put(JMXConnector.CREDENTIALS, creds);
+     }
+
+     if (entity!=null && groovyTruth(entity.getConfig(UsesJmx.JMX_SSL_ENABLED))) {
+         env.put("jmx.remote.profiles", JmxmpAgent.TLS_JMX_REMOTE_PROFILES);
+
+         PrivateKey key = entity.getConfig(UsesJmx.JMX_SSL_ACCESS_KEY);
+         Certificate cert = entity.getConfig(UsesJmx.JMX_SSL_ACCESS_CERT);
+         KeyStore ks = SecureKeys.newKeyStore();
+         try {
+             KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
+             if (key!=null) {
+                 ks.setKeyEntry("brooklyn-jmx-access", key, "".toCharArray(), new Certificate[] { cert });
+             }
+             kmf.init(ks, "".toCharArray());
+
+             // NOTE(review): the server certificate is not verified (trust-all), and the
+             // context is pinned to "TLSv1" -- both worth revisiting.
+             TrustManager tms =
+                     // TODO use root cert for trusting server
+                     //trustStore!=null ? SecureKeys.getTrustManager(trustStore) :
+                     SslTrustUtils.TRUST_ALL;
+
+             SSLContext ctx = SSLContext.getInstance("TLSv1");
+             ctx.init(kmf.getKeyManagers(), new TrustManager[] { tms }, null);
+             SSLSocketFactory ssf = ctx.getSocketFactory();
+             env.put(JmxmpAgent.TLS_SOCKET_FACTORY_PROPERTY, ssf);
+
+         } catch (Exception e) {
+             // Deliberately do not log the key itself: a PrivateKey's toString() may expose key material.
+             LOG.warn("Error setting JMX SSL access key for "+entity+": "+e, e);
+         }
+     }
+
+     return env;
+ }
+
+ /**
+  * Repeatedly attempts to connect for at least the indicated amount of time; or indefinitely if -1. This method
+  * is useful when you are not sure if the system you are trying to connect to already is up and running.
+  * <p>
+  * If already connected, the first attempt succeeds immediately and true is returned.
+  * Retryable failures are swallowed and retried after a short sleep; the last one is logged
+  * if the time runs out. A failure that is not worth retrying, or any failure once this
+  * helper has been terminated, is propagated to the caller.
+  *
+  * @param timeoutMs how long to keep trying, in milliseconds; -1 to try indefinitely
+  * @return true on success, false if no attempt succeeded within the time allowed
+  */
+ public boolean connect(long timeoutMs) {
+     if (LOG.isDebugEnabled()) LOG.debug("Connecting to JMX URL: {} ({})", url, ((timeoutMs == -1) ? "indefinitely" : timeoutMs+"ms timeout"));
+     long startMs = System.currentTimeMillis();
+     // Saturate instead of overflowing: a very large timeout would otherwise wrap to a
+     // negative deadline and the loop below would never make an attempt.
+     boolean unbounded = (timeoutMs == -1) || (timeoutMs > Long.MAX_VALUE - startMs);
+     long endMs = unbounded ? Long.MAX_VALUE : (startMs + timeoutMs);
+     long currentTime = startMs;
+     Throwable lastError = null;
+     int attempt = 0;
+     while (currentTime <= endMs) {
+         currentTime = System.currentTimeMillis();
+         if (attempt != 0) sleep(100); //sleep 100 to prevent thrashing and facilitate interruption
+         if (LOG.isTraceEnabled()) LOG.trace("trying connection to {} at time {}", url, currentTime);
+
+         try {
+             connect();
+             return true;
+         } catch (Exception e) {
+             Exceptions.propagateIfFatal(e);
+             if (!terminated.get() && shouldRetryOn(e)) {
+                 if (LOG.isDebugEnabled()) LOG.debug("Attempt {} failed connecting to {} ({})", new Object[] {attempt + 1, url, e.getMessage()});
+                 lastError = e;
+             } else {
+                 throw Exceptions.propagate(e);
+             }
+         }
+         attempt++;
+     }
+     LOG.warn("unable to connect to JMX url: "+url, lastError);
+     return false;
+ }
+
+ private boolean shouldRetryOn(Exception e) {
+ // Expect SecurityException, IOException, etc.
+ // But can also see things like javax.naming.ServiceUnavailableException with WSO2 app-servers.
+ // So let's not try to second guess strange behaviours that future entities will exhibit.
+ //
+ // However, if it was our request that was invalid then not worth retrying.
+
+ if (e instanceof AttributeNotFoundException) return false;
+ if (e instanceof InstanceAlreadyExistsException) return false;
+ if (e instanceof InstanceNotFoundException) return false;
+ if (e instanceof InvalidAttributeValueException) return false;
+ if (e instanceof ListenerNotFoundException) return false;
+ if (e instanceof MalformedObjectNameException) return false;
+ if (e instanceof NotCompliantMBeanException) return false;
+ if (e instanceof InterruptedException) return false;
+ if (e instanceof RuntimeInterruptedException) return false;
+
+ return true;
+ }
+
+ /**
+  * Disconnects while holding this helper's lock; the synchronized counterpart
+  * of {@link #disconnectNow()}.
+  *
+  * This method is threadsafe.
+  */
+ public synchronized void disconnect() {
+     this.disconnectNow();
+ }
+
+ /**
+  * Disconnects and prevents any subsequent connection from being made. Does not throw.
+  *
+  * Can safely be called if already disconnected.
+  *
+  * This method is not synchronized, so it will not block if another thread is taking a
+  * long time waiting for a connection to time out.
+  *
+  * Any concurrent requests will likely get an IOException - see
+  * {@linkplain http://docs.oracle.com/javase/7/docs/api/javax/management/remote/JMXConnector.html#close()}.
+  *
+  */
+ public void terminate() {
+     // flag first, so a connect() racing with us sees it and gives up
+     this.terminated.set(true);
+     this.disconnectNow();
+ }
+
+ /**
+  * Closes the connector (if any) without taking this helper's lock, and forgets both the
+  * connector and the connection. Exceptions from closing are logged, never thrown.
+  */
+ protected void disconnectNow() {
+     triedConnecting = false;
+     if (connector == null) {
+         return;
+     }
+     if (LOG.isDebugEnabled()) LOG.debug("Disconnecting from JMX URL {}", url);
+     try {
+         connector.close();
+     } catch (Exception closeFailure) {
+         // close attempts to connect to close cleanly; and if it can't, it throws;
+         // often we disconnect as part of shutdown, even if the other side has already stopped --
+         // so swallow exceptions (no situations known where we need a clean closure on the remote side)
+         if (LOG.isDebugEnabled()) LOG.debug("Caught exception disconnecting from JMX at {} ({})", url, closeFailure.getMessage());
+         if (LOG.isTraceEnabled()) LOG.trace("Details for exception disconnecting JMX", closeFailure);
+     } finally {
+         connector = null;
+         connection = null;
+     }
+ }
+
+ /**
+  * Returns the current MBeanServerConnection, failing if there is none.
+  *
+  * Method is threadsafe.
+  *
+  * @return the MBeanServerConnection
+  * @throws IllegalStateException if not connected; the message distinguishes a failed
+  *         connection attempt from no attempt having been made.
+  */
+ private synchronized MBeanServerConnection getConnectionOrFail() {
+     if (!isConnected()) {
+         if (triedConnecting) {
+             throw new IllegalStateException("Failed to connect to JMX at "+url);
+         }
+         String lastFailure = "";
+         if (failedReconnecting) {
+             lastFailure = " (last reconnect failure at "+ Time.makeDateString(failedReconnectingTime) + ")";
+         }
+         throw new IllegalStateException("Not connected (and not attempted to connect) to JMX at "+url+lastFailure);
+     }
+     return getConnection();
+ }
+
+ /**
+  * Runs the task; if it fails with a retryable exception, reconnects (subject to retry
+  * dampening) and runs it once more. Any exception that ends the call is propagated
+  * via {@link Throwables#propagate(Throwable)}.
+  */
+ private <T> T invokeWithReconnect(Callable<T> task) {
+     try {
+         return task.call();
+     } catch (Exception firstFailure) {
+         if (!shouldRetryOn(firstFailure)) {
+             throw Throwables.propagate(firstFailure);
+         }
+         try {
+             reconnectWithRetryDampened();
+             return task.call();
+         } catch (Exception secondFailure) {
+             throw Throwables.propagate(secondFailure);
+         }
+     }
+ }
+
+ // ====================== query related calls =======================================
+
+ /**
+  * Converts from an object name pattern to a real object name, by querying with findMBean;
+  * if no matching MBean can be found (or if more than one match is found) then returns null.
+  * If the supplied object name is not a pattern then it is returned as is.
+  *
+  * @throws NullPointerException if objectName is null
+  */
+ public ObjectName toLiteralObjectName(ObjectName objectName) {
+     checkNotNull(objectName, "objectName");
+     if (!objectName.isPattern()) {
+         return objectName;
+     }
+     ObjectInstance match = findMBean(objectName);
+     if (match == null) {
+         return null;
+     }
+     return match.getObjectName();
+ }
+
+ /** Queries the MBean server for all MBeans matching the given name or pattern, reconnecting once if needed. */
+ public Set<ObjectInstance> findMBeans(final ObjectName objectName) {
+     Callable<Set<ObjectInstance>> query = new Callable<Set<ObjectInstance>>() {
+         public Set<ObjectInstance> call() throws Exception {
+             return getConnectionOrFail().queryMBeans(objectName, null);
+         }
+     };
+     return invokeWithReconnect(query);
+ }
+
+ /**
+  * Finds the single MBean matching the given name or pattern.
+  * <p>
+  * Returns null when there is no match or more than one. The first such miss for a name is
+  * logged at WARN; repeats are logged at debug level until the name resolves uniquely again.
+  */
+ public ObjectInstance findMBean(ObjectName objectName) {
+     Set<ObjectInstance> matches = findMBeans(objectName);
+     int matchCount = matches.size();
+     if (matchCount == 1) {
+         notFoundMBeans.remove(objectName);
+         return Iterables.getOnlyElement(matches);
+     }
+
+     boolean firstMiss = notFoundMBeans.add(objectName);
+     boolean ambiguous = matchCount > 1;
+
+     if (ambiguous && firstMiss) {
+         LOG.warn("JMX object name query returned {} values for {} at {}; ignoring all",
+                 new Object[] {matchCount, objectName.getCanonicalName(), url});
+     } else if (ambiguous) {
+         if (LOG.isDebugEnabled()) LOG.debug("JMX object name query returned {} values for {} at {} (repeating); ignoring all",
+                 new Object[] {matchCount, objectName.getCanonicalName(), url});
+     } else if (firstMiss) {
+         LOG.warn("JMX object {} not found at {}", objectName.getCanonicalName(), url);
+     } else {
+         if (LOG.isDebugEnabled()) LOG.debug("JMX object {} not found at {} (repeating)", objectName.getCanonicalName(), url);
+     }
+     return null;
+ }
+
+ public Set<ObjectInstance> doesMBeanExistsEventually(final ObjectName objectName, Duration timeout) {
+ return doesMBeanExistsEventually(objectName, timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+ }
+ public Set<ObjectInstance> doesMBeanExistsEventually(final ObjectName objectName, TimeDuration timeout) {
+ return doesMBeanExistsEventually(objectName, timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+ }
+
+ public Set<ObjectInstance> doesMBeanExistsEventually(final ObjectName objectName, long timeoutMillis) {
+ return doesMBeanExistsEventually(objectName, timeoutMillis, TimeUnit.MILLISECONDS);
+ }
+
+ public Set<ObjectInstance> doesMBeanExistsEventually(String objectName, Duration timeout) {
+ return doesMBeanExistsEventually(createObjectName(objectName), timeout);
+ }
+ public Set<ObjectInstance> doesMBeanExistsEventually(String objectName, TimeDuration timeout) {
+ return doesMBeanExistsEventually(createObjectName(objectName), timeout);
+ }
+
+ public Set<ObjectInstance> doesMBeanExistsEventually(String objectName, long timeout, TimeUnit timeUnit) {
+ return doesMBeanExistsEventually(createObjectName(objectName), timeout, timeUnit);
+ }
+
+ /**
+  * Polls until at least one MBean matches the given name or pattern.
+  * <p>
+  * Each poll first calls {@link #connect(long)} with the full timeout, then queries for matches.
+  *
+  * @return the beans found by the last query; the empty set if none were found before the time limit
+  */
+ public Set<ObjectInstance> doesMBeanExistsEventually(final ObjectName objectName, long timeout, TimeUnit timeUnit) {
+     final long timeoutMillis = timeUnit.toMillis(timeout);
+     final AtomicReference<Set<ObjectInstance>> found = new AtomicReference<Set<ObjectInstance>>(ImmutableSet.<ObjectInstance>of());
+     Callable<Boolean> someBeanPresent = new Callable<Boolean>() {
+         public Boolean call() {
+             connect(timeoutMillis);
+             found.set(findMBeans(objectName));
+             return !found.get().isEmpty();
+         }
+     };
+     try {
+         Repeater.create("Wait for "+objectName)
+                 .limitTimeTo(timeout, timeUnit)
+                 .every(500, TimeUnit.MILLISECONDS)
+                 .until(someBeanPresent)
+                 .rethrowException()
+                 .run();
+         return found.get();
+     } catch (Exception e) {
+         throw Exceptions.propagate(e);
+     }
+ }
+
+ public void assertMBeanExistsEventually(ObjectName objectName, Duration timeout) {
+ assertMBeanExistsEventually(objectName, timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+ }
+ public void assertMBeanExistsEventually(ObjectName objectName, TimeDuration timeout) {
+ assertMBeanExistsEventually(objectName, timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+ }
+
+ public void assertMBeanExistsEventually(ObjectName objectName, long timeoutMillis) {
+ assertMBeanExistsEventually(objectName, timeoutMillis, TimeUnit.MILLISECONDS);
+ }
+
+ /**
+  * Waits for exactly one MBean to match the given name or pattern.
+  *
+  * @throws IllegalStateException if, once the wait ends, no MBean or more than one MBean matches;
+  *         the message states the timeout with its unit and lists the matches when there are several
+  */
+ public void assertMBeanExistsEventually(ObjectName objectName, long timeout, TimeUnit timeUnit) {
+     Set<ObjectInstance> beans = doesMBeanExistsEventually(objectName, timeout, timeUnit);
+     if (beans.size() != 1) {
+         // include the unit: a bare number is ambiguous since callers may pass any TimeUnit
+         throw new IllegalStateException("MBean "+objectName+" not found within "+timeout+" "+timeUnit+
+                 (beans.size() > 1 ? "; found multiple matches: "+beans : ""));
+     }
+ }
+
+ /**
+  * Reads one attribute of the MBean identified by the given {@link ObjectName} (or pattern).
+  *
+  * @return the attribute value, or null if the name does not resolve to a single MBean
+  */
+ public Object getAttribute(ObjectName objectName, final String attribute) {
+     final ObjectName realObjectName = toLiteralObjectName(objectName);
+     if (realObjectName == null) {
+         return null;
+     }
+
+     Callable<Object> read = new Callable<Object>() {
+         public Object call() throws Exception {
+             return getConnectionOrFail().getAttribute(realObjectName, attribute);
+         }
+     };
+     Object value = invokeWithReconnect(read);
+
+     if (LOG.isTraceEnabled()) LOG.trace("From {}, for jmx attribute {}.{}, got value {}", new Object[] {url, objectName.getCanonicalName(), attribute, value});
+     return value;
+ }
+
+ public void setAttribute(String objectName, String attribute, Object val) {
+ setAttribute(createObjectName(objectName), attribute, val);
+ }
+
+ /**
+  * Sets one attribute of the MBean identified by the given {@link ObjectName} (or pattern).
+  * If the name does not resolve to a single MBean, nothing is set and this is noted at debug level.
+  */
+ public void setAttribute(ObjectName objectName, final String attribute, final Object val) {
+     final ObjectName realObjectName = toLiteralObjectName(objectName);
+     if (realObjectName == null) {
+         if (LOG.isDebugEnabled()) LOG.debug("From {}, cannot set attribute {}.{}, because mbean not found", new Object[] {url, objectName.getCanonicalName(), attribute});
+         return;
+     }
+
+     Callable<Void> write = new Callable<Void>() {
+         public Void call() throws Exception {
+             getConnectionOrFail().setAttribute(realObjectName, new javax.management.Attribute(attribute, val));
+             return null;
+         }
+     };
+     invokeWithReconnect(write);
+     if (LOG.isTraceEnabled()) LOG.trace("From {}, for jmx attribute {}.{}, set value {}", new Object[] {url, objectName.getCanonicalName(), attribute, val});
+ }
+
+ /** @see #operation(ObjectName, String, Object ...) */
+ public Object operation(String objectName, String method, Object... arguments) {
+ return operation(createObjectName(objectName), method, arguments);
+ }
+
+ /**
+  * Executes an operation on the MBean identified by the given {@link ObjectName} (or pattern).
+  * <p>
+  * The operation signature is derived from the runtime classes of the arguments, using
+  * {@link #CLASSES} to translate simple class names (e.g. boxed primitives to primitive type
+  * names) and the class name itself otherwise.
+  *
+  * @return the value returned by the operation
+  * @throws IllegalStateException if the name does not resolve to a single MBean
+  * @throws NullPointerException if any argument is null, since its signature type cannot be inferred
+  */
+ public Object operation(ObjectName objectName, final String method, final Object... arguments) {
+     final ObjectName realObjectName = toLiteralObjectName(objectName);
+     if (realObjectName == null) {
+         // Fail fast with a clear message instead of invoking on a null name
+         // (which fails remotely, triggers a pointless reconnect, and fails again).
+         throw new IllegalStateException("Cannot invoke JMX operation "+method+" at "+url+
+                 ": no single MBean matches "+objectName);
+     }
+     final String[] signature = new String[arguments.length];
+     for (int i = 0; i < arguments.length; i++) {
+         Object argument = checkNotNull(arguments[i],
+                 "argument %s of JMX operation %s is null; cannot infer its signature type", i, method);
+         Class<?> clazz = argument.getClass();
+         String mappedType = CLASSES.get(clazz.getSimpleName());
+         signature[i] = (mappedType != null) ? mappedType : clazz.getName();
+     }
+
+     Object result = invokeWithReconnect(new Callable<Object>() {
+         public Object call() throws Exception {
+             return getConnectionOrFail().invoke(realObjectName, method, arguments, signature);
+         }});
+
+     if (LOG.isTraceEnabled()) LOG.trace("From {}, for jmx operation {}.{}({}), got value {}", new Object[] {url, realObjectName.getCanonicalName(), method, Arrays.asList(arguments),
+             result});
+     return result;
+ }
+
+ public void addNotificationListener(String objectName, NotificationListener listener) {
+ addNotificationListener(createObjectName(objectName), listener, null);
+ }
+
+ public void addNotificationListener(String objectName, NotificationListener listener, NotificationFilter filter) {
+ addNotificationListener(createObjectName(objectName), listener, filter);
+ }
+
+ public void addNotificationListener(ObjectName objectName, NotificationListener listener) {
+ addNotificationListener(objectName, listener, null);
+ }
+
+ /**
+  * Registers the listener, with an optional filter and no handback object, on the given
+  * MBean, reconnecting once if the first attempt fails with a retryable error.
+  */
+ public void addNotificationListener(final ObjectName objectName, final NotificationListener listener, final NotificationFilter filter) {
+     Callable<Void> register = new Callable<Void>() {
+         public Void call() throws Exception {
+             getConnectionOrFail().addNotificationListener(objectName, listener, filter, null);
+             return null;
+         }
+     };
+     invokeWithReconnect(register);
+ }
+
+ public void removeNotificationListener(String objectName, NotificationListener listener) {
+ removeNotificationListener(createObjectName(objectName), listener);
+ }
+
+ public void removeNotificationListener(final ObjectName objectName, final NotificationListener listener) {
+ removeNotificationListener(objectName, listener, null);
+ }
+
+ /**
+  * Removes the listener/filter pair (registered with no handback) from the given MBean.
+  * Does nothing when not currently connected.
+  */
+ public void removeNotificationListener(final ObjectName objectName, final NotificationListener listener, final NotificationFilter filter) {
+     if (!isConnected()) {
+         return;
+     }
+     Callable<Void> unregister = new Callable<Void>() {
+         public Void call() throws Exception {
+             getConnectionOrFail().removeNotificationListener(objectName, listener, filter, null);
+             return null;
+         }
+     };
+     invokeWithReconnect(unregister);
+ }
+
+ public <M> M getProxyObject(String objectName, Class<M> mbeanInterface) {
+ return getProxyObject(createObjectName(objectName), mbeanInterface);
+ }
+
+ /**
+  * Creates an MBean proxy (not a notification emitter) implementing the given interface
+  * over the current connection.
+  *
+  * @throws IllegalStateException if not connected
+  */
+ public <M> M getProxyObject(ObjectName objectName, Class<M> mbeanInterface) {
+     return JMX.newMBeanProxy(getConnectionOrFail(), objectName, mbeanInterface, false);
+ }
+
+ public static ObjectName createObjectName(String name) {
+ try {
+ return new ObjectName(name);
+ } catch (MalformedObjectNameException e) {
+ throw Throwables.propagate(e);
+ }
+ }
+
+ /** Sleeps for the given number of milliseconds, rethrowing an interruption as a RuntimeInterruptedException. */
+ private static void sleep(long sleepTimeMillis) {
+     try {
+         Thread.sleep(sleepTimeMillis);
+     } catch (InterruptedException interrupted) {
+         throw new RuntimeInterruptedException(interrupted);
+     }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/64c2b2e5/software/base/src/main/java/org/apache/brooklyn/sensor/feed/jmx/JmxNotificationFilters.java
----------------------------------------------------------------------
diff --git a/software/base/src/main/java/org/apache/brooklyn/sensor/feed/jmx/JmxNotificationFilters.java b/software/base/src/main/java/org/apache/brooklyn/sensor/feed/jmx/JmxNotificationFilters.java
new file mode 100644
index 0000000..182866a
--- /dev/null
+++ b/software/base/src/main/java/org/apache/brooklyn/sensor/feed/jmx/JmxNotificationFilters.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.sensor.feed.jmx;
+
+import javax.management.Notification;
+import javax.management.NotificationFilter;
+import javax.management.NotificationFilterSupport;
+
+ /**
+  * Static factory methods for JMX {@link NotificationFilter}s.
+  */
+ public class JmxNotificationFilters {
+
+     private JmxNotificationFilters() {} // instead use static utility methods
+
+     /**
+      * Matches the given notification type.
+      * @see NotificationFilterSupport#enableType(String)
+      */
+     public static NotificationFilter matchesType(String type) {
+         return matchesTypes(type);
+     }
+
+     /**
+      * Matches any of the given notification types.
+      * @see NotificationFilterSupport#enableType(String)
+      */
+     public static NotificationFilter matchesTypes(String... types) {
+         NotificationFilterSupport result = new NotificationFilterSupport();
+         for (String type : types) {
+             result.enableType(type);
+         }
+         return result;
+     }
+
+     /**
+      * Matches notifications whose type matches the given regular expression in full.
+      *
+      * @deprecated since 0.6.0;
+      * only works if this brooklyn class is on the classpath of the JVM that you're
+      * subscribing to notifications on (because it tries to push the filter instance
+      * to that JVM). So of very limited use in real-world java processes to be managed.
+      * Therefore this will be deleted to avoid people hitting this surprising behaviour.
+      */
+     @Deprecated
+     @SuppressWarnings("serial")
+     public static NotificationFilter matchesTypeRegex(final String typeRegex) {
+         return new NotificationFilter() {
+             @Override public boolean isNotificationEnabled(Notification notif) {
+                 return notif.getType().matches(typeRegex);
+             }
+         };
+     }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/64c2b2e5/software/base/src/main/java/org/apache/brooklyn/sensor/feed/jmx/JmxNotificationSubscriptionConfig.java
----------------------------------------------------------------------
diff --git a/software/base/src/main/java/org/apache/brooklyn/sensor/feed/jmx/JmxNotificationSubscriptionConfig.java b/software/base/src/main/java/org/apache/brooklyn/sensor/feed/jmx/JmxNotificationSubscriptionConfig.java
new file mode 100644
index 0000000..2a99c62
--- /dev/null
+++ b/software/base/src/main/java/org/apache/brooklyn/sensor/feed/jmx/JmxNotificationSubscriptionConfig.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.sensor.feed.jmx;
+
+import javax.management.MalformedObjectNameException;
+import javax.management.Notification;
+import javax.management.NotificationFilter;
+import javax.management.ObjectName;
+
+import org.apache.brooklyn.api.sensor.AttributeSensor;
+import org.apache.brooklyn.sensor.feed.FeedConfig;
+import org.apache.brooklyn.util.collections.MutableList;
+
+import com.google.common.base.Function;
+import com.google.common.base.Functions;
+
+/**
+ * Fluent configuration describing a subscription to JMX notifications: the MBean
+ * to subscribe on, an optional filter, and a function for handling each notification.
+ *
+ * @param <T> the value type of the sensor this configuration is created for
+ */
+public class JmxNotificationSubscriptionConfig<T> extends FeedConfig<Notification, T, JmxNotificationSubscriptionConfig<T>>{
+
+    /** Name of the MBean whose notifications are subscribed to. */
+    private ObjectName objectName;
+    /** Filter restricting which notifications are of interest; may be unset. */
+    private NotificationFilter notificationFilter;
+    /** Function converting a notification into a value of type T; may be unset. */
+    private Function<Notification, T> onNotification;
+
+    /**
+     * Creates a configuration for the given sensor, with the success handler
+     * initialised to the identity function.
+     *
+     * @param sensor the sensor passed on to the superclass
+     */
+    @SuppressWarnings({ "unchecked", "rawtypes" })
+    public JmxNotificationSubscriptionConfig(AttributeSensor<T> sensor) {
+        super(sensor);
+        onSuccess((Function)Functions.identity());
+    }
+
+    /**
+     * Copy constructor: copies the superclass state plus the object name,
+     * filter and notification function (by reference).
+     *
+     * @param other the configuration to copy
+     */
+    public JmxNotificationSubscriptionConfig(JmxNotificationSubscriptionConfig<T> other) {
+        super(other);
+        this.onNotification = other.onNotification;
+        this.notificationFilter = other.notificationFilter;
+        this.objectName = other.objectName;
+    }
+
+    /** @return the configured MBean name, or null if not yet set */
+    public ObjectName getObjectName() {
+        return this.objectName;
+    }
+
+    /** @return the configured notification filter, or null if not yet set */
+    public NotificationFilter getNotificationFilter() {
+        return this.notificationFilter;
+    }
+
+    /** @return the configured notification function, or null if not yet set */
+    public Function<Notification, T> getOnNotification() {
+        return this.onNotification;
+    }
+
+    /**
+     * Sets the MBean name.
+     *
+     * @param val the MBean name
+     * @return this configuration, for chaining
+     */
+    public JmxNotificationSubscriptionConfig<T> objectName(ObjectName val) {
+        this.objectName = val;
+        return this;
+    }
+
+    /**
+     * Sets the MBean name from its string form.
+     *
+     * @param val the string form of the MBean name
+     * @return this configuration, for chaining
+     * @throws IllegalArgumentException if the string is not a well-formed object name
+     */
+    public JmxNotificationSubscriptionConfig<T> objectName(String val) {
+        final ObjectName parsed;
+        try {
+            parsed = new ObjectName(val);
+        } catch (MalformedObjectNameException e) {
+            throw new IllegalArgumentException("Invalid object name ("+val+")", e);
+        }
+        return objectName(parsed);
+    }
+
+    /**
+     * Sets the notification filter.
+     *
+     * @param val the filter
+     * @return this configuration, for chaining
+     */
+    public JmxNotificationSubscriptionConfig<T> notificationFilter(NotificationFilter val) {
+        this.notificationFilter = val;
+        return this;
+    }
+
+    /**
+     * Sets the function applied to notifications.
+     *
+     * @param val the function
+     * @return this configuration, for chaining
+     */
+    public JmxNotificationSubscriptionConfig<T> onNotification(Function<Notification,T> val) {
+        this.onNotification = val;
+        return this;
+    }
+
+    /** Uses the MBean name as the poll source in the string representation. */
+    @Override
+    protected Object toStringPollSource() {
+        return this.objectName;
+    }
+
+    /**
+     * Extends the superclass's equality fields with the filter and the
+     * notification function, each only when non-null.
+     */
+    @Override
+    protected MutableList<Object> equalsFields() {
+        return super.equalsFields()
+                .appendIfNotNull(this.notificationFilter)
+                .appendIfNotNull(this.onNotification);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/64c2b2e5/software/base/src/main/java/org/apache/brooklyn/sensor/feed/jmx/JmxOperationPollConfig.java
----------------------------------------------------------------------
diff --git a/software/base/src/main/java/org/apache/brooklyn/sensor/feed/jmx/JmxOperationPollConfig.java b/software/base/src/main/java/org/apache/brooklyn/sensor/feed/jmx/JmxOperationPollConfig.java
new file mode 100644
index 0000000..b72a4ee
--- /dev/null
+++ b/software/base/src/main/java/org/apache/brooklyn/sensor/feed/jmx/JmxOperationPollConfig.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.sensor.feed.jmx;
+
+import java.util.Collections;
+import java.util.List;
+
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+
+import org.apache.brooklyn.api.sensor.AttributeSensor;
+import org.apache.brooklyn.sensor.feed.PollConfig;
+
+import com.google.common.base.Function;
+import com.google.common.base.Functions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+
+/**
+ * Fluent configuration describing a JMX operation to be polled: the MBean name,
+ * the operation name, and the operation's parameter values and (optional) signature.
+ *
+ * @param <T> the value type of the sensor this configuration is created for
+ */
+public class JmxOperationPollConfig<T> extends PollConfig<Object, T, JmxOperationPollConfig<T>>{
+
+    private ObjectName objectName;
+    private String operationName;
+    private List<String> signature = Collections.emptyList();
+    private List<?> params = Collections.emptyList();
+
+    /**
+     * Creates a configuration for the given sensor, with the success handler
+     * initialised to the identity function.
+     *
+     * @param sensor the sensor passed on to the superclass
+     */
+    @SuppressWarnings({ "unchecked", "rawtypes" })
+    public JmxOperationPollConfig(AttributeSensor<T> sensor) {
+        super(sensor);
+        onSuccess((Function)Functions.identity());
+    }
+
+    /**
+     * Copy constructor: copies the superclass state and takes unmodifiable
+     * snapshots of the other configuration's signature and parameters.
+     *
+     * @param other the configuration to copy
+     */
+    public JmxOperationPollConfig(JmxOperationPollConfig<T> other) {
+        super(other);
+        this.objectName = other.objectName;
+        this.operationName = other.operationName;
+        this.signature = other.signature != null ? ImmutableList.copyOf(other.signature) : null;
+        // Deliberately not ImmutableList: parameter values may be null (buildSignature
+        // maps a null value to Object), and ImmutableList.copyOf rejects null elements.
+        this.params = other.params != null
+                ? Collections.unmodifiableList(Lists.newArrayList(other.params))
+                : null;
+    }
+
+    /** @return the configured MBean name, or null if not yet set */
+    public ObjectName getObjectName() {
+        return objectName;
+    }
+
+    /** @return the configured operation name, or null if not yet set */
+    public String getOperationName() {
+        return operationName;
+    }
+
+    /** @return the signature exactly as configured (empty by default; may be null if set so) */
+    public List<String> getSignature() {
+        return signature;
+    }
+
+    /** @return the parameter values exactly as configured (empty by default; may be null if set so) */
+    public List<?> getParams() {
+        return params;
+    }
+
+    /**
+     * Sets the MBean name.
+     *
+     * @param val the MBean name
+     * @return this configuration, for chaining
+     */
+    public JmxOperationPollConfig<T> objectName(ObjectName val) {
+        this.objectName = val; return this;
+    }
+
+    /**
+     * Sets the MBean name from its string form.
+     *
+     * @param val the string form of the MBean name
+     * @return this configuration, for chaining
+     * @throws IllegalArgumentException if the string is not a well-formed object name
+     */
+    public JmxOperationPollConfig<T> objectName(String val) {
+        try {
+            return objectName(new ObjectName(val));
+        } catch (MalformedObjectNameException e) {
+            throw new IllegalArgumentException("Invalid object name ("+val+")", e);
+        }
+    }
+
+    /**
+     * Sets the name of the operation to invoke.
+     *
+     * @param val the operation name
+     * @return this configuration, for chaining
+     */
+    public JmxOperationPollConfig<T> operationName(String val) {
+        this.operationName = val; return this;
+    }
+
+    /**
+     * Sets the operation's signature. It is only used when its size equals the
+     * number of parameters; otherwise a signature is derived from the parameter values.
+     *
+     * @param val the signature entries, one per parameter (stored by reference)
+     * @return this configuration, for chaining
+     */
+    public JmxOperationPollConfig<T> operationSignature(List<String> val) {
+        this.signature = val; return this;
+    }
+
+    /**
+     * Sets the operation's parameter values.
+     *
+     * @param val the parameter values (stored by reference); null is treated as "no parameters"
+     * @return this configuration, for chaining
+     */
+    public JmxOperationPollConfig<T> operationParams(List<?> val) {
+        this.params = val; return this;
+    }
+
+    /**
+     * Builds a list of (operation name, effective signature, parameters) identifying
+     * the configured operation. Null parameters are represented as an empty list.
+     *
+     * @return a three-element immutable list
+     * @throws NullPointerException if the operation name has not been set
+     *         (ImmutableList does not accept null elements)
+     */
+    public List<?> buildOperationIdentity() {
+        // FIXME Have a build() method for ensuring signature is set, and making class subsequently immutable?
+        return ImmutableList.of(operationName, buildSignature(), effectiveParams());
+    }
+
+    /** Returns the configured parameters, substituting an empty list when they are null. */
+    private List<?> effectiveParams() {
+        return (params != null) ? params : Collections.emptyList();
+    }
+
+    /**
+     * Returns the configured signature when it has one entry per parameter;
+     * otherwise derives a signature from the runtime classes of the parameter values.
+     */
+    private List<String> buildSignature() {
+        List<?> actualParams = effectiveParams();
+        if (signature != null && signature.size() == actualParams.size()) {
+            return signature;
+        }
+        List<String> derivedSignature = Lists.newArrayListWithCapacity(actualParams.size());
+        for (Object param : actualParams) {
+            derivedSignature.add(signatureTypeName(param));
+        }
+        return derivedSignature;
+    }
+
+    /**
+     * Returns the signature entry for one parameter value: Object for a null value,
+     * the JmxHelper.CLASSES entry for the value's simple class name when there is one,
+     * and the value's class name otherwise.
+     */
+    private static String signatureTypeName(Object param) {
+        if (param == null) {
+            return Object.class.getName();
+        }
+        Class<?> clazz = param.getClass();
+        String simpleName = clazz.getSimpleName();
+        // NOTE(review): CLASSES presumably maps wrapper types to primitive type names -- confirm in JmxHelper.
+        return JmxHelper.CLASSES.containsKey(simpleName)
+                ? JmxHelper.CLASSES.get(simpleName)
+                : clazz.getName();
+    }
+
+    @Override protected String toStringBaseName() { return "jmx"; }
+    @Override protected String toStringPollSource() { return objectName+":"+operationName+(params!=null ? params : "[]"); }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/64c2b2e5/software/base/src/main/java/org/apache/brooklyn/sensor/feed/jmx/JmxValueFunctions.java
----------------------------------------------------------------------
diff --git a/software/base/src/main/java/org/apache/brooklyn/sensor/feed/jmx/JmxValueFunctions.java b/software/base/src/main/java/org/apache/brooklyn/sensor/feed/jmx/JmxValueFunctions.java
new file mode 100644
index 0000000..d0b5e21
--- /dev/null
+++ b/software/base/src/main/java/org/apache/brooklyn/sensor/feed/jmx/JmxValueFunctions.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.sensor.feed.jmx;
+
+import java.util.List;
+import java.util.Map;
+
+import javax.management.openmbean.CompositeData;
+import javax.management.openmbean.TabularData;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Maps;
+
+public class JmxValueFunctions {
+
+ private static final Logger log = LoggerFactory.getLogger(JmxValueFunctions.class);
+
+ /**
+ * @return a closure that converts a TabularDataSupport to a map.
+ */
+ public static Function<TabularData, Map> tabularDataToMap() {
+ return new Function<TabularData, Map>() {
+ @Override public Map apply(TabularData input) {
+ return tabularDataToMap(input);
+ }};
+ }
+
+ public static Function<TabularData, Map> tabularDataToMapOfMaps() {
+ return new Function<TabularData, Map>() {
+ @Override public Map apply(TabularData input) {
+ return tabularDataToMapOfMaps(input);
+ }};
+ }
+
+ public static Function<CompositeData,Map> compositeDataToMap() {
+ return new Function<CompositeData, Map>() {
+ @Override public Map apply(CompositeData input) {
+ return compositeDataToMap(input);
+ }};
+ }
+
+ public static Map tabularDataToMap(TabularData table) {
+ Map<String, Object> result = Maps.newLinkedHashMap();
+ for (Object entry : table.values()) {
+ CompositeData data = (CompositeData) entry; //.getValue()
+ for (String key : data.getCompositeType().keySet()) {
+ Object old = result.put(key, data.get(key));
+ if (old != null) {
+ log.warn("tablularDataToMap has overwritten key {}", key);
+ }
+ }
+ }
+ return result;
+ }
+
+ public static Map<List<?>, Map<String, Object>> tabularDataToMapOfMaps(TabularData table) {
+ Map<List<?>, Map<String, Object>> result = Maps.newLinkedHashMap();
+ for (Object k : table.keySet()) {
+ final Object[] kValues = ((List<?>)k).toArray();
+ CompositeData v = (CompositeData) table.get(kValues);
+ result.put((List<?>)k, compositeDataToMap(v));
+ }
+ return result;
+ }
+
+ public static Map<String, Object> compositeDataToMap(CompositeData data) {
+ Map<String, Object> result = Maps.newLinkedHashMap();
+ for (String key : data.getCompositeType().keySet()) {
+ Object old = result.put(key, data.get(key));
+ if (old != null) {
+ log.warn("compositeDataToMap has overwritten key {}", key);
+ }
+ }
+ return result;
+ }
+}