You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2010/05/18 17:20:34 UTC
svn commit: r945692 - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/transport/
main/java/org/apache/activemq/transport/nio/
main/java/org/apache/activemq/transport/stomp/
main/java/org/apache/activemq/transport/tcp/ test/java/org/...
Author: gtully
Date: Tue May 18 15:20:33 2010
New Revision: 945692
URL: http://svn.apache.org/viewvc?rev=945692&view=rev
Log:
resolve https://issues.apache.org/activemq/browse/AMQ-2737 - have nio work with soWritetTimeout filter
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/WriteTimeoutFilter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOOutputStream.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOTransport.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransport.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpBufferedOutputStream.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsTestSupport.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/SocketProxy.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/WriteTimeoutFilter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/WriteTimeoutFilter.java?rev=945692&r1=945691&r2=945692&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/WriteTimeoutFilter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/WriteTimeoutFilter.java Tue May 18 15:20:33 2010
@@ -26,6 +26,7 @@ import java.util.concurrent.locks.Condit
import java.util.concurrent.locks.ReentrantLock;
import org.apache.activemq.transport.tcp.TcpBufferedOutputStream;
+import org.apache.activemq.transport.tcp.TimeStampStream;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -97,8 +98,8 @@ public class WriteTimeoutFilter extends
}
- protected TcpBufferedOutputStream getWriter() {
- return next.narrow(TcpBufferedOutputStream.class);
+ protected TimeStampStream getWriter() {
+ return next.narrow(TimeStampStream.class);
}
protected Socket getSocket() {
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOOutputStream.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOOutputStream.java?rev=945692&r1=945691&r2=945692&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOOutputStream.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOOutputStream.java Tue May 18 15:20:33 2010
@@ -23,13 +23,15 @@ import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
+import org.apache.activemq.transport.tcp.TimeStampStream;
+
/**
* An optimized buffered outputstream for Tcp
*
* @version $Revision: 1.1.1.1 $
*/
-public class NIOOutputStream extends OutputStream {
+public class NIOOutputStream extends OutputStream implements TimeStampStream {
private static final int BUFFER_SIZE = 8192;
@@ -39,6 +41,7 @@ public class NIOOutputStream extends Out
private int count;
private boolean closed;
+ private volatile long writeTimestamp = -1;//concurrent reads of this value
/**
* Constructor
@@ -149,31 +152,51 @@ public class NIOOutputStream extends Out
int remaining = data.remaining();
int lastRemaining = remaining - 1;
long delay = 1;
- while (remaining > 0) {
-
- // We may need to do a little bit of sleeping to avoid a busy loop.
- // Slow down if no data was written out..
- if (remaining == lastRemaining) {
- try {
- // Use exponential rollback to increase sleep time.
- Thread.sleep(delay);
- delay *= 2;
- if (delay > 1000) {
- delay = 1000;
+ try {
+ writeTimestamp = System.currentTimeMillis();
+ while (remaining > 0) {
+
+ // We may need to do a little bit of sleeping to avoid a busy loop.
+ // Slow down if no data was written out..
+ if (remaining == lastRemaining) {
+ try {
+ // Use exponential rollback to increase sleep time.
+ Thread.sleep(delay);
+ delay *= 2;
+ if (delay > 1000) {
+ delay = 1000;
+ }
+ } catch (InterruptedException e) {
+ throw new InterruptedIOException();
}
- } catch (InterruptedException e) {
- throw new InterruptedIOException();
+ } else {
+ delay = 1;
}
- } else {
- delay = 1;
- }
- lastRemaining = remaining;
+ lastRemaining = remaining;
- // Since the write is non-blocking, all the data may not have been
- // written.
- out.write(data);
- remaining = data.remaining();
+ // Since the write is non-blocking, all the data may not have been
+ // written.
+ out.write(data);
+ remaining = data.remaining();
+ }
+ } finally {
+ writeTimestamp = -1;
}
}
+
+
+ /* (non-Javadoc)
+ * @see org.apache.activemq.transport.tcp.TimeStampStream#isWriting()
+ */
+ public boolean isWriting() {
+ return writeTimestamp > 0;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.activemq.transport.tcp.TimeStampStream#getWriteTimestamp()
+ */
+ public long getWriteTimestamp() {
+ return writeTimestamp;
+ }
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOTransport.java?rev=945692&r1=945691&r2=945692&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOTransport.java Tue May 18 15:20:33 2010
@@ -83,8 +83,9 @@ public class NIOTransport extends TcpTra
currentBuffer = inputBuffer;
nextFrameSize = -1;
currentBuffer.limit(4);
- this.dataOut = new DataOutputStream(new NIOOutputStream(channel, 16 * 1024));
-
+ NIOOutputStream outPutStream = new NIOOutputStream(channel, 16 * 1024);
+ this.dataOut = new DataOutputStream(outPutStream);
+ this.buffOut = outPutStream;
}
private void serviceRead() {
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java?rev=945692&r1=945691&r2=945692&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java Tue May 18 15:20:33 2010
@@ -82,19 +82,26 @@ public class StompConnection {
throw new IOException("socket closed.");
} else if (c == 0) {
c = is.read();
- if (c != '\n') {
- throw new IOException("Expecting stomp frame to terminate with \0\n");
+ if (c == '\n') {
+ // end of frame
+ return stringFromBuffer(inputBuffer);
+ } else {
+ inputBuffer.write(0);
+ inputBuffer.write(c);
}
- byte[] ba = inputBuffer.toByteArray();
- inputBuffer.reset();
- return new String(ba, "UTF-8");
} else {
inputBuffer.write(c);
}
}
}
- public Socket getStompSocket() {
+ private String stringFromBuffer(ByteArrayOutputStream inputBuffer) throws Exception {
+ byte[] ba = inputBuffer.toByteArray();
+ inputBuffer.reset();
+ return new String(ba, "UTF-8");
+ }
+
+ public Socket getStompSocket() {
return stompSocket;
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransport.java?rev=945692&r1=945691&r2=945692&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransport.java Tue May 18 15:20:33 2010
@@ -83,7 +83,9 @@ public class StompNIOTransport extends T
});
inputBuffer = ByteBuffer.allocate(8 * 1024);
- this.dataOut = new DataOutputStream(new NIOOutputStream(channel, 8 * 1024));
+ NIOOutputStream outPutStream = new NIOOutputStream(channel, 8 * 1024);
+ this.dataOut = new DataOutputStream(outPutStream);
+ this.buffOut = outPutStream;
}
private void serviceRead() {
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpBufferedOutputStream.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpBufferedOutputStream.java?rev=945692&r1=945691&r2=945692&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpBufferedOutputStream.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpBufferedOutputStream.java Tue May 18 15:20:33 2010
@@ -27,7 +27,7 @@ import java.io.OutputStream;
* @version $Revision: 1.1.1.1 $
*/
-public class TcpBufferedOutputStream extends FilterOutputStream {
+public class TcpBufferedOutputStream extends FilterOutputStream implements TimeStampStream {
private static final int BUFFER_SIZE = 8192;
private byte[] buffer;
private int bufferlen;
@@ -129,10 +129,16 @@ public class TcpBufferedOutputStream ext
super.close();
}
+ /* (non-Javadoc)
+ * @see org.apache.activemq.transport.tcp.TimeStampStream#isWriting()
+ */
public boolean isWriting() {
return writeTimestamp > 0;
}
+ /* (non-Javadoc)
+ * @see org.apache.activemq.transport.tcp.TimeStampStream#getWriteTimestamp()
+ */
public long getWriteTimestamp() {
return writeTimestamp;
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java?rev=945692&r1=945691&r2=945692&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java Tue May 18 15:20:33 2010
@@ -67,7 +67,7 @@ public class TcpTransport extends Transp
protected Socket socket;
protected DataOutputStream dataOut;
protected DataInputStream dataIn;
- protected TcpBufferedOutputStream buffOut = null;
+ protected TimeStampStream buffOut = null;
/**
* The Traffic Class to be set on the socket.
*/
@@ -576,8 +576,9 @@ public class TcpTransport extends Transp
}
};
this.dataIn = new DataInputStream(buffIn);
- buffOut = new TcpBufferedOutputStream(socket.getOutputStream(), ioBufferSize);
- this.dataOut = new DataOutputStream(buffOut);
+ TcpBufferedOutputStream outputStream = new TcpBufferedOutputStream(socket.getOutputStream(), ioBufferSize);
+ this.dataOut = new DataOutputStream(outputStream);
+ this.buffOut = outputStream;
}
protected void closeStreams() throws IOException {
@@ -604,7 +605,7 @@ public class TcpTransport extends Transp
public <T> T narrow(Class<T> target) {
if (target == Socket.class) {
return target.cast(socket);
- } else if ( target == TcpBufferedOutputStream.class) {
+ } else if ( target == TimeStampStream.class) {
return target.cast(buffOut);
}
return super.narrow(target);
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsTestSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsTestSupport.java?rev=945692&r1=945691&r2=945692&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsTestSupport.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsTestSupport.java Tue May 18 15:20:33 2010
@@ -47,6 +47,7 @@ public class JmsTestSupport extends Comb
static final private AtomicLong TEST_COUNTER = new AtomicLong();
public String userName;
public String password;
+ public String messageTextPrefix = "";
protected ConnectionFactory factory;
protected ActiveMQConnection connection;
@@ -96,7 +97,7 @@ public class JmsTestSupport extends Comb
protected void sendMessages(Session session, Destination destination, int count) throws JMSException {
MessageProducer producer = session.createProducer(destination);
for (int i = 0; i < count; i++) {
- producer.send(session.createTextMessage("" + i));
+ producer.send(session.createTextMessage(messageTextPrefix + i));
}
producer.close();
}
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/SocketProxy.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/SocketProxy.java?rev=945692&r1=945691&r2=945692&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/SocketProxy.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/SocketProxy.java Tue May 18 15:20:33 2010
@@ -20,6 +20,7 @@ package org.apache.activemq.util;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
@@ -38,10 +39,11 @@ public class SocketProxy {
private static final transient Log LOG = LogFactory.getLog(SocketProxy.class);
- public static final int ACCEPT_TIMEOUT_MILLIS = 1000;
+ public static final int ACCEPT_TIMEOUT_MILLIS = 100;
private URI proxyUrl;
private URI target;
+
private Acceptor acceptor;
private ServerSocket serverSocket;
@@ -49,6 +51,11 @@ public class SocketProxy {
private int listenPort = 0;
+ private int receiveBufferSize = -1;
+
+ public SocketProxy() throws Exception {
+ }
+
public SocketProxy(URI uri) throws Exception {
this(0, uri);
}
@@ -59,12 +66,24 @@ public class SocketProxy {
open();
}
- protected void open() throws Exception {
+ public void setReceiveBufferSize(int receiveBufferSize) {
+ this.receiveBufferSize = receiveBufferSize;
+ }
+
+ public void setTarget(URI tcpBrokerUri) {
+ target = tcpBrokerUri;
+ }
+
+ public void open() throws Exception {
+ serverSocket = new ServerSocket();
+ if (receiveBufferSize > 0) {
+ serverSocket.setReceiveBufferSize(receiveBufferSize);
+ }
if (proxyUrl == null) {
- serverSocket = new ServerSocket(listenPort);
+ serverSocket.bind(new InetSocketAddress(listenPort));
proxyUrl = urlFromSocket(target, serverSocket);
} else {
- serverSocket = new ServerSocket(proxyUrl.getPort());
+ serverSocket.bind(new InetSocketAddress(proxyUrl.getPort()));
}
acceptor = new Acceptor(serverSocket, target);
new Thread(null, acceptor, "SocketProxy-Acceptor-" + serverSocket.getLocalPort()).start();
@@ -151,9 +170,13 @@ public class SocketProxy {
public Connection(Socket socket, URI target) throws Exception {
receiveSocket = socket;
- sendSocket = new Socket(target.getHost(), target.getPort());
+ sendSocket = new Socket();
+ if (receiveBufferSize > 0) {
+ sendSocket.setReceiveBufferSize(receiveBufferSize);
+ }
+ sendSocket.connect(new InetSocketAddress(target.getHost(), target.getPort()));
linkWithThreads(receiveSocket, sendSocket);
- LOG.info("proxy connection " + sendSocket);
+ LOG.info("proxy connection " + sendSocket + ", receiveBufferSize=" + sendSocket.getReceiveBufferSize());
}
public void goOn() {
@@ -210,6 +233,7 @@ public class SocketProxy {
while (true) {
int len = in.read(buf);
if (len == -1) {
+ LOG.debug("read eof from:" + src);
break;
}
pause.get().await();
@@ -259,7 +283,12 @@ public class SocketProxy {
pause.get().await();
try {
Socket source = socket.accept();
- LOG.info("accepted " + source);
+ LOG.info("accepted " + source + ", receiveBufferSize:" + source.getReceiveBufferSize());
+ pause.get().await();
+ if (receiveBufferSize > 0) {
+ source.setReceiveBufferSize(receiveBufferSize);
+ }
+ LOG.info("accepted " + source + ", receiveBufferSize:" + source.getReceiveBufferSize());
synchronized(connections) {
connections.add(new Connection(source, target));
}