You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2015/07/09 16:26:27 UTC

[1/2] activemq-artemis git commit: ARTEMIS-149 Advisory Message Support Adding functions to send advisory messages.

Repository: activemq-artemis
Updated Branches:
  refs/heads/master 17cc62bca -> 2a4e9f191


ARTEMIS-149 Advisory Message Support
  Adding functions to send advisory messages.


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/7cf58b1e
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/7cf58b1e
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/7cf58b1e

Branch: refs/heads/master
Commit: 7cf58b1e885ce449d1ddefd04542eff702fd3a2b
Parents: 17cc62b
Author: Howard Gao <hg...@redhat.com>
Authored: Thu Jul 9 10:20:26 2015 +0800
Committer: Clebert Suconic <cl...@apache.org>
Committed: Thu Jul 9 10:23:24 2015 -0400

----------------------------------------------------------------------
 .../openwire/OpenWireProtocolManager.java       | 63 +++++++++++++++-
 .../core/protocol/openwire/amq/AMQConsumer.java | 10 +++
 .../core/protocol/openwire/amq/AMQSession.java  |  5 ++
 .../apache/activemq/broker/BrokerService.java   | 77 ++-----------------
 .../artemiswrapper/ArtemisBrokerWrapper.java    | 68 +++++++++++++++--
 .../broker/region/policy/PolicyMap.java         | 79 ++++++++++++++++++++
 6 files changed, 224 insertions(+), 78 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7cf58b1e/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
index 8c1cbe7..a34168c 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
@@ -34,6 +34,12 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.BaseInterceptor;
 import org.apache.activemq.artemis.api.core.Interceptor;
 import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.management.CoreNotificationType;
+import org.apache.activemq.artemis.api.core.management.ManagementHelper;
+import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConsumer;
+import org.apache.activemq.artemis.core.server.management.ManagementService;
+import org.apache.activemq.artemis.core.server.management.Notification;
+import org.apache.activemq.artemis.core.server.management.NotificationListener;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQMessage;
 import org.apache.activemq.command.ActiveMQTopic;
@@ -84,7 +90,7 @@ import org.apache.activemq.util.IdGenerator;
 import org.apache.activemq.util.InetAddressUtil;
 import org.apache.activemq.util.LongSequenceGenerator;
 
-public class OpenWireProtocolManager implements ProtocolManager<Interceptor>
+public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, NotificationListener
 {
    private static final IdGenerator BROKER_ID_GENERATOR = new IdGenerator();
    private static final IdGenerator ID_GENERATOR = new IdGenerator();
@@ -121,6 +127,8 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>
 
    private Map<TransactionId, AMQSession> transactions = new ConcurrentHashMap<TransactionId, AMQSession>();
 
+   private Map<String, SessionId> sessionIdMap = new ConcurrentHashMap<String, SessionId>();
+
    public OpenWireProtocolManager(OpenWireProtocolManagerFactory factory, ActiveMQServer server)
    {
       this.factory = factory;
@@ -130,6 +138,11 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>
       wireFactory.setCacheEnabled(false);
       brokerState = new BrokerState();
       advisoryProducerId.setConnectionId(ID_GENERATOR.generateId());
+      ManagementService service = server.getManagementService();
+      if (service != null)
+      {
+         service.addNotificationListener(this);
+      }
    }
 
 
@@ -603,6 +616,7 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>
       amqSession.initialize();
       amqSession.setInternal(internal);
       sessions.put(ss.getSessionId(), amqSession);
+      sessionIdMap.put(amqSession.getCoreSession().getName(), ss.getSessionId());
       return amqSession;
    }
 
@@ -783,4 +797,51 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>
    {
       transactions.put(txId, amqSession);
    }
