You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2015/07/09 16:26:27 UTC
[1/2] activemq-artemis git commit: ARTEMIS-149 Advisory Message
Support Adding functions to send advisory messages.
Repository: activemq-artemis
Updated Branches:
refs/heads/master 17cc62bca -> 2a4e9f191
ARTEMIS-149 Advisory Message Support
Adding functions to send advisory messages.
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/7cf58b1e
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/7cf58b1e
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/7cf58b1e
Branch: refs/heads/master
Commit: 7cf58b1e885ce449d1ddefd04542eff702fd3a2b
Parents: 17cc62b
Author: Howard Gao <hg...@redhat.com>
Authored: Thu Jul 9 10:20:26 2015 +0800
Committer: Clebert Suconic <cl...@apache.org>
Committed: Thu Jul 9 10:23:24 2015 -0400
----------------------------------------------------------------------
.../openwire/OpenWireProtocolManager.java | 63 +++++++++++++++-
.../core/protocol/openwire/amq/AMQConsumer.java | 10 +++
.../core/protocol/openwire/amq/AMQSession.java | 5 ++
.../apache/activemq/broker/BrokerService.java | 77 ++-----------------
.../artemiswrapper/ArtemisBrokerWrapper.java | 68 +++++++++++++++--
.../broker/region/policy/PolicyMap.java | 79 ++++++++++++++++++++
6 files changed, 224 insertions(+), 78 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7cf58b1e/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
index 8c1cbe7..a34168c 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
@@ -34,6 +34,12 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.BaseInterceptor;
import org.apache.activemq.artemis.api.core.Interceptor;
import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.management.CoreNotificationType;
+import org.apache.activemq.artemis.api.core.management.ManagementHelper;
+import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConsumer;
+import org.apache.activemq.artemis.core.server.management.ManagementService;
+import org.apache.activemq.artemis.core.server.management.Notification;
+import org.apache.activemq.artemis.core.server.management.NotificationListener;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQTopic;
@@ -84,7 +90,7 @@ import org.apache.activemq.util.IdGenerator;
import org.apache.activemq.util.InetAddressUtil;
import org.apache.activemq.util.LongSequenceGenerator;
-public class OpenWireProtocolManager implements ProtocolManager<Interceptor>
+public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, NotificationListener
{
private static final IdGenerator BROKER_ID_GENERATOR = new IdGenerator();
private static final IdGenerator ID_GENERATOR = new IdGenerator();
@@ -121,6 +127,8 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>
private Map<TransactionId, AMQSession> transactions = new ConcurrentHashMap<TransactionId, AMQSession>();
+ private Map<String, SessionId> sessionIdMap = new ConcurrentHashMap<String, SessionId>();
+
public OpenWireProtocolManager(OpenWireProtocolManagerFactory factory, ActiveMQServer server)
{
this.factory = factory;
@@ -130,6 +138,11 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>
wireFactory.setCacheEnabled(false);
brokerState = new BrokerState();
advisoryProducerId.setConnectionId(ID_GENERATOR.generateId());
+ ManagementService service = server.getManagementService();
+ if (service != null)
+ {
+ service.addNotificationListener(this);
+ }
}
@@ -603,6 +616,7 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>
amqSession.initialize();
amqSession.setInternal(internal);
sessions.put(ss.getSessionId(), amqSession);
+ sessionIdMap.put(amqSession.getCoreSession().getName(), ss.getSessionId());
return amqSession;
}
@@ -783,4 +797,51 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>
{
transactions.put(txId, amqSession);
}
+
+ //advisory support
+ @Override
+ public void onNotification(Notification notif)
+ {
+ try
+ {
+ if (notif.getType() instanceof CoreNotificationType)
+ {
+ CoreNotificationType type = (CoreNotificationType)notif.getType();
+ switch (type)
+ {
+ case CONSUMER_SLOW:
+ fireSlowConsumer(notif);
+ break;
+ default:
+ break;
+ }
+ }
+ }
+ catch (Exception e)
+ {
+ ActiveMQServerLogger.LOGGER.error("Failed to send notification " + notif, e);
+ }
+ }
+
+ private void fireSlowConsumer(Notification notif) throws Exception
+ {
+ SimpleString coreSessionId = notif.getProperties().getSimpleStringProperty(ManagementHelper.HDR_SESSION_NAME);
+ Long coreConsumerId = notif.getProperties().getLongProperty(ManagementHelper.HDR_CONSUMER_NAME);
+ SessionId sessionId = sessionIdMap.get(coreSessionId.toString());
+ AMQSession session = sessions.get(sessionId);
+ AMQConsumer consumer = session.getConsumer(coreConsumerId);
+ ActiveMQDestination destination = consumer.getDestination();
+
+ if (!AdvisorySupport.isAdvisoryTopic(destination))
+ {
+ ActiveMQTopic topic = AdvisorySupport.getSlowConsumerAdvisoryTopic(destination);
+ ConnectionId connId = sessionId.getParentId();
+ AMQTransportConnectionState cc = (AMQTransportConnectionState)this.brokerConnectionStates.get(connId);
+ OpenWireConnection conn = cc.getConnection();
+ ActiveMQMessage advisoryMessage = new ActiveMQMessage();
+ advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_CONSUMER_ID, consumer.getId().toString());
+
+ fireAdvisory(conn.getConext(), topic, advisoryMessage, consumer.getId());
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7cf58b1e/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
index 523a9fb..1292aee 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
@@ -390,4 +390,14 @@ public class AMQConsumer implements BrowserListener
session.getCoreSession().acknowledge(nativeId, lastMi.nativeId);
}
}
+
+ public org.apache.activemq.command.ActiveMQDestination getDestination()
+ {
+ return actualDest;
+ }
+
+ public ConsumerInfo getInfo()
+ {
+ return info;
+ }
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7cf58b1e/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
index 3b166f0..4f951fe 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
@@ -506,6 +506,11 @@ public class AMQSession implements SessionCallback
this.coreSession.close(false);
}
+ public AMQConsumer getConsumer(Long coreConsumerId)
+ {
+ return consumers.get(coreConsumerId);
+ }
+
private class SendRetryTask implements Runnable
{
private ServerMessage coreMsg;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7cf58b1e/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/BrokerService.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/BrokerService.java b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/BrokerService.java
index dd34769..e9d84ef 100644
--- a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/BrokerService.java
+++ b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/BrokerService.java
@@ -22,110 +22,41 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.URI;
-import java.net.URISyntaxException;
-import java.net.UnknownHostException;
-import java.security.Provider;
-import java.security.Security;
import java.util.ArrayList;
-import java.util.Date;
-import java.util.HashMap;
import java.util.HashSet;
-import java.util.Iterator;
import java.util.List;
-import java.util.Locale;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.RejectedExecutionHandler;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-
-import javax.annotation.PostConstruct;
-import javax.annotation.PreDestroy;
+
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import org.apache.activemq.ActiveMQConnectionMetaData;
-import org.apache.activemq.ConfigurationException;
import org.apache.activemq.Service;
-import org.apache.activemq.advisory.AdvisoryBroker;
import org.apache.activemq.broker.artemiswrapper.ArtemisBrokerWrapper;
-import org.apache.activemq.broker.cluster.ConnectionSplitBroker;
-import org.apache.activemq.broker.jmx.AnnotatedMBean;
-import org.apache.activemq.broker.jmx.BrokerMBeanSupport;
import org.apache.activemq.broker.jmx.BrokerView;
-import org.apache.activemq.broker.jmx.ConnectorView;
-import org.apache.activemq.broker.jmx.ConnectorViewMBean;
-import org.apache.activemq.broker.jmx.HealthView;
-import org.apache.activemq.broker.jmx.HealthViewMBean;
-import org.apache.activemq.broker.jmx.JmsConnectorView;
-import org.apache.activemq.broker.jmx.JobSchedulerView;
-import org.apache.activemq.broker.jmx.JobSchedulerViewMBean;
-import org.apache.activemq.broker.jmx.Log4JConfigView;
-import org.apache.activemq.broker.jmx.ManagedRegionBroker;
import org.apache.activemq.broker.jmx.ManagementContext;
-import org.apache.activemq.broker.jmx.NetworkConnectorView;
-import org.apache.activemq.broker.jmx.NetworkConnectorViewMBean;
-import org.apache.activemq.broker.jmx.ProxyConnectorView;
-import org.apache.activemq.broker.region.CompositeDestinationInterceptor;
import org.apache.activemq.broker.region.Destination;
-import org.apache.activemq.broker.region.DestinationFactory;
-import org.apache.activemq.broker.region.DestinationFactoryImpl;
import org.apache.activemq.broker.region.DestinationInterceptor;
-import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.broker.region.policy.PolicyMap;
-import org.apache.activemq.broker.region.virtual.MirroredQueue;
-import org.apache.activemq.broker.region.virtual.VirtualDestination;
-import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor;
-import org.apache.activemq.broker.region.virtual.VirtualTopic;
import org.apache.activemq.broker.scheduler.JobSchedulerStore;
-import org.apache.activemq.broker.scheduler.SchedulerBroker;
-import org.apache.activemq.broker.scheduler.memory.InMemoryJobSchedulerStore;
import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.BrokerId;
-import org.apache.activemq.command.ProducerInfo;
-import org.apache.activemq.filter.DestinationFilter;
-import org.apache.activemq.network.ConnectionFilter;
-import org.apache.activemq.network.DiscoveryNetworkConnector;
import org.apache.activemq.network.NetworkConnector;
import org.apache.activemq.network.jms.JmsConnector;
-import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.proxy.ProxyConnector;
import org.apache.activemq.security.MessageAuthorizationPolicy;
-import org.apache.activemq.selector.SelectorParser;
-import org.apache.activemq.store.JournaledStore;
import org.apache.activemq.store.PListStore;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.PersistenceAdapterFactory;
-import org.apache.activemq.store.memory.MemoryPersistenceAdapter;
-import org.apache.activemq.thread.Scheduler;
import org.apache.activemq.thread.TaskRunnerFactory;
-import org.apache.activemq.transport.TransportFactorySupport;
import org.apache.activemq.transport.TransportServer;
-import org.apache.activemq.transport.vm.VMTransportFactory;
import org.apache.activemq.usage.SystemUsage;
-import org.apache.activemq.util.BrokerSupport;
-import org.apache.activemq.util.DefaultIOExceptionHandler;
import org.apache.activemq.util.IOExceptionHandler;
-import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.IOHelper;
-import org.apache.activemq.util.InetAddressUtil;
import org.apache.activemq.util.ServiceStopper;
-import org.apache.activemq.util.ThreadPoolUtils;
-import org.apache.activemq.util.TimeUtils;
-import org.apache.activemq.util.URISupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.slf4j.MDC;
/**
* Manages the life-cycle of an ActiveMQ Broker. A BrokerService consists of a
@@ -161,6 +92,8 @@ public class BrokerService implements Service
public Set<Integer> extraConnectors = new HashSet<Integer>();
private File dataDirectoryFile;
+ private PolicyMap destinationPolicy;
+
static
{
InputStream in;
@@ -337,7 +270,6 @@ public class BrokerService implements Service
public void makeSureDestinationExists(ActiveMQDestination activemqDestination) throws Exception
{
- System.out.println(">>>> making sure dest exits: " + activemqDestination);
ArtemisBrokerWrapper hqBroker = (ArtemisBrokerWrapper) this.broker;
//it can be null
if (activemqDestination == null)
@@ -404,6 +336,7 @@ public class BrokerService implements Service
public void setDestinationPolicy(PolicyMap policyMap)
{
+ this.destinationPolicy = policyMap;
}
public void setDeleteAllMessagesOnStartup(boolean deletePersistentMessagesOnStartup)
@@ -607,7 +540,7 @@ public class BrokerService implements Service
public PolicyMap getDestinationPolicy()
{
- return null;
+ return this.destinationPolicy;
}
public void setTransportConnectorURIs(String[] transportConnectorURIs)
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7cf58b1e/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerWrapper.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerWrapper.java b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerWrapper.java
index 822faed..ced7857 100644
--- a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerWrapper.java
+++ b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerWrapper.java
@@ -20,6 +20,7 @@ import java.net.URI;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
+import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -32,10 +33,14 @@ import org.apache.activemq.artemis.core.registry.JndiBindingRegistry;
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.artemis.core.security.Role;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy;
+import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
import org.apache.activemq.artemis.jms.server.impl.JMSServerManagerImpl;
import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManagerImpl;
import org.apache.activemq.artemiswrapper.ArtemisBrokerHelper;
import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
public class ArtemisBrokerWrapper extends ArtemisBrokerBase
{
@@ -62,14 +67,26 @@ public class ArtemisBrokerWrapper extends ArtemisBrokerBase
Configuration serverConfig = server.getConfiguration();
Set<TransportConfiguration> acceptors0 = serverConfig.getAcceptorConfigurations();
- Iterator<TransportConfiguration> iter0 = acceptors0.iterator();
- Map<String, AddressSettings> addressSettings = serverConfig.getAddressesSettings();
+ Map<String, AddressSettings> addressSettingsMap = serverConfig.getAddressesSettings();
+
+ //do policy translation
+ PolicyMap policyMap = this.bservice.getDestinationPolicy();
+
+ if (policyMap != null)
+ {
+ translatePolicyMap(serverConfig, policyMap);
+ }
+
String match = "jms.queue.#";
- AddressSettings dlaSettings = new AddressSettings();
+ AddressSettings commonSettings = addressSettingsMap.get(match);
+ if (commonSettings == null)
+ {
+ commonSettings = new AddressSettings();
+ addressSettingsMap.put(match, commonSettings);
+ }
SimpleString dla = new SimpleString("jms.queue.ActiveMQ.DLQ");
- dlaSettings.setDeadLetterAddress(dla);
- addressSettings.put(match, dlaSettings);
+ commonSettings.setDeadLetterAddress(dla);
serverConfig.getAcceptorConfigurations().add(transportConfiguration);
if (this.bservice.enableSsl())
@@ -177,6 +194,47 @@ public class ArtemisBrokerWrapper extends ArtemisBrokerBase
}
+ private void translatePolicyMap(Configuration serverConfig, PolicyMap policyMap)
+ {
+ List allEntries = policyMap.getAllEntries();
+ for (Object o : allEntries)
+ {
+ PolicyEntry entry = (PolicyEntry)o;
+ org.apache.activemq.command.ActiveMQDestination targetDest = entry.getDestination();
+ String match = getCorePattern(targetDest);
+ Map<String, AddressSettings> settingsMap = serverConfig.getAddressesSettings();
+ AddressSettings settings = settingsMap.get(match);
+ if (settings == null)
+ {
+ settings = new AddressSettings();
+ settingsMap.put(match, settings);
+ }
+
+ if (entry.isAdvisoryForSlowConsumers())
+ {
+ settings.setSlowConsumerThreshold(1000);
+ settings.setSlowConsumerCheckPeriod(1);
+ settings.setSlowConsumerPolicy(SlowConsumerPolicy.NOTIFY);
+ }
+ }
+ }
+
+ private String getCorePattern(org.apache.activemq.command.ActiveMQDestination dest)
+ {
+ String physicalName = dest.getPhysicalName();
+ String pattern = physicalName.replace(">", "#");
+ if (dest.isTopic())
+ {
+ pattern = "jms.topic." + pattern;
+ }
+ else
+ {
+ pattern = "jms.queue." + pattern;
+ }
+
+ return pattern;
+ }
+
@Override
public void stop() throws Exception
{
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7cf58b1e/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/region/policy/PolicyMap.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/region/policy/PolicyMap.java b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/region/policy/PolicyMap.java
new file mode 100644
index 0000000..e48f2d9
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/region/policy/PolicyMap.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.region.policy;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.filter.DestinationMap;
+import org.apache.activemq.filter.DestinationMapEntry;
+
+/**
+ * Represents a destination based configuration of policies so that individual
+ * destinations or wildcard hierarchies of destinations can be configured using
+ * different policies.
+ *
+ * @org.apache.xbean.XBean
+ */
+public class PolicyMap extends DestinationMap
+{
+
+ private PolicyEntry defaultEntry;
+ private List allEntries = new ArrayList();
+
+ public PolicyEntry getEntryFor(ActiveMQDestination destination)
+ {
+ PolicyEntry answer = (PolicyEntry) chooseValue(destination);
+ if (answer == null)
+ {
+ answer = getDefaultEntry();
+ }
+ return answer;
+ }
+
+ /**
+ * Sets the individual entries on the policy map
+ *
+ * @org.apache.xbean.ElementType class="org.apache.activemq.broker.region.policy.PolicyEntry"
+ */
+ public void setPolicyEntries(List entries)
+ {
+ super.setEntries(entries);
+ allEntries.addAll(entries);
+ }
+
+ public List getAllEntries()
+ {
+ return allEntries;
+ }
+
+ public PolicyEntry getDefaultEntry()
+ {
+ return defaultEntry;
+ }
+
+ public void setDefaultEntry(PolicyEntry defaultEntry)
+ {
+ this.defaultEntry = defaultEntry;
+ }
+
+ protected Class<? extends DestinationMapEntry> getEntryClass()
+ {
+ return PolicyEntry.class;
+ }
+}
[2/2] activemq-artemis git commit: This closes #64 Open Wire Advisory
Support
Posted by cl...@apache.org.
This closes #64 Open Wire Advisory Support
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/2a4e9f19
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/2a4e9f19
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/2a4e9f19
Branch: refs/heads/master
Commit: 2a4e9f191a3ef32f6ed40ccfba66fa6f922c13e6
Parents: 17cc62b 7cf58b1
Author: Clebert Suconic <cl...@apache.org>
Authored: Thu Jul 9 10:23:25 2015 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Thu Jul 9 10:23:25 2015 -0400
----------------------------------------------------------------------
.../openwire/OpenWireProtocolManager.java | 63 +++++++++++++++-
.../core/protocol/openwire/amq/AMQConsumer.java | 10 +++
.../core/protocol/openwire/amq/AMQSession.java | 5 ++
.../apache/activemq/broker/BrokerService.java | 77 ++-----------------
.../artemiswrapper/ArtemisBrokerWrapper.java | 68 +++++++++++++++--
.../broker/region/policy/PolicyMap.java | 79 ++++++++++++++++++++
6 files changed, 224 insertions(+), 78 deletions(-)
----------------------------------------------------------------------