You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2010/05/11 17:37:44 UTC

svn commit: r943146 - in /activemq/trunk: activemq-optional/src/test/java/org/apache/activemq/transport/http/ activemq-web-demo/src/test/java/org/apache/activemq/web/ activemq-web/src/main/java/org/apache/activemq/web/

Author: dejanb
Date: Tue May 11 15:37:44 2010
New Revision: 943146

URL: http://svn.apache.org/viewvc?rev=943146&view=rev
Log:
https://issues.apache.org/activemq/browse/AMQ-2728 - MessageServlet refactoring

Modified:
    activemq/trunk/activemq-optional/src/test/java/org/apache/activemq/transport/http/WaitForJettyListener.java
    activemq/trunk/activemq-web-demo/src/test/java/org/apache/activemq/web/JettyTestSupport.java
    activemq/trunk/activemq-web-demo/src/test/java/org/apache/activemq/web/RestTest.java
    activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/MessageServlet.java

Modified: activemq/trunk/activemq-optional/src/test/java/org/apache/activemq/transport/http/WaitForJettyListener.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-optional/src/test/java/org/apache/activemq/transport/http/WaitForJettyListener.java?rev=943146&r1=943145&r2=943146&view=diff
==============================================================================
--- activemq/trunk/activemq-optional/src/test/java/org/apache/activemq/transport/http/WaitForJettyListener.java (original)
+++ activemq/trunk/activemq-optional/src/test/java/org/apache/activemq/transport/http/WaitForJettyListener.java Tue May 11 15:37:44 2010
@@ -41,7 +41,7 @@ public class WaitForJettyListener {
                     socket.close();
                     canConnect = true;
                 } catch (Exception e) {
-                    LOG.warn("verify jettty available, failed to connect to " + url + e);
+                    LOG.warn("verify jetty available, failed to connect to " + url + e);
                 }
                 return canConnect;
             }}, 60 * 1000));

Modified: activemq/trunk/activemq-web-demo/src/test/java/org/apache/activemq/web/JettyTestSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-web-demo/src/test/java/org/apache/activemq/web/JettyTestSupport.java?rev=943146&r1=943145&r2=943146&view=diff
==============================================================================
--- activemq/trunk/activemq-web-demo/src/test/java/org/apache/activemq/web/JettyTestSupport.java (original)
+++ activemq/trunk/activemq-web-demo/src/test/java/org/apache/activemq/web/JettyTestSupport.java Tue May 11 15:37:44 2010
@@ -1,20 +1,28 @@
 package org.apache.activemq.web;
 
+import java.net.Socket;
+import java.net.URL;
+
 import javax.jms.Connection;
 import javax.jms.MessageProducer;
 import javax.jms.Session;
+import javax.net.SocketFactory;
+
+import junit.framework.TestCase;
 
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.util.Wait;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.eclipse.jetty.server.Connector;
 import org.eclipse.jetty.server.Server;
 import org.eclipse.jetty.server.nio.SelectChannelConnector;
 import org.eclipse.jetty.webapp.WebAppContext;
 
-import junit.framework.TestCase;
-
 public class JettyTestSupport extends TestCase {
-
+    private static final Log LOG = LogFactory.getLog(JettyTestSupport.class);
+    
     BrokerService broker;
     Server server;
     ActiveMQConnectionFactory factory;
@@ -43,7 +51,8 @@ public class JettyTestSupport extends Te
         server.setConnectors(new Connector[] {
             connector
         });
-        server.start();   
+        server.start();
+        waitForJettySocketToAccept("http://localhost:8080");
         
         factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
         connection = factory.createConnection();
@@ -60,6 +69,21 @@ public class JettyTestSupport extends Te
         connection.close();
     }
 
-    
+    public void waitForJettySocketToAccept(String bindLocation) throws Exception {
+        final URL url = new URL(bindLocation);
+        assertTrue("Jetty endpoint is available", Wait.waitFor(new Wait.Condition() {
+
+            public boolean isSatisified() throws Exception {
+                boolean canConnect = false;
+                try {
+                    Socket socket = SocketFactory.getDefault().createSocket(url.getHost(), url.getPort());
+                    socket.close();
+                    canConnect = true;
+                } catch (Exception e) {
+                    LOG.warn("verify jetty available, failed to connect to " + url + e);
+                }
+                return canConnect;
+            }}, 60 * 1000));
+    }
     
 }

