You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2014/01/16 16:20:04 UTC
git commit: https://issues.apache.org/jira/browse/AMQ-4938
Updated Branches:
refs/heads/trunk e56c062f2 -> a2ede974b
https://issues.apache.org/jira/browse/AMQ-4938
apply patch with cleanups
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/a2ede974
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/a2ede974
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/a2ede974
Branch: refs/heads/trunk
Commit: a2ede974b95261885404a02f32a918dbd9213f51
Parents: e56c062
Author: Timothy Bish <ta...@gmai.com>
Authored: Thu Jan 16 10:19:10 2014 -0500
Committer: Timothy Bish <ta...@gmai.com>
Committed: Thu Jan 16 10:19:41 2014 -0500
----------------------------------------------------------------------
.../org/apache/activemq/web/MessageServlet.java | 199 +++++++++++++------
1 file changed, 134 insertions(+), 65 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/a2ede974/activemq-web/src/main/java/org/apache/activemq/web/MessageServlet.java
----------------------------------------------------------------------
diff --git a/activemq-web/src/main/java/org/apache/activemq/web/MessageServlet.java b/activemq-web/src/main/java/org/apache/activemq/web/MessageServlet.java
index 3c0dad2..63056e9 100644
--- a/activemq-web/src/main/java/org/apache/activemq/web/MessageServlet.java
+++ b/activemq-web/src/main/java/org/apache/activemq/web/MessageServlet.java
@@ -21,6 +21,7 @@ import java.io.IOException;
import java.io.PrintWriter;
import java.util.Enumeration;
import java.util.HashMap;
+import java.util.HashSet;
import javax.jms.Destination;
import javax.jms.JMSException;
@@ -42,11 +43,18 @@ import org.slf4j.LoggerFactory;
/**
* A servlet for sending and receiving messages to/from JMS destinations using
- * HTTP POST for sending and HTTP GET for receiving. <p/> You can specify the
- * destination and whether it is a topic or queue via configuration details on
- * the servlet or as request parameters. <p/> For reading messages you can
- * specify a readTimeout parameter to determine how long the servlet should
- * block for.
+ * HTTP POST for sending and HTTP GET for receiving.
+ * <p/>
+ * You can specify the destination and whether it is a topic or queue via
+ * configuration details on the servlet or as request parameters.
+ * <p/>
+ * For reading messages you can specify a readTimeout parameter to determine how
+ * long the servlet should block for.
+ * <p/>
+ * One thing to keep in mind with this solution - due to the nature of REST,
+ * there will always be a chance of losing messages. Consider what happens when
+ * a message is retrieved from the broker but the web call is interrupted before
+ * the client receives the message in the response - the message is lost.
*/
public class MessageServlet extends MessageServletSupport {
@@ -59,12 +67,15 @@ public class MessageServlet extends MessageServletSupport {
private static final Logger LOG = LoggerFactory.getLogger(MessageServlet.class);
private final String readTimeoutParameter = "readTimeout";
+ private final String readTimeoutRequestAtt = "xamqReadDeadline";
+ private final String oneShotParameter = "oneShot";
private long defaultReadTimeout = -1;
private long maximumReadTimeout = 20000;
private long requestTimeout = 1000;
private String defaultContentType = "application/xml";
private final HashMap<String, WebClient> clients = new HashMap<String, WebClient>();
+ private final HashSet<MessageAvailableConsumer> activeConsumers = new HashSet<MessageAvailableConsumer>();
@Override
public void init() throws ServletException {
@@ -144,7 +155,7 @@ public class MessageServlet extends MessageServletSupport {
}
/**
- * Supports a HTTP DELETE to be equivlanent of consuming a singe message
+ * Supports an HTTP DELETE to be equivalent of consuming a single message
* from a queue
*/
@Override
@@ -153,7 +164,7 @@ public class MessageServlet extends MessageServletSupport {
}
/**
- * Supports a HTTP DELETE to be equivlanent of consuming a singe message
+ * Supports an HTTP DELETE to be equivalent of consuming a single message
* from a queue
*/
@Override
@@ -170,69 +181,115 @@ public class MessageServlet extends MessageServletSupport {
* @throws IOException
*/
protected void doMessages(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
+ MessageAvailableConsumer consumer = null;
+
try {
WebClient client = getWebClient(request);
Destination destination = getDestination(client, request);
if (destination == null) {
throw new NoDestinationSuppliedException();
}
- MessageAvailableConsumer consumer = (MessageAvailableConsumer)client.getConsumer(destination, request.getHeader(WebClient.selectorName));
+ consumer = (MessageAvailableConsumer) client.getConsumer(destination, request.getHeader(WebClient.selectorName));
+ Continuation continuation = ContinuationSupport.getContinuation(request);
+
+ // Don't allow concurrent use of the consumer. Do make sure to allow
+ // subsequent calls on continuation to use the consumer.
+ if (continuation.isInitial()) {
+ synchronized (activeConsumers) {
+ if (activeConsumers.contains(consumer)) {
+ throw new ServletException("Concurrent access to consumer is not supported");
+ } else {
+ activeConsumers.add(consumer);
+ }
+ }
+ }
+
Message message = null;
- message = (Message)request.getAttribute("message");
- if (message != null) {
- // we're resuming continuation,
- // so just write the message and return
- writeResponse(request, response, message);
- return;
+
+ long deadline = getReadDeadline(request);
+ long timeout = deadline - System.currentTimeMillis();
+
+ // Set the message available listener *before* calling receive to eliminate any
+ // chance of a missed notification between the time receive() completes without
+ // a message and the time the listener is set.
+ synchronized (consumer) {
+ Listener listener = (Listener) consumer.getAvailableListener();
+ if (listener == null) {
+ listener = new Listener(consumer);
+ consumer.setAvailableListener(listener);
+ }
}
- long timeout = getReadTimeout(request);
if (LOG.isDebugEnabled()) {
LOG.debug("Receiving message(s) from: " + destination + " with timeout: " + timeout);
}
- Continuation continuation = null;
- Listener listener = null;
-
- // Look for any available messages (need a little timeout)
- message = consumer.receive(10);
+ // Look for any available messages (need a little timeout). Always
+ // try at least one lookup; don't block past the deadline.
+ if (timeout <= 0) {
+ message = consumer.receiveNoWait();
+ } else if (timeout < 10) {
+ message = consumer.receive(timeout);
+ } else {
+ message = consumer.receive(10);
+ }
- // Get an existing Continuation or create a new one if there are
- // no events.
if (message == null) {
- continuation = ContinuationSupport.getContinuation(request);
+ handleContinuation(request, response, client, destination, consumer, deadline);
+ } else {
+ writeResponse(request, response, message);
+ closeConsumerOnOneShot(request, client, destination);
- if (continuation.isExpired()) {
- response.setStatus(HttpServletResponse.SC_NO_CONTENT);
- return;
+ synchronized (activeConsumers) {
+ activeConsumers.remove(consumer);
}
+ }
+ } catch (JMSException e) {
+ throw new ServletException("Could not post JMS message: " + e, e);
+ }
+ }
- continuation.setTimeout(timeout);
- continuation.suspend();
+ protected void handleContinuation(HttpServletRequest request, HttpServletResponse response, WebClient client, Destination destination,
+ MessageAvailableConsumer consumer, long deadline) {
+ // Get an existing Continuation or create a new one if there are no events.
+ Continuation continuation = ContinuationSupport.getContinuation(request);
- // Fetch the listeners
- listener = (Listener)consumer.getAvailableListener();
- if (listener == null) {
- listener = new Listener(consumer);
- consumer.setAvailableListener(listener);
+ long timeout = deadline - System.currentTimeMillis();
+ if ((continuation.isExpired()) || (timeout <= 0)) {
+ // Reset the continuation on the available listener for the consumer to prevent the
+ // next message receipt from being consumed without a valid, active continuation.
+ synchronized (consumer) {
+ Object obj = consumer.getAvailableListener();
+ if (obj instanceof Listener) {
+ ((Listener) obj).setContinuation(null);
}
-
- // register this continuation with our listener.
- listener.setContinuation(continuation);
}
+ response.setStatus(HttpServletResponse.SC_NO_CONTENT);
+ closeConsumerOnOneShot(request, client, destination);
+ synchronized (activeConsumers) {
+ activeConsumers.remove(consumer);
+ }
+ return;
+ }
- writeResponse(request, response, message);
- } catch (JMSException e) {
- throw new ServletException("Could not post JMS message: " + e, e);
+ continuation.setTimeout(timeout);
+ continuation.suspend();
+
+ synchronized (consumer) {
+ Listener listener = (Listener) consumer.getAvailableListener();
+
+ // register this continuation with our listener.
+ listener.setContinuation(continuation);
}
}
protected void writeResponse(HttpServletRequest request, HttpServletResponse response, Message message) throws IOException, JMSException {
int messages = 0;
try {
- response.setHeader("Cache-Control", "no-cache, no-store, must-revalidate"); // HTTP 1.1
- response.setHeader("Pragma", "no-cache"); // HTTP 1.0
- response.setDateHeader("Expires", 0);
+ response.setHeader("Cache-Control", "no-cache, no-store, must-revalidate"); // HTTP
+ // 1.1
+ response.setHeader("Pragma", "no-cache"); // HTTP 1.0
+ response.setDateHeader("Expires", 0);
// write a responds
PrintWriter writer = response.getWriter();
@@ -269,7 +326,7 @@ public class MessageServlet extends MessageServletSupport {
protected void writeMessageResponse(PrintWriter writer, Message message) throws JMSException, IOException {
if (message instanceof TextMessage) {
- TextMessage textMsg = (TextMessage)message;
+ TextMessage textMsg = (TextMessage) message;
String txt = textMsg.getText();
if (txt != null) {
if (txt.startsWith("<?")) {
@@ -278,7 +335,7 @@ public class MessageServlet extends MessageServletSupport {
writer.print(txt);
}
} else if (message instanceof ObjectMessage) {
- ObjectMessage objectMsg = (ObjectMessage)message;
+ ObjectMessage objectMsg = (ObjectMessage) message;
Object object = objectMsg.getObject();
if (object != null) {
writer.print(object.toString());
@@ -288,7 +345,7 @@ public class MessageServlet extends MessageServletSupport {
protected boolean isXmlContent(Message message) throws JMSException {
if (message instanceof TextMessage) {
- TextMessage textMsg = (TextMessage)message;
+ TextMessage textMsg = (TextMessage) message;
String txt = textMsg.getText();
if (txt != null) {
// assume its xml when it starts with <
@@ -304,7 +361,7 @@ public class MessageServlet extends MessageServletSupport {
public WebClient getWebClient(HttpServletRequest request) {
String clientId = request.getParameter("clientId");
if (clientId != null) {
- synchronized(this) {
+ synchronized (this) {
LOG.debug("Getting local client [" + clientId + "]");
WebClient client = clients.get(clientId);
if (client == null) {
@@ -338,9 +395,9 @@ public class MessageServlet extends MessageServletSupport {
response.setHeader("id", message.getJMSMessageID());
// Return JMS properties as header values.
- for(Enumeration names = message.getPropertyNames(); names.hasMoreElements();) {
+ for (Enumeration names = message.getPropertyNames(); names.hasMoreElements();) {
String name = (String) names.nextElement();
- response.setHeader(name , message.getObjectProperty(name).toString());
+ response.setHeader(name, message.getObjectProperty(name).toString());
}
}
@@ -348,17 +405,37 @@ public class MessageServlet extends MessageServletSupport {
* @return the timeout value for read requests which is always >= 0 and <=
* maximumReadTimeout to avoid DoS attacks
*/
- protected long getReadTimeout(HttpServletRequest request) {
- long answer = defaultReadTimeout;
+ protected long getReadDeadline(HttpServletRequest request) {
+ Long answer;
- String name = request.getParameter(readTimeoutParameter);
- if (name != null) {
- answer = asLong(name);
+ answer = (Long) request.getAttribute(readTimeoutRequestAtt);
+
+ if (answer == null) {
+ long timeout = defaultReadTimeout;
+ String name = request.getParameter(readTimeoutParameter);
+ if (name != null) {
+ timeout = asLong(name);
+ }
+ if (timeout < 0 || timeout > maximumReadTimeout) {
+ timeout = maximumReadTimeout;
+ }
+
+ answer = Long.valueOf(System.currentTimeMillis() + timeout);
}
- if (answer < 0 || answer > maximumReadTimeout) {
- answer = maximumReadTimeout;
+ return answer.longValue();
+ }
+
+ /**
+ * Close the consumer if one-shot mode is used on the given request.
+ */
+ protected void closeConsumerOnOneShot(HttpServletRequest request, WebClient client, Destination dest) {
+ if (asBoolean(request.getParameter(oneShotParameter), false)) {
+ try {
+ client.closeConsumer(dest);
+ } catch (JMSException jms_exc) {
+ LOG.warn("JMS exception on closing consumer after request with one-shot mode", jms_exc);
+ }
}
- return answer;
}
/*
@@ -386,17 +463,9 @@ public class MessageServlet extends MessageServletSupport {
synchronized (this.consumer) {
if (continuation != null) {
- try {
- Message message = consumer.receiveNoWait();
- continuation.setAttribute("message", message);
- } catch (Exception e) {
- LOG.warn("Error receiving message due " + e.getMessage() + ". This exception is ignored.", e);
- } finally {
- continuation.resume();
- }
+ continuation.resume();
}
}
}
}
-
}