You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2010/05/11 17:37:44 UTC
svn commit: r943146 - in /activemq/trunk:
activemq-optional/src/test/java/org/apache/activemq/transport/http/
activemq-web-demo/src/test/java/org/apache/activemq/web/
activemq-web/src/main/java/org/apache/activemq/web/
Author: dejanb
Date: Tue May 11 15:37:44 2010
New Revision: 943146
URL: http://svn.apache.org/viewvc?rev=943146&view=rev
Log:
https://issues.apache.org/activemq/browse/AMQ-2728 - MessageServlet refactoring
Modified:
activemq/trunk/activemq-optional/src/test/java/org/apache/activemq/transport/http/WaitForJettyListener.java
activemq/trunk/activemq-web-demo/src/test/java/org/apache/activemq/web/JettyTestSupport.java
activemq/trunk/activemq-web-demo/src/test/java/org/apache/activemq/web/RestTest.java
activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/MessageServlet.java
Modified: activemq/trunk/activemq-optional/src/test/java/org/apache/activemq/transport/http/WaitForJettyListener.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-optional/src/test/java/org/apache/activemq/transport/http/WaitForJettyListener.java?rev=943146&r1=943145&r2=943146&view=diff
==============================================================================
--- activemq/trunk/activemq-optional/src/test/java/org/apache/activemq/transport/http/WaitForJettyListener.java (original)
+++ activemq/trunk/activemq-optional/src/test/java/org/apache/activemq/transport/http/WaitForJettyListener.java Tue May 11 15:37:44 2010
@@ -41,7 +41,7 @@ public class WaitForJettyListener {
socket.close();
canConnect = true;
} catch (Exception e) {
- LOG.warn("verify jettty available, failed to connect to " + url + e);
+ LOG.warn("verify jetty available, failed to connect to " + url + e);
}
return canConnect;
}}, 60 * 1000));
Modified: activemq/trunk/activemq-web-demo/src/test/java/org/apache/activemq/web/JettyTestSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-web-demo/src/test/java/org/apache/activemq/web/JettyTestSupport.java?rev=943146&r1=943145&r2=943146&view=diff
==============================================================================
--- activemq/trunk/activemq-web-demo/src/test/java/org/apache/activemq/web/JettyTestSupport.java (original)
+++ activemq/trunk/activemq-web-demo/src/test/java/org/apache/activemq/web/JettyTestSupport.java Tue May 11 15:37:44 2010
@@ -1,20 +1,28 @@
package org.apache.activemq.web;
+import java.net.Socket;
+import java.net.URL;
+
import javax.jms.Connection;
import javax.jms.MessageProducer;
import javax.jms.Session;
+import javax.net.SocketFactory;
+
+import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.util.Wait;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.nio.SelectChannelConnector;
import org.eclipse.jetty.webapp.WebAppContext;
-import junit.framework.TestCase;
-
public class JettyTestSupport extends TestCase {
-
+ private static final Log LOG = LogFactory.getLog(JettyTestSupport.class);
+
BrokerService broker;
Server server;
ActiveMQConnectionFactory factory;
@@ -43,7 +51,8 @@ public class JettyTestSupport extends Te
server.setConnectors(new Connector[] {
connector
});
- server.start();
+ server.start();
+ waitForJettySocketToAccept("http://localhost:8080");
factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
connection = factory.createConnection();
@@ -60,6 +69,21 @@ public class JettyTestSupport extends Te
connection.close();
}
-
+ public void waitForJettySocketToAccept(String bindLocation) throws Exception {
+ final URL url = new URL(bindLocation);
+ assertTrue("Jetty endpoint is available", Wait.waitFor(new Wait.Condition() {
+
+ public boolean isSatisified() throws Exception {
+ boolean canConnect = false;
+ try {
+ Socket socket = SocketFactory.getDefault().createSocket(url.getHost(), url.getPort());
+ socket.close();
+ canConnect = true;
+ } catch (Exception e) {
+ LOG.warn("verify jetty available, failed to connect to " + url + e);
+ }
+ return canConnect;
+ }}, 60 * 1000));
+ }
}
Modified: activemq/trunk/activemq-web-demo/src/test/java/org/apache/activemq/web/RestTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-web-demo/src/test/java/org/apache/activemq/web/RestTest.java?rev=943146&r1=943145&r2=943146&view=diff
==============================================================================
--- activemq/trunk/activemq-web-demo/src/test/java/org/apache/activemq/web/RestTest.java (original)
+++ activemq/trunk/activemq-web-demo/src/test/java/org/apache/activemq/web/RestTest.java Tue May 11 15:37:44 2010
@@ -2,13 +2,17 @@ package org.apache.activemq.web;
import javax.jms.TextMessage;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.eclipse.jetty.client.ContentExchange;
import org.eclipse.jetty.client.HttpClient;
public class RestTest extends JettyTestSupport {
+ private static final Log LOG = LogFactory.getLog(RestTest.class);
public void testConsume() throws Exception {
producer.send(session.createTextMessage("test"));
+ LOG.info("message sent");
HttpClient httpClient = new HttpClient();
httpClient.start();
@@ -17,18 +21,36 @@ public class RestTest extends JettyTestS
contentExchange.setURL("http://localhost:8080/message/test?readTimeout=1000&type=queue");
httpClient.send(contentExchange);
contentExchange.waitForDone();
- assertEquals("test", contentExchange.getResponseContent());
-
+ assertEquals("test", contentExchange.getResponseContent());
+ }
+
+ public void testSubscribeFirst() throws Exception {
+ HttpClient httpClient = new HttpClient();
+ httpClient.start();
+ ContentExchange contentExchange = new ContentExchange();
+ httpClient.setConnectorType(HttpClient.CONNECTOR_SELECT_CHANNEL);
+ contentExchange.setURL("http://localhost:8080/message/test?readTimeout=5000&type=queue");
+ httpClient.send(contentExchange);
+
+ Thread.sleep(1000);
+
+ producer.send(session.createTextMessage("test"));
+ LOG.info("message sent");
+
+ contentExchange.waitForDone();
+ assertEquals("test", contentExchange.getResponseContent());
}
public void testSelector() throws Exception {
TextMessage msg1 = session.createTextMessage("test1");
msg1.setIntProperty("test", 1);
producer.send(msg1);
+ LOG.info("message 1 sent");
TextMessage msg2 = session.createTextMessage("test2");
msg2.setIntProperty("test", 2);
producer.send(msg2);
+ LOG.info("message 2 sent");
HttpClient httpClient = new HttpClient();
httpClient.start();
Modified: activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/MessageServlet.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/MessageServlet.java?rev=943146&r1=943145&r2=943146&view=diff
==============================================================================
--- activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/MessageServlet.java (original)
+++ activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/MessageServlet.java Tue May 11 15:37:44 2010
@@ -162,111 +162,125 @@ public class MessageServlet extends Mess
* @throws IOException
*/
protected void doMessages(HttpServletRequest request, HttpServletResponse response, int maxMessages) throws ServletException, IOException {
-
- int messages = 0;
try {
WebClient client = getWebClient(request);
Destination destination = getDestination(client, request);
if (destination == null) {
throw new NoDestinationSuppliedException();
}
- long timeout = getReadTimeout(request);
- boolean ajax = isRicoAjax(request);
- if (!ajax) {
- maxMessages = 1;
+ MessageAvailableConsumer consumer = (MessageAvailableConsumer)client.getConsumer(destination, request.getHeader(WebClient.selectorName));
+ Message message = null;
+ message = (Message)request.getAttribute("message");
+ if (message != null) {
+ // we're resuming continuation,
+ // so just write the message and return
+ writeResponse(request, response, maxMessages, message, consumer);
+ return;
}
+ long timeout = getReadTimeout(request);
if (LOG.isDebugEnabled()) {
LOG.debug("Receiving message(s) from: " + destination + " with timeout: " + timeout);
}
- MessageAvailableConsumer consumer = (MessageAvailableConsumer)client.getConsumer(destination, request.getHeader(WebClient.selectorName));
Continuation continuation = null;
Listener listener = null;
- Message message = null;
+
- synchronized (consumer) {
+ // Look for any available messages
+ message = consumer.receive(10);
+
+ // Get an existing Continuation or create a new one if there are
+ // no events.
+ if (message == null) {
+ continuation = ContinuationSupport.getContinuation(request);
+
+ if (continuation.isExpired()) {
+ response.setStatus(isRicoAjax(request) ? HttpServletResponse.SC_OK : HttpServletResponse.SC_NO_CONTENT);
+ return;
+ }
+
+ continuation.setTimeout(timeout);
+ continuation.suspend();
+
// Fetch the listeners
listener = (Listener)consumer.getAvailableListener();
if (listener == null) {
listener = new Listener(consumer);
consumer.setAvailableListener(listener);
}
- // Look for any available messages
- message = consumer.receiveNoWait();
- // Get an existing Continuation or create a new one if there are
- // no events.
- if (message == null) {
- continuation = ContinuationSupport.getContinuation(request);
-
- // register this continuation with our listener.
- listener.setContinuation(continuation);
-
- // Get the continuation object (may wait and/or retry
- // request here).
- continuation.setTimeout(timeout);
- continuation.suspend();
- }
+ // register this continuation with our listener.
+ listener.setContinuation(continuation);
+ }
- // Try again now
- if (message == null) {
- message = consumer.receiveNoWait();
- }
+ writeResponse(request, response, maxMessages, message, consumer);
+ } catch (JMSException e) {
+ throw new ServletException("Could not post JMS message: " + e, e);
+ }
+ }
+
+ protected void writeResponse(HttpServletRequest request, HttpServletResponse response, int maxMessages, Message message, MessageAvailableConsumer consumer) throws IOException, JMSException {
+ int messages = 0;
+ try {
+ boolean ajax = isRicoAjax(request);
+ if (!ajax) {
+ maxMessages = 1;
+ }
- // write a responds
- response.setContentType("text/xml");
- PrintWriter writer = response.getWriter();
+ // write a responds
+ response.setContentType("text/xml");
+ PrintWriter writer = response.getWriter();
- if (ajax) {
- writer.println("<ajax-response>");
+ if (ajax) {
+ writer.println("<ajax-response>");
+ }
+
+ // handle any message(s)
+ if (message == null) {
+ // No messages so OK response of for ajax else no content.
+ response.setStatus(ajax ? HttpServletResponse.SC_OK
+ : HttpServletResponse.SC_NO_CONTENT);
+ } else {
+ // We have at least one message so set up the response
+ response.setStatus(HttpServletResponse.SC_OK);
+ String type = getContentType(request);
+ if (type != null) {
+ response.setContentType(type);
}
- // handle any message(s)
- if (message == null) {
- // No messages so OK response of for ajax else no content.
- response.setStatus(ajax ? HttpServletResponse.SC_OK : HttpServletResponse.SC_NO_CONTENT);
- } else {
- // We have at least one message so set up the response
- response.setStatus(HttpServletResponse.SC_OK);
- String type = getContentType(request);
- if (type != null) {
- response.setContentType(type);
+ // send a response for each available message (up to max
+ // messages)
+ while ((maxMessages < 0 || messages < maxMessages)
+ && message != null) {
+ if (ajax) {
+ writer.print("<response type='object' id='");
+ writer.print(request.getParameter("id"));
+ writer.println("'>");
+ } else {
+ // only ever 1 message for non ajax!
+ setResponseHeaders(response, message);
}
- // send a response for each available message (up to max
- // messages)
- while ((maxMessages < 0 || messages < maxMessages) && message != null) {
- if (ajax) {
- writer.print("<response type='object' id='");
- writer.print(request.getParameter("id"));
- writer.println("'>");
- } else {
- // only ever 1 message for non ajax!
- setResponseHeaders(response, message);
- }
-
- writeMessageResponse(writer, message);
+ writeMessageResponse(writer, message);
- if (ajax) {
- writer.println("</response>");
- }
+ if (ajax) {
+ writer.println("</response>");
+ }
- // look for next message
- messages++;
- if(maxMessages < 0 || messages < maxMessages) {
- message = consumer.receiveNoWait();
- }
+ // look for next message
+ messages++;
+ if (maxMessages < 0 || messages < maxMessages) {
+ message = consumer.receiveNoWait();
}
}
+ }
- if (ajax) {
- writer.println("<response type='object' id='poll'><ok/></response>");
- writer.println("</ajax-response>");
- }
+ if (ajax) {
+ writer
+ .println("<response type='object' id='poll'><ok/></response>");
+ writer.println("</ajax-response>");
}
- } catch (JMSException e) {
- throw new ServletException("Could not post JMS message: " + e, e);
} finally {
if (LOG.isDebugEnabled()) {
LOG.debug("Received " + messages + " message(s)");
@@ -475,9 +489,14 @@ public class MessageServlet extends Mess
synchronized (this.consumer) {
if (continuation != null) {
- continuation.resume();
+ try {
+ Message message = consumer.receiveNoWait();
+ continuation.setAttribute("message", message);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ continuation.resume();
}
- continuation = null;
}
}
}