You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2015/02/27 18:19:13 UTC

activemq git commit: https://issues.apache.org/jira/browse/AMQ-5597

Repository: activemq
Updated Branches:
  refs/heads/master 5667e4ddc -> 7af7c0143


https://issues.apache.org/jira/browse/AMQ-5597

Clean up the durable subscription unsubscribe handling to be in line
with the AMQP JMS mapping spec, and switch to the Qpid 0.32-SNAPSHOT
build for now so that we can track any other changes we might want to
feed back there before release.

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/7af7c014
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/7af7c014
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/7af7c014

Branch: refs/heads/master
Commit: 7af7c0143fd86767035d79746a9d4e7b740fff29
Parents: 5667e4d
Author: Timothy Bish <ta...@gmail.com>
Authored: Fri Feb 27 12:18:49 2015 -0500
Committer: Timothy Bish <ta...@gmail.com>
Committed: Fri Feb 27 12:18:49 2015 -0500

----------------------------------------------------------------------
 .../transport/amqp/AmqpProtocolConverter.java   | 180 +++++++++++--------
 .../activemq/broker/region/TopicRegion.java     |   9 +
 pom.xml                                         |   2 +-
 3 files changed, 120 insertions(+), 71 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/7af7c014/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
index 87fa7df..1b86040 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
@@ -30,10 +30,12 @@ import java.util.concurrent.ConcurrentHashMap;
 
 import javax.jms.Destination;
 import javax.jms.InvalidClientIDException;
-import javax.jms.InvalidDestinationException;
 import javax.jms.InvalidSelectorException;
 
 import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.DurableTopicSubscription;
+import org.apache.activemq.broker.region.RegionBroker;
+import org.apache.activemq.broker.region.TopicRegion;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQMessage;
 import org.apache.activemq.command.ActiveMQTempQueue;
@@ -85,6 +87,7 @@ import org.apache.qpid.proton.amqp.messaging.Rejected;
 import org.apache.qpid.proton.amqp.messaging.Released;
 import org.apache.qpid.proton.amqp.messaging.Target;
 import org.apache.qpid.proton.amqp.messaging.TerminusDurability;
+import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy;
 import org.apache.qpid.proton.amqp.transaction.Coordinator;
 import org.apache.qpid.proton.amqp.transaction.Declare;
 import org.apache.qpid.proton.amqp.transaction.Declared;
@@ -127,7 +130,6 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
     private static final Symbol COPY = Symbol.getSymbol("copy");
     private static final Symbol JMS_SELECTOR = Symbol.valueOf("jms-selector");
     private static final Symbol NO_LOCAL = Symbol.valueOf("no-local");
-    private static final Symbol DURABLE_SUBSCRIPTION_ENDED = Symbol.getSymbol("DURABLE_SUBSCRIPTION_ENDED");
 
     private final AmqpTransport amqpTransport;
     private final AmqpWireFormat amqpWireFormat;
@@ -333,9 +335,13 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
                             processSessionEvent(event.getSession());
                             break;
                         case LINK_REMOTE_OPEN:
-                        case LINK_REMOTE_CLOSE:
+                            processLinkOpen(event.getLink());
+                            break;
                         case LINK_REMOTE_DETACH:
-                            processLinkEvent(event.getLink());
+                            processLinkDetach(event.getLink());
+                            break;
+                        case LINK_REMOTE_CLOSE:
+                            processLinkClose(event.getLink());
                             break;
                         case LINK_FLOW:
                             processLinkFlow(event.getLink());
@@ -387,18 +393,26 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
         }
     }
 
