You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2007/08/10 18:57:10 UTC
svn commit: r564679 [2/8] - in /activemq/trunk:
activemq-core/src/main/java/org/apache/activemq/
activemq-core/src/main/java/org/apache/activemq/broker/
activemq-core/src/main/java/org/apache/activemq/broker/jmx/
activemq-core/src/main/java/org/apache/...
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java?view=diff&rev=564679&r1=564678&r2=564679
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java Fri Aug 10 09:57:01 2007
@@ -233,8 +233,9 @@
public void serviceTransportException(IOException e) {
if (!disposed.get()) {
transportException.set(e);
- if (TRANSPORTLOG.isDebugEnabled())
+ if (TRANSPORTLOG.isDebugEnabled()) {
TRANSPORTLOG.debug("Transport failed: " + e, e);
+ }
ServiceSupport.dispose(this);
}
}
@@ -263,17 +264,20 @@
* error transmitted to the client before stopping it's transport.
*/
public void serviceException(Throwable e) {
+
// are we a transport exception such as not being able to dispatch
// synchronously to a transport
if (e instanceof IOException) {
serviceTransportException((IOException)e);
- }
- // Handle the case where the broker is stopped
- // But the client is still connected.
- else if (e.getClass() == BrokerStoppedException.class) {
+ } else if (e.getClass() == BrokerStoppedException.class) {
+ // Handle the case where the broker is stopped
+ // But the client is still connected.
+
if (!disposed.get()) {
- if (SERVICELOG.isDebugEnabled())
+ if (SERVICELOG.isDebugEnabled()) {
SERVICELOG.debug("Broker has been stopped. Notifying client and closing his connection.");
+ }
+
ConnectionError ce = new ConnectionError();
ce.setException(e);
dispatchSync(ce);
@@ -310,8 +314,9 @@
response = command.visit(this);
} catch (Throwable e) {
if (responseRequired) {
- if (SERVICELOG.isDebugEnabled() && e.getClass() != BrokerStoppedException.class)
+ if (SERVICELOG.isDebugEnabled() && e.getClass() != BrokerStoppedException.class) {
SERVICELOG.debug("Error occured while processing sync command: " + e, e);
+ }
response = new ExceptionResponse(e);
} else {
serviceException(e);
@@ -402,8 +407,9 @@
throw new NullPointerException("Context is null");
}
TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
- if (transactionState == null)
+ if (transactionState == null) {
throw new IllegalStateException("Cannot prepare a transaction that had not been started: " + info.getTransactionId());
+ }
// Avoid dups.
if (!transactionState.isPrepared()) {
transactionState.setPrepared(true);
@@ -500,8 +506,9 @@
ConnectionId connectionId = sessionId.getParentId();
TransportConnectionState cs = lookupConnectionState(connectionId);
SessionState ss = cs.getSessionState(sessionId);
- if (ss == null)
+ if (ss == null) {
throw new IllegalStateException("Cannot add a producer to a session that had not been registered: " + sessionId);
+ }
// Avoid replaying dup commands
if (!ss.getProducerIds().contains(info.getProducerId())) {
broker.addProducer(cs.getContext(), info);
@@ -519,11 +526,13 @@
ConnectionId connectionId = sessionId.getParentId();
TransportConnectionState cs = lookupConnectionState(connectionId);
SessionState ss = cs.getSessionState(sessionId);
- if (ss == null)
+ if (ss == null) {
throw new IllegalStateException("Cannot remove a producer from a session that had not been registered: " + sessionId);
+ }
ProducerState ps = ss.removeProducer(id);
- if (ps == null)
+ if (ps == null) {
throw new IllegalStateException("Cannot remove a producer that had not been registered: " + id);
+ }
removeProducerBrokerExchange(id);
broker.removeProducer(cs.getContext(), ps.getInfo());
return null;
@@ -534,8 +543,9 @@
ConnectionId connectionId = sessionId.getParentId();
TransportConnectionState cs = lookupConnectionState(connectionId);
SessionState ss = cs.getSessionState(sessionId);
- if (ss == null)
+ if (ss == null) {
throw new IllegalStateException("Cannot add a consumer to a session that had not been registered: " + sessionId);
+ }
// Avoid replaying dup commands
if (!ss.getConsumerIds().contains(info.getConsumerId())) {
broker.addConsumer(cs.getContext(), info);
@@ -553,11 +563,13 @@
ConnectionId connectionId = sessionId.getParentId();
TransportConnectionState cs = lookupConnectionState(connectionId);
SessionState ss = cs.getSessionState(sessionId);
- if (ss == null)
+ if (ss == null) {
throw new IllegalStateException("Cannot remove a consumer from a session that had not been registered: " + sessionId);
+ }
ConsumerState consumerState = ss.removeConsumer(id);
- if (consumerState == null)
+ if (consumerState == null) {
throw new IllegalStateException("Cannot remove a consumer that had not been registered: " + id);
+ }
broker.removeConsumer(cs.getContext(), consumerState.getInfo());
removeConsumerBrokerExchange(id);
return null;
@@ -582,8 +594,9 @@
ConnectionId connectionId = id.getParentId();
TransportConnectionState cs = lookupConnectionState(connectionId);
SessionState session = cs.getSessionState(id);
- if (session == null)
+ if (session == null) {
throw new IllegalStateException("Cannot remove session that had not been registered: " + id);
+ }
// Don't let new consumers or producers get added while we are closing
// this down.
session.shutdown();
@@ -912,8 +925,9 @@
disposeTransport();
}
- if (taskRunner != null)
+ if (taskRunner != null) {
taskRunner.shutdown();
+ }
// Run the MessageDispatch callbacks so that message references get
// cleaned up.
@@ -1090,12 +1104,13 @@
// so this TransportConnection is the rear end of a network bridge
// We have been requested to create a two way pipe ...
try {
- Properties props = MarshallingSupport.stringToProperties(info.getNetworkProperties());
+ Properties properties = MarshallingSupport.stringToProperties(info.getNetworkProperties());
+ Map<String, String> props = new HashMap(properties);
NetworkBridgeConfiguration config = new NetworkBridgeConfiguration();
IntrospectionSupport.setProperties(config, props, "");
config.setBrokerName(broker.getBrokerName());
URI uri = broker.getVmConnectorURI();
- HashMap map = new HashMap(URISupport.parseParamters(uri));
+ HashMap<String,String> map = new HashMap<String,String>(URISupport.parseParamters(uri));
map.put("network", "true");
map.put("async", "false");
uri = URISupport.createURIWithQuery(uri, URISupport.createQueryString(map));
@@ -1145,8 +1160,9 @@
public String getConnectionId() {
List<TransportConnectionState> connectionStates = listConnectionStates();
for (TransportConnectionState cs : connectionStates) {
- if (cs.getInfo().getClientId() != null)
+ if (cs.getInfo().getClientId() != null) {
return cs.getInfo().getClientId();
+ }
return cs.getInfo().getConnectionId().toString();
}
return null;
@@ -1197,7 +1213,7 @@
if (cs != null) {
ConsumerInfo info = cs.getInfo();
if (info != null) {
- if (info.getDestination() != null && info.getDestination().isPattern()) {
+ if (info.getDestination() != null && info.getDestination().isPattern()) {
result.setWildcard(true);
}
}
@@ -1233,8 +1249,9 @@
public Response processControlCommand(ControlCommand command) throws Exception {
String control = command.getCommand();
- if (control != null && control.equals("shutdown"))
+ if (control != null && control.equals("shutdown")) {
System.exit(0);
+ }
return null;
}
@@ -1283,7 +1300,7 @@
}
protected List<TransportConnectionState> listConnectionStates() {
- ArrayList<TransportConnectionState> rc = new ArrayList<TransportConnectionState>();
+ List<TransportConnectionState> rc = new ArrayList<TransportConnectionState>();
if (connectionState != null) {
rc.add(connectionState);
}
@@ -1292,36 +1309,41 @@
protected TransportConnectionState lookupConnectionState(String connectionId) {
TransportConnectionState cs = connectionState;
- if (cs == null)
+ if (cs == null) {
throw new IllegalStateException("Cannot lookup a connectionId for a connection that had not been registered: " + connectionId);
+ }
return cs;
}
protected TransportConnectionState lookupConnectionState(ConsumerId id) {
TransportConnectionState cs = connectionState;
- if (cs == null)
+ if (cs == null) {
throw new IllegalStateException("Cannot lookup a consumer from a connection that had not been registered: " + id.getParentId().getParentId());
+ }
return cs;
}
protected TransportConnectionState lookupConnectionState(ProducerId id) {
TransportConnectionState cs = connectionState;
- if (cs == null)
+ if (cs == null) {
throw new IllegalStateException("Cannot lookup a producer from a connection that had not been registered: " + id.getParentId().getParentId());
+ }
return cs;
}
protected TransportConnectionState lookupConnectionState(SessionId id) {
TransportConnectionState cs = connectionState;
- if (cs == null)
+ if (cs == null) {
throw new IllegalStateException("Cannot lookup a session from a connection that had not been registered: " + id.getParentId());
+ }
return cs;
}
protected TransportConnectionState lookupConnectionState(ConnectionId connectionId) {
TransportConnectionState cs = connectionState;
- if (cs == null)
+ if (cs == null) {
throw new IllegalStateException("Cannot lookup a connection that had not been registered: " + connectionId);
+ }
return cs;
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java?view=diff&rev=564679&r1=564678&r2=564679
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java Fri Aug 10 09:57:01 2007
@@ -49,6 +49,9 @@
private static final Log LOG = LogFactory.getLog(TransportConnector.class);
+ protected CopyOnWriteArrayList<TransportConnection> connections = new CopyOnWriteArrayList<TransportConnection>();
+ protected TransportStatusDetector statusDector;
+
private Broker broker;
private TransportServer server;
private URI uri;
@@ -56,8 +59,6 @@
private TaskRunnerFactory taskRunnerFactory;
private MessageAuthorizationPolicy messageAuthorizationPolicy;
private DiscoveryAgent discoveryAgent;
- protected CopyOnWriteArrayList connections = new CopyOnWriteArrayList();
- protected TransportStatusDetector statusDector;
private ConnectorStatistics statistics = new ConnectorStatistics();
private URI discoveryUri;
private URI connectUri;
@@ -65,13 +66,6 @@
private boolean disableAsyncDispatch;
private boolean enableStatusMonitor = true;
- /**
- * @return Returns the connections.
- */
- public CopyOnWriteArrayList getConnections() {
- return connections;
- }
-
public TransportConnector() {
}
@@ -88,6 +82,14 @@
}
+
+ /**
+ * @return Returns the connections.
+ */
+ public CopyOnWriteArrayList<TransportConnection> getConnections() {
+ return connections;
+ }
+
/**
* Factory method to create a JMX managed version of this transport
* connector
@@ -251,8 +253,8 @@
this.statusDector.stop();
}
- for (Iterator iter = connections.iterator(); iter.hasNext();) {
- TransportConnection c = (TransportConnection)iter.next();
+ for (Iterator<TransportConnection> iter = connections.iterator(); iter.hasNext();) {
+ TransportConnection c = iter.next();
ss.stop(c);
}
ss.throwFirstException();
@@ -342,8 +344,9 @@
public String toString() {
String rc = getName();
- if (rc == null)
+ if (rc == null) {
rc = super.toString();
+ }
return rc;
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java?view=diff&rev=564679&r1=564678&r2=564679
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java Fri Aug 10 09:57:01 2007
@@ -76,19 +76,19 @@
private static final Log LOG = LogFactory.getLog(ManagedRegionBroker.class);
private final MBeanServer mbeanServer;
private final ObjectName brokerObjectName;
- private final Map topics = new ConcurrentHashMap();
- private final Map queues = new ConcurrentHashMap();
- private final Map temporaryQueues = new ConcurrentHashMap();
- private final Map temporaryTopics = new ConcurrentHashMap();
- private final Map queueSubscribers = new ConcurrentHashMap();
- private final Map topicSubscribers = new ConcurrentHashMap();
- private final Map durableTopicSubscribers = new ConcurrentHashMap();
- private final Map inactiveDurableTopicSubscribers = new ConcurrentHashMap();
- private final Map temporaryQueueSubscribers = new ConcurrentHashMap();
- private final Map temporaryTopicSubscribers = new ConcurrentHashMap();
- private final Map subscriptionKeys = new ConcurrentHashMap();
- private final Map subscriptionMap = new ConcurrentHashMap();
- private final Set registeredMBeans = new CopyOnWriteArraySet();
+ private final Map<ObjectName, DestinationView> topics = new ConcurrentHashMap<ObjectName, DestinationView>();
+ private final Map<ObjectName, DestinationView> queues = new ConcurrentHashMap<ObjectName, DestinationView>();
+ private final Map<ObjectName, DestinationView> temporaryQueues = new ConcurrentHashMap<ObjectName, DestinationView>();
+ private final Map<ObjectName, DestinationView> temporaryTopics = new ConcurrentHashMap<ObjectName, DestinationView>();
+ private final Map<ObjectName, SubscriptionView> queueSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>();
+ private final Map<ObjectName, SubscriptionView> topicSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>();
+ private final Map<ObjectName, SubscriptionView> durableTopicSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>();
+ private final Map<ObjectName, SubscriptionView> inactiveDurableTopicSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>();
+ private final Map<ObjectName, SubscriptionView> temporaryQueueSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>();
+ private final Map<ObjectName, SubscriptionView> temporaryTopicSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>();
+ private final Map<SubscriptionKey, ObjectName> subscriptionKeys = new ConcurrentHashMap<SubscriptionKey, ObjectName>();
+ private final Map<Subscription, ObjectName> subscriptionMap = new ConcurrentHashMap<Subscription, ObjectName>();
+ private final Set<ObjectName> registeredMBeans = new CopyOnWriteArraySet<ObjectName>();
/* This is the first broker in the broker interceptor chain. */
private Broker contextBroker;
@@ -108,8 +108,8 @@
protected void doStop(ServiceStopper stopper) {
super.doStop(stopper);
// lets remove any mbeans not yet removed
- for (Iterator iter = registeredMBeans.iterator(); iter.hasNext();) {
- ObjectName name = (ObjectName)iter.next();
+ for (Iterator<ObjectName> iter = registeredMBeans.iterator(); iter.hasNext();) {
+ ObjectName name = iter.next();
try {
mbeanServer.unregisterMBean(name);
} catch (InstanceNotFoundException e) {
@@ -211,7 +211,7 @@
}
public void unregisterSubscription(Subscription sub) {
- ObjectName name = (ObjectName)subscriptionMap.remove(sub);
+ ObjectName name = subscriptionMap.remove(sub);
if (name != null) {
try {
unregisterSubscription(name);
@@ -275,7 +275,7 @@
durableTopicSubscribers.put(key, view);
// unregister any inactive durable subs
try {
- ObjectName inactiveName = (ObjectName)subscriptionKeys.get(subscriptionKey);
+ ObjectName inactiveName = subscriptionKeys.get(subscriptionKey);
if (inactiveName != null) {
inactiveDurableTopicSubscribers.remove(inactiveName);
registeredMBeans.remove(inactiveName);
@@ -327,7 +327,7 @@
}
protected void buildExistingSubscriptions() throws Exception {
- Map subscriptions = new HashMap();
+ Map<SubscriptionKey, SubscriptionInfo> subscriptions = new HashMap<SubscriptionKey, SubscriptionInfo>();
Set destinations = destinationFactory.getDestinations();
if (destinations != null) {
for (Iterator iter = destinations.iterator(); iter.hasNext();) {
@@ -376,11 +376,11 @@
}
public CompositeData[] browse(SubscriptionView view) throws OpenDataException {
- List messages = getSubscriberMessages(view);
+ List<Message> messages = getSubscriberMessages(view);
CompositeData c[] = new CompositeData[messages.size()];
for (int i = 0; i < c.length; i++) {
try {
- c[i] = OpenTypeSupport.convert((Message)messages.get(i));
+ c[i] = OpenTypeSupport.convert(messages.get(i));
} catch (Throwable e) {
LOG.error("failed to browse : " + view, e);
}
@@ -390,7 +390,7 @@
public TabularData browseAsTable(SubscriptionView view) throws OpenDataException {
OpenTypeFactory factory = OpenTypeSupport.getFactory(ActiveMQMessage.class);
- List messages = getSubscriberMessages(view);
+ List<Message> messages = getSubscriberMessages(view);
CompositeType ct = factory.getCompositeType();
TabularType tt = new TabularType("MessageList", "MessageList", ct, new String[] {"JMSMessageID"});
TabularDataSupport rc = new TabularDataSupport(tt);
@@ -400,13 +400,13 @@
return rc;
}
- protected List getSubscriberMessages(SubscriptionView view) {
+ protected List<Message> getSubscriberMessages(SubscriptionView view) {
// TODO It is very dangerous operation for big backlogs
if (!(destinationFactory instanceof DestinationFactoryImpl)) {
throw new RuntimeException("unsupported by " + destinationFactory);
}
PersistenceAdapter adapter = ((DestinationFactoryImpl)destinationFactory).getPersistenceAdapter();
- final List result = new ArrayList();
+ final List<Message> result = new ArrayList<Message>();
try {
ActiveMQTopic topic = new ActiveMQTopic(view.getDestinationName());
TopicMessageStore store = adapter.createTopicMessageStore(topic);
@@ -435,53 +435,53 @@
}
protected ObjectName[] getTopics() {
- Set set = topics.keySet();
- return (ObjectName[])set.toArray(new ObjectName[set.size()]);
+ Set<ObjectName> set = topics.keySet();
+ return set.toArray(new ObjectName[set.size()]);
}
protected ObjectName[] getQueues() {
- Set set = queues.keySet();
- return (ObjectName[])set.toArray(new ObjectName[set.size()]);
+ Set<ObjectName> set = queues.keySet();
+ return set.toArray(new ObjectName[set.size()]);
}
protected ObjectName[] getTemporaryTopics() {
- Set set = temporaryTopics.keySet();
- return (ObjectName[])set.toArray(new ObjectName[set.size()]);
+ Set<ObjectName> set = temporaryTopics.keySet();
+ return set.toArray(new ObjectName[set.size()]);
}
protected ObjectName[] getTemporaryQueues() {
- Set set = temporaryQueues.keySet();
- return (ObjectName[])set.toArray(new ObjectName[set.size()]);
+ Set<ObjectName> set = temporaryQueues.keySet();
+ return set.toArray(new ObjectName[set.size()]);
}
protected ObjectName[] getTopicSubscribers() {
- Set set = topicSubscribers.keySet();
- return (ObjectName[])set.toArray(new ObjectName[set.size()]);
+ Set<ObjectName> set = topicSubscribers.keySet();
+ return set.toArray(new ObjectName[set.size()]);
}
protected ObjectName[] getDurableTopicSubscribers() {
- Set set = durableTopicSubscribers.keySet();
- return (ObjectName[])set.toArray(new ObjectName[set.size()]);
+ Set<ObjectName> set = durableTopicSubscribers.keySet();
+ return set.toArray(new ObjectName[set.size()]);
}
protected ObjectName[] getQueueSubscribers() {
- Set set = queueSubscribers.keySet();
- return (ObjectName[])set.toArray(new ObjectName[set.size()]);
+ Set<ObjectName> set = queueSubscribers.keySet();
+ return set.toArray(new ObjectName[set.size()]);
}
protected ObjectName[] getTemporaryTopicSubscribers() {
- Set set = temporaryTopicSubscribers.keySet();
- return (ObjectName[])set.toArray(new ObjectName[set.size()]);
+ Set<ObjectName> set = temporaryTopicSubscribers.keySet();
+ return set.toArray(new ObjectName[set.size()]);
}
protected ObjectName[] getTemporaryQueueSubscribers() {
- Set set = temporaryQueueSubscribers.keySet();
- return (ObjectName[])set.toArray(new ObjectName[set.size()]);
+ Set<ObjectName> set = temporaryQueueSubscribers.keySet();
+ return set.toArray(new ObjectName[set.size()]);
}
protected ObjectName[] getInactiveDurableTopicSubscribers() {
- Set set = inactiveDurableTopicSubscribers.keySet();
- return (ObjectName[])set.toArray(new ObjectName[set.size()]);
+ Set<ObjectName> set = inactiveDurableTopicSubscribers.keySet();
+ return set.toArray(new ObjectName[set.size()]);
}
public Broker getContextBroker() {
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/NetworkConnectorView.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/NetworkConnectorView.java?view=diff&rev=564679&r1=564678&r2=564679
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/NetworkConnectorView.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/NetworkConnectorView.java Fri Aug 10 09:57:01 2007
@@ -105,8 +105,9 @@
public String getPassword() {
String pw = connector.getPassword();
// Hide the password for security reasons.
- if (pw != null)
+ if (pw != null) {
pw = pw.replaceAll(".", "*");
+ }
return pw;
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/OpenTypeSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/OpenTypeSupport.java?view=diff&rev=564679&r1=564678&r2=564679
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/OpenTypeSupport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/OpenTypeSupport.java Fri Aug 10 09:57:01 2007
@@ -20,6 +20,7 @@
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import javax.jms.DeliveryMode;
@@ -40,22 +41,22 @@
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.command.Message;
-public class OpenTypeSupport {
+public final class OpenTypeSupport {
interface OpenTypeFactory {
CompositeType getCompositeType() throws OpenDataException;
- Map getFields(Object o) throws OpenDataException;
+ Map<String, Object> getFields(Object o) throws OpenDataException;
}
- private static final HashMap OPEN_TYPE_FACTORIES = new HashMap();
+ private static final Map<Class, MessageOpenTypeFactory> OPEN_TYPE_FACTORIES = new HashMap<Class, MessageOpenTypeFactory>();
abstract static class AbstractOpenTypeFactory implements OpenTypeFactory {
private CompositeType compositeType;
- ArrayList itemNamesList = new ArrayList();
- ArrayList itemDescriptionsList = new ArrayList();
- ArrayList itemTypesList = new ArrayList();
+ private List<String> itemNamesList = new ArrayList<String>();
+ private List<String> itemDescriptionsList = new ArrayList<String>();
+ private List<OpenType> itemTypesList = new ArrayList<OpenType>();
public CompositeType getCompositeType() throws OpenDataException {
if (compositeType == null) {
@@ -69,9 +70,9 @@
}
protected CompositeType createCompositeType() throws OpenDataException {
- String[] itemNames = (String[])itemNamesList.toArray(new String[itemNamesList.size()]);
- String[] itemDescriptions = (String[])itemDescriptionsList.toArray(new String[itemDescriptionsList.size()]);
- OpenType[] itemTypes = (OpenType[])itemTypesList.toArray(new OpenType[itemTypesList.size()]);
+ String[] itemNames = itemNamesList.toArray(new String[itemNamesList.size()]);
+ String[] itemDescriptions = itemDescriptionsList.toArray(new String[itemDescriptionsList.size()]);
+ OpenType[] itemTypes = itemTypesList.toArray(new OpenType[itemTypesList.size()]);
return new CompositeType(getTypeName(), getDescription(), itemNames, itemDescriptions, itemTypes);
}
@@ -87,8 +88,8 @@
return getTypeName();
}
- public Map getFields(Object o) throws OpenDataException {
- HashMap rc = new HashMap();
+ public Map<String, Object> getFields(Object o) throws OpenDataException {
+ Map<String, Object> rc = new HashMap<String, Object>();
return rc;
}
}
@@ -114,9 +115,9 @@
addItem("Properties", "Properties", SimpleType.STRING);
}
- public Map getFields(Object o) throws OpenDataException {
+ public Map<String, Object> getFields(Object o) throws OpenDataException {
ActiveMQMessage m = (ActiveMQMessage)o;
- Map rc = super.getFields(o);
+ Map<String, Object> rc = super.getFields(o);
rc.put("JMSCorrelationID", m.getJMSCorrelationID());
rc.put("JMSDestination", "" + m.getJMSDestination());
rc.put("JMSMessageID", m.getJMSMessageID());
@@ -148,9 +149,9 @@
addItem("BodyPreview", "Body preview", new ArrayType(1, SimpleType.BYTE));
}
- public Map getFields(Object o) throws OpenDataException {
+ public Map<String, Object> getFields(Object o) throws OpenDataException {
ActiveMQBytesMessage m = (ActiveMQBytesMessage)o;
- Map rc = super.getFields(o);
+ Map<String, Object> rc = super.getFields(o);
long length = 0;
try {
length = m.getBodyLength();
@@ -189,10 +190,9 @@
addItem("ContentMap", "Content map", SimpleType.STRING);
}
- public Map getFields(Object o) throws OpenDataException {
+ public Map<String, Object> getFields(Object o) throws OpenDataException {
ActiveMQMapMessage m = (ActiveMQMapMessage)o;
- Map rc = super.getFields(o);
- long length = 0;
+ Map<String, Object> rc = super.getFields(o);
try {
rc.put("ContentMap", "" + m.getContentMap());
} catch (JMSException e) {
@@ -211,9 +211,8 @@
super.init();
}
- public Map getFields(Object o) throws OpenDataException {
- ActiveMQObjectMessage m = (ActiveMQObjectMessage)o;
- Map rc = super.getFields(o);
+ public Map<String, Object> getFields(Object o) throws OpenDataException {
+ Map<String, Object> rc = super.getFields(o);
return rc;
}
}
@@ -227,9 +226,8 @@
super.init();
}
- public Map getFields(Object o) throws OpenDataException {
- ActiveMQStreamMessage m = (ActiveMQStreamMessage)o;
- Map rc = super.getFields(o);
+ public Map<String, Object> getFields(Object o) throws OpenDataException {
+ Map<String, Object> rc = super.getFields(o);
return rc;
}
}
@@ -244,9 +242,9 @@
addItem("Text", "Text", SimpleType.STRING);
}
- public Map getFields(Object o) throws OpenDataException {
+ public Map<String, Object> getFields(Object o) throws OpenDataException {
ActiveMQTextMessage m = (ActiveMQTextMessage)o;
- Map rc = super.getFields(o);
+ Map<String, Object> rc = super.getFields(o);
try {
rc.put("Text", "" + m.getText());
} catch (JMSException e) {
@@ -265,16 +263,20 @@
OPEN_TYPE_FACTORIES.put(ActiveMQTextMessage.class, new TextMessageOpenTypeFactory());
}
- public static OpenTypeFactory getFactory(Class clazz) throws OpenDataException {
- return (OpenTypeFactory)OPEN_TYPE_FACTORIES.get(clazz);
+ private OpenTypeSupport() {
+ }
+
+ public static OpenTypeFactory getFactory(Class<? extends Message> clazz) throws OpenDataException {
+ return OPEN_TYPE_FACTORIES.get(clazz);
}
public static CompositeData convert(Message message) throws OpenDataException {
OpenTypeFactory f = getFactory(message.getClass());
- if (f == null)
+ if (f == null) {
throw new OpenDataException("Cannot create a CompositeData for type: " + message.getClass().getName());
+ }
CompositeType ct = f.getCompositeType();
- Map fields = f.getFields(message);
+ Map<String, Object> fields = f.getFields(message);
return new CompositeDataSupport(ct, fields);
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/QueueView.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/QueueView.java?view=diff&rev=564679&r1=564678&r2=564679
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/QueueView.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/QueueView.java Fri Aug 10 09:57:01 2007
@@ -34,8 +34,9 @@
public CompositeData getMessage(String messageId) throws OpenDataException {
Message rc = ((Queue)destination).getMessage(messageId);
- if (rc == null)
+ if (rc == null) {
return null;
+ }
return OpenTypeSupport.convert(rc);
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java?view=diff&rev=564679&r1=564678&r2=564679
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java Fri Aug 10 09:57:01 2007
@@ -31,6 +31,7 @@
import org.apache.activemq.broker.DestinationAlreadyExistsException;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
@@ -52,9 +53,9 @@
private static final Log LOG = LogFactory.getLog(AbstractRegion.class);
- protected final ConcurrentHashMap destinations = new ConcurrentHashMap();
+ protected final Map<ActiveMQDestination, Destination> destinations = new ConcurrentHashMap<ActiveMQDestination, Destination>();
protected final DestinationMap destinationMap = new DestinationMap();
- protected final ConcurrentHashMap subscriptions = new ConcurrentHashMap();
+ protected final Map<ConsumerId, Subscription> subscriptions = new ConcurrentHashMap<ConsumerId, Subscription>();
protected final UsageManager memoryManager;
protected final DestinationFactory destinationFactory;
protected final DestinationStatistics destinationStatistics;
@@ -62,7 +63,7 @@
protected boolean autoCreateDestinations = true;
protected final TaskRunnerFactory taskRunnerFactory;
protected final Object destinationsMutex = new Object();
- protected final Map consumerChangeMutexMap = new HashMap();
+ protected final Map<ConsumerId, Object> consumerChangeMutexMap = new HashMap<ConsumerId, Object>();
protected boolean started;
public AbstractRegion(RegionBroker broker, DestinationStatistics destinationStatistics, UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory,
@@ -83,25 +84,25 @@
public void start() throws Exception {
started = true;
- Set inactiveDests = getInactiveDestinations();
- for (Iterator iter = inactiveDests.iterator(); iter.hasNext();) {
- ActiveMQDestination dest = (ActiveMQDestination)iter.next();
+ Set<ActiveMQDestination> inactiveDests = getInactiveDestinations();
+ for (Iterator<ActiveMQDestination> iter = inactiveDests.iterator(); iter.hasNext();) {
+ ActiveMQDestination dest = iter.next();
ConnectionContext context = new ConnectionContext();
context.setBroker(broker.getBrokerService().getBroker());
context.getBroker().addDestination(context, dest);
}
- for (Iterator i = destinations.values().iterator(); i.hasNext();) {
- Destination dest = (Destination)i.next();
+ for (Iterator<Destination> i = destinations.values().iterator(); i.hasNext();) {
+ Destination dest = i.next();
dest.start();
}
}
public void stop() throws Exception {
started = false;
- for (Iterator i = destinations.values().iterator(); i.hasNext();) {
- Destination dest = (Destination)i.next();
+ for (Iterator<Destination> i = destinations.values().iterator(); i.hasNext();) {
+ Destination dest = i.next();
dest.stop();
}
destinations.clear();
@@ -110,7 +111,7 @@
public Destination addDestination(ConnectionContext context, ActiveMQDestination destination) throws Exception {
LOG.debug("Adding destination: " + destination);
synchronized (destinationsMutex) {
- Destination dest = (Destination)destinations.get(destination);
+ Destination dest = destinations.get(destination);
if (dest == null) {
dest = createDestination(context, destination);
// intercept if there is a valid interceptor defined
@@ -129,10 +130,10 @@
protected List<Subscription> addSubscriptionsForDestination(ConnectionContext context, Destination dest) throws Exception {
- ArrayList<Subscription> rc = new ArrayList<Subscription>();
+ List<Subscription> rc = new ArrayList<Subscription>();
// Add all consumers that are interested in the destination.
- for (Iterator iter = subscriptions.values().iterator(); iter.hasNext();) {
- Subscription sub = (Subscription)iter.next();
+ for (Iterator<Subscription> iter = subscriptions.values().iterator(); iter.hasNext();) {
+ Subscription sub = iter.next();
if (sub.matches(dest.getActiveMQDestination())) {
dest.addSubscription(context, sub);
rc.add(sub);
@@ -147,8 +148,8 @@
// No timeout.. then try to shut down right away, fails if there are
// current subscribers.
if (timeout == 0) {
- for (Iterator iter = subscriptions.values().iterator(); iter.hasNext();) {
- Subscription sub = (Subscription)iter.next();
+ for (Iterator<Subscription> iter = subscriptions.values().iterator(); iter.hasNext();) {
+ Subscription sub = iter.next();
if (sub.matches(destination)) {
throw new JMSException("Destination still has an active subscription: " + destination);
}
@@ -166,13 +167,13 @@
LOG.debug("Removing destination: " + destination);
synchronized (destinationsMutex) {
- Destination dest = (Destination)destinations.remove(destination);
+ Destination dest = destinations.remove(destination);
if (dest != null) {
// timeout<0 or we timed out, we now force any remaining
// subscriptions to un-subscribe.
- for (Iterator iter = subscriptions.values().iterator(); iter.hasNext();) {
- Subscription sub = (Subscription)iter.next();
+ for (Iterator<Subscription> iter = subscriptions.values().iterator(); iter.hasNext();) {
+ Subscription sub = iter.next();
if (sub.matches(destination)) {
dest.removeSubscription(context, sub);
}
@@ -199,9 +200,9 @@
}
}
- public Map getDestinationMap() {
+ public Map<ActiveMQDestination, Destination> getDestinationMap() {
synchronized (destinationsMutex) {
- return new HashMap(destinations);
+ return new HashMap<ActiveMQDestination, Destination>(destinations);
}
}
@@ -222,10 +223,10 @@
}
}
synchronized (addGuard) {
- Object o = subscriptions.get(info.getConsumerId());
+ Subscription o = subscriptions.get(info.getConsumerId());
if (o != null) {
LOG.warn("A duplicate subscription was detected. Clients may be misbehaving. Later warnings you may see about subscription removal are a consequence of this.");
- return (Subscription)o;
+ return o;
}
// We may need to add some destinations that are in persistent store
@@ -281,8 +282,8 @@
/**
* @return all Destinations that don't have active consumers
*/
- protected Set getInactiveDestinations() {
- Set inactiveDests = destinationFactory.getDestinations();
+ protected Set<ActiveMQDestination> getInactiveDestinations() {
+ Set<ActiveMQDestination> inactiveDests = destinationFactory.getDestinations();
inactiveDests.removeAll(destinations.keySet());
return inactiveDests;
}
@@ -290,9 +291,10 @@
public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
LOG.debug("Removing consumer: " + info.getConsumerId());
- Subscription sub = (Subscription)subscriptions.remove(info.getConsumerId());
- if (sub == null)
+ Subscription sub = subscriptions.remove(info.getConsumerId());
+ if (sub == null) {
throw new IllegalArgumentException("The subscription does not exist: " + info.getConsumerId());
+ }
// remove the subscription from all the matching queues.
for (Iterator iter = destinationMap.get(info.getDestination()).iterator(); iter.hasNext();) {
@@ -329,7 +331,7 @@
public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception {
Subscription sub = consumerExchange.getSubscription();
if (sub == null) {
- sub = (Subscription)subscriptions.get(ack.getConsumerId());
+ sub = subscriptions.get(ack.getConsumerId());
if (sub == null) {
throw new IllegalArgumentException("The subscription does not exist: " + ack.getConsumerId());
}
@@ -339,15 +341,16 @@
}
public Response messagePull(ConnectionContext context, MessagePull pull) throws Exception {
- Subscription sub = (Subscription)subscriptions.get(pull.getConsumerId());
- if (sub == null)
+ Subscription sub = subscriptions.get(pull.getConsumerId());
+ if (sub == null) {
throw new IllegalArgumentException("The subscription does not exist: " + pull.getConsumerId());
+ }
return sub.pullMessage(context, pull);
}
protected Destination lookup(ConnectionContext context, ActiveMQDestination destination) throws Exception {
synchronized (destinationsMutex) {
- Destination dest = (Destination)destinations.get(destination);
+ Destination dest = destinations.get(destination);
if (dest == null) {
if (autoCreateDestinations) {
// Try to auto create the destination... re-invoke broker
@@ -362,7 +365,7 @@
// this error
}
// We should now have the dest created.
- dest = (Destination)destinations.get(destination);
+ dest = destinations.get(destination);
}
if (dest == null) {
throw new JMSException("The destination " + destination + " does not exist.");
@@ -373,19 +376,19 @@
}
public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception {
- Subscription sub = (Subscription)subscriptions.get(messageDispatchNotification.getConsumerId());
+ Subscription sub = subscriptions.get(messageDispatchNotification.getConsumerId());
if (sub != null) {
sub.processMessageDispatchNotification(messageDispatchNotification);
}
}
public void gc() {
- for (Iterator iter = subscriptions.values().iterator(); iter.hasNext();) {
- Subscription sub = (Subscription)iter.next();
+ for (Iterator<Subscription> iter = subscriptions.values().iterator(); iter.hasNext();) {
+ Subscription sub = iter.next();
sub.gc();
}
- for (Iterator iter = destinations.values().iterator(); iter.hasNext();) {
- Destination dest = (Destination)iter.next();
+ for (Iterator<Destination> iter = destinations.values().iterator(); iter.hasNext();) {
+ Destination dest = iter.next();
dest.gc();
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java?view=diff&rev=564679&r1=564678&r2=564679
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java Fri Aug 10 09:57:01 2007
@@ -45,7 +45,7 @@
protected ConnectionContext context;
protected ConsumerInfo info;
protected final DestinationFilter destinationFilter;
- protected final CopyOnWriteArrayList destinations = new CopyOnWriteArrayList();
+ protected final CopyOnWriteArrayList<Destination> destinations = new CopyOnWriteArrayList<Destination>();
private BooleanExpression selectorExpression;
private ObjectName objectName;
@@ -83,8 +83,9 @@
public boolean matches(MessageReference node, MessageEvaluationContext context) throws IOException {
ConsumerId targetConsumerId = node.getTargetConsumerId();
if (targetConsumerId != null) {
- if (!targetConsumerId.equals(info.getConsumerId()))
+ if (!targetConsumerId.equals(info.getConsumerId())) {
return false;
+ }
}
try {
return (selectorExpression == null || selectorExpression.matches(context)) && this.context.isAllowedToConsume(node);
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactory.java?view=diff&rev=564679&r1=564678&r2=564679
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactory.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactory.java Fri Aug 10 09:57:01 2007
@@ -41,7 +41,7 @@
* Returns a set of all the {@link org.apache.activemq.command.ActiveMQDestination}
* objects that the persistence store is aware exist.
*/
- public abstract Set getDestinations();
+ public abstract Set<ActiveMQDestination> getDestinations();
/**
* Lists all the durable subscriptions for a given destination.
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java?view=diff&rev=564679&r1=564678&r2=564679
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java Fri Aug 10 09:57:01 2007
@@ -140,11 +140,13 @@
}
public boolean lock(LockOwner subscription) {
- if (!regionDestination.lock(this, subscription))
+ if (!regionDestination.lock(this, subscription)) {
return false;
+ }
synchronized (this) {
- if (dropped || (lockOwner != null && lockOwner != subscription))
+ if (dropped || (lockOwner != null && lockOwner != subscription)) {
return false;
+ }
lockOwner = subscription;
return true;
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?view=diff&rev=564679&r1=564678&r2=564679
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Fri Aug 10 09:57:01 2007
@@ -50,7 +50,7 @@
private static final Log LOG = LogFactory.getLog(PrefetchSubscription.class);
protected PendingMessageCursor pending;
- protected final LinkedList dispatched = new LinkedList();
+ protected final LinkedList<MessageReference> dispatched = new LinkedList<MessageReference>();
protected int prefetchExtension;
protected long enqueueCounter;
protected long dispatchCounter;
@@ -158,8 +158,8 @@
// acknowledgment.
int index = 0;
boolean inAckRange = false;
- for (Iterator iter = dispatched.iterator(); iter.hasNext();) {
- final MessageReference node = (MessageReference)iter.next();
+ for (Iterator<MessageReference> iter = dispatched.iterator(); iter.hasNext();) {
+ final MessageReference node = iter.next();
MessageId messageId = node.getMessageId();
if (ack.getFirstMessageId() == null || ack.getFirstMessageId().equals(messageId)) {
inAckRange = true;
@@ -219,8 +219,8 @@
// Acknowledge all dispatched messages up till the message id of the
// acknowledgment.
int index = 0;
- for (Iterator iter = dispatched.iterator(); iter.hasNext(); index++) {
- final MessageReference node = (MessageReference)iter.next();
+ for (Iterator<MessageReference> iter = dispatched.iterator(); iter.hasNext(); index++) {
+ final MessageReference node = iter.next();
if (ack.getLastMessageId().equals(node.getMessageId())) {
prefetchExtension = Math.max(prefetchExtension, index + 1);
callDispatchMatched = true;
@@ -233,14 +233,15 @@
} else if (ack.isPoisonAck()) {
// TODO: what if the message is already in a DLQ???
// Handle the poison ACK case: we need to send the message to a DLQ
- if (ack.isInTransaction())
+ if (ack.isInTransaction()) {
throw new JMSException("Poison ack cannot be transacted: " + ack);
+ }
// Acknowledge all dispatched messages up till the message id of the
// acknowledgment.
int index = 0;
boolean inAckRange = false;
- for (Iterator iter = dispatched.iterator(); iter.hasNext();) {
- final MessageReference node = (MessageReference)iter.next();
+ for (Iterator<MessageReference> iter = dispatched.iterator(); iter.hasNext();) {
+ final MessageReference node = iter.next();
MessageId messageId = node.getMessageId();
if (ack.getFirstMessageId() == null || ack.getFirstMessageId().equals(messageId)) {
inAckRange = true;
@@ -380,8 +381,9 @@
pending.reset();
while (pending.hasNext() && !isFull() && count < numberToDispatch) {
MessageReference node = pending.next();
- if (node == null)
+ if (node == null) {
break;
+ }
if (canDispatch(node)) {
pending.remove();
// Message may have been sitting in the pending list
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?view=diff&rev=564679&r1=564678&r2=564679
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Fri Aug 10 09:57:01 2007
@@ -76,12 +76,12 @@
private final Log log;
private final ActiveMQDestination destination;
- private final List consumers = new CopyOnWriteArrayList();
+ private final List<Subscription> consumers = new CopyOnWriteArrayList<Subscription>();
private final Valve dispatchValve = new Valve(true);
private final UsageManager usageManager;
private final DestinationStatistics destinationStatistics = new DestinationStatistics();
private PendingMessageCursor messages;
- private final LinkedList pagedInMessages = new LinkedList();
+ private final LinkedList<MessageReference> pagedInMessages = new LinkedList<MessageReference>();
private LockOwner exclusiveOwner;
private MessageGroupMap messageGroupOwners;
@@ -96,7 +96,16 @@
private final Object exclusiveLockMutex = new Object();
private final Object doDispatchMutex = new Object();
private TaskRunner taskRunner;
- private boolean started;
+
+ private final LinkedList<Runnable> messagesWaitingForSpace = new LinkedList<Runnable>();
+ private final Runnable sendMessagesWaitingForSpaceTask = new Runnable() {
+ public void run() {
+ try {
+ taskRunner.wakeup();
+ } catch (InterruptedException e) {
+ }
+ };
+ };
public Queue(Broker broker, ActiveMQDestination destination, final UsageManager memoryManager, MessageStore store, DestinationStatistics parentStats,
TaskRunnerFactory taskFactory, Store tmpStore) throws Exception {
@@ -161,9 +170,6 @@
throw new RuntimeException("Should not be called.");
}
- public void finished() {
- }
-
public boolean hasSpace() {
return true;
}
@@ -183,10 +189,12 @@
*/
public boolean lock(MessageReference node, LockOwner lockOwner) {
synchronized (exclusiveLockMutex) {
- if (exclusiveOwner == lockOwner)
+ if (exclusiveOwner == lockOwner) {
return true;
- if (exclusiveOwner != null)
+ }
+ if (exclusiveOwner != null) {
return false;
+ }
}
return true;
}
@@ -225,7 +233,7 @@
synchronized (pagedInMessages) {
// Add all the matching messages in the queue to the
// subscription.
- for (Iterator i = pagedInMessages.iterator(); i.hasNext();) {
+ for (Iterator<MessageReference> i = pagedInMessages.iterator(); i.hasNext();) {
QueueMessageReference node = (QueueMessageReference)i.next();
if (node.isDropped()) {
continue;
@@ -269,11 +277,12 @@
// Find the exclusive consumer with the highest Lock
// Priority.
exclusiveOwner = null;
- for (Iterator iter = consumers.iterator(); iter.hasNext();) {
- Subscription s = (Subscription)iter.next();
+ for (Iterator<Subscription> iter = consumers.iterator(); iter.hasNext();) {
+ Subscription s = iter.next();
LockOwner so = (LockOwner)s;
- if (s.getConsumerInfo().isExclusive() && (exclusiveOwner == null || so.getLockPriority() > exclusiveOwner.getLockPriority()))
+ if (s.getConsumerInfo().isExclusive() && (exclusiveOwner == null || so.getLockPriority() > exclusiveOwner.getLockPriority())) {
exclusiveOwner = so;
+ }
}
}
}
@@ -299,9 +308,9 @@
msgContext.setDestination(destination);
// lets copy the messages to dispatch to avoid deadlock
- List messagesToDispatch = new ArrayList();
+ List<QueueMessageReference> messagesToDispatch = new ArrayList<QueueMessageReference>();
synchronized (pagedInMessages) {
- for (Iterator i = pagedInMessages.iterator(); i.hasNext();) {
+ for (Iterator<MessageReference> i = pagedInMessages.iterator(); i.hasNext();) {
QueueMessageReference node = (QueueMessageReference)i.next();
if (node.isDropped()) {
continue;
@@ -318,8 +327,8 @@
// now lets dispatch from the copy of the collection to
// avoid deadlocks
- for (Iterator iter = messagesToDispatch.iterator(); iter.hasNext();) {
- QueueMessageReference node = (QueueMessageReference)iter.next();
+ for (Iterator<QueueMessageReference> iter = messagesToDispatch.iterator(); iter.hasNext();) {
+ QueueMessageReference node = iter.next();
node.incrementRedeliveryCounter();
node.unlock();
msgContext.setMessageReference(node);
@@ -335,16 +344,6 @@
}
- private final LinkedList<Runnable> messagesWaitingForSpace = new LinkedList<Runnable>();
- private final Runnable sendMessagesWaitingForSpaceTask = new Runnable() {
- public void run() {
- try {
- taskRunner.wakeup();
- } catch (InterruptedException e) {
- }
- };
- };
-
public void send(final ProducerBrokerExchange producerExchange, final Message message) throws Exception {
final ConnectionContext context = producerExchange.getConnectionContext();
// There is delay between the client sending it and it arriving at the
@@ -419,8 +418,9 @@
// control at the broker
// by blocking this thread until there is space available.
while (!usageManager.waitForSpace(1000)) {
- if (context.getStopping().get())
+ if (context.getStopping().get()) {
throw new IOException("Connection closed, send aborted.");
+ }
}
// The usage manager could have delayed us by the time
@@ -508,7 +508,7 @@
public void gc() {
synchronized (pagedInMessages) {
- for (Iterator i = pagedInMessages.iterator(); i.hasNext();) {
+ for (Iterator<MessageReference> i = pagedInMessages.iterator(); i.hasNext();) {
// Remove dropped messages from the queue.
QueueMessageReference node = (QueueMessageReference)i.next();
if (node.isDropped()) {
@@ -557,7 +557,6 @@
}
public void start() throws Exception {
- started = true;
if (usageManager != null) {
usageManager.start();
}
@@ -566,7 +565,6 @@
}
public void stop() throws Exception {
- started = false;
if (taskRunner != null) {
taskRunner.shutdown();
}
@@ -652,15 +650,15 @@
}
public Message[] browse() {
- ArrayList l = new ArrayList();
+ List<Message> l = new ArrayList<Message>();
try {
doPageIn(true);
} catch (Exception e) {
log.error("caught an exception browsing " + this, e);
}
synchronized (pagedInMessages) {
- for (Iterator i = pagedInMessages.iterator(); i.hasNext();) {
- MessageReference r = (MessageReference)i.next();
+ for (Iterator<MessageReference> i = pagedInMessages.iterator(); i.hasNext();) {
+ MessageReference r = i.next();
r.incrementReferenceCount();
try {
Message m = r.getMessage();
@@ -698,7 +696,7 @@
}
}
- return (Message[])l.toArray(new Message[l.size()]);
+ return l.toArray(new Message[l.size()]);
}
public Message getMessage(String messageId) {
@@ -737,7 +735,7 @@
synchronized (pagedInMessages) {
ConnectionContext c = createConnectionContext();
- for (Iterator i = pagedInMessages.iterator(); i.hasNext();) {
+ for (Iterator<MessageReference> i = pagedInMessages.iterator(); i.hasNext();) {
try {
QueueMessageReference r = (QueueMessageReference)i.next();
@@ -798,7 +796,7 @@
int counter = 0;
synchronized (pagedInMessages) {
ConnectionContext c = createConnectionContext();
- for (Iterator i = pagedInMessages.iterator(); i.hasNext();) {
+ for (Iterator<MessageReference> i = pagedInMessages.iterator(); i.hasNext();) {
IndirectMessageReference r = (IndirectMessageReference)i.next();
if (filter.evaluate(c, r)) {
removeMessage(c, r);
@@ -848,8 +846,8 @@
pageInMessages();
int counter = 0;
synchronized (pagedInMessages) {
- for (Iterator i = pagedInMessages.iterator(); i.hasNext();) {
- MessageReference r = (MessageReference)i.next();
+ for (Iterator<MessageReference> i = pagedInMessages.iterator(); i.hasNext();) {
+ MessageReference r = i.next();
if (filter.evaluate(context, r)) {
r.incrementReferenceCount();
try {
@@ -899,7 +897,7 @@
pageInMessages();
int counter = 0;
synchronized (pagedInMessages) {
- for (Iterator i = pagedInMessages.iterator(); i.hasNext();) {
+ for (Iterator<MessageReference> i = pagedInMessages.iterator(); i.hasNext();) {
IndirectMessageReference r = (IndirectMessageReference)i.next();
if (filter.evaluate(context, r)) {
// We should only move messages that can be locked.
@@ -995,20 +993,20 @@
pageInMessages(false);
}
- private List doPageIn() throws Exception {
+ private List<MessageReference> doPageIn() throws Exception {
return doPageIn(true);
}
- private List doPageIn(boolean force) throws Exception {
+ private List<MessageReference> doPageIn(boolean force) throws Exception {
final int toPageIn = maximumPagedInMessages - pagedInMessages.size();
- List result = null;
+ List<MessageReference> result = null;
if ((force || !consumers.isEmpty()) && toPageIn > 0) {
messages.setMaxBatchSize(toPageIn);
try {
dispatchValve.increment();
int count = 0;
- result = new ArrayList(toPageIn);
+ result = new ArrayList<MessageReference>(toPageIn);
synchronized (messages) {
try {
@@ -1040,12 +1038,12 @@
return result;
}
- private void doDispatch(List list) throws Exception {
+ private void doDispatch(List<MessageReference> list) throws Exception {
if (list != null && !list.isEmpty()) {
try {
dispatchValve.increment();
for (int i = 0; i < list.size(); i++) {
- MessageReference node = (MessageReference)list.get(i);
+ MessageReference node = list.get(i);
queueMsgConext.setDestination(destination);
queueMsgConext.setMessageReference(node);
dispatchPolicy.dispatch(node, queueMsgConext, consumers);
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueRegion.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueRegion.java?view=diff&rev=564679&r1=564678&r2=564679
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueRegion.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueRegion.java Fri Aug 10 09:57:01 2007
@@ -53,12 +53,13 @@
}
}
- protected Set getInactiveDestinations() {
- Set inactiveDestinations = super.getInactiveDestinations();
- for (Iterator iter = inactiveDestinations.iterator(); iter.hasNext();) {
- ActiveMQDestination dest = (ActiveMQDestination)iter.next();
- if (!dest.isQueue())
+ protected Set<ActiveMQDestination> getInactiveDestinations() {
+ Set<ActiveMQDestination> inactiveDestinations = super.getInactiveDestinations();
+ for (Iterator<ActiveMQDestination> iter = inactiveDestinations.iterator(); iter.hasNext();) {
+ ActiveMQDestination dest = iter.next();
+ if (!dest.isQueue()) {
iter.remove();
+ }
}
return inactiveDestinations;
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java?view=diff&rev=564679&r1=564678&r2=564679
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java Fri Aug 10 09:57:01 2007
@@ -72,8 +72,9 @@
protected boolean canDispatch(MessageReference n) throws IOException {
QueueMessageReference node = (QueueMessageReference)n;
- if (node.isAcked())
+ if (node.isAcked()) {
return false;
+ }
// Keep message groups together.
String groupId = node.getGroupID();
int sequence = node.getGroupSequence();
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Region.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Region.java?view=diff&rev=564679&r1=564678&r2=564679
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Region.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Region.java Fri Aug 10 09:57:01 2007
@@ -69,7 +69,7 @@
*
* @return a copy of the regions currently active at the time of the call with the key the destination and the value the Destination.
*/
- Map getDestinationMap();
+ Map<ActiveMQDestination, Destination> getDestinationMap();
/**
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java?view=diff&rev=564679&r1=564678&r2=564679
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java Fri Aug 10 09:57:01 2007
@@ -76,6 +76,10 @@
private static final Log LOG = LogFactory.getLog(RegionBroker.class);
private static final IdGenerator BROKER_ID_GENERATOR = new IdGenerator();
+ protected final DestinationStatistics destinationStatistics = new DestinationStatistics();
+ protected DestinationFactory destinationFactory;
+ protected final Map<ConnectionId, ConnectionState> connectionStates = Collections.synchronizedMap(new HashMap<ConnectionId, ConnectionState>());
+
private final Region queueRegion;
private final Region topicRegion;
private final Region tempQueueRegion;
@@ -84,23 +88,16 @@
private boolean started;
private boolean keepDurableSubsActive;
- protected final DestinationStatistics destinationStatistics = new DestinationStatistics();
-
- private final CopyOnWriteArrayList connections = new CopyOnWriteArrayList();
- private final Map destinations = new ConcurrentHashMap();
- private final CopyOnWriteArrayList brokerInfos = new CopyOnWriteArrayList();
+ private final CopyOnWriteArrayList<Connection> connections = new CopyOnWriteArrayList<Connection>();
+ private final Map<ActiveMQDestination, Destination> destinations = new ConcurrentHashMap<ActiveMQDestination, Destination>();
+ private final CopyOnWriteArrayList<BrokerInfo> brokerInfos = new CopyOnWriteArrayList<BrokerInfo>();
private final LongSequenceGenerator sequenceGenerator = new LongSequenceGenerator();
private BrokerId brokerId;
private String brokerName;
- private Map<String, ConnectionContext> clientIdSet = new HashMap<String, ConnectionContext>(); // we
- // will
- // synchronize
- // access
+ private Map<String, ConnectionContext> clientIdSet = new HashMap<String, ConnectionContext>();
private final DestinationInterceptor destinationInterceptor;
private ConnectionContext adminConnectionContext;
- protected DestinationFactory destinationFactory;
- protected final Map<ConnectionId, ConnectionState> connectionStates = Collections.synchronizedMap(new HashMap<ConnectionId, ConnectionState>());
public RegionBroker(BrokerService brokerService, TaskRunnerFactory taskRunnerFactory, UsageManager memoryManager, DestinationFactory destinationFactory,
DestinationInterceptor destinationInterceptor) throws IOException {
@@ -243,7 +240,7 @@
}
public Connection[] getClients() throws Exception {
- ArrayList l = new ArrayList(connections);
+ ArrayList<Connection> l = new ArrayList<Connection>(connections);
Connection rc[] = new Connection[l.size()];
l.toArray(rc);
return rc;
@@ -253,9 +250,10 @@
Destination answer;
- answer = (Destination)destinations.get(destination);
- if (answer != null)
+ answer = destinations.get(destination);
+ if (answer != null) {
return answer;
+ }
switch (destination.getDestinationType()) {
case ActiveMQDestination.QUEUE_TYPE:
@@ -313,9 +311,9 @@
}
public ActiveMQDestination[] getDestinations() throws Exception {
- ArrayList l;
+ ArrayList<Destination> l;
- l = new ArrayList(destinations.values());
+ l = new ArrayList<Destination>(destinations.values());
ActiveMQDestination rc[] = new ActiveMQDestination[l.size()];
l.toArray(rc);
@@ -530,7 +528,7 @@
public synchronized BrokerInfo[] getPeerBrokerInfos() {
BrokerInfo[] result = new BrokerInfo[brokerInfos.size()];
- result = (BrokerInfo[])brokerInfos.toArray(result);
+ result = brokerInfos.toArray(result);
return result;
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java?view=diff&rev=564679&r1=564678&r2=564679
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java Fri Aug 10 09:57:01 2007
@@ -73,8 +73,9 @@
// Force a timeout value so that we don't get an error that
// there is still an active sub. Temp destination may be removed
// while a network sub is still active which is valid.
- if (timeout == 0)
+ if (timeout == 0) {
timeout = 1;
+ }
super.removeDestination(context, destination, timeout);
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempTopicRegion.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempTopicRegion.java?view=diff&rev=564679&r1=564678&r2=564679
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempTopicRegion.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempTopicRegion.java Fri Aug 10 09:57:01 2007
@@ -75,8 +75,9 @@
// Force a timeout value so that we don't get an error that
// there is still an active sub. Temp destination may be removed
// while a network sub is still active which is valid.
- if (timeout == 0)
+ if (timeout == 0) {
timeout = 1;
+ }
super.removeDestination(context, destination, timeout);
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?view=diff&rev=564679&r1=564678&r2=564679
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java Fri Aug 10 09:57:01 2007
@@ -18,6 +18,7 @@
import java.io.IOException;
import java.util.LinkedList;
+import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
@@ -62,7 +63,7 @@
public class Topic implements Destination {
private static final Log LOG = LogFactory.getLog(Topic.class);
protected final ActiveMQDestination destination;
- protected final CopyOnWriteArrayList consumers = new CopyOnWriteArrayList();
+ protected final CopyOnWriteArrayList<Subscription> consumers = new CopyOnWriteArrayList<Subscription>();
protected final Valve dispatchValve = new Valve(true);
// this could be NULL! (If an advisory)
protected final TopicMessageStore store;
@@ -73,14 +74,32 @@
private SubscriptionRecoveryPolicy subscriptionRecoveryPolicy = new FixedSizedSubscriptionRecoveryPolicy();
private boolean sendAdvisoryIfNoConsumers;
private DeadLetterStrategy deadLetterStrategy = new SharedDeadLetterStrategy();
- private final ConcurrentHashMap durableSubcribers = new ConcurrentHashMap();
- final Broker broker;
+ private final ConcurrentHashMap<SubscriptionKey, DurableTopicSubscription> durableSubcribers = new ConcurrentHashMap<SubscriptionKey, DurableTopicSubscription>();
+
+ private final LinkedList<Runnable> messagesWaitingForSpace = new LinkedList<Runnable>();
+ private final Runnable sendMessagesWaitingForSpaceTask = new Runnable() {
+ public void run() {
+
+ // We may need to do this in async thread since this is run for
+ // within a synchronization
+ // that the UsageManager is holding.
+
+ synchronized (messagesWaitingForSpace) {
+ while (!usageManager.isFull() && !messagesWaitingForSpace.isEmpty()) {
+ Runnable op = messagesWaitingForSpace.removeFirst();
+ op.run();
+ }
+ }
+
+ };
+ };
+ private final Broker broker;
public Topic(Broker broker, ActiveMQDestination destination, TopicMessageStore store, UsageManager memoryManager, DestinationStatistics parentStats,
TaskRunnerFactory taskFactory) {
this.broker = broker;
this.destination = destination;
- this.store = store; // this could be NULL! (If an advsiory)
+ this.store = store; // this could be NULL! (If an advisory)
this.usageManager = new UsageManager(memoryManager, destination.toString());
this.usageManager.setUsagePortion(1.0f);
@@ -168,8 +187,9 @@
consumers.add(subscription);
}
- if (store == null)
+ if (store == null) {
return;
+ }
// Recover the durable subscription.
String clientId = subscription.getClientId();
@@ -228,9 +248,6 @@
throw new RuntimeException("Should not be called.");
}
- public void finished() {
- }
-
public boolean hasSpace() {
return true;
}
@@ -255,24 +272,6 @@
}
}
- private final LinkedList<Runnable> messagesWaitingForSpace = new LinkedList<Runnable>();
- private final Runnable sendMessagesWaitingForSpaceTask = new Runnable() {
- public void run() {
-
- // We may need to do this in async thread since this is run for
- // within a synchronization
- // that the UsageManager is holding.
-
- synchronized (messagesWaitingForSpace) {
- while (!usageManager.isFull() && !messagesWaitingForSpace.isEmpty()) {
- Runnable op = messagesWaitingForSpace.removeFirst();
- op.run();
- }
- }
-
- };
- };
-
public void send(final ProducerBrokerExchange producerExchange, final Message message) throws Exception {
final ConnectionContext context = producerExchange.getConnectionContext();
@@ -342,8 +341,9 @@
// control at the broker
// by blocking this thread until there is space available.
while (!usageManager.waitForSpace(1000)) {
- if (context.getStopping().get())
+ if (context.getStopping().get()) {
throw new IOException("Connection closed, send aborted.");
+ }
}
// The usage manager could have delayed us by the time
@@ -364,8 +364,9 @@
final ConnectionContext context = producerExchange.getConnectionContext();
message.setRegionDestination(this);
- if (store != null && message.isPersistent() && !canOptimizeOutPersistence())
+ if (store != null && message.isPersistent() && !canOptimizeOutPersistence()) {
store.addMessage(context, message);
+ }
message.incrementReferenceCount();
try {
@@ -440,7 +441,7 @@
}
public Message[] browse() {
- final Set result = new CopyOnWriteArraySet();
+ final Set<Message> result = new CopyOnWriteArraySet<Message>();
try {
if (store != null) {
store.recover(new MessageRecoveryListener() {
@@ -453,9 +454,6 @@
return true;
}
- public void finished() {
- }
-
public boolean hasSpace() {
return true;
}
@@ -470,7 +468,7 @@
} catch (Throwable e) {
LOG.warn("Failed to browse Topic: " + getActiveMQDestination().getPhysicalName(), e);
}
- return (Message[])result.toArray(new Message[result.size()]);
+ return result.toArray(new Message[result.size()]);
}
// Properties
@@ -576,10 +574,12 @@
// filled when the message is first sent,
// it is only populated if the message is routed to another
// destination like the DLQ
- if (message.getOriginalDestination() != null)
+ if (message.getOriginalDestination() != null) {
message.setOriginalDestination(message.getDestination());
- if (message.getOriginalTransactionId() != null)
+ }
+ if (message.getOriginalTransactionId() != null) {
message.setOriginalTransactionId(message.getTransactionId());
+ }
ActiveMQTopic advisoryTopic = AdvisorySupport.getNoTopicConsumersAdvisoryTopic(destination);
message.setDestination(advisoryTopic);