You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2011/10/07 17:22:31 UTC

svn commit: r1180070 - in /activemq/trunk: activemq-core/src/main/java/org/apache/activemq/transport/stomp/ activemq-optional/src/main/java/org/apache/activemq/transport/ws/

Author: tabish
Date: Fri Oct  7 15:22:31 2011
New Revision: 1180070

URL: http://svn.apache.org/viewvc?rev=1180070&view=rev
Log:
fix for: https://issues.apache.org/jira/browse/AMQ-3481

The issue arises when the StompTransport sends an Ack back for Auto Ack
mode in the same thread in which the message was dispatched.  If an incoming
command beats the auto ack to onCommand in the activity monitor, it will block,
as the original thread is now waiting on the incoming command for the transport
lock.  We need to send back the Auto Acks in their own thread using a Task Runner
in the Stomp Transport so that the dispatch thread can complete and release
its lock on the transport.

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/JmsFrameTranslator.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransport.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFilter.java
    activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/ws/StompSocket.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/JmsFrameTranslator.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/JmsFrameTranslator.java?rev=1180070&r1=1180069&r2=1180070&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/JmsFrameTranslator.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/JmsFrameTranslator.java Fri Oct  7 15:22:31 2011
@@ -40,6 +40,7 @@ import com.thoughtworks.xstream.io.Hiera
 import com.thoughtworks.xstream.io.json.JettisonMappedXmlDriver;
 import com.thoughtworks.xstream.io.xml.PrettyPrintWriter;
 import com.thoughtworks.xstream.io.xml.XppReader;
