You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2013/01/07 17:19:35 UTC

svn commit: r1429878 - /activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java

Author: tabish
Date: Mon Jan  7 16:19:35 2013
New Revision: 1429878

URL: http://svn.apache.org/viewvc?rev=1429878&view=rev
Log:
fixes: https://issues.apache.org/jira/browse/AMQ-4246

Modified:
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java?rev=1429878&r1=1429877&r2=1429878&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java Mon Jan  7 16:19:35 2013
@@ -224,8 +224,8 @@ public class BrokerService implements Se
     private boolean allowTempAutoCreationOnSend;
     private JobSchedulerStore jobSchedulerStore;
 
-    private int offlineDurableSubscriberTimeout = -1;
-    private int offlineDurableSubscriberTaskSchedule = 300000;
+    private long offlineDurableSubscriberTimeout = -1;
+    private long offlineDurableSubscriberTaskSchedule = 300000;
     private DestinationFilter virtualConsumerDestinationFilter;
 
     private final Object persistenceAdapterLock = new Object();
@@ -812,8 +812,7 @@ public class BrokerService implements Se
      * @param pollInterval
      * @throws Exception
      */
-    public void stopGracefully(String connectorName, String queueName, long timeout, long pollInterval)
-            throws Exception {
+    public void stopGracefully(String connectorName, String queueName, long timeout, long pollInterval) throws Exception {
         if (isUseJmx()) {
             if (connectorName == null || queueName == null || timeout <= 0) {
                 throw new Exception(
@@ -1349,14 +1348,13 @@ public class BrokerService implements Se
      *                            nestedType="org.apache.activemq.broker.TransportConnector"
      */
     public void setTransportConnectors(List<TransportConnector> transportConnectors) throws Exception {
-        for (Iterator<TransportConnector> iter = transportConnectors.iterator(); iter.hasNext();) {
-            TransportConnector connector = iter.next();
+        for (TransportConnector connector : transportConnectors) {
             addConnector(connector);
         }
     }
 
     public TransportConnector getTransportConnectorByName(String name){
-        for (TransportConnector transportConnector:transportConnectors){
+        for (TransportConnector transportConnector : transportConnectors){
            if (name.equals(transportConnector.getName())){
                return transportConnector;
            }
@@ -1365,7 +1363,7 @@ public class BrokerService implements Se
     }
 
     public TransportConnector getTransportConnectorByScheme(String scheme){
-        for (TransportConnector transportConnector:transportConnectors){
+        for (TransportConnector transportConnector : transportConnectors){
             if (scheme.equals(transportConnector.getUri().getScheme())){
                 return transportConnector;
             }
@@ -1388,10 +1386,9 @@ public class BrokerService implements Se
      * @org.apache.xbean.Property
      *                            nestedType="org.apache.activemq.network.NetworkConnector"
      */
-    public void setNetworkConnectors(List networkConnectors) throws Exception {
-        for (Iterator iter = networkConnectors.iterator(); iter.hasNext();) {
-            NetworkConnector connector = (NetworkConnector) iter.next();
-            addNetworkConnector(connector);
+    public void setNetworkConnectors(List<?> networkConnectors) throws Exception {
+        for (Object connector : networkConnectors) {
+            addNetworkConnector((NetworkConnector) connector);
         }
     }
 
@@ -1399,10 +1396,9 @@ public class BrokerService implements Se
      * Sets the network connectors which this broker will use to connect to
      * other brokers in a federated network
      */
-    public void setProxyConnectors(List proxyConnectors) throws Exception {
-        for (Iterator iter = proxyConnectors.iterator(); iter.hasNext();) {
-            ProxyConnector connector = (ProxyConnector) iter.next();
-            addProxyConnector(connector);
+    public void setProxyConnectors(List<?> proxyConnectors) throws Exception {
+        for (Object connector : proxyConnectors) {
+            addProxyConnector((ProxyConnector) connector);
         }
     }
 
@@ -1480,33 +1476,32 @@ public class BrokerService implements Se
     }
 
     public String getDefaultSocketURIString() {
-
-            if (started.get()) {
-                if (this.defaultSocketURIString == null) {
-                    for (TransportConnector tc:this.transportConnectors) {
-                        String result = null;
-                        try {
-                            result = tc.getPublishableConnectString();
-                        } catch (Exception e) {
-                          LOG.warn("Failed to get the ConnectURI for "+tc,e);
-                        }
-                        if (result != null) {
-                            // find first publishable uri
-                            if (tc.isUpdateClusterClients() || tc.isRebalanceClusterClients()) {
+        if (started.get()) {
+            if (this.defaultSocketURIString == null) {
+                for (TransportConnector tc:this.transportConnectors) {
+                    String result = null;
+                    try {
+                        result = tc.getPublishableConnectString();
+                    } catch (Exception e) {
+                      LOG.warn("Failed to get the ConnectURI for "+tc,e);
+                    }
+                    if (result != null) {
+                        // find first publishable uri
+                        if (tc.isUpdateClusterClients() || tc.isRebalanceClusterClients()) {
+                            this.defaultSocketURIString = result;
+                            break;
+                        } else {
+                        // or use the first defined
+                            if (this.defaultSocketURIString == null) {
                                 this.defaultSocketURIString = result;
-                                break;
-                            } else {
-                            // or use the first defined
-                                if (this.defaultSocketURIString == null) {
-                                    this.defaultSocketURIString = result;
-                                }
                             }
                         }
                     }
-
                 }
-                return this.defaultSocketURIString;
+
             }
+            return this.defaultSocketURIString;
+        }
        return null;
     }
 
@@ -1815,7 +1810,6 @@ public class BrokerService implements Se
      * @throws Exception
      */
     protected void processHelperProperties() throws Exception {
-        boolean masterServiceExists = false;
         if (transportConnectorURIs != null) {
             for (int i = 0; i < transportConnectorURIs.length; i++) {
                 String uri = transportConnectorURIs[i];
@@ -1994,8 +1988,7 @@ public class BrokerService implements Se
     }
 
     protected void unregisterPersistenceAdapterMBean(PersistenceAdapter adaptor) throws IOException {
-        if (isUseJmx()) {
-        }
+        if (isUseJmx()) {}
     }
 
     private ObjectName createConnectorObjectName(TransportConnector connector) throws MalformedObjectNameException {
@@ -2015,16 +2008,13 @@ public class BrokerService implements Se
         }
     }
 
-    protected ObjectName createNetworkConnectorObjectName(NetworkConnector connector)
-            throws MalformedObjectNameException {
+    protected ObjectName createNetworkConnectorObjectName(NetworkConnector connector) throws MalformedObjectNameException {
         String objectNameStr = getBrokerObjectName().toString();
         objectNameStr += ",connector=networkConnectors,networkConnectorName="+ JMXSupport.encodeObjectNamePart(connector.getName());
         return new ObjectName(objectNameStr);
     }
 
-
-    public ObjectName createDuplexNetworkConnectorObjectName(String transport)
-            throws MalformedObjectNameException {
+    public ObjectName createDuplexNetworkConnectorObjectName(String transport) throws MalformedObjectNameException {
         String objectNameStr = getBrokerObjectName().toString();
         objectNameStr += ",connector=duplexNetworkConnectors,networkConnectorName="+ JMXSupport.encodeObjectNamePart(transport);
         return new ObjectName(objectNameStr);
@@ -2053,8 +2043,6 @@ public class BrokerService implements Se
         }
     }
 
-
-
     protected void registerJmsConnectorMBean(JmsConnector connector) throws IOException {
         JmsConnectorView view = new JmsConnectorView(connector);
         try {
@@ -2125,9 +2113,9 @@ public class BrokerService implements Se
         RegionBroker regionBroker;
         if (isUseJmx()) {
             try {
-            regionBroker = new ManagedRegionBroker(this, getManagementContext(), getBrokerObjectName(),
+                regionBroker = new ManagedRegionBroker(this, getManagementContext(), getBrokerObjectName(),
                     getTaskRunnerFactory(), getConsumerSystemUsage(), destinationFactory, destinationInterceptor,getScheduler(),getExecutor());
-            }catch(MalformedObjectNameException me){
+            } catch(MalformedObjectNameException me){
                 LOG.error("Couldn't create ManagedRegionBroker",me);
                 throw new IOException(me);
             }
@@ -2179,7 +2167,6 @@ public class BrokerService implements Se
             if (isUseJmx()) {
                 JobSchedulerViewMBean view = new JobSchedulerView(sb.getJobScheduler());
                 try {
-
                     String objectNameStr = getBrokerObjectName().toString();
                     objectNameStr += ",service=JobScheduler,name=JMS";
                     ObjectName objectName = new ObjectName(objectNameStr);
@@ -2189,7 +2176,6 @@ public class BrokerService implements Se
                     throw IOExceptionSupport.create("JobScheduler could not be registered in JMX: "
                             + e.getMessage(), e);
                 }
-
             }
             broker = sb;
         }
@@ -2262,7 +2248,7 @@ public class BrokerService implements Se
     /**
      * Extracts the port from the options
      */
-    protected Object getPort(Map options) {
+    protected Object getPort(Map<?,?> options) {
         Object port = options.get("port");
         if (port == null) {
             port = DEFAULT_PORT;
@@ -2395,16 +2381,16 @@ public class BrokerService implements Se
             if (isNetworkConnectorStartAsync()) {
                 // spin up as many threads as needed
                 networkConnectorStartExecutor = new ThreadPoolExecutor(0, Integer.MAX_VALUE,
-                        10, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
-                        new ThreadFactory() {
-                            int count=0;
-                            @Override
-                            public Thread newThread(Runnable runnable) {
-                                Thread thread = new Thread(runnable, "NetworkConnector Start Thread-" +(count++));
-                                thread.setDaemon(true);
-                                return thread;
-                            }
-                        });
+                    10, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
+                    new ThreadFactory() {
+                        int count=0;
+                        @Override
+                        public Thread newThread(Runnable runnable) {
+                            Thread thread = new Thread(runnable, "NetworkConnector Start Thread-" +(count++));
+                            thread.setDaemon(true);
+                            return thread;
+                        }
+                    });
             }
 
             for (Iterator<NetworkConnector> iter = getNetworkConnectors().iterator(); iter.hasNext();) {
@@ -2819,25 +2805,25 @@ public class BrokerService implements Se
         this.allowTempAutoCreationOnSend = allowTempAutoCreationOnSend;
     }
 
-    public int getOfflineDurableSubscriberTimeout() {
+    public long getOfflineDurableSubscriberTimeout() {
         return offlineDurableSubscriberTimeout;
     }
 
-    public void setOfflineDurableSubscriberTimeout(int offlineDurableSubscriberTimeout) {
+    public void setOfflineDurableSubscriberTimeout(long offlineDurableSubscriberTimeout) {
         this.offlineDurableSubscriberTimeout = offlineDurableSubscriberTimeout;
     }
 
-    public int getOfflineDurableSubscriberTaskSchedule() {
+    public long getOfflineDurableSubscriberTaskSchedule() {
         return offlineDurableSubscriberTaskSchedule;
     }
 
-    public void setOfflineDurableSubscriberTaskSchedule(int offlineDurableSubscriberTaskSchedule) {
+    public void setOfflineDurableSubscriberTaskSchedule(long offlineDurableSubscriberTaskSchedule) {
         this.offlineDurableSubscriberTaskSchedule = offlineDurableSubscriberTaskSchedule;
     }
 
     public boolean shouldRecordVirtualDestination(ActiveMQDestination destination) {
         return isUseVirtualTopics() && destination.isQueue() &&
-                getVirtualTopicConsumerDestinationFilter().matches(destination);
+               getVirtualTopicConsumerDestinationFilter().matches(destination);
     }
 
     public Throwable getStartException() {