You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2011/09/28 21:36:02 UTC

svn commit: r1177026 - /activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/MessageListenerServlet.java

Author: tabish
Date: Wed Sep 28 19:36:02 2011
New Revision: 1177026

URL: http://svn.apache.org/viewvc?rev=1177026&view=rev
Log:
fix for: https://issues.apache.org/jira/browse/AMQ-3514

Modified:
    activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/MessageListenerServlet.java

Modified: activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/MessageListenerServlet.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/MessageListenerServlet.java?rev=1177026&r1=1177025&r2=1177026&view=diff
==============================================================================
--- activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/MessageListenerServlet.java (original)
+++ activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/MessageListenerServlet.java Wed Sep 28 19:36:02 2011
@@ -14,7 +14,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.activemq.web;
 
 import java.io.IOException;
@@ -35,7 +34,6 @@ import javax.servlet.http.HttpServletRes
 import javax.servlet.http.HttpSession;
 
 import org.apache.activemq.MessageAvailableConsumer;
-import org.apache.activemq.MessageAvailableListener;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.eclipse.jetty.continuation.Continuation;
@@ -59,19 +57,20 @@ import org.eclipse.jetty.continuation.Co
  * <dt></dt>
  * <dd></dd>
  * </dl>
- * 
- * 
+ *
+ *
  */
+@SuppressWarnings("serial")
 public class MessageListenerServlet extends MessageServletSupport {
     private static final Logger LOG = LoggerFactory.getLogger(MessageListenerServlet.class);
-    
+
     private String readTimeoutParameter = "timeout";
     private long defaultReadTimeout = -1;
     private long maximumReadTimeout = 25000;
     private int maximumMessages = 100;
     private Timer clientCleanupTimer = new Timer();
     private HashMap<String,AjaxWebClient> ajaxWebClients = new HashMap<String,AjaxWebClient>();
-    
+
     public void init() throws ServletException {
         ServletConfig servletConfig = getServletConfig();
         String name = servletConfig.getInitParameter("defaultReadTimeout");
@@ -88,7 +87,7 @@ public class MessageListenerServlet exte
         }
         clientCleanupTimer.schedule( new ClientCleaner(), 5000, 60000 );
     }