+import com.thoughtworks.xstream.io.xml.xppdom.XppFactory;
 
 /**
  * Frame translator implementation that uses XStream to convert messages to and
@@ -67,7 +68,7 @@ public class JmsFrameTranslator extends 
                 String text = new String(command.getContent(), "UTF-8");
                 switch (Stomp.Transformations.getValue(transformation)) {
                 case JMS_OBJECT_XML:
-                    in = new XppReader(new StringReader(text));
+                    in = new XppReader(new StringReader(text), XppFactory.createDefaultParser());
                     msg = createObjectMessage(in);
                     break;
                 case JMS_OBJECT_JSON:
@@ -75,7 +76,7 @@ public class JmsFrameTranslator extends 
                     msg = createObjectMessage(in);
                     break;
                 case JMS_MAP_XML:
-                    in = new XppReader(new StringReader(text));
+                    in = new XppReader(new StringReader(text), XppFactory.createDefaultParser());
                     msg = createMapMessage(in);
                     break;
                 case JMS_MAP_JSON:

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java?rev=1180070&r1=1180069&r2=1180070&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java Fri Oct  7 15:22:31 2011
@@ -75,7 +75,7 @@ public class StompSubscription {
             }
         } else if (ackMode == AUTO_ACK) {
             MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1);
-            protocolConverter.getStompTransport().sendToActiveMQ(ack);
+            protocolConverter.getStompTransport().asyncSendToActiveMQ(ack);
         }
 
         boolean ignoreTransformation = false;

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransport.java?rev=1180070&r1=1180069&r2=1180070&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransport.java Fri Oct  7 15:22:31 2011
@@ -29,6 +29,8 @@ public interface StompTransport {
 
     public void sendToActiveMQ(Command command);
 
+    public void asyncSendToActiveMQ(Command command);
+
     public void sendToStomp(StompFrame command) throws IOException;
 
     public X509Certificate[] getPeerCertificates();

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFilter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFilter.java?rev=1180070&r1=1180069&r2=1180070&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFilter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFilter.java Fri Oct  7 15:22:31 2011
@@ -18,11 +18,15 @@ package org.apache.activemq.transport.st
 
 import java.io.IOException;
 import java.security.cert.X509Certificate;
+import java.util.concurrent.ConcurrentLinkedQueue;
 
 import javax.jms.JMSException;
 
 import org.apache.activemq.broker.BrokerContext;
 import org.apache.activemq.command.Command;
+import org.apache.activemq.thread.DefaultThreadPools;
+import org.apache.activemq.thread.Task;
+import org.apache.activemq.thread.TaskRunner;
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.TransportFilter;
 import org.apache.activemq.transport.TransportListener;
@@ -46,21 +50,61 @@ public class StompTransportFilter extend
     private final ProtocolConverter protocolConverter;
     private StompInactivityMonitor monitor;
     private StompWireFormat wireFormat;
+    private final TaskRunner asyncSendTask;
+    private final ConcurrentLinkedQueue<Command> asyncCommands = new ConcurrentLinkedQueue<Command>();
 
     private boolean trace;
+    private int maxAsyncBatchSize = 25;
 
     public StompTransportFilter(Transport next, WireFormat wireFormat, BrokerContext brokerContext) {
         super(next);
         this.protocolConverter = new ProtocolConverter(this, brokerContext);
 
+        asyncSendTask = DefaultThreadPools.getDefaultTaskRunnerFactory().createTaskRunner(new Task() {
+            public boolean iterate() {
+                int iterations = 0;
+                TransportListener listener = transportListener;
+                if (listener != null) {
+                    while (iterations++ < maxAsyncBatchSize && !asyncCommands.isEmpty()) {
+                        Command command = asyncCommands.poll();
+                        if (command != null) {
+                            listener.onCommand(command);
+                        }
+                    }
+                }
+                return !asyncCommands.isEmpty();
+            }
+
+        }, "ActiveMQ StompTransport Async Worker: " + System.identityHashCode(this));
+
         if (wireFormat instanceof StompWireFormat) {
             this.wireFormat = (StompWireFormat) wireFormat;
         }
     }
 
+    public void stop() throws Exception {
+        asyncSendTask.shutdown();
+
+        TransportListener listener = transportListener;
+        if (listener != null) {
+            Command commands[] = new Command[0];
+            asyncCommands.toArray(commands);
+            asyncCommands.clear();
+            for(Command command : commands) {
+                try {
+                    listener.onCommand(command);
+                } catch(Exception e) {
+                    break;
+                }
+            }
+        }
+
+        super.stop();
+    }
+
     public void oneway(Object o) throws IOException {
         try {
-            final Command command = (Command)o;
+            final Command command = (Command) o;
             protocolConverter.onActiveMQCommand(command);
         } catch (JMSException e) {
             throw IOExceptionSupport.create(e);
@@ -73,7 +117,7 @@ public class StompTransportFilter extend
                 TRACE.trace("Received: \n" + command);
             }
 
-            protocolConverter.onStompCommand((StompFrame)command);
+            protocolConverter.onStompCommand((StompFrame) command);
         } catch (IOException e) {
             onException(e);
         } catch (JMSException e) {
@@ -83,24 +127,33 @@ public class StompTransportFilter extend
 
     public void sendToActiveMQ(Command command) {
         TransportListener l = transportListener;
-        if (l!=null) {
+        if (l != null) {
             l.onCommand(command);
         }
     }
 
+    public void asyncSendToActiveMQ(Command command) {
+        asyncCommands.offer(command);
+        try {
+            asyncSendTask.wakeup();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+        }
+    }
+
     public void sendToStomp(StompFrame command) throws IOException {
         if (trace) {
             TRACE.trace("Sending: \n" + command);
         }
         Transport n = next;
-        if (n!=null) {
+        if (n != null) {
             n.oneway(command);
         }
     }
 
     public X509Certificate[] getPeerCertificates() {
-        if(next instanceof SslTransport) {
-            X509Certificate[] peerCerts = ((SslTransport)next).getPeerCertificates();
+        if (next instanceof SslTransport) {
+            X509Certificate[] peerCerts = ((SslTransport) next).getPeerCertificates();
             if (trace && peerCerts != null) {
                 LOG.debug("Peer Identity has been verified\n");
             }
@@ -130,4 +183,12 @@ public class StompTransportFilter extend
     public StompWireFormat getWireFormat() {
         return this.wireFormat;
     }
+
+    public int getMaxAsyncBatchSize() {
+        return maxAsyncBatchSize;
+    }
+
+    public void setMaxAsyncBatchSize(int maxAsyncBatchSize) {
+        this.maxAsyncBatchSize = maxAsyncBatchSize;
+    }
 }

Modified: activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/ws/StompSocket.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/ws/StompSocket.java?rev=1180070&r1=1180069&r2=1180070&view=diff
==============================================================================
--- activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/ws/StompSocket.java (original)
+++ activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/transport/ws/StompSocket.java Fri Oct  7 15:22:31 2011
@@ -101,4 +101,9 @@ class StompSocket extends TransportSuppo
     public StompWireFormat getWireFormat() {
         return this.wireFormat;
     }
+
+    @Override
+    public void asyncSendToActiveMQ(Command command) {
+        doConsume(command);
+    }
 }