You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2012/06/22 17:40:05 UTC

svn commit: r1352933 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java test/java/org/apache/activemq/transport/stomp/StompTest.java

Author: tabish
Date: Fri Jun 22 15:40:03 2012
New Revision: 1352933

URL: http://svn.apache.org/viewvc?rev=1352933&view=rev
Log:
fix for: https://issues.apache.org/jira/browse/AMQ-3895

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java?rev=1352933&r1=1352932&r2=1352933&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java Fri Jun 22 15:40:03 2012
@@ -44,6 +44,7 @@ import org.apache.activemq.command.Comma
 import org.apache.activemq.command.ConnectionError;
 import org.apache.activemq.command.ConnectionId;
 import org.apache.activemq.command.ConnectionInfo;
+import org.apache.activemq.command.ConsumerControl;
 import org.apache.activemq.command.ConsumerId;
 import org.apache.activemq.command.ConsumerInfo;
 import org.apache.activemq.command.DestinationInfo;
@@ -485,13 +486,13 @@ public class ProtocolConverter {
             throw new ProtocolException("SUBSCRIBE received without a subscription id!");
         }
 
-        ActiveMQDestination actualDest = translator.convertDestination(this, destination, true);
+        final ActiveMQDestination actualDest = translator.convertDestination(this, destination, true);
 
         if (actualDest == null) {
             throw new ProtocolException("Invalid Destination.");
         }
 
-        ConsumerId id = new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId());
+        final ConsumerId id = new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId());
         ConsumerInfo consumerInfo = new ConsumerInfo(id);
         consumerInfo.setPrefetchSize(1000);
         consumerInfo.setDispatchAsync(true);
@@ -540,9 +541,45 @@ public class ProtocolConverter {
             subscriptions.put(subscriptionId, stompSubscription);
         }
 
-        // dispatch can beat the receipt so send it early
-        sendReceipt(command);
-        sendToActiveMQ(consumerInfo, null);
+        final String receiptId = command.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED);
+        if (receiptId != null && consumerInfo.getPrefetchSize() > 0) {
+
+            final StompFrame cmd = command;
+            final int prefetch = consumerInfo.getPrefetchSize();
+
+            // Since dispatch could beat the receipt we set prefetch to zero to start and then
+            // once we've sent our Receipt we are safe to turn on dispatch if the response isn't
+            // an error message.
+            consumerInfo.setPrefetchSize(0);
+
+            final ResponseHandler handler = new ResponseHandler() {
+                public void onResponse(ProtocolConverter converter, Response response) throws IOException {
+                    if (response.isException()) {
+                        // Generally a command can fail.. but that does not invalidate the connection.
+                        // We report back the failure but we don't close the connection.
+                        Throwable exception = ((ExceptionResponse)response).getException();
+                        handleException(exception, cmd);
+                    } else {
+                        StompFrame sc = new StompFrame();
+                        sc.setAction(Stomp.Responses.RECEIPT);
+                        sc.setHeaders(new HashMap<String, String>(1));
+                        sc.getHeaders().put(Stomp.Headers.Response.RECEIPT_ID, receiptId);
+                        stompTransport.sendToStomp(sc);
+
+                        ConsumerControl control = new ConsumerControl();
+                        control.setPrefetch(prefetch);
+                        control.setDestination(actualDest);
+                        control.setConsumerId(id);
+
+                        sendToActiveMQ(control, null);
+                    }
+                }
+            };
+
+            sendToActiveMQ(consumerInfo, handler);
+        } else {
+            sendToActiveMQ(consumerInfo, createResponseHandler(command));
+        }
     }
 
     protected void onStompUnsubscribe(StompFrame command) throws ProtocolException {

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java?rev=1352933&r1=1352932&r2=1352933&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java Fri Jun 22 15:40:03 2012
@@ -194,7 +194,6 @@ public class StompTest extends Combinati
         String f = stompConnection.receiveFrame();
         assertTrue(f.startsWith("CONNECTED"));
         assertTrue(f.indexOf("response-id:1") >= 0);
-
     }
 
     public void testSendMessage() throws Exception {
@@ -383,7 +382,6 @@ public class StompTest extends Combinati
         stompConnection.sendFrame(frame);
     }
 
-
     public void testSubscriptionReceipts() throws Exception {
         final int done = 500;
         int count = 0;
@@ -392,7 +390,6 @@ public class StompTest extends Combinati
         URI connectUri = new URI(bindAddress);
 
         do {
-
             StompConnection sender = new StompConnection();
             sender.open(createSocket(connectUri));
             String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL;
@@ -434,7 +431,6 @@ public class StompTest extends Combinati
 
             receiver.disconnect();
         } while (count < done);
-
     }
 
     public void testSubscribeWithAutoAck() throws Exception {
@@ -570,7 +566,6 @@ public class StompTest extends Combinati
         frame = stompConnection.receiveFrame();
         assertTrue(frame.startsWith("MESSAGE"));
 
-
         frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
         stompConnection.sendFrame(frame);
     }
@@ -756,7 +751,6 @@ public class StompTest extends Combinati
         sendMessage(getName());
         StompFrame msg = stompConnection.receive();
 
-
         assertTrue(msg.getAction().equals("MESSAGE"));
 
         HashMap<String, String> ackHeaders = new HashMap<String, String>();
@@ -938,9 +932,43 @@ public class StompTest extends Combinati
         frame = "SUBSCRIBE\n" + "destination:/queue/USERS." + getQueueName() + "\n" + "ack:auto\n\n" + Stomp.NULL;
 
         stompConnection.sendFrame(frame);
-        String f = stompConnection.receiveFrame();
-        assertTrue(f.startsWith("ERROR"));
+        frame = stompConnection.receiveFrame();
+        assertTrue(frame.startsWith("ERROR"));
+    }
+
+    public void testSubscribeWithReceiptNotAuthorized() throws Exception {
+
+        String frame = "CONNECT\n" + "login:guest\n" + "passcode:password\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
 
+        frame = stompConnection.receiveFrame();
+        assertTrue(frame.startsWith("CONNECTED"));
+
+        frame = "SUBSCRIBE\n" + "destination:/queue/USERS." + getQueueName() + "\n" +
+                "ack:auto\n" + "receipt:1\n" + "\n" + Stomp.NULL;
+
+        stompConnection.sendFrame(frame);
+        frame = stompConnection.receiveFrame();
+        assertTrue(frame.startsWith("ERROR"));
+        assertTrue("Error Frame did not contain receipt-id", frame.indexOf(Stomp.Headers.Response.RECEIPT_ID) >= 0);
+    }
+
+    public void testSubscribeWithInvalidSelector() throws Exception {
+
+        String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+
+        frame = stompConnection.receiveFrame();
+        assertTrue(frame.startsWith("CONNECTED"));
+
+        frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + "selector:foo.bar = 1\n" + "ack:auto\n\n" + Stomp.NULL;
+
+        stompConnection.sendFrame(frame);
+        frame = stompConnection.receiveFrame();
+        assertTrue(frame.startsWith("ERROR"));
+
+        frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
     }
 
     public void testTransformationUnknownTranslator() throws Exception {
@@ -1991,7 +2019,6 @@ public class StompTest extends Combinati
         assertTrue("Receipt Frame: " + frame, frame.trim().startsWith("RECEIPT"));
         assertTrue("Receipt contains correct receipt-id " + frame, frame.indexOf(Stomp.Headers.Response.RECEIPT_ID) >= 0);
 
-
         // The subscription should receive a response with the ReplyTo property set.
         StompFrame received = responder.receive();
         assertNotNull(received);