You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2013/06/19 18:31:36 UTC
svn commit: r1494681 - in
/activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web:
AjaxListener.java MessageListenerServlet.java
Author: tabish
Date: Wed Jun 19 16:31:36 2013
New Revision: 1494681
URL: http://svn.apache.org/r1494681
Log:
https://issues.apache.org/jira/browse/AMQ-4589
Apply patch to work around known race condition.
Modified:
activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/AjaxListener.java
activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/MessageListenerServlet.java
Modified: activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/AjaxListener.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/AjaxListener.java?rev=1494681&r1=1494680&r2=1494681&view=diff
==============================================================================
--- activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/AjaxListener.java (original)
+++ activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/AjaxListener.java Wed Jun 19 16:31:36 2013
@@ -16,30 +16,27 @@
*/
package org.apache.activemq.web;
+import java.util.LinkedList;
+
import javax.jms.Message;
import javax.jms.MessageConsumer;
+import org.apache.activemq.MessageAvailableListener;
import org.eclipse.jetty.continuation.Continuation;
-import org.eclipse.jetty.continuation.ContinuationSupport;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.activemq.MessageAvailableListener;
-
-import java.util.LinkedList;
-
/*
* Listen for available messages and wakeup any continuations.
*/
public class AjaxListener implements MessageAvailableListener {
private static final Logger LOG = LoggerFactory.getLogger(AjaxListener.class);
-
- private long maximumReadTimeout;
- private AjaxWebClient client;
+
+ private final long maximumReadTimeout;
+ private final AjaxWebClient client;
private long lastAccess;
private Continuation continuation;
- private LinkedList<UndeliveredAjaxMessage> undeliveredMessages = new LinkedList<UndeliveredAjaxMessage>();
+ private final LinkedList<UndeliveredAjaxMessage> undeliveredMessages = new LinkedList<UndeliveredAjaxMessage>();
AjaxListener(AjaxWebClient client, long maximumReadTimeout) {
this.client = client;
@@ -58,7 +55,8 @@ public class AjaxListener implements Mes
public LinkedList<UndeliveredAjaxMessage> getUndeliveredMessages() {
return undeliveredMessages;
}
-
+
+ @Override
public synchronized void onMessageAvailable(MessageConsumer consumer) {
if (LOG.isDebugEnabled()) {
LOG.debug("message for " + consumer + " continuation=" + continuation);
@@ -66,23 +64,24 @@ public class AjaxListener implements Mes
if (continuation != null) {
try {
Message message = consumer.receive(10);
- LOG.debug( "message is " + message );
- if( message != null ) {
- if( continuation.isSuspended() ) {
- LOG.debug( "Resuming suspended continuation " + continuation );
- continuation.setAttribute("undelivered_message", new UndeliveredAjaxMessage( message, consumer ) );
+ LOG.debug("message is " + message);
+ if (message != null) {
+ if (!continuation.isResumed()) {
+ LOG.debug("Resuming suspended continuation " + continuation);
+ continuation.setAttribute("undelivered_message", new UndeliveredAjaxMessage(message, consumer));
continuation.resume();
} else {
- LOG.debug( "Message available, but continuation is already resumed. Buffer for next time." );
- bufferMessageForDelivery( message, consumer );
+ LOG.debug("Message available, but continuation is already resumed. Buffer for next time.");
+ bufferMessageForDelivery(message, consumer);
}
}
} catch (Exception e) {
LOG.error("Error receiving message " + e, e);
}
-
+
} else if (System.currentTimeMillis() - lastAccess > 2 * this.maximumReadTimeout) {
new Thread() {
+ @Override
public void run() {
client.closeConsumers();
};
@@ -90,17 +89,17 @@ public class AjaxListener implements Mes
} else {
try {
Message message = consumer.receive(10);
- bufferMessageForDelivery( message, consumer );
+ bufferMessageForDelivery(message, consumer);
} catch (Exception e) {
LOG.error("Error receiving message " + e, e);
}
}
}
-
- public void bufferMessageForDelivery( Message message, MessageConsumer consumer ) {
- if( message != null ) {
- synchronized( undeliveredMessages ) {
- undeliveredMessages.addLast( new UndeliveredAjaxMessage( message, consumer ) );
+
+ public void bufferMessageForDelivery(Message message, MessageConsumer consumer) {
+ if (message != null) {
+ synchronized (undeliveredMessages) {
+ undeliveredMessages.addLast(new UndeliveredAjaxMessage(message, consumer));
}
}
}
Modified: activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/MessageListenerServlet.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/MessageListenerServlet.java?rev=1494681&r1=1494680&r2=1494681&view=diff
==============================================================================
--- activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/MessageListenerServlet.java (original)
+++ activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/MessageListenerServlet.java Wed Jun 19 16:31:36 2013
@@ -313,20 +313,20 @@ public class MessageListenerServlet exte
Continuation continuation = ContinuationSupport.getContinuation(request);
// Add a listener to the continuation to make sure it actually
- // will expire (seems like a bug in Jetty Servlet 3 continuations,
+ // will expire (seems like a bug in Jetty Servlet 3 continuations,
// see https://issues.apache.org/jira/browse/AMQ-3447
continuation.addContinuationListener(new ContinuationListener() {
@Override
public void onTimeout(Continuation cont) {
if (LOG.isDebugEnabled()) {
- LOG.debug("Continuation " + cont.toString() + " expired.");
+ LOG.debug("Continuation " + cont.toString() + " expired.");
}
}
@Override
public void onComplete(Continuation cont) {
if (LOG.isDebugEnabled()) {
- LOG.debug("Continuation " + cont.toString() + " completed.");
+ LOG.debug("Continuation " + cont.toString() + " completed.");
}
}
});
@@ -381,7 +381,7 @@ public class MessageListenerServlet exte
LinkedList<UndeliveredAjaxMessage> undeliveredMessages = ((AjaxListener)consumer.getAvailableListener()).getUndeliveredMessages();
LOG.debug("Send " + undeliveredMessages.size() + " unconsumed messages");
synchronized( undeliveredMessages ) {
- for (Iterator<UndeliveredAjaxMessage> it = undeliveredMessages.iterator(); it.hasNext(); ) {
+ for (Iterator<UndeliveredAjaxMessage> it = undeliveredMessages.iterator(); it.hasNext();) {
messages++;
UndeliveredAjaxMessage undelivered = it.next();
Message msg = undelivered.getMessage();
@@ -425,7 +425,6 @@ public class MessageListenerServlet exte
String m = swriter.toString();
response.getWriter().println(m);
}
-
}
protected void writeMessageResponse(PrintWriter writer, Message message, String id, String destinationName) throws JMSException, IOException {