You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2014/08/28 14:08:51 UTC
[1/2] https://issues.apache.org/jira/browse/AMQ-5305 - modify
element with runtime configuration plugin
Repository: activemq
Updated Branches:
refs/heads/trunk b76d8318d -> bbc039fce
http://git-wip-us.apache.org/repos/asf/activemq/blob/bbc039fc/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/RuntimeConfigurationBroker.java
----------------------------------------------------------------------
diff --git a/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/RuntimeConfigurationBroker.java b/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/RuntimeConfigurationBroker.java
index b764495..c815d31 100644
--- a/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/RuntimeConfigurationBroker.java
+++ b/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/RuntimeConfigurationBroker.java
@@ -16,95 +16,46 @@
*/
package org.apache.activemq.plugin;
-import java.io.IOException;
-import java.lang.reflect.Method;
-import java.net.MalformedURLException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Date;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-import javax.management.JMException;
-import javax.management.ObjectName;
-import javax.xml.XMLConstants;
-import javax.xml.bind.JAXBContext;
-import javax.xml.bind.JAXBElement;
-import javax.xml.bind.JAXBException;
-import javax.xml.bind.Unmarshaller;
-import javax.xml.parsers.DocumentBuilder;
-import javax.xml.parsers.DocumentBuilderFactory;
-import javax.xml.parsers.ParserConfigurationException;
-import javax.xml.transform.Source;
-import javax.xml.transform.stream.StreamSource;
-import javax.xml.validation.Schema;
-import javax.xml.validation.SchemaFactory;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerContext;
import org.apache.activemq.broker.BrokerFilter;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.jmx.ManagementContext;
-import org.apache.activemq.broker.region.*;
-import org.apache.activemq.broker.region.policy.PolicyEntry;
-import org.apache.activemq.broker.region.policy.PolicyMap;
-import org.apache.activemq.broker.region.virtual.CompositeQueue;
-import org.apache.activemq.broker.region.virtual.CompositeTopic;
-import org.apache.activemq.broker.region.virtual.VirtualDestination;
-import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor;
-import org.apache.activemq.broker.region.virtual.VirtualTopic;
+import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.ConnectionInfo;
-import org.apache.activemq.filter.DestinationMapEntry;
-import org.apache.activemq.network.DiscoveryNetworkConnector;
-import org.apache.activemq.network.NetworkConnector;
import org.apache.activemq.plugin.jmx.RuntimeConfigurationView;
-import org.apache.activemq.schema.core.DtoAuthenticationUser;
-import org.apache.activemq.schema.core.DtoAuthorizationEntry;
-import org.apache.activemq.schema.core.DtoAuthorizationMap;
-import org.apache.activemq.schema.core.DtoAuthorizationPlugin;
import org.apache.activemq.schema.core.DtoBroker;
-import org.apache.activemq.schema.core.DtoCompositeQueue;
-import org.apache.activemq.schema.core.DtoCompositeTopic;
-import org.apache.activemq.schema.core.DtoNetworkConnector;
-import org.apache.activemq.schema.core.DtoPolicyEntry;
-import org.apache.activemq.schema.core.DtoPolicyMap;
-import org.apache.activemq.schema.core.DtoQueue;
-import org.apache.activemq.schema.core.DtoSimpleAuthenticationPlugin;
-import org.apache.activemq.schema.core.DtoTopic;
-import org.apache.activemq.schema.core.DtoVirtualDestinationInterceptor;
-import org.apache.activemq.schema.core.DtoVirtualTopic;
-import org.apache.activemq.security.AuthenticationUser;
-import org.apache.activemq.security.AuthorizationBroker;
-import org.apache.activemq.security.AuthorizationMap;
-import org.apache.activemq.security.SimpleAuthenticationBroker;
-import org.apache.activemq.security.SimpleAuthenticationPlugin;
-import org.apache.activemq.security.TempDestinationAuthorizationEntry;
-import org.apache.activemq.security.XBeanAuthorizationEntry;
-import org.apache.activemq.security.XBeanAuthorizationMap;
import org.apache.activemq.spring.Utils;
-import org.apache.activemq.util.IntrospectionSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.FactoryBean;
-import org.springframework.beans.factory.xml.PluggableSchemaResolver;
import org.springframework.core.io.Resource;
import org.w3c.dom.Document;
-import org.w3c.dom.Element;
import org.w3c.dom.Node;
-import org.w3c.dom.NodeList;
-import org.xml.sax.InputSource;
import org.xml.sax.SAXException;
+import javax.management.JMException;
+import javax.management.ObjectName;
+import javax.xml.XMLConstants;
+import javax.xml.bind.JAXBContext;
+import javax.xml.bind.JAXBElement;
+import javax.xml.bind.JAXBException;
+import javax.xml.bind.Unmarshaller;
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+import javax.xml.parsers.ParserConfigurationException;
+import javax.xml.transform.Source;
+import javax.xml.transform.stream.StreamSource;
+import javax.xml.validation.Schema;
+import javax.xml.validation.SchemaFactory;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.regex.Pattern;
+
public class RuntimeConfigurationBroker extends BrokerFilter {
public static final Logger LOG = LoggerFactory.getLogger(RuntimeConfigurationBroker.class);
@@ -117,8 +68,8 @@ public class RuntimeConfigurationBroker extends BrokerFilter {
private Resource configToMonitor;
private DtoBroker currentConfiguration;
private Runnable monitorTask;
- private ConcurrentLinkedQueue<Runnable> addDestinationWork = new ConcurrentLinkedQueue<Runnable>();
- private ConcurrentLinkedQueue<Runnable> addConnectionWork = new ConcurrentLinkedQueue<Runnable>();
+ protected ConcurrentLinkedQueue<Runnable> addDestinationWork = new ConcurrentLinkedQueue<Runnable>();
+ protected ConcurrentLinkedQueue<Runnable> addConnectionWork = new ConcurrentLinkedQueue<Runnable>();
private ObjectName objectName;
private String infoString;
private Schema schema;
@@ -259,7 +210,11 @@ public class RuntimeConfigurationBroker extends BrokerFilter {
}
}
- private void info(String s) {
+ protected void debug(String s) {
+ LOG.debug(s);
+ }
+
+ protected void info(String s) {
LOG.info(filterPasswords(s));
if (infoString != null) {
infoString += s;
@@ -267,7 +222,7 @@ public class RuntimeConfigurationBroker extends BrokerFilter {
}
}
- private void info(String s, Throwable t) {
+ protected void info(String s, Throwable t) {
LOG.info(filterPasswords(s), t);
if (infoString != null) {
infoString += s;
@@ -295,352 +250,15 @@ public class RuntimeConfigurationBroker extends BrokerFilter {
DtoBroker.DestinationPolicy.class,
DtoBroker.NetworkConnectors.class,
DtoBroker.DestinationInterceptors.class,
- DtoBroker.Plugins.class}) {
+ DtoBroker.Plugins.class,
+ DtoBroker.Destinations.class}) {
processChanges(currentConfiguration, modifiedConfiguration, upDatable);
}
}
private void processChanges(DtoBroker currentConfiguration, DtoBroker modifiedConfiguration, Class upDatable) {
-
- List current = filter(currentConfiguration, upDatable);
- List modified = filter(modifiedConfiguration, upDatable);
-
- if (current.equals(modified)) {
- LOG.debug("no changes to " + upDatable.getSimpleName());
- return;
- } else {
- info("changes to " + upDatable.getSimpleName());
- }
-
- int modIndex = 0, currentIndex = 0;
- for (; modIndex < modified.size() && currentIndex < current.size(); modIndex++, currentIndex++) {
- // walk the list for mods
- applyModifications(getContents(current.get(currentIndex)),
- getContents(modified.get(modIndex)));
- }
-
- for (; modIndex < modified.size(); modIndex++) {
- // new element; add all
- for (Object nc : getContents(modified.get(modIndex))) {
- addNew(nc);
- }
- }
-
- for (; currentIndex < current.size(); currentIndex++) {
- // removal of element; remove all
- for (Object nc : getContents(current.get(currentIndex))) {
- remove(nc);
- }
- }
- }
-
- // mapping all supported updatable elements to support getContents
- private List<Object> getContents(Object o) {
- List<Object> answer = new ArrayList<Object>();
- try {
- Object val = o.getClass().getMethod("getContents", new Class[]{}).invoke(o, new Object[]{});
- if (val instanceof List) {
- answer = (List<Object>) val;
- } else {
- answer.add(val);
- }
- } catch (NoSuchMethodException mappingIncomplete) {
- LOG.debug(filterPasswords(o) + " has no modifiable elements");
- } catch (Exception e) {
- info("Failed to access getContents for " + o + ", runtime modifications not supported", e);
- }
- return answer;
- }
-
- private void applyModifications(List<Object> current, List<Object> modification) {
- int modIndex = 0, currentIndex = 0;
- for (; modIndex < modification.size() && currentIndex < current.size(); modIndex++, currentIndex++) {
- Object existing = current.get(currentIndex);
- Object candidate = modification.get(modIndex);
- if (!existing.equals(candidate)) {
- info("modification to:" + existing + " , with: " + candidate);
- modify(existing, candidate);
- }
- }
-
- for (; modIndex < modification.size(); modIndex++) {
- addNew(modification.get(modIndex));
- }
-
- for (; currentIndex < current.size(); currentIndex++) {
- remove(current.get(currentIndex));
- }
- }
-
- private void modify(Object existing, Object candidate) {
- if (candidate instanceof DtoAuthorizationPlugin) {
-
- try {
- // replace authorization map - need exclusive write lock to total broker
- AuthorizationBroker authorizationBroker =
- (AuthorizationBroker) getBrokerService().getBroker().getAdaptor(AuthorizationBroker.class);
-
- authorizationBroker.setAuthorizationMap(fromDto(filter(candidate, DtoAuthorizationPlugin.Map.class)));
- } catch (Exception e) {
- info("failed to apply modified AuthorizationMap to AuthorizationBroker", e);
- }
-
- } else if (candidate instanceof DtoSimpleAuthenticationPlugin) {
- try {
- final SimpleAuthenticationPlugin updatedPlugin = fromDto(candidate, new SimpleAuthenticationPlugin());
- final SimpleAuthenticationBroker authenticationBroker =
- (SimpleAuthenticationBroker) getBrokerService().getBroker().getAdaptor(SimpleAuthenticationBroker.class);
- addConnectionWork.add(new Runnable() {
- public void run() {
- authenticationBroker.setUserGroups(updatedPlugin.getUserGroups());
- authenticationBroker.setUserPasswords(updatedPlugin.getUserPasswords());
- authenticationBroker.setAnonymousAccessAllowed(updatedPlugin.isAnonymousAccessAllowed());
- authenticationBroker.setAnonymousUser(updatedPlugin.getAnonymousUser());
- authenticationBroker.setAnonymousGroup(updatedPlugin.getAnonymousGroup());
- }
- });
- } catch (Exception e) {
- info("failed to apply SimpleAuthenticationPlugin modifications to SimpleAuthenticationBroker", e);
- }
-
- } else if (candidate instanceof DtoPolicyMap) {
-
- List<Object> existingEntries = filter(existing, DtoPolicyMap.PolicyEntries.class);
- List<Object> candidateEntries = filter(candidate, DtoPolicyMap.PolicyEntries.class);
- // walk the map for mods
- applyModifications(getContents(existingEntries.get(0)), getContents(candidateEntries.get(0)));
-
- } else if (candidate instanceof DtoPolicyEntry) {
-
- PolicyMap existingMap = getBrokerService().getDestinationPolicy();
-
- PolicyEntry updatedEntry = fromDto(candidate, new PolicyEntry());
-
- Set existingEntry = existingMap.get(updatedEntry.getDestination());
- if (existingEntry.size() == 1) {
- updatedEntry = fromDto(candidate, (PolicyEntry) existingEntry.iterator().next());
- applyRetrospectively(updatedEntry);
- info("updated policy for: " + updatedEntry.getDestination());
- } else {
- info("cannot modify policy matching multiple destinations: " + existingEntry + ", destination:" + updatedEntry.getDestination());
- }
-
- } else {
- remove(existing);
- addNew(candidate);
- }
- }
-
- private void applyRetrospectively(PolicyEntry updatedEntry) {
- RegionBroker regionBroker = (RegionBroker) getBrokerService().getRegionBroker();
- for (Destination destination : regionBroker.getDestinations(updatedEntry.getDestination())) {
- Destination target = destination;
- if (destination instanceof DestinationFilter) {
- target = ((DestinationFilter)destination).getNext();
- }
- if (target.getActiveMQDestination().isQueue()) {
- updatedEntry.update((Queue) target);
- } else if (target.getActiveMQDestination().isTopic()) {
- updatedEntry.update((Topic) target);
- }
- LOG.debug("applied update to:" + target);
- }
- }
-
- private AuthorizationMap fromDto(List<Object> map) {
- XBeanAuthorizationMap xBeanAuthorizationMap = new XBeanAuthorizationMap();
- for (Object o : map) {
- if (o instanceof DtoAuthorizationPlugin.Map) {
- DtoAuthorizationPlugin.Map dtoMap = (DtoAuthorizationPlugin.Map) o;
- List<DestinationMapEntry> entries = new LinkedList<DestinationMapEntry>();
- // revisit - would like to map getAuthorizationMap to generic getContents
- for (Object authMap : filter(dtoMap.getAuthorizationMap(), DtoAuthorizationMap.AuthorizationEntries.class)) {
- for (Object entry : filter(getContents(authMap), DtoAuthorizationEntry.class)) {
- entries.add(fromDto(entry, new XBeanAuthorizationEntry()));
- }
- }
- xBeanAuthorizationMap.setAuthorizationEntries(entries);
- try {
- xBeanAuthorizationMap.afterPropertiesSet();
- } catch (Exception e) {
- info("failed to update xBeanAuthorizationMap auth entries:", e);
- }
-
- for (Object entry : filter(dtoMap.getAuthorizationMap(), DtoAuthorizationMap.TempDestinationAuthorizationEntry.class)) {
- // another restriction - would like to be getContents
- DtoAuthorizationMap.TempDestinationAuthorizationEntry dtoEntry = (DtoAuthorizationMap.TempDestinationAuthorizationEntry) entry;
- xBeanAuthorizationMap.setTempDestinationAuthorizationEntry(fromDto(dtoEntry.getTempDestinationAuthorizationEntry(), new TempDestinationAuthorizationEntry()));
- }
-
- } else {
- info("No support for updates to: " + o);
- }
- }
- return xBeanAuthorizationMap;
- }
-
- private void remove(Object o) {
- if (o instanceof DtoNetworkConnector) {
- DtoNetworkConnector toRemove = (DtoNetworkConnector) o;
- for (NetworkConnector existingCandidate :
- getBrokerService().getNetworkConnectors()) {
- if (configMatch(toRemove, existingCandidate)) {
- if (getBrokerService().removeNetworkConnector(existingCandidate)) {
- try {
- existingCandidate.stop();
- info("stopped and removed networkConnector: " + existingCandidate);
- } catch (Exception e) {
- info("Failed to stop removed network connector: " + existingCandidate);
- }
- }
- }
- }
- } else if (o instanceof DtoVirtualDestinationInterceptor) {
- // whack it
- addDestinationWork.add(new Runnable() {
- public void run() {
- List<DestinationInterceptor> interceptorsList = new ArrayList<DestinationInterceptor>();
- for (DestinationInterceptor candidate : getBrokerService().getDestinationInterceptors()) {
- if (!(candidate instanceof VirtualDestinationInterceptor)) {
- interceptorsList.add(candidate);
- }
- }
- DestinationInterceptor[] destinationInterceptors = interceptorsList.toArray(new DestinationInterceptor[]{});
- getBrokerService().setDestinationInterceptors(destinationInterceptors);
- ((CompositeDestinationInterceptor) ((RegionBroker) getBrokerService().getRegionBroker()).getDestinationInterceptor()).setInterceptors(destinationInterceptors);
- info("removed VirtualDestinationInterceptor from: " + interceptorsList);
- }
- });
- } else {
- info("No runtime support for removal of: " + o);
- }
- }
-
- private boolean configMatch(DtoNetworkConnector dto, NetworkConnector candidate) {
- TreeMap<String, String> dtoProps = new TreeMap<String, String>();
- IntrospectionSupport.getProperties(dto, dtoProps, null);
-
- TreeMap<String, String> candidateProps = new TreeMap<String, String>();
- IntrospectionSupport.getProperties(candidate, candidateProps, null);
-
- // every dto prop must be present in the candidate
- for (String key : dtoProps.keySet()) {
- if (!candidateProps.containsKey(key) || !candidateProps.get(key).equals(dtoProps.get(key))) {
- return false;
- }
- }
- return true;
- }
-
- private void addNew(Object o) {
- if (o instanceof DtoNetworkConnector) {
- DtoNetworkConnector networkConnector = (DtoNetworkConnector) o;
- if (networkConnector.getUri() != null) {
- try {
- DiscoveryNetworkConnector nc = fromDto(networkConnector, new DiscoveryNetworkConnector());
- getBrokerService().addNetworkConnector(nc);
- nc.start();
- info("started new network connector: " + nc);
- } catch (Exception e) {
- info("Failed to add new networkConnector " + networkConnector, e);
- }
- }
- } else if (o instanceof DtoVirtualDestinationInterceptor) {
- final DtoVirtualDestinationInterceptor dto = (DtoVirtualDestinationInterceptor) o;
- addDestinationWork.add(new Runnable() {
- public void run() {
-
- boolean updatedExistingInterceptor = false;
- RegionBroker regionBroker = (RegionBroker) getBrokerService().getRegionBroker();
-
- for (DestinationInterceptor destinationInterceptor : getBrokerService().getDestinationInterceptors()) {
- if (destinationInterceptor instanceof VirtualDestinationInterceptor) {
- // update existing interceptor
- final VirtualDestinationInterceptor virtualDestinationInterceptor =
- (VirtualDestinationInterceptor) destinationInterceptor;
-
- virtualDestinationInterceptor.setVirtualDestinations(fromDto(dto));
- info("applied updates to: " + virtualDestinationInterceptor);
- updatedExistingInterceptor = true;
- }
- }
-
- if (!updatedExistingInterceptor) {
- // add
- VirtualDestinationInterceptor virtualDestinationInterceptor =
- new VirtualDestinationInterceptor();
- virtualDestinationInterceptor.setVirtualDestinations(fromDto(dto));
-
- List<DestinationInterceptor> interceptorsList = new ArrayList<DestinationInterceptor>();
- interceptorsList.addAll(Arrays.asList(getBrokerService().getDestinationInterceptors()));
- interceptorsList.add(virtualDestinationInterceptor);
-
- DestinationInterceptor[] destinationInterceptors = interceptorsList.toArray(new DestinationInterceptor[]{});
- getBrokerService().setDestinationInterceptors(destinationInterceptors);
-
- ((CompositeDestinationInterceptor) regionBroker.getDestinationInterceptor()).setInterceptors(destinationInterceptors);
- info("applied new: " + interceptorsList);
- }
- regionBroker.reapplyInterceptor();
- }
- });
- } else if (o instanceof DtoPolicyEntry) {
-
- PolicyEntry addition = fromDto(o, new PolicyEntry());
- PolicyMap existingMap = getBrokerService().getDestinationPolicy();
- existingMap.put(addition.getDestination(), addition);
- applyRetrospectively(addition);
- info("added policy for: " + addition.getDestination());
-
- } else {
- info("No runtime support for additions of " + o);
- }
- }
-
- private VirtualDestination[] fromDto(DtoVirtualDestinationInterceptor virtualDestinationInterceptor) {
- List<VirtualDestination> answer = new ArrayList<VirtualDestination>();
- for (Object vd : filter(virtualDestinationInterceptor, DtoVirtualDestinationInterceptor.VirtualDestinations.class)) {
- for (Object vt : filter(vd, DtoVirtualTopic.class)) {
- answer.add(fromDto(vt, new VirtualTopic()));
- }
- for (Object vt : filter(vd, DtoCompositeTopic.class)) {
- answer.add(fromDto(vt, new CompositeTopic()));
- }
- for (Object vt : filter(vd, DtoCompositeQueue.class)) {
- answer.add(fromDto(vt, new CompositeQueue()));
- }
- }
- VirtualDestination[] array = new VirtualDestination[answer.size()];
- answer.toArray(array);
- return array;
- }
-
- private <T> T fromDto(Object dto, T instance) {
- Properties properties = new Properties();
- IntrospectionSupport.getProperties(dto, properties, null);
- replacePlaceHolders(properties);
- LOG.trace("applying props: " + filterPasswords(properties) + ", to " + instance.getClass().getSimpleName());
- IntrospectionSupport.setProperties(instance, properties);
-
- // deal with nested elements
- for (Object nested : filter(dto, Object.class)) {
- String elementName = nested.getClass().getSimpleName();
- Method setter = findSetter(instance, elementName);
- if (setter != null) {
- List<Object> argument = new LinkedList<Object>();
- for (Object elementContent : filter(nested, Object.class)) {
- argument.add(fromDto(elementContent, inferTargetObject(elementContent)));
- }
- try {
- setter.invoke(instance, matchType(argument, setter.getParameterTypes()[0]));
- } catch (Exception e) {
- info("failed to invoke " + setter + " on " + instance, e);
- }
- } else {
- info("failed to find setter for " + elementName + " on :" + instance);
- }
- }
- return instance;
+ ConfigurationProcessor processor = ProcessorFactory.createProcessor(this, upDatable);
+ processor.processChanges(currentConfiguration, modifiedConfiguration);
}
Pattern matchPassword = Pattern.compile("password=.*,");
@@ -648,56 +266,6 @@ public class RuntimeConfigurationBroker extends BrokerFilter {
return matchPassword.matcher(toEscape.toString()).replaceAll("password=???,");
}
- private Object matchType(List<Object> parameterValues, Class<?> aClass) {
- Object result = parameterValues;
- if (Set.class.isAssignableFrom(aClass)) {
- result = new HashSet(parameterValues);
- }
- return result;
- }
-
- private Object inferTargetObject(Object elementContent) {
- if (DtoTopic.class.isAssignableFrom(elementContent.getClass())) {
- return new ActiveMQTopic();
- } else if (DtoQueue.class.isAssignableFrom(elementContent.getClass())) {
- return new ActiveMQQueue();
- } else if (DtoAuthenticationUser.class.isAssignableFrom(elementContent.getClass())) {
- return new AuthenticationUser();
- } else {
- info("update not supported for dto: " + elementContent);
- return new Object();
- }
- }
-
- private Method findSetter(Object instance, String elementName) {
- String setter = "set" + elementName;
- for (Method m : instance.getClass().getMethods()) {
- if (setter.equals(m.getName())) {
- return m;
- }
- }
- return null;
- }
-
- private <T> List<Object> filter(Object obj, Class<T> type) {
- return filter(getContents(obj), type);
- }
-
- private <T> List<Object> filter(List<Object> objectList, Class<T> type) {
- List<Object> result = new LinkedList<Object>();
- for (Object o : objectList) {
- if (o instanceof JAXBElement) {
- JAXBElement element = (JAXBElement) o;
- if (type.isAssignableFrom(element.getDeclaredType())) {
- result.add((T) element.getValue());
- }
- } else if (type.isAssignableFrom(o.getClass())) {
- result.add((T) o);
- }
- }
- return result;
- }
-
private DtoBroker loadConfiguration(Resource configToMonitor) {
DtoBroker jaxbConfig = null;
if (configToMonitor != null) {
@@ -748,103 +316,10 @@ public class RuntimeConfigurationBroker extends BrokerFilter {
if (brokerContext != null) {
Properties initialProperties = new Properties(System.getProperties());
placeHolderUtil = new PropertiesPlaceHolderUtil(initialProperties);
- mergeProperties(doc, initialProperties, brokerContext);
- }
- }
-
- private void mergeProperties(Document doc, Properties initialProperties, BrokerContext brokerContext) {
- // find resources
- // <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
- // <property name="locations" || name="properties">
- // ...
- // </property>
- // </bean>
- LinkedList<String> resources = new LinkedList<String>();
- LinkedList<String> propertiesClazzes = new LinkedList<String>();
- NodeList beans = doc.getElementsByTagNameNS("*", "bean");
- for (int i = 0; i < beans.getLength(); i++) {
- Node bean = beans.item(0);
- if (bean.hasAttributes() && bean.getAttributes().getNamedItem("class").getTextContent().contains("PropertyPlaceholderConfigurer")) {
- if (bean.hasChildNodes()) {
- NodeList beanProps = bean.getChildNodes();
- for (int j = 0; j < beanProps.getLength(); j++) {
- Node beanProp = beanProps.item(j);
- if (Node.ELEMENT_NODE == beanProp.getNodeType() && beanProp.hasAttributes() && beanProp.getAttributes().getNamedItem("name") != null) {
- String propertyName = beanProp.getAttributes().getNamedItem("name").getTextContent();
- if ("locations".equals(propertyName)) {
-
- // interested in value or list/value of locations property
- Element beanPropElement = (Element) beanProp;
- NodeList values = beanPropElement.getElementsByTagNameNS("*", "value");
- for (int k = 0; k < values.getLength(); k++) {
- Node value = values.item(k);
- resources.add(value.getFirstChild().getTextContent());
- }
- } else if ("properties".equals(propertyName)) {
-
- // bean or beanFactory
- Element beanPropElement = (Element) beanProp;
- NodeList values = beanPropElement.getElementsByTagNameNS("*", "bean");
- for (int k = 0; k < values.getLength(); k++) {
- Node value = values.item(k);
- if (value.hasAttributes()) {
- Node beanClassTypeNode = value.getAttributes().getNamedItem("class");
- if (beanClassTypeNode != null) {
- propertiesClazzes.add(beanClassTypeNode.getFirstChild().getTextContent());
- }
- }
- }
- }
- }
- }
- }
- }
- }
- for (String value : propertiesClazzes) {
- try {
- Object springBean = getClass().getClassLoader().loadClass(value).newInstance();
- if (springBean instanceof FactoryBean) {
- // can't access the factory or created properties from spring context so we got to recreate
- initialProperties.putAll((Properties) FactoryBean.class.getMethod("getObject", (Class<?>[]) null).invoke(springBean));
- }
- } catch (Throwable e) {
- LOG.debug("unexpected exception processing properties bean class: " + propertiesClazzes, e);
- }
- }
- List<Resource> propResources = new LinkedList<Resource>();
- for (String value : resources) {
- try {
- if (!value.isEmpty()) {
- propResources.add(Utils.resourceFromString(replacePlaceHolders(value)));
- }
- } catch (MalformedURLException e) {
- info("failed to resolve resource: " + value, e);
- }
- }
- for (Resource resource : propResources) {
- Properties properties = new Properties();
- try {
- properties.load(resource.getInputStream());
- } catch (IOException e) {
- info("failed to load properties resource: " + resource, e);
- }
- initialProperties.putAll(properties);
+ placeHolderUtil.mergeProperties(doc, initialProperties, brokerContext);
}
}
- private void replacePlaceHolders(Properties properties) {
- if (placeHolderUtil != null) {
- placeHolderUtil.filter(properties);
- }
- }
-
- private String replacePlaceHolders(String s) {
- if (placeHolderUtil != null) {
- s = placeHolderUtil.filter(s);
- }
- return s;
- }
-
private Schema getSchema() throws SAXException, IOException {
if (schema == null) {
SchemaFactory schemaFactory = SchemaFactory.newInstance(
@@ -874,67 +349,4 @@ public class RuntimeConfigurationBroker extends BrokerFilter {
this.checkPeriod = checkPeriod;
}
- static public class PropertiesPlaceHolderUtil {
-
- static final Pattern pattern = Pattern.compile("\\$\\{([^\\}]+)\\}");
- final Properties properties;
-
- public PropertiesPlaceHolderUtil(Properties properties) {
- this.properties = properties;
- }
-
- public void filter(Properties toFilter) {
- for (Map.Entry<Object, Object> entry : toFilter.entrySet()) {
- String val = (String) entry.getValue();
- String newVal = filter(val);
- if (!val.equals(newVal)) {
- toFilter.put(entry.getKey(), newVal);
- }
- }
- }
-
- public String filter(String str) {
- int start = 0;
- while (true) {
- Matcher matcher = pattern.matcher(str);
- if (!matcher.find(start)) {
- break;
- }
- String group = matcher.group(1);
- String property = properties.getProperty(group);
- if (property != null) {
- str = matcher.replaceFirst(Matcher.quoteReplacement(property));
- } else {
- start = matcher.end();
- }
- }
- return replaceBytePostfix(str);
- }
-
- static Pattern[] byteMatchers = new Pattern[] {
- Pattern.compile("^\\s*(\\d+)\\s*(b)?\\s*$", Pattern.CASE_INSENSITIVE),
- Pattern.compile("^\\s*(\\d+)\\s*k(b)?\\s*$", Pattern.CASE_INSENSITIVE),
- Pattern.compile("^\\s*(\\d+)\\s*m(b)?\\s*$", Pattern.CASE_INSENSITIVE),
- Pattern.compile("^\\s*(\\d+)\\s*g(b)?\\s*$", Pattern.CASE_INSENSITIVE)};
-
- // xbean can Xb, Xkb, Xmb, Xg etc
- private String replaceBytePostfix(String str) {
- try {
- for (int i=0; i< byteMatchers.length; i++) {
- Matcher matcher = byteMatchers[i].matcher(str);
- if (matcher.matches()) {
- long value = Long.parseLong(matcher.group(1));
- for (int j=1; j<=i; j++) {
- value *= 1024;
- }
- return String.valueOf(value);
- }
- }
- } catch (NumberFormatException ignored) {
- LOG.debug("nfe on: " + str, ignored);
- }
- return str;
- }
-
- }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/activemq/blob/bbc039fc/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/SimpleAuthenticationPluginProcessor.java
----------------------------------------------------------------------
diff --git a/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/SimpleAuthenticationPluginProcessor.java b/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/SimpleAuthenticationPluginProcessor.java
new file mode 100644
index 0000000..359d970
--- /dev/null
+++ b/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/SimpleAuthenticationPluginProcessor.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.plugin;
+
+import org.apache.activemq.security.SimpleAuthenticationBroker;
+import org.apache.activemq.security.SimpleAuthenticationPlugin;
+
+public class SimpleAuthenticationPluginProcessor extends DefaultConfigurationProcessor {
+
+ public SimpleAuthenticationPluginProcessor(RuntimeConfigurationBroker plugin, Class configurationClass) {
+ super(plugin, configurationClass);
+ }
+
+ @Override
+ public void modify(Object existing, Object candidate) {
+ try {
+ final SimpleAuthenticationPlugin updatedPlugin = fromDto(candidate, new SimpleAuthenticationPlugin());
+ final SimpleAuthenticationBroker authenticationBroker =
+ (SimpleAuthenticationBroker) plugin.getBrokerService().getBroker().getAdaptor(SimpleAuthenticationBroker.class);
+ plugin.addConnectionWork.add(new Runnable() {
+ public void run() {
+ authenticationBroker.setUserGroups(updatedPlugin.getUserGroups());
+ authenticationBroker.setUserPasswords(updatedPlugin.getUserPasswords());
+ authenticationBroker.setAnonymousAccessAllowed(updatedPlugin.isAnonymousAccessAllowed());
+ authenticationBroker.setAnonymousUser(updatedPlugin.getAnonymousUser());
+ authenticationBroker.setAnonymousGroup(updatedPlugin.getAnonymousGroup());
+ }
+ });
+ } catch (Exception e) {
+ plugin.info("failed to apply SimpleAuthenticationPlugin modifications to SimpleAuthenticationBroker", e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq/blob/bbc039fc/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/VirtualDestinationInterceptorProcessor.java
----------------------------------------------------------------------
diff --git a/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/VirtualDestinationInterceptorProcessor.java b/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/VirtualDestinationInterceptorProcessor.java
new file mode 100644
index 0000000..356755c
--- /dev/null
+++ b/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/VirtualDestinationInterceptorProcessor.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.plugin;
+
+import org.apache.activemq.broker.region.CompositeDestinationInterceptor;
+import org.apache.activemq.broker.region.DestinationInterceptor;
+import org.apache.activemq.broker.region.RegionBroker;
+import org.apache.activemq.broker.region.virtual.*;
+import org.apache.activemq.schema.core.DtoVirtualDestinationInterceptor;
+import org.apache.activemq.schema.core.DtoVirtualTopic;
+import org.apache.activemq.schema.core.DtoCompositeTopic;
+import org.apache.activemq.schema.core.DtoCompositeQueue;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+public class VirtualDestinationInterceptorProcessor extends DefaultConfigurationProcessor {
+
+ public VirtualDestinationInterceptorProcessor(RuntimeConfigurationBroker plugin, Class configurationClass) {
+ super(plugin, configurationClass);
+ }
+
+ @Override
+ public void addNew(Object o) {
+ final DtoVirtualDestinationInterceptor dto = (DtoVirtualDestinationInterceptor) o;
+ plugin.addDestinationWork.add(new Runnable() {
+ public void run() {
+
+ boolean updatedExistingInterceptor = false;
+ RegionBroker regionBroker = (RegionBroker) plugin.getBrokerService().getRegionBroker();
+
+ for (DestinationInterceptor destinationInterceptor : plugin.getBrokerService().getDestinationInterceptors()) {
+ if (destinationInterceptor instanceof VirtualDestinationInterceptor) {
+ // update existing interceptor
+ final VirtualDestinationInterceptor virtualDestinationInterceptor =
+ (VirtualDestinationInterceptor) destinationInterceptor;
+
+ virtualDestinationInterceptor.setVirtualDestinations(fromDto(dto));
+ plugin.info("applied updates to: " + virtualDestinationInterceptor);
+ updatedExistingInterceptor = true;
+ }
+ }
+
+ if (!updatedExistingInterceptor) {
+ // add
+ VirtualDestinationInterceptor virtualDestinationInterceptor =
+ new VirtualDestinationInterceptor();
+ virtualDestinationInterceptor.setVirtualDestinations(fromDto(dto));
+
+ List<DestinationInterceptor> interceptorsList = new ArrayList<DestinationInterceptor>();
+ interceptorsList.addAll(Arrays.asList(plugin.getBrokerService().getDestinationInterceptors()));
+ interceptorsList.add(virtualDestinationInterceptor);
+
+ DestinationInterceptor[] destinationInterceptors = interceptorsList.toArray(new DestinationInterceptor[]{});
+ plugin.getBrokerService().setDestinationInterceptors(destinationInterceptors);
+
+ ((CompositeDestinationInterceptor) regionBroker.getDestinationInterceptor()).setInterceptors(destinationInterceptors);
+ plugin.info("applied new: " + interceptorsList);
+ }
+ regionBroker.reapplyInterceptor();
+ }
+ });
+ }
+
+ @Override
+ public void remove(Object o) {
+ // whack it
+ plugin.addDestinationWork.add(new Runnable() {
+ public void run() {
+ List<DestinationInterceptor> interceptorsList = new ArrayList<DestinationInterceptor>();
+ for (DestinationInterceptor candidate : plugin.getBrokerService().getDestinationInterceptors()) {
+ if (!(candidate instanceof VirtualDestinationInterceptor)) {
+ interceptorsList.add(candidate);
+ }
+ }
+ DestinationInterceptor[] destinationInterceptors = interceptorsList.toArray(new DestinationInterceptor[]{});
+ plugin.getBrokerService().setDestinationInterceptors(destinationInterceptors);
+ ((CompositeDestinationInterceptor) ((RegionBroker) plugin.getBrokerService().getRegionBroker()).getDestinationInterceptor()).setInterceptors(destinationInterceptors);
+ plugin.info("removed VirtualDestinationInterceptor from: " + interceptorsList);
+ }
+ });
+ }
+
+ private VirtualDestination[] fromDto(DtoVirtualDestinationInterceptor virtualDestinationInterceptor) {
+ List<VirtualDestination> answer = new ArrayList<VirtualDestination>();
+ for (Object vd : filter(virtualDestinationInterceptor, DtoVirtualDestinationInterceptor.VirtualDestinations.class)) {
+ for (Object vt : filter(vd, DtoVirtualTopic.class)) {
+ answer.add(fromDto(vt, new VirtualTopic()));
+ }
+ for (Object vt : filter(vd, DtoCompositeTopic.class)) {
+ answer.add(fromDto(vt, new CompositeTopic()));
+ }
+ for (Object vt : filter(vd, DtoCompositeQueue.class)) {
+ answer.add(fromDto(vt, new CompositeQueue()));
+ }
+ }
+ VirtualDestination[] array = new VirtualDestination[answer.size()];
+ answer.toArray(array);
+ return array;
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq/blob/bbc039fc/activemq-runtime-config/src/main/resources/binding.xjb
----------------------------------------------------------------------
diff --git a/activemq-runtime-config/src/main/resources/binding.xjb b/activemq-runtime-config/src/main/resources/binding.xjb
index aceb265..0e728c2 100644
--- a/activemq-runtime-config/src/main/resources/binding.xjb
+++ b/activemq-runtime-config/src/main/resources/binding.xjb
@@ -36,6 +36,10 @@
<jxb:property name="Contents" />
</jxb:bindings>
+ <jxb:bindings node="xs:element[@name='broker']/xs:complexType/xs:choice/xs:choice/xs:element[@name='destinations']/xs:complexType/xs:choice">
+ <jxb:property name="Contents" />
+ </jxb:bindings>
+
<jxb:bindings node="xs:element[@name='queue']/xs:complexType/xs:choice">
<jxb:property name="Contents" />
</jxb:bindings>
http://git-wip-us.apache.org/repos/asf/activemq/blob/bbc039fc/activemq-runtime-config/src/test/java/org/apache/activemq/DestinationsTest.java
----------------------------------------------------------------------
diff --git a/activemq-runtime-config/src/test/java/org/apache/activemq/DestinationsTest.java b/activemq-runtime-config/src/test/java/org/apache/activemq/DestinationsTest.java
new file mode 100644
index 0000000..2500659
--- /dev/null
+++ b/activemq-runtime-config/src/test/java/org/apache/activemq/DestinationsTest.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.Set;
+
+import static org.junit.Assert.assertTrue;
+
+public class DestinationsTest extends RuntimeConfigTestSupport {
+ public static final Logger LOG = LoggerFactory.getLogger(DestinationsTest.class);
+
+ @Test
+ public void testMod() throws Exception {
+ String configurationSeed = "destinationTest";
+ final String brokerConfig = configurationSeed + "-destinations";
+ applyNewConfig(brokerConfig, configurationSeed + "-original");
+
+ startBroker(brokerConfig);
+ assertTrue("broker alive", brokerService.isStarted());
+ printDestinations();
+ assertTrue("contains original", containsDestination(new ActiveMQQueue("ORIGINAL")));
+
+ LOG.info("Adding destinations");
+ applyNewConfig(brokerConfig, configurationSeed + "-add", SLEEP);
+ printDestinations();
+ assertTrue("contains original", containsDestination(new ActiveMQQueue("ORIGINAL")));
+ assertTrue("contains before", containsDestination(new ActiveMQTopic("BEFORE")));
+ assertTrue("contains after", containsDestination(new ActiveMQQueue("AFTER")));
+
+
+ LOG.info("Removing destinations");
+ applyNewConfig(brokerConfig, configurationSeed + "-remove", SLEEP);
+ printDestinations();
+ assertTrue("contains original", containsDestination(new ActiveMQQueue("ORIGINAL")));
+ assertTrue("contains before", containsDestination(new ActiveMQTopic("BEFORE")));
+ assertTrue("contains after", containsDestination(new ActiveMQQueue("AFTER")));
+ }
+
+ protected boolean containsDestination(ActiveMQDestination destination) throws Exception {
+ return Arrays.asList(brokerService.getRegionBroker().getDestinations()).contains(destination);
+ }
+
+ protected void printDestinations() throws Exception {
+ ActiveMQDestination[] destinations = brokerService.getRegionBroker().getDestinations();
+ for (ActiveMQDestination destination : destinations) {
+ LOG.info("Broker destination: " + destination.toString());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq/blob/bbc039fc/activemq-runtime-config/src/test/resources/org/apache/activemq/destinationTest-add.xml
----------------------------------------------------------------------
diff --git a/activemq-runtime-config/src/test/resources/org/apache/activemq/destinationTest-add.xml b/activemq-runtime-config/src/test/resources/org/apache/activemq/destinationTest-add.xml
new file mode 100644
index 0000000..9d36aa5
--- /dev/null
+++ b/activemq-runtime-config/src/test/resources/org/apache/activemq/destinationTest-add.xml
@@ -0,0 +1,36 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<beans
+ xmlns="http://www.springframework.org/schema/beans"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
+ http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
+
+ <broker xmlns="http://activemq.apache.org/schema/core" start="false" persistent="false">
+
+ <destinations>
+ <topic physicalName="BEFORE"/>
+ <queue physicalName="ORIGINAL"/>
+ <queue physicalName="AFTER"/>
+ </destinations>
+
+ <plugins>
+ <runtimeConfigurationPlugin checkPeriod="1000"/>
+ </plugins>
+ </broker>
+</beans>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/activemq/blob/bbc039fc/activemq-runtime-config/src/test/resources/org/apache/activemq/destinationTest-original.xml
----------------------------------------------------------------------
diff --git a/activemq-runtime-config/src/test/resources/org/apache/activemq/destinationTest-original.xml b/activemq-runtime-config/src/test/resources/org/apache/activemq/destinationTest-original.xml
new file mode 100644
index 0000000..8af4e9c
--- /dev/null
+++ b/activemq-runtime-config/src/test/resources/org/apache/activemq/destinationTest-original.xml
@@ -0,0 +1,34 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<beans
+ xmlns="http://www.springframework.org/schema/beans"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
+ http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
+
+ <broker xmlns="http://activemq.apache.org/schema/core" start="false" persistent="false">
+
+ <destinations>
+ <queue physicalName="ORIGINAL"/>
+ </destinations>
+
+ <plugins>
+ <runtimeConfigurationPlugin checkPeriod="1000"/>
+ </plugins>
+ </broker>
+</beans>
http://git-wip-us.apache.org/repos/asf/activemq/blob/bbc039fc/activemq-runtime-config/src/test/resources/org/apache/activemq/destinationTest-remove.xml
----------------------------------------------------------------------
diff --git a/activemq-runtime-config/src/test/resources/org/apache/activemq/destinationTest-remove.xml b/activemq-runtime-config/src/test/resources/org/apache/activemq/destinationTest-remove.xml
new file mode 100644
index 0000000..895b2ee
--- /dev/null
+++ b/activemq-runtime-config/src/test/resources/org/apache/activemq/destinationTest-remove.xml
@@ -0,0 +1,36 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<beans
+ xmlns="http://www.springframework.org/schema/beans"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
+ http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
+
+ <broker xmlns="http://activemq.apache.org/schema/core" start="false" persistent="false">
+
+ <destinations>
+ <topic physicalName="BEFORE"/>
+ <queue physicalName="AFTER"/>
+ </destinations>
+
+
+ <plugins>
+ <runtimeConfigurationPlugin checkPeriod="1000"/>
+ </plugins>
+ </broker>
+</beans>
\ No newline at end of file
[2/2] git commit: https://issues.apache.org/jira/browse/AMQ-5305 -
modify element with runtime configuration plugin
Posted by de...@apache.org.
https://issues.apache.org/jira/browse/AMQ-5305 - modify <destination> element with runtime configuration plugin
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/bbc039fc
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/bbc039fc
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/bbc039fc
Branch: refs/heads/trunk
Commit: bbc039fceb85239667563f8c852db6ed016b73f8
Parents: b76d831
Author: Dejan Bosanac <de...@nighttale.net>
Authored: Thu Aug 28 14:08:11 2014 +0200
Committer: Dejan Bosanac <de...@nighttale.net>
Committed: Thu Aug 28 14:08:41 2014 +0200
----------------------------------------------------------------------
.../plugin/AuthorizationPluginProcessor.java | 77 +++
.../activemq/plugin/ConfigurationProcessor.java | 36 +
.../plugin/DefaultConfigurationProcessor.java | 215 ++++++
.../plugin/DestinationInterceptorProcessor.java | 34 +
.../plugin/DestinationPolicyProcessor.java | 37 ++
.../activemq/plugin/DestinationsProcessor.java | 76 +++
.../org/apache/activemq/plugin/JAXBUtils.java | 65 ++
.../plugin/NetworkConnectorProcessor.java | 80 +++
.../plugin/NetworkConnectorsProcessor.java | 34 +
.../activemq/plugin/PluginsProcessor.java | 37 ++
.../activemq/plugin/PolicyEntryProcessor.java | 71 ++
.../activemq/plugin/PolicyMapProcessor.java | 44 ++
.../activemq/plugin/ProcessorFactory.java | 39 ++
.../plugin/PropertiesPlaceHolderUtil.java | 183 +++++
.../plugin/RuntimeConfigurationBroker.java | 660 +------------------
.../SimpleAuthenticationPluginProcessor.java | 47 ++
.../VirtualDestinationInterceptorProcessor.java | 116 ++++
.../src/main/resources/binding.xjb | 4 +
.../org/apache/activemq/DestinationsTest.java | 72 ++
.../org/apache/activemq/destinationTest-add.xml | 36 +
.../activemq/destinationTest-original.xml | 34 +
.../apache/activemq/destinationTest-remove.xml | 36 +
22 files changed, 1409 insertions(+), 624 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/bbc039fc/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/AuthorizationPluginProcessor.java
----------------------------------------------------------------------
diff --git a/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/AuthorizationPluginProcessor.java b/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/AuthorizationPluginProcessor.java
new file mode 100644
index 0000000..e7f2fa0
--- /dev/null
+++ b/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/AuthorizationPluginProcessor.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.plugin;
+
+import org.apache.activemq.filter.DestinationMapEntry;
+import org.apache.activemq.security.*;
+import org.apache.activemq.schema.core.DtoAuthorizationPlugin;
+import org.apache.activemq.schema.core.DtoAuthorizationMap;
+import org.apache.activemq.schema.core.DtoAuthorizationEntry;
+
+import java.util.LinkedList;
+import java.util.List;
+
+public class AuthorizationPluginProcessor extends DefaultConfigurationProcessor {
+
+ public AuthorizationPluginProcessor(RuntimeConfigurationBroker plugin, Class configurationClass) {
+ super(plugin, configurationClass);
+ }
+
+ @Override
+ public void modify(Object existing, Object candidate) {
+ try {
+ // replace authorization map - need exclusive write lock to total broker
+ AuthorizationBroker authorizationBroker =
+ (AuthorizationBroker) plugin.getBrokerService().getBroker().getAdaptor(AuthorizationBroker.class);
+
+ authorizationBroker.setAuthorizationMap(fromDto(filter(candidate, DtoAuthorizationPlugin.Map.class)));
+ } catch (Exception e) {
+ plugin.info("failed to apply modified AuthorizationMap to AuthorizationBroker", e);
+ }
+ }
+
+ private AuthorizationMap fromDto(List<Object> map) {
+ XBeanAuthorizationMap xBeanAuthorizationMap = new XBeanAuthorizationMap();
+ for (Object o : map) {
+ if (o instanceof DtoAuthorizationPlugin.Map) {
+ DtoAuthorizationPlugin.Map dtoMap = (DtoAuthorizationPlugin.Map) o;
+ List<DestinationMapEntry> entries = new LinkedList<DestinationMapEntry>();
+ // revisit - would like to map getAuthorizationMap to generic getContents
+ for (Object authMap : filter(dtoMap.getAuthorizationMap(), DtoAuthorizationMap.AuthorizationEntries.class)) {
+ for (Object entry : filter(getContents(authMap), DtoAuthorizationEntry.class)) {
+ entries.add(fromDto(entry, new XBeanAuthorizationEntry()));
+ }
+ }
+ xBeanAuthorizationMap.setAuthorizationEntries(entries);
+ try {
+ xBeanAuthorizationMap.afterPropertiesSet();
+ } catch (Exception e) {
+ plugin.info("failed to update xBeanAuthorizationMap auth entries:", e);
+ }
+
+ for (Object entry : filter(dtoMap.getAuthorizationMap(), DtoAuthorizationMap.TempDestinationAuthorizationEntry.class)) {
+ // another restriction - would like to be getContents
+ DtoAuthorizationMap.TempDestinationAuthorizationEntry dtoEntry = (DtoAuthorizationMap.TempDestinationAuthorizationEntry) entry;
+ xBeanAuthorizationMap.setTempDestinationAuthorizationEntry(fromDto(dtoEntry.getTempDestinationAuthorizationEntry(), new TempDestinationAuthorizationEntry()));
+ }
+ } else {
+ plugin.info("No support for updates to: " + o);
+ }
+ }
+ return xBeanAuthorizationMap;
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq/blob/bbc039fc/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/ConfigurationProcessor.java
----------------------------------------------------------------------
diff --git a/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/ConfigurationProcessor.java b/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/ConfigurationProcessor.java
new file mode 100644
index 0000000..89dc2b2
--- /dev/null
+++ b/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/ConfigurationProcessor.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.plugin;
+
+import java.util.List;
+import org.apache.activemq.schema.core.DtoBroker;
+
+public interface ConfigurationProcessor {
+
+ public void processChanges(List current, List modified);
+
+ public void processChanges(DtoBroker current, DtoBroker modified);
+
+ public void modify(Object existing, Object candidate);
+
+ public void addNew(Object o);
+
+ public void remove(Object o);
+
+ public ConfigurationProcessor findProcessor(Object o);
+
+}
http://git-wip-us.apache.org/repos/asf/activemq/blob/bbc039fc/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/DefaultConfigurationProcessor.java
----------------------------------------------------------------------
diff --git a/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/DefaultConfigurationProcessor.java b/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/DefaultConfigurationProcessor.java
new file mode 100644
index 0000000..fddfe48
--- /dev/null
+++ b/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/DefaultConfigurationProcessor.java
@@ -0,0 +1,215 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.plugin;
+
+import org.apache.activemq.util.IntrospectionSupport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.xml.bind.JAXBElement;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Properties;
+import java.util.regex.Pattern;
+import org.apache.activemq.schema.core.DtoBroker;
+
+public class DefaultConfigurationProcessor implements ConfigurationProcessor {
+
+ public static final Logger LOG = LoggerFactory.getLogger(DefaultConfigurationProcessor.class);
+ RuntimeConfigurationBroker plugin;
+ Class configurationClass;
+
+ Pattern matchPassword = Pattern.compile("password=.*,");
+
+ public DefaultConfigurationProcessor(RuntimeConfigurationBroker plugin, Class configurationClass) {
+ this.plugin = plugin;
+ this.configurationClass = configurationClass;
+ }
+
+ @Override
+ public void processChanges(DtoBroker currentConfiguration, DtoBroker modifiedConfiguration) {
+ List current = filter(currentConfiguration, configurationClass);
+ List modified = filter(modifiedConfiguration, configurationClass);
+
+ if (current.equals(modified)) {
+ plugin.debug("no changes to " + configurationClass.getSimpleName());
+ return;
+ } else {
+ plugin.info("changes to " + configurationClass.getSimpleName());
+ }
+
+ processChanges(current, modified);
+ }
+
+ public void processChanges(List current, List modified) {
+ int modIndex = 0, currentIndex = 0;
+ for (; modIndex < modified.size() && currentIndex < current.size(); modIndex++, currentIndex++) {
+ // walk the list for mods
+ applyModifications(getContents(current.get(currentIndex)),
+ getContents(modified.get(modIndex)));
+ }
+
+ for (; modIndex < modified.size(); modIndex++) {
+ // new element; add all
+ for (Object nc : getContents(modified.get(modIndex))) {
+ ConfigurationProcessor processor = findProcessor(nc);
+ if (processor != null) {
+ processor.addNew(nc);
+ } else {
+ addNew(nc);
+ }
+ }
+ }
+
+ for (; currentIndex < current.size(); currentIndex++) {
+ // removal of element; remove all
+ for (Object nc : getContents(current.get(currentIndex))) {
+ ConfigurationProcessor processor = findProcessor(nc);
+ if (processor != null) {
+ processor.remove(nc);
+ } else {
+ remove(nc);
+ }
+ }
+ }
+ }
+
+ protected void applyModifications(List<Object> current, List<Object> modification) {
+ int modIndex = 0, currentIndex = 0;
+ for (; modIndex < modification.size() && currentIndex < current.size(); modIndex++, currentIndex++) {
+ Object existing = current.get(currentIndex);
+ Object candidate = modification.get(modIndex);
+ if (!existing.equals(candidate)) {
+ plugin.info("modification to:" + existing + " , with: " + candidate);
+ ConfigurationProcessor processor = findProcessor(existing);
+ if (processor != null) {
+ processor.modify(existing, candidate);
+ } else {
+ modify(existing, candidate);
+ }
+ }
+ }
+ for (; modIndex < modification.size(); modIndex++) {
+ Object mod = modification.get(modIndex);
+ ConfigurationProcessor processor = findProcessor(mod);
+ if (processor != null) {
+ processor.addNew(mod);
+ } else {
+ addNew(mod);
+ }
+ }
+ for (; currentIndex < current.size(); currentIndex++) {
+ Object mod = current.get(currentIndex);
+ ConfigurationProcessor processor = findProcessor(mod);
+ if (processor != null) {
+ processor.remove(mod);
+ } else {
+ remove(mod);
+ }
+ }
+ }
+
+ public void modify(Object existing, Object candidate) {
+ remove(existing);
+ addNew(candidate);
+ }
+
+ public void addNew(Object o) {
+ plugin.info("No runtime support for additions of " + o);
+ }
+
+ public void remove(Object o) {
+ plugin.info("No runtime support for removal of: " + o);
+ }
+
+ @Override
+ public ConfigurationProcessor findProcessor(Object o) {
+ plugin.info("No processor for " + o);
+ return null;
+ }
+
+ // mapping all supported updatable elements to support getContents
+ protected List<Object> getContents(Object o) {
+ List<Object> answer = new ArrayList<Object>();
+ try {
+ Object val = o.getClass().getMethod("getContents", new Class[]{}).invoke(o, new Object[]{});
+ if (val instanceof List) {
+ answer = (List<Object>) val;
+ } else {
+ answer.add(val);
+ }
+ } catch (NoSuchMethodException mappingIncomplete) {
+ plugin.debug(filterPasswords(o) + " has no modifiable elements");
+ } catch (Exception e) {
+ plugin.info("Failed to access getContents for " + o + ", runtime modifications not supported", e);
+ }
+ return answer;
+ }
+
+ protected String filterPasswords(Object toEscape) {
+ return matchPassword.matcher(toEscape.toString()).replaceAll("password=???,");
+ }
+
+ protected <T> List<Object> filter(Object obj, Class<T> type) {
+ return filter(getContents(obj), type);
+ }
+
+ protected <T> List<Object> filter(List<Object> objectList, Class<T> type) {
+ List<Object> result = new LinkedList<Object>();
+ for (Object o : objectList) {
+ if (o instanceof JAXBElement) {
+ JAXBElement element = (JAXBElement) o;
+ if (type.isAssignableFrom(element.getDeclaredType())) {
+ result.add((T) element.getValue());
+ }
+ } else if (type.isAssignableFrom(o.getClass())) {
+ result.add((T) o);
+ }
+ }
+ return result;
+ }
+
+ protected <T> T fromDto(Object dto, T instance) {
+ Properties properties = new Properties();
+ IntrospectionSupport.getProperties(dto, properties, null);
+ plugin.placeHolderUtil.filter(properties);
+ LOG.trace("applying props: " + filterPasswords(properties) + ", to " + instance.getClass().getSimpleName());
+ IntrospectionSupport.setProperties(instance, properties);
+
+ // deal with nested elements
+ for (Object nested : filter(dto, Object.class)) {
+ String elementName = nested.getClass().getSimpleName();
+ Method setter = JAXBUtils.findSetter(instance, elementName);
+ if (setter != null) {
+ List<Object> argument = new LinkedList<Object>();
+ for (Object elementContent : filter(nested, Object.class)) {
+ argument.add(fromDto(elementContent, JAXBUtils.inferTargetObject(elementContent)));
+ }
+ try {
+ setter.invoke(instance, JAXBUtils.matchType(argument, setter.getParameterTypes()[0]));
+ } catch (Exception e) {
+ plugin.info("failed to invoke " + setter + " on " + instance, e);
+ }
+ } else {
+ plugin.info("failed to find setter for " + elementName + " on :" + instance);
+ }
+ }
+ return instance;
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq/blob/bbc039fc/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/DestinationInterceptorProcessor.java
----------------------------------------------------------------------
diff --git a/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/DestinationInterceptorProcessor.java b/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/DestinationInterceptorProcessor.java
new file mode 100644
index 0000000..b98b34e
--- /dev/null
+++ b/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/DestinationInterceptorProcessor.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.plugin;
+
+import org.apache.activemq.schema.core.DtoVirtualDestinationInterceptor;
+
+public class DestinationInterceptorProcessor extends DefaultConfigurationProcessor {
+
+ public DestinationInterceptorProcessor(RuntimeConfigurationBroker plugin, Class configurationClass) {
+ super(plugin, configurationClass);
+ }
+
+ @Override
+ public ConfigurationProcessor findProcessor(Object o) {
+ if (o instanceof DtoVirtualDestinationInterceptor) {
+ return new VirtualDestinationInterceptorProcessor(plugin, o.getClass());
+ }
+ return super.findProcessor(o);
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq/blob/bbc039fc/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/DestinationPolicyProcessor.java
----------------------------------------------------------------------
diff --git a/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/DestinationPolicyProcessor.java b/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/DestinationPolicyProcessor.java
new file mode 100644
index 0000000..3e59610
--- /dev/null
+++ b/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/DestinationPolicyProcessor.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.plugin;
+
+import org.apache.activemq.schema.core.DtoPolicyEntry;
+import org.apache.activemq.schema.core.DtoPolicyMap;
+
+public class DestinationPolicyProcessor extends DefaultConfigurationProcessor {
+
+ public DestinationPolicyProcessor(RuntimeConfigurationBroker plugin, Class configurationClass) {
+ super(plugin, configurationClass);
+ }
+
+ @Override
+ public ConfigurationProcessor findProcessor(Object o) {
+ if (o instanceof DtoPolicyEntry) {
+ return new PolicyEntryProcessor(plugin, o.getClass());
+ } else if (o instanceof DtoPolicyMap) {
+ return new PolicyMapProcessor(plugin, o.getClass());
+ }
+ return super.findProcessor(o);
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq/blob/bbc039fc/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/DestinationsProcessor.java
----------------------------------------------------------------------
diff --git a/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/DestinationsProcessor.java b/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/DestinationsProcessor.java
new file mode 100644
index 0000000..b0e737c
--- /dev/null
+++ b/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/DestinationsProcessor.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.plugin;
+
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
+
+import org.apache.activemq.schema.core.DtoQueue;
+import org.apache.activemq.schema.core.DtoTopic;
+
+import java.util.Arrays;
+import java.util.List;
+
+public class DestinationsProcessor extends DefaultConfigurationProcessor {
+
+ public DestinationsProcessor(RuntimeConfigurationBroker plugin, Class configurationClass) {
+ super(plugin, configurationClass);
+ }
+
+ @Override
+ public void processChanges(List current, List modified) {
+ for (Object destinations : modified) {
+ for (Object dto : getContents(destinations)) {
+ try {
+ ActiveMQDestination destination = createDestination(dto);
+ if (!containsDestination(destination)) {
+ plugin.addDestination(plugin.getBrokerService().getAdminConnectionContext(), destination, true);
+ plugin.info("Added destination " + destination);
+ }
+ } catch (Exception e) {
+ plugin.info("Failed to add a new destination for DTO: " + dto, e);
+ }
+ }
+ }
+ }
+
+ protected boolean containsDestination(ActiveMQDestination destination) throws Exception {
+ return Arrays.asList(plugin.getBrokerService().getRegionBroker().getDestinations()).contains(destination);
+ }
+
+ @Override
+ public void addNew(Object o) {
+ try {
+ ActiveMQDestination destination = createDestination(o);
+ plugin.addDestination(plugin.getBrokerService().getAdminConnectionContext(), destination, true);
+ plugin.info("Added destination " + destination);
+ } catch (Exception e) {
+ plugin.info("Failed to add a new destination for DTO: " + o, e);
+ }
+ }
+
+ private ActiveMQDestination createDestination(Object dto) throws Exception {
+ if (dto instanceof DtoQueue) {
+ return new ActiveMQQueue(((DtoQueue) dto).getPhysicalName());
+ } else if (dto instanceof DtoTopic) {
+ return new ActiveMQTopic(((DtoTopic) dto).getPhysicalName());
+ } else {
+ throw new Exception("Unknown destination type for DTO " + dto);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq/blob/bbc039fc/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/JAXBUtils.java
----------------------------------------------------------------------
diff --git a/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/JAXBUtils.java b/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/JAXBUtils.java
new file mode 100644
index 0000000..2d0d54d
--- /dev/null
+++ b/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/JAXBUtils.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.plugin;
+
+import javax.xml.bind.JAXBElement;
+import java.lang.reflect.Method;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.schema.core.DtoTopic;
+import org.apache.activemq.schema.core.DtoQueue;
+import org.apache.activemq.schema.core.DtoAuthenticationUser;
+import org.apache.activemq.security.AuthenticationUser;
+
+public class JAXBUtils {
+
+ public static Method findSetter(Object instance, String elementName) {
+ String setter = "set" + elementName;
+ for (Method m : instance.getClass().getMethods()) {
+ if (setter.equals(m.getName())) {
+ return m;
+ }
+ }
+ return null;
+ }
+
+ public static Object inferTargetObject(Object elementContent) {
+ if (DtoTopic.class.isAssignableFrom(elementContent.getClass())) {
+ return new ActiveMQTopic();
+ } else if (DtoQueue.class.isAssignableFrom(elementContent.getClass())) {
+ return new ActiveMQQueue();
+ } else if (DtoAuthenticationUser.class.isAssignableFrom(elementContent.getClass())) {
+ return new AuthenticationUser();
+ } else {
+ return new Object();
+ }
+ }
+
+ public static Object matchType(List<Object> parameterValues, Class<?> aClass) {
+ Object result = parameterValues;
+ if (Set.class.isAssignableFrom(aClass)) {
+ result = new HashSet(parameterValues);
+ }
+ return result;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/activemq/blob/bbc039fc/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/NetworkConnectorProcessor.java
----------------------------------------------------------------------
diff --git a/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/NetworkConnectorProcessor.java b/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/NetworkConnectorProcessor.java
new file mode 100644
index 0000000..a761d5b
--- /dev/null
+++ b/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/NetworkConnectorProcessor.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.plugin;
+
+import org.apache.activemq.network.DiscoveryNetworkConnector;
+import org.apache.activemq.network.NetworkConnector;
+import org.apache.activemq.schema.core.DtoNetworkConnector;
+import org.apache.activemq.util.IntrospectionSupport;
+
+import java.util.TreeMap;
+
+public class NetworkConnectorProcessor extends DefaultConfigurationProcessor {
+
+ public NetworkConnectorProcessor(RuntimeConfigurationBroker plugin, Class configurationClass) {
+ super(plugin, configurationClass);
+ }
+
+ @Override
+ public void addNew(Object o) {
+ DtoNetworkConnector networkConnector = (DtoNetworkConnector) o;
+ if (networkConnector.getUri() != null) {
+ try {
+ DiscoveryNetworkConnector nc = fromDto(networkConnector, new DiscoveryNetworkConnector());
+ plugin.getBrokerService().addNetworkConnector(nc);
+ nc.start();
+ plugin.info("started new network connector: " + nc);
+ } catch (Exception e) {
+ plugin.info("Failed to add new networkConnector " + networkConnector, e);
+ }
+ }
+ }
+
+ @Override
+ public void remove(Object o) {
+ DtoNetworkConnector toRemove = (DtoNetworkConnector) o;
+ for (NetworkConnector existingCandidate :
+ plugin.getBrokerService().getNetworkConnectors()) {
+ if (configMatch(toRemove, existingCandidate)) {
+ if (plugin.getBrokerService().removeNetworkConnector(existingCandidate)) {
+ try {
+ existingCandidate.stop();
+ plugin.info("stopped and removed networkConnector: " + existingCandidate);
+ } catch (Exception e) {
+ plugin.info("Failed to stop removed network connector: " + existingCandidate);
+ }
+ }
+ }
+ }
+ }
+
+ private boolean configMatch(DtoNetworkConnector dto, NetworkConnector candidate) {
+ TreeMap<String, String> dtoProps = new TreeMap<String, String>();
+ IntrospectionSupport.getProperties(dto, dtoProps, null);
+
+ TreeMap<String, String> candidateProps = new TreeMap<String, String>();
+ IntrospectionSupport.getProperties(candidate, candidateProps, null);
+
+ // every dto prop must be present in the candidate
+ for (String key : dtoProps.keySet()) {
+ if (!candidateProps.containsKey(key) || !candidateProps.get(key).equals(dtoProps.get(key))) {
+ return false;
+ }
+ }
+ return true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq/blob/bbc039fc/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/NetworkConnectorsProcessor.java
----------------------------------------------------------------------
diff --git a/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/NetworkConnectorsProcessor.java b/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/NetworkConnectorsProcessor.java
new file mode 100644
index 0000000..ed429b4
--- /dev/null
+++ b/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/NetworkConnectorsProcessor.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.plugin;
+
+import org.apache.activemq.schema.core.DtoNetworkConnector;
+
+public class NetworkConnectorsProcessor extends DefaultConfigurationProcessor {
+
+ public NetworkConnectorsProcessor(RuntimeConfigurationBroker plugin, Class configurationClass) {
+ super(plugin, configurationClass);
+ }
+
+ @Override
+ public ConfigurationProcessor findProcessor(Object o) {
+ if (o instanceof DtoNetworkConnector) {
+ return new NetworkConnectorProcessor(plugin, o.getClass());
+ }
+ return super.findProcessor(o);
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq/blob/bbc039fc/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/PluginsProcessor.java
----------------------------------------------------------------------
diff --git a/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/PluginsProcessor.java b/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/PluginsProcessor.java
new file mode 100644
index 0000000..180a8ce
--- /dev/null
+++ b/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/PluginsProcessor.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.plugin;
+
+import org.apache.activemq.schema.core.DtoSimpleAuthenticationPlugin;
+import org.apache.activemq.schema.core.DtoAuthorizationPlugin;
+
+public class PluginsProcessor extends DefaultConfigurationProcessor {
+
+ public PluginsProcessor(RuntimeConfigurationBroker plugin, Class configurationClass) {
+ super(plugin, configurationClass);
+ }
+
+ @Override
+ public ConfigurationProcessor findProcessor(Object o) {
+ if (o instanceof DtoSimpleAuthenticationPlugin) {
+ return new SimpleAuthenticationPluginProcessor(plugin, o.getClass());
+ } else if (o instanceof DtoAuthorizationPlugin) {
+ return new AuthorizationPluginProcessor(plugin, o.getClass());
+ }
+ return super.findProcessor(o);
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq/blob/bbc039fc/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/PolicyEntryProcessor.java
----------------------------------------------------------------------
diff --git a/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/PolicyEntryProcessor.java b/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/PolicyEntryProcessor.java
new file mode 100644
index 0000000..4d0dc42
--- /dev/null
+++ b/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/PolicyEntryProcessor.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.plugin;
+
+import org.apache.activemq.broker.region.*;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+
+import java.util.Set;
+
+public class PolicyEntryProcessor extends DefaultConfigurationProcessor {
+
+ public PolicyEntryProcessor(RuntimeConfigurationBroker plugin, Class configurationClass) {
+ super(plugin, configurationClass);
+ }
+
+ @Override
+ public void addNew(Object o) {
+ PolicyEntry addition = fromDto(o, new PolicyEntry());
+ PolicyMap existingMap = plugin.getBrokerService().getDestinationPolicy();
+ existingMap.put(addition.getDestination(), addition);
+ applyRetrospectively(addition);
+ plugin.info("added policy for: " + addition.getDestination());
+ }
+
+ @Override
+ public void modify(Object existing, Object candidate) {
+ PolicyMap existingMap = plugin.getBrokerService().getDestinationPolicy();
+
+ PolicyEntry updatedEntry = fromDto(candidate, new PolicyEntry());
+
+ Set existingEntry = existingMap.get(updatedEntry.getDestination());
+ if (existingEntry.size() == 1) {
+ updatedEntry = fromDto(candidate, (PolicyEntry) existingEntry.iterator().next());
+ applyRetrospectively(updatedEntry);
+ plugin.info("updated policy for: " + updatedEntry.getDestination());
+ } else {
+ plugin.info("cannot modify policy matching multiple destinations: " + existingEntry + ", destination:" + updatedEntry.getDestination());
+ }
+ }
+
+ protected void applyRetrospectively(PolicyEntry updatedEntry) {
+ RegionBroker regionBroker = (RegionBroker) plugin.getBrokerService().getRegionBroker();
+ for (Destination destination : regionBroker.getDestinations(updatedEntry.getDestination())) {
+ Destination target = destination;
+ if (destination instanceof DestinationFilter) {
+ target = ((DestinationFilter)destination).getNext();
+ }
+ if (target.getActiveMQDestination().isQueue()) {
+ updatedEntry.update((Queue) target);
+ } else if (target.getActiveMQDestination().isTopic()) {
+ updatedEntry.update((Topic) target);
+ }
+ plugin.debug("applied update to:" + target);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq/blob/bbc039fc/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/PolicyMapProcessor.java
----------------------------------------------------------------------
diff --git a/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/PolicyMapProcessor.java b/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/PolicyMapProcessor.java
new file mode 100644
index 0000000..150542b
--- /dev/null
+++ b/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/PolicyMapProcessor.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.plugin;
+
+import java.util.List;
+import org.apache.activemq.schema.core.DtoPolicyMap;
+import org.apache.activemq.schema.core.DtoPolicyEntry;
+
+public class PolicyMapProcessor extends DefaultConfigurationProcessor {
+
+ public PolicyMapProcessor(RuntimeConfigurationBroker plugin, Class configurationClass) {
+ super(plugin, configurationClass);
+ }
+
+ @Override
+ public void modify(Object existing, Object candidate) {
+ List<Object> existingEntries = filter(existing, DtoPolicyMap.PolicyEntries.class);
+ List<Object> candidateEntries = filter(candidate, DtoPolicyMap.PolicyEntries.class);
+ // walk the map for mods
+ applyModifications(getContents(existingEntries.get(0)), getContents(candidateEntries.get(0)));
+ }
+
+ @Override
+ public ConfigurationProcessor findProcessor(Object o) {
+ if (o instanceof DtoPolicyEntry) {
+ return new PolicyEntryProcessor(plugin, o.getClass());
+ }
+ return super.findProcessor(o);
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq/blob/bbc039fc/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/ProcessorFactory.java
----------------------------------------------------------------------
diff --git a/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/ProcessorFactory.java b/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/ProcessorFactory.java
new file mode 100644
index 0000000..e034cd9
--- /dev/null
+++ b/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/ProcessorFactory.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.plugin;
+
+import org.apache.activemq.schema.core.DtoBroker;
+
+public class ProcessorFactory {
+
+ public static ConfigurationProcessor createProcessor(RuntimeConfigurationBroker plugin, Class dtoClass) {
+ if (dtoClass.equals(DtoBroker.Plugins.class)) {
+ return new PluginsProcessor(plugin, dtoClass);
+ } else if (dtoClass.equals(DtoBroker.NetworkConnectors.class)) {
+ return new NetworkConnectorsProcessor(plugin, dtoClass);
+ } else if (dtoClass.equals(DtoBroker.DestinationPolicy.class)) {
+ return new DestinationPolicyProcessor(plugin, dtoClass);
+ } else if (dtoClass.equals(DtoBroker.DestinationInterceptors.class)) {
+ return new DestinationInterceptorProcessor(plugin, dtoClass);
+ } else if (dtoClass.equals(DtoBroker.Destinations.class)) {
+ return new DestinationsProcessor(plugin, dtoClass);
+ } else {
+ return new DefaultConfigurationProcessor(plugin, dtoClass);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/activemq/blob/bbc039fc/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/PropertiesPlaceHolderUtil.java
----------------------------------------------------------------------
diff --git a/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/PropertiesPlaceHolderUtil.java b/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/PropertiesPlaceHolderUtil.java
new file mode 100644
index 0000000..fdfe006
--- /dev/null
+++ b/activemq-runtime-config/src/main/java/org/apache/activemq/plugin/PropertiesPlaceHolderUtil.java
@@ -0,0 +1,183 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.plugin;
+
+import org.apache.activemq.broker.BrokerContext;
+import org.apache.activemq.spring.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.FactoryBean;
+import org.springframework.core.io.Resource;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+import org.w3c.dom.Node;
+import org.w3c.dom.NodeList;
+
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class PropertiesPlaceHolderUtil {
+
+ public static final Logger LOG = LoggerFactory.getLogger(PropertiesPlaceHolderUtil.class);
+
+ static final Pattern pattern = Pattern.compile("\\$\\{([^\\}]+)\\}");
+ final Properties properties;
+
+ public PropertiesPlaceHolderUtil(Properties properties) {
+ this.properties = properties;
+ }
+
+ public void filter(Properties toFilter) {
+ for (Map.Entry<Object, Object> entry : toFilter.entrySet()) {
+ String val = (String) entry.getValue();
+ String newVal = filter(val);
+ if (!val.equals(newVal)) {
+ toFilter.put(entry.getKey(), newVal);
+ }
+ }
+ }
+
+ public String filter(String str) {
+ int start = 0;
+ while (true) {
+ Matcher matcher = pattern.matcher(str);
+ if (!matcher.find(start)) {
+ break;
+ }
+ String group = matcher.group(1);
+ String property = properties.getProperty(group);
+ if (property != null) {
+ str = matcher.replaceFirst(Matcher.quoteReplacement(property));
+ } else {
+ start = matcher.end();
+ }
+ }
+ return replaceBytePostfix(str);
+ }
+
+ static Pattern[] byteMatchers = new Pattern[] {
+ Pattern.compile("^\\s*(\\d+)\\s*(b)?\\s*$", Pattern.CASE_INSENSITIVE),
+ Pattern.compile("^\\s*(\\d+)\\s*k(b)?\\s*$", Pattern.CASE_INSENSITIVE),
+ Pattern.compile("^\\s*(\\d+)\\s*m(b)?\\s*$", Pattern.CASE_INSENSITIVE),
+ Pattern.compile("^\\s*(\\d+)\\s*g(b)?\\s*$", Pattern.CASE_INSENSITIVE)};
+
+ // xbean can Xb, Xkb, Xmb, Xg etc
+ private String replaceBytePostfix(String str) {
+ try {
+ for (int i=0; i< byteMatchers.length; i++) {
+ Matcher matcher = byteMatchers[i].matcher(str);
+ if (matcher.matches()) {
+ long value = Long.parseLong(matcher.group(1));
+ for (int j=1; j<=i; j++) {
+ value *= 1024;
+ }
+ return String.valueOf(value);
+ }
+ }
+ } catch (NumberFormatException ignored) {
+ LOG.debug("nfe on: " + str, ignored);
+ }
+ return str;
+ }
+
+ public void mergeProperties(Document doc, Properties initialProperties, BrokerContext brokerContext) {
+ // find resources
+ // <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
+ // <property name="locations" || name="properties">
+ // ...
+ // </property>
+ // </bean>
+ LinkedList<String> resources = new LinkedList<String>();
+ LinkedList<String> propertiesClazzes = new LinkedList<String>();
+ NodeList beans = doc.getElementsByTagNameNS("*", "bean");
+ for (int i = 0; i < beans.getLength(); i++) {
+ Node bean = beans.item(0);
+ if (bean.hasAttributes() && bean.getAttributes().getNamedItem("class").getTextContent().contains("PropertyPlaceholderConfigurer")) {
+ if (bean.hasChildNodes()) {
+ NodeList beanProps = bean.getChildNodes();
+ for (int j = 0; j < beanProps.getLength(); j++) {
+ Node beanProp = beanProps.item(j);
+ if (Node.ELEMENT_NODE == beanProp.getNodeType() && beanProp.hasAttributes() && beanProp.getAttributes().getNamedItem("name") != null) {
+ String propertyName = beanProp.getAttributes().getNamedItem("name").getTextContent();
+ if ("locations".equals(propertyName)) {
+
+ // interested in value or list/value of locations property
+ Element beanPropElement = (Element) beanProp;
+ NodeList values = beanPropElement.getElementsByTagNameNS("*", "value");
+ for (int k = 0; k < values.getLength(); k++) {
+ Node value = values.item(k);
+ resources.add(value.getFirstChild().getTextContent());
+ }
+ } else if ("properties".equals(propertyName)) {
+
+ // bean or beanFactory
+ Element beanPropElement = (Element) beanProp;
+ NodeList values = beanPropElement.getElementsByTagNameNS("*", "bean");
+ for (int k = 0; k < values.getLength(); k++) {
+ Node value = values.item(k);
+ if (value.hasAttributes()) {
+ Node beanClassTypeNode = value.getAttributes().getNamedItem("class");
+ if (beanClassTypeNode != null) {
+ propertiesClazzes.add(beanClassTypeNode.getFirstChild().getTextContent());
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ for (String value : propertiesClazzes) {
+ try {
+ Object springBean = getClass().getClassLoader().loadClass(value).newInstance();
+ if (springBean instanceof FactoryBean) {
+ // can't access the factory or created properties from spring context so we got to recreate
+ initialProperties.putAll((Properties) FactoryBean.class.getMethod("getObject", (Class<?>[]) null).invoke(springBean));
+ }
+ } catch (Throwable e) {
+ LOG.debug("unexpected exception processing properties bean class: " + propertiesClazzes, e);
+ }
+ }
+ List<Resource> propResources = new LinkedList<Resource>();
+ for (String value : resources) {
+ try {
+ if (!value.isEmpty()) {
+ propResources.add(Utils.resourceFromString(filter(value)));
+ }
+ } catch (MalformedURLException e) {
+ LOG.info("failed to resolve resource: " + value, e);
+ }
+ }
+ for (Resource resource : propResources) {
+ Properties properties = new Properties();
+ try {
+ properties.load(resource.getInputStream());
+ } catch (IOException e) {
+ LOG.info("failed to load properties resource: " + resource, e);
+ }
+ initialProperties.putAll(properties);
+ }
+ }
+
+}