-    protected void processLinkEvent(Link link) throws Exception {
-        EndpointState remoteState = link.getRemoteState();
-        if (remoteState == EndpointState.ACTIVE) {
-            onLinkOpen(link);
-        } else if (remoteState == EndpointState.CLOSED) {
-            AmqpDeliveryListener context = (AmqpDeliveryListener) link.getContext();
-            if (context != null) {
-                context.onClose();
-            }
-            link.close();
-            link.free();
+    protected void processLinkOpen(Link link) throws Exception {
+        onLinkOpen(link); // Dispatched for LINK_REMOTE_OPEN; all attach handling lives in onLinkOpen.
+    }
+
+    protected void processLinkDetach(Link link) throws Exception {
+        AmqpDeliveryListener context = (AmqpDeliveryListener) link.getContext();
+        if (context != null) {
+            context.onDetach(); // Notify the listener attached to this link, if any, before the link is released.
         }
+        link.detach(); // Detach (rather than close) the local end, then free the link.
+        link.free();
+    }
+
+    protected void processLinkClose(Link link) throws Exception {
+        AmqpDeliveryListener context = (AmqpDeliveryListener) link.getContext();
+        if (context != null) {
+            context.onClose(); // Notify the listener attached to this link, if any, before the link is released.
+        }
+        link.close(); // Close (rather than detach) the local end, then free the link.
+        link.free();
     }
 
     protected void processSessionEvent(Session session) throws Exception {
@@ -502,6 +516,9 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
 
         abstract public void onDelivery(Delivery delivery) throws Exception;
 
+        public void onDetach() throws Exception { // Default is a no-op; subclasses override as needed.
+        }
+
         public void onClose() throws Exception {
         }
 
@@ -674,6 +691,11 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
         }
 
         @Override
+        public String toString() { // Describes this context by its producer ID and destination.
+            return "ProducerContext { producerId = " + producerId + ", destination = " + destination + " }";
+        }
+
+        @Override
         protected void onMessage(final Receiver receiver, final Delivery delivery, Buffer buffer) throws Exception {
             if (!closed) {
                 EncodedMessage em = new EncodedMessage(delivery.getMessageFormat(), buffer.data, buffer.offset, buffer.length);
@@ -1027,6 +1049,29 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
         }
 
         @Override
+        public String toString() { // Describes this context by the value of its info field.
+            return "ConsumerContext { " + info + " }";
+        }
+
+        @Override
+        public void onDetach() throws Exception {
+            if (!closed) {
+                closed = true; // Makes any later onDetach/onClose call a no-op.
+                sender.setContext(null);
+                subscriptionsByConsumerId.remove(consumerId);
+                // Also drop this consumer from its session's registry when the session still has a context.
+                AmqpSessionContext session = (AmqpSessionContext) sender.getSession().getContext();
+                if (session != null) {
+                    session.consumers.remove(info.getConsumerId());
+                }
+                // Only the consumer is removed here: no RemoveSubscriptionInfo is sent on detach.
+                RemoveInfo removeCommand = new RemoveInfo(consumerId);
+                removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId);
+                sendToActiveMQ(removeCommand, null);
+            }
+        }
+
+        @Override
         public void onClose() throws Exception {
             if (!closed) {
                 closed = true;
@@ -1041,6 +1086,15 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
                 RemoveInfo removeCommand = new RemoveInfo(consumerId);
                 removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId);
                 sendToActiveMQ(removeCommand, null);
+
+                if (info.isDurable()) {
+                    RemoveSubscriptionInfo rsi = new RemoveSubscriptionInfo();
+                    rsi.setConnectionId(connectionId);
+                    rsi.setSubscriptionName(sender.getName());
+                    rsi.setClientId(connectionInfo.getClientId());
+
+                    sendToActiveMQ(rsi, null);
+                }
             }
         }
 
@@ -1339,57 +1393,35 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
                 }
             }
 
-            ActiveMQDestination dest;
+            ActiveMQDestination destination;
             if (source == null) {
+                // Attempt to recover previous subscription
+                destination = lookupSubscription(sender.getName());
 
-                source = new org.apache.qpid.proton.amqp.messaging.Source();
-                source.setAddress("");
-                source.setCapabilities(DURABLE_SUBSCRIPTION_ENDED);
-                sender.setSource(source);
-
-                // Looks like durable sub removal.
-                RemoveSubscriptionInfo rsi = new RemoveSubscriptionInfo();
-                rsi.setConnectionId(connectionId);
-                rsi.setSubscriptionName(sender.getName());
-                rsi.setClientId(connectionInfo.getClientId());
-
-                consumerContext.closed = true;
-                sendToActiveMQ(rsi, new ResponseHandler() {
-                    @Override
-                    public void onResponse(IAmqpProtocolConverter converter, Response response) throws IOException {
-                        if (response.isException()) {
-                            sender.setSource(null);
-                            Throwable exception = ((ExceptionResponse) response).getException();
-                            if (exception instanceof SecurityException) {
-                                sender.setCondition(new ErrorCondition(AmqpError.UNAUTHORIZED_ACCESS, exception.getMessage()));
-                            } else if (exception instanceof InvalidDestinationException){
-                                sender.setCondition(new ErrorCondition(AmqpError.NOT_FOUND, exception.getMessage()));
-                            } else {
-                                sender.setCondition(new ErrorCondition(AmqpError.INTERNAL_ERROR, exception.getMessage()));
-                            }
-                            sender.close();
-                            sender.free();
-                        } else {
-                            sender.open();
-                        }
-                        pumpProtonToSocket();
-                    }
-                });
-                return;
-            } else if (contains(source.getCapabilities(), DURABLE_SUBSCRIPTION_ENDED)) {
-                consumerContext.closed = true;
-                sender.close();
-                pumpProtonToSocket();
-                return;
+                if (destination != null) {
+                    source = new org.apache.qpid.proton.amqp.messaging.Source();
+                    source.setAddress(destination.getQualifiedName());
+                    source.setDurable(TerminusDurability.UNSETTLED_STATE);
+                    source.setExpiryPolicy(TerminusExpiryPolicy.NEVER);
+                    sender.setSource(source);
+                } else {
+                    consumerContext.closed = true;
+                    sender.setSource(null);
+                    sender.setCondition(new ErrorCondition(AmqpError.NOT_FOUND, "Unknown subscription link: " + sender.getName()));
+                    sender.close();
+                    sender.free();
+                    pumpProtonToSocket();
+                    return;
+                }
             } else if (source.getDynamic()) {
                 // lets create a temp dest.
-                dest = createTempQueue();
+                destination = createTempQueue();
                 source = new org.apache.qpid.proton.amqp.messaging.Source();
-                source.setAddress(dest.getQualifiedName());
+                source.setAddress(destination.getQualifiedName());
                 source.setDynamic(true);
                 sender.setSource(source);
             } else {
-                dest = createDestination(source);
+                destination = createDestination(source);
             }
 
             subscriptionsByConsumerId.put(id, consumerContext);
