You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by al...@apache.org on 2011/02/20 17:09:28 UTC
svn commit: r1072620 - in /activemq/trunk:
activemq-web-demo/src/test/java/org/apache/activemq/web/
activemq-web/src/main/java/org/apache/activemq/web/
Author: alexd
Date: Sun Feb 20 16:09:28 2011
New Revision: 1072620
URL: http://svn.apache.org/viewvc?rev=1072620&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3123 - messages for web clients need to be associated with the correct consumer.
Added:
activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/UndeliveredAjaxMessage.java
Modified:
activemq/trunk/activemq-web-demo/src/test/java/org/apache/activemq/web/AjaxTest.java
activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/AjaxListener.java
activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/MessageListenerServlet.java
Modified: activemq/trunk/activemq-web-demo/src/test/java/org/apache/activemq/web/AjaxTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-web-demo/src/test/java/org/apache/activemq/web/AjaxTest.java?rev=1072620&r1=1072619&r2=1072620&view=diff
==============================================================================
--- activemq/trunk/activemq-web-demo/src/test/java/org/apache/activemq/web/AjaxTest.java (original)
+++ activemq/trunk/activemq-web-demo/src/test/java/org/apache/activemq/web/AjaxTest.java Sun Feb 20 16:09:28 2011
@@ -463,4 +463,75 @@ public class AjaxTest extends JettyTestS
assertEquals( "Poll response is not correct.", expected, poll.getResponseContent() );
}
+ public void testAjaxClientReceivesMessagesForMultipleTopics() throws Exception {
+ LOG.debug( "*** testAjaxClientReceivesMessagesForMultipleTopics ***" );
+ HttpClient httpClient = new HttpClient();
+ httpClient.setConnectorType(HttpClient.CONNECTOR_SELECT_CHANNEL); // configure before start(); the connector is created when the client starts
+ httpClient.start();
+ // Listen on topicA; the session cookie from this first request is reused for the second listen and both polls.
+ LOG.debug( "SENDING LISTEN FOR /topic/topicA" );
+ AjaxTestContentExchange contentExchange = new AjaxTestContentExchange();
+ contentExchange.setMethod( "POST" );
+ contentExchange.setURL("http://localhost:8080/amq");
+ contentExchange.setRequestContent( new ByteArrayBuffer("destination=topic://topicA&type=listen&message=handlerA") );
+ contentExchange.setRequestContentType( "application/x-www-form-urlencoded; charset=UTF-8" );
+ httpClient.send(contentExchange);
+ contentExchange.waitForDone();
+ String jsessionid = contentExchange.getJsessionId();
+ // Listen on topicB from the same session, using a different handler name (handlerB) in the message parameter.
+ LOG.debug( "SENDING LISTEN FOR /topic/topicB" );
+ contentExchange = new AjaxTestContentExchange();
+ contentExchange.setMethod( "POST" );
+ contentExchange.setURL("http://localhost:8080/amq");
+ contentExchange.setRequestContent( new ByteArrayBuffer("destination=topic://topicB&type=listen&message=handlerB") );
+ contentExchange.setRequestContentType( "application/x-www-form-urlencoded; charset=UTF-8" );
+ contentExchange.setRequestHeader( "Cookie", jsessionid );
+ httpClient.send(contentExchange);
+ contentExchange.waitForDone();
+
+ // client 1 polls for messages
+ LOG.debug( "SENDING POLL" );
+ AjaxTestContentExchange poll = new AjaxTestContentExchange();
+ poll.setMethod( "GET" );
+ poll.setURL("http://localhost:8080/amq?timeout=5000");
+ poll.setRequestHeader( "Cookie", jsessionid );
+ httpClient.send( poll );
+
+ // while client 1 is polling, client 2 (this request carries no session cookie) sends messages to both topics
+ LOG.debug( "SENDING MESSAGES" );
+ contentExchange = new AjaxTestContentExchange();
+ contentExchange.setMethod( "POST" );
+ contentExchange.setURL("http://localhost:8080/amq");
+ contentExchange.setRequestContent( new ByteArrayBuffer(
+ "destination=topic://topicA&type=send&message=A1&"+
+ "d1=topic://topicB&t1=send&m1=B1&"+
+ "d2=topic://topicA&t2=send&m2=A2&"+
+ "d3=topic://topicB&t3=send&m3=B2"
+ ) );
+ contentExchange.setRequestContentType( "application/x-www-form-urlencoded; charset=UTF-8" );
+ httpClient.send(contentExchange);
+ contentExchange.waitForDone();
+ LOG.debug( "DONE POSTING MESSAGES" );
+
+ // wait for poll to finish
+ poll.waitForDone();
+ String response = poll.getResponseContent();
+
+ // The first poll may not return every message, so poll once more and check the combined output.
+ poll = new AjaxTestContentExchange();
+ poll.setMethod( "GET" );
+ poll.setURL("http://localhost:8080/amq?timeout=5000");
+ poll.setRequestHeader( "Cookie", jsessionid );
+ httpClient.send( poll );
+ poll.waitForDone();
+ // Each of the four messages must come back tagged with the handler id and destination of its own topic.
+ String fullResponse = response + poll.getResponseContent();
+ LOG.debug( "full response " + fullResponse );
+ assertContains( "<response id='handlerA' destination='topic://topicA' >A1</response>\n", fullResponse );
+ assertContains( "<response id='handlerB' destination='topic://topicB' >B1</response>\n", fullResponse );
+ assertContains( "<response id='handlerA' destination='topic://topicA' >A2</response>\n", fullResponse );
+ assertContains( "<response id='handlerB' destination='topic://topicB' >B2</response>\n", fullResponse );
+ assertResponseCount( 4, fullResponse );
+ }
+
}
Modified: activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/AjaxListener.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/AjaxListener.java?rev=1072620&r1=1072619&r2=1072620&view=diff
==============================================================================
--- activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/AjaxListener.java (original)
+++ activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/AjaxListener.java Sun Feb 20 16:09:28 2011
@@ -39,7 +39,7 @@ public class AjaxListener implements Mes
private AjaxWebClient client;
private long lastAccess;
private Continuation continuation;
- private LinkedList<Message> unconsumedMessages = new LinkedList<Message>();
+ private LinkedList<UndeliveredAjaxMessage> undeliveredMessages = new LinkedList<UndeliveredAjaxMessage>();
AjaxListener(AjaxWebClient client, long maximumReadTimeout) {
this.client = client;
@@ -55,10 +55,10 @@ public class AjaxListener implements Mes
this.continuation = continuation;
}
- public LinkedList<Message> getUnconsumedMessages() {
- return unconsumedMessages;
+ public LinkedList<UndeliveredAjaxMessage> getUndeliveredMessages() {
+ return undeliveredMessages;
}
-
+
public synchronized void onMessageAvailable(MessageConsumer consumer) {
if (LOG.isDebugEnabled()) {
LOG.debug("message for " + consumer + " continuation=" + continuation);
@@ -70,12 +70,11 @@ public class AjaxListener implements Mes
if( message != null ) {
if( continuation.isSuspended() ) {
LOG.debug( "Resuming suspended continuation " + continuation );
- continuation.setAttribute("message", message);
- continuation.setAttribute("consumer", consumer);
+ continuation.setAttribute("undelivered_message", new UndeliveredAjaxMessage( message, consumer ) );
continuation.resume();
} else {
LOG.debug( "Message available, but continuation is already resumed. Buffer for next time." );
- bufferMessageForDelivery( message );
+ bufferMessageForDelivery( message, consumer );
}
}
} catch (Exception e) {
@@ -91,17 +90,17 @@ public class AjaxListener implements Mes
} else {
try {
Message message = consumer.receive(10);
- bufferMessageForDelivery( message );
+ bufferMessageForDelivery( message, consumer );
} catch (Exception e) {
LOG.error("Error receiving message " + e, e);
}
}
}
- public void bufferMessageForDelivery( Message message ) {
+ public void bufferMessageForDelivery( Message message, MessageConsumer consumer ) {
if( message != null ) {
- synchronized( unconsumedMessages ) {
- unconsumedMessages.addLast(message);
+ synchronized( undeliveredMessages ) {
+ undeliveredMessages.addLast( new UndeliveredAjaxMessage( message, consumer ) );
}
}
}
Modified: activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/MessageListenerServlet.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/MessageListenerServlet.java?rev=1072620&r1=1072619&r2=1072620&view=diff
==============================================================================
--- activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/MessageListenerServlet.java (original)
+++ activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/MessageListenerServlet.java Sun Feb 20 16:09:28 2011
@@ -262,16 +262,23 @@ public class MessageListenerServlet exte
if (LOG.isDebugEnabled()) {
LOG.debug("doMessage timeout=" + timeout);
}
-
- Message message = null;
+
// this is non-null if we're resuming the continuation.
// attributes set in AjaxListener
- message = (Message)request.getAttribute("message");
+ UndeliveredAjaxMessage undelivered_message = null;
+ Message message = null;
+ undelivered_message = (UndeliveredAjaxMessage)request.getAttribute("undelivered_message");
+ if( undelivered_message != null ) {
+ message = (Message)undelivered_message.getMessage();
+ }
synchronized (client) {
List consumers = client.getConsumers();
- MessageAvailableConsumer consumer = (MessageAvailableConsumer)request.getAttribute("consumer");
+ MessageAvailableConsumer consumer = null;
+ if( undelivered_message != null ) {
+ consumer = (MessageAvailableConsumer)undelivered_message.getConsumer();
+ }
if (message == null) {
// Look for a message that is ready to go
@@ -293,7 +300,7 @@ public class MessageListenerServlet exte
response.setContentType("text/xml");
response.setHeader("Cache-Control", "no-cache");
- if (message == null && client.getListener().getUnconsumedMessages().size() == 0) {
+ if (message == null && client.getListener().getUndeliveredMessages().size() == 0) {
Continuation continuation = ContinuationSupport.getContinuation(request);
if (continuation.isExpired()) {
@@ -331,16 +338,38 @@ public class MessageListenerServlet exte
Map<MessageAvailableConsumer, String> consumerDestinationNameMap = client.getDestinationNameMap();
response.setStatus(HttpServletResponse.SC_OK);
writer.println("<ajax-response>");
-
+
// Send any message we already have
if (message != null) {
String id = consumerIdMap.get(consumer);
String destinationName = consumerDestinationNameMap.get(consumer);
+ LOG.debug( "sending pre-existing message" );
writeMessageResponse(writer, message, id, destinationName);
messages++;
}
-
+
+ // send messages buffered while continuation was unavailable.
+ LinkedList<UndeliveredAjaxMessage> undeliveredMessages = ((AjaxListener)consumer.getAvailableListener()).getUndeliveredMessages();
+ LOG.debug("Send " + undeliveredMessages.size() + " unconsumed messages");
+ synchronized( undeliveredMessages ) {
+ for (Iterator<UndeliveredAjaxMessage> it = undeliveredMessages.iterator(); it.hasNext(); ) {
+ messages++;
+ UndeliveredAjaxMessage undelivered = it.next();
+ Message msg = (Message)undelivered.getMessage();
+ consumer = (MessageAvailableConsumer)undelivered.getConsumer();
+ String id = consumerIdMap.get(consumer);
+ String destinationName = consumerDestinationNameMap.get(consumer);
+ LOG.debug( "sending undelivered/buffered messages" );
+ LOG.debug( "msg:" +msg+ ", id:" +id+ ", destinationName:" +destinationName);
+ writeMessageResponse(writer, msg, id, destinationName);
+ it.remove();
+ if (messages >= maximumMessages) {
+ break;
+ }
+ }
+ }
+
// Send the rest of the messages
for (int i = 0; i < consumers.size() && messages < maximumMessages; i++) {
consumer = (MessageAvailableConsumer)consumers.get(i);
@@ -348,22 +377,6 @@ public class MessageListenerServlet exte
continue;
}
- LinkedList<Message> unconsumedMessages = ((AjaxListener)consumer.getAvailableListener()).getUnconsumedMessages();
- LOG.debug("Send " + unconsumedMessages.size() + " unconsumed messages");
- synchronized( unconsumedMessages ) {
- for (Iterator<Message> it = unconsumedMessages.iterator(); it.hasNext(); ) {
- messages++;
- Message msg = it.next();
- String id = consumerIdMap.get(consumer);
- String destinationName = consumerDestinationNameMap.get(consumer);
- writeMessageResponse(writer, msg, id, destinationName);
- it.remove();
- if (messages >= maximumMessages) {
- break;
- }
- }
- }
-
// Look for any available messages
while (messages < maximumMessages) {
message = consumer.receiveNoWait();
@@ -373,6 +386,7 @@ public class MessageListenerServlet exte
messages++;
String id = consumerIdMap.get(consumer);
String destinationName = consumerDestinationNameMap.get(consumer);
+ LOG.debug( "sending final available messages" );
writeMessageResponse(writer, message, id, destinationName);
}
}
Added: activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/UndeliveredAjaxMessage.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/UndeliveredAjaxMessage.java?rev=1072620&view=auto
==============================================================================
--- activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/UndeliveredAjaxMessage.java (added)
+++ activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/UndeliveredAjaxMessage.java Sun Feb 20 16:09:28 2011
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.web;
+
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+/** Immutable holder pairing a JMS message with the consumer it is associated with. */
+class UndeliveredAjaxMessage {
+ private final Message message;
+ private final MessageConsumer consumer;
+ /** Stores both references exactly as given; no null check or copy is made. */
+ UndeliveredAjaxMessage( Message message, MessageConsumer consumer ) {
+ this.message = message;
+ this.consumer = consumer;
+ }
+ /** @return the consumer passed to the constructor */
+ public MessageConsumer getConsumer() {
+ return this.consumer;
+ }
+ /** @return the message passed to the constructor */
+ public Message getMessage() {
+ return this.message;
+ }
+}