You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2007/08/10 18:57:10 UTC

svn commit: r564679 [2/8] - in /activemq/trunk: activemq-core/src/main/java/org/apache/activemq/ activemq-core/src/main/java/org/apache/activemq/broker/ activemq-core/src/main/java/org/apache/activemq/broker/jmx/ activemq-core/src/main/java/org/apache/...

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java?view=diff&rev=564679&r1=564678&r2=564679
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java Fri Aug 10 09:57:01 2007
@@ -233,8 +233,9 @@
     public void serviceTransportException(IOException e) {
         if (!disposed.get()) {
             transportException.set(e);
-            if (TRANSPORTLOG.isDebugEnabled())
+            if (TRANSPORTLOG.isDebugEnabled()) {
                 TRANSPORTLOG.debug("Transport failed: " + e, e);
+            }
             ServiceSupport.dispose(this);
         }
     }
@@ -263,17 +264,20 @@
      * error transmitted to the client before stopping it's transport.
      */
     public void serviceException(Throwable e) {
+        
         // are we a transport exception such as not being able to dispatch
         // synchronously to a transport
         if (e instanceof IOException) {
             serviceTransportException((IOException)e);
-        }
-        // Handle the case where the broker is stopped
-        // But the client is still connected.
-        else if (e.getClass() == BrokerStoppedException.class) {
+        } else if (e.getClass() == BrokerStoppedException.class) {
+            // Handle the case where the broker is stopped
+            // But the client is still connected.
+
             if (!disposed.get()) {
-                if (SERVICELOG.isDebugEnabled())
+                if (SERVICELOG.isDebugEnabled()) {
                     SERVICELOG.debug("Broker has been stopped.  Notifying client and closing his connection.");
+                }
+
                 ConnectionError ce = new ConnectionError();
                 ce.setException(e);
                 dispatchSync(ce);
@@ -310,8 +314,9 @@
                 response = command.visit(this);
             } catch (Throwable e) {
                 if (responseRequired) {
-                    if (SERVICELOG.isDebugEnabled() && e.getClass() != BrokerStoppedException.class)
+                    if (SERVICELOG.isDebugEnabled() && e.getClass() != BrokerStoppedException.class) {
                         SERVICELOG.debug("Error occured while processing sync command: " + e, e);
+                    }
                     response = new ExceptionResponse(e);
                 } else {
                     serviceException(e);
@@ -402,8 +407,9 @@
             throw new NullPointerException("Context is null");
         }
         TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
-        if (transactionState == null)
+        if (transactionState == null) {
             throw new IllegalStateException("Cannot prepare a transaction that had not been started: " + info.getTransactionId());
+        }
         // Avoid dups.
         if (!transactionState.isPrepared()) {
             transactionState.setPrepared(true);
@@ -500,8 +506,9 @@
         ConnectionId connectionId = sessionId.getParentId();
         TransportConnectionState cs = lookupConnectionState(connectionId);
         SessionState ss = cs.getSessionState(sessionId);
-        if (ss == null)
+        if (ss == null) {
             throw new IllegalStateException("Cannot add a producer to a session that had not been registered: " + sessionId);
+        }
         // Avoid replaying dup commands
         if (!ss.getProducerIds().contains(info.getProducerId())) {
             broker.addProducer(cs.getContext(), info);
@@ -519,11 +526,13 @@
         ConnectionId connectionId = sessionId.getParentId();
         TransportConnectionState cs = lookupConnectionState(connectionId);
         SessionState ss = cs.getSessionState(sessionId);
-        if (ss == null)
+        if (ss == null) {
             throw new IllegalStateException("Cannot remove a producer from a session that had not been registered: " + sessionId);
+        }
         ProducerState ps = ss.removeProducer(id);
-        if (ps == null)
+        if (ps == null) {
             throw new IllegalStateException("Cannot remove a producer that had not been registered: " + id);
+        }
         removeProducerBrokerExchange(id);
         broker.removeProducer(cs.getContext(), ps.getInfo());
         return null;
@@ -534,8 +543,9 @@
         ConnectionId connectionId = sessionId.getParentId();
         TransportConnectionState cs = lookupConnectionState(connectionId);
         SessionState ss = cs.getSessionState(sessionId);
-        if (ss == null)
+        if (ss == null) {
             throw new IllegalStateException("Cannot add a consumer to a session that had not been registered: " + sessionId);
+        }
         // Avoid replaying dup commands
         if (!ss.getConsumerIds().contains(info.getConsumerId())) {
             broker.addConsumer(cs.getContext(), info);
@@ -553,11 +563,13 @@
         ConnectionId connectionId = sessionId.getParentId();
         TransportConnectionState cs = lookupConnectionState(connectionId);
         SessionState ss = cs.getSessionState(sessionId);
-        if (ss == null)
+        if (ss == null) {
             throw new IllegalStateException("Cannot remove a consumer from a session that had not been registered: " + sessionId);
+        }
         ConsumerState consumerState = ss.removeConsumer(id);
-        if (consumerState == null)
+        if (consumerState == null) {
             throw new IllegalStateException("Cannot remove a consumer that had not been registered: " + id);
+        }
         broker.removeConsumer(cs.getContext(), consumerState.getInfo());
         removeConsumerBrokerExchange(id);
         return null;
@@ -582,8 +594,9 @@
         ConnectionId connectionId = id.getParentId();
         TransportConnectionState cs = lookupConnectionState(connectionId);
         SessionState session = cs.getSessionState(id);
-        if (session == null)
+        if (session == null) {
             throw new IllegalStateException("Cannot remove session that had not been registered: " + id);
+        }
         // Don't let new consumers or producers get added while we are closing
         // this down.
         session.shutdown();
@@ -912,8 +925,9 @@
                 disposeTransport();
             }
 
-            if (taskRunner != null)
+            if (taskRunner != null) {
                 taskRunner.shutdown();
+            }
 
             // Run the MessageDispatch callbacks so that message references get
             // cleaned up.
@@ -1090,12 +1104,13 @@
             // so this TransportConnection is the rear end of a network bridge
             // We have been requested to create a two way pipe ...
             try {
-                Properties props = MarshallingSupport.stringToProperties(info.getNetworkProperties());
+                Properties properties = MarshallingSupport.stringToProperties(info.getNetworkProperties());
+                Map<String, String> props = new HashMap(properties);
                 NetworkBridgeConfiguration config = new NetworkBridgeConfiguration();
                 IntrospectionSupport.setProperties(config, props, "");
                 config.setBrokerName(broker.getBrokerName());
                 URI uri = broker.getVmConnectorURI();
-                HashMap map = new HashMap(URISupport.parseParamters(uri));
+                HashMap<String,String> map = new HashMap<String,String>(URISupport.parseParamters(uri));
                 map.put("network", "true");
                 map.put("async", "false");
                 uri = URISupport.createURIWithQuery(uri, URISupport.createQueryString(map));
@@ -1145,8 +1160,9 @@
     public String getConnectionId() {
         List<TransportConnectionState> connectionStates = listConnectionStates();
         for (TransportConnectionState cs : connectionStates) {
-            if (cs.getInfo().getClientId() != null)
+            if (cs.getInfo().getClientId() != null) {
                 return cs.getInfo().getClientId();
+            }
             return cs.getInfo().getConnectionId().toString();
         }
         return null;
@@ -1197,7 +1213,7 @@
                     if (cs != null) {
                         ConsumerInfo info = cs.getInfo();
                         if (info != null) {
-                            if (info.getDestination() != null && info.getDestination().isPattern()) {
+                            if (info.getDestination() != null && info.getDestination().isPattern()) { 
                                 result.setWildcard(true);
                             }
                         }
@@ -1233,8 +1249,9 @@
 
     public Response processControlCommand(ControlCommand command) throws Exception {
         String control = command.getCommand();
-        if (control != null && control.equals("shutdown"))
+        if (control != null && control.equals("shutdown")) {
             System.exit(0);
+        }
         return null;
     }
 
@@ -1283,7 +1300,7 @@
     }
 
     protected List<TransportConnectionState> listConnectionStates() {
-        ArrayList<TransportConnectionState> rc = new ArrayList<TransportConnectionState>();
+        List<TransportConnectionState> rc = new ArrayList<TransportConnectionState>();
         if (connectionState != null) {
             rc.add(connectionState);
         }
@@ -1292,36 +1309,41 @@
 
     protected TransportConnectionState lookupConnectionState(String connectionId) {
         TransportConnectionState cs = connectionState;
-        if (cs == null)
+        if (cs == null) {
             throw new IllegalStateException("Cannot lookup a connectionId for a connection that had not been registered: " + connectionId);
+        }
         return cs;
     }
 
     protected TransportConnectionState lookupConnectionState(ConsumerId id) {
         TransportConnectionState cs = connectionState;
-        if (cs == null)
+        if (cs == null) {
             throw new IllegalStateException("Cannot lookup a consumer from a connection that had not been registered: " + id.getParentId().getParentId());
+        }
         return cs;
     }
 
     protected TransportConnectionState lookupConnectionState(ProducerId id) {
         TransportConnectionState cs = connectionState;
-        if (cs == null)
+        if (cs == null) {
             throw new IllegalStateException("Cannot lookup a producer from a connection that had not been registered: " + id.getParentId().getParentId());
+        }
         return cs;
     }
 
     protected TransportConnectionState lookupConnectionState(SessionId id) {
         TransportConnectionState cs = connectionState;
-        if (cs == null)
+        if (cs == null) {
             throw new IllegalStateException("Cannot lookup a session from a connection that had not been registered: " + id.getParentId());
+        }
         return cs;
     }
 
     protected TransportConnectionState lookupConnectionState(ConnectionId connectionId) {
         TransportConnectionState cs = connectionState;
-        if (cs == null)
+        if (cs == null) {
             throw new IllegalStateException("Cannot lookup a connection that had not been registered: " + connectionId);
+        }
         return cs;
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java?view=diff&rev=564679&r1=564678&r2=564679
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java Fri Aug 10 09:57:01 2007
@@ -49,6 +49,9 @@
 
     private static final Log LOG = LogFactory.getLog(TransportConnector.class);
 
+    protected CopyOnWriteArrayList<TransportConnection> connections = new CopyOnWriteArrayList<TransportConnection>();
+    protected TransportStatusDetector statusDector;
+
     private Broker broker;
     private TransportServer server;
     private URI uri;
@@ -56,8 +59,6 @@
     private TaskRunnerFactory taskRunnerFactory;
     private MessageAuthorizationPolicy messageAuthorizationPolicy;
     private DiscoveryAgent discoveryAgent;
-    protected CopyOnWriteArrayList connections = new CopyOnWriteArrayList();
-    protected TransportStatusDetector statusDector;
     private ConnectorStatistics statistics = new ConnectorStatistics();
     private URI discoveryUri;
     private URI connectUri;
@@ -65,13 +66,6 @@
     private boolean disableAsyncDispatch;
     private boolean enableStatusMonitor = true;
 
-    /**
-     * @return Returns the connections.
-     */
-    public CopyOnWriteArrayList getConnections() {
-        return connections;
-    }
-
     public TransportConnector() {
     }
 
@@ -88,6 +82,14 @@
 
     }
 
+
+    /**
+     * @return Returns the connections.
+     */
+    public CopyOnWriteArrayList<TransportConnection> getConnections() {
+        return connections;
+    }
+
     /**
      * Factory method to create a JMX managed version of this transport
      * connector
@@ -251,8 +253,8 @@
             this.statusDector.stop();
         }
 
-        for (Iterator iter = connections.iterator(); iter.hasNext();) {
-            TransportConnection c = (TransportConnection)iter.next();
+        for (Iterator<TransportConnection> iter = connections.iterator(); iter.hasNext();) {
+            TransportConnection c = iter.next();
             ss.stop(c);
         }
         ss.throwFirstException();
@@ -342,8 +344,9 @@
 
     public String toString() {
         String rc = getName();
-        if (rc == null)
+        if (rc == null) {
             rc = super.toString();
+        }
         return rc;
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java?view=diff&rev=564679&r1=564678&r2=564679
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java Fri Aug 10 09:57:01 2007
@@ -76,19 +76,19 @@
     private static final Log LOG = LogFactory.getLog(ManagedRegionBroker.class);
     private final MBeanServer mbeanServer;
     private final ObjectName brokerObjectName;
-    private final Map topics = new ConcurrentHashMap();
-    private final Map queues = new ConcurrentHashMap();
-    private final Map temporaryQueues = new ConcurrentHashMap();
-    private final Map temporaryTopics = new ConcurrentHashMap();
-    private final Map queueSubscribers = new ConcurrentHashMap();
-    private final Map topicSubscribers = new ConcurrentHashMap();
-    private final Map durableTopicSubscribers = new ConcurrentHashMap();
-    private final Map inactiveDurableTopicSubscribers = new ConcurrentHashMap();
-    private final Map temporaryQueueSubscribers = new ConcurrentHashMap();
-    private final Map temporaryTopicSubscribers = new ConcurrentHashMap();
-    private final Map subscriptionKeys = new ConcurrentHashMap();
-    private final Map subscriptionMap = new ConcurrentHashMap();
-    private final Set registeredMBeans = new CopyOnWriteArraySet();
+    private final Map<ObjectName, DestinationView> topics = new ConcurrentHashMap<ObjectName, DestinationView>();
+    private final Map<ObjectName, DestinationView> queues = new ConcurrentHashMap<ObjectName, DestinationView>();
+    private final Map<ObjectName, DestinationView> temporaryQueues = new ConcurrentHashMap<ObjectName, DestinationView>();
+    private final Map<ObjectName, DestinationView> temporaryTopics = new ConcurrentHashMap<ObjectName, DestinationView>();
+    private final Map<ObjectName, SubscriptionView> queueSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>();
+    private final Map<ObjectName, SubscriptionView> topicSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>();
+    private final Map<ObjectName, SubscriptionView> durableTopicSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>();
+    private final Map<ObjectName, SubscriptionView> inactiveDurableTopicSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>();
+    private final Map<ObjectName, SubscriptionView> temporaryQueueSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>();
+    private final Map<ObjectName, SubscriptionView> temporaryTopicSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>();
+    private final Map<SubscriptionKey, ObjectName> subscriptionKeys = new ConcurrentHashMap<SubscriptionKey, ObjectName>();
+    private final Map<Subscription, ObjectName> subscriptionMap = new ConcurrentHashMap<Subscription, ObjectName>();
+    private final Set<ObjectName> registeredMBeans = new CopyOnWriteArraySet<ObjectName>();
     /* This is the first broker in the broker interceptor chain. */
     private Broker contextBroker;
 
@@ -108,8 +108,8 @@
     protected void doStop(ServiceStopper stopper) {
         super.doStop(stopper);
         // lets remove any mbeans not yet removed
-        for (Iterator iter = registeredMBeans.iterator(); iter.hasNext();) {
-            ObjectName name = (ObjectName)iter.next();
+        for (Iterator<ObjectName> iter = registeredMBeans.iterator(); iter.hasNext();) {
+            ObjectName name = iter.next();
             try {
                 mbeanServer.unregisterMBean(name);
             } catch (InstanceNotFoundException e) {
@@ -211,7 +211,7 @@
     }
 
     public void unregisterSubscription(Subscription sub) {
-        ObjectName name = (ObjectName)subscriptionMap.remove(sub);
+        ObjectName name = subscriptionMap.remove(sub);
         if (name != null) {
             try {
                 unregisterSubscription(name);
@@ -275,7 +275,7 @@
                     durableTopicSubscribers.put(key, view);
                     // unregister any inactive durable subs
                     try {
-                        ObjectName inactiveName = (ObjectName)subscriptionKeys.get(subscriptionKey);
+                        ObjectName inactiveName = subscriptionKeys.get(subscriptionKey);
                         if (inactiveName != null) {
                             inactiveDurableTopicSubscribers.remove(inactiveName);
                             registeredMBeans.remove(inactiveName);
@@ -327,7 +327,7 @@
     }
 
     protected void buildExistingSubscriptions() throws Exception {
-        Map subscriptions = new HashMap();
+        Map<SubscriptionKey, SubscriptionInfo> subscriptions = new HashMap<SubscriptionKey, SubscriptionInfo>();
         Set destinations = destinationFactory.getDestinations();
         if (destinations != null) {
             for (Iterator iter = destinations.iterator(); iter.hasNext();) {
@@ -376,11 +376,11 @@
     }
 
     public CompositeData[] browse(SubscriptionView view) throws OpenDataException {
-        List messages = getSubscriberMessages(view);
+        List<Message> messages = getSubscriberMessages(view);
         CompositeData c[] = new CompositeData[messages.size()];
         for (int i = 0; i < c.length; i++) {
             try {
-                c[i] = OpenTypeSupport.convert((Message)messages.get(i));
+                c[i] = OpenTypeSupport.convert(messages.get(i));
             } catch (Throwable e) {
                 LOG.error("failed to browse : " + view, e);
             }
@@ -390,7 +390,7 @@
 
     public TabularData browseAsTable(SubscriptionView view) throws OpenDataException {
         OpenTypeFactory factory = OpenTypeSupport.getFactory(ActiveMQMessage.class);
-        List messages = getSubscriberMessages(view);
+        List<Message> messages = getSubscriberMessages(view);
         CompositeType ct = factory.getCompositeType();
         TabularType tt = new TabularType("MessageList", "MessageList", ct, new String[] {"JMSMessageID"});
         TabularDataSupport rc = new TabularDataSupport(tt);
@@ -400,13 +400,13 @@
         return rc;
     }
 
-    protected List getSubscriberMessages(SubscriptionView view) {
+    protected List<Message> getSubscriberMessages(SubscriptionView view) {
         // TODO It is very dangerous operation for big backlogs
         if (!(destinationFactory instanceof DestinationFactoryImpl)) {
             throw new RuntimeException("unsupported by " + destinationFactory);
         }
         PersistenceAdapter adapter = ((DestinationFactoryImpl)destinationFactory).getPersistenceAdapter();
-        final List result = new ArrayList();
+        final List<Message> result = new ArrayList<Message>();
         try {
             ActiveMQTopic topic = new ActiveMQTopic(view.getDestinationName());
             TopicMessageStore store = adapter.createTopicMessageStore(topic);
@@ -435,53 +435,53 @@
     }
 
     protected ObjectName[] getTopics() {
-        Set set = topics.keySet();
-        return (ObjectName[])set.toArray(new ObjectName[set.size()]);
+        Set<ObjectName> set = topics.keySet();
+        return set.toArray(new ObjectName[set.size()]);
     }
 
     protected ObjectName[] getQueues() {
-        Set set = queues.keySet();
-        return (ObjectName[])set.toArray(new ObjectName[set.size()]);
+        Set<ObjectName> set = queues.keySet();
+        return set.toArray(new ObjectName[set.size()]);
     }
 
     protected ObjectName[] getTemporaryTopics() {
-        Set set = temporaryTopics.keySet();
-        return (ObjectName[])set.toArray(new ObjectName[set.size()]);
+        Set<ObjectName> set = temporaryTopics.keySet();
+        return set.toArray(new ObjectName[set.size()]);
     }
 
     protected ObjectName[] getTemporaryQueues() {
-        Set set = temporaryQueues.keySet();
-        return (ObjectName[])set.toArray(new ObjectName[set.size()]);
+        Set<ObjectName> set = temporaryQueues.keySet();
+        return set.toArray(new ObjectName[set.size()]);
     }
 
     protected ObjectName[] getTopicSubscribers() {
-        Set set = topicSubscribers.keySet();
-        return (ObjectName[])set.toArray(new ObjectName[set.size()]);
+        Set<ObjectName> set = topicSubscribers.keySet();
+        return set.toArray(new ObjectName[set.size()]);
     }
 
     protected ObjectName[] getDurableTopicSubscribers() {
-        Set set = durableTopicSubscribers.keySet();
-        return (ObjectName[])set.toArray(new ObjectName[set.size()]);
+        Set<ObjectName> set = durableTopicSubscribers.keySet();
+        return set.toArray(new ObjectName[set.size()]);
     }
 
     protected ObjectName[] getQueueSubscribers() {
-        Set set = queueSubscribers.keySet();
-        return (ObjectName[])set.toArray(new ObjectName[set.size()]);
+        Set<ObjectName> set = queueSubscribers.keySet();
+        return set.toArray(new ObjectName[set.size()]);
     }
 
     protected ObjectName[] getTemporaryTopicSubscribers() {
-        Set set = temporaryTopicSubscribers.keySet();
-        return (ObjectName[])set.toArray(new ObjectName[set.size()]);
+        Set<ObjectName> set = temporaryTopicSubscribers.keySet();
+        return set.toArray(new ObjectName[set.size()]);
     }
 
     protected ObjectName[] getTemporaryQueueSubscribers() {
-        Set set = temporaryQueueSubscribers.keySet();
-        return (ObjectName[])set.toArray(new ObjectName[set.size()]);
+        Set<ObjectName> set = temporaryQueueSubscribers.keySet();
+        return set.toArray(new ObjectName[set.size()]);
     }
 
     protected ObjectName[] getInactiveDurableTopicSubscribers() {
-        Set set = inactiveDurableTopicSubscribers.keySet();
-        return (ObjectName[])set.toArray(new ObjectName[set.size()]);
+        Set<ObjectName> set = inactiveDurableTopicSubscribers.keySet();
+        return set.toArray(new ObjectName[set.size()]);
     }
 
     public Broker getContextBroker() {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/NetworkConnectorView.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/NetworkConnectorView.java?view=diff&rev=564679&r1=564678&r2=564679
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/NetworkConnectorView.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/NetworkConnectorView.java Fri Aug 10 09:57:01 2007
@@ -105,8 +105,9 @@
     public String getPassword() {
         String pw = connector.getPassword();
         // Hide the password for security reasons.
-        if (pw != null)
+        if (pw != null) {
             pw = pw.replaceAll(".", "*");
+        }
         return pw;
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/OpenTypeSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/OpenTypeSupport.java?view=diff&rev=564679&r1=564678&r2=564679
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/OpenTypeSupport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/OpenTypeSupport.java Fri Aug 10 09:57:01 2007
@@ -20,6 +20,7 @@
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 import javax.jms.DeliveryMode;
@@ -40,22 +41,22 @@
 import org.apache.activemq.command.ActiveMQTextMessage;
 import org.apache.activemq.command.Message;
 
-public class OpenTypeSupport {
+public final class OpenTypeSupport {
 
     interface OpenTypeFactory {
         CompositeType getCompositeType() throws OpenDataException;
 
-        Map getFields(Object o) throws OpenDataException;
+        Map<String, Object> getFields(Object o) throws OpenDataException;
     }
 
-    private static final HashMap OPEN_TYPE_FACTORIES = new HashMap();
+    private static final Map<Class, MessageOpenTypeFactory> OPEN_TYPE_FACTORIES = new HashMap<Class, MessageOpenTypeFactory>();
 
     abstract static class AbstractOpenTypeFactory implements OpenTypeFactory {
 
         private CompositeType compositeType;
-        ArrayList itemNamesList = new ArrayList();
-        ArrayList itemDescriptionsList = new ArrayList();
-        ArrayList itemTypesList = new ArrayList();
+        private List<String> itemNamesList = new ArrayList<String>();
+        private List<String> itemDescriptionsList = new ArrayList<String>();
+        private List<OpenType> itemTypesList = new ArrayList<OpenType>();
 
         public CompositeType getCompositeType() throws OpenDataException {
             if (compositeType == null) {
@@ -69,9 +70,9 @@
         }
 
         protected CompositeType createCompositeType() throws OpenDataException {
-            String[] itemNames = (String[])itemNamesList.toArray(new String[itemNamesList.size()]);
-            String[] itemDescriptions = (String[])itemDescriptionsList.toArray(new String[itemDescriptionsList.size()]);
-            OpenType[] itemTypes = (OpenType[])itemTypesList.toArray(new OpenType[itemTypesList.size()]);
+            String[] itemNames = itemNamesList.toArray(new String[itemNamesList.size()]);
+            String[] itemDescriptions = itemDescriptionsList.toArray(new String[itemDescriptionsList.size()]);
+            OpenType[] itemTypes = itemTypesList.toArray(new OpenType[itemTypesList.size()]);
             return new CompositeType(getTypeName(), getDescription(), itemNames, itemDescriptions, itemTypes);
         }
 
@@ -87,8 +88,8 @@
             return getTypeName();
         }
 
-        public Map getFields(Object o) throws OpenDataException {
-            HashMap rc = new HashMap();
+        public Map<String, Object> getFields(Object o) throws OpenDataException {
+            Map<String, Object> rc = new HashMap<String, Object>();
             return rc;
         }
     }
@@ -114,9 +115,9 @@
             addItem("Properties", "Properties", SimpleType.STRING);
         }
 
-        public Map getFields(Object o) throws OpenDataException {
+        public Map<String, Object> getFields(Object o) throws OpenDataException {
             ActiveMQMessage m = (ActiveMQMessage)o;
-            Map rc = super.getFields(o);
+            Map<String, Object> rc = super.getFields(o);
             rc.put("JMSCorrelationID", m.getJMSCorrelationID());
             rc.put("JMSDestination", "" + m.getJMSDestination());
             rc.put("JMSMessageID", m.getJMSMessageID());
@@ -148,9 +149,9 @@
             addItem("BodyPreview", "Body preview", new ArrayType(1, SimpleType.BYTE));
         }
 
-        public Map getFields(Object o) throws OpenDataException {
+        public Map<String, Object> getFields(Object o) throws OpenDataException {
             ActiveMQBytesMessage m = (ActiveMQBytesMessage)o;
-            Map rc = super.getFields(o);
+            Map<String, Object> rc = super.getFields(o);
             long length = 0;
             try {
                 length = m.getBodyLength();
@@ -189,10 +190,9 @@
             addItem("ContentMap", "Content map", SimpleType.STRING);
         }
 
-        public Map getFields(Object o) throws OpenDataException {
+        public Map<String, Object> getFields(Object o) throws OpenDataException {
             ActiveMQMapMessage m = (ActiveMQMapMessage)o;
-            Map rc = super.getFields(o);
-            long length = 0;
+            Map<String, Object> rc = super.getFields(o);
             try {
                 rc.put("ContentMap", "" + m.getContentMap());
             } catch (JMSException e) {
@@ -211,9 +211,8 @@
             super.init();
         }
 
-        public Map getFields(Object o) throws OpenDataException {
-            ActiveMQObjectMessage m = (ActiveMQObjectMessage)o;
-            Map rc = super.getFields(o);
+        public Map<String, Object> getFields(Object o) throws OpenDataException {
+            Map<String, Object> rc = super.getFields(o);
             return rc;
         }
     }
@@ -227,9 +226,8 @@
             super.init();
         }
 
-        public Map getFields(Object o) throws OpenDataException {
-            ActiveMQStreamMessage m = (ActiveMQStreamMessage)o;
-            Map rc = super.getFields(o);
+        public Map<String, Object> getFields(Object o) throws OpenDataException {
+            Map<String, Object> rc = super.getFields(o);
             return rc;
         }
     }
@@ -244,9 +242,9 @@
             addItem("Text", "Text", SimpleType.STRING);
         }
 
-        public Map getFields(Object o) throws OpenDataException {
+        public Map<String, Object> getFields(Object o) throws OpenDataException {
             ActiveMQTextMessage m = (ActiveMQTextMessage)o;
-            Map rc = super.getFields(o);
+            Map<String, Object> rc = super.getFields(o);
             try {
                 rc.put("Text", "" + m.getText());
             } catch (JMSException e) {
@@ -265,16 +263,20 @@
         OPEN_TYPE_FACTORIES.put(ActiveMQTextMessage.class, new TextMessageOpenTypeFactory());
     }
 
-    public static OpenTypeFactory getFactory(Class clazz) throws OpenDataException {
-        return (OpenTypeFactory)OPEN_TYPE_FACTORIES.get(clazz);
+    private OpenTypeSupport() {
+    }
+    
+    public static OpenTypeFactory getFactory(Class<? extends Message> clazz) throws OpenDataException {
+        return OPEN_TYPE_FACTORIES.get(clazz);
     }
 
     public static CompositeData convert(Message message) throws OpenDataException {
         OpenTypeFactory f = getFactory(message.getClass());
-        if (f == null)
+        if (f == null) {
             throw new OpenDataException("Cannot create a CompositeData for type: " + message.getClass().getName());
+        }
         CompositeType ct = f.getCompositeType();
-        Map fields = f.getFields(message);
+        Map<String, Object> fields = f.getFields(message);
         return new CompositeDataSupport(ct, fields);
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/QueueView.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/QueueView.java?view=diff&rev=564679&r1=564678&r2=564679
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/QueueView.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/QueueView.java Fri Aug 10 09:57:01 2007
@@ -34,8 +34,9 @@
 
     public CompositeData getMessage(String messageId) throws OpenDataException {
         Message rc = ((Queue)destination).getMessage(messageId);
-        if (rc == null)
+        if (rc == null) {
             return null;
+        }
         return OpenTypeSupport.convert(rc);
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java?view=diff&rev=564679&r1=564678&r2=564679
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java Fri Aug 10 09:57:01 2007
@@ -31,6 +31,7 @@
 import org.apache.activemq.broker.DestinationAlreadyExistsException;
 import org.apache.activemq.broker.ProducerBrokerExchange;
 import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ConsumerId;
 import org.apache.activemq.command.ConsumerInfo;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageAck;
@@ -52,9 +53,9 @@
 
     private static final Log LOG = LogFactory.getLog(AbstractRegion.class);
 
-    protected final ConcurrentHashMap destinations = new ConcurrentHashMap();
+    protected final Map<ActiveMQDestination, Destination> destinations = new ConcurrentHashMap<ActiveMQDestination, Destination>();
     protected final DestinationMap destinationMap = new DestinationMap();
-    protected final ConcurrentHashMap subscriptions = new ConcurrentHashMap();
+    protected final Map<ConsumerId, Subscription> subscriptions = new ConcurrentHashMap<ConsumerId, Subscription>();
     protected final UsageManager memoryManager;
     protected final DestinationFactory destinationFactory;
     protected final DestinationStatistics destinationStatistics;
@@ -62,7 +63,7 @@
     protected boolean autoCreateDestinations = true;
     protected final TaskRunnerFactory taskRunnerFactory;
     protected final Object destinationsMutex = new Object();
-    protected final Map consumerChangeMutexMap = new HashMap();
+    protected final Map<ConsumerId, Object> consumerChangeMutexMap = new HashMap<ConsumerId, Object>();
     protected boolean started;
 
     public AbstractRegion(RegionBroker broker, DestinationStatistics destinationStatistics, UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory,
@@ -83,25 +84,25 @@
     public void start() throws Exception {
         started = true;
 
-        Set inactiveDests = getInactiveDestinations();
-        for (Iterator iter = inactiveDests.iterator(); iter.hasNext();) {
-            ActiveMQDestination dest = (ActiveMQDestination)iter.next();
+        Set<ActiveMQDestination> inactiveDests = getInactiveDestinations();
+        for (Iterator<ActiveMQDestination> iter = inactiveDests.iterator(); iter.hasNext();) {
+            ActiveMQDestination dest = iter.next();
 
             ConnectionContext context = new ConnectionContext();
             context.setBroker(broker.getBrokerService().getBroker());
             context.getBroker().addDestination(context, dest);
         }
 
-        for (Iterator i = destinations.values().iterator(); i.hasNext();) {
-            Destination dest = (Destination)i.next();
+        for (Iterator<Destination> i = destinations.values().iterator(); i.hasNext();) {
+            Destination dest = i.next();
             dest.start();
         }
     }
 
     public void stop() throws Exception {
         started = false;
-        for (Iterator i = destinations.values().iterator(); i.hasNext();) {
-            Destination dest = (Destination)i.next();
+        for (Iterator<Destination> i = destinations.values().iterator(); i.hasNext();) {
+            Destination dest = i.next();
             dest.stop();
         }
         destinations.clear();
@@ -110,7 +111,7 @@
     public Destination addDestination(ConnectionContext context, ActiveMQDestination destination) throws Exception {
         LOG.debug("Adding destination: " + destination);
         synchronized (destinationsMutex) {
-            Destination dest = (Destination)destinations.get(destination);
+            Destination dest = destinations.get(destination);
             if (dest == null) {
                 dest = createDestination(context, destination);
                 // intercept if there is a valid interceptor defined
@@ -129,10 +130,10 @@
 
     protected List<Subscription> addSubscriptionsForDestination(ConnectionContext context, Destination dest) throws Exception {
 
-        ArrayList<Subscription> rc = new ArrayList<Subscription>();
+        List<Subscription> rc = new ArrayList<Subscription>();
         // Add all consumers that are interested in the destination.
-        for (Iterator iter = subscriptions.values().iterator(); iter.hasNext();) {
-            Subscription sub = (Subscription)iter.next();
+        for (Iterator<Subscription> iter = subscriptions.values().iterator(); iter.hasNext();) {
+            Subscription sub = iter.next();
             if (sub.matches(dest.getActiveMQDestination())) {
                 dest.addSubscription(context, sub);
                 rc.add(sub);
@@ -147,8 +148,8 @@
         // No timeout.. then try to shut down right way, fails if there are
         // current subscribers.
         if (timeout == 0) {
-            for (Iterator iter = subscriptions.values().iterator(); iter.hasNext();) {
-                Subscription sub = (Subscription)iter.next();
+            for (Iterator<Subscription> iter = subscriptions.values().iterator(); iter.hasNext();) {
+                Subscription sub = iter.next();
                 if (sub.matches(destination)) {
                     throw new JMSException("Destination still has an active subscription: " + destination);
                 }
@@ -166,13 +167,13 @@
 
         LOG.debug("Removing destination: " + destination);
         synchronized (destinationsMutex) {
-            Destination dest = (Destination)destinations.remove(destination);
+            Destination dest = destinations.remove(destination);
             if (dest != null) {
 
                 // timeout<0 or we timed out, we now force any remaining
                 // subscriptions to un-subscribe.
-                for (Iterator iter = subscriptions.values().iterator(); iter.hasNext();) {
-                    Subscription sub = (Subscription)iter.next();
+                for (Iterator<Subscription> iter = subscriptions.values().iterator(); iter.hasNext();) {
+                    Subscription sub = iter.next();
                     if (sub.matches(destination)) {
                         dest.removeSubscription(context, sub);
                     }
@@ -199,9 +200,9 @@
         }
     }
 
-    public Map getDestinationMap() {
+    public Map<ActiveMQDestination, Destination> getDestinationMap() {
         synchronized (destinationsMutex) {
-            return new HashMap(destinations);
+            return new HashMap<ActiveMQDestination, Destination>(destinations);
         }
     }
 
@@ -222,10 +223,10 @@
             }
         }
         synchronized (addGuard) {
-            Object o = subscriptions.get(info.getConsumerId());
+            Subscription o = subscriptions.get(info.getConsumerId());
             if (o != null) {
                 LOG.warn("A duplicate subscription was detected. Clients may be misbehaving. Later warnings you may see about subscription removal are a consequence of this.");
-                return (Subscription)o;
+                return o;
             }
 
             // We may need to add some destinations that are in persistent store
@@ -281,8 +282,8 @@
     /**
      * @return all Destinations that don't have active consumers
      */
-    protected Set getInactiveDestinations() {
-        Set inactiveDests = destinationFactory.getDestinations();
+    protected Set<ActiveMQDestination> getInactiveDestinations() {
+        Set<ActiveMQDestination> inactiveDests = destinationFactory.getDestinations();
         inactiveDests.removeAll(destinations.keySet());
         return inactiveDests;
     }
@@ -290,9 +291,10 @@
     public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
         LOG.debug("Removing consumer: " + info.getConsumerId());
 
-        Subscription sub = (Subscription)subscriptions.remove(info.getConsumerId());
-        if (sub == null)
+        Subscription sub = subscriptions.remove(info.getConsumerId());
+        if (sub == null) {
             throw new IllegalArgumentException("The subscription does not exist: " + info.getConsumerId());
+        }
 
         // remove the subscription from all the matching queues.
         for (Iterator iter = destinationMap.get(info.getDestination()).iterator(); iter.hasNext();) {
@@ -329,7 +331,7 @@
     public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception {
         Subscription sub = consumerExchange.getSubscription();
         if (sub == null) {
-            sub = (Subscription)subscriptions.get(ack.getConsumerId());
+            sub = subscriptions.get(ack.getConsumerId());
             if (sub == null) {
                 throw new IllegalArgumentException("The subscription does not exist: " + ack.getConsumerId());
             }
@@ -339,15 +341,16 @@
     }
 
     public Response messagePull(ConnectionContext context, MessagePull pull) throws Exception {
-        Subscription sub = (Subscription)subscriptions.get(pull.getConsumerId());
-        if (sub == null)
+        Subscription sub = subscriptions.get(pull.getConsumerId());
+        if (sub == null) {
             throw new IllegalArgumentException("The subscription does not exist: " + pull.getConsumerId());
+        }
         return sub.pullMessage(context, pull);
     }
 
     protected Destination lookup(ConnectionContext context, ActiveMQDestination destination) throws Exception {
         synchronized (destinationsMutex) {
-            Destination dest = (Destination)destinations.get(destination);
+            Destination dest = destinations.get(destination);
             if (dest == null) {
                 if (autoCreateDestinations) {
                     // Try to auto create the destination... re-invoke broker
@@ -362,7 +365,7 @@
                         // this error
                     }
                     // We should now have the dest created.
-                    dest = (Destination)destinations.get(destination);
+                    dest = destinations.get(destination);
                 }
                 if (dest == null) {
                     throw new JMSException("The destination " + destination + " does not exist.");
@@ -373,19 +376,19 @@
     }
 
     public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception {
-        Subscription sub = (Subscription)subscriptions.get(messageDispatchNotification.getConsumerId());
+        Subscription sub = subscriptions.get(messageDispatchNotification.getConsumerId());
         if (sub != null) {
             sub.processMessageDispatchNotification(messageDispatchNotification);
         }
     }
 
     public void gc() {
-        for (Iterator iter = subscriptions.values().iterator(); iter.hasNext();) {
-            Subscription sub = (Subscription)iter.next();
+        for (Iterator<Subscription> iter = subscriptions.values().iterator(); iter.hasNext();) {
+            Subscription sub = iter.next();
             sub.gc();
         }
-        for (Iterator iter = destinations.values().iterator(); iter.hasNext();) {
-            Destination dest = (Destination)iter.next();
+        for (Iterator<Destination> iter = destinations.values().iterator(); iter.hasNext();) {
+            Destination dest = iter.next();
             dest.gc();
         }
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java?view=diff&rev=564679&r1=564678&r2=564679
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java Fri Aug 10 09:57:01 2007
@@ -45,7 +45,7 @@
     protected ConnectionContext context;
     protected ConsumerInfo info;
     protected final DestinationFilter destinationFilter;
-    protected final CopyOnWriteArrayList destinations = new CopyOnWriteArrayList();
+    protected final CopyOnWriteArrayList<Destination> destinations = new CopyOnWriteArrayList<Destination>();
     private BooleanExpression selectorExpression;
     private ObjectName objectName;
 
@@ -83,8 +83,9 @@
     public boolean matches(MessageReference node, MessageEvaluationContext context) throws IOException {
         ConsumerId targetConsumerId = node.getTargetConsumerId();
         if (targetConsumerId != null) {
-            if (!targetConsumerId.equals(info.getConsumerId()))
+            if (!targetConsumerId.equals(info.getConsumerId())) {
                 return false;
+            }
         }
         try {
             return (selectorExpression == null || selectorExpression.matches(context)) && this.context.isAllowedToConsume(node);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactory.java?view=diff&rev=564679&r1=564678&r2=564679
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactory.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactory.java Fri Aug 10 09:57:01 2007
@@ -41,7 +41,7 @@
      * Returns a set of all the {@link org.apache.activemq.command.ActiveMQDestination}
      * objects that the persistence store is aware exist.
      */
-    public abstract Set getDestinations();
+    public abstract Set<ActiveMQDestination> getDestinations();
 
     /**
      * Lists all the durable subscirptions for a given destination.

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java?view=diff&rev=564679&r1=564678&r2=564679
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java Fri Aug 10 09:57:01 2007
@@ -140,11 +140,13 @@
     }
 
     public boolean lock(LockOwner subscription) {
-        if (!regionDestination.lock(this, subscription))
+        if (!regionDestination.lock(this, subscription)) {
             return false;
+        }
         synchronized (this) {
-            if (dropped || (lockOwner != null && lockOwner != subscription))
+            if (dropped || (lockOwner != null && lockOwner != subscription)) {
                 return false;
+            }
             lockOwner = subscription;
             return true;
         }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?view=diff&rev=564679&r1=564678&r2=564679
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Fri Aug 10 09:57:01 2007
@@ -50,7 +50,7 @@
 
     private static final Log LOG = LogFactory.getLog(PrefetchSubscription.class);
     protected PendingMessageCursor pending;
-    protected final LinkedList dispatched = new LinkedList();
+    protected final LinkedList<MessageReference> dispatched = new LinkedList<MessageReference>();
     protected int prefetchExtension;
     protected long enqueueCounter;
     protected long dispatchCounter;
@@ -158,8 +158,8 @@
             // acknowledgment.
             int index = 0;
             boolean inAckRange = false;
-            for (Iterator iter = dispatched.iterator(); iter.hasNext();) {
-                final MessageReference node = (MessageReference)iter.next();
+            for (Iterator<MessageReference> iter = dispatched.iterator(); iter.hasNext();) {
+                final MessageReference node = iter.next();
                 MessageId messageId = node.getMessageId();
                 if (ack.getFirstMessageId() == null || ack.getFirstMessageId().equals(messageId)) {
                     inAckRange = true;
@@ -219,8 +219,8 @@
             // Acknowledge all dispatched messages up till the message id of the
             // acknowledgment.
             int index = 0;
-            for (Iterator iter = dispatched.iterator(); iter.hasNext(); index++) {
-                final MessageReference node = (MessageReference)iter.next();
+            for (Iterator<MessageReference> iter = dispatched.iterator(); iter.hasNext(); index++) {
+                final MessageReference node = iter.next();
                 if (ack.getLastMessageId().equals(node.getMessageId())) {
                     prefetchExtension = Math.max(prefetchExtension, index + 1);
                     callDispatchMatched = true;
@@ -233,14 +233,15 @@
         } else if (ack.isPoisonAck()) {
             // TODO: what if the message is already in a DLQ???
             // Handle the poison ACK case: we need to send the message to a DLQ
-            if (ack.isInTransaction())
+            if (ack.isInTransaction()) {
                 throw new JMSException("Poison ack cannot be transacted: " + ack);
+            }
             // Acknowledge all dispatched messages up till the message id of the
             // acknowledgment.
             int index = 0;
             boolean inAckRange = false;
-            for (Iterator iter = dispatched.iterator(); iter.hasNext();) {
-                final MessageReference node = (MessageReference)iter.next();
+            for (Iterator<MessageReference> iter = dispatched.iterator(); iter.hasNext();) {
+                final MessageReference node = iter.next();
                 MessageId messageId = node.getMessageId();
                 if (ack.getFirstMessageId() == null || ack.getFirstMessageId().equals(messageId)) {
                     inAckRange = true;
@@ -380,8 +381,9 @@
                     pending.reset();
                     while (pending.hasNext() && !isFull() && count < numberToDispatch) {
                         MessageReference node = pending.next();
-                        if (node == null)
+                        if (node == null) {
                             break;
+                        }
                         if (canDispatch(node)) {
                             pending.remove();
                             // Message may have been sitting in the pending list

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?view=diff&rev=564679&r1=564678&r2=564679
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Fri Aug 10 09:57:01 2007
@@ -76,12 +76,12 @@
 
     private final Log log;
     private final ActiveMQDestination destination;
-    private final List consumers = new CopyOnWriteArrayList();
+    private final List<Subscription> consumers = new CopyOnWriteArrayList<Subscription>();
     private final Valve dispatchValve = new Valve(true);
     private final UsageManager usageManager;
     private final DestinationStatistics destinationStatistics = new DestinationStatistics();
     private PendingMessageCursor messages;
-    private final LinkedList pagedInMessages = new LinkedList();
+    private final LinkedList<MessageReference> pagedInMessages = new LinkedList<MessageReference>();
     private LockOwner exclusiveOwner;
     private MessageGroupMap messageGroupOwners;
 
@@ -96,7 +96,16 @@
     private final Object exclusiveLockMutex = new Object();
     private final Object doDispatchMutex = new Object();
     private TaskRunner taskRunner;
-    private boolean started;
+    
+    private final LinkedList<Runnable> messagesWaitingForSpace = new LinkedList<Runnable>();
+    private final Runnable sendMessagesWaitingForSpaceTask = new Runnable() {
+        public void run() {
+            try {
+                taskRunner.wakeup();
+            } catch (InterruptedException e) {
+            }
+        };
+    };
 
     public Queue(Broker broker, ActiveMQDestination destination, final UsageManager memoryManager, MessageStore store, DestinationStatistics parentStats,
                  TaskRunnerFactory taskFactory, Store tmpStore) throws Exception {
@@ -161,9 +170,6 @@
                         throw new RuntimeException("Should not be called.");
                     }
 
-                    public void finished() {
-                    }
-
                     public boolean hasSpace() {
                         return true;
                     }
@@ -183,10 +189,12 @@
      */
     public boolean lock(MessageReference node, LockOwner lockOwner) {
         synchronized (exclusiveLockMutex) {
-            if (exclusiveOwner == lockOwner)
+            if (exclusiveOwner == lockOwner) {
                 return true;
-            if (exclusiveOwner != null)
+            }
+            if (exclusiveOwner != null) {
                 return false;
+            }
         }
         return true;
     }
@@ -225,7 +233,7 @@
                 synchronized (pagedInMessages) {
                     // Add all the matching messages in the queue to the
                     // subscription.
-                    for (Iterator i = pagedInMessages.iterator(); i.hasNext();) {
+                    for (Iterator<MessageReference> i = pagedInMessages.iterator(); i.hasNext();) {
                         QueueMessageReference node = (QueueMessageReference)i.next();
                         if (node.isDropped()) {
                             continue;
@@ -269,11 +277,12 @@
                         // Find the exclusive consumer with the higest Lock
                         // Priority.
                         exclusiveOwner = null;
-                        for (Iterator iter = consumers.iterator(); iter.hasNext();) {
-                            Subscription s = (Subscription)iter.next();
+                        for (Iterator<Subscription> iter = consumers.iterator(); iter.hasNext();) {
+                            Subscription s = iter.next();
                             LockOwner so = (LockOwner)s;
-                            if (s.getConsumerInfo().isExclusive() && (exclusiveOwner == null || so.getLockPriority() > exclusiveOwner.getLockPriority()))
+                            if (s.getConsumerInfo().isExclusive() && (exclusiveOwner == null || so.getLockPriority() > exclusiveOwner.getLockPriority())) {
                                 exclusiveOwner = so;
+                            }
                         }
                     }
                 }
@@ -299,9 +308,9 @@
                     msgContext.setDestination(destination);
 
                     // lets copy the messages to dispatch to avoid deadlock
-                    List messagesToDispatch = new ArrayList();
+                    List<QueueMessageReference> messagesToDispatch = new ArrayList<QueueMessageReference>();
                     synchronized (pagedInMessages) {
-                        for (Iterator i = pagedInMessages.iterator(); i.hasNext();) {
+                        for (Iterator<MessageReference> i = pagedInMessages.iterator(); i.hasNext();) {
                             QueueMessageReference node = (QueueMessageReference)i.next();
                             if (node.isDropped()) {
                                 continue;
@@ -318,8 +327,8 @@
 
                     // now lets dispatch from the copy of the collection to
                     // avoid deadlocks
-                    for (Iterator iter = messagesToDispatch.iterator(); iter.hasNext();) {
-                        QueueMessageReference node = (QueueMessageReference)iter.next();
+                    for (Iterator<QueueMessageReference> iter = messagesToDispatch.iterator(); iter.hasNext();) {
+                        QueueMessageReference node = iter.next();
                         node.incrementRedeliveryCounter();
                         node.unlock();
                         msgContext.setMessageReference(node);
@@ -335,16 +344,6 @@
 
     }
 
-    private final LinkedList<Runnable> messagesWaitingForSpace = new LinkedList<Runnable>();
-    private final Runnable sendMessagesWaitingForSpaceTask = new Runnable() {
-        public void run() {
-            try {
-                taskRunner.wakeup();
-            } catch (InterruptedException e) {
-            }
-        };
-    };
-
     public void send(final ProducerBrokerExchange producerExchange, final Message message) throws Exception {
         final ConnectionContext context = producerExchange.getConnectionContext();
         // There is delay between the client sending it and it arriving at the
@@ -419,8 +418,9 @@
                 // control at the broker
                 // by blocking this thread until there is space available.
                 while (!usageManager.waitForSpace(1000)) {
-                    if (context.getStopping().get())
+                    if (context.getStopping().get()) {
                         throw new IOException("Connection closed, send aborted.");
+                    }
                 }
 
                 // The usage manager could have delayed us by the time
@@ -508,7 +508,7 @@
 
     public void gc() {
         synchronized (pagedInMessages) {
-            for (Iterator i = pagedInMessages.iterator(); i.hasNext();) {
+            for (Iterator<MessageReference> i = pagedInMessages.iterator(); i.hasNext();) {
                 // Remove dropped messages from the queue.
                 QueueMessageReference node = (QueueMessageReference)i.next();
                 if (node.isDropped()) {
@@ -557,7 +557,6 @@
     }
 
     public void start() throws Exception {
-        started = true;
         if (usageManager != null) {
             usageManager.start();
         }
@@ -566,7 +565,6 @@
     }
 
     public void stop() throws Exception {
-        started = false;
         if (taskRunner != null) {
             taskRunner.shutdown();
         }
@@ -652,15 +650,15 @@
     }
 
     public Message[] browse() {
-        ArrayList l = new ArrayList();
+        List<Message> l = new ArrayList<Message>();
         try {
             doPageIn(true);
         } catch (Exception e) {
             log.error("caught an exception browsing " + this, e);
         }
         synchronized (pagedInMessages) {
-            for (Iterator i = pagedInMessages.iterator(); i.hasNext();) {
-                MessageReference r = (MessageReference)i.next();
+            for (Iterator<MessageReference> i = pagedInMessages.iterator(); i.hasNext();) {
+                MessageReference r = i.next();
                 r.incrementReferenceCount();
                 try {
                     Message m = r.getMessage();
@@ -698,7 +696,7 @@
             }
         }
 
-        return (Message[])l.toArray(new Message[l.size()]);
+        return l.toArray(new Message[l.size()]);
     }
 
     public Message getMessage(String messageId) {
@@ -737,7 +735,7 @@
 
         synchronized (pagedInMessages) {
             ConnectionContext c = createConnectionContext();
-            for (Iterator i = pagedInMessages.iterator(); i.hasNext();) {
+            for (Iterator<MessageReference> i = pagedInMessages.iterator(); i.hasNext();) {
                 try {
                     QueueMessageReference r = (QueueMessageReference)i.next();
 
@@ -798,7 +796,7 @@
         int counter = 0;
         synchronized (pagedInMessages) {
             ConnectionContext c = createConnectionContext();
-            for (Iterator i = pagedInMessages.iterator(); i.hasNext();) {
+            for (Iterator<MessageReference> i = pagedInMessages.iterator(); i.hasNext();) {
                 IndirectMessageReference r = (IndirectMessageReference)i.next();
                 if (filter.evaluate(c, r)) {
                     removeMessage(c, r);
@@ -848,8 +846,8 @@
         pageInMessages();
         int counter = 0;
         synchronized (pagedInMessages) {
-            for (Iterator i = pagedInMessages.iterator(); i.hasNext();) {
-                MessageReference r = (MessageReference)i.next();
+            for (Iterator<MessageReference> i = pagedInMessages.iterator(); i.hasNext();) {
+                MessageReference r = i.next();
                 if (filter.evaluate(context, r)) {
                     r.incrementReferenceCount();
                     try {
@@ -899,7 +897,7 @@
         pageInMessages();
         int counter = 0;
         synchronized (pagedInMessages) {
-            for (Iterator i = pagedInMessages.iterator(); i.hasNext();) {
+            for (Iterator<MessageReference> i = pagedInMessages.iterator(); i.hasNext();) {
                 IndirectMessageReference r = (IndirectMessageReference)i.next();
                 if (filter.evaluate(context, r)) {
                     // We should only move messages that can be locked.
@@ -995,20 +993,20 @@
         pageInMessages(false);
     }
 
-    private List doPageIn() throws Exception {
+    private List<MessageReference> doPageIn() throws Exception {
         return doPageIn(true);
     }
 
-    private List doPageIn(boolean force) throws Exception {
+    private List<MessageReference> doPageIn(boolean force) throws Exception {
 
         final int toPageIn = maximumPagedInMessages - pagedInMessages.size();
-        List result = null;
+        List<MessageReference> result = null;
         if ((force || !consumers.isEmpty()) && toPageIn > 0) {
             messages.setMaxBatchSize(toPageIn);
             try {
                 dispatchValve.increment();
                 int count = 0;
-                result = new ArrayList(toPageIn);
+                result = new ArrayList<MessageReference>(toPageIn);
                 synchronized (messages) {
 
                     try {
@@ -1040,12 +1038,12 @@
         return result;
     }
 
-    private void doDispatch(List list) throws Exception {
+    private void doDispatch(List<MessageReference> list) throws Exception {
         if (list != null && !list.isEmpty()) {
             try {
                 dispatchValve.increment();
                 for (int i = 0; i < list.size(); i++) {
-                    MessageReference node = (MessageReference)list.get(i);
+                    MessageReference node = list.get(i);
                     queueMsgConext.setDestination(destination);
                     queueMsgConext.setMessageReference(node);
                     dispatchPolicy.dispatch(node, queueMsgConext, consumers);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueRegion.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueRegion.java?view=diff&rev=564679&r1=564678&r2=564679
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueRegion.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueRegion.java Fri Aug 10 09:57:01 2007
@@ -53,12 +53,13 @@
         }
     }
 
-    protected Set getInactiveDestinations() {
-        Set inactiveDestinations = super.getInactiveDestinations();
-        for (Iterator iter = inactiveDestinations.iterator(); iter.hasNext();) {
-            ActiveMQDestination dest = (ActiveMQDestination)iter.next();
-            if (!dest.isQueue())
+    protected Set<ActiveMQDestination> getInactiveDestinations() {
+        Set<ActiveMQDestination> inactiveDestinations = super.getInactiveDestinations();
+        for (Iterator<ActiveMQDestination> iter = inactiveDestinations.iterator(); iter.hasNext();) {
+            ActiveMQDestination dest = iter.next();
+            if (!dest.isQueue()) {
                 iter.remove();
+            }
         }
         return inactiveDestinations;
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java?view=diff&rev=564679&r1=564678&r2=564679
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java Fri Aug 10 09:57:01 2007
@@ -72,8 +72,9 @@
 
     protected boolean canDispatch(MessageReference n) throws IOException {
         QueueMessageReference node = (QueueMessageReference)n;
-        if (node.isAcked())
+        if (node.isAcked()) {
             return false;
+        }
         // Keep message groups together.
         String groupId = node.getGroupID();
         int sequence = node.getGroupSequence();

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Region.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Region.java?view=diff&rev=564679&r1=564678&r2=564679
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Region.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Region.java Fri Aug 10 09:57:01 2007
@@ -69,7 +69,7 @@
      * 
      * @return a copy of the regions currently active at the time of the call with the key the destination and the value the Destination.
      */
-    Map getDestinationMap();
+    Map<ActiveMQDestination, Destination> getDestinationMap();
     
 
     /**

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java?view=diff&rev=564679&r1=564678&r2=564679
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java Fri Aug 10 09:57:01 2007
@@ -76,6 +76,10 @@
     private static final Log LOG = LogFactory.getLog(RegionBroker.class);
     private static final IdGenerator BROKER_ID_GENERATOR = new IdGenerator();
 
+    protected final DestinationStatistics destinationStatistics = new DestinationStatistics();
+    protected DestinationFactory destinationFactory;
+    protected final Map<ConnectionId, ConnectionState> connectionStates = Collections.synchronizedMap(new HashMap<ConnectionId, ConnectionState>());
+
     private final Region queueRegion;
     private final Region topicRegion;
     private final Region tempQueueRegion;
@@ -84,23 +88,16 @@
     private boolean started;
     private boolean keepDurableSubsActive;
 
-    protected final DestinationStatistics destinationStatistics = new DestinationStatistics();
-
-    private final CopyOnWriteArrayList connections = new CopyOnWriteArrayList();
-    private final Map destinations = new ConcurrentHashMap();
-    private final CopyOnWriteArrayList brokerInfos = new CopyOnWriteArrayList();
+    private final CopyOnWriteArrayList<Connection> connections = new CopyOnWriteArrayList<Connection>();
+    private final Map<ActiveMQDestination, Destination> destinations = new ConcurrentHashMap<ActiveMQDestination, Destination>();
+    private final CopyOnWriteArrayList<BrokerInfo> brokerInfos = new CopyOnWriteArrayList<BrokerInfo>();
 
     private final LongSequenceGenerator sequenceGenerator = new LongSequenceGenerator();
     private BrokerId brokerId;
     private String brokerName;
-    private Map<String, ConnectionContext> clientIdSet = new HashMap<String, ConnectionContext>(); // we
-    // will
-    // synchronize
-    // access
+    private Map<String, ConnectionContext> clientIdSet = new HashMap<String, ConnectionContext>(); 
     private final DestinationInterceptor destinationInterceptor;
     private ConnectionContext adminConnectionContext;
-    protected DestinationFactory destinationFactory;
-    protected final Map<ConnectionId, ConnectionState> connectionStates = Collections.synchronizedMap(new HashMap<ConnectionId, ConnectionState>());
 
     public RegionBroker(BrokerService brokerService, TaskRunnerFactory taskRunnerFactory, UsageManager memoryManager, DestinationFactory destinationFactory,
                         DestinationInterceptor destinationInterceptor) throws IOException {
@@ -243,7 +240,7 @@
     }
 
     public Connection[] getClients() throws Exception {
-        ArrayList l = new ArrayList(connections);
+        ArrayList<Connection> l = new ArrayList<Connection>(connections);
         Connection rc[] = new Connection[l.size()];
         l.toArray(rc);
         return rc;
@@ -253,9 +250,10 @@
 
         Destination answer;
 
-        answer = (Destination)destinations.get(destination);
-        if (answer != null)
+        answer = destinations.get(destination);
+        if (answer != null) {
             return answer;
+        }
 
         switch (destination.getDestinationType()) {
         case ActiveMQDestination.QUEUE_TYPE:
@@ -313,9 +311,9 @@
     }
 
     public ActiveMQDestination[] getDestinations() throws Exception {
-        ArrayList l;
+        ArrayList<Destination> l;
 
-        l = new ArrayList(destinations.values());
+        l = new ArrayList<Destination>(destinations.values());
 
         ActiveMQDestination rc[] = new ActiveMQDestination[l.size()];
         l.toArray(rc);
@@ -530,7 +528,7 @@
 
     public synchronized BrokerInfo[] getPeerBrokerInfos() {
         BrokerInfo[] result = new BrokerInfo[brokerInfos.size()];
-        result = (BrokerInfo[])brokerInfos.toArray(result);
+        result = brokerInfos.toArray(result);
         return result;
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java?view=diff&rev=564679&r1=564678&r2=564679
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java Fri Aug 10 09:57:01 2007
@@ -73,8 +73,9 @@
         // Force a timeout value so that we don't get an error that
         // there is still an active sub. Temp destination may be removed
         // while a network sub is still active which is valid.
-        if (timeout == 0)
+        if (timeout == 0) {
             timeout = 1;
+        }
 
         super.removeDestination(context, destination, timeout);
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempTopicRegion.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempTopicRegion.java?view=diff&rev=564679&r1=564678&r2=564679
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempTopicRegion.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempTopicRegion.java Fri Aug 10 09:57:01 2007
@@ -75,8 +75,9 @@
         // Force a timeout value so that we don't get an error that
         // there is still an active sub. Temp destination may be removed
         // while a network sub is still active which is valid.
-        if (timeout == 0)
+        if (timeout == 0) {
             timeout = 1;
+        }
 
         super.removeDestination(context, destination, timeout);
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?view=diff&rev=564679&r1=564678&r2=564679
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java Fri Aug 10 09:57:01 2007
@@ -18,6 +18,7 @@
 
 import java.io.IOException;
 import java.util.LinkedList;
+import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
@@ -62,7 +63,7 @@
 public class Topic implements Destination {
     private static final Log LOG = LogFactory.getLog(Topic.class);
     protected final ActiveMQDestination destination;
-    protected final CopyOnWriteArrayList consumers = new CopyOnWriteArrayList();
+    protected final CopyOnWriteArrayList<Subscription> consumers = new CopyOnWriteArrayList<Subscription>();
     protected final Valve dispatchValve = new Valve(true);
     // this could be NULL! (If an advisory)
     protected final TopicMessageStore store;
@@ -73,14 +74,32 @@
     private SubscriptionRecoveryPolicy subscriptionRecoveryPolicy = new FixedSizedSubscriptionRecoveryPolicy();
     private boolean sendAdvisoryIfNoConsumers;
     private DeadLetterStrategy deadLetterStrategy = new SharedDeadLetterStrategy();
-    private final ConcurrentHashMap durableSubcribers = new ConcurrentHashMap();
-    final Broker broker;
+    private final ConcurrentHashMap<SubscriptionKey, DurableTopicSubscription> durableSubcribers = new ConcurrentHashMap<SubscriptionKey, DurableTopicSubscription>();
+    
+    private final LinkedList<Runnable> messagesWaitingForSpace = new LinkedList<Runnable>();
+    private final Runnable sendMessagesWaitingForSpaceTask = new Runnable() {
+        public void run() {
+
+            // We may need to do this in an async thread since this is run
+            // from within a synchronization
+            // that the UsageManager is holding.
+
+            synchronized (messagesWaitingForSpace) {
+                while (!usageManager.isFull() && !messagesWaitingForSpace.isEmpty()) {
+                    Runnable op = messagesWaitingForSpace.removeFirst();
+                    op.run();
+                }
+            }
+
+        };
+    };
+    private final Broker broker;
 
     public Topic(Broker broker, ActiveMQDestination destination, TopicMessageStore store, UsageManager memoryManager, DestinationStatistics parentStats,
                  TaskRunnerFactory taskFactory) {
         this.broker = broker;
         this.destination = destination;
-        this.store = store; // this could be NULL! (If an advsiory)
+        this.store = store; // this could be NULL! (If an advisory)
         this.usageManager = new UsageManager(memoryManager, destination.toString());
         this.usageManager.setUsagePortion(1.0f);
 
@@ -168,8 +187,9 @@
                 consumers.add(subscription);
             }
 
-            if (store == null)
+            if (store == null) {
                 return;
+            }
 
             // Recover the durable subscription.
             String clientId = subscription.getClientId();
@@ -228,9 +248,6 @@
                         throw new RuntimeException("Should not be called.");
                     }
 
-                    public void finished() {
-                    }
-
                     public boolean hasSpace() {
                         return true;
                     }
@@ -255,24 +272,6 @@
         }
     }
 
-    private final LinkedList<Runnable> messagesWaitingForSpace = new LinkedList<Runnable>();
-    private final Runnable sendMessagesWaitingForSpaceTask = new Runnable() {
-        public void run() {
-
-            // We may need to do this in async thread since this is run for
-            // within a synchronization
-            // that the UsageManager is holding.
-
-            synchronized (messagesWaitingForSpace) {
-                while (!usageManager.isFull() && !messagesWaitingForSpace.isEmpty()) {
-                    Runnable op = messagesWaitingForSpace.removeFirst();
-                    op.run();
-                }
-            }
-
-        };
-    };
-
     public void send(final ProducerBrokerExchange producerExchange, final Message message) throws Exception {
         final ConnectionContext context = producerExchange.getConnectionContext();
 
@@ -342,8 +341,9 @@
                 // control at the broker
                 // by blocking this thread until there is space available.
                 while (!usageManager.waitForSpace(1000)) {
-                    if (context.getStopping().get())
+                    if (context.getStopping().get()) {
                         throw new IOException("Connection closed, send aborted.");
+                    }
                 }
 
                 // The usage manager could have delayed us by the time
@@ -364,8 +364,9 @@
         final ConnectionContext context = producerExchange.getConnectionContext();
         message.setRegionDestination(this);
 
-        if (store != null && message.isPersistent() && !canOptimizeOutPersistence())
+        if (store != null && message.isPersistent() && !canOptimizeOutPersistence()) {
             store.addMessage(context, message);
+        }
 
         message.incrementReferenceCount();
         try {
@@ -440,7 +441,7 @@
     }
 
     public Message[] browse() {
-        final Set result = new CopyOnWriteArraySet();
+        final Set<Message> result = new CopyOnWriteArraySet<Message>();
         try {
             if (store != null) {
                 store.recover(new MessageRecoveryListener() {
@@ -453,9 +454,6 @@
                         return true;
                     }
 
-                    public void finished() {
-                    }
-
                     public boolean hasSpace() {
                         return true;
                     }
@@ -470,7 +468,7 @@
         } catch (Throwable e) {
             LOG.warn("Failed to browse Topic: " + getActiveMQDestination().getPhysicalName(), e);
         }
-        return (Message[])result.toArray(new Message[result.size()]);
+        return result.toArray(new Message[result.size()]);
     }
 
     // Properties
@@ -576,10 +574,12 @@
                     // filled when the message is first sent,
                     // it is only populated if the message is routed to another
                     // destination like the DLQ
-                    if (message.getOriginalDestination() != null)
+                    if (message.getOriginalDestination() != null) {
                         message.setOriginalDestination(message.getDestination());
-                    if (message.getOriginalTransactionId() != null)
+                    }
+                    if (message.getOriginalTransactionId() != null) {
                         message.setOriginalTransactionId(message.getTransactionId());
+                    }
 
                     ActiveMQTopic advisoryTopic = AdvisorySupport.getNoTopicConsumersAdvisoryTopic(destination);
                     message.setDestination(advisoryTopic);