You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2013/01/07 17:19:35 UTC
svn commit: r1429878 -
/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
Author: tabish
Date: Mon Jan 7 16:19:35 2013
New Revision: 1429878
URL: http://svn.apache.org/viewvc?rev=1429878&view=rev
Log:
fixes: https://issues.apache.org/jira/browse/AMQ-4246
Modified:
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java?rev=1429878&r1=1429877&r2=1429878&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java Mon Jan 7 16:19:35 2013
@@ -224,8 +224,8 @@ public class BrokerService implements Se
private boolean allowTempAutoCreationOnSend;
private JobSchedulerStore jobSchedulerStore;
- private int offlineDurableSubscriberTimeout = -1;
- private int offlineDurableSubscriberTaskSchedule = 300000;
+ private long offlineDurableSubscriberTimeout = -1;
+ private long offlineDurableSubscriberTaskSchedule = 300000;
private DestinationFilter virtualConsumerDestinationFilter;
private final Object persistenceAdapterLock = new Object();
@@ -812,8 +812,7 @@ public class BrokerService implements Se
* @param pollInterval
* @throws Exception
*/
- public void stopGracefully(String connectorName, String queueName, long timeout, long pollInterval)
- throws Exception {
+ public void stopGracefully(String connectorName, String queueName, long timeout, long pollInterval) throws Exception {
if (isUseJmx()) {
if (connectorName == null || queueName == null || timeout <= 0) {
throw new Exception(
@@ -1349,14 +1348,13 @@ public class BrokerService implements Se
* nestedType="org.apache.activemq.broker.TransportConnector"
*/
public void setTransportConnectors(List<TransportConnector> transportConnectors) throws Exception {
- for (Iterator<TransportConnector> iter = transportConnectors.iterator(); iter.hasNext();) {
- TransportConnector connector = iter.next();
+ for (TransportConnector connector : transportConnectors) {
addConnector(connector);
}
}
public TransportConnector getTransportConnectorByName(String name){
- for (TransportConnector transportConnector:transportConnectors){
+ for (TransportConnector transportConnector : transportConnectors){
if (name.equals(transportConnector.getName())){
return transportConnector;
}
@@ -1365,7 +1363,7 @@ public class BrokerService implements Se
}
public TransportConnector getTransportConnectorByScheme(String scheme){
- for (TransportConnector transportConnector:transportConnectors){
+ for (TransportConnector transportConnector : transportConnectors){
if (scheme.equals(transportConnector.getUri().getScheme())){
return transportConnector;
}
@@ -1388,10 +1386,9 @@ public class BrokerService implements Se
* @org.apache.xbean.Property
* nestedType="org.apache.activemq.network.NetworkConnector"
*/
- public void setNetworkConnectors(List networkConnectors) throws Exception {
- for (Iterator iter = networkConnectors.iterator(); iter.hasNext();) {
- NetworkConnector connector = (NetworkConnector) iter.next();
- addNetworkConnector(connector);
+ public void setNetworkConnectors(List<?> networkConnectors) throws Exception {
+ for (Object connector : networkConnectors) {
+ addNetworkConnector((NetworkConnector) connector);
}
}
@@ -1399,10 +1396,9 @@ public class BrokerService implements Se
* Sets the network connectors which this broker will use to connect to
* other brokers in a federated network
*/
- public void setProxyConnectors(List proxyConnectors) throws Exception {
- for (Iterator iter = proxyConnectors.iterator(); iter.hasNext();) {
- ProxyConnector connector = (ProxyConnector) iter.next();
- addProxyConnector(connector);
+ public void setProxyConnectors(List<?> proxyConnectors) throws Exception {
+ for (Object connector : proxyConnectors) {
+ addProxyConnector((ProxyConnector) connector);
}
}
@@ -1480,33 +1476,32 @@ public class BrokerService implements Se
}
public String getDefaultSocketURIString() {
-
- if (started.get()) {
- if (this.defaultSocketURIString == null) {
- for (TransportConnector tc:this.transportConnectors) {
- String result = null;
- try {
- result = tc.getPublishableConnectString();
- } catch (Exception e) {
- LOG.warn("Failed to get the ConnectURI for "+tc,e);
- }
- if (result != null) {
- // find first publishable uri
- if (tc.isUpdateClusterClients() || tc.isRebalanceClusterClients()) {
+ if (started.get()) {
+ if (this.defaultSocketURIString == null) {
+ for (TransportConnector tc:this.transportConnectors) {
+ String result = null;
+ try {
+ result = tc.getPublishableConnectString();
+ } catch (Exception e) {
+ LOG.warn("Failed to get the ConnectURI for "+tc,e);
+ }
+ if (result != null) {
+ // find first publishable uri
+ if (tc.isUpdateClusterClients() || tc.isRebalanceClusterClients()) {
+ this.defaultSocketURIString = result;
+ break;
+ } else {
+ // or use the first defined
+ if (this.defaultSocketURIString == null) {
this.defaultSocketURIString = result;
- break;
- } else {
- // or use the first defined
- if (this.defaultSocketURIString == null) {
- this.defaultSocketURIString = result;
- }
}
}
}
-
}
- return this.defaultSocketURIString;
+
}
+ return this.defaultSocketURIString;
+ }
return null;
}
@@ -1815,7 +1810,6 @@ public class BrokerService implements Se
* @throws Exception
*/
protected void processHelperProperties() throws Exception {
- boolean masterServiceExists = false;
if (transportConnectorURIs != null) {
for (int i = 0; i < transportConnectorURIs.length; i++) {
String uri = transportConnectorURIs[i];
@@ -1994,8 +1988,7 @@ public class BrokerService implements Se
}
protected void unregisterPersistenceAdapterMBean(PersistenceAdapter adaptor) throws IOException {
- if (isUseJmx()) {
- }
+ if (isUseJmx()) {}
}
private ObjectName createConnectorObjectName(TransportConnector connector) throws MalformedObjectNameException {
@@ -2015,16 +2008,13 @@ public class BrokerService implements Se
}
}
- protected ObjectName createNetworkConnectorObjectName(NetworkConnector connector)
- throws MalformedObjectNameException {
+ protected ObjectName createNetworkConnectorObjectName(NetworkConnector connector) throws MalformedObjectNameException {
String objectNameStr = getBrokerObjectName().toString();
objectNameStr += ",connector=networkConnectors,networkConnectorName="+ JMXSupport.encodeObjectNamePart(connector.getName());
return new ObjectName(objectNameStr);
}
-
- public ObjectName createDuplexNetworkConnectorObjectName(String transport)
- throws MalformedObjectNameException {
+ public ObjectName createDuplexNetworkConnectorObjectName(String transport) throws MalformedObjectNameException {
String objectNameStr = getBrokerObjectName().toString();
objectNameStr += ",connector=duplexNetworkConnectors,networkConnectorName="+ JMXSupport.encodeObjectNamePart(transport);
return new ObjectName(objectNameStr);
@@ -2053,8 +2043,6 @@ public class BrokerService implements Se
}
}
-
-
protected void registerJmsConnectorMBean(JmsConnector connector) throws IOException {
JmsConnectorView view = new JmsConnectorView(connector);
try {
@@ -2125,9 +2113,9 @@ public class BrokerService implements Se
RegionBroker regionBroker;
if (isUseJmx()) {
try {
- regionBroker = new ManagedRegionBroker(this, getManagementContext(), getBrokerObjectName(),
+ regionBroker = new ManagedRegionBroker(this, getManagementContext(), getBrokerObjectName(),
getTaskRunnerFactory(), getConsumerSystemUsage(), destinationFactory, destinationInterceptor,getScheduler(),getExecutor());
- }catch(MalformedObjectNameException me){
+ } catch(MalformedObjectNameException me){
LOG.error("Couldn't create ManagedRegionBroker",me);
throw new IOException(me);
}
@@ -2179,7 +2167,6 @@ public class BrokerService implements Se
if (isUseJmx()) {
JobSchedulerViewMBean view = new JobSchedulerView(sb.getJobScheduler());
try {
-
String objectNameStr = getBrokerObjectName().toString();
objectNameStr += ",service=JobScheduler,name=JMS";
ObjectName objectName = new ObjectName(objectNameStr);
@@ -2189,7 +2176,6 @@ public class BrokerService implements Se
throw IOExceptionSupport.create("JobScheduler could not be registered in JMX: "
+ e.getMessage(), e);
}
-
}
broker = sb;
}
@@ -2262,7 +2248,7 @@ public class BrokerService implements Se
/**
* Extracts the port from the options
*/
- protected Object getPort(Map options) {
+ protected Object getPort(Map<?,?> options) {
Object port = options.get("port");
if (port == null) {
port = DEFAULT_PORT;
@@ -2395,16 +2381,16 @@ public class BrokerService implements Se
if (isNetworkConnectorStartAsync()) {
// spin up as many threads as needed
networkConnectorStartExecutor = new ThreadPoolExecutor(0, Integer.MAX_VALUE,
- 10, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
- new ThreadFactory() {
- int count=0;
- @Override
- public Thread newThread(Runnable runnable) {
- Thread thread = new Thread(runnable, "NetworkConnector Start Thread-" +(count++));
- thread.setDaemon(true);
- return thread;
- }
- });
+ 10, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
+ new ThreadFactory() {
+ int count=0;
+ @Override
+ public Thread newThread(Runnable runnable) {
+ Thread thread = new Thread(runnable, "NetworkConnector Start Thread-" +(count++));
+ thread.setDaemon(true);
+ return thread;
+ }
+ });
}
for (Iterator<NetworkConnector> iter = getNetworkConnectors().iterator(); iter.hasNext();) {
@@ -2819,25 +2805,25 @@ public class BrokerService implements Se
this.allowTempAutoCreationOnSend = allowTempAutoCreationOnSend;
}
- public int getOfflineDurableSubscriberTimeout() {
+ public long getOfflineDurableSubscriberTimeout() {
return offlineDurableSubscriberTimeout;
}
- public void setOfflineDurableSubscriberTimeout(int offlineDurableSubscriberTimeout) {
+ public void setOfflineDurableSubscriberTimeout(long offlineDurableSubscriberTimeout) {
this.offlineDurableSubscriberTimeout = offlineDurableSubscriberTimeout;
}
- public int getOfflineDurableSubscriberTaskSchedule() {
+ public long getOfflineDurableSubscriberTaskSchedule() {
return offlineDurableSubscriberTaskSchedule;
}
- public void setOfflineDurableSubscriberTaskSchedule(int offlineDurableSubscriberTaskSchedule) {
+ public void setOfflineDurableSubscriberTaskSchedule(long offlineDurableSubscriberTaskSchedule) {
this.offlineDurableSubscriberTaskSchedule = offlineDurableSubscriberTaskSchedule;
}
public boolean shouldRecordVirtualDestination(ActiveMQDestination destination) {
return isUseVirtualTopics() && destination.isQueue() &&
- getVirtualTopicConsumerDestinationFilter().matches(destination);
+ getVirtualTopicConsumerDestinationFilter().matches(destination);
}
public Throwable getStartException() {