You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2014/01/16 16:20:04 UTC

git commit: https://issues.apache.org/jira/browse/AMQ-4938

Updated Branches:
  refs/heads/trunk e56c062f2 -> a2ede974b


https://issues.apache.org/jira/browse/AMQ-4938

apply patch with cleanups

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/a2ede974
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/a2ede974
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/a2ede974

Branch: refs/heads/trunk
Commit: a2ede974b95261885404a02f32a918dbd9213f51
Parents: e56c062
Author: Timothy Bish <ta...@gmai.com>
Authored: Thu Jan 16 10:19:10 2014 -0500
Committer: Timothy Bish <ta...@gmai.com>
Committed: Thu Jan 16 10:19:41 2014 -0500

----------------------------------------------------------------------
 .../org/apache/activemq/web/MessageServlet.java | 199 +++++++++++++------
 1 file changed, 134 insertions(+), 65 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/a2ede974/activemq-web/src/main/java/org/apache/activemq/web/MessageServlet.java
----------------------------------------------------------------------
diff --git a/activemq-web/src/main/java/org/apache/activemq/web/MessageServlet.java b/activemq-web/src/main/java/org/apache/activemq/web/MessageServlet.java
index 3c0dad2..63056e9 100644
--- a/activemq-web/src/main/java/org/apache/activemq/web/MessageServlet.java
+++ b/activemq-web/src/main/java/org/apache/activemq/web/MessageServlet.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.io.PrintWriter;
 import java.util.Enumeration;
 import java.util.HashMap;
+import java.util.HashSet;
 
 import javax.jms.Destination;
 import javax.jms.JMSException;
