You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2012/11/29 23:05:46 UTC
svn commit: r1415406 - in /activemq/trunk:
activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java
Author: gtully
Date: Thu Nov 29 22:05:45 2012
New Revision: 1415406
URL: http://svn.apache.org/viewvc?rev=1415406&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-4196 - fix the order of removeDestination and removeSubscription commands in network bridge async advisory processing. https://issues.apache.org/jira/browse/AMQ-3038 revisited - temp destinations should not be deleted client side before close, because open consumers will not get advisories once the destination is gone; just clearing the map is sufficient. The broker will deal with removal of the temp destinations as part of the removeInfo.
Modified:
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
activemq/trunk/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java
Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java?rev=1415406&r1=1415405&r2=1415406&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java Thu Nov 29 22:05:45 2012
@@ -25,6 +25,9 @@ import java.util.List;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
@@ -137,6 +140,7 @@ public abstract class DemandForwardingBr
private TransportConnection duplexInitiatingConnection;
private BrokerService brokerService = null;
private ObjectName mbeanObjectName;
+ private ExecutorService serialExecutor = Executors.newSingleThreadExecutor();
public DemandForwardingBridgeSupport(NetworkBridgeConfiguration configuration, Transport localBroker, Transport remoteBroker) {
this.configuration = configuration;
@@ -355,6 +359,11 @@ public abstract class DemandForwardingBr
brokerService.getTaskRunnerFactory().execute(new Runnable() {
public void run() {
try {
+ serialExecutor.shutdown();
+ if (!serialExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
+ List<Runnable> pendingTasks = serialExecutor.shutdownNow();
+ LOG.info("pending tasks on stop" + pendingTasks);
+ }
localBroker.oneway(new ShutdownInfo());
remoteBroker.oneway(new ShutdownInfo());
} catch (Throwable e) {
@@ -594,7 +603,7 @@ public abstract class DemandForwardingBr
}
} else if (data.getClass() == DestinationInfo.class) {
// It's a destination info - we want to pass up information about temporary destinations
- DestinationInfo destInfo = (DestinationInfo) data;
+ final DestinationInfo destInfo = (DestinationInfo) data;
BrokerId[] path = destInfo.getBrokerPath();
if (path != null && path.length >= networkTTL) {
if (LOG.isDebugEnabled()) {
@@ -619,7 +628,20 @@ public abstract class DemandForwardingBr
if (LOG.isTraceEnabled()) {
LOG.trace(configuration.getBrokerName() + " bridging " + (destInfo.isAddOperation() ? "add" : "remove") + " destination on " + localBroker + " from " + remoteBrokerName + ", destination: " + destInfo);
}
- localBroker.oneway(destInfo);
+ if (destInfo.isRemoveOperation()) {
+ // serialise with removeSub operations such that all removeSub advisories are generated
+ serialExecutor.execute(new Runnable() {
+ public void run() {
+ try {
+ localBroker.oneway(destInfo);
+ } catch (IOException e) {
+ LOG.warn("failed to deliver remove command for destination:" + destInfo.getDestination(), e);
+ }
+ }
+ });
+ } else {
+ localBroker.oneway(destInfo);
+ }
} else if (data.getClass() == RemoveInfo.class) {
ConsumerId id = (ConsumerId) ((RemoveInfo) data).getObjectId();
removeDemandSubscription(id);
@@ -658,7 +680,9 @@ public abstract class DemandForwardingBr
subscriptionMapByRemoteId.remove(sub.getRemoteInfo().getConsumerId());
// continue removal in separate thread to free up this thread for outstanding responses
- brokerService.getTaskRunnerFactory().execute(new Runnable() {
+ // serialise removeSub operations with removeDestination operations on the same executor
+ // so that all removeSub advisories are generated
+ serialExecutor.execute(new Runnable() {
public void run() {
sub.waitForCompletion();
try {
Modified: activemq/trunk/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java?rev=1415406&r1=1415405&r2=1415406&view=diff
==============================================================================
--- activemq/trunk/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java (original)
+++ activemq/trunk/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnection.java Thu Nov 29 22:05:45 2012
@@ -662,10 +662,7 @@ public class ActiveMQConnection implemen
c.dispose();
}
- // As TemporaryQueue and TemporaryTopic instances are bound
- // to a connection we should just delete them after the connection
- // is closed to free up memory
- cleanUpTempDestinations();
+ this.activeTempDestinations.clear();
if (isConnectionInfoSentToBroker) {
// If we announced ourselves to the broker.. Try to let the broker
@@ -2527,6 +2524,7 @@ public class ActiveMQConnection implemen
* Removes any TempDestinations that this connection has cached, ignoring
* any exceptions generated because the destination is in use as they should
* not be removed.
+ * Used from a pooled connection, because it will not be explicitly closed.
*/
public void cleanUpTempDestinations() {
@@ -2612,7 +2610,7 @@ public class ActiveMQConnection implemen
* Sets the amount of time between scheduled sends of any outstanding Message Acks for consumers that
* have been configured with optimizeAcknowledge enabled.
*
- * @param scheduledOptimizedAckInterval the scheduledOptimizedAckInterval to set
+ * @param optimizedAckScheduledAckInterval the amount of time between scheduled sends of outstanding Message Acks
*/
public void setOptimizedAckScheduledAckInterval(long optimizedAckScheduledAckInterval) {
this.optimizedAckScheduledAckInterval = optimizedAckScheduledAckInterval;