You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2011/10/07 17:22:31 UTC
svn commit: r1180070 - in /activemq/trunk:
activemq-core/src/main/java/org/apache/activemq/transport/stomp/
activemq-optional/src/main/java/org/apache/activemq/transport/ws/
Author: tabish
Date: Fri Oct 7 15:22:31 2011
New Revision: 1180070
URL: http://svn.apache.org/viewvc?rev=1180070&view=rev
Log:
fix for: https://issues.apache.org/jira/browse/AMQ-3481
The issue arises when the StompTransport sends an Ack back for Auto Ack
mode in the same thread that the message was dispatched in. If an incoming
command beats the auto ack to onCommand in the activity monitor, the auto ack
will block, as the original thread is now waiting on the incoming command for
the transport lock. We need to send back the Auto Acks in their own thread using
a Task Runner in the Stomp Transport so that the dispatch thread can complete
and release its lock on the transport.
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/JmsFrameTranslator.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransport.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFilter.java
activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/ws/StompSocket.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/JmsFrameTranslator.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/JmsFrameTranslator.java?rev=1180070&r1=1180069&r2=1180070&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/JmsFrameTranslator.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/JmsFrameTranslator.java Fri Oct 7 15:22:31 2011
@@ -40,6 +40,7 @@ import com.thoughtworks.xstream.io.Hiera
import com.thoughtworks.xstream.io.json.JettisonMappedXmlDriver;
import com.thoughtworks.xstream.io.xml.PrettyPrintWriter;
import com.thoughtworks.xstream.io.xml.XppReader;
+import com.thoughtworks.xstream.io.xml.xppdom.XppFactory;
/**
* Frame translator implementation that uses XStream to convert messages to and
@@ -67,7 +68,7 @@ public class JmsFrameTranslator extends
String text = new String(command.getContent(), "UTF-8");
switch (Stomp.Transformations.getValue(transformation)) {
case JMS_OBJECT_XML:
- in = new XppReader(new StringReader(text));
+ in = new XppReader(new StringReader(text), XppFactory.createDefaultParser());
msg = createObjectMessage(in);
break;
case JMS_OBJECT_JSON:
@@ -75,7 +76,7 @@ public class JmsFrameTranslator extends
msg = createObjectMessage(in);
break;
case JMS_MAP_XML:
- in = new XppReader(new StringReader(text));
+ in = new XppReader(new StringReader(text), XppFactory.createDefaultParser());
msg = createMapMessage(in);
break;
case JMS_MAP_JSON:
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java?rev=1180070&r1=1180069&r2=1180070&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java Fri Oct 7 15:22:31 2011
@@ -75,7 +75,7 @@ public class StompSubscription {
}
} else if (ackMode == AUTO_ACK) {
MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1);
- protocolConverter.getStompTransport().sendToActiveMQ(ack);
+ protocolConverter.getStompTransport().asyncSendToActiveMQ(ack);
}
boolean ignoreTransformation = false;
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransport.java?rev=1180070&r1=1180069&r2=1180070&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransport.java Fri Oct 7 15:22:31 2011
@@ -29,6 +29,8 @@ public interface StompTransport {
public void sendToActiveMQ(Command command);
+ public void asyncSendToActiveMQ(Command command);
+
public void sendToStomp(StompFrame command) throws IOException;
public X509Certificate[] getPeerCertificates();
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFilter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFilter.java?rev=1180070&r1=1180069&r2=1180070&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFilter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFilter.java Fri Oct 7 15:22:31 2011
@@ -18,11 +18,15 @@ package org.apache.activemq.transport.st
import java.io.IOException;
import java.security.cert.X509Certificate;
+import java.util.concurrent.ConcurrentLinkedQueue;
import javax.jms.JMSException;
import org.apache.activemq.broker.BrokerContext;
import org.apache.activemq.command.Command;
+import org.apache.activemq.thread.DefaultThreadPools;
+import org.apache.activemq.thread.Task;
+import org.apache.activemq.thread.TaskRunner;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportFilter;
import org.apache.activemq.transport.TransportListener;
@@ -46,21 +50,61 @@ public class StompTransportFilter extend
private final ProtocolConverter protocolConverter;
private StompInactivityMonitor monitor;
private StompWireFormat wireFormat;
+ private final TaskRunner asyncSendTask;
+ private final ConcurrentLinkedQueue<Command> asyncCommands = new ConcurrentLinkedQueue<Command>();
private boolean trace;
+ private int maxAsyncBatchSize = 25;
public StompTransportFilter(Transport next, WireFormat wireFormat, BrokerContext brokerContext) {
super(next);
this.protocolConverter = new ProtocolConverter(this, brokerContext);
+ asyncSendTask = DefaultThreadPools.getDefaultTaskRunnerFactory().createTaskRunner(new Task() {
+ public boolean iterate() {
+ int iterations = 0;
+ TransportListener listener = transportListener;
+ if (listener != null) {
+ while (iterations++ < maxAsyncBatchSize && !asyncCommands.isEmpty()) {
+ Command command = asyncCommands.poll();
+ if (command != null) {
+ listener.onCommand(command);
+ }
+ }
+ }
+ return !asyncCommands.isEmpty();
+ }
+
+ }, "ActiveMQ StompTransport Async Worker: " + System.identityHashCode(this));
+
if (wireFormat instanceof StompWireFormat) {
this.wireFormat = (StompWireFormat) wireFormat;
}
}
+    /** Shuts down the async worker, flushes still-queued commands to the listener, then stops the transport. */
+    public void stop() throws Exception {
+        asyncSendTask.shutdown();
+        TransportListener listener = transportListener;
+        if (listener != null) {
+            // toArray(T[]) returns the populated array; a too-small argument is left untouched,
+            // so the result must be captured or the pending commands are silently dropped.
+            Command[] commands = asyncCommands.toArray(new Command[0]);
+            asyncCommands.clear();
+            for (Command command : commands) {
+                try {
+                    listener.onCommand(command);
+                } catch (Exception e) {
+                    break; // best effort during shutdown: give up on the first failure
+                }
+            }
+        }
+        super.stop();
+    }
+
public void oneway(Object o) throws IOException {
try {
- final Command command = (Command)o;
+ final Command command = (Command) o;
protocolConverter.onActiveMQCommand(command);
} catch (JMSException e) {
throw IOExceptionSupport.create(e);
@@ -73,7 +117,7 @@ public class StompTransportFilter extend
TRACE.trace("Received: \n" + command);
}
- protocolConverter.onStompCommand((StompFrame)command);
+ protocolConverter.onStompCommand((StompFrame) command);
} catch (IOException e) {
onException(e);
} catch (JMSException e) {
@@ -83,24 +127,33 @@ public class StompTransportFilter extend
public void sendToActiveMQ(Command command) {
TransportListener l = transportListener;
- if (l!=null) {
+ if (l != null) {
l.onCommand(command);
}
}
+ public void asyncSendToActiveMQ(Command command) {
+ asyncCommands.offer(command);
+ try {
+ asyncSendTask.wakeup();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+
public void sendToStomp(StompFrame command) throws IOException {
if (trace) {
TRACE.trace("Sending: \n" + command);
}
Transport n = next;
- if (n!=null) {
+ if (n != null) {
n.oneway(command);
}
}
public X509Certificate[] getPeerCertificates() {
- if(next instanceof SslTransport) {
- X509Certificate[] peerCerts = ((SslTransport)next).getPeerCertificates();
+ if (next instanceof SslTransport) {
+ X509Certificate[] peerCerts = ((SslTransport) next).getPeerCertificates();
if (trace && peerCerts != null) {
LOG.debug("Peer Identity has been verified\n");
}
@@ -130,4 +183,12 @@ public class StompTransportFilter extend
public StompWireFormat getWireFormat() {
return this.wireFormat;
}
+
+ public int getMaxAsyncBatchSize() {
+ return maxAsyncBatchSize;
+ }
+
+ public void setMaxAsyncBatchSize(int maxAsyncBatchSize) {
+ this.maxAsyncBatchSize = maxAsyncBatchSize;
+ }
}
Modified: activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/ws/StompSocket.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/ws/StompSocket.java?rev=1180070&r1=1180069&r2=1180070&view=diff
==============================================================================
--- activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/ws/StompSocket.java (original)
+++ activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/ws/StompSocket.java Fri Oct 7 15:22:31 2011
@@ -101,4 +101,9 @@ class StompSocket extends TransportSuppo
public StompWireFormat getWireFormat() {
return this.wireFormat;
}
+
+ @Override
+ public void asyncSendToActiveMQ(Command command) {
+ doConsume(command);
+ }
}