You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2012/06/22 17:40:05 UTC
svn commit: r1352933 - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java
test/java/org/apache/activemq/transport/stomp/StompTest.java
Author: tabish
Date: Fri Jun 22 15:40:03 2012
New Revision: 1352933
URL: http://svn.apache.org/viewvc?rev=1352933&view=rev
Log:
fix for: https://issues.apache.org/jira/browse/AMQ-3895
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java?rev=1352933&r1=1352932&r2=1352933&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java Fri Jun 22 15:40:03 2012
@@ -44,6 +44,7 @@ import org.apache.activemq.command.Comma
import org.apache.activemq.command.ConnectionError;
import org.apache.activemq.command.ConnectionId;
import org.apache.activemq.command.ConnectionInfo;
+import org.apache.activemq.command.ConsumerControl;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.DestinationInfo;
@@ -485,13 +486,13 @@ public class ProtocolConverter {
throw new ProtocolException("SUBSCRIBE received without a subscription id!");
}
- ActiveMQDestination actualDest = translator.convertDestination(this, destination, true);
+ final ActiveMQDestination actualDest = translator.convertDestination(this, destination, true);
if (actualDest == null) {
throw new ProtocolException("Invalid Destination.");
}
- ConsumerId id = new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId());
+ final ConsumerId id = new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId());
ConsumerInfo consumerInfo = new ConsumerInfo(id);
consumerInfo.setPrefetchSize(1000);
consumerInfo.setDispatchAsync(true);
@@ -540,9 +541,45 @@ public class ProtocolConverter {
subscriptions.put(subscriptionId, stompSubscription);
}
- // dispatch can beat the receipt so send it early
- sendReceipt(command);
- sendToActiveMQ(consumerInfo, null);
+ final String receiptId = command.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED);
+ if (receiptId != null && consumerInfo.getPrefetchSize() > 0) {
+
+ final StompFrame cmd = command;
+ final int prefetch = consumerInfo.getPrefetchSize();
+
+ // Since dispatch could beat the receipt we set prefetch to zero to start and then
+ // once we've sent our Receipt we are safe to turn on dispatch if the response isn't
+ // an error message.
+ consumerInfo.setPrefetchSize(0);
+
+ final ResponseHandler handler = new ResponseHandler() {
+ public void onResponse(ProtocolConverter converter, Response response) throws IOException {
+ if (response.isException()) {
+ // Generally a command can fail.. but that does not invalidate the connection.
+ // We report back the failure but we don't close the connection.
+ Throwable exception = ((ExceptionResponse)response).getException();
+ handleException(exception, cmd);
+ } else {
+ StompFrame sc = new StompFrame();
+ sc.setAction(Stomp.Responses.RECEIPT);
+ sc.setHeaders(new HashMap<String, String>(1));
+ sc.getHeaders().put(Stomp.Headers.Response.RECEIPT_ID, receiptId);
+ stompTransport.sendToStomp(sc);
+
+ ConsumerControl control = new ConsumerControl();
+ control.setPrefetch(prefetch);
+ control.setDestination(actualDest);
+ control.setConsumerId(id);
+
+ sendToActiveMQ(control, null);
+ }
+ }
+ };
+
+ sendToActiveMQ(consumerInfo, handler);
+ } else {
+ sendToActiveMQ(consumerInfo, createResponseHandler(command));
+ }
}
protected void onStompUnsubscribe(StompFrame command) throws ProtocolException {
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java?rev=1352933&r1=1352932&r2=1352933&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java Fri Jun 22 15:40:03 2012
@@ -194,7 +194,6 @@ public class StompTest extends Combinati
String f = stompConnection.receiveFrame();
assertTrue(f.startsWith("CONNECTED"));
assertTrue(f.indexOf("response-id:1") >= 0);
-
}
public void testSendMessage() throws Exception {
@@ -383,7 +382,6 @@ public class StompTest extends Combinati
stompConnection.sendFrame(frame);
}
-
public void testSubscriptionReceipts() throws Exception {
final int done = 500;
int count = 0;
@@ -392,7 +390,6 @@ public class StompTest extends Combinati
URI connectUri = new URI(bindAddress);
do {
-
StompConnection sender = new StompConnection();
sender.open(createSocket(connectUri));
String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL;
@@ -434,7 +431,6 @@ public class StompTest extends Combinati
receiver.disconnect();
} while (count < done);
-
}
public void testSubscribeWithAutoAck() throws Exception {
@@ -570,7 +566,6 @@ public class StompTest extends Combinati
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("MESSAGE"));
-
frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
}
@@ -756,7 +751,6 @@ public class StompTest extends Combinati
sendMessage(getName());
StompFrame msg = stompConnection.receive();
-
assertTrue(msg.getAction().equals("MESSAGE"));
HashMap<String, String> ackHeaders = new HashMap<String, String>();
@@ -938,9 +932,43 @@ public class StompTest extends Combinati
frame = "SUBSCRIBE\n" + "destination:/queue/USERS." + getQueueName() + "\n" + "ack:auto\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
- String f = stompConnection.receiveFrame();
- assertTrue(f.startsWith("ERROR"));
+ frame = stompConnection.receiveFrame();
+ assertTrue(frame.startsWith("ERROR"));
+ }
+
+ public void testSubscribeWithReceiptNotAuthorized() throws Exception {
+
+ String frame = "CONNECT\n" + "login:guest\n" + "passcode:password\n\n" + Stomp.NULL;
+ stompConnection.sendFrame(frame);
+ frame = stompConnection.receiveFrame();
+ assertTrue(frame.startsWith("CONNECTED"));
+
+ frame = "SUBSCRIBE\n" + "destination:/queue/USERS." + getQueueName() + "\n" +
+ "ack:auto\n" + "receipt:1\n" + "\n" + Stomp.NULL;
+
+ stompConnection.sendFrame(frame);
+ frame = stompConnection.receiveFrame();
+ assertTrue(frame.startsWith("ERROR"));
+ assertTrue("Error Frame did not contain receipt-id", frame.indexOf(Stomp.Headers.Response.RECEIPT_ID) >= 0);
+ }
+
+ public void testSubscribeWithInvalidSelector() throws Exception {
+
+ String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL;
+ stompConnection.sendFrame(frame);
+
+ frame = stompConnection.receiveFrame();
+ assertTrue(frame.startsWith("CONNECTED"));
+
+ frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + "selector:foo.bar = 1\n" + "ack:auto\n\n" + Stomp.NULL;
+
+ stompConnection.sendFrame(frame);
+ frame = stompConnection.receiveFrame();
+ assertTrue(frame.startsWith("ERROR"));
+
+ frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
+ stompConnection.sendFrame(frame);
}
public void testTransformationUnknownTranslator() throws Exception {
@@ -1991,7 +2019,6 @@ public class StompTest extends Combinati
assertTrue("Receipt Frame: " + frame, frame.trim().startsWith("RECEIPT"));
assertTrue("Receipt contains correct receipt-id " + frame, frame.indexOf(Stomp.Headers.Response.RECEIPT_ID) >= 0);
-
// The subscription should receive a response with the ReplyTo property set.
StompFrame received = responder.receive();
assertNotNull(received);