You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2012/10/23 21:13:20 UTC
svn commit: r1401394 - in /activemq/trunk:
activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/
activemq-core/src/main/java/org/apache/activemq/transport/nio/
activemq-core/src/main/java/org/apache/activemq/transport/stomp/
activemq-core/src...
Author: tabish
Date: Tue Oct 23 19:13:19 2012
New Revision: 1401394
URL: http://svn.apache.org/viewvc?rev=1401394&view=rev
Log:
fix and test for: https://issues.apache.org/jira/browse/AMQ-4106
NIO-based transports weren't updating the receive counter in TcpTransport, which can lead the inactivity monitor to mistakenly shut down the connection.
Modified:
activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransport.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOSSLTransport.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompCodec.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransport.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/Stomp11Test.java
Modified: activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransport.java?rev=1401394&r1=1401393&r2=1401394&view=diff
==============================================================================
--- activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransport.java (original)
+++ activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransport.java Tue Oct 23 19:13:19 2012
@@ -16,16 +16,6 @@
*/
package org.apache.activemq.transport.amqp;
-import org.apache.activemq.transport.nio.NIOOutputStream;
-import org.apache.activemq.transport.nio.SelectorManager;
-import org.apache.activemq.transport.nio.SelectorSelection;
-import org.apache.activemq.transport.tcp.TcpTransport;
-import org.apache.activemq.util.IOExceptionSupport;
-import org.apache.activemq.util.ServiceStopper;
-import org.apache.activemq.wireformat.WireFormat;
-import org.fusesource.hawtbuf.DataByteArrayInputStream;
-
-import javax.net.SocketFactory;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
@@ -36,6 +26,16 @@ import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
+import javax.net.SocketFactory;
+
+import org.apache.activemq.transport.nio.NIOOutputStream;
+import org.apache.activemq.transport.nio.SelectorManager;
+import org.apache.activemq.transport.nio.SelectorSelection;
+import org.apache.activemq.transport.tcp.TcpTransport;
+import org.apache.activemq.util.IOExceptionSupport;
+import org.apache.activemq.util.ServiceStopper;
+import org.apache.activemq.wireformat.WireFormat;
+
/**
* An implementation of the {@link org.apache.activemq.transport.Transport} interface for using AMQP over NIO
*/
@@ -97,11 +97,12 @@ public class AmqpNioTransport extends Tc
break;
}
+ receiveCounter += readSize;
+
inputBuffer.flip();
doConsume(AmqpSupport.toBuffer(inputBuffer));
// clear the buffer
inputBuffer.clear();
-
}
} catch (IOException e) {
onException(e);
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOSSLTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOSSLTransport.java?rev=1401394&r1=1401393&r2=1401394&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOSSLTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOSSLTransport.java Tue Oct 23 19:13:19 2012
@@ -178,8 +178,9 @@ public class NIOSSLTransport extends NIO
}
int readCount = secureRead(plain);
- if (readCount == 0)
+ if (readCount == 0) {
break;
+ }
// channel is closed, cleanup
if (readCount == -1) {
@@ -187,6 +188,8 @@ public class NIOSSLTransport extends NIO
selection.close();
break;
}
+
+ receiveCounter += readCount;
}
if (status == SSLEngineResult.Status.OK && handshakeStatus != SSLEngineResult.HandshakeStatus.NEED_UNWRAP) {
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompCodec.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompCodec.java?rev=1401394&r1=1401393&r2=1401394&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompCodec.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompCodec.java Tue Oct 23 19:13:19 2012
@@ -39,6 +39,7 @@ public class StompCodec {
int contentLength = -1;
int readLength = 0;
int previousByte = -1;
+ boolean awaitingCommandStart = true;
String version = Stomp.DEFAULT_VERSION;
public StompCodec(TcpTransport transport) {
@@ -56,6 +57,14 @@ public class StompCodec {
}
if (!processedHeaders) {
+
+ // skip heart beat commands.
+ if (awaitingCommandStart && b == '\n') {
+ continue;
+ } else {
+ awaitingCommandStart = false; // non-newline indicates next frame.
+ }
+
currentCommand.write(b);
// end of headers section, parse action and header
if (b == '\n' && (previousByte == '\n' || currentCommand.endsWith(crlfcrlf))) {
@@ -74,6 +83,7 @@ public class StompCodec {
processedHeaders = true;
currentCommand.reset();
}
+
} else {
if (contentLength == -1) {
@@ -102,6 +112,7 @@ public class StompCodec {
StompFrame frame = new StompFrame(action, headers, currentCommand.toByteArray());
transport.doConsume(frame);
processedHeaders = false;
+ awaitingCommandStart = true;
currentCommand.reset();
contentLength = -1;
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransport.java?rev=1401394&r1=1401393&r2=1401394&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransport.java Tue Oct 23 19:13:19 2012
@@ -97,11 +97,14 @@ public class StompNIOTransport extends T
selection.close();
break;
}
+
// nothing more to read, break
if (readSize == 0) {
break;
}
+ receiveCounter += readSize;
+
inputBuffer.flip();
ByteArrayInputStream input = new ByteArrayInputStream(inputBuffer.array());
@@ -109,7 +112,6 @@ public class StompNIOTransport extends T
// clear the buffer
inputBuffer.clear();
-
}
} catch (IOException e) {
onException(e);
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java?rev=1401394&r1=1401393&r2=1401394&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java Tue Oct 23 19:13:19 2012
@@ -128,13 +128,13 @@ public class TcpTransport extends Transp
protected int minmumWireFormatVersion;
protected SocketFactory socketFactory;
protected final AtomicReference<CountDownLatch> stoppedLatch = new AtomicReference<CountDownLatch>();
+ protected volatile int receiveCounter;
private Map<String, Object> socketOptions;
private int soLinger = Integer.MIN_VALUE;
private Boolean keepAlive;
private Boolean tcpNoDelay;
private Thread runnerThread;
- private volatile int receiveCounter;
/**
* Connect to a remote Node - e.g. a Broker
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/Stomp11Test.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/Stomp11Test.java?rev=1401394&r1=1401393&r2=1401394&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/Stomp11Test.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/Stomp11Test.java Tue Oct 23 19:13:19 2012
@@ -23,6 +23,8 @@ import java.net.SocketTimeoutException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
@@ -290,6 +292,55 @@ public class Stomp11Test extends Combina
assertTrue("Broker did close idle connection in time.", (endTime - startTime) >= 1000);
}
+ public void testHeartbeatsKeepsConnectionOpen() throws Exception {
+
+ String connectFrame = "STOMP\n" +
+ "login:system\n" +
+ "passcode:manager\n" +
+ "accept-version:1.1\n" +
+ "heart-beat:2000,0\n" +
+ "host:localhost\n" +
+ "\n" + Stomp.NULL;
+
+ stompConnection.sendFrame(connectFrame);
+ String f = stompConnection.receiveFrame();
+ assertTrue(f.startsWith("CONNECTED"));
+ assertTrue(f.indexOf("version:1.1") >= 0);
+ assertTrue(f.indexOf("heart-beat:") >= 0);
+ assertTrue(f.indexOf("session:") >= 0);
+ LOG.debug("Broker sent: " + f);
+
+ String message = "SEND\n" + "destination:/queue/" + getQueueName() + "\n\n" + "Hello World" + Stomp.NULL;
+ stompConnection.sendFrame(message);
+
+ ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();
+
+ service.scheduleAtFixedRate(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ LOG.info("Sending next KeepAlive");
+ stompConnection.keepAlive();
+ } catch (Exception e) {
+ }
+ }
+ }, 1, 1, TimeUnit.SECONDS);
+
+ TimeUnit.SECONDS.sleep(20);
+
+ String frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" +
+ "id:12345\n" + "ack:auto\n\n" + Stomp.NULL;
+ stompConnection.sendFrame(frame);
+
+ StompFrame stompFrame = stompConnection.receive();
+ assertTrue(stompFrame.getAction().equals("MESSAGE"));
+
+ service.shutdownNow();
+
+ frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
+ stompConnection.sendFrame(frame);
+ }
+
public void testSendAfterMissingHeartbeat() throws Exception {
String connectFrame = "STOMP\n" + "login:system\n" +