@@ -42,11 +43,18 @@ import org.slf4j.LoggerFactory;
 
 /**
  * A servlet for sending and receiving messages to/from JMS destinations using
- * HTTP POST for sending and HTTP GET for receiving. <p/> You can specify the
- * destination and whether it is a topic or queue via configuration details on
- * the servlet or as request parameters. <p/> For reading messages you can
- * specify a readTimeout parameter to determine how long the servlet should
- * block for.
+ * HTTP POST for sending and HTTP GET for receiving.
+ * <p/>
+ * You can specify the destination and whether it is a topic or queue via
+ * configuration details on the servlet or as request parameters.
+ * <p/>
+ * For reading messages you can specify a readTimeout parameter to determine how
+ * long the servlet should block for.
+ *
+ * One thing to keep in mind with this solution - due to the nature of REST,
+ * there will always be a chance of losing messages. Consider what happens when
+ * a message is retrieved from the broker but the web call is interrupted before
+ * the client receives the message in the response - the message is lost.
  */
 public class MessageServlet extends MessageServletSupport {
 
@@ -59,12 +67,15 @@ public class MessageServlet extends MessageServletSupport {
     private static final Logger LOG = LoggerFactory.getLogger(MessageServlet.class);
 
     private final String readTimeoutParameter = "readTimeout";
+    private final String readTimeoutRequestAtt = "xamqReadDeadline";
+    private final String oneShotParameter = "oneShot";
     private long defaultReadTimeout = -1;
     private long maximumReadTimeout = 20000;
     private long requestTimeout = 1000;
     private String defaultContentType = "application/xml";
 
     private final HashMap<String, WebClient> clients = new HashMap<String, WebClient>();
+    private final HashSet<MessageAvailableConsumer> activeConsumers = new HashSet<MessageAvailableConsumer>();
 
     @Override
     public void init() throws ServletException {
@@ -144,7 +155,7 @@ public class MessageServlet extends MessageServletSupport {
     }
 
     /**
-     * Supports a HTTP DELETE to be equivlanent of consuming a singe message
+     * Supports a HTTP DELETE to be equivalent of consuming a singe message
      * from a queue
      */
     @Override
@@ -153,7 +164,7 @@ public class MessageServlet extends MessageServletSupport {
     }
 
     /**
-     * Supports a HTTP DELETE to be equivlanent of consuming a singe message
+     * Supports a HTTP DELETE to be equivalent of consuming a singe message
      * from a queue
      */
     @Override
@@ -170,69 +181,115 @@ public class MessageServlet extends MessageServletSupport {
      * @throws IOException
      */
     protected void doMessages(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
+        MessageAvailableConsumer consumer = null;
+
         try {
             WebClient client = getWebClient(request);
             Destination destination = getDestination(client, request);
             if (destination == null) {
                 throw new NoDestinationSuppliedException();
             }
-            MessageAvailableConsumer consumer = (MessageAvailableConsumer)client.getConsumer(destination, request.getHeader(WebClient.selectorName));
+            consumer = (MessageAvailableConsumer) client.getConsumer(destination, request.getHeader(WebClient.selectorName));
+            Continuation continuation = ContinuationSupport.getContinuation(request);
+
+            // Don't allow concurrent use of the consumer. Do make sure to allow
+            // subsequent calls on continuation to use the consumer.
+            if (continuation.isInitial()) {
+                synchronized (activeConsumers) {
+                    if (activeConsumers.contains(consumer)) {
+                        throw new ServletException("Concurrent access to consumer is not supported");
+                    } else {
+                        activeConsumers.add(consumer);
+                    }
+                }
+            }
+
             Message message = null;
-            message = (Message)request.getAttribute("message");
-            if (message != null) {
-                // we're resuming continuation,
-                // so just write the message and return
-                writeResponse(request, response, message);
-                return;
+
+            long deadline = getReadDeadline(request);
+            long timeout = deadline - System.currentTimeMillis();
+
+            // Set the message available listener *before* calling receive to eliminate any
+            // chance of a missed notification between the time receive() completes without
+            // a message and the time the listener is set.
+            synchronized (consumer) {
+                Listener listener = (Listener) consumer.getAvailableListener();
+                if (listener == null) {
+                    listener = new Listener(consumer);
+                    consumer.setAvailableListener(listener);
+                }
             }
-            long timeout = getReadTimeout(request);
 
             if (LOG.isDebugEnabled()) {
                 LOG.debug("Receiving message(s) from: " + destination + " with timeout: " + timeout);
             }
 
-            Continuation continuation = null;
-            Listener listener = null;
-
-            // Look for any available messages (need a little timeout)
-            message = consumer.receive(10);
+            // Look for any available messages (need a little timeout). Always
+            // try at least one lookup; don't block past the deadline.
+            if (timeout <= 0) {
+                message = consumer.receiveNoWait();
+            } else if (timeout < 10) {
+                message = consumer.receive(timeout);
+            } else {
+                message = consumer.receive(10);
+            }
 
-            // Get an existing Continuation or create a new one if there are
-            // no events.
             if (message == null) {
-                continuation = ContinuationSupport.getContinuation(request);
+                handleContinuation(request, response, client, destination, consumer, deadline);
+            } else {
+                writeResponse(request, response, message);
+                closeConsumerOnOneShot(request, client, destination);
 
-                if (continuation.isExpired()) {
-                    response.setStatus(HttpServletResponse.SC_NO_CONTENT);
-                    return;
+                synchronized (activeConsumers) {
+                    activeConsumers.remove(consumer);
                 }
+            }
+        } catch (JMSException e) {
+            throw new ServletException("Could not post JMS message: " + e, e);
+        }
+    }
 
-                continuation.setTimeout(timeout);
-                continuation.suspend();
+    protected void handleContinuation(HttpServletRequest request, HttpServletResponse response, WebClient client, Destination destination,
+        MessageAvailableConsumer consumer, long deadline) {
+        // Get an existing Continuation or create a new one if there are no events.
+        Continuation continuation = ContinuationSupport.getContinuation(request);
 
-                // Fetch the listeners
-                listener = (Listener)consumer.getAvailableListener();
-                if (listener == null) {
-                    listener = new Listener(consumer);
-                    consumer.setAvailableListener(listener);
+        long timeout = deadline - System.currentTimeMillis();
+        if ((continuation.isExpired()) || (timeout <= 0)) {
+            // Reset the continuation on the available listener for the consumer to prevent the
+            // next message receipt from being consumed without a valid, active continuation.
+            synchronized (consumer) {
+                Object obj = consumer.getAvailableListener();
+                if (obj instanceof Listener) {
+                    ((Listener) obj).setContinuation(null);
                 }
-
-                // register this continuation with our listener.
-                listener.setContinuation(continuation);
             }
+            response.setStatus(HttpServletResponse.SC_NO_CONTENT);
+            closeConsumerOnOneShot(request, client, destination);
+            synchronized (activeConsumers) {
+                activeConsumers.remove(consumer);
+            }
+            return;
+        }
 
-            writeResponse(request, response, message);
-        } catch (JMSException e) {
-            throw new ServletException("Could not post JMS message: " + e, e);
+        continuation.setTimeout(timeout);
+        continuation.suspend();
+
+        synchronized (consumer) {
+            Listener listener = (Listener) consumer.getAvailableListener();
+
+            // register this continuation with our listener.
+            listener.setContinuation(continuation);
         }
     }
 
     protected void writeResponse(HttpServletRequest request, HttpServletResponse response, Message message) throws IOException, JMSException {
         int messages = 0;
         try {
-        	response.setHeader("Cache-Control", "no-cache, no-store, must-revalidate"); // HTTP 1.1
-        	response.setHeader("Pragma", "no-cache"); // HTTP 1.0
-        	response.setDateHeader("Expires", 0);
+            response.setHeader("Cache-Control", "no-cache, no-store, must-revalidate"); // HTTP
+                                                                                        // 1.1
+            response.setHeader("Pragma", "no-cache"); // HTTP 1.0
+            response.setDateHeader("Expires", 0);
             // write a responds
             PrintWriter writer = response.getWriter();
 
@@ -269,7 +326,7 @@ public class MessageServlet extends MessageServletSupport {
 
     protected void writeMessageResponse(PrintWriter writer, Message message) throws JMSException, IOException {
         if (message instanceof TextMessage) {
-            TextMessage textMsg = (TextMessage)message;
+            TextMessage textMsg = (TextMessage) message;
             String txt = textMsg.getText();
             if (txt != null) {
                 if (txt.startsWith("<?")) {
@@ -278,7 +335,7 @@ public class MessageServlet extends MessageServletSupport {
                 writer.print(txt);
             }
         } else if (message instanceof ObjectMessage) {
-            ObjectMessage objectMsg = (ObjectMessage)message;
+            ObjectMessage objectMsg = (ObjectMessage) message;
             Object object = objectMsg.getObject();
             if (object != null) {
                 writer.print(object.toString());
@@ -288,7 +345,7 @@ public class MessageServlet extends MessageServletSupport {
 
     protected boolean isXmlContent(Message message) throws JMSException {
         if (message instanceof TextMessage) {
-            TextMessage textMsg = (TextMessage)message;
+            TextMessage textMsg = (TextMessage) message;
             String txt = textMsg.getText();
             if (txt != null) {
                 // assume its xml when it starts with <
@@ -304,7 +361,7 @@ public class MessageServlet extends MessageServletSupport {
     public WebClient getWebClient(HttpServletRequest request) {
         String clientId = request.getParameter("clientId");
         if (clientId != null) {
-            synchronized(this) {
+            synchronized (this) {
                 LOG.debug("Getting local client [" + clientId + "]");
                 WebClient client = clients.get(clientId);
                 if (client == null) {
@@ -338,9 +395,9 @@ public class MessageServlet extends MessageServletSupport {
         response.setHeader("id", message.getJMSMessageID());
 
         // Return JMS properties as header values.
-        for(Enumeration names = message.getPropertyNames(); names.hasMoreElements();) {
+        for (Enumeration names = message.getPropertyNames(); names.hasMoreElements();) {
             String name = (String) names.nextElement();
-            response.setHeader(name , message.getObjectProperty(name).toString());
+            response.setHeader(name, message.getObjectProperty(name).toString());
         }
     }
 
@@ -348,17 +405,37 @@ public class MessageServlet extends MessageServletSupport {
      * @return the timeout value for read requests which is always >= 0 and <=
      *         maximumReadTimeout to avoid DoS attacks
      */
-    protected long getReadTimeout(HttpServletRequest request) {
-        long answer = defaultReadTimeout;
+    protected long getReadDeadline(HttpServletRequest request) {
+        Long answer;
 
-        String name = request.getParameter(readTimeoutParameter);
-        if (name != null) {
-            answer = asLong(name);
+        answer = (Long) request.getAttribute(readTimeoutRequestAtt);
+
+        if (answer == null) {
+            long timeout = defaultReadTimeout;
+            String name = request.getParameter(readTimeoutParameter);
+            if (name != null) {
+                timeout = asLong(name);
+            }
+            if (timeout < 0 || timeout > maximumReadTimeout) {
+                timeout = maximumReadTimeout;
+            }
+
+            answer = Long.valueOf(System.currentTimeMillis() + timeout);
         }
-        if (answer < 0 || answer > maximumReadTimeout) {
-            answer = maximumReadTimeout;
+        return answer.longValue();
+    }
+
+    /**
+     * Close the consumer if one-shot mode is used on the given request.
+     */
+    protected void closeConsumerOnOneShot(HttpServletRequest request, WebClient client, Destination dest) {
+        if (asBoolean(request.getParameter(oneShotParameter), false)) {
+            try {
+                client.closeConsumer(dest);
+            } catch (JMSException jms_exc) {
+                LOG.warn("JMS exception on closing consumer after request with one-shot mode", jms_exc);
+            }
         }
-        return answer;
     }
 
     /*
@@ -386,17 +463,9 @@ public class MessageServlet extends MessageServletSupport {
 
             synchronized (this.consumer) {
                 if (continuation != null) {
-                    try {
-                        Message message = consumer.receiveNoWait();
-                        continuation.setAttribute("message", message);
-                    } catch (Exception e) {
-                        LOG.warn("Error receiving message due " + e.getMessage() + ". This exception is ignored.", e);
-                    } finally {
-                        continuation.resume();
-                    }
+                    continuation.resume();
                 }
             }
         }
     }
-
 }