You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2011/09/01 10:28:48 UTC

svn commit: r1163940 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java test/java/org/apache/activemq/transport/stomp/StompTest.java

Author: dejanb
Date: Thu Sep  1 08:28:47 2011
New Revision: 1163940

URL: http://svn.apache.org/viewvc?rev=1163940&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3481 - stomp deadlock

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java?rev=1163940&r1=1163939&r2=1163940&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java Thu Sep  1 08:28:47 2011
@@ -538,7 +538,9 @@ public class ProtocolConverter {
         if (subscriptionId != null) {
             subscriptions.put(subscriptionId, stompSubscription);
         }
-        sendToActiveMQ(consumerInfo, createResponseHandler(command));
+
+        sendToActiveMQ(consumerInfo, null);
+        sendReceipt(command);
     }
 
     protected void onStompUnsubscribe(StompFrame command) throws ProtocolException {
@@ -840,4 +842,19 @@ public class ProtocolConverter {
             }
         }
     }
+
+    protected void sendReceipt(StompFrame command) { // Replies to the STOMP client with a RECEIPT frame if the given frame asked for one.
+        final String receiptId = command.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED);
+        if (receiptId != null) { // frames without the receipt header get no reply from this method
+            StompFrame sc = new StompFrame();
+            sc.setAction(Stomp.Responses.RECEIPT);
+            sc.setHeaders(new HashMap<String, String>(1)); // capacity hint of 1: only the receipt-id header is added
+            sc.getHeaders().put(Stomp.Headers.Response.RECEIPT_ID, receiptId); // echo the id the client supplied
+            try {
+                sendToStomp(sc);
+            } catch (IOException e) { // best effort: a failed send is logged below and not rethrown
+                LOG.warn("Could not send a receipt for " + command, e);
+            }
+        }
+    }
 }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java?rev=1163940&r1=1163939&r2=1163940&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java Thu Sep  1 08:28:47 2011
@@ -1559,6 +1559,44 @@ public class StompTest extends Combinati
         assertNotNull(stompMessage);
         assertNull(stompMessage.getHeaders().get(Stomp.Headers.Message.PERSISTENT));
     }
+    
+    public void testReceiptNewQueue() throws Exception {
+        // Added with the AMQ-3481 fix: every SUBSCRIBE that carries a receipt header must be answered with a RECEIPT frame.
+        String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+        // The reply to CONNECT is expected to be a CONNECTED frame.
+        frame = stompConnection.receiveFrame();
+        assertTrue(frame.startsWith("CONNECTED"));
+        // First subscription: queue "<name>1234", which this test never sends to, with a receipt requested.
+        frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + 1234 + "\n" + "id:8fee4b8-4e5c9f66-4703-e936-3" + "\n" + "receipt:8fee4b8-4e5c9f66-4703-e936-2" + "\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+        // The RECEIPT must echo the receipt id given in the SUBSCRIBE frame.
+        StompFrame receipt = stompConnection.receive();
+        assertTrue(receipt.getAction().startsWith("RECEIPT"));
+        assertEquals("8fee4b8-4e5c9f66-4703-e936-2", receipt.getHeaders().get("receipt-id"));
+
+        // Send an empty message (content-length 0) to a second queue, "<name>123", before subscribing to it.
+        frame = "SEND\n destination:/queue/" + getQueueName() + 123 + "\ncontent-length:0" + " \n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+
+        // Second subscription: the queue that already holds the message, again with a receipt requested.
+        frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + 123 + "\n" + "id:8fee4b8-4e5c9f66-4703-e936-2" + "\n" + "receipt:8fee4b8-4e5c9f66-4703-e936-1" + "\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+        // The test expects the RECEIPT for this subscription to arrive before the pending MESSAGE.
+        receipt = stompConnection.receive();
+        assertTrue(receipt.getAction().startsWith("RECEIPT"));
+        assertEquals("8fee4b8-4e5c9f66-4703-e936-1", receipt.getHeaders().get("receipt-id"));
+        // The next frame should be the message sent above.
+        StompFrame message = stompConnection.receive();
+        assertTrue(message.getAction().startsWith("MESSAGE"));
+        // It must advertise a zero content-length and carry an empty body.
+        String length = message.getHeaders().get("content-length");
+        assertEquals("0", length);
+        assertEquals(0, message.getContent().length);
+
+        frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+    }
 
     protected void assertClients(int expected) throws Exception {
         org.apache.activemq.broker.Connection[] clients = broker.getBroker().getClients();