You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by al...@apache.org on 2011/02/20 17:09:28 UTC

svn commit: r1072620 - in /activemq/trunk: activemq-web-demo/src/test/java/org/apache/activemq/web/ activemq-web/src/main/java/org/apache/activemq/web/

Author: alexd
Date: Sun Feb 20 16:09:28 2011
New Revision: 1072620

URL: http://svn.apache.org/viewvc?rev=1072620&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3123 - messages for web clients need to be associated with the correct consumer.  

Added:
    activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/UndeliveredAjaxMessage.java
Modified:
    activemq/trunk/activemq-web-demo/src/test/java/org/apache/activemq/web/AjaxTest.java
    activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/AjaxListener.java
    activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/MessageListenerServlet.java

Modified: activemq/trunk/activemq-web-demo/src/test/java/org/apache/activemq/web/AjaxTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-web-demo/src/test/java/org/apache/activemq/web/AjaxTest.java?rev=1072620&r1=1072619&r2=1072620&view=diff
==============================================================================
--- activemq/trunk/activemq-web-demo/src/test/java/org/apache/activemq/web/AjaxTest.java (original)
+++ activemq/trunk/activemq-web-demo/src/test/java/org/apache/activemq/web/AjaxTest.java Sun Feb 20 16:09:28 2011
@@ -463,4 +463,75 @@ public class AjaxTest extends JettyTestS
         assertEquals( "Poll response is not correct.", expected, poll.getResponseContent() );
     }
     
+    public void testAjaxClientReceivesMessagesForMultipleTopics() throws Exception {
+        LOG.debug( "*** testAjaxClientReceivesMessagesForMultipleTopics ***" );
+        HttpClient httpClient = new HttpClient();
+        httpClient.start();
+        httpClient.setConnectorType(HttpClient.CONNECTOR_SELECT_CHANNEL); // NOTE(review): set after start() - confirm it still takes effect
+        // Scenario: one session listens on topicA (handlerA) and topicB (handlerB); each message must come back under its own handler.
+        LOG.debug( "SENDING LISTEN FOR /topic/topicA" );
+        AjaxTestContentExchange contentExchange = new AjaxTestContentExchange();
+        contentExchange.setMethod( "POST" );
+        contentExchange.setURL("http://localhost:8080/amq");
+        contentExchange.setRequestContent( new ByteArrayBuffer("destination=topic://topicA&type=listen&message=handlerA") );
+        contentExchange.setRequestContentType( "application/x-www-form-urlencoded; charset=UTF-8" );
+        httpClient.send(contentExchange);
+        contentExchange.waitForDone();
+        String jsessionid = contentExchange.getJsessionId();
+        
+        LOG.debug( "SENDING LISTEN FOR /topic/topicB" );
+        contentExchange = new AjaxTestContentExchange();
+        contentExchange.setMethod( "POST" );
+        contentExchange.setURL("http://localhost:8080/amq");
+        contentExchange.setRequestContent( new ByteArrayBuffer("destination=topic://topicB&type=listen&message=handlerB") );
+        contentExchange.setRequestContentType( "application/x-www-form-urlencoded; charset=UTF-8" );
+        contentExchange.setRequestHeader( "Cookie", jsessionid );
+        httpClient.send(contentExchange);
+        contentExchange.waitForDone();
+        
+        // client 1 (same session cookie as both listen requests) starts a poll; it is sent here but only awaited further down
+        LOG.debug( "SENDING POLL" );
+        AjaxTestContentExchange poll = new AjaxTestContentExchange();
+        poll.setMethod( "GET" );
+        poll.setURL("http://localhost:8080/amq?timeout=5000");
+        poll.setRequestHeader( "Cookie", jsessionid );
+        httpClient.send( poll );
+        
+        // while client 1 is polling, client 2 (no session cookie set) posts four messages in one request, alternating topicA/topicB
+        LOG.debug( "SENDING MESSAGES" );
+        contentExchange = new AjaxTestContentExchange();
+        contentExchange.setMethod( "POST" );
+        contentExchange.setURL("http://localhost:8080/amq");
+        contentExchange.setRequestContent( new ByteArrayBuffer(
+            "destination=topic://topicA&type=send&message=A1&"+
+            "d1=topic://topicB&t1=send&m1=B1&"+
+            "d2=topic://topicA&t2=send&m2=A2&"+
+            "d3=topic://topicB&t3=send&m3=B2"
+        ) );
+        contentExchange.setRequestContentType( "application/x-www-form-urlencoded; charset=UTF-8" );
+        httpClient.send(contentExchange);
+        contentExchange.waitForDone();
+        LOG.debug( "DONE POSTING MESSAGES" );
+        
+        // block until the first poll completes, then keep its body
+        poll.waitForDone();
+        String response = poll.getResponseContent();
+        
+        // The first poll may finish before all four messages have been delivered, so poll once more with the same session.
+        poll = new AjaxTestContentExchange();
+        poll.setMethod( "GET" );
+        poll.setURL("http://localhost:8080/amq?timeout=5000");
+        poll.setRequestHeader( "Cookie", jsessionid );
+        httpClient.send( poll );
+        poll.waitForDone();
+        // Check the two poll bodies together: each message tagged with its own handler id and destination, exactly four responses.
+        String fullResponse = response + poll.getResponseContent();
+        LOG.debug( "full response " + fullResponse );
+        assertContains( "<response id='handlerA' destination='topic://topicA' >A1</response>\n", fullResponse );
+        assertContains( "<response id='handlerB' destination='topic://topicB' >B1</response>\n", fullResponse );
+        assertContains( "<response id='handlerA' destination='topic://topicA' >A2</response>\n", fullResponse );
+        assertContains( "<response id='handlerB' destination='topic://topicB' >B2</response>\n", fullResponse );
+        assertResponseCount( 4, fullResponse );
+     }
+    
 }

