You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2007/08/10 18:57:10 UTC
svn commit: r564679 [5/8] - in /activemq/trunk:
activemq-core/src/main/java/org/apache/activemq/
activemq-core/src/main/java/org/apache/activemq/broker/
activemq-core/src/main/java/org/apache/activemq/broker/jmx/
activemq-core/src/main/java/org/apache/...
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/list/DestinationBasedMessageList.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/list/DestinationBasedMessageList.java?view=diff&rev=564679&r1=564678&r2=564679
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/list/DestinationBasedMessageList.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/memory/list/DestinationBasedMessageList.java Fri Aug 10 09:57:01 2007
@@ -42,7 +42,7 @@
public class DestinationBasedMessageList implements MessageList {
private MessageBuffer messageBuffer;
- private Map queueIndex = new HashMap();
+ private Map<ActiveMQDestination, MessageQueue> queueIndex = new HashMap<ActiveMQDestination, MessageQueue>();
private DestinationMap subscriptionIndex = new DestinationMap();
private Object lock = new Object();
@@ -59,7 +59,7 @@
ActiveMQDestination destination = message.getDestination();
MessageQueue queue = null;
synchronized (lock) {
- queue = (MessageQueue) queueIndex.get(destination);
+ queue = queueIndex.get(destination);
if (queue == null) {
queue = messageBuffer.createMessageQueue();
queueIndex.put(destination, queue);
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ConduitBridge.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ConduitBridge.java?view=diff&rev=564679&r1=564678&r2=564679
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ConduitBridge.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ConduitBridge.java Fri Aug 10 09:57:01 2007
@@ -42,8 +42,7 @@
* @param localBroker
* @param remoteBroker
*/
- public ConduitBridge(NetworkBridgeConfiguration configuration, Transport localBroker,
- Transport remoteBroker) {
+ public ConduitBridge(NetworkBridgeConfiguration configuration, Transport localBroker, Transport remoteBroker) {
super(configuration, localBroker, remoteBroker);
}
@@ -57,8 +56,9 @@
protected boolean addToAlreadyInterestedConsumers(ConsumerInfo info) {
- if (info.getSelector() != null)
+ if (info.getSelector() != null) {
return false;
+ }
// search through existing subscriptions and see if we have a match
boolean matched = false;
@@ -78,7 +78,7 @@
}
protected void removeDemandSubscription(ConsumerId id) throws IOException {
- List tmpList = new ArrayList();
+ List<DemandSubscription> tmpList = new ArrayList<DemandSubscription>();
for (Iterator i = subscriptionMapByLocalId.values().iterator(); i.hasNext();) {
DemandSubscription ds = (DemandSubscription)i.next();
@@ -87,13 +87,13 @@
tmpList.add(ds);
}
}
- for (Iterator i = tmpList.iterator(); i.hasNext();) {
- DemandSubscription ds = (DemandSubscription)i.next();
+ for (Iterator<DemandSubscription> i = tmpList.iterator(); i.hasNext();) {
+ DemandSubscription ds = i.next();
subscriptionMapByLocalId.remove(ds.getRemoteInfo().getConsumerId());
removeSubscription(ds);
- if (LOG.isTraceEnabled())
- LOG.trace("removing sub on " + localBroker + " from " + remoteBrokerName + " : "
- + ds.getRemoteInfo());
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("removing sub on " + localBroker + " from " + remoteBrokerName + " : " + ds.getRemoteInfo());
+ }
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java?view=diff&rev=564679&r1=564678&r2=564679
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java Fri Aug 10 09:57:01 2007
@@ -96,22 +96,23 @@
protected ActiveMQDestination[] dynamicallyIncludedDestinations;
protected ActiveMQDestination[] staticallyIncludedDestinations;
protected ActiveMQDestination[] durableDestinations;
- protected final ConcurrentHashMap subscriptionMapByLocalId = new ConcurrentHashMap();
- protected final ConcurrentHashMap subscriptionMapByRemoteId = new ConcurrentHashMap();
+ protected final ConcurrentHashMap<ConsumerId, DemandSubscription> subscriptionMapByLocalId = new ConcurrentHashMap<ConsumerId, DemandSubscription>();
+ protected final ConcurrentHashMap<ConsumerId, DemandSubscription> subscriptionMapByRemoteId = new ConcurrentHashMap<ConsumerId, DemandSubscription>();
protected final BrokerId localBrokerPath[] = new BrokerId[] {null};
protected CountDownLatch startedLatch = new CountDownLatch(2);
protected CountDownLatch remoteBrokerNameKnownLatch = new CountDownLatch(1);
protected final AtomicBoolean remoteInterupted = new AtomicBoolean(false);
protected final AtomicBoolean lastConnectSucceeded = new AtomicBoolean(false);
protected NetworkBridgeConfiguration configuration;
+
+ final AtomicLong enqueueCounter = new AtomicLong();
+ final AtomicLong dequeueCounter = new AtomicLong();
+
private NetworkBridgeListener networkBridgeListener;
private boolean createdByDuplex;
-
private BrokerInfo localBrokerInfo;
private BrokerInfo remoteBrokerInfo;
- final AtomicLong enqueueCounter = new AtomicLong();
- final AtomicLong dequeueCounter = new AtomicLong();
private AtomicBoolean started = new AtomicBoolean();
public DemandForwardingBridgeSupport(NetworkBridgeConfiguration configuration, Transport localBroker, Transport remoteBroker) {
@@ -381,7 +382,7 @@
}
} else if (command.isBrokerInfo()) {
lastConnectSucceeded.set(true);
- remoteBrokerInfo = ((BrokerInfo)command);
+ remoteBrokerInfo = (BrokerInfo)command;
serviceRemoteBrokerInfo(command);
// Let the local broker know the remote broker's ID.
localBroker.oneway(command);
@@ -406,13 +407,15 @@
break;
case ConsumerInfo.DATA_STRUCTURE_TYPE:
if (!addConsumerInfo((ConsumerInfo)command)) {
- if (LOG.isDebugEnabled())
+ if (LOG.isDebugEnabled()) {
LOG.debug("Ignoring ConsumerInfo: " + command);
+ }
}
break;
default:
- if (LOG.isDebugEnabled())
+ if (LOG.isDebugEnabled()) {
LOG.debug("Ignoring remote command: " + command);
+ }
}
}
} else {
@@ -439,33 +442,38 @@
ConsumerInfo info = (ConsumerInfo)data;
BrokerId[] path = info.getBrokerPath();
if (path != null && path.length >= networkTTL) {
- if (LOG.isDebugEnabled())
+ if (LOG.isDebugEnabled()) {
LOG.debug(configuration.getBrokerName() + " Ignoring Subscription " + info + " restricted to " + networkTTL + " network hops only");
+ }
return;
}
if (contains(info.getBrokerPath(), localBrokerPath[0])) {
// Ignore this consumer as it's a consumer we locally sent to
// the broker.
- if (LOG.isDebugEnabled())
+ if (LOG.isDebugEnabled()) {
LOG.debug(configuration.getBrokerName() + " Ignoring sub " + info + " already routed through this broker once");
+ }
return;
}
if (!isPermissableDestination(info.getDestination())) {
// ignore if not in the permited or in the excluded list
- if (LOG.isDebugEnabled())
+ if (LOG.isDebugEnabled()) {
LOG.debug(configuration.getBrokerName() + " Ignoring sub " + info + " destination " + info.getDestination() + " is not permiited");
+ }
return;
}
if (addConsumerInfo(info)) {
- if (LOG.isDebugEnabled())
+ if (LOG.isDebugEnabled()) {
LOG.debug(configuration.getBrokerName() + " Forwarding sub on " + localBroker + " from " + remoteBrokerName + " : " + info);
+ }
} else {
- if (LOG.isDebugEnabled())
+ if (LOG.isDebugEnabled()) {
LOG.debug(configuration.getBrokerName() + " Ignoring sub " + info + " already subscribed to matching destination");
+ }
}
} else if (data.getClass() == DestinationInfo.class) {
// It's a destination info - we want to pass up
- // infomation about temporary destinations
+ // information about temporary destinations
DestinationInfo destInfo = (DestinationInfo)data;
BrokerId[] path = destInfo.getBrokerPath();
if (path != null && path.length >= networkTTL) {
@@ -502,7 +510,6 @@
LOG.info("Network connection between " + localBroker + " and " + remoteBroker + " shutdown due to a local error: " + error);
LOG.debug("The local Exception was:" + error, error);
new Thread() {
-
public void run() {
ServiceSupport.dispose(DemandForwardingBridgeSupport.this);
}
@@ -525,7 +532,7 @@
}
protected DemandSubscription getDemandSubscription(MessageDispatch md) {
- return (DemandSubscription)subscriptionMapByLocalId.get(md.getConsumerId());
+ return subscriptionMapByLocalId.get(md.getConsumerId());
}
protected Message configureMessage(MessageDispatch md) {
@@ -534,8 +541,9 @@
message.setBrokerPath(appendToBrokerPath(message.getBrokerPath(), localBrokerPath));
message.setProducerId(producerInfo.getProducerId());
message.setDestination(md.getDestination());
- if (message.getOriginalTransactionId() == null)
+ if (message.getOriginalTransactionId() == null) {
message.setOriginalTransactionId(message.getTransactionId());
+ }
message.setTransactionId(null);
return message;
}
@@ -548,11 +556,12 @@
enqueueCounter.incrementAndGet();
waitStarted();
final MessageDispatch md = (MessageDispatch)command;
- DemandSubscription sub = (DemandSubscription)subscriptionMapByLocalId.get(md.getConsumerId());
+ DemandSubscription sub = subscriptionMapByLocalId.get(md.getConsumerId());
if (sub != null) {
Message message = configureMessage(md);
- if (trace)
+ if (trace) {
LOG.trace("bridging " + configuration.getBrokerName() + " -> " + remoteBrokerName + ": " + message);
+ }
if (!message.isResponseRequired()) {
@@ -591,11 +600,12 @@
}
} else {
- if (trace)
+ if (trace) {
LOG.trace("No subscription registered with this network bridge for consumerId " + md.getConsumerId() + " for message: " + md.getMessage());
+ }
}
} else if (command.isBrokerInfo()) {
- localBrokerInfo = ((BrokerInfo)command);
+ localBrokerInfo = (BrokerInfo)command;
serviceLocalBrokerInfo(command);
} else if (command.isShutdownInfo()) {
LOG.info(configuration.getBrokerName() + " Shutting down");
@@ -713,16 +723,18 @@
public static boolean contains(BrokerId[] brokerPath, BrokerId brokerId) {
if (brokerPath != null) {
for (int i = 0; i < brokerPath.length; i++) {
- if (brokerId.equals(brokerPath[i]))
+ if (brokerId.equals(brokerPath[i])) {
return true;
+ }
}
}
return false;
}
protected BrokerId[] appendToBrokerPath(BrokerId[] brokerPath, BrokerId[] pathsToAppend) {
- if (brokerPath == null || brokerPath.length == 0)
+ if (brokerPath == null || brokerPath.length == 0) {
return pathsToAppend;
+ }
BrokerId rc[] = new BrokerId[brokerPath.length + pathsToAppend.length];
System.arraycopy(brokerPath, 0, rc, 0, brokerPath.length);
System.arraycopy(pathsToAppend, 0, rc, brokerPath.length, pathsToAppend.length);
@@ -730,8 +742,9 @@
}
protected BrokerId[] appendToBrokerPath(BrokerId[] brokerPath, BrokerId idToAppend) {
- if (brokerPath == null || brokerPath.length == 0)
+ if (brokerPath == null || brokerPath.length == 0) {
return new BrokerId[] {idToAppend};
+ }
BrokerId rc[] = new BrokerId[brokerPath.length + 1];
System.arraycopy(brokerPath, 0, rc, 0, brokerPath.length);
rc[brokerPath.length] = idToAppend;
@@ -781,8 +794,9 @@
} catch (IOException e) {
LOG.error("Failed to add static destination " + dest, e);
}
- if (LOG.isTraceEnabled())
+ if (LOG.isTraceEnabled()) {
LOG.trace("Forwarding messages for static destination: " + dest);
+ }
}
}
}
@@ -850,11 +864,12 @@
}
protected void removeDemandSubscription(ConsumerId id) throws IOException {
- DemandSubscription sub = (DemandSubscription)subscriptionMapByRemoteId.remove(id);
+ DemandSubscription sub = subscriptionMapByRemoteId.remove(id);
if (sub != null) {
removeSubscription(sub);
- if (LOG.isTraceEnabled())
+ if (LOG.isTraceEnabled()) {
LOG.trace("removing sub on " + localBroker + " from " + remoteBrokerName + " : " + sub.getRemoteInfo());
+ }
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java?view=diff&rev=564679&r1=564678&r2=564679
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java Fri Aug 10 09:57:01 2007
@@ -38,14 +38,13 @@
* available and setup a connection to each available remote broker
*
* @org.apache.xbean.XBean element="networkConnector"
- *
* @version $Revision$
*/
public class DiscoveryNetworkConnector extends NetworkConnector implements DiscoveryListener {
private static final Log LOG = LogFactory.getLog(DiscoveryNetworkConnector.class);
private DiscoveryAgent discoveryAgent;
- private ConcurrentHashMap bridges = new ConcurrentHashMap();
+ private ConcurrentHashMap<URI, NetworkBridge> bridges = new ConcurrentHashMap<URI, NetworkBridge>();
public DiscoveryNetworkConnector() {
}
@@ -61,8 +60,9 @@
public void onServiceAdd(DiscoveryEvent event) {
String localURIName = localURI.getScheme() + "://" + localURI.getHost();
// Ignore events once we start stopping.
- if (serviceSupport.isStopped() || serviceSupport.isStopping())
+ if (serviceSupport.isStopped() || serviceSupport.isStopping()) {
return;
+ }
String url = event.getServiceName();
if (url != null) {
URI uri;
@@ -73,9 +73,9 @@
return;
}
// Should we try to connect to that URI?
- if (bridges.containsKey(uri) || localURI.equals(uri)
- || (connectionFilter != null && !connectionFilter.connectTo(uri)))
+ if (bridges.containsKey(uri) || localURI.equals(uri) || (connectionFilter != null && !connectionFilter.connectTo(uri))) {
return;
+ }
URI connectUri = uri;
LOG.info("Establishing network connection between from " + localURIName + " to " + connectUri);
Transport remoteTransport;
@@ -102,8 +102,7 @@
} catch (Exception e) {
ServiceSupport.dispose(localTransport);
ServiceSupport.dispose(remoteTransport);
- LOG.warn("Could not start network bridge between: " + localURIName + " and: " + uri
- + " due to: " + e);
+ LOG.warn("Could not start network bridge between: " + localURIName + " and: " + uri + " due to: " + e);
LOG.debug("Start failure exception: " + e, e);
try {
discoveryAgent.serviceFailed(event);
@@ -125,9 +124,10 @@
return;
}
- NetworkBridge bridge = (NetworkBridge)bridges.remove(uri);
- if (bridge == null)
+ NetworkBridge bridge = bridges.remove(uri);
+ if (bridge == null) {
return;
+ }
ServiceSupport.dispose(bridge);
}
@@ -154,8 +154,8 @@
}
protected void handleStop(ServiceStopper stopper) throws Exception {
- for (Iterator i = bridges.values().iterator(); i.hasNext();) {
- NetworkBridge bridge = (NetworkBridge)i.next();
+ for (Iterator<NetworkBridge> i = bridges.values().iterator(); i.hasNext();) {
+ NetworkBridge bridge = i.next();
try {
bridge.stop();
} catch (Exception e) {
@@ -171,8 +171,7 @@
super.handleStop(stopper);
}
- protected NetworkBridge createBridge(Transport localTransport, Transport remoteTransport,
- final DiscoveryEvent event) {
+ protected NetworkBridge createBridge(Transport localTransport, Transport remoteTransport, final DiscoveryEvent event) {
NetworkBridgeListener listener = new NetworkBridgeListener() {
public void bridgeFailed() {
@@ -194,8 +193,7 @@
}
};
- DemandForwardingBridge result = NetworkBridgeFactory.createBridge(this, localTransport,
- remoteTransport, listener);
+ DemandForwardingBridge result = NetworkBridgeFactory.createBridge(this, localTransport, remoteTransport, listener);
return configureBridge(result);
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DurableConduitBridge.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DurableConduitBridge.java?view=diff&rev=564679&r1=564678&r2=564679
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DurableConduitBridge.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DurableConduitBridge.java Fri Aug 10 09:57:01 2007
@@ -68,8 +68,9 @@
} catch (IOException e) {
LOG.error("Failed to add static destination " + dest, e);
}
- if (LOG.isTraceEnabled())
+ if (LOG.isTraceEnabled()) {
LOG.trace("Forwarding messages for durable destination: " + dest);
+ }
}
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ForwardingBridge.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ForwardingBridge.java?view=diff&rev=564679&r1=564678&r2=564679
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ForwardingBridge.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ForwardingBridge.java Fri Aug 10 09:57:01 2007
@@ -55,33 +55,29 @@
*/
public class ForwardingBridge implements Service {
+ private static final IdGenerator ID_GENERATOR = new IdGenerator();
private static final Log LOG = LogFactory.getLog(ForwardingBridge.class);
- private final Transport localBroker;
- private final Transport remoteBroker;
-
- IdGenerator idGenerator = new IdGenerator();
+ final AtomicLong enqueueCounter = new AtomicLong();
+ final AtomicLong dequeueCounter = new AtomicLong();
ConnectionInfo connectionInfo;
SessionInfo sessionInfo;
ProducerInfo producerInfo;
ConsumerInfo queueConsumerInfo;
ConsumerInfo topicConsumerInfo;
+ BrokerId localBrokerId;
+ BrokerId remoteBrokerId;
+ BrokerInfo localBrokerInfo;
+ BrokerInfo remoteBrokerInfo;
+ private final Transport localBroker;
+ private final Transport remoteBroker;
private String clientId;
private int prefetchSize = 1000;
private boolean dispatchAsync;
private String destinationFilter = ">";
-
- BrokerId localBrokerId;
- BrokerId remoteBrokerId;
private NetworkBridgeListener bridgeFailedListener;
- BrokerInfo localBrokerInfo;
- BrokerInfo remoteBrokerInfo;
-
- final AtomicLong enqueueCounter = new AtomicLong();
- final AtomicLong dequeueCounter = new AtomicLong();
-
public ForwardingBridge(Transport localBroker, Transport remoteBroker) {
this.localBroker = localBroker;
this.remoteBroker = remoteBroker;
@@ -135,7 +131,7 @@
*/
final void startBridge() throws IOException {
connectionInfo = new ConnectionInfo();
- connectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
+ connectionInfo.setConnectionId(new ConnectionId(ID_GENERATOR.generateId()));
connectionInfo.setClientId(clientId);
localBroker.oneway(connectionInfo);
remoteBroker.oneway(connectionInfo);
@@ -196,7 +192,7 @@
try {
if (command.isBrokerInfo()) {
synchronized (this) {
- remoteBrokerInfo = ((BrokerInfo)command);
+ remoteBrokerInfo = (BrokerInfo)command;
remoteBrokerId = remoteBrokerInfo.getBrokerId();
if (localBrokerId != null) {
if (localBrokerId.equals(remoteBrokerId)) {
@@ -231,8 +227,9 @@
Message message = md.getMessage();
message.setProducerId(producerInfo.getProducerId());
- if (message.getOriginalTransactionId() == null)
+ if (message.getOriginalTransactionId() == null) {
message.setOriginalTransactionId(message.getTransactionId());
+ }
message.setTransactionId(null);
if (!message.isResponseRequired()) {
@@ -299,7 +296,7 @@
// }
} else if (command.isBrokerInfo()) {
synchronized (this) {
- localBrokerInfo = ((BrokerInfo)command);
+ localBrokerInfo = (BrokerInfo)command;
localBrokerId = localBrokerInfo.getBrokerId();
if (remoteBrokerId != null) {
if (remoteBrokerId.equals(localBrokerId)) {
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java?view=diff&rev=564679&r1=564678&r2=564679
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java Fri Aug 10 09:57:01 2007
@@ -18,8 +18,9 @@
import java.net.URI;
import java.net.URISyntaxException;
-import java.util.Hashtable;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
@@ -47,14 +48,7 @@
private static final Log LOG = LogFactory.getLog(NetworkConnector.class);
protected URI localURI;
- private Set durableDestinations;
- private List excludedDestinations = new CopyOnWriteArrayList();
- private List dynamicallyIncludedDestinations = new CopyOnWriteArrayList();
- private List staticallyIncludedDestinations = new CopyOnWriteArrayList();
protected ConnectionFilter connectionFilter;
- private BrokerService brokerService;
- private ObjectName objectName;
-
protected ServiceSupport serviceSupport = new ServiceSupport() {
protected void doStart() throws Exception {
@@ -66,6 +60,13 @@
}
};
+ private Set<ActiveMQDestination> durableDestinations;
+ private List<ActiveMQDestination> excludedDestinations = new CopyOnWriteArrayList<ActiveMQDestination>();
+ private List<ActiveMQDestination> dynamicallyIncludedDestinations = new CopyOnWriteArrayList<ActiveMQDestination>();
+ private List<ActiveMQDestination> staticallyIncludedDestinations = new CopyOnWriteArrayList<ActiveMQDestination>();
+ private BrokerService brokerService;
+ private ObjectName objectName;
+
public NetworkConnector() {
}
@@ -91,21 +92,21 @@
/**
* @param durableDestinations The durableDestinations to set.
*/
- public void setDurableDestinations(Set durableDestinations) {
+ public void setDurableDestinations(Set<ActiveMQDestination> durableDestinations) {
this.durableDestinations = durableDestinations;
}
/**
* @return Returns the excludedDestinations.
*/
- public List getExcludedDestinations() {
+ public List<ActiveMQDestination> getExcludedDestinations() {
return excludedDestinations;
}
/**
* @param excludedDestinations The excludedDestinations to set.
*/
- public void setExcludedDestinations(List excludedDestinations) {
+ public void setExcludedDestinations(List<ActiveMQDestination> excludedDestinations) {
this.excludedDestinations = excludedDestinations;
}
@@ -116,7 +117,7 @@
/**
* @return Returns the staticallyIncludedDestinations.
*/
- public List getStaticallyIncludedDestinations() {
+ public List<ActiveMQDestination> getStaticallyIncludedDestinations() {
return staticallyIncludedDestinations;
}
@@ -124,7 +125,7 @@
* @param staticallyIncludedDestinations The staticallyIncludedDestinations
* to set.
*/
- public void setStaticallyIncludedDestinations(List staticallyIncludedDestinations) {
+ public void setStaticallyIncludedDestinations(List<ActiveMQDestination> staticallyIncludedDestinations) {
this.staticallyIncludedDestinations = staticallyIncludedDestinations;
}
@@ -135,7 +136,7 @@
/**
* @return Returns the dynamicallyIncludedDestinations.
*/
- public List getDynamicallyIncludedDestinations() {
+ public List<ActiveMQDestination> getDynamicallyIncludedDestinations() {
return dynamicallyIncludedDestinations;
}
@@ -143,7 +144,7 @@
* @param dynamicallyIncludedDestinations The
* dynamicallyIncludedDestinations to set.
*/
- public void setDynamicallyIncludedDestinations(List dynamicallyIncludedDestinations) {
+ public void setDynamicallyIncludedDestinations(List<ActiveMQDestination> dynamicallyIncludedDestinations) {
this.dynamicallyIncludedDestinations = dynamicallyIncludedDestinations;
}
@@ -162,15 +163,14 @@
// Implementation methods
// -------------------------------------------------------------------------
protected NetworkBridge configureBridge(DemandForwardingBridgeSupport result) {
- List destsList = getDynamicallyIncludedDestinations();
- ActiveMQDestination dests[] = (ActiveMQDestination[])destsList
- .toArray(new ActiveMQDestination[destsList.size()]);
+ List<ActiveMQDestination> destsList = getDynamicallyIncludedDestinations();
+ ActiveMQDestination dests[] = destsList.toArray(new ActiveMQDestination[destsList.size()]);
result.setDynamicallyIncludedDestinations(dests);
destsList = getExcludedDestinations();
- dests = (ActiveMQDestination[])destsList.toArray(new ActiveMQDestination[destsList.size()]);
+ dests = destsList.toArray(new ActiveMQDestination[destsList.size()]);
result.setExcludedDestinations(dests);
destsList = getStaticallyIncludedDestinations();
- dests = (ActiveMQDestination[])destsList.toArray(new ActiveMQDestination[destsList.size()]);
+ dests = destsList.toArray(new ActiveMQDestination[destsList.size()]);
result.setStaticallyIncludedDestinations(dests);
if (durableDestinations != null) {
ActiveMQDestination[] dest = new ActiveMQDestination[durableDestinations.size()];
@@ -222,8 +222,9 @@
}
protected void registerNetworkBridgeMBean(NetworkBridge bridge) {
- if (!getBrokerService().isUseJmx())
+ if (!getBrokerService().isUseJmx()) {
return;
+ }
MBeanServer mbeanServer = getBrokerService().getManagementContext().getMBeanServer();
if (mbeanServer != null) {
@@ -238,8 +239,9 @@
}
protected void unregisterNetworkBridgeMBean(NetworkBridge bridge) {
- if (!getBrokerService().isUseJmx())
+ if (!getBrokerService().isUseJmx()) {
return;
+ }
MBeanServer mbeanServer = getBrokerService().getManagementContext().getMBeanServer();
if (mbeanServer != null) {
@@ -252,22 +254,13 @@
}
}
- protected ObjectName createNetworkBridgeObjectName(NetworkBridge bridge)
- throws MalformedObjectNameException {
+ @SuppressWarnings("unchecked")
+ protected ObjectName createNetworkBridgeObjectName(NetworkBridge bridge) throws MalformedObjectNameException {
ObjectName connectorName = getObjectName();
- Hashtable map = connectorName.getKeyPropertyList();
- return new ObjectName(connectorName.getDomain()
- + ":"
- + "BrokerName="
- + JMXSupport.encodeObjectNamePart((String)map.get("BrokerName"))
- + ","
- + "Type=NetworkBridge,"
- + "NetworkConnectorName="
- + JMXSupport.encodeObjectNamePart((String)map.get("NetworkConnectorName"))
- + ","
- + "Name="
- + JMXSupport.encodeObjectNamePart(JMXSupport.encodeObjectNamePart(bridge
- .getRemoteAddress())));
+ Map<String, String> map = new HashMap<String, String>(connectorName.getKeyPropertyList());
+ return new ObjectName(connectorName.getDomain() + ":" + "BrokerName=" + JMXSupport.encodeObjectNamePart((String)map.get("BrokerName")) + "," + "Type=NetworkBridge,"
+ + "NetworkConnectorName=" + JMXSupport.encodeObjectNamePart((String)map.get("NetworkConnectorName")) + "," + "Name="
+ + JMXSupport.encodeObjectNamePart(JMXSupport.encodeObjectNamePart(bridge.getRemoteAddress())));
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsConnector.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsConnector.java?view=diff&rev=564679&r1=564678&r2=564679
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsConnector.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsConnector.java Fri Aug 10 09:57:01 2007
@@ -44,13 +44,13 @@
*/
public abstract class JmsConnector implements Service {
+ private static int nextId;
private static final Log LOG = LogFactory.getLog(JmsConnector.class);
+
protected JndiTemplate jndiLocalTemplate;
protected JndiTemplate jndiOutboundTemplate;
protected JmsMesageConvertor inboundMessageConvertor;
protected JmsMesageConvertor outboundMessageConvertor;
- private List inboundBridges = new CopyOnWriteArrayList();
- private List outboundBridges = new CopyOnWriteArrayList();
protected AtomicBoolean initialized = new AtomicBoolean(false);
protected AtomicBoolean started = new AtomicBoolean(false);
protected ActiveMQConnectionFactory embeddedConnectionFactory;
@@ -59,9 +59,12 @@
protected String outboundPassword;
protected String localUsername;
protected String localPassword;
+ protected LRUCache replyToBridges = createLRUCache();
+
+ private List<DestinationBridge> inboundBridges = new CopyOnWriteArrayList<DestinationBridge>();
+ private List<DestinationBridge> outboundBridges = new CopyOnWriteArrayList<DestinationBridge>();
private String name;
- protected LRUCache replyToBridges = createLRUCache();
private static LRUCache createLRUCache() {
return new LRUCache() {
@@ -111,11 +114,11 @@
init();
if (started.compareAndSet(false, true)) {
for (int i = 0; i < inboundBridges.size(); i++) {
- DestinationBridge bridge = (DestinationBridge)inboundBridges.get(i);
+ DestinationBridge bridge = inboundBridges.get(i);
bridge.start();
}
for (int i = 0; i < outboundBridges.size(); i++) {
- DestinationBridge bridge = (DestinationBridge)outboundBridges.get(i);
+ DestinationBridge bridge = outboundBridges.get(i);
bridge.start();
}
LOG.info("JMS Connector " + getName() + " Started");
@@ -125,11 +128,11 @@
public void stop() throws Exception {
if (started.compareAndSet(true, false)) {
for (int i = 0; i < inboundBridges.size(); i++) {
- DestinationBridge bridge = (DestinationBridge)inboundBridges.get(i);
+ DestinationBridge bridge = inboundBridges.get(i);
bridge.stop();
}
for (int i = 0; i < outboundBridges.size(); i++) {
- DestinationBridge bridge = (DestinationBridge)outboundBridges.get(i);
+ DestinationBridge bridge = outboundBridges.get(i);
bridge.stop();
}
LOG.info("JMS Connector " + getName() + " Stopped");
@@ -297,8 +300,6 @@
}
return name;
}
-
- static int nextId;
private static synchronized int getNextId() {
return nextId++;
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/BooleanStream.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/BooleanStream.java?view=diff&rev=564679&r1=564678&r2=564679
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/BooleanStream.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/BooleanStream.java Fri Aug 10 09:57:01 2007
@@ -51,7 +51,7 @@
}
}
if (value) {
- data[arrayPos] |= (0x01 << bytePos);
+ data[arrayPos] |= 0x01 << bytePos;
}
bytePos++;
if (bytePos >= 8) {
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java?view=diff&rev=564679&r1=564678&r2=564679
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/OpenWireFormat.java Fri Aug 10 09:57:01 2007
@@ -21,17 +21,16 @@
import java.io.IOException;
import java.lang.reflect.Method;
import java.util.HashMap;
+import java.util.Map;
import org.apache.activemq.command.CommandTypes;
import org.apache.activemq.command.DataStructure;
-import org.apache.activemq.command.MarshallAware;
import org.apache.activemq.command.WireFormatInfo;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.ByteSequenceData;
import org.apache.activemq.util.ClassLoading;
import org.apache.activemq.util.DataByteArrayInputStream;
import org.apache.activemq.util.DataByteArrayOutputStream;
-import org.apache.activemq.util.IdGenerator;
import org.apache.activemq.wireformat.WireFormat;
/**
@@ -48,16 +47,16 @@
private DataStreamMarshaller dataMarshallers[];
private int version;
- private boolean stackTraceEnabled = false;
- private boolean tcpNoDelayEnabled = false;
- private boolean cacheEnabled = false;
- private boolean tightEncodingEnabled = false;
- private boolean sizePrefixDisabled = false;
+ private boolean stackTraceEnabled;
+ private boolean tcpNoDelayEnabled;
+ private boolean cacheEnabled;
+ private boolean tightEncodingEnabled;
+ private boolean sizePrefixDisabled;
// The following fields are used for value caching
- private short nextMarshallCacheIndex = 0;
- private short nextMarshallCacheEvictionIndex = 0;
- private HashMap marshallCacheMap = new HashMap();
+ private short nextMarshallCacheIndex;
+ private short nextMarshallCacheEvictionIndex;
+ private Map<DataStructure, Short> marshallCacheMap = new HashMap<DataStructure, Short>();
private DataStructure marshallCache[] = new DataStructure[MARSHAL_CACHE_SIZE];
private DataStructure unmarshallCache[] = new DataStructure[MARSHAL_CACHE_SIZE];
private DataByteArrayOutputStream bytesOut = new DataByteArrayOutputStream();
@@ -92,16 +91,15 @@
}
public boolean equals(Object object) {
- if (object == null)
+ if (object == null) {
return false;
+ }
OpenWireFormat o = (OpenWireFormat)object;
return o.stackTraceEnabled == stackTraceEnabled && o.cacheEnabled == cacheEnabled
&& o.version == version && o.tightEncodingEnabled == tightEncodingEnabled
&& o.sizePrefixDisabled == sizePrefixDisabled;
}
- static IdGenerator g = new IdGenerator();
- String id = g.generateId();
public String toString() {
return "OpenWireFormat{version=" + version + ", cacheEnabled=" + cacheEnabled + ", stackTraceEnabled=" + stackTraceEnabled + ", tightEncodingEnabled="
@@ -120,12 +118,12 @@
runMarshallCacheEvictionSweep();
}
- MarshallAware ma = null;
- // If not using value caching, then the marshaled form is always the
- // same
- if (!cacheEnabled && ((DataStructure)command).isMarshallAware()) {
- ma = (MarshallAware)command;
- }
+// MarshallAware ma = null;
+// // If not using value caching, then the marshaled form is always the
+// // same
+// if (!cacheEnabled && ((DataStructure)command).isMarshallAware()) {
+// ma = (MarshallAware)command;
+// }
ByteSequence sequence = null;
// if( ma!=null ) {
@@ -140,9 +138,9 @@
DataStructure c = (DataStructure)command;
byte type = c.getDataStructureType();
DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[type & 0xFF];
- if (dsm == null)
+ if (dsm == null) {
throw new IOException("Unknown data type: " + type);
-
+ }
if (tightEncodingEnabled) {
BooleanStream bs = new BooleanStream();
@@ -223,9 +221,9 @@
DataStructure c = (DataStructure)o;
byte type = c.getDataStructureType();
DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[type & 0xFF];
- if (dsm == null)
+ if (dsm == null) {
throw new IOException("Unknown data type: " + type);
-
+ }
if (tightEncodingEnabled) {
BooleanStream bs = new BooleanStream();
size += dsm.tightMarshal1(this, c, bs);
@@ -267,7 +265,8 @@
public Object unmarshal(DataInput dis) throws IOException {
DataInput dataIn = dis;
if (!sizePrefixDisabled) {
- int size = dis.readInt();
+ dis.readInt();
+ // int size = dis.readInt();
// byte[] data = new byte[size];
// dis.readFully(data);
// bytesIn.restart(data);
@@ -285,8 +284,9 @@
DataStructure c = (DataStructure)o;
byte type = c.getDataStructureType();
DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[type & 0xFF];
- if (dsm == null)
+ if (dsm == null) {
throw new IOException("Unknown data type: " + type);
+ }
size += dsm.tightMarshal1(this, c, bs);
size += bs.marshalledSize();
@@ -307,9 +307,9 @@
DataStructure c = (DataStructure)o;
byte type = c.getDataStructureType();
DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[type & 0xFF];
- if (dsm == null)
+ if (dsm == null) {
throw new IOException("Unknown data type: " + type);
-
+ }
ds.writeByte(type);
bs.marshal(ds);
dsm.tightMarshal2(this, c, ds, bs);
@@ -351,8 +351,9 @@
byte dataType = dis.readByte();
if (dataType != NULL_TYPE) {
DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[dataType & 0xFF];
- if (dsm == null)
+ if (dsm == null) {
throw new IOException("Unknown data type: " + dataType);
+ }
Object data = dsm.createObject();
if (this.tightEncodingEnabled) {
BooleanStream bs = new BooleanStream();
@@ -373,11 +374,12 @@
// }
public int tightMarshalNestedObject1(DataStructure o, BooleanStream bs) throws IOException {
bs.writeBoolean(o != null);
- if (o == null)
+ if (o == null) {
return 0;
+ }
if (o.isMarshallAware()) {
- MarshallAware ma = (MarshallAware)o;
+ // MarshallAware ma = (MarshallAware)o;
ByteSequence sequence = null;
// sequence=ma.getCachedMarshalledForm(this);
bs.writeBoolean(sequence != null);
@@ -388,15 +390,17 @@
byte type = o.getDataStructureType();
DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[type & 0xFF];
- if (dsm == null)
+ if (dsm == null) {
throw new IOException("Unknown data type: " + type);
+ }
return 1 + dsm.tightMarshal1(this, o, bs);
}
public void tightMarshalNestedObject2(DataStructure o, DataOutput ds, BooleanStream bs)
throws IOException {
- if (!bs.readBoolean())
+ if (!bs.readBoolean()) {
return;
+ }
byte type = o.getDataStructureType();
ds.writeByte(type);
@@ -413,8 +417,9 @@
} else {
DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[type & 0xFF];
- if (dsm == null)
+ if (dsm == null) {
throw new IOException("Unknown data type: " + type);
+ }
dsm.tightMarshal2(this, o, ds, bs);
}
@@ -425,8 +430,9 @@
byte dataType = dis.readByte();
DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[dataType & 0xFF];
- if (dsm == null)
+ if (dsm == null) {
throw new IOException("Unknown data type: " + dataType);
+ }
DataStructure data = dsm.createObject();
if (data.isMarshallAware() && bs.readBoolean()) {
@@ -457,8 +463,9 @@
byte dataType = dis.readByte();
DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[dataType & 0xFF];
- if (dsm == null)
+ if (dsm == null) {
throw new IOException("Unknown data type: " + dataType);
+ }
DataStructure data = dsm.createObject();
dsm.looseUnmarshal(this, data, dis);
return data;
@@ -474,8 +481,9 @@
byte type = o.getDataStructureType();
dataOut.writeByte(type);
DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[type & 0xFF];
- if (dsm == null)
+ if (dsm == null) {
throw new IOException("Unknown data type: " + type);
+ }
dsm.looseMarshal(this, o, dataOut);
}
}
@@ -496,7 +504,7 @@
}
public Short getMarshallCacheIndex(DataStructure o) {
- return (Short)marshallCacheMap.get(o);
+ return marshallCacheMap.get(o);
}
public Short addToMarshallCache(DataStructure o) {
@@ -522,8 +530,9 @@
// There was no space left in the cache, so we can't
// put this in the cache.
- if (index == -1)
+ if (index == -1) {
return;
+ }
unmarshallCache[index] = o;
}
@@ -582,8 +591,9 @@
public void renegotiateWireFormat(WireFormatInfo info) throws IOException {
- if (preferedWireFormatInfo == null)
+ if (preferedWireFormatInfo == null) {
throw new IllegalStateException("Wireformat cannot not be renegotiated.");
+ }
this.setVersion(min(preferedWireFormatInfo.getVersion(), info.getVersion()));
info.setVersion(this.getVersion());
@@ -618,7 +628,7 @@
unmarshallCache = new DataStructure[size];
nextMarshallCacheIndex = 0;
nextMarshallCacheEvictionIndex = 0;
- marshallCacheMap = new HashMap();
+ marshallCacheMap = new HashMap<DataStructure, Short>();
} else {
marshallCache = null;
unmarshallCache = null;
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v2/MessageMarshaller.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v2/MessageMarshaller.java?view=diff&rev=564679&r1=564678&r2=564679
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v2/MessageMarshaller.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v2/MessageMarshaller.java Fri Aug 10 09:57:01 2007
@@ -26,24 +26,19 @@
import org.apache.activemq.openwire.BooleanStream;
import org.apache.activemq.openwire.OpenWireFormat;
-
-
/**
- * Marshalling code for Open Wire Format for MessageMarshaller
- *
- *
- * NOTE!: This file is auto generated - do not modify!
- * if you need to make a change, please see the modify the groovy scripts in the
- * under src/gram/script and then use maven openwire:generate to regenerate
- * this file.
- *
+ * Marshalling code for Open Wire Format for MessageMarshaller NOTE!: This file
+ * is auto generated - do not modify! if you need to make a change, please see
+ * the modify the groovy scripts in the under src/gram/script and then use maven
+ * openwire:generate to regenerate this file.
+ *
* @version $Revision$
*/
public abstract class MessageMarshaller extends BaseCommandMarshaller {
/**
* Un-marshal an object instance from the data input stream
- *
+ *
* @param o the object to un-marshal
* @param dataIn the data input stream to build the object from
* @throws IOException
@@ -54,38 +49,37 @@
Message info = (Message)o;
info.beforeUnmarshall(wireFormat);
-
- info.setProducerId((org.apache.activemq.command.ProducerId) tightUnmarsalCachedObject(wireFormat, dataIn, bs));
- info.setDestination((org.apache.activemq.command.ActiveMQDestination) tightUnmarsalCachedObject(wireFormat, dataIn, bs));
- info.setTransactionId((org.apache.activemq.command.TransactionId) tightUnmarsalCachedObject(wireFormat, dataIn, bs));
- info.setOriginalDestination((org.apache.activemq.command.ActiveMQDestination) tightUnmarsalCachedObject(wireFormat, dataIn, bs));
- info.setMessageId((org.apache.activemq.command.MessageId) tightUnmarsalNestedObject(wireFormat, dataIn, bs));
- info.setOriginalTransactionId((org.apache.activemq.command.TransactionId) tightUnmarsalCachedObject(wireFormat, dataIn, bs));
+
+ info.setProducerId((org.apache.activemq.command.ProducerId)tightUnmarsalCachedObject(wireFormat, dataIn, bs));
+ info.setDestination((org.apache.activemq.command.ActiveMQDestination)tightUnmarsalCachedObject(wireFormat, dataIn, bs));
+ info.setTransactionId((org.apache.activemq.command.TransactionId)tightUnmarsalCachedObject(wireFormat, dataIn, bs));
+ info.setOriginalDestination((org.apache.activemq.command.ActiveMQDestination)tightUnmarsalCachedObject(wireFormat, dataIn, bs));
+ info.setMessageId((org.apache.activemq.command.MessageId)tightUnmarsalNestedObject(wireFormat, dataIn, bs));
+ info.setOriginalTransactionId((org.apache.activemq.command.TransactionId)tightUnmarsalCachedObject(wireFormat, dataIn, bs));
info.setGroupID(tightUnmarshalString(dataIn, bs));
info.setGroupSequence(dataIn.readInt());
info.setCorrelationId(tightUnmarshalString(dataIn, bs));
info.setPersistent(bs.readBoolean());
info.setExpiration(tightUnmarshalLong(wireFormat, dataIn, bs));
info.setPriority(dataIn.readByte());
- info.setReplyTo((org.apache.activemq.command.ActiveMQDestination) tightUnmarsalNestedObject(wireFormat, dataIn, bs));
+ info.setReplyTo((org.apache.activemq.command.ActiveMQDestination)tightUnmarsalNestedObject(wireFormat, dataIn, bs));
info.setTimestamp(tightUnmarshalLong(wireFormat, dataIn, bs));
info.setType(tightUnmarshalString(dataIn, bs));
info.setContent(tightUnmarshalByteSequence(dataIn, bs));
info.setMarshalledProperties(tightUnmarshalByteSequence(dataIn, bs));
- info.setDataStructure((org.apache.activemq.command.DataStructure) tightUnmarsalNestedObject(wireFormat, dataIn, bs));
- info.setTargetConsumerId((org.apache.activemq.command.ConsumerId) tightUnmarsalCachedObject(wireFormat, dataIn, bs));
+ info.setDataStructure((org.apache.activemq.command.DataStructure)tightUnmarsalNestedObject(wireFormat, dataIn, bs));
+ info.setTargetConsumerId((org.apache.activemq.command.ConsumerId)tightUnmarsalCachedObject(wireFormat, dataIn, bs));
info.setCompressed(bs.readBoolean());
info.setRedeliveryCounter(dataIn.readInt());
if (bs.readBoolean()) {
short size = dataIn.readShort();
org.apache.activemq.command.BrokerId value[] = new org.apache.activemq.command.BrokerId[size];
- for( int i=0; i < size; i++ ) {
- value[i] = (org.apache.activemq.command.BrokerId) tightUnmarsalNestedObject(wireFormat,dataIn, bs);
+ for (int i = 0; i < size; i++) {
+ value[i] = (org.apache.activemq.command.BrokerId)tightUnmarsalNestedObject(wireFormat, dataIn, bs);
}
info.setBrokerPath(value);
- }
- else {
+ } else {
info.setBrokerPath(null);
}
info.setArrival(tightUnmarshalLong(wireFormat, dataIn, bs));
@@ -97,7 +91,6 @@
}
-
/**
* Write the booleans that this object uses to a BooleanStream
*/
@@ -117,9 +110,9 @@
rc += tightMarshalString1(info.getGroupID(), bs);
rc += tightMarshalString1(info.getCorrelationId(), bs);
bs.writeBoolean(info.isPersistent());
- rc+=tightMarshalLong1(wireFormat, info.getExpiration(), bs);
+ rc += tightMarshalLong1(wireFormat, info.getExpiration(), bs);
rc += tightMarshalNestedObject1(wireFormat, (DataStructure)info.getReplyTo(), bs);
- rc+=tightMarshalLong1(wireFormat, info.getTimestamp(), bs);
+ rc += tightMarshalLong1(wireFormat, info.getTimestamp(), bs);
rc += tightMarshalString1(info.getType(), bs);
rc += tightMarshalByteSequence1(info.getContent(), bs);
rc += tightMarshalByteSequence1(info.getMarshalledProperties(), bs);
@@ -127,7 +120,7 @@
rc += tightMarshalCachedObject1(wireFormat, (DataStructure)info.getTargetConsumerId(), bs);
bs.writeBoolean(info.isCompressed());
rc += tightMarshalObjectArray1(wireFormat, info.getBrokerPath(), bs);
- rc+=tightMarshalLong1(wireFormat, info.getArrival(), bs);
+ rc += tightMarshalLong1(wireFormat, info.getArrival(), bs);
rc += tightMarshalString1(info.getUserID(), bs);
bs.writeBoolean(info.isRecievedByDFBridge());
bs.writeBoolean(info.isDroppable());
@@ -137,7 +130,7 @@
/**
* Write a object instance to data output stream
- *
+ *
* @param o the instance to be marshaled
* @param dataOut the output stream
* @throws IOException thrown if an error occurs
@@ -179,7 +172,7 @@
/**
* Un-marshal an object instance from the data input stream
- *
+ *
* @param o the object to un-marshal
* @param dataIn the data input stream to build the object from
* @throws IOException
@@ -190,38 +183,37 @@
Message info = (Message)o;
info.beforeUnmarshall(wireFormat);
-
- info.setProducerId((org.apache.activemq.command.ProducerId) looseUnmarsalCachedObject(wireFormat, dataIn));
- info.setDestination((org.apache.activemq.command.ActiveMQDestination) looseUnmarsalCachedObject(wireFormat, dataIn));
- info.setTransactionId((org.apache.activemq.command.TransactionId) looseUnmarsalCachedObject(wireFormat, dataIn));
- info.setOriginalDestination((org.apache.activemq.command.ActiveMQDestination) looseUnmarsalCachedObject(wireFormat, dataIn));
- info.setMessageId((org.apache.activemq.command.MessageId) looseUnmarsalNestedObject(wireFormat, dataIn));
- info.setOriginalTransactionId((org.apache.activemq.command.TransactionId) looseUnmarsalCachedObject(wireFormat, dataIn));
+
+ info.setProducerId((org.apache.activemq.command.ProducerId)looseUnmarsalCachedObject(wireFormat, dataIn));
+ info.setDestination((org.apache.activemq.command.ActiveMQDestination)looseUnmarsalCachedObject(wireFormat, dataIn));
+ info.setTransactionId((org.apache.activemq.command.TransactionId)looseUnmarsalCachedObject(wireFormat, dataIn));
+ info.setOriginalDestination((org.apache.activemq.command.ActiveMQDestination)looseUnmarsalCachedObject(wireFormat, dataIn));
+ info.setMessageId((org.apache.activemq.command.MessageId)looseUnmarsalNestedObject(wireFormat, dataIn));
+ info.setOriginalTransactionId((org.apache.activemq.command.TransactionId)looseUnmarsalCachedObject(wireFormat, dataIn));
info.setGroupID(looseUnmarshalString(dataIn));
info.setGroupSequence(dataIn.readInt());
info.setCorrelationId(looseUnmarshalString(dataIn));
info.setPersistent(dataIn.readBoolean());
info.setExpiration(looseUnmarshalLong(wireFormat, dataIn));
info.setPriority(dataIn.readByte());
- info.setReplyTo((org.apache.activemq.command.ActiveMQDestination) looseUnmarsalNestedObject(wireFormat, dataIn));
+ info.setReplyTo((org.apache.activemq.command.ActiveMQDestination)looseUnmarsalNestedObject(wireFormat, dataIn));
info.setTimestamp(looseUnmarshalLong(wireFormat, dataIn));
info.setType(looseUnmarshalString(dataIn));
info.setContent(looseUnmarshalByteSequence(dataIn));
info.setMarshalledProperties(looseUnmarshalByteSequence(dataIn));
- info.setDataStructure((org.apache.activemq.command.DataStructure) looseUnmarsalNestedObject(wireFormat, dataIn));
- info.setTargetConsumerId((org.apache.activemq.command.ConsumerId) looseUnmarsalCachedObject(wireFormat, dataIn));
+ info.setDataStructure((org.apache.activemq.command.DataStructure)looseUnmarsalNestedObject(wireFormat, dataIn));
+ info.setTargetConsumerId((org.apache.activemq.command.ConsumerId)looseUnmarsalCachedObject(wireFormat, dataIn));
info.setCompressed(dataIn.readBoolean());
info.setRedeliveryCounter(dataIn.readInt());
if (dataIn.readBoolean()) {
short size = dataIn.readShort();
org.apache.activemq.command.BrokerId value[] = new org.apache.activemq.command.BrokerId[size];
- for( int i=0; i < size; i++ ) {
- value[i] = (org.apache.activemq.command.BrokerId) looseUnmarsalNestedObject(wireFormat,dataIn);
+ for (int i = 0; i < size; i++) {
+ value[i] = (org.apache.activemq.command.BrokerId)looseUnmarsalNestedObject(wireFormat, dataIn);
}
info.setBrokerPath(value);
- }
- else {
+ } else {
info.setBrokerPath(null);
}
info.setArrival(looseUnmarshalLong(wireFormat, dataIn));
@@ -232,7 +224,6 @@
info.afterUnmarshall(wireFormat);
}
-
/**
* Write the booleans that this object uses to a BooleanStream
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/pool/ConnectionKey.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/pool/ConnectionKey.java?view=diff&rev=564679&r1=564678&r2=564679
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/pool/ConnectionKey.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/pool/ConnectionKey.java Fri Aug 10 09:57:01 2007
@@ -69,7 +69,7 @@
if (o1 == o2) {
return true;
}
- return (o1 != null && o2 != null && o1.equals(o2));
+ return o1 != null && o2 != null && o1.equals(o2);
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/pool/ConnectionPool.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/pool/ConnectionPool.java?view=diff&rev=564679&r1=564678&r2=564679
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/pool/ConnectionPool.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/pool/ConnectionPool.java Fri Aug 10 09:57:01 2007
@@ -96,7 +96,7 @@
public Session createSession(boolean transacted, int ackMode) throws JMSException {
try {
- boolean isXa = (transactionManager != null && transactionManager.getStatus() != Status.STATUS_NO_TRANSACTION);
+ boolean isXa = transactionManager != null && transactionManager.getStatus() != Status.STATUS_NO_TRANSACTION;
if (isXa) {
transacted = true;
ackMode = Session.SESSION_TRANSACTED;
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/proxy/ProxyConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/proxy/ProxyConnection.java?view=diff&rev=564679&r1=564678&r2=564679
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/proxy/ProxyConnection.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/proxy/ProxyConnection.java Fri Aug 10 09:57:01 2007
@@ -66,8 +66,9 @@
}
try {
remoteTransport.oneway(command);
- if (shutdown)
+ if (shutdown) {
stop();
+ }
} catch (IOException error) {
onFailure(error);
} catch (Exception error) {
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/security/JaasCertificateAuthenticationBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/security/JaasCertificateAuthenticationBroker.java?view=diff&rev=564679&r1=564678&r2=564679
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/security/JaasCertificateAuthenticationBroker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/security/JaasCertificateAuthenticationBroker.java Fri Aug 10 09:57:01 2007
@@ -33,69 +33,70 @@
import org.apache.activemq.jaas.UserPrincipal;
/**
- * A JAAS Authentication Broker that uses SSL Certificates.
- *
- * This class will provide the JAAS framework with a JaasCertificateCallbackHandler that will grant JAAS access to
- * incoming connections' SSL certificate chains.
- * NOTE: There is a chance that the incoming connection does not have a valid certificate (has null).
+ * A JAAS Authentication Broker that uses SSL Certificates. This class will
+ * provide the JAAS framework with a JaasCertificateCallbackHandler that will
+ * grant JAAS access to incoming connections' SSL certificate chains. NOTE:
+ * There is a chance that the incoming connection does not have a valid
+ * certificate (has null).
*
* @author sepandm@gmail.com (Sepand)
*/
public class JaasCertificateAuthenticationBroker extends BrokerFilter {
private final String jaasConfiguration;
-
+
/**
* Simple constructor. Leaves everything to superclass.
*
* @param next The Broker that does the actual work for this Filter.
- * @param jassConfiguration The JAAS domain configuration name (refere to JAAS documentation).
+ * @param jassConfiguration The JAAS domain configuration name (refere to
+ * JAAS documentation).
*/
public JaasCertificateAuthenticationBroker(Broker next, String jaasConfiguration) {
super(next);
-
- this.jaasConfiguration = jaasConfiguration;
+
+ this.jaasConfiguration = jaasConfiguration;
}
-
+
/**
* Overridden to allow for authentication based on client certificates.
- *
- * Connections being added will be authenticated based on their certificate chain and the JAAS module specified
- * through the JAAS framework.
- * NOTE: The security context's username will be set to the first UserPrincipal created by the login module.
+ * Connections being added will be authenticated based on their certificate
+ * chain and the JAAS module specified through the JAAS framework. NOTE: The
+ * security context's username will be set to the first UserPrincipal
+ * created by the login module.
*
* @param context The context for the incoming Connection.
- * @param info The ConnectionInfo Command representing the incoming connection.
+ * @param info The ConnectionInfo Command representing the incoming
+ * connection.
*/
public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception {
if (context.getSecurityContext() == null) {
- if (!( info.getTransportContext() instanceof X509Certificate[] )) {
+ if (!(info.getTransportContext() instanceof X509Certificate[])) {
throw new SecurityException("Unable to authenticate transport without SSL certificate.");
}
-
- // Set the TCCL since it seems JAAS needs it to find the login module classes.
+
+ // Set the TCCL since it seems JAAS needs it to find the login
+ // module classes.
ClassLoader original = Thread.currentThread().getContextClassLoader();
Thread.currentThread().setContextClassLoader(JaasAuthenticationBroker.class.getClassLoader());
try {
// Do the login.
try {
- CallbackHandler callback =
- new JaasCertificateCallbackHandler((X509Certificate[])info.getTransportContext());
+ CallbackHandler callback = new JaasCertificateCallbackHandler((X509Certificate[])info.getTransportContext());
LoginContext lc = new LoginContext(jaasConfiguration, callback);
lc.login();
Subject subject = lc.getSubject();
-
+
String dnName = "";
-
- for (Iterator iter = subject.getPrincipals().iterator(); iter.hasNext(); ) {
+
+ for (Iterator iter = subject.getPrincipals().iterator(); iter.hasNext();) {
Principal nextPrincipal = (Principal)iter.next();
if (nextPrincipal instanceof UserPrincipal) {
dnName = ((UserPrincipal)nextPrincipal).getName();
break;
}
}
- SecurityContext s = new JaasCertificateSecurityContext(
- dnName, subject, (X509Certificate[])info.getTransportContext());
+ SecurityContext s = new JaasCertificateSecurityContext(dnName, subject, (X509Certificate[])info.getTransportContext());
context.setSecurityContext(s);
} catch (Exception e) {
throw new SecurityException("User name or password is invalid: " + e.getMessage(), e);
@@ -106,13 +107,13 @@
}
super.addConnection(context, info);
}
-
+
/**
* Overriding removeConnection to make sure the security context is cleaned.
*/
public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error) throws Exception {
super.removeConnection(context, info, error);
-
+
context.setSecurityContext(null);
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/security/SimpleAuthenticationBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/security/SimpleAuthenticationBroker.java?view=diff&rev=564679&r1=564678&r2=564679
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/security/SimpleAuthenticationBroker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/security/SimpleAuthenticationBroker.java Fri Aug 10 09:57:01 2007
@@ -35,7 +35,7 @@
private final Map userPasswords;
private final Map userGroups;
- private final CopyOnWriteArrayList securityContexts = new CopyOnWriteArrayList();
+ private final CopyOnWriteArrayList<SecurityContext> securityContexts = new CopyOnWriteArrayList<SecurityContext>();
public SimpleAuthenticationBroker(Broker next, Map userPasswords, Map userGroups) {
super(next);
@@ -48,8 +48,9 @@
if (context.getSecurityContext() == null) {
// Check the username and password.
String pw = (String)userPasswords.get(info.getUserName());
- if (pw == null || !pw.equals(info.getPassword()))
+ if (pw == null || !pw.equals(info.getPassword())) {
throw new SecurityException("User name or password is invalid.");
+ }
final Set groups = (Set)userGroups.get(info.getUserName());
SecurityContext s = new SecurityContext(info.getUserName()) {
@@ -77,8 +78,8 @@
* Refresh all the logged into users.
*/
public void refresh() {
- for (Iterator iter = securityContexts.iterator(); iter.hasNext();) {
- SecurityContext sc = (SecurityContext)iter.next();
+ for (Iterator<SecurityContext> iter = securityContexts.iterator(); iter.hasNext();) {
+ SecurityContext sc = iter.next();
sc.getAuthorizedReadDests().clear();
sc.getAuthorizedWriteDests().clear();
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/security/SimpleAuthenticationPlugin.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/security/SimpleAuthenticationPlugin.java?view=diff&rev=564679&r1=564678&r2=564679
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/security/SimpleAuthenticationPlugin.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/security/SimpleAuthenticationPlugin.java Fri Aug 10 09:57:01 2007
@@ -39,8 +39,8 @@
* @version $Revision$
*/
public class SimpleAuthenticationPlugin implements BrokerPlugin {
- private Map userPasswords;
- private Map userGroups;
+ private Map<String, String> userPasswords;
+ private Map<String, Set<GroupPrincipal>> userGroups;
public SimpleAuthenticationPlugin() {
}
@@ -53,7 +53,7 @@
return new SimpleAuthenticationBroker(broker, userPasswords, userGroups);
}
- public Map getUserGroups() {
+ public Map<String, Set<GroupPrincipal>> getUserGroups() {
return userGroups;
}
@@ -63,12 +63,12 @@
* @org.apache.xbean.ElementType class="org.apache.activemq.security.AuthenticationUser"
*/
public void setUsers(List users) {
- userPasswords = new HashMap();
- userGroups = new HashMap();
+ userPasswords = new HashMap<String, String>();
+ userGroups = new HashMap<String, Set<GroupPrincipal>>();
for (Iterator it = users.iterator(); it.hasNext();) {
AuthenticationUser user = (AuthenticationUser)it.next();
userPasswords.put(user.getUsername(), user.getPassword());
- Set groups = new HashSet();
+ Set<GroupPrincipal> groups = new HashSet<GroupPrincipal>();
StringTokenizer iter = new StringTokenizer(user.getGroups(), ",");
while (iter.hasMoreTokens()) {
String name = iter.nextToken().trim();
@@ -82,18 +82,18 @@
* Sets the groups a user is in. The key is the user name and the value is a
* Set of groups
*/
- public void setUserGroups(Map userGroups) {
+ public void setUserGroups(Map<String, Set<GroupPrincipal>> userGroups) {
this.userGroups = userGroups;
}
- public Map getUserPasswords() {
+ public Map<String, String> getUserPasswords() {
return userPasswords;
}
/**
* Sets the map indexed by user name with the value the password
*/
- public void setUserPasswords(Map userPasswords) {
+ public void setUserPasswords(Map<String, String> userPasswords) {
this.userPasswords = userPasswords;
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionState.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionState.java?view=diff&rev=564679&r1=564678&r2=564679
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionState.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionState.java Fri Aug 10 09:57:01 2007
@@ -36,9 +36,9 @@
public class ConnectionState {
ConnectionInfo info;
- private final ConcurrentHashMap transactions = new ConcurrentHashMap();
- private final ConcurrentHashMap sessions = new ConcurrentHashMap();
- private final List tempDestinations = Collections.synchronizedList(new ArrayList());
+ private final ConcurrentHashMap<TransactionId, TransactionState> transactions = new ConcurrentHashMap<TransactionId, TransactionState>();
+ private final ConcurrentHashMap<SessionId, SessionState> sessions = new ConcurrentHashMap<SessionId, SessionState>();
+ private final List<DestinationInfo> tempDestinations = Collections.synchronizedList(new ArrayList<DestinationInfo>());
private final AtomicBoolean shutdown = new AtomicBoolean(false);
public ConnectionState(ConnectionInfo info) {
@@ -65,8 +65,8 @@
}
public void removeTempDestination(ActiveMQDestination destination) {
- for (Iterator iter = tempDestinations.iterator(); iter.hasNext();) {
- DestinationInfo di = (DestinationInfo)iter.next();
+ for (Iterator<DestinationInfo> iter = tempDestinations.iterator(); iter.hasNext();) {
+ DestinationInfo di = iter.next();
if (di.getDestination().equals(destination)) {
iter.remove();
}
@@ -79,15 +79,15 @@
}
public TransactionState getTransactionState(TransactionId id) {
- return (TransactionState)transactions.get(id);
+ return transactions.get(id);
}
- public Collection getTransactionStates() {
+ public Collection<TransactionState> getTransactionStates() {
return transactions.values();
}
public TransactionState removeTransactionState(TransactionId id) {
- return (TransactionState)transactions.remove(id);
+ return transactions.remove(id);
}
public void addSession(SessionInfo info) {
@@ -96,38 +96,39 @@
}
public SessionState removeSession(SessionId id) {
- return (SessionState)sessions.remove(id);
+ return sessions.remove(id);
}
public SessionState getSessionState(SessionId id) {
- return (SessionState)sessions.get(id);
+ return sessions.get(id);
}
public ConnectionInfo getInfo() {
return info;
}
- public Set getSessionIds() {
+ public Set<SessionId> getSessionIds() {
return sessions.keySet();
}
- public List getTempDesinations() {
+ public List<DestinationInfo> getTempDesinations() {
return tempDestinations;
}
- public Collection getSessionStates() {
+ public Collection<SessionState> getSessionStates() {
return sessions.values();
}
private void checkShutdown() {
- if (shutdown.get())
+ if (shutdown.get()) {
throw new IllegalStateException("Disposed");
+ }
}
public void shutdown() {
if (shutdown.compareAndSet(false, true)) {
- for (Iterator iter = sessions.values().iterator(); iter.hasNext();) {
- SessionState ss = (SessionState)iter.next();
+ for (Iterator<SessionState> iter = sessions.values().iterator(); iter.hasNext();) {
+ SessionState ss = iter.next();
ss.shutdown();
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java?view=diff&rev=564679&r1=564678&r2=564679
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java Fri Aug 10 09:57:01 2007
@@ -47,14 +47,14 @@
private static final Tracked TRACKED_RESPONSE_MARKER = new Tracked(null);
- private boolean trackTransactions = false;
+ protected final ConcurrentHashMap<ConnectionId, ConnectionState> connectionStates = new ConcurrentHashMap<ConnectionId, ConnectionState>();
+ private boolean trackTransactions;
private boolean restoreSessions = true;
private boolean restoreConsumers = true;
private boolean restoreProducers = true;
private boolean restoreTransaction = true;
- protected final ConcurrentHashMap connectionStates = new ConcurrentHashMap();
private class RemoveTransactionAction implements Runnable {
private final TransactionInfo info;
@@ -65,7 +65,7 @@
public void run() {
ConnectionId connectionId = info.getConnectionId();
- ConnectionState cs = (ConnectionState)connectionStates.get(connectionId);
+ ConnectionState cs = connectionStates.get(connectionId);
cs.removeTransactionState(info.getTransactionId());
}
}
@@ -89,16 +89,18 @@
public void restore(Transport transport) throws IOException {
// Restore the connections.
- for (Iterator iter = connectionStates.values().iterator(); iter.hasNext();) {
- ConnectionState connectionState = (ConnectionState)iter.next();
+ for (Iterator<ConnectionState> iter = connectionStates.values().iterator(); iter.hasNext();) {
+ ConnectionState connectionState = iter.next();
transport.oneway(connectionState.getInfo());
restoreTempDestinations(transport, connectionState);
- if (restoreSessions)
+ if (restoreSessions) {
restoreSessions(transport, connectionState);
+ }
- if (restoreTransaction)
+ if (restoreTransaction) {
restoreTransactions(transport, connectionState);
+ }
}
}
@@ -123,11 +125,13 @@
SessionState sessionState = (SessionState)iter2.next();
transport.oneway(sessionState.getInfo());
- if (restoreProducers)
+ if (restoreProducers) {
restoreProducers(transport, sessionState);
+ }
- if (restoreConsumers)
+ if (restoreConsumers) {
restoreConsumers(transport, sessionState);
+ }
}
}
@@ -172,7 +176,7 @@
public Response processAddDestination(DestinationInfo info) {
if (info != null) {
- ConnectionState cs = (ConnectionState)connectionStates.get(info.getConnectionId());
+ ConnectionState cs = connectionStates.get(info.getConnectionId());
if (cs != null && info.getDestination().isTemporary()) {
cs.addTempDestination(info);
}
@@ -182,7 +186,7 @@
public Response processRemoveDestination(DestinationInfo info) {
if (info != null) {
- ConnectionState cs = (ConnectionState)connectionStates.get(info.getConnectionId());
+ ConnectionState cs = connectionStates.get(info.getConnectionId());
if (cs != null && info.getDestination().isTemporary()) {
cs.removeTempDestination(info.getDestination());
}
@@ -196,7 +200,7 @@
if (sessionId != null) {
ConnectionId connectionId = sessionId.getParentId();
if (connectionId != null) {
- ConnectionState cs = (ConnectionState)connectionStates.get(connectionId);
+ ConnectionState cs = connectionStates.get(connectionId);
if (cs != null) {
SessionState ss = cs.getSessionState(sessionId);
if (ss != null) {
@@ -215,7 +219,7 @@
if (sessionId != null) {
ConnectionId connectionId = sessionId.getParentId();
if (connectionId != null) {
- ConnectionState cs = (ConnectionState)connectionStates.get(connectionId);
+ ConnectionState cs = connectionStates.get(connectionId);
if (cs != null) {
SessionState ss = cs.getSessionState(sessionId);
if (ss != null) {
@@ -234,7 +238,7 @@
if (sessionId != null) {
ConnectionId connectionId = sessionId.getParentId();
if (connectionId != null) {
- ConnectionState cs = (ConnectionState)connectionStates.get(connectionId);
+ ConnectionState cs = connectionStates.get(connectionId);
if (cs != null) {
SessionState ss = cs.getSessionState(sessionId);
if (ss != null) {
@@ -253,7 +257,7 @@
if (sessionId != null) {
ConnectionId connectionId = sessionId.getParentId();
if (connectionId != null) {
- ConnectionState cs = (ConnectionState)connectionStates.get(connectionId);
+ ConnectionState cs = connectionStates.get(connectionId);
if (cs != null) {
SessionState ss = cs.getSessionState(sessionId);
if (ss != null) {
@@ -270,7 +274,7 @@
if (info != null) {
ConnectionId connectionId = info.getSessionId().getParentId();
if (connectionId != null) {
- ConnectionState cs = (ConnectionState)connectionStates.get(connectionId);
+ ConnectionState cs = connectionStates.get(connectionId);
if (cs != null) {
cs.addSession(info);
}
@@ -283,7 +287,7 @@
if (id != null) {
ConnectionId connectionId = id.getParentId();
if (connectionId != null) {
- ConnectionState cs = (ConnectionState)connectionStates.get(connectionId);
+ ConnectionState cs = connectionStates.get(connectionId);
if (cs != null) {
cs.removeSession(id);
}
@@ -310,7 +314,7 @@
if (trackTransactions && send != null && send.getTransactionId() != null) {
ConnectionId connectionId = send.getProducerId().getParentId().getParentId();
if (connectionId != null) {
- ConnectionState cs = (ConnectionState)connectionStates.get(connectionId);
+ ConnectionState cs = connectionStates.get(connectionId);
if (cs != null) {
TransactionState transactionState = cs.getTransactionState(send.getTransactionId());
if (transactionState != null) {
@@ -327,7 +331,7 @@
if (trackTransactions && ack != null && ack.getTransactionId() != null) {
ConnectionId connectionId = ack.getConsumerId().getParentId().getParentId();
if (connectionId != null) {
- ConnectionState cs = (ConnectionState)connectionStates.get(connectionId);
+ ConnectionState cs = connectionStates.get(connectionId);
if (cs != null) {
TransactionState transactionState = cs.getTransactionState(ack.getTransactionId());
if (transactionState != null) {
@@ -344,7 +348,7 @@
if (trackTransactions && info != null && info.getTransactionId() != null) {
ConnectionId connectionId = info.getConnectionId();
if (connectionId != null) {
- ConnectionState cs = (ConnectionState)connectionStates.get(connectionId);
+ ConnectionState cs = connectionStates.get(connectionId);
if (cs != null) {
cs.addTransactionState(info.getTransactionId());
}
@@ -358,7 +362,7 @@
if (trackTransactions && info != null) {
ConnectionId connectionId = info.getConnectionId();
if (connectionId != null) {
- ConnectionState cs = (ConnectionState)connectionStates.get(connectionId);
+ ConnectionState cs = connectionStates.get(connectionId);
if (cs != null) {
TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
if (transactionState != null) {
@@ -375,7 +379,7 @@
if (trackTransactions && info != null) {
ConnectionId connectionId = info.getConnectionId();
if (connectionId != null) {
- ConnectionState cs = (ConnectionState)connectionStates.get(connectionId);
+ ConnectionState cs = connectionStates.get(connectionId);
if (cs != null) {
TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
if (transactionState != null) {
@@ -392,7 +396,7 @@
if (trackTransactions && info != null) {
ConnectionId connectionId = info.getConnectionId();
if (connectionId != null) {
- ConnectionState cs = (ConnectionState)connectionStates.get(connectionId);
+ ConnectionState cs = connectionStates.get(connectionId);
if (cs != null) {
TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
if (transactionState != null) {
@@ -409,7 +413,7 @@
if (trackTransactions && info != null) {
ConnectionId connectionId = info.getConnectionId();
if (connectionId != null) {
- ConnectionState cs = (ConnectionState)connectionStates.get(connectionId);
+ ConnectionState cs = connectionStates.get(connectionId);
if (cs != null) {
TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
if (transactionState != null) {
@@ -426,7 +430,7 @@
if (trackTransactions && info != null) {
ConnectionId connectionId = info.getConnectionId();
if (connectionId != null) {
- ConnectionState cs = (ConnectionState)connectionStates.get(connectionId);
+ ConnectionState cs = connectionStates.get(connectionId);
if (cs != null) {
TransactionState transactionState = cs.getTransactionState(info.getTransactionId());
if (transactionState != null) {