You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2015/02/27 18:19:13 UTC
activemq git commit: https://issues.apache.org/jira/browse/AMQ-5597
Repository: activemq
Updated Branches:
refs/heads/master 5667e4ddc -> 7af7c0143
https://issues.apache.org/jira/browse/AMQ-5597
Clean up the durable subscription unsubscribe handling to be in line
with the AMQP JMS mapping spec and switch to the Qpid 0.32-SNAPSHOT
build for now to allow us to track any other changes we might want to
feed back there before release.
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/7af7c014
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/7af7c014
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/7af7c014
Branch: refs/heads/master
Commit: 7af7c0143fd86767035d79746a9d4e7b740fff29
Parents: 5667e4d
Author: Timothy Bish <ta...@gmail.com>
Authored: Fri Feb 27 12:18:49 2015 -0500
Committer: Timothy Bish <ta...@gmail.com>
Committed: Fri Feb 27 12:18:49 2015 -0500
----------------------------------------------------------------------
.../transport/amqp/AmqpProtocolConverter.java | 180 +++++++++++--------
.../activemq/broker/region/TopicRegion.java | 9 +
pom.xml | 2 +-
3 files changed, 120 insertions(+), 71 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/7af7c014/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
index 87fa7df..1b86040 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
@@ -30,10 +30,12 @@ import java.util.concurrent.ConcurrentHashMap;
import javax.jms.Destination;
import javax.jms.InvalidClientIDException;
-import javax.jms.InvalidDestinationException;
import javax.jms.InvalidSelectorException;
import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.DurableTopicSubscription;
+import org.apache.activemq.broker.region.RegionBroker;
+import org.apache.activemq.broker.region.TopicRegion;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQTempQueue;
@@ -85,6 +87,7 @@ import org.apache.qpid.proton.amqp.messaging.Rejected;
import org.apache.qpid.proton.amqp.messaging.Released;
import org.apache.qpid.proton.amqp.messaging.Target;
import org.apache.qpid.proton.amqp.messaging.TerminusDurability;
+import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy;
import org.apache.qpid.proton.amqp.transaction.Coordinator;
import org.apache.qpid.proton.amqp.transaction.Declare;
import org.apache.qpid.proton.amqp.transaction.Declared;
@@ -127,7 +130,6 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
private static final Symbol COPY = Symbol.getSymbol("copy");
private static final Symbol JMS_SELECTOR = Symbol.valueOf("jms-selector");
private static final Symbol NO_LOCAL = Symbol.valueOf("no-local");
- private static final Symbol DURABLE_SUBSCRIPTION_ENDED = Symbol.getSymbol("DURABLE_SUBSCRIPTION_ENDED");
private final AmqpTransport amqpTransport;
private final AmqpWireFormat amqpWireFormat;
@@ -333,9 +335,13 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
processSessionEvent(event.getSession());
break;
case LINK_REMOTE_OPEN:
- case LINK_REMOTE_CLOSE:
+ processLinkOpen(event.getLink());
+ break;
case LINK_REMOTE_DETACH:
- processLinkEvent(event.getLink());
+ processLinkDetach(event.getLink());
+ break;
+ case LINK_REMOTE_CLOSE:
+ processLinkClose(event.getLink());
break;
case LINK_FLOW:
processLinkFlow(event.getLink());
@@ -387,18 +393,26 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
}
}
- protected void processLinkEvent(Link link) throws Exception {
- EndpointState remoteState = link.getRemoteState();
- if (remoteState == EndpointState.ACTIVE) {
- onLinkOpen(link);
- } else if (remoteState == EndpointState.CLOSED) {
- AmqpDeliveryListener context = (AmqpDeliveryListener) link.getContext();
- if (context != null) {
- context.onClose();
- }
- link.close();
- link.free();
+ protected void processLinkOpen(Link link) throws Exception {
+ onLinkOpen(link);
+ }
+
+ protected void processLinkDetach(Link link) throws Exception {
+ AmqpDeliveryListener context = (AmqpDeliveryListener) link.getContext();
+ if (context != null) {
+ context.onDetach();
}
+ link.detach();
+ link.free();
+ }
+
+ protected void processLinkClose(Link link) throws Exception {
+ AmqpDeliveryListener context = (AmqpDeliveryListener) link.getContext();
+ if (context != null) {
+ context.onClose();
+ }
+ link.close();
+ link.free();
}
protected void processSessionEvent(Session session) throws Exception {
@@ -502,6 +516,9 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
abstract public void onDelivery(Delivery delivery) throws Exception;
+ public void onDetach() throws Exception {
+ }
+
public void onClose() throws Exception {
}
@@ -674,6 +691,11 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
}
@Override
+ public String toString() {
+ return "ProducerContext { producerId = " + producerId + ", destination = " + destination + " }";
+ }
+
+ @Override
protected void onMessage(final Receiver receiver, final Delivery delivery, Buffer buffer) throws Exception {
if (!closed) {
EncodedMessage em = new EncodedMessage(delivery.getMessageFormat(), buffer.data, buffer.offset, buffer.length);
@@ -1027,6 +1049,29 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
}
@Override
+ public String toString() {
+ return "ConsumerContext { " + info + " }";
+ }
+
+ @Override
+ public void onDetach() throws Exception {
+ if (!closed) {
+ closed = true;
+ sender.setContext(null);
+ subscriptionsByConsumerId.remove(consumerId);
+
+ AmqpSessionContext session = (AmqpSessionContext) sender.getSession().getContext();
+ if (session != null) {
+ session.consumers.remove(info.getConsumerId());
+ }
+
+ RemoveInfo removeCommand = new RemoveInfo(consumerId);
+ removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId);
+ sendToActiveMQ(removeCommand, null);
+ }
+ }
+
+ @Override
public void onClose() throws Exception {
if (!closed) {
closed = true;
@@ -1041,6 +1086,15 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
RemoveInfo removeCommand = new RemoveInfo(consumerId);
removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId);
sendToActiveMQ(removeCommand, null);
+
+ if (info.isDurable()) {
+ RemoveSubscriptionInfo rsi = new RemoveSubscriptionInfo();
+ rsi.setConnectionId(connectionId);
+ rsi.setSubscriptionName(sender.getName());
+ rsi.setClientId(connectionInfo.getClientId());
+
+ sendToActiveMQ(rsi, null);
+ }
}
}
@@ -1339,57 +1393,35 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
}
}
- ActiveMQDestination dest;
+ ActiveMQDestination destination;
if (source == null) {
+ // Attempt to recover previous subscription
+ destination = lookupSubscription(sender.getName());
- source = new org.apache.qpid.proton.amqp.messaging.Source();
- source.setAddress("");
- source.setCapabilities(DURABLE_SUBSCRIPTION_ENDED);
- sender.setSource(source);
-
- // Looks like durable sub removal.
- RemoveSubscriptionInfo rsi = new RemoveSubscriptionInfo();
- rsi.setConnectionId(connectionId);
- rsi.setSubscriptionName(sender.getName());
- rsi.setClientId(connectionInfo.getClientId());
-
- consumerContext.closed = true;
- sendToActiveMQ(rsi, new ResponseHandler() {
- @Override
- public void onResponse(IAmqpProtocolConverter converter, Response response) throws IOException {
- if (response.isException()) {
- sender.setSource(null);
- Throwable exception = ((ExceptionResponse) response).getException();
- if (exception instanceof SecurityException) {
- sender.setCondition(new ErrorCondition(AmqpError.UNAUTHORIZED_ACCESS, exception.getMessage()));
- } else if (exception instanceof InvalidDestinationException){
- sender.setCondition(new ErrorCondition(AmqpError.NOT_FOUND, exception.getMessage()));
- } else {
- sender.setCondition(new ErrorCondition(AmqpError.INTERNAL_ERROR, exception.getMessage()));
- }
- sender.close();
- sender.free();
- } else {
- sender.open();
- }
- pumpProtonToSocket();
- }
- });
- return;
- } else if (contains(source.getCapabilities(), DURABLE_SUBSCRIPTION_ENDED)) {
- consumerContext.closed = true;
- sender.close();
- pumpProtonToSocket();
- return;
+ if (destination != null) {
+ source = new org.apache.qpid.proton.amqp.messaging.Source();
+ source.setAddress(destination.getQualifiedName());
+ source.setDurable(TerminusDurability.UNSETTLED_STATE);
+ source.setExpiryPolicy(TerminusExpiryPolicy.NEVER);
+ sender.setSource(source);
+ } else {
+ consumerContext.closed = true;
+ sender.setSource(null);
+ sender.setCondition(new ErrorCondition(AmqpError.NOT_FOUND, "Unknown subscription link: " + sender.getName()));
+ sender.close();
+ sender.free();
+ pumpProtonToSocket();
+ return;
+ }
} else if (source.getDynamic()) {
// lets create a temp dest.
- dest = createTempQueue();
+ destination = createTempQueue();
source = new org.apache.qpid.proton.amqp.messaging.Source();
- source.setAddress(dest.getQualifiedName());
+ source.setAddress(destination.getQualifiedName());
source.setDynamic(true);
sender.setSource(source);
} else {
- dest = createDestination(source);
+ destination = createDestination(source);
}
subscriptionsByConsumerId.put(id, consumerContext);
@@ -1397,8 +1429,8 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
consumerContext.info = consumerInfo;
consumerInfo.setSelector(selector);
consumerInfo.setNoRangeAcks(true);
- consumerInfo.setDestination(dest);
- consumerContext.destination = dest;
+ consumerInfo.setDestination(destination);
+ consumerContext.destination = destination;
int senderCredit = sender.getRemoteCredit();
if (prefetch != 0) {
// use the value configured on the transport connector
@@ -1419,11 +1451,11 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
}
consumerContext.credit = senderCredit;
consumerInfo.setDispatchAsync(true);
- if (source.getDistributionMode() == COPY && dest.isQueue()) {
+ if (source.getDistributionMode() == COPY && destination.isQueue()) {
consumerInfo.setBrowser(true);
}
if ((TerminusDurability.UNSETTLED_STATE.equals(source.getDurable()) ||
- TerminusDurability.CONFIGURATION.equals(source.getDurable())) && dest.isTopic()) {
+ TerminusDurability.CONFIGURATION.equals(source.getDurable())) && destination.isTopic()) {
consumerInfo.setSubscriptionName(sender.getName());
}
@@ -1466,15 +1498,23 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
}
}
- static private boolean contains(Symbol[] haystack, Symbol needle) {
- if (haystack != null) {
- for (Symbol capability : haystack) {
- if (capability == needle) {
- return true;
- }
- }
+ private ActiveMQDestination lookupSubscription(String subscriptionName) throws AmqpProtocolException {
+ ActiveMQDestination result = null;
+ RegionBroker regionBroker;
+
+ try {
+ regionBroker = (RegionBroker) brokerService.getBroker().getAdaptor(RegionBroker.class);
+ } catch (Exception e) {
+ throw new AmqpProtocolException("Error finding subscription: " + subscriptionName + ": " + e.getMessage(), false, e);
+ }
+
+ final TopicRegion topicRegion = (TopicRegion) regionBroker.getTopicRegion();
+ DurableTopicSubscription subscription = topicRegion.lookupSubscription(subscriptionName, connectionInfo.getClientId());
+ if (subscription != null) {
+ result = subscription.getActiveMQDestination();
}
- return false;
+
+ return result;
}
private ActiveMQDestination createTempQueue() {
http://git-wip-us.apache.org/repos/asf/activemq/blob/7af7c014/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicRegion.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicRegion.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicRegion.java
index 383f240..80088d7 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicRegion.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicRegion.java
@@ -378,6 +378,15 @@ public class TopicRegion extends AbstractRegion {
return inactiveDestinations;
}
+ public DurableTopicSubscription lookupSubscription(String subscriptionName, String clientId) {
+ SubscriptionKey key = new SubscriptionKey(clientId, subscriptionName);
+ if (durableSubscriptions.containsKey(key)) {
+ return durableSubscriptions.get(key);
+ }
+
+ return null;
+ }
+
public boolean isKeepDurableSubsActive() {
return keepDurableSubsActive;
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/7af7c014/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 1b05d36..4d6cdcd 100755
--- a/pom.xml
+++ b/pom.xml
@@ -105,7 +105,7 @@
<linkedin-zookeeper-version>1.4.0</linkedin-zookeeper-version>
<zookeeper-version>3.4.6</zookeeper-version>
<qpid-proton-version>0.8</qpid-proton-version>
- <qpid-jms-version>0.30</qpid-jms-version>
+ <qpid-jms-version>0.32-SNAPSHOT</qpid-jms-version>
<regexp-version>1.3</regexp-version>
<rome-version>1.0</rome-version>
<saxon-version>9.5.1-2</saxon-version>