You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2013/05/17 16:27:48 UTC
svn commit: r1483827 - in /activemq/trunk:
activemq-broker/src/main/java/org/apache/activemq/network/
activemq-client/src/main/java/org/apache/activemq/
activemq-client/src/main/java/org/apache/activemq/command/
activemq-unit-tests/src/test/java/org/apache/activemq/usecases/
Author: dejanb
Date: Fri May 17 14:27:48 2013
New Revision: 1483827
URL: http://svn.apache.org/r1483827
Log:
https://issues.apache.org/jira/browse/AMQ-4000 - initial implementation of keeping track of durable subscribers over the network and unregistering them appropriately
Modified:
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/ConduitBridge.java
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DemandSubscription.java
activemq/trunk/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
activemq/trunk/activemq-client/src/main/java/org/apache/activemq/command/ConsumerInfo.java
activemq/trunk/activemq-client/src/main/java/org/apache/activemq/command/SubscriptionInfo.java
activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubInBrokerNetworkTest.java
Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/ConduitBridge.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/ConduitBridge.java?rev=1483827&r1=1483826&r2=1483827&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/ConduitBridge.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/ConduitBridge.java Fri May 17 14:27:48 2013
@@ -24,6 +24,7 @@ import java.util.List;
import org.apache.activemq.command.BrokerId;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.filter.DestinationFilter;
import org.apache.activemq.transport.Transport;
import org.slf4j.Logger;
@@ -84,7 +85,11 @@ public class ConduitBridge extends Deman
}
// add the interest in the subscription
if (checkPaths(info.getBrokerPath(), ds.getRemoteInfo().getBrokerPath())) {
- ds.add(info.getConsumerId());
+ if (!info.isDurable()) {
+ ds.add(info.getConsumerId());
+ } else {
+ ds.getDurableRemoteSubs().add(new SubscriptionInfo(info.getClientId(), info.getSubscriptionName()));
+ }
}
matched = true;
// continue - we want interest to any existing DemandSubscriptions
Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java?rev=1483827&r1=1483826&r2=1483827&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java Fri May 17 14:27:48 2013
@@ -19,10 +19,7 @@ package org.apache.activemq.network;
import java.io.IOException;
import java.security.GeneralSecurityException;
import java.security.cert.X509Certificate;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.List;
-import java.util.Properties;
+import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
@@ -50,32 +47,7 @@ import org.apache.activemq.broker.region
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.policy.PolicyEntry;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.ActiveMQMessage;
-import org.apache.activemq.command.ActiveMQTempDestination;
-import org.apache.activemq.command.ActiveMQTopic;
-import org.apache.activemq.command.BrokerId;
-import org.apache.activemq.command.BrokerInfo;
-import org.apache.activemq.command.Command;
-import org.apache.activemq.command.ConnectionError;
-import org.apache.activemq.command.ConnectionId;
-import org.apache.activemq.command.ConnectionInfo;
-import org.apache.activemq.command.ConsumerId;
-import org.apache.activemq.command.ConsumerInfo;
-import org.apache.activemq.command.DataStructure;
-import org.apache.activemq.command.DestinationInfo;
-import org.apache.activemq.command.ExceptionResponse;
-import org.apache.activemq.command.KeepAliveInfo;
-import org.apache.activemq.command.Message;
-import org.apache.activemq.command.MessageAck;
-import org.apache.activemq.command.MessageDispatch;
-import org.apache.activemq.command.NetworkBridgeFilter;
-import org.apache.activemq.command.ProducerInfo;
-import org.apache.activemq.command.RemoveInfo;
-import org.apache.activemq.command.Response;
-import org.apache.activemq.command.SessionInfo;
-import org.apache.activemq.command.ShutdownInfo;
-import org.apache.activemq.command.WireFormatInfo;
+import org.apache.activemq.command.*;
import org.apache.activemq.filter.DestinationFilter;
import org.apache.activemq.filter.MessageEvaluationContext;
import org.apache.activemq.security.SecurityContext;
@@ -147,7 +119,7 @@ public abstract class DemandForwardingBr
private final AtomicBoolean started = new AtomicBoolean();
private TransportConnection duplexInitiatingConnection;
- private BrokerService brokerService = null;
+ protected BrokerService brokerService = null;
private ObjectName mbeanObjectName;
private final ExecutorService serialExecutor = Executors.newSingleThreadExecutor();
private Transport duplexInboundLocalBroker = null;
@@ -818,6 +790,28 @@ public abstract class DemandForwardingBr
} else if (data.getClass() == RemoveInfo.class) {
ConsumerId id = (ConsumerId) ((RemoveInfo) data).getObjectId();
removeDemandSubscription(id);
+ } else if (data.getClass() == RemoveSubscriptionInfo.class) {
+ RemoveSubscriptionInfo info = ((RemoveSubscriptionInfo) data);
+ SubscriptionInfo subscriptionInfo = new SubscriptionInfo(info.getClientId(), info.getSubscriptionName());
+ for (Iterator i = subscriptionMapByLocalId.values().iterator(); i.hasNext(); ) {
+ DemandSubscription ds = (DemandSubscription) i.next();
+ boolean removed = ds.getDurableRemoteSubs().remove(subscriptionInfo);
+ if (removed) {
+ if (ds.getDurableRemoteSubs().isEmpty()) {
+
+ // deactivate subscriber
+ RemoveInfo removeInfo = new RemoveInfo(ds.getLocalInfo().getConsumerId());
+ localBroker.oneway(removeInfo);
+
+ // remove subscriber
+ RemoveSubscriptionInfo sending = new RemoveSubscriptionInfo();
+ sending.setClientId(localClientId);
+ sending.setSubscriptionName(ds.getLocalDurableSubscriber().getSubscriptionName());
+ sending.setConnectionId(this.localConnectionInfo.getConnectionId());
+ localBroker.oneway(sending);
+ }
+ }
+ }
}
}
@@ -1180,6 +1174,9 @@ public abstract class DemandForwardingBr
if (duplicateSuppressionIsRequired(sub)) {
undoMapRegistration(sub);
} else {
+ if (consumerInfo.isDurable()) {
+ sub.getDurableRemoteSubs().add(new SubscriptionInfo(sub.getRemoteInfo().getClientId(), consumerInfo.getSubscriptionName()));
+ }
addSubscription(sub);
consumerAdded = true;
}
@@ -1274,7 +1271,7 @@ public abstract class DemandForwardingBr
return found;
}
- private final Collection<Subscription> getRegionSubscriptions(ActiveMQDestination dest) {
+ protected final Collection<Subscription> getRegionSubscriptions(ActiveMQDestination dest) {
RegionBroker region_broker = (RegionBroker) brokerService.getRegionBroker();
Region region;
Collection<Subscription> subs;
@@ -1370,8 +1367,7 @@ public abstract class DemandForwardingBr
// may need to change if we ever subscribe to a remote broker.
sub.getLocalInfo().setAdditionalPredicate(sub.getNetworkBridgeFilter());
} else {
- // need to ack this message if it is ignored as it is durable so
- // we check before we send. see: suppressMessageDispatch()
+ sub.setLocalDurableSubscriber(new SubscriptionInfo(info.getClientId(), info.getSubscriptionName()));
}
}
Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DemandSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DemandSubscription.java?rev=1483827&r1=1483826&r2=1483827&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DemandSubscription.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DemandSubscription.java Fri May 17 14:27:48 2013
@@ -25,6 +25,7 @@ import java.util.concurrent.atomic.Atomi
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.NetworkBridgeFilter;
+import org.apache.activemq.command.SubscriptionInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,6 +40,8 @@ public class DemandSubscription {
private final Set<ConsumerId> remoteSubsIds = new CopyOnWriteArraySet<ConsumerId>();
private final AtomicInteger dispatched = new AtomicInteger(0);
private final AtomicBoolean activeWaiter = new AtomicBoolean();
+ private final Set<SubscriptionInfo> durableRemoteSubs = new CopyOnWriteArraySet<SubscriptionInfo>();
+ private SubscriptionInfo localDurableSubscriber;
private NetworkBridgeFilter networkBridgeFilter;
@@ -69,6 +72,10 @@ public class DemandSubscription {
return remoteSubsIds.remove(id);
}
+ public Set<SubscriptionInfo> getDurableRemoteSubs() {
+ return durableRemoteSubs;
+ }
+
/**
* @return true if there are no interested consumers
*/
@@ -138,4 +145,12 @@ public class DemandSubscription {
public void setNetworkBridgeFilter(NetworkBridgeFilter networkBridgeFilter) {
this.networkBridgeFilter = networkBridgeFilter;
}
+
+ public SubscriptionInfo getLocalDurableSubscriber() {
+ return localDurableSubscriber;
+ }
+
+ public void setLocalDurableSubscriber(SubscriptionInfo localDurableSubscriber) {
+ this.localDurableSubscriber = localDurableSubscriber;
+ }
}
Modified: activemq/trunk/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?rev=1483827&r1=1483826&r2=1483827&view=diff
==============================================================================
--- activemq/trunk/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java (original)
+++ activemq/trunk/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java Fri May 17 14:27:48 2013
@@ -214,6 +214,7 @@ public class ActiveMQMessageConsumer imp
this.info = new ConsumerInfo(consumerId);
this.info.setExclusive(this.session.connection.isExclusiveConsumer());
+ this.info.setClientId(this.session.connection.getClientID());
this.info.setSubscriptionName(name);
this.info.setPrefetchSize(prefetch);
this.info.setCurrentPrefetchSize(prefetch);
Modified: activemq/trunk/activemq-client/src/main/java/org/apache/activemq/command/ConsumerInfo.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-client/src/main/java/org/apache/activemq/command/ConsumerInfo.java?rev=1483827&r1=1483826&r2=1483827&view=diff
==============================================================================
--- activemq/trunk/activemq-client/src/main/java/org/apache/activemq/command/ConsumerInfo.java (original)
+++ activemq/trunk/activemq-client/src/main/java/org/apache/activemq/command/ConsumerInfo.java Fri May 17 14:27:48 2013
@@ -42,6 +42,7 @@ public class ConsumerInfo extends BaseCo
protected boolean browser;
protected boolean dispatchAsync;
protected String selector;
+ protected String clientId;
protected String subscriptionName;
protected boolean noLocal;
protected boolean exclusive;
@@ -93,6 +94,7 @@ public class ConsumerInfo extends BaseCo
info.browser = browser;
info.dispatchAsync = dispatchAsync;
info.selector = selector;
+ info.clientId = clientId;
info.subscriptionName = subscriptionName;
info.noLocal = noLocal;
info.exclusive = exclusive;
@@ -217,6 +219,19 @@ public class ConsumerInfo extends BaseCo
}
/**
+ * Used to identify the id of a client connection.
+ *
+ * @openwire:property version=1
+ */
+ public String getClientId() {
+ return clientId;
+ }
+
+ public void setClientId(String clientId) {
+ this.clientId = clientId;
+ }
+
+ /**
* Used to identify the name of a durable subscription.
*
* @openwire:property version=1
Modified: activemq/trunk/activemq-client/src/main/java/org/apache/activemq/command/SubscriptionInfo.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-client/src/main/java/org/apache/activemq/command/SubscriptionInfo.java?rev=1483827&r1=1483826&r2=1483827&view=diff
==============================================================================
--- activemq/trunk/activemq-client/src/main/java/org/apache/activemq/command/SubscriptionInfo.java (original)
+++ activemq/trunk/activemq-client/src/main/java/org/apache/activemq/command/SubscriptionInfo.java Fri May 17 14:27:48 2013
@@ -34,6 +34,13 @@ public class SubscriptionInfo implements
protected String subscriptionName;
protected String selector;
+ public SubscriptionInfo() {}
+
+ public SubscriptionInfo(String clientId, String subscriptionName) {
+ this.clientId = clientId;
+ this.subscriptionName = subscriptionName;
+ }
+
public byte getDataStructureType() {
return DATA_STRUCTURE_TYPE;
}
Modified: activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubInBrokerNetworkTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubInBrokerNetworkTest.java?rev=1483827&r1=1483826&r2=1483827&view=diff
==============================================================================
--- activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubInBrokerNetworkTest.java (original)
+++ activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubInBrokerNetworkTest.java Fri May 17 14:27:48 2013
@@ -41,7 +41,7 @@ import org.slf4j.LoggerFactory;
*/
public class DurableSubInBrokerNetworkTest extends NetworkTestSupport {
- private static final Logger LOG = LoggerFactory.getLogger(NetworkConnector.class);
+ private static final Logger LOG = LoggerFactory.getLogger(DurableSubInBrokerNetworkTest.class);
// protected BrokerService localBroker;
private final String subName = "Subscriber1";
private final String subName2 = "Subscriber2";
@@ -152,6 +152,16 @@ public class DurableSubInBrokerNetworkTe
assertTrue("Durable subscription should still be on remote broker",
foundSubInRemoteBrokerByTopicName(topicName));
+ sub2.close();
+ session.unsubscribe(subName2);
+
+ Thread.sleep(100);
+
+ assertFalse(foundSubInLocalBroker(subName2));
+
+ assertFalse("Durable subscription not unregistered on remote broker",
+ foundSubInRemoteBrokerByTopicName(topicName));
+
}
private boolean foundSubInRemoteBrokerByTopicName(String topicName) throws Exception {