You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2010/05/18 17:20:34 UTC

svn commit: r945692 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/transport/ main/java/org/apache/activemq/transport/nio/ main/java/org/apache/activemq/transport/stomp/ main/java/org/apache/activemq/transport/tcp/ test/java/org/...

Author: gtully
Date: Tue May 18 15:20:33 2010
New Revision: 945692

URL: http://svn.apache.org/viewvc?rev=945692&view=rev
Log:
resolve https://issues.apache.org/activemq/browse/AMQ-2737 - have nio work with soWriteTimeout filter

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/WriteTimeoutFilter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOOutputStream.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOTransport.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransport.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpBufferedOutputStream.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsTestSupport.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/SocketProxy.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/WriteTimeoutFilter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/WriteTimeoutFilter.java?rev=945692&r1=945691&r2=945692&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/WriteTimeoutFilter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/WriteTimeoutFilter.java Tue May 18 15:20:33 2010
@@ -26,6 +26,7 @@ import java.util.concurrent.locks.Condit
 import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.activemq.transport.tcp.TcpBufferedOutputStream;
+import org.apache.activemq.transport.tcp.TimeStampStream;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -97,8 +98,8 @@ public class WriteTimeoutFilter extends 
     }
 
     
-    protected TcpBufferedOutputStream getWriter() {
-        return next.narrow(TcpBufferedOutputStream.class);
+    protected TimeStampStream getWriter() {
+        return next.narrow(TimeStampStream.class);
     }
     
     protected Socket getSocket() {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOOutputStream.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOOutputStream.java?rev=945692&r1=945691&r2=945692&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOOutputStream.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOOutputStream.java Tue May 18 15:20:33 2010
@@ -23,13 +23,15 @@ import java.io.OutputStream;
 import java.nio.ByteBuffer;
 import java.nio.channels.WritableByteChannel;
 
+import org.apache.activemq.transport.tcp.TimeStampStream;
+
 /**
  * An optimized buffered outputstream for Tcp
  * 
  * @version $Revision: 1.1.1.1 $
  */
 
-public class NIOOutputStream extends OutputStream {
+public class NIOOutputStream extends OutputStream implements TimeStampStream {
 
     private static final int BUFFER_SIZE = 8192;
 
@@ -39,6 +41,7 @@ public class NIOOutputStream extends Out
 
     private int count;
     private boolean closed;
+    private volatile long writeTimestamp = -1;//concurrent reads of this value
 
     /**
      * Constructor
@@ -149,31 +152,51 @@ public class NIOOutputStream extends Out
         int remaining = data.remaining();
         int lastRemaining = remaining - 1;
         long delay = 1;
-        while (remaining > 0) {
-
-            // We may need to do a little bit of sleeping to avoid a busy loop.
-            // Slow down if no data was written out..
-            if (remaining == lastRemaining) {
-                try {
-                    // Use exponential rollback to increase sleep time.
-                    Thread.sleep(delay);
-                    delay *= 2;
-                    if (delay > 1000) {
-                        delay = 1000;
+        try {
+            writeTimestamp = System.currentTimeMillis();
+            while (remaining > 0) {
+
+                // We may need to do a little bit of sleeping to avoid a busy loop.
+                // Slow down if no data was written out..
+                if (remaining == lastRemaining) {
+                    try {
+                        // Use exponential rollback to increase sleep time.
+                        Thread.sleep(delay);
+                        delay *= 2;
+                        if (delay > 1000) {
+                            delay = 1000;
+                        }
+                    } catch (InterruptedException e) {
+                        throw new InterruptedIOException();
                     }
-                } catch (InterruptedException e) {
-                    throw new InterruptedIOException();
+                } else {
+                    delay = 1;
                 }
-            } else {
-                delay = 1;
-            }
-            lastRemaining = remaining;
+                lastRemaining = remaining;
 
-            // Since the write is non-blocking, all the data may not have been
-            // written.
-            out.write(data);
-            remaining = data.remaining();
+                // Since the write is non-blocking, all the data may not have been
+                // written.
+                out.write(data);
+                remaining = data.remaining();
+            }
+        } finally {
+            writeTimestamp = -1;
         }
     }
+    
+    
+    /* (non-Javadoc)
+     * @see org.apache.activemq.transport.tcp.TimeStampStream#isWriting()
+     */
+    public boolean isWriting() {
+        return writeTimestamp > 0;
+    }
+    
+    /* (non-Javadoc)
+     * @see org.apache.activemq.transport.tcp.TimeStampStream#getWriteTimestamp()
+     */
+    public long getWriteTimestamp() {
+        return writeTimestamp;
+    }
 
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOTransport.java?rev=945692&r1=945691&r2=945692&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/nio/NIOTransport.java Tue May 18 15:20:33 2010
@@ -83,8 +83,9 @@ public class NIOTransport extends TcpTra
         currentBuffer = inputBuffer;
         nextFrameSize = -1;
         currentBuffer.limit(4);
-        this.dataOut = new DataOutputStream(new NIOOutputStream(channel, 16 * 1024));
-
+        NIOOutputStream outPutStream = new NIOOutputStream(channel, 16 * 1024);
+        this.dataOut = new DataOutputStream(outPutStream);
+        this.buffOut = outPutStream;
     }
 
     private void serviceRead() {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java?rev=945692&r1=945691&r2=945692&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java Tue May 18 15:20:33 2010
@@ -82,19 +82,26 @@ public class StompConnection {
                 throw new IOException("socket closed.");
             } else if (c == 0) {
                 c = is.read();
-                if (c != '\n') {
-                    throw new IOException("Expecting stomp frame to terminate with \0\n");
+                if (c == '\n') {
+                    // end of frame
+                    return stringFromBuffer(inputBuffer);
+                } else {
+                    inputBuffer.write(0);
+                    inputBuffer.write(c);
                 }
-                byte[] ba = inputBuffer.toByteArray();
-                inputBuffer.reset();
-                return new String(ba, "UTF-8");
             } else {
                 inputBuffer.write(c);
             }
         }
     }
 
-	public Socket getStompSocket() {
+	private String stringFromBuffer(ByteArrayOutputStream inputBuffer) throws Exception {
+	    byte[] ba = inputBuffer.toByteArray();
+        inputBuffer.reset();
+        return new String(ba, "UTF-8");
+    }
+
+    public Socket getStompSocket() {
 		return stompSocket;
 	}
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransport.java?rev=945692&r1=945691&r2=945692&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransport.java Tue May 18 15:20:33 2010
@@ -83,7 +83,9 @@ public class StompNIOTransport extends T
         });
 
         inputBuffer = ByteBuffer.allocate(8 * 1024);
-        this.dataOut = new DataOutputStream(new NIOOutputStream(channel, 8 * 1024));
+        NIOOutputStream outPutStream = new NIOOutputStream(channel, 8 * 1024);
+        this.dataOut = new DataOutputStream(outPutStream);
+        this.buffOut = outPutStream;
     }
     
     private void serviceRead() {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpBufferedOutputStream.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpBufferedOutputStream.java?rev=945692&r1=945691&r2=945692&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpBufferedOutputStream.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpBufferedOutputStream.java Tue May 18 15:20:33 2010
@@ -27,7 +27,7 @@ import java.io.OutputStream;
  * @version $Revision: 1.1.1.1 $
  */
 
-public class TcpBufferedOutputStream extends FilterOutputStream {
+public class TcpBufferedOutputStream extends FilterOutputStream implements TimeStampStream {
     private static final int BUFFER_SIZE = 8192;
     private byte[] buffer;
     private int bufferlen;
@@ -129,10 +129,16 @@ public class TcpBufferedOutputStream ext
         super.close();
     }
 
+    /* (non-Javadoc)
+     * @see org.apache.activemq.transport.tcp.TimeStampStream#isWriting()
+     */
     public boolean isWriting() {
         return writeTimestamp > 0;
     }
     
+    /* (non-Javadoc)
+     * @see org.apache.activemq.transport.tcp.TimeStampStream#getWriteTimestamp()
+     */
     public long getWriteTimestamp() {
     	return writeTimestamp;
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java?rev=945692&r1=945691&r2=945692&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java Tue May 18 15:20:33 2010
@@ -67,7 +67,7 @@ public class TcpTransport extends Transp
     protected Socket socket;
     protected DataOutputStream dataOut;
     protected DataInputStream dataIn;
-    protected TcpBufferedOutputStream buffOut = null;
+    protected TimeStampStream buffOut = null;
     /**
      * The Traffic Class to be set on the socket.
      */
@@ -576,8 +576,9 @@ public class TcpTransport extends Transp
             }
         };
         this.dataIn = new DataInputStream(buffIn);
-        buffOut = new TcpBufferedOutputStream(socket.getOutputStream(), ioBufferSize);
-        this.dataOut = new DataOutputStream(buffOut);
+        TcpBufferedOutputStream outputStream = new TcpBufferedOutputStream(socket.getOutputStream(), ioBufferSize);
+        this.dataOut = new DataOutputStream(outputStream);
+        this.buffOut = outputStream;
     }
 
     protected void closeStreams() throws IOException {
@@ -604,7 +605,7 @@ public class TcpTransport extends Transp
     public <T> T narrow(Class<T> target) {
         if (target == Socket.class) {
             return target.cast(socket);
-        } else if ( target == TcpBufferedOutputStream.class) {
+        } else if ( target == TimeStampStream.class) {
             return target.cast(buffOut);
         }
         return super.narrow(target);

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsTestSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsTestSupport.java?rev=945692&r1=945691&r2=945692&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsTestSupport.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsTestSupport.java Tue May 18 15:20:33 2010
@@ -47,6 +47,7 @@ public class JmsTestSupport extends Comb
     static final private AtomicLong TEST_COUNTER = new AtomicLong();
     public String userName;
     public String password;
+    public String messageTextPrefix = "";
 
     protected ConnectionFactory factory;
     protected ActiveMQConnection connection;
@@ -96,7 +97,7 @@ public class JmsTestSupport extends Comb
     protected void sendMessages(Session session, Destination destination, int count) throws JMSException {
         MessageProducer producer = session.createProducer(destination);
         for (int i = 0; i < count; i++) {
-            producer.send(session.createTextMessage("" + i));
+            producer.send(session.createTextMessage(messageTextPrefix  + i));
         }
         producer.close();
     }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/SocketProxy.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/SocketProxy.java?rev=945692&r1=945691&r2=945692&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/SocketProxy.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/util/SocketProxy.java Tue May 18 15:20:33 2010
@@ -20,6 +20,7 @@ package org.apache.activemq.util;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.net.InetSocketAddress;
 import java.net.ServerSocket;
 import java.net.Socket;
 import java.net.SocketException;
@@ -38,10 +39,11 @@ public class SocketProxy {
 
     private static final transient Log LOG = LogFactory.getLog(SocketProxy.class);
 
-    public static final int ACCEPT_TIMEOUT_MILLIS = 1000;
+    public static final int ACCEPT_TIMEOUT_MILLIS = 100;
 
     private URI proxyUrl;
     private URI target;
+
     private Acceptor acceptor;
     private ServerSocket serverSocket;
 
@@ -49,6 +51,11 @@ public class SocketProxy {
 
     private int listenPort = 0;
 
+    private int receiveBufferSize = -1;
+
+    public SocketProxy() throws Exception {    
+    }
+    
     public SocketProxy(URI uri) throws Exception {
         this(0, uri);
     }
@@ -59,12 +66,24 @@ public class SocketProxy {
         open();
     }
 
-    protected void open() throws Exception {
+    public void setReceiveBufferSize(int receiveBufferSize) {
+        this.receiveBufferSize = receiveBufferSize;
+    }
+    
+    public void setTarget(URI tcpBrokerUri) {
+        target = tcpBrokerUri;
+    }
+
+    public void open() throws Exception {
+        serverSocket = new ServerSocket();
+        if (receiveBufferSize > 0) {
+            serverSocket.setReceiveBufferSize(receiveBufferSize);
+        }
         if (proxyUrl == null) {
-            serverSocket = new ServerSocket(listenPort);
+            serverSocket.bind(new InetSocketAddress(listenPort));
             proxyUrl = urlFromSocket(target, serverSocket);
         } else {
-            serverSocket = new ServerSocket(proxyUrl.getPort());
+            serverSocket.bind(new InetSocketAddress(proxyUrl.getPort()));
         }
         acceptor = new Acceptor(serverSocket, target);
         new Thread(null, acceptor, "SocketProxy-Acceptor-" + serverSocket.getLocalPort()).start();
@@ -151,9 +170,13 @@ public class SocketProxy {
 
         public Connection(Socket socket, URI target) throws Exception {
             receiveSocket = socket;
-            sendSocket = new Socket(target.getHost(), target.getPort());
+            sendSocket = new Socket();
+            if (receiveBufferSize > 0) {
+                sendSocket.setReceiveBufferSize(receiveBufferSize);
+            }
+            sendSocket.connect(new InetSocketAddress(target.getHost(), target.getPort()));
             linkWithThreads(receiveSocket, sendSocket);
-            LOG.info("proxy connection " + sendSocket);
+            LOG.info("proxy connection " + sendSocket + ", receiveBufferSize=" + sendSocket.getReceiveBufferSize());
         }
 
         public void goOn() {
@@ -210,6 +233,7 @@ public class SocketProxy {
                     while (true) {
                         int len = in.read(buf);
                         if (len == -1) {
+                            LOG.debug("read eof from:" + src);
                             break;
                         }
                         pause.get().await();
@@ -259,7 +283,12 @@ public class SocketProxy {
                     pause.get().await();
                     try {
                         Socket source = socket.accept();
-                        LOG.info("accepted " + source);
+                        LOG.info("accepted " + source + ", receiveBufferSize:" + source.getReceiveBufferSize());
+                        pause.get().await();
+                        if (receiveBufferSize > 0) {
+                            source.setReceiveBufferSize(receiveBufferSize);
+                        }
+                        LOG.info("accepted " + source + ", receiveBufferSize:" + source.getReceiveBufferSize());
                         synchronized(connections) {
                             connections.add(new Connection(source, target));
                         }