You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2011/09/28 21:36:02 UTC
svn commit: r1177026 -
/activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/MessageListenerServlet.java
Author: tabish
Date: Wed Sep 28 19:36:02 2011
New Revision: 1177026
URL: http://svn.apache.org/viewvc?rev=1177026&view=rev
Log:
fix for: https://issues.apache.org/jira/browse/AMQ-3514
Modified:
activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/MessageListenerServlet.java
Modified: activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/MessageListenerServlet.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/MessageListenerServlet.java?rev=1177026&r1=1177025&r2=1177026&view=diff
==============================================================================
--- activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/MessageListenerServlet.java (original)
+++ activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/MessageListenerServlet.java Wed Sep 28 19:36:02 2011
@@ -14,7 +14,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.activemq.web;
import java.io.IOException;
@@ -35,7 +34,6 @@ import javax.servlet.http.HttpServletRes
import javax.servlet.http.HttpSession;
import org.apache.activemq.MessageAvailableConsumer;
-import org.apache.activemq.MessageAvailableListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.eclipse.jetty.continuation.Continuation;
@@ -59,19 +57,20 @@ import org.eclipse.jetty.continuation.Co
* <dt></dt>
* <dd></dd>
* </dl>
- *
- *
+ *
+ *
*/
+@SuppressWarnings("serial")
public class MessageListenerServlet extends MessageServletSupport {
private static final Logger LOG = LoggerFactory.getLogger(MessageListenerServlet.class);
-
+
private String readTimeoutParameter = "timeout";
private long defaultReadTimeout = -1;
private long maximumReadTimeout = 25000;
private int maximumMessages = 100;
private Timer clientCleanupTimer = new Timer();
private HashMap<String,AjaxWebClient> ajaxWebClients = new HashMap<String,AjaxWebClient>();
-
+
public void init() throws ServletException {
ServletConfig servletConfig = getServletConfig();
String name = servletConfig.getInitParameter("defaultReadTimeout");
@@ -88,7 +87,7 @@ public class MessageListenerServlet exte
}
clientCleanupTimer.schedule( new ClientCleaner(), 5000, 60000 );
}
-
+
/**
* Sends a message to a destination or manage subscriptions. If the the
* content type of the POST is
@@ -102,7 +101,7 @@ public class MessageListenerServlet exte
* the content type is not <code>application/x-www-form-urlencoded</code>,
* then the body of the post is sent as the message to a destination that is
* derived from a query parameter, the URL or the default destination.
- *
+ *
* @param request
* @param response
* @throws ServletException
@@ -166,7 +165,7 @@ public class MessageListenerServlet exte
}
} else if ("unlisten".equals(type)) {
Map<MessageAvailableConsumer, String> consumerIdMap = client.getIdMap();
- Map consumerDestinationNameMap = client.getDestinationNameMap();
+ Map<MessageAvailableConsumer, String> consumerDestinationNameMap = client.getDestinationNameMap();
MessageAvailableConsumer consumer = (MessageAvailableConsumer)client.getConsumer(destination, request.getHeader(WebClient.selectorName));
consumer.setAvailableListener(null);
@@ -246,7 +245,7 @@ public class MessageListenerServlet exte
/**
* Reads a message from a destination up to some specific timeout period
- *
+ *
* @param client The webclient
* @param request
* @param response
@@ -262,7 +261,7 @@ public class MessageListenerServlet exte
if (LOG.isDebugEnabled()) {
LOG.debug("doMessage timeout=" + timeout);
}
-
+
// this is non-null if we're resuming the continuation.
// attributes set in AjaxListener
UndeliveredAjaxMessage undelivered_message = null;
@@ -271,10 +270,10 @@ public class MessageListenerServlet exte
if( undelivered_message != null ) {
message = (Message)undelivered_message.getMessage();
}
-
+
synchronized (client) {
- List consumers = client.getConsumers();
+ List<MessageConsumer> consumers = client.getConsumers();
MessageAvailableConsumer consumer = null;
if( undelivered_message != null ) {
consumer = (MessageAvailableConsumer)undelivered_message.getConsumer();
@@ -287,7 +286,7 @@ public class MessageListenerServlet exte
if (consumer.getAvailableListener() == null) {
continue;
}
-
+
// Look for any available messages
message = consumer.receive(10);
if (LOG.isDebugEnabled()) {
@@ -295,14 +294,14 @@ public class MessageListenerServlet exte
}
}
}
-
+
// prepare the response
response.setContentType("text/xml");
response.setHeader("Cache-Control", "no-cache");
-
+
if (message == null && client.getListener().getUndeliveredMessages().size() == 0) {
Continuation continuation = ContinuationSupport.getContinuation(request);
-
+
if (continuation.isExpired()) {
response.setStatus(HttpServletResponse.SC_OK);
StringWriter swriter = new StringWriter();
@@ -312,43 +311,43 @@ public class MessageListenerServlet exte
writer.flush();
String m = swriter.toString();
- response.getWriter().println(m);
-
+ response.getWriter().println(m);
+
return;
}
continuation.setTimeout(timeout);
continuation.suspend();
LOG.debug( "Suspending continuation " + continuation );
-
+
// Fetch the listeners
AjaxListener listener = client.getListener();
listener.access();
// register this continuation with our listener.
listener.setContinuation(continuation);
-
+
return;
}
StringWriter swriter = new StringWriter();
PrintWriter writer = new PrintWriter(swriter);
-
+
Map<MessageAvailableConsumer, String> consumerIdMap = client.getIdMap();
Map<MessageAvailableConsumer, String> consumerDestinationNameMap = client.getDestinationNameMap();
response.setStatus(HttpServletResponse.SC_OK);
writer.println("<ajax-response>");
-
+
// Send any message we already have
if (message != null) {
String id = consumerIdMap.get(consumer);
String destinationName = consumerDestinationNameMap.get(consumer);
LOG.debug( "sending pre-existing message" );
writeMessageResponse(writer, message, id, destinationName);
-
+
messages++;
}
-
+
// send messages buffered while continuation was unavailable.
LinkedList<UndeliveredAjaxMessage> undeliveredMessages = ((AjaxListener)consumer.getAvailableListener()).getUndeliveredMessages();
LOG.debug("Send " + undeliveredMessages.size() + " unconsumed messages");
@@ -369,7 +368,7 @@ public class MessageListenerServlet exte
}
}
}
-
+
// Send the rest of the messages
for (int i = 0; i < consumers.size() && messages < maximumMessages; i++) {
consumer = (MessageAvailableConsumer)consumers.get(i);
@@ -422,22 +421,21 @@ public class MessageListenerServlet exte
}
writer.println("</response>");
}
-
+
/*
* Return the AjaxWebClient for this session+clientId.
* Create one if it does not already exist.
*/
protected AjaxWebClient getAjaxWebClient( HttpServletRequest request ) {
- long now = (new Date()).getTime();
HttpSession session = request.getSession(true);
-
- String clientId = request.getParameter( "clientId" );
+
+ String clientId = request.getParameter( "clientId" );
// if user doesn't supply a 'clientId', we'll just use a default.
if( clientId == null ) {
clientId = "defaultAjaxWebClient";
}
String sessionKey = session.getId() + '-' + clientId;
-
+
AjaxWebClient client = ajaxWebClients.get( sessionKey );
synchronized (ajaxWebClients) {
// create a new AjaxWebClient if one does not already exist for this sessionKey.
@@ -469,7 +467,7 @@ public class MessageListenerServlet exte
}
return answer;
}
-
+
/*
* an instance of this class runs every minute (started in init), to clean up old web clients & free resources.
*/
@@ -478,11 +476,11 @@ public class MessageListenerServlet exte
if( LOG.isDebugEnabled() ) {
LOG.debug( "Cleaning up expired web clients." );
}
-
+
synchronized( ajaxWebClients ) {
- Iterator it = ajaxWebClients.entrySet().iterator();
+ Iterator<Map.Entry<String, AjaxWebClient>> it = ajaxWebClients.entrySet().iterator();
while ( it.hasNext() ) {
- Map.Entry<String,AjaxWebClient> e = (Map.Entry<String,AjaxWebClient>)it.next();
+ Map.Entry<String,AjaxWebClient> e = it.next();
String key = e.getKey();
AjaxWebClient val = e.getValue();
if ( LOG.isDebugEnabled() ) {
@@ -499,4 +497,10 @@ public class MessageListenerServlet exte
}
}
}
+
+ public void destroy() {
+ // make sure we cancel the timer
+ clientCleanupTimer.cancel();
+ super.destroy();
+ }
}