Modified: activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/AjaxListener.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/AjaxListener.java?rev=1072620&r1=1072619&r2=1072620&view=diff
==============================================================================
--- activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/AjaxListener.java (original)
+++ activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/AjaxListener.java Sun Feb 20 16:09:28 2011
@@ -39,7 +39,7 @@ public class AjaxListener implements Mes
     private AjaxWebClient client;
     private long lastAccess;
     private Continuation continuation;
-    private LinkedList<Message> unconsumedMessages = new LinkedList<Message>();
+    private LinkedList<UndeliveredAjaxMessage> undeliveredMessages = new LinkedList<UndeliveredAjaxMessage>();
 
     AjaxListener(AjaxWebClient client, long maximumReadTimeout) {
         this.client = client;
@@ -55,10 +55,10 @@ public class AjaxListener implements Mes
         this.continuation = continuation;
     }
 
-    public LinkedList<Message> getUnconsumedMessages() {
-        return unconsumedMessages;
+    public LinkedList<UndeliveredAjaxMessage> getUndeliveredMessages() {
+        return undeliveredMessages;
     }
-
+    
     public synchronized void onMessageAvailable(MessageConsumer consumer) {
         if (LOG.isDebugEnabled()) {
             LOG.debug("message for " + consumer + " continuation=" + continuation);
@@ -70,12 +70,11 @@ public class AjaxListener implements Mes
                 if( message != null ) {
                     if( continuation.isSuspended() ) {
                         LOG.debug( "Resuming suspended continuation " + continuation );
-                        continuation.setAttribute("message", message);
-                        continuation.setAttribute("consumer", consumer);
+                        continuation.setAttribute("undelivered_message", new UndeliveredAjaxMessage( message, consumer ) );
                         continuation.resume();
                     } else {
                         LOG.debug( "Message available, but continuation is already resumed.  Buffer for next time." );
-                        bufferMessageForDelivery( message );
+                        bufferMessageForDelivery( message, consumer );
                     }
                 }
             } catch (Exception e) {
@@ -91,17 +90,17 @@ public class AjaxListener implements Mes
         } else {
             try {
                 Message message = consumer.receive(10);
-                bufferMessageForDelivery( message );
+                bufferMessageForDelivery( message, consumer );
             } catch (Exception e) {
                 LOG.error("Error receiving message " + e, e);
             }
         }
     }
     
-    public void bufferMessageForDelivery( Message message ) {
+    public void bufferMessageForDelivery( Message message, MessageConsumer consumer ) {
         if( message != null ) {
-            synchronized( unconsumedMessages ) {
-                unconsumedMessages.addLast(message);
+            synchronized( undeliveredMessages ) {
+                undeliveredMessages.addLast( new UndeliveredAjaxMessage( message, consumer ) );
             }
         }
     }

Modified: activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/MessageListenerServlet.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/MessageListenerServlet.java?rev=1072620&r1=1072619&r2=1072620&view=diff
==============================================================================
--- activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/MessageListenerServlet.java (original)
+++ activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/MessageListenerServlet.java Sun Feb 20 16:09:28 2011
@@ -262,16 +262,23 @@ public class MessageListenerServlet exte
         if (LOG.isDebugEnabled()) {
             LOG.debug("doMessage timeout=" + timeout);
         }
-
-        Message message = null;
+        
         // this is non-null if we're resuming the continuation.
         // attributes set in AjaxListener
-        message = (Message)request.getAttribute("message"); 
+        UndeliveredAjaxMessage undelivered_message = null;
+        Message message = null;
+        undelivered_message = (UndeliveredAjaxMessage)request.getAttribute("undelivered_message");
+        if( undelivered_message != null ) {
+            message = (Message)undelivered_message.getMessage();
+        }
         
         synchronized (client) {
 
             List consumers = client.getConsumers();
-            MessageAvailableConsumer consumer = (MessageAvailableConsumer)request.getAttribute("consumer");
+            MessageAvailableConsumer consumer = null;
+            if( undelivered_message != null ) {
+                consumer = (MessageAvailableConsumer)undelivered_message.getConsumer();
+            }
 
             if (message == null) {
                 // Look for a message that is ready to go
@@ -293,7 +300,7 @@ public class MessageListenerServlet exte
             response.setContentType("text/xml");
             response.setHeader("Cache-Control", "no-cache");
             
-            if (message == null && client.getListener().getUnconsumedMessages().size() == 0) {
+            if (message == null && client.getListener().getUndeliveredMessages().size() == 0) {
                 Continuation continuation = ContinuationSupport.getContinuation(request);
                 
                 if (continuation.isExpired()) {
@@ -331,16 +338,38 @@ public class MessageListenerServlet exte
             Map<MessageAvailableConsumer, String> consumerDestinationNameMap = client.getDestinationNameMap();
             response.setStatus(HttpServletResponse.SC_OK);
             writer.println("<ajax-response>");
-
+            
             // Send any message we already have
             if (message != null) {
                 String id = consumerIdMap.get(consumer);
                 String destinationName = consumerDestinationNameMap.get(consumer);
+                LOG.debug( "sending pre-existing message" );
                 writeMessageResponse(writer, message, id, destinationName);
                 
                 messages++;
             }
-
+            
+            // send messages buffered while continuation was unavailable.
+            LinkedList<UndeliveredAjaxMessage> undeliveredMessages = ((AjaxListener)consumer.getAvailableListener()).getUndeliveredMessages();
+            LOG.debug("Send " + undeliveredMessages.size() + " unconsumed messages");
+            synchronized( undeliveredMessages ) {
+                for (Iterator<UndeliveredAjaxMessage> it = undeliveredMessages.iterator(); it.hasNext(); ) {
+                    messages++;
+                    UndeliveredAjaxMessage undelivered = it.next();
+                    Message msg = (Message)undelivered.getMessage();
+                    consumer = (MessageAvailableConsumer)undelivered.getConsumer();
+                    String id = consumerIdMap.get(consumer);
+                    String destinationName = consumerDestinationNameMap.get(consumer);
+                    LOG.debug( "sending undelivered/buffered messages" );
+                    LOG.debug( "msg:" +msg+ ", id:" +id+ ", destinationName:" +destinationName);
+                    writeMessageResponse(writer, msg, id, destinationName);
+                    it.remove();
+                    if (messages >= maximumMessages) {
+                        break;
+                    }
+                }
+            }
+            
             // Send the rest of the messages
             for (int i = 0; i < consumers.size() && messages < maximumMessages; i++) {
                 consumer = (MessageAvailableConsumer)consumers.get(i);
@@ -348,22 +377,6 @@ public class MessageListenerServlet exte
                     continue;
                 }
 
-                LinkedList<Message> unconsumedMessages = ((AjaxListener)consumer.getAvailableListener()).getUnconsumedMessages();
-                LOG.debug("Send " + unconsumedMessages.size() + " unconsumed messages");
-                synchronized( unconsumedMessages ) {
-                    for (Iterator<Message> it = unconsumedMessages.iterator(); it.hasNext(); ) {
-                        messages++;
-                        Message msg = it.next();
-                        String id = consumerIdMap.get(consumer);
-                        String destinationName = consumerDestinationNameMap.get(consumer);
-                        writeMessageResponse(writer, msg, id, destinationName);
-                        it.remove();
-                        if (messages >= maximumMessages) {
-                            break;
-                        }
-                    }
-                }
-
                 // Look for any available messages
                 while (messages < maximumMessages) {
                     message = consumer.receiveNoWait();
@@ -373,6 +386,7 @@ public class MessageListenerServlet exte
                     messages++;
                     String id = consumerIdMap.get(consumer);
                     String destinationName = consumerDestinationNameMap.get(consumer);
+                    LOG.debug( "sending final available messages" );
                     writeMessageResponse(writer, message, id, destinationName);
                 }
             }

Added: activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/UndeliveredAjaxMessage.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/UndeliveredAjaxMessage.java?rev=1072620&view=auto
==============================================================================
--- activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/UndeliveredAjaxMessage.java (added)
+++ activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/UndeliveredAjaxMessage.java Sun Feb 20 16:09:28 2011
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.web;
+
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+/** Pairs a received JMS message with the consumer it was received from, so both travel together until delivery. */
+class UndeliveredAjaxMessage {
+    private final Message message;
+    private final MessageConsumer consumer;
+    /** Package-private constructor; stores both references as given (no copy, no null check). */
+    UndeliveredAjaxMessage( Message message, MessageConsumer consumer ) {
+        this.message = message;
+        this.consumer = consumer;
+    }
+    /** Returns the consumer supplied at construction. */
+    public MessageConsumer getConsumer() {
+        return this.consumer;
+    }
+    /** Returns the message supplied at construction. */
+    public Message getMessage() {
+        return this.message;
+    }
+}