+
+   //advisory support
+   /**
+    * Listener callback for server notifications. Only core notifications
+    * of type CONSUMER_SLOW are acted on (they are turned into a slow
+    * consumer advisory); every other notification is ignored. Any
+    * exception raised while handling one is logged and not rethrown.
+    *
+    * @param notif the notification delivered by the management service
+    */
+   @Override
+   public void onNotification(Notification notif)
+   {
+      try
+      {
+         if (notif.getType() == CoreNotificationType.CONSUMER_SLOW)
+         {
+            fireSlowConsumer(notif);
+         }
+      }
+      catch (Exception e)
+      {
+         ActiveMQServerLogger.LOGGER.error("Failed to send notification " + notif, e);
+      }
+   }
+
+   // Sends a slow-consumer advisory for the consumer named in the notification.
+   // The session, consumer or connection may be unknown to this manager (not
+   // registered through it, or already gone); such notifications are skipped.
+   private void fireSlowConsumer(Notification notif) throws Exception
+   {
+      SimpleString coreSessionId = notif.getProperties().getSimpleStringProperty(ManagementHelper.HDR_SESSION_NAME);
+      Long coreConsumerId = notif.getProperties().getLongProperty(ManagementHelper.HDR_CONSUMER_NAME);
+      SessionId sessionId = coreSessionId == null ? null : sessionIdMap.get(coreSessionId.toString());
+      AMQSession session = sessionId == null ? null : sessions.get(sessionId);
+      AMQConsumer consumer = session == null || coreConsumerId == null ? null : session.getConsumer(coreConsumerId);
+      AMQTransportConnectionState cc = sessionId == null ? null : (AMQTransportConnectionState)this.brokerConnectionStates.get(sessionId.getParentId());
+      if (consumer == null || cc == null || AdvisorySupport.isAdvisoryTopic(consumer.getDestination()))
+      {
+         return;
+      }
+
+      ActiveMQMessage advisoryMessage = new ActiveMQMessage();
+      advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_CONSUMER_ID, consumer.getId().toString());
+      ActiveMQTopic topic = AdvisorySupport.getSlowConsumerAdvisoryTopic(consumer.getDestination());
+      fireAdvisory(cc.getConnection().getConext(), topic, advisoryMessage, consumer.getId());
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7cf58b1e/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
index 523a9fb..1292aee 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
@@ -390,4 +390,14 @@ public class AMQConsumer implements BrowserListener
          session.getCoreSession().acknowledge(nativeId, lastMi.nativeId);
       }
    }
+
+   public org.apache.activemq.command.ActiveMQDestination getDestination()
+   {
+      return actualDest;
+   }
+
+   public ConsumerInfo getInfo()
+   {
+      return info;
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7cf58b1e/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
index 3b166f0..4f951fe 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
@@ -506,6 +506,11 @@ public class AMQSession implements SessionCallback
       this.coreSession.close(false);
    }
 