-    
+
     /**
      * Sends a message to a destination or manage subscriptions. If the the
      * content type of the POST is
@@ -102,7 +101,7 @@ public class MessageListenerServlet exte
      * the content type is not <code>application/x-www-form-urlencoded</code>,
      * then the body of the post is sent as the message to a destination that is
      * derived from a query parameter, the URL or the default destination.
-     * 
+     *
      * @param request
      * @param response
      * @throws ServletException
@@ -166,7 +165,7 @@ public class MessageListenerServlet exte
                         }
                     } else if ("unlisten".equals(type)) {
                         Map<MessageAvailableConsumer, String> consumerIdMap = client.getIdMap();
-                        Map consumerDestinationNameMap = client.getDestinationNameMap();
+                        Map<MessageAvailableConsumer, String> consumerDestinationNameMap = client.getDestinationNameMap();
                         MessageAvailableConsumer consumer = (MessageAvailableConsumer)client.getConsumer(destination, request.getHeader(WebClient.selectorName));
 
                         consumer.setAvailableListener(null);
@@ -246,7 +245,7 @@ public class MessageListenerServlet exte
 
     /**
      * Reads a message from a destination up to some specific timeout period
-     * 
+     *
      * @param client The webclient
      * @param request
      * @param response
@@ -262,7 +261,7 @@ public class MessageListenerServlet exte
         if (LOG.isDebugEnabled()) {
             LOG.debug("doMessage timeout=" + timeout);
         }
-        
+
         // this is non-null if we're resuming the continuation.
         // attributes set in AjaxListener
         UndeliveredAjaxMessage undelivered_message = null;
@@ -271,10 +270,10 @@ public class MessageListenerServlet exte
         if( undelivered_message != null ) {
             message = (Message)undelivered_message.getMessage();
         }
-        
+
         synchronized (client) {
 
-            List consumers = client.getConsumers();
+            List<MessageConsumer> consumers = client.getConsumers();
             MessageAvailableConsumer consumer = null;
             if( undelivered_message != null ) {
                 consumer = (MessageAvailableConsumer)undelivered_message.getConsumer();
@@ -287,7 +286,7 @@ public class MessageListenerServlet exte
                     if (consumer.getAvailableListener() == null) {
                         continue;
                     }
-    
+
                     // Look for any available messages
                     message = consumer.receive(10);
                     if (LOG.isDebugEnabled()) {
@@ -295,14 +294,14 @@ public class MessageListenerServlet exte
                     }
                 }
             }
-            
+
             // prepare the response
             response.setContentType("text/xml");
             response.setHeader("Cache-Control", "no-cache");
-            
+
             if (message == null && client.getListener().getUndeliveredMessages().size() == 0) {
                 Continuation continuation = ContinuationSupport.getContinuation(request);
-                
+
                 if (continuation.isExpired()) {
                     response.setStatus(HttpServletResponse.SC_OK);
                     StringWriter swriter = new StringWriter();
@@ -312,43 +311,43 @@ public class MessageListenerServlet exte
 
                     writer.flush();
                     String m = swriter.toString();
-                    response.getWriter().println(m); 
-                    
+                    response.getWriter().println(m);
+
                     return;
                 }
 
                 continuation.setTimeout(timeout);
                 continuation.suspend();
                 LOG.debug( "Suspending continuation " + continuation );
-                
+
                 // Fetch the listeners
                 AjaxListener listener = client.getListener();
                 listener.access();
 
                 // register this continuation with our listener.
                 listener.setContinuation(continuation);
-                
+
                 return;
             }
 
             StringWriter swriter = new StringWriter();
             PrintWriter writer = new PrintWriter(swriter);
-            
+
             Map<MessageAvailableConsumer, String> consumerIdMap = client.getIdMap();
             Map<MessageAvailableConsumer, String> consumerDestinationNameMap = client.getDestinationNameMap();
             response.setStatus(HttpServletResponse.SC_OK);
             writer.println("<ajax-response>");
-            
+
             // Send any message we already have
             if (message != null) {
                 String id = consumerIdMap.get(consumer);
                 String destinationName = consumerDestinationNameMap.get(consumer);
                 LOG.debug( "sending pre-existing message" );
                 writeMessageResponse(writer, message, id, destinationName);
-                
+
                 messages++;
             }
-            
+
             // send messages buffered while continuation was unavailable.
             LinkedList<UndeliveredAjaxMessage> undeliveredMessages = ((AjaxListener)consumer.getAvailableListener()).getUndeliveredMessages();
             LOG.debug("Send " + undeliveredMessages.size() + " unconsumed messages");
@@ -369,7 +368,7 @@ public class MessageListenerServlet exte
                     }
                 }
             }
-            
+
             // Send the rest of the messages
             for (int i = 0; i < consumers.size() && messages < maximumMessages; i++) {
                 consumer = (MessageAvailableConsumer)consumers.get(i);
@@ -422,22 +421,21 @@ public class MessageListenerServlet exte
         }
         writer.println("</response>");
     }
-    
+
     /*
      * Return the AjaxWebClient for this session+clientId.
      * Create one if it does not already exist.
      */
     protected AjaxWebClient getAjaxWebClient( HttpServletRequest request ) {
-        long now = (new Date()).getTime();
         HttpSession session = request.getSession(true);
-        
-        String clientId = request.getParameter( "clientId" );      
+
+        String clientId = request.getParameter( "clientId" );
         // if user doesn't supply a 'clientId', we'll just use a default.
         if( clientId == null ) {
             clientId = "defaultAjaxWebClient";
         }
         String sessionKey = session.getId() + '-' + clientId;
-        
+
         AjaxWebClient client = ajaxWebClients.get( sessionKey );
         synchronized (ajaxWebClients) {
             // create a new AjaxWebClient if one does not already exist for this sessionKey.
@@ -469,7 +467,7 @@ public class MessageListenerServlet exte
         }
         return answer;
     }
-    
+
     /*
      * an instance of this class runs every minute (started in init), to clean up old web clients & free resources.
      */
@@ -478,11 +476,11 @@ public class MessageListenerServlet exte
             if( LOG.isDebugEnabled() ) {
                 LOG.debug( "Cleaning up expired web clients." );
             }
-            
+
             synchronized( ajaxWebClients ) {
-                Iterator it = ajaxWebClients.entrySet().iterator();
+                Iterator<Map.Entry<String, AjaxWebClient>> it = ajaxWebClients.entrySet().iterator();
                 while ( it.hasNext() ) {
-                    Map.Entry<String,AjaxWebClient> e = (Map.Entry<String,AjaxWebClient>)it.next();
+                    Map.Entry<String,AjaxWebClient> e = it.next();
                     String key = e.getKey();
                     AjaxWebClient val = e.getValue();
                     if ( LOG.isDebugEnabled() ) {
@@ -499,4 +497,10 @@ public class MessageListenerServlet exte
             }
         }
     }
+
+    public void destroy() {
+        // make sure we cancel the timer
+        clientCleanupTimer.cancel();
+        super.destroy();
+    }
 }