Modified: activemq/trunk/activemq-web-demo/src/test/java/org/apache/activemq/web/RestTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-web-demo/src/test/java/org/apache/activemq/web/RestTest.java?rev=943146&r1=943145&r2=943146&view=diff
==============================================================================
--- activemq/trunk/activemq-web-demo/src/test/java/org/apache/activemq/web/RestTest.java (original)
+++ activemq/trunk/activemq-web-demo/src/test/java/org/apache/activemq/web/RestTest.java Tue May 11 15:37:44 2010
@@ -2,13 +2,17 @@ package org.apache.activemq.web;
 
 import javax.jms.TextMessage;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.eclipse.jetty.client.ContentExchange;
 import org.eclipse.jetty.client.HttpClient;
 
 public class RestTest extends JettyTestSupport {
+    private static final Log LOG = LogFactory.getLog(RestTest.class);
 	
 	public void testConsume() throws Exception {
 	    producer.send(session.createTextMessage("test"));
+	    LOG.info("message sent");
 	    
 	    HttpClient httpClient = new HttpClient();
         httpClient.start();
@@ -17,18 +21,36 @@ public class RestTest extends JettyTestS
         contentExchange.setURL("http://localhost:8080/message/test?readTimeout=1000&type=queue");
         httpClient.send(contentExchange);
         contentExchange.waitForDone();
-        assertEquals("test", contentExchange.getResponseContent());
-	    
+        assertEquals("test", contentExchange.getResponseContent());	    
+	}
+	
+	public void testSubscribeFirst() throws Exception {
+        HttpClient httpClient = new HttpClient();
+        httpClient.start();
+        ContentExchange contentExchange = new ContentExchange();
+        httpClient.setConnectorType(HttpClient.CONNECTOR_SELECT_CHANNEL);
+        contentExchange.setURL("http://localhost:8080/message/test?readTimeout=5000&type=queue");
+        httpClient.send(contentExchange);
+        
+        Thread.sleep(1000);
+        
+        producer.send(session.createTextMessage("test"));
+        LOG.info("message sent");
+        
+        contentExchange.waitForDone();
+        assertEquals("test", contentExchange.getResponseContent());	    
 	}
 	
 	public void testSelector() throws Exception {
 	    TextMessage msg1 = session.createTextMessage("test1");
 	    msg1.setIntProperty("test", 1);
 	    producer.send(msg1);
+	    LOG.info("message 1 sent");
 	    
 	    TextMessage msg2 = session.createTextMessage("test2");
 	    msg2.setIntProperty("test", 2);
 	    producer.send(msg2);
+	    LOG.info("message 2 sent");
 	    
         HttpClient httpClient = new HttpClient();
         httpClient.start();

Modified: activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/MessageServlet.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/MessageServlet.java?rev=943146&r1=943145&r2=943146&view=diff
==============================================================================
--- activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/MessageServlet.java (original)
+++ activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/MessageServlet.java Tue May 11 15:37:44 2010
@@ -162,111 +162,125 @@ public class MessageServlet extends Mess
      * @throws IOException
      */
     protected void doMessages(HttpServletRequest request, HttpServletResponse response, int maxMessages) throws ServletException, IOException {
-
-        int messages = 0;
         try {
             WebClient client = getWebClient(request);
             Destination destination = getDestination(client, request);
             if (destination == null) {
                 throw new NoDestinationSuppliedException();
             }
-            long timeout = getReadTimeout(request);
-            boolean ajax = isRicoAjax(request);
-            if (!ajax) {
-                maxMessages = 1;
+            MessageAvailableConsumer consumer = (MessageAvailableConsumer)client.getConsumer(destination, request.getHeader(WebClient.selectorName));
+            Message message = null;
+            message = (Message)request.getAttribute("message"); 
+            if (message != null) {
+                // we're resuming continuation,
+                // so just write the message and return
+                writeResponse(request, response, maxMessages, message, consumer);
+                return;
             }
+            long timeout = getReadTimeout(request);
 
             if (LOG.isDebugEnabled()) {
                 LOG.debug("Receiving message(s) from: " + destination + " with timeout: " + timeout);
             }
 
-            MessageAvailableConsumer consumer = (MessageAvailableConsumer)client.getConsumer(destination, request.getHeader(WebClient.selectorName));
             Continuation continuation = null;
             Listener listener = null;
-            Message message = null;
+            
 
-            synchronized (consumer) {
+            // Look for any available messages
+            message = consumer.receive(10);
+
+            // Get an existing Continuation or create a new one if there are
+            // no events.
+            if (message == null) {
+                continuation = ContinuationSupport.getContinuation(request);
+                
+                if (continuation.isExpired()) {
+                    response.setStatus(isRicoAjax(request) ? HttpServletResponse.SC_OK : HttpServletResponse.SC_NO_CONTENT);
+                    return;
+                }
+
+                continuation.setTimeout(timeout);
+                continuation.suspend();
+                
                 // Fetch the listeners
                 listener = (Listener)consumer.getAvailableListener();
                 if (listener == null) {
                     listener = new Listener(consumer);
                     consumer.setAvailableListener(listener);
                 }
-                // Look for any available messages
-                message = consumer.receiveNoWait();
 
-                // Get an existing Continuation or create a new one if there are
-                // no events.
-                if (message == null) {
-                    continuation = ContinuationSupport.getContinuation(request);
-
-                    // register this continuation with our listener.
-                    listener.setContinuation(continuation);
-
-                    // Get the continuation object (may wait and/or retry
-                    // request here).
-                    continuation.setTimeout(timeout);
-                    continuation.suspend();
-                }
+                // register this continuation with our listener.
+                listener.setContinuation(continuation);
+            }
 
-                // Try again now
-                if (message == null) {
-                    message = consumer.receiveNoWait();
-                }
+            writeResponse(request, response, maxMessages, message, consumer);
+        } catch (JMSException e) {
+            throw new ServletException("Could not post JMS message: " + e, e);
+        }
+    }
+    
+    protected void writeResponse(HttpServletRequest request, HttpServletResponse response, int maxMessages, Message message, MessageAvailableConsumer consumer) throws IOException, JMSException {
+        int messages = 0;
+        try {
+            boolean ajax = isRicoAjax(request);
+            if (!ajax) {
+                maxMessages = 1;
+            }
 
-                // write a responds
-                response.setContentType("text/xml");
-                PrintWriter writer = response.getWriter();
+            // write a responds
+            response.setContentType("text/xml");
+            PrintWriter writer = response.getWriter();
 
-                if (ajax) {
-                    writer.println("<ajax-response>");
+            if (ajax) {
+                writer.println("<ajax-response>");
+            }
+
+            // handle any message(s)
+            if (message == null) {
+                // No messages so OK response of for ajax else no content.
+                response.setStatus(ajax ? HttpServletResponse.SC_OK
+                        : HttpServletResponse.SC_NO_CONTENT);
+            } else {
+                // We have at least one message so set up the response
+                response.setStatus(HttpServletResponse.SC_OK);
+                String type = getContentType(request);
+                if (type != null) {
+                    response.setContentType(type);
                 }
 
-                // handle any message(s)
-                if (message == null) {
-                    // No messages so OK response of for ajax else no content.
-                    response.setStatus(ajax ? HttpServletResponse.SC_OK : HttpServletResponse.SC_NO_CONTENT);
-                } else {
-                    // We have at least one message so set up the response
-                    response.setStatus(HttpServletResponse.SC_OK);
-                    String type = getContentType(request);
-                    if (type != null) {
-                        response.setContentType(type);
+                // send a response for each available message (up to max
+                // messages)
+                while ((maxMessages < 0 || messages < maxMessages)
+                        && message != null) {
+                    if (ajax) {
+                        writer.print("<response type='object' id='");
+                        writer.print(request.getParameter("id"));
+                        writer.println("'>");
+                    } else {
+                        // only ever 1 message for non ajax!
+                        setResponseHeaders(response, message);
                     }
 
-                    // send a response for each available message (up to max
-                    // messages)
-                    while ((maxMessages < 0 || messages < maxMessages) && message != null) {
-                        if (ajax) {
-                            writer.print("<response type='object' id='");
-                            writer.print(request.getParameter("id"));
-                            writer.println("'>");
-                        } else {
-                            // only ever 1 message for non ajax!
-                            setResponseHeaders(response, message);
-                        }
-
-                        writeMessageResponse(writer, message);
+                    writeMessageResponse(writer, message);
 
-                        if (ajax) {
-                            writer.println("</response>");
-                        }
+                    if (ajax) {
+                        writer.println("</response>");
+                    }
 
-                        // look for next message
-                        messages++;
-                        if(maxMessages < 0 || messages < maxMessages) {
-                        	message = consumer.receiveNoWait();
-                        }
+                    // look for next message
+                    messages++;
+                    if (maxMessages < 0 || messages < maxMessages) {
+                        message = consumer.receiveNoWait();
                     }
                 }
+            }
 
-                if (ajax) {
-                    writer.println("<response type='object' id='poll'><ok/></response>");
-                    writer.println("</ajax-response>");
-                }
+            if (ajax) {
+                writer
+                        .println("<response type='object' id='poll'><ok/></response>");
+                writer.println("</ajax-response>");
             }
-        } catch (JMSException e) {
-            throw new ServletException("Could not post JMS message: " + e, e);
         } finally {
             if (LOG.isDebugEnabled()) {
                 LOG.debug("Received " + messages + " message(s)");
@@ -475,9 +489,14 @@ public class MessageServlet extends Mess
 
             synchronized (this.consumer) {
                 if (continuation != null) {
-                    continuation.resume();
+                    try {
+                        Message message = consumer.receiveNoWait();
+                        continuation.setAttribute("message", message);
+                    } catch (Exception e) {
+                        e.printStackTrace();
+                    }
+                    continuation.resume();   
                 }
-                continuation = null;
             }
         }
     }