+   public AMQConsumer getConsumer(Long coreConsumerId)
+   {
+      return consumers.get(coreConsumerId);
+   }
+
    private class SendRetryTask implements Runnable
    {
       private ServerMessage coreMsg;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7cf58b1e/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/BrokerService.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/BrokerService.java b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/BrokerService.java
index dd34769..e9d84ef 100644
--- a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/BrokerService.java
+++ b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/BrokerService.java
@@ -22,110 +22,41 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.net.URI;
-import java.net.URISyntaxException;
-import java.net.UnknownHostException;
-import java.security.Provider;
-import java.security.Security;
 import java.util.ArrayList;
-import java.util.Date;
-import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.List;
-import java.util.Locale;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.RejectedExecutionHandler;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-
-import javax.annotation.PostConstruct;
-import javax.annotation.PreDestroy;
+
 import javax.management.MalformedObjectNameException;
 import javax.management.ObjectName;
 
 import org.apache.activemq.ActiveMQConnectionMetaData;
-import org.apache.activemq.ConfigurationException;
 import org.apache.activemq.Service;
-import org.apache.activemq.advisory.AdvisoryBroker;
 import org.apache.activemq.broker.artemiswrapper.ArtemisBrokerWrapper;
-import org.apache.activemq.broker.cluster.ConnectionSplitBroker;
-import org.apache.activemq.broker.jmx.AnnotatedMBean;
-import org.apache.activemq.broker.jmx.BrokerMBeanSupport;
 import org.apache.activemq.broker.jmx.BrokerView;
-import org.apache.activemq.broker.jmx.ConnectorView;
-import org.apache.activemq.broker.jmx.ConnectorViewMBean;
-import org.apache.activemq.broker.jmx.HealthView;
-import org.apache.activemq.broker.jmx.HealthViewMBean;
-import org.apache.activemq.broker.jmx.JmsConnectorView;
-import org.apache.activemq.broker.jmx.JobSchedulerView;
-import org.apache.activemq.broker.jmx.JobSchedulerViewMBean;
-import org.apache.activemq.broker.jmx.Log4JConfigView;
-import org.apache.activemq.broker.jmx.ManagedRegionBroker;
 import org.apache.activemq.broker.jmx.ManagementContext;
-import org.apache.activemq.broker.jmx.NetworkConnectorView;
-import org.apache.activemq.broker.jmx.NetworkConnectorViewMBean;
-import org.apache.activemq.broker.jmx.ProxyConnectorView;
-import org.apache.activemq.broker.region.CompositeDestinationInterceptor;
 import org.apache.activemq.broker.region.Destination;
-import org.apache.activemq.broker.region.DestinationFactory;
-import org.apache.activemq.broker.region.DestinationFactoryImpl;
 import org.apache.activemq.broker.region.DestinationInterceptor;
-import org.apache.activemq.broker.region.RegionBroker;
 import org.apache.activemq.broker.region.policy.PolicyMap;
-import org.apache.activemq.broker.region.virtual.MirroredQueue;
-import org.apache.activemq.broker.region.virtual.VirtualDestination;
-import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor;
-import org.apache.activemq.broker.region.virtual.VirtualTopic;
 import org.apache.activemq.broker.scheduler.JobSchedulerStore;
-import org.apache.activemq.broker.scheduler.SchedulerBroker;
-import org.apache.activemq.broker.scheduler.memory.InMemoryJobSchedulerStore;
 import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.BrokerId;
-import org.apache.activemq.command.ProducerInfo;
-import org.apache.activemq.filter.DestinationFilter;
-import org.apache.activemq.network.ConnectionFilter;
-import org.apache.activemq.network.DiscoveryNetworkConnector;
 import org.apache.activemq.network.NetworkConnector;
 import org.apache.activemq.network.jms.JmsConnector;
-import org.apache.activemq.openwire.OpenWireFormat;
 import org.apache.activemq.proxy.ProxyConnector;
 import org.apache.activemq.security.MessageAuthorizationPolicy;
-import org.apache.activemq.selector.SelectorParser;
-import org.apache.activemq.store.JournaledStore;
 import org.apache.activemq.store.PListStore;
 import org.apache.activemq.store.PersistenceAdapter;
 import org.apache.activemq.store.PersistenceAdapterFactory;
-import org.apache.activemq.store.memory.MemoryPersistenceAdapter;
-import org.apache.activemq.thread.Scheduler;
 import org.apache.activemq.thread.TaskRunnerFactory;
-import org.apache.activemq.transport.TransportFactorySupport;
 import org.apache.activemq.transport.TransportServer;
-import org.apache.activemq.transport.vm.VMTransportFactory;
 import org.apache.activemq.usage.SystemUsage;
-import org.apache.activemq.util.BrokerSupport;
-import org.apache.activemq.util.DefaultIOExceptionHandler;
 import org.apache.activemq.util.IOExceptionHandler;
-import org.apache.activemq.util.IOExceptionSupport;
 import org.apache.activemq.util.IOHelper;
-import org.apache.activemq.util.InetAddressUtil;
 import org.apache.activemq.util.ServiceStopper;
-import org.apache.activemq.util.ThreadPoolUtils;
-import org.apache.activemq.util.TimeUtils;
-import org.apache.activemq.util.URISupport;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.slf4j.MDC;
 
 /**
  * Manages the life-cycle of an ActiveMQ Broker. A BrokerService consists of a
@@ -161,6 +92,8 @@ public class BrokerService implements Service
    public Set<Integer> extraConnectors = new HashSet<Integer>();
    private File dataDirectoryFile;
 
+   private PolicyMap destinationPolicy;
+
    static
    {
       InputStream in;
@@ -337,7 +270,6 @@ public class BrokerService implements Service
 
    public void makeSureDestinationExists(ActiveMQDestination activemqDestination) throws Exception
    {
-      System.out.println(">>>> making sure dest exits: " + activemqDestination);
       ArtemisBrokerWrapper hqBroker = (ArtemisBrokerWrapper) this.broker;
       //it can be null
       if (activemqDestination == null)
@@ -404,6 +336,7 @@ public class BrokerService implements Service
 
    public void setDestinationPolicy(PolicyMap policyMap)
    {
+      this.destinationPolicy = policyMap;
    }
 
    public void setDeleteAllMessagesOnStartup(boolean deletePersistentMessagesOnStartup)
@@ -607,7 +540,7 @@ public class BrokerService implements Service
 
    public PolicyMap getDestinationPolicy()
    {
-      return null;
+      return this.destinationPolicy;
    }
 
    public void setTransportConnectorURIs(String[] transportConnectorURIs)

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7cf58b1e/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerWrapper.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerWrapper.java b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerWrapper.java
index 822faed..ced7857 100644
--- a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerWrapper.java
+++ b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerWrapper.java
@@ -20,6 +20,7 @@ import java.net.URI;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
@@ -32,10 +33,14 @@ import org.apache.activemq.artemis.core.registry.JndiBindingRegistry;
 import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
 import org.apache.activemq.artemis.core.security.Role;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy;
+import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
 import org.apache.activemq.artemis.jms.server.impl.JMSServerManagerImpl;
 import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManagerImpl;
 import org.apache.activemq.artemiswrapper.ArtemisBrokerHelper;
 import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
 
 public class ArtemisBrokerWrapper extends ArtemisBrokerBase
 {
@@ -62,14 +67,26 @@ public class ArtemisBrokerWrapper extends ArtemisBrokerBase
       Configuration serverConfig = server.getConfiguration();
 
       Set<TransportConfiguration> acceptors0 = serverConfig.getAcceptorConfigurations();
-      Iterator<TransportConfiguration> iter0 = acceptors0.iterator();
 
-      Map<String, AddressSettings> addressSettings = serverConfig.getAddressesSettings();
+      Map<String, AddressSettings> addressSettingsMap = serverConfig.getAddressesSettings();
+
+      //do policy translation
+      PolicyMap policyMap = this.bservice.getDestinationPolicy();
+
+      if (policyMap != null)
+      {
+         translatePolicyMap(serverConfig, policyMap);
+      }
+
       String match = "jms.queue.#";
-      AddressSettings dlaSettings = new AddressSettings();
+      AddressSettings commonSettings = addressSettingsMap.get(match);
+      if (commonSettings == null)
+      {
+         commonSettings = new AddressSettings();
+         addressSettingsMap.put(match, commonSettings);
+      }
       SimpleString dla = new SimpleString("jms.queue.ActiveMQ.DLQ");
-      dlaSettings.setDeadLetterAddress(dla);
-      addressSettings.put(match, dlaSettings);
+      commonSettings.setDeadLetterAddress(dla);
 
       serverConfig.getAcceptorConfigurations().add(transportConfiguration);
       if (this.bservice.enableSsl())
@@ -177,6 +194,47 @@ public class ArtemisBrokerWrapper extends ArtemisBrokerBase
 
    }
 
+   // Translates the destination policy into core address settings: each entry gets (or
+   // reuses) the settings under its match; slow-consumer advisory entries are set to NOTIFY.
+   private void translatePolicyMap(Configuration serverConfig, PolicyMap policyMap)
+   {
+      List policies = policyMap.getAllEntries();
+      for (Object policy : policies)
+      {
+         PolicyEntry entry = (PolicyEntry)policy;
+         String match = getCorePattern(entry.getDestination());
+         Map<String, AddressSettings> settingsMap = serverConfig.getAddressesSettings();
+         AddressSettings settings = settingsMap.get(match);
+         if (settings == null)
+         {
+            settings = new AddressSettings();
+            settingsMap.put(match, settings);
+         }
+         if (entry.isAdvisoryForSlowConsumers())
+         {
+            settings.setSlowConsumerThreshold(1000);
+            settings.setSlowConsumerCheckPeriod(1);
+            settings.setSlowConsumerPolicy(SlowConsumerPolicy.NOTIFY);
+         }
+      }
+   }
+
+   // Builds the core address match for a destination: every ">" in the
+   // physical name is replaced by "#", then the name is prefixed with
+   // "jms.topic." when the destination is a topic and with "jms.queue."
+   // in every other case.
+   private String getCorePattern(org.apache.activemq.command.ActiveMQDestination dest)
+   {
+      String coreName = dest.getPhysicalName().replace(">", "#");
+
+      if (dest.isTopic())
+      {
+         return "jms.topic." + coreName;
+      }
+
+      return "jms.queue." + coreName;
+   }
+
    @Override
    public void stop() throws Exception
    {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7cf58b1e/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/region/policy/PolicyMap.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/region/policy/PolicyMap.java b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/region/policy/PolicyMap.java
new file mode 100644
index 0000000..e48f2d9
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/region/policy/PolicyMap.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.region.policy;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.filter.DestinationMap;
+import org.apache.activemq.filter.DestinationMapEntry;
+
+/**
+ * Represents a destination based configuration of policies so that individual
+ * destinations or wildcard hierarchies of destinations can be configured using
+ * different policies.
+ *
+ * @org.apache.xbean.XBean
+ */
+public class PolicyMap extends DestinationMap
+{
+   private final List allEntries = new ArrayList();
+   private PolicyEntry defaultEntry;
+
+   /** Returns the entry chosen for the destination, or the default entry when none is chosen. */
+   public PolicyEntry getEntryFor(ActiveMQDestination destination)
+   {
+      PolicyEntry chosen = (PolicyEntry) chooseValue(destination);
+      return chosen != null ? chosen : getDefaultEntry();
+   }
+
+   /**
+    * Sets the individual entries on the policy map and also appends
+    * them to the list returned by {@link #getAllEntries()}.
+    *
+    * @org.apache.xbean.ElementType class="org.apache.activemq.broker.region.policy.PolicyEntry"
+    */
+   public void setPolicyEntries(List entries)
+   {
+      super.setEntries(entries);
+      allEntries.addAll(entries);
+   }
+
+   /** Returns the internal list holding every entry passed to {@link #setPolicyEntries(List)}. */
+   public List getAllEntries()
+   {
+      return allEntries;
+   }
+
+   /** Returns the entry {@link #getEntryFor} falls back to. */
+   public PolicyEntry getDefaultEntry()
+   {
+      return defaultEntry;
+   }
+
+   /** Sets the entry {@link #getEntryFor} falls back to. */
+   public void setDefaultEntry(PolicyEntry defaultEntry)
+   {
+      this.defaultEntry = defaultEntry;
+   }
+
+   protected Class<? extends DestinationMapEntry> getEntryClass()
+   {
+      return PolicyEntry.class;
+   }
+}


[2/2] activemq-artemis git commit: This closes #64 Open Wire Advisory Support

Posted by cl...@apache.org.
This closes #64 Open Wire Advisory Support


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/2a4e9f19
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/2a4e9f19
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/2a4e9f19

Branch: refs/heads/master
Commit: 2a4e9f191a3ef32f6ed40ccfba66fa6f922c13e6
Parents: 17cc62b 7cf58b1
Author: Clebert Suconic <cl...@apache.org>
Authored: Thu Jul 9 10:23:25 2015 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Thu Jul 9 10:23:25 2015 -0400

----------------------------------------------------------------------
 .../openwire/OpenWireProtocolManager.java       | 63 +++++++++++++++-
 .../core/protocol/openwire/amq/AMQConsumer.java | 10 +++
 .../core/protocol/openwire/amq/AMQSession.java  |  5 ++
 .../apache/activemq/broker/BrokerService.java   | 77 ++-----------------
 .../artemiswrapper/ArtemisBrokerWrapper.java    | 68 +++++++++++++++--
 .../broker/region/policy/PolicyMap.java         | 79 ++++++++++++++++++++
 6 files changed, 224 insertions(+), 78 deletions(-)
----------------------------------------------------------------------