You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2012/10/23 21:13:20 UTC

svn commit: r1401394 - in /activemq/trunk: activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/ activemq-core/src/main/java/org/apache/activemq/transport/nio/ activemq-core/src/main/java/org/apache/activemq/transport/stomp/ activemq-core/src...

Author: tabish
Date: Tue Oct 23 19:13:19 2012
New Revision: 1401394

URL: http://svn.apache.org/viewvc?rev=1401394&view=rev
Log:
fix and test for: https://issues.apache.org/jira/browse/AMQ-4106

NIO based transports weren't updating the receive counter in the TcpTransport which can lead to the inactivity monitor mistakenly shutting down the connection.  

Modified:
    activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransport.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOSSLTransport.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompCodec.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransport.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/Stomp11Test.java

Modified: activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransport.java?rev=1401394&r1=1401393&r2=1401394&view=diff
==============================================================================
--- activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransport.java (original)
+++ activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransport.java Tue Oct 23 19:13:19 2012
@@ -16,16 +16,6 @@
  */
 package org.apache.activemq.transport.amqp;
 
-import org.apache.activemq.transport.nio.NIOOutputStream;
-import org.apache.activemq.transport.nio.SelectorManager;
-import org.apache.activemq.transport.nio.SelectorSelection;
-import org.apache.activemq.transport.tcp.TcpTransport;
-import org.apache.activemq.util.IOExceptionSupport;
-import org.apache.activemq.util.ServiceStopper;
-import org.apache.activemq.wireformat.WireFormat;
-import org.fusesource.hawtbuf.DataByteArrayInputStream;
-
-import javax.net.SocketFactory;
 import java.io.DataOutputStream;
 import java.io.EOFException;
 import java.io.IOException;
@@ -36,6 +26,16 @@ import java.nio.ByteBuffer;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.SocketChannel;
 
+import javax.net.SocketFactory;
+
+import org.apache.activemq.transport.nio.NIOOutputStream;
+import org.apache.activemq.transport.nio.SelectorManager;
+import org.apache.activemq.transport.nio.SelectorSelection;
+import org.apache.activemq.transport.tcp.TcpTransport;
+import org.apache.activemq.util.IOExceptionSupport;
+import org.apache.activemq.util.ServiceStopper;
+import org.apache.activemq.wireformat.WireFormat;
+
 /**
  * An implementation of the {@link org.apache.activemq.transport.Transport} interface for using AMQP over NIO
  */
@@ -97,11 +97,12 @@ public class AmqpNioTransport extends Tc
                     break;
                 }
 
+                receiveCounter += readSize;
+
                 inputBuffer.flip();
                 doConsume(AmqpSupport.toBuffer(inputBuffer));
                 // clear the buffer
                 inputBuffer.clear();
-
             }
         } catch (IOException e) {
             onException(e);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOSSLTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOSSLTransport.java?rev=1401394&r1=1401393&r2=1401394&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOSSLTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOSSLTransport.java Tue Oct 23 19:13:19 2012
@@ -178,8 +178,9 @@ public class NIOSSLTransport extends NIO
                     }
                     int readCount = secureRead(plain);
 
-                    if (readCount == 0)
+                    if (readCount == 0) {
                         break;
+                    }
 
                     // channel is closed, cleanup
                     if (readCount == -1) {
@@ -187,6 +188,8 @@ public class NIOSSLTransport extends NIO
                         selection.close();
                         break;
                     }