@@ -1397,8 +1429,8 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
             consumerContext.info = consumerInfo;
             consumerInfo.setSelector(selector);
             consumerInfo.setNoRangeAcks(true);
-            consumerInfo.setDestination(dest);
-            consumerContext.destination = dest;
+            consumerInfo.setDestination(destination);
+            consumerContext.destination = destination;
             int senderCredit = sender.getRemoteCredit();
             if (prefetch != 0) {
                 // use the value configured on the transport connector
@@ -1419,11 +1451,11 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
             }
             consumerContext.credit = senderCredit;
             consumerInfo.setDispatchAsync(true);
-            if (source.getDistributionMode() == COPY && dest.isQueue()) {
+            if (source.getDistributionMode() == COPY && destination.isQueue()) {
                 consumerInfo.setBrowser(true);
             }
             if ((TerminusDurability.UNSETTLED_STATE.equals(source.getDurable()) ||
-                 TerminusDurability.CONFIGURATION.equals(source.getDurable())) && dest.isTopic()) {
+                 TerminusDurability.CONFIGURATION.equals(source.getDurable())) && destination.isTopic()) {
                 consumerInfo.setSubscriptionName(sender.getName());
             }
 
@@ -1466,15 +1498,23 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
         }
     }
 
-    static private boolean contains(Symbol[] haystack, Symbol needle) {
-        if (haystack != null) {
-            for (Symbol capability : haystack) {
-                if (capability == needle) {
-                    return true;
-                }
-            }
+    private ActiveMQDestination lookupSubscription(String subscriptionName) throws AmqpProtocolException {
+        ActiveMQDestination result = null; // Stays null when no matching durable subscription is found.
+        RegionBroker regionBroker;
+        // Reach the topic region through the broker's RegionBroker adaptor; lookup failures become protocol errors.
+        try {
+            regionBroker = (RegionBroker) brokerService.getBroker().getAdaptor(RegionBroker.class);
+        } catch (Exception e) {
+            throw new AmqpProtocolException("Error finding subscription: " + subscriptionName + ": " + e.getMessage(), false, e);
+        }
+        // The subscription is looked up by its name plus this connection's client ID.
+        final TopicRegion topicRegion = (TopicRegion) regionBroker.getTopicRegion();
+        DurableTopicSubscription subscription = topicRegion.lookupSubscription(subscriptionName, connectionInfo.getClientId());
+        if (subscription != null) {
+            result = subscription.getActiveMQDestination();
         }
-        return false;
+
+        return result;
     }
 
     private ActiveMQDestination createTempQueue() {

http://git-wip-us.apache.org/repos/asf/activemq/blob/7af7c014/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicRegion.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicRegion.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicRegion.java
index 383f240..80088d7 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicRegion.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicRegion.java
@@ -378,6 +378,15 @@ public class TopicRegion extends AbstractRegion {
         return inactiveDestinations;
     }
 
+    /**
+     * Looks up the durable subscription stored under the given client ID and subscription name.
+     *
+     * @return the matching subscription, or null if none is registered under that key.
+     */
+    public DurableTopicSubscription lookupSubscription(String subscriptionName, String clientId) {
+        return durableSubscriptions.get(new SubscriptionKey(clientId, subscriptionName));
+    }
+
     public boolean isKeepDurableSubsActive() {
         return keepDurableSubsActive;
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/7af7c014/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 1b05d36..4d6cdcd 100755
--- a/pom.xml
+++ b/pom.xml
@@ -105,7 +105,7 @@
     <linkedin-zookeeper-version>1.4.0</linkedin-zookeeper-version>
     <zookeeper-version>3.4.6</zookeeper-version>
     <qpid-proton-version>0.8</qpid-proton-version>
-    <qpid-jms-version>0.30</qpid-jms-version>
+    <qpid-jms-version>0.32-SNAPSHOT</qpid-jms-version>
     <regexp-version>1.3</regexp-version>
     <rome-version>1.0</rome-version>
     <saxon-version>9.5.1-2</saxon-version>