You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2013/02/20 14:33:36 UTC

svn commit: r1448161 - in /activemq/trunk: activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java activemq-unit-tests/pom.xml

Author: gtully
Date: Wed Feb 20 13:33:36 2013
New Revision: 1448161

URL: http://svn.apache.org/r1448161
Log:
Revert "Fix for https://issues.apache.org/jira/browse/AMQ-4000 Durable subscription not getting unregistered on networked broker, thanks torsten for the unit test!"
added DurableSubInBrokerNetworkTest to broken test profile till we get this resolved.

This reverts commit b7c32d924af5ada1a2068c77f3bf8e44267edea4.

Conflicts:
	activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubInBrokerNetworkTest.java

Modified:
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
    activemq/trunk/activemq-unit-tests/pom.xml

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java?rev=1448161&r1=1448160&r2=1448161&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java Wed Feb 20 13:33:36 2013
@@ -29,13 +29,25 @@ import org.apache.activemq.broker.region
 import org.apache.activemq.broker.region.MessageReference;
 import org.apache.activemq.broker.region.Subscription;
 import org.apache.activemq.broker.region.TopicSubscription;
-import org.apache.activemq.command.*;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.BrokerInfo;
+import org.apache.activemq.command.Command;
+import org.apache.activemq.command.ConnectionId;
+import org.apache.activemq.command.ConnectionInfo;
+import org.apache.activemq.command.ConsumerId;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.DestinationInfo;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.ProducerId;
+import org.apache.activemq.command.ProducerInfo;
 import org.apache.activemq.security.SecurityContext;
 import org.apache.activemq.state.ProducerState;
 import org.apache.activemq.usage.Usage;
 import org.apache.activemq.util.IdGenerator;
 import org.apache.activemq.util.LongSequenceGenerator;
-import org.apache.activemq.util.SubscriptionKey;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -53,7 +65,6 @@ public class AdvisoryBroker extends Brok
     protected final ConcurrentHashMap<ProducerId, ProducerInfo> producers = new ConcurrentHashMap<ProducerId, ProducerInfo>();
     protected final ConcurrentHashMap<ActiveMQDestination, DestinationInfo> destinations = new ConcurrentHashMap<ActiveMQDestination, DestinationInfo>();
     protected final ConcurrentHashMap<BrokerInfo, ActiveMQMessage> networkBridges = new ConcurrentHashMap<BrokerInfo, ActiveMQMessage>();
-    protected final ConcurrentHashMap<SubscriptionKey, ActiveMQTopic> durableSubscriptions = new ConcurrentHashMap<SubscriptionKey, ActiveMQTopic>();
     protected final ProducerId advisoryProducerId = new ProducerId();
 
     private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator();
@@ -81,12 +92,6 @@ public class AdvisoryBroker extends Brok
 
         // Don't advise advisory topics.
         if (!AdvisorySupport.isAdvisoryTopic(info.getDestination())) {
-            if (info.getDestination().isTopic() && info.isDurable()) {
-                SubscriptionKey key = new SubscriptionKey(context.getClientId(), info.getSubscriptionName());
-                if (!this.durableSubscriptions.contains(key)) {
-                    this.durableSubscriptions.put(key, (ActiveMQTopic)info.getDestination());
-                }
-            }
             ActiveMQTopic topic = AdvisorySupport.getConsumerAdvisoryTopic(info.getDestination());
             consumers.put(info.getConsumerId(), info);
             fireConsumerAdvisory(context, info.getDestination(), topic, info);
@@ -259,26 +264,6 @@ public class AdvisoryBroker extends Brok
     }
 
     @Override
-    public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception {
-        super.removeSubscription(context, info);
-
-        SubscriptionKey key = new SubscriptionKey(context.getClientId(), info.getSubscriptionName());
-
-        ActiveMQTopic dest = durableSubscriptions.get(key);
-        if (dest == null) {
-            LOG.warn("We cannot send an advisory message for a durable sub removal when we don't know about the durable sub");
-        }
-
-        // Don't advise advisory topics.
-        if (!AdvisorySupport.isAdvisoryTopic(dest)) {
-            ActiveMQTopic topic = AdvisorySupport.getConsumerAdvisoryTopic(dest);
-            durableSubscriptions.remove(key);
-            fireConsumerAdvisory(context,dest, topic, info);
-        }
-
-    }
-
-    @Override
     public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
         super.removeProducer(context, info);
 

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java?rev=1448161&r1=1448160&r2=1448161&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java Wed Feb 20 13:33:36 2013
@@ -50,7 +50,32 @@ import org.apache.activemq.broker.region
 import org.apache.activemq.broker.region.RegionBroker;
 import org.apache.activemq.broker.region.Subscription;
 import org.apache.activemq.broker.region.policy.PolicyEntry;
-import org.apache.activemq.command.*;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ActiveMQTempDestination;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.BrokerId;
+import org.apache.activemq.command.BrokerInfo;
+import org.apache.activemq.command.Command;
+import org.apache.activemq.command.ConnectionError;
+import org.apache.activemq.command.ConnectionId;
+import org.apache.activemq.command.ConnectionInfo;
+import org.apache.activemq.command.ConsumerId;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.DataStructure;
+import org.apache.activemq.command.DestinationInfo;
+import org.apache.activemq.command.ExceptionResponse;
+import org.apache.activemq.command.KeepAliveInfo;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageDispatch;
+import org.apache.activemq.command.NetworkBridgeFilter;
+import org.apache.activemq.command.ProducerInfo;
+import org.apache.activemq.command.RemoveInfo;
+import org.apache.activemq.command.Response;
+import org.apache.activemq.command.SessionInfo;
+import org.apache.activemq.command.ShutdownInfo;
+import org.apache.activemq.command.WireFormatInfo;
 import org.apache.activemq.filter.DestinationFilter;
 import org.apache.activemq.filter.MessageEvaluationContext;
 import org.apache.activemq.security.SecurityContext;
@@ -791,11 +816,6 @@ public abstract class DemandForwardingBr
         } else if (data.getClass() == RemoveInfo.class) {
             ConsumerId id = (ConsumerId) ((RemoveInfo) data).getObjectId();
             removeDemandSubscription(id);
-        } else if (data.getClass() == RemoveSubscriptionInfo.class) {
-            RemoveSubscriptionInfo durableSub = (RemoveSubscriptionInfo)data;
-            LOG.debug("Removing durable subscription: clientId: "  + durableSub.getClientId()
-                    + ", durableName: " + durableSub.getSubcriptionName());
-            localBroker.oneway(data);
         }
     }
 

Modified: activemq/trunk/activemq-unit-tests/pom.xml
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-unit-tests/pom.xml?rev=1448161&r1=1448160&r2=1448161&view=diff
==============================================================================
--- activemq/trunk/activemq-unit-tests/pom.xml (original)
+++ activemq/trunk/activemq-unit-tests/pom.xml Wed Feb 20 13:33:36 2013
@@ -570,6 +570,8 @@
                 <exclude>**/StoreQueueCursorJournalNoDuplicateTest.*</exclude>
                 <exclude>**/org.apache.activemq.usecases.ThreeBrokerVirtualTopicNetworkAMQPATest.*</exclude>
                 <exclude>**/LevelDBXARecoveryBrokerTest.*</exclude>
+                <!-- https://issues.apache.org/jira/browse/AMQ-4000 -->
+                <exclude>**/DurableSubInBrokerNetworkTest.*</exclude>
               </excludes>
             </configuration>
           </plugin>