You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2013/06/19 18:31:36 UTC

svn commit: r1494681 - in /activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web: AjaxListener.java MessageListenerServlet.java

Author: tabish
Date: Wed Jun 19 16:31:36 2013
New Revision: 1494681

URL: http://svn.apache.org/r1494681
Log:
https://issues.apache.org/jira/browse/AMQ-4589

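Apply patch to work around a known race condition: AjaxListener now resumes the continuation whenever it has not already been resumed (!isResumed()) instead of only when isSuspended() returns true.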

Modified:
    activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/AjaxListener.java
    activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/MessageListenerServlet.java

Modified: activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/AjaxListener.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/AjaxListener.java?rev=1494681&r1=1494680&r2=1494681&view=diff
==============================================================================
--- activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/AjaxListener.java (original)
+++ activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/AjaxListener.java Wed Jun 19 16:31:36 2013
@@ -16,30 +16,27 @@
  */
 package org.apache.activemq.web;
 
+import java.util.LinkedList;
+
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
 
+import org.apache.activemq.MessageAvailableListener;
 import org.eclipse.jetty.continuation.Continuation;
-import org.eclipse.jetty.continuation.ContinuationSupport;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.activemq.MessageAvailableListener;
-
-import java.util.LinkedList;
-
 /*
  * Listen for available messages and wakeup any continuations.
  */
 public class AjaxListener implements MessageAvailableListener {
     private static final Logger LOG = LoggerFactory.getLogger(AjaxListener.class);
-    
-    private long maximumReadTimeout;
-    private AjaxWebClient client;
+
+    private final long maximumReadTimeout;
+    private final AjaxWebClient client;
     private long lastAccess;
     private Continuation continuation;
-    private LinkedList<UndeliveredAjaxMessage> undeliveredMessages = new LinkedList<UndeliveredAjaxMessage>();
+    private final LinkedList<UndeliveredAjaxMessage> undeliveredMessages = new LinkedList<UndeliveredAjaxMessage>();
 
     AjaxListener(AjaxWebClient client, long maximumReadTimeout) {
         this.client = client;
@@ -58,7 +55,8 @@ public class AjaxListener implements Mes
     public LinkedList<UndeliveredAjaxMessage> getUndeliveredMessages() {
         return undeliveredMessages;
     }
-    
+
+    @Override
     public synchronized void onMessageAvailable(MessageConsumer consumer) {
         if (LOG.isDebugEnabled()) {
             LOG.debug("message for " + consumer + " continuation=" + continuation);
@@ -66,23 +64,24 @@ public class AjaxListener implements Mes
         if (continuation != null) {
             try {
                 Message message = consumer.receive(10);
-                LOG.debug( "message is " + message );
-                if( message != null ) {
-                    if( continuation.isSuspended() ) {
-                        LOG.debug( "Resuming suspended continuation " + continuation );
-                        continuation.setAttribute("undelivered_message", new UndeliveredAjaxMessage( message, consumer ) );
+                LOG.debug("message is " + message);
+                if (message != null) {
+                    if (!continuation.isResumed()) {
+                        LOG.debug("Resuming suspended continuation " + continuation);
+                        continuation.setAttribute("undelivered_message", new UndeliveredAjaxMessage(message, consumer));
                         continuation.resume();
                     } else {
-                        LOG.debug( "Message available, but continuation is already resumed.  Buffer for next time." );
-                        bufferMessageForDelivery( message, consumer );
+                        LOG.debug("Message available, but continuation is already resumed.  Buffer for next time.");
+                        bufferMessageForDelivery(message, consumer);
                     }
                 }
             } catch (Exception e) {
                 LOG.error("Error receiving message " + e, e);
             }
-            
+
         } else if (System.currentTimeMillis() - lastAccess > 2 * this.maximumReadTimeout) {
             new Thread() {
+                @Override
                 public void run() {
                     client.closeConsumers();
                 };
@@ -90,17 +89,17 @@ public class AjaxListener implements Mes
         } else {
             try {
                 Message message = consumer.receive(10);
-                bufferMessageForDelivery( message, consumer );
+                bufferMessageForDelivery(message, consumer);
             } catch (Exception e) {
                 LOG.error("Error receiving message " + e, e);
             }
         }
     }
-    
-    public void bufferMessageForDelivery( Message message, MessageConsumer consumer ) {
-        if( message != null ) {
-            synchronized( undeliveredMessages ) {
-                undeliveredMessages.addLast( new UndeliveredAjaxMessage( message, consumer ) );
+
+    public void bufferMessageForDelivery(Message message, MessageConsumer consumer) {
+        if (message != null) {
+            synchronized (undeliveredMessages) {
+                undeliveredMessages.addLast(new UndeliveredAjaxMessage(message, consumer));
             }
         }
     }

Modified: activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/MessageListenerServlet.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/MessageListenerServlet.java?rev=1494681&r1=1494680&r2=1494681&view=diff
==============================================================================
--- activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/MessageListenerServlet.java (original)
+++ activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/MessageListenerServlet.java Wed Jun 19 16:31:36 2013
@@ -313,20 +313,20 @@ public class MessageListenerServlet exte
                 Continuation continuation = ContinuationSupport.getContinuation(request);
 
                 // Add a listener to the continuation to make sure it actually
-                // will expire (seems like a bug in Jetty Servlet 3 continuations, 
+                // will expire (seems like a bug in Jetty Servlet 3 continuations,
                 // see https://issues.apache.org/jira/browse/AMQ-3447
                 continuation.addContinuationListener(new ContinuationListener() {
                     @Override
                     public void onTimeout(Continuation cont) {
                         if (LOG.isDebugEnabled()) {
-                             LOG.debug("Continuation " + cont.toString() + " expired.");
+                            LOG.debug("Continuation " + cont.toString() + " expired.");
                         }
                     }
 
                     @Override
                     public void onComplete(Continuation cont) {
                         if (LOG.isDebugEnabled()) {
-                            LOG.debug("Continuation " + cont.toString() + " completed.");
+                           LOG.debug("Continuation " + cont.toString() + " completed.");
                         }
                     }
                 });
@@ -381,7 +381,7 @@ public class MessageListenerServlet exte
             LinkedList<UndeliveredAjaxMessage> undeliveredMessages = ((AjaxListener)consumer.getAvailableListener()).getUndeliveredMessages();
             LOG.debug("Send " + undeliveredMessages.size() + " unconsumed messages");
             synchronized( undeliveredMessages ) {
-                for (Iterator<UndeliveredAjaxMessage> it = undeliveredMessages.iterator(); it.hasNext(); ) {
+                for (Iterator<UndeliveredAjaxMessage> it = undeliveredMessages.iterator(); it.hasNext();) {
                     messages++;
                     UndeliveredAjaxMessage undelivered = it.next();
                     Message msg = undelivered.getMessage();
@@ -425,7 +425,6 @@ public class MessageListenerServlet exte
             String m = swriter.toString();
             response.getWriter().println(m);
         }
-
     }
 
     protected void writeMessageResponse(PrintWriter writer, Message message, String id, String destinationName) throws JMSException, IOException {