You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2012/07/26 21:43:41 UTC

svn commit: r1366173 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java test/java/org/apache/activemq/transport/stomp/Stomp11Test.java

Author: tabish
Date: Thu Jul 26 19:43:41 2012
New Revision: 1366173

URL: http://svn.apache.org/viewvc?rev=1366173&view=rev
Log:
fix for: https://issues.apache.org/jira/browse/AMQ-3917 

Allow multiple durable subs for Stomp v1.1 client connections.

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/Stomp11Test.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java?rev=1366173&r1=1366172&r2=1366173&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java Thu Jul 26 19:43:41 2012
@@ -22,10 +22,7 @@ import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.io.OutputStreamWriter;
 import java.io.PrintWriter;
-import java.util.Arrays;
-import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -603,9 +600,13 @@ public class ProtocolConverter {
 
         // check if it is a durable subscription
         String durable = command.getHeaders().get("activemq.subscriptionName");
+        String clientId = durable;
+        if (this.version.equals(Stomp.V1_1)) {
+            clientId = connectionInfo.getClientId();
+        }
         if (durable != null) {
             RemoveSubscriptionInfo info = new RemoveSubscriptionInfo();
-            info.setClientId(durable);
+            info.setClientId(clientId);
             info.setSubscriptionName(durable);
             info.setConnectionId(connectionId);
             sendToActiveMQ(info, createResponseHandler(command));

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/Stomp11Test.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/Stomp11Test.java?rev=1366173&r1=1366172&r2=1366173&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/Stomp11Test.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/Stomp11Test.java Thu Jul 26 19:43:41 2012
@@ -30,11 +30,13 @@ import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
 import javax.jms.Session;
 import javax.jms.TextMessage;
+import javax.management.ObjectName;
 
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.CombinationTestSupport;
 import org.apache.activemq.broker.BrokerFactory;
 import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.jmx.BrokerViewMBean;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTextMessage;
 import org.slf4j.Logger;
@@ -638,11 +640,11 @@ public class Stomp11Test extends Combina
         assertTrue(frame.startsWith("CONNECTED"));
 
         frame = "SEND\n" +
-        		"value:newest" + "\n" +
-        		"value:older" + "\n" +
-        		"value:oldest" + "\n" +
-        		"destination:/queue/" + getQueueName() +
-        		"\n\n" + "Hello World" + Stomp.NULL;
+                "value:newest" + "\n" +
+                "value:older" + "\n" +
+                "value:oldest" + "\n" +
+                "destination:/queue/" + getQueueName() +
+                "\n\n" + "Hello World" + Stomp.NULL;
 
         stompConnection.sendFrame(frame);
 
@@ -779,4 +781,209 @@ public class Stomp11Test extends Combina
         frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
         stompConnection.sendFrame(frame);
     }
+
+    public void testDurableSubAndUnSubOnTwoTopics() throws Exception {
+        stompConnection.setVersion(Stomp.V1_1);
+
+        String domain = "org.apache.activemq";
+        ObjectName brokerName = new ObjectName(domain + ":Type=Broker,BrokerName=localhost");
+
+        BrokerViewMBean view = (BrokerViewMBean)broker.getManagementContext().newProxyInstance(brokerName, BrokerViewMBean.class, true);
+
+        String connectFrame = "STOMP\n" +
+                "login:system\n" + "passcode:manager\n" + "accept-version:1.1\n" +
+                "host:localhost\n" + "client-id:test\n" + "\n" + Stomp.NULL;
+        stompConnection.sendFrame(connectFrame);
+
+        String frame = stompConnection.receiveFrame();
+        LOG.debug("Broker sent: " + frame);
+
+        assertTrue(frame.startsWith("CONNECTED"));
+        assertEquals(view.getDurableTopicSubscribers().length, 0);
+
+        // subscribe to first destination durably
+        frame = "SUBSCRIBE\n" +
+                "destination:/topic/" + getQueueName() + "1" + "\n" +
+                "ack:auto\n" + "receipt:1\n" + "id:durablesub-1\n" +
+                "activemq.subscriptionName:test1\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+
+        StompFrame receipt = stompConnection.receive();
+        LOG.debug("Broker sent: " + receipt);
+        assertTrue(receipt.getAction().startsWith("RECEIPT"));
+        assertEquals("1", receipt.getHeaders().get("receipt-id"));
+        assertEquals(view.getDurableTopicSubscribers().length, 1);
+
+        // subscribe to second destination durably
+        frame = "SUBSCRIBE\n" +
+                "destination:/topic/" + getQueueName() + "2" + "\n" +
+                "ack:auto\n" + "receipt:2\n" + "id:durablesub-2\n" +
+                "activemq.subscriptionName:test2\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+
+        receipt = stompConnection.receive();
+        LOG.debug("Broker sent: " + receipt);
+        assertTrue(receipt.getAction().startsWith("RECEIPT"));
+        assertEquals("2", receipt.getHeaders().get("receipt-id"));
+        assertEquals(view.getDurableTopicSubscribers().length, 2);
+
+        frame = "DISCONNECT\nclient-id:test\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+        try {
+            Thread.sleep(400);
+        } catch (InterruptedException e){}
+
+        // reconnect and send some messages to the offline subscribers and then try to get
+        // them after subscribing again.
+        stompConnect();
+        stompConnection.sendFrame(connectFrame);
+        frame = stompConnection.receiveFrame();
+        LOG.debug("Broker sent: " + frame);
+        assertTrue(frame.startsWith("CONNECTED"));
+        assertEquals(view.getDurableTopicSubscribers().length, 0);
+        assertEquals(view.getInactiveDurableTopicSubscribers().length, 2);
+
+        // unsubscribe from topic 1
+        frame = "UNSUBSCRIBE\n" + "destination:/topic/" + getQueueName() + "1\n" +
+                "id:durablesub-1\n" + "receipt:3\n" +
+                "activemq.subscriptionName:test1\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+        receipt = stompConnection.receive();
+        LOG.debug("Broker sent: " + frame);
+        assertTrue(receipt.getAction().startsWith("RECEIPT"));
+        assertEquals("3", receipt.getHeaders().get("receipt-id"));
+
+        assertEquals(view.getInactiveDurableTopicSubscribers().length, 1);
+
+        // unsubscribe from topic 2
+        frame = "UNSUBSCRIBE\n" + "destination:/topic/" + getQueueName() + "2\n" +
+                "id:durablesub-2\n" + "receipt:4\n" +
+                "activemq.subscriptionName:test2\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+        receipt = stompConnection.receive();
+        LOG.debug("Broker sent: " + frame);
+        assertTrue(receipt.getAction().startsWith("RECEIPT"));
+        assertEquals("4", receipt.getHeaders().get("receipt-id"));
+
+        assertEquals(view.getInactiveDurableTopicSubscribers().length, 0);
+
+        frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+    }
+
+    public void testMultipleDurableSubsWithOfflineMessages() throws Exception {
+        stompConnection.setVersion(Stomp.V1_1);
+
+        String domain = "org.apache.activemq";
+        ObjectName brokerName = new ObjectName(domain + ":Type=Broker,BrokerName=localhost");
+
+        BrokerViewMBean view = (BrokerViewMBean)broker.getManagementContext().newProxyInstance(brokerName, BrokerViewMBean.class, true);
+
+        String connectFrame = "STOMP\n" + "login:system\n" + "passcode:manager\n" +
+                "accept-version:1.1\n" + "host:localhost\n" + "client-id:test\n" + "\n" + Stomp.NULL;
+        stompConnection.sendFrame(connectFrame);
+
+        String frame = stompConnection.receiveFrame();
+        LOG.debug("Broker sent: " + frame);
+
+        assertTrue(frame.startsWith("CONNECTED"));
+        assertEquals(view.getDurableTopicSubscribers().length, 0);
+
+        // subscribe to first destination durably
+        frame = "SUBSCRIBE\n" +
+                "destination:/topic/" + getQueueName() + "1" + "\n" +
+                "ack:auto\n" + "receipt:1\n" + "id:durablesub-1\n" +
+                "activemq.subscriptionName:test1\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+
+        StompFrame receipt = stompConnection.receive();
+        LOG.debug("Broker sent: " + receipt);
+        assertTrue(receipt.getAction().startsWith("RECEIPT"));
+        assertEquals("1", receipt.getHeaders().get("receipt-id"));
+        assertEquals(view.getDurableTopicSubscribers().length, 1);
+
+        // subscribe to second destination durably
+        frame = "SUBSCRIBE\n" +
+                "destination:/topic/" + getQueueName() + "2" + "\n" +
+                "ack:auto\n" + "receipt:2\n" + "id:durablesub-2\n" +
+                "activemq.subscriptionName:test2\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+
+        receipt = stompConnection.receive();
+        LOG.debug("Broker sent: " + receipt);
+        assertTrue(receipt.getAction().startsWith("RECEIPT"));
+        assertEquals("2", receipt.getHeaders().get("receipt-id"));
+        assertEquals(view.getDurableTopicSubscribers().length, 2);
+
+        frame = "DISCONNECT\nclient-id:test\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+        try {
+            Thread.sleep(400);
+        } catch (InterruptedException e){}
+
+        // reconnect and send some messages to the offline subscribers and then try to get
+        // them after subscribing again.
+        stompConnect();
+        stompConnection.sendFrame(connectFrame);
+        frame = stompConnection.receiveFrame();
+        LOG.debug("Broker sent: " + frame);
+        assertTrue(frame.startsWith("CONNECTED"));
+        assertEquals(view.getDurableTopicSubscribers().length, 0);
+        assertEquals(view.getInactiveDurableTopicSubscribers().length, 2);
+
+        frame = "SEND\n" + "destination:/topic/" + getQueueName() + "1\n" +
+                "receipt:10\n" + "\n" + "Hello World 1" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+        receipt = stompConnection.receive();
+        assertEquals("10", receipt.getHeaders().get(Stomp.Headers.Response.RECEIPT_ID));
+
+        frame = "SEND\n" + "destination:/topic/" + getQueueName() + "2\n" +
+                "receipt:11\n" + "\n" + "Hello World 2" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+        receipt = stompConnection.receive();
+        assertEquals("11", receipt.getHeaders().get(Stomp.Headers.Response.RECEIPT_ID));
+
+        // subscribe to first destination durably
+        frame = "SUBSCRIBE\n" +
+                "destination:/topic/" + getQueueName() + "1" + "\n" +
+                "ack:auto\n" + "receipt:3\n" + "id:durablesub-1\n" +
+                "activemq.subscriptionName:test1\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+
+        receipt = stompConnection.receive();
+        LOG.debug("Broker sent: " + receipt);
+        assertTrue(receipt.getAction().startsWith("RECEIPT"));
+        assertEquals("3", receipt.getHeaders().get("receipt-id"));
+        assertEquals(view.getDurableTopicSubscribers().length, 1);
+
+        StompFrame message = stompConnection.receive();
+        assertEquals(Stomp.Responses.MESSAGE, message.getAction());
+        assertEquals("durablesub-1", message.getHeaders().get(Stomp.Headers.Message.SUBSCRIPTION));
+
+        assertEquals(view.getDurableTopicSubscribers().length, 1);
+        assertEquals(view.getInactiveDurableTopicSubscribers().length, 1);
+
+        // subscribe to second destination durably
+        frame = "SUBSCRIBE\n" +
+                "destination:/topic/" + getQueueName() + "2" + "\n" +
+                "ack:auto\n" + "receipt:4\n" + "id:durablesub-2\n" +
+                "activemq.subscriptionName:test2\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+
+        receipt = stompConnection.receive();
+        LOG.debug("Broker sent: " + receipt);
+        assertTrue(receipt.getAction().startsWith("RECEIPT"));
+        assertEquals("4", receipt.getHeaders().get("receipt-id"));
+        assertEquals(view.getDurableTopicSubscribers().length, 2);
+
+        message = stompConnection.receive();
+        assertEquals(Stomp.Responses.MESSAGE, message.getAction());
+        assertEquals("durablesub-2", message.getHeaders().get(Stomp.Headers.Message.SUBSCRIPTION));
+
+        assertEquals(view.getDurableTopicSubscribers().length, 2);
+        assertEquals(view.getInactiveDurableTopicSubscribers().length, 0);
+
+        frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+    }
 }