+
+                    receiveCounter += readCount;
                 }
 
                 if (status == SSLEngineResult.Status.OK && handshakeStatus != SSLEngineResult.HandshakeStatus.NEED_UNWRAP) {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompCodec.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompCodec.java?rev=1401394&r1=1401393&r2=1401394&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompCodec.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompCodec.java Tue Oct 23 19:13:19 2012
@@ -39,6 +39,7 @@ public class StompCodec {
     int contentLength = -1;
     int readLength = 0;
     int previousByte = -1;
+    boolean awaitingCommandStart = true;
     String version = Stomp.DEFAULT_VERSION;
 
     public StompCodec(TcpTransport transport) {
@@ -56,6 +57,14 @@ public class StompCodec {
            }
 
            if (!processedHeaders) {
+
+               // skip heart beat commands.
+               if (awaitingCommandStart && b == '\n') {
+                   continue;
+               } else {
+                   awaitingCommandStart = false;   // non-newline indicates next frame.
+               }
+
                currentCommand.write(b);
                // end of headers section, parse action and header
                if (b == '\n' && (previousByte == '\n' || currentCommand.endsWith(crlfcrlf))) {
@@ -74,6 +83,7 @@ public class StompCodec {
                    processedHeaders = true;
                    currentCommand.reset();
                }
+
            } else {
 
                if (contentLength == -1) {
@@ -102,6 +112,7 @@ public class StompCodec {
         StompFrame frame = new StompFrame(action, headers, currentCommand.toByteArray());
         transport.doConsume(frame);
         processedHeaders = false;
+        awaitingCommandStart = true;
         currentCommand.reset();
         contentLength = -1;
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransport.java?rev=1401394&r1=1401393&r2=1401394&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransport.java Tue Oct 23 19:13:19 2012
@@ -97,11 +97,14 @@ public class StompNIOTransport extends T
                    selection.close();
                    break;
                }
+
                // nothing more to read, break
                if (readSize == 0) {
                    break;
                }
 
+               receiveCounter += readSize;
+
                inputBuffer.flip();
 
                ByteArrayInputStream input = new ByteArrayInputStream(inputBuffer.array());
@@ -109,7 +112,6 @@ public class StompNIOTransport extends T
 
                // clear the buffer
                inputBuffer.clear();
-
            }
         } catch (IOException e) {
             onException(e);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java?rev=1401394&r1=1401393&r2=1401394&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java Tue Oct 23 19:13:19 2012
@@ -128,13 +128,13 @@ public class TcpTransport extends Transp
     protected int minmumWireFormatVersion;
     protected SocketFactory socketFactory;
     protected final AtomicReference<CountDownLatch> stoppedLatch = new AtomicReference<CountDownLatch>();
+    protected volatile int receiveCounter;
 
     private Map<String, Object> socketOptions;
     private int soLinger = Integer.MIN_VALUE;
     private Boolean keepAlive;
     private Boolean tcpNoDelay;
     private Thread runnerThread;
-    private volatile int receiveCounter;
 
     /**
      * Connect to a remote Node - e.g. a Broker

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/Stomp11Test.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/Stomp11Test.java?rev=1401394&r1=1401393&r2=1401394&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/Stomp11Test.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/Stomp11Test.java Tue Oct 23 19:13:19 2012
@@ -23,6 +23,8 @@ import java.net.SocketTimeoutException;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.net.UnknownHostException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
 import javax.jms.Connection;
@@ -290,6 +292,55 @@ public class Stomp11Test extends Combina
         assertTrue("Broker did close idle connection in time.", (endTime - startTime) >= 1000);
     }
 
+    public void testHeartbeatsKeepsConnectionOpen() throws Exception {
+
+        String connectFrame = "STOMP\n" +
+                              "login:system\n" +
+                              "passcode:manager\n" +
+                              "accept-version:1.1\n" +
+                              "heart-beat:2000,0\n" +
+                              "host:localhost\n" +
+                              "\n" + Stomp.NULL;
+
+        stompConnection.sendFrame(connectFrame);
+        String f = stompConnection.receiveFrame();
+        assertTrue(f.startsWith("CONNECTED"));
+        assertTrue(f.indexOf("version:1.1") >= 0);
+        assertTrue(f.indexOf("heart-beat:") >= 0);
+        assertTrue(f.indexOf("session:") >= 0);
+        LOG.debug("Broker sent: " + f);
+
+        String message = "SEND\n" + "destination:/queue/" + getQueueName() + "\n\n" + "Hello World" + Stomp.NULL;
+        stompConnection.sendFrame(message);
+
+        ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();
+
+        service.scheduleAtFixedRate(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    LOG.info("Sending next KeepAlive");
+                    stompConnection.keepAlive();
+                } catch (Exception e) {
+                }
+            }
+        }, 1, 1, TimeUnit.SECONDS);
+
+        TimeUnit.SECONDS.sleep(20);
+
+        String frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" +
+                       "id:12345\n" + "ack:auto\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+
+        StompFrame stompFrame = stompConnection.receive();
+        assertTrue(stompFrame.getAction().equals("MESSAGE"));
+
+        service.shutdownNow();
+
+        frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+    }
+
     public void testSendAfterMissingHeartbeat() throws Exception {
 
         String connectFrame = "STOMP\n" + "login:system\n" +