You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2007/08/08 20:58:13 UTC

svn commit: r563982 [25/32] - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/ main/java/org/apache/activemq/advisory/ main/java/org/apache/activemq/blob/ main/java/org/apache/activemq/broker/ main/java/org/apache/activemq/broker/jm...

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java Wed Aug  8 11:56:59 2007
@@ -31,21 +31,22 @@
 import org.apache.activemq.wireformat.WireFormat;
 
 /**
- * Implements marshalling and unmarsalling the <a href="http://stomp.codehaus.org/">Stomp</a> protocol.
+ * Implements marshalling and unmarshalling the <a
+ * href="http://stomp.codehaus.org/">Stomp</a> protocol.
  */
 public class StompWireFormat implements WireFormat {
 
-    private static final byte[] NO_DATA = new byte[]{};
-	private static final byte[] END_OF_FRAME = new byte[]{0,'\n'};
-	
-	private static final int MAX_COMMAND_LENGTH = 1024;
-	private static final int MAX_HEADER_LENGTH = 1024*10;
-	private static final int MAX_HEADERS = 1000;
-	private static final int MAX_DATA_LENGTH = 1024*1024*100;
-    
-	private int version=1;
+    private static final byte[] NO_DATA = new byte[] {};
+    private static final byte[] END_OF_FRAME = new byte[] {0, '\n'};
 
-	public ByteSequence marshal(Object command) throws IOException {
+    private static final int MAX_COMMAND_LENGTH = 1024;
+    private static final int MAX_HEADER_LENGTH = 1024 * 10;
+    private static final int MAX_HEADERS = 1000;
+    private static final int MAX_DATA_LENGTH = 1024 * 1024 * 100;
+
+    private int version = 1;
+
+    public ByteSequence marshal(Object command) throws IOException {
         ByteArrayOutputStream baos = new ByteArrayOutputStream();
         DataOutputStream dos = new DataOutputStream(baos);
         marshal(command, dos);
@@ -60,140 +61,137 @@
     }
 
     public void marshal(Object command, DataOutput os) throws IOException {
-		StompFrame stomp = (org.apache.activemq.transport.stomp.StompFrame) command;
+        StompFrame stomp = (org.apache.activemq.transport.stomp.StompFrame)command;
+
+        StringBuffer buffer = new StringBuffer();
+        buffer.append(stomp.getAction());
+        buffer.append(Stomp.NEWLINE);
+
+        // Output the headers.
+        for (Iterator iter = stomp.getHeaders().entrySet().iterator(); iter.hasNext();) {
+            Map.Entry entry = (Map.Entry)iter.next();
+            buffer.append(entry.getKey());
+            buffer.append(Stomp.Headers.SEPERATOR);
+            buffer.append(entry.getValue());
+            buffer.append(Stomp.NEWLINE);
+        }
 
-		StringBuffer buffer = new StringBuffer();
-		buffer.append(stomp.getAction());
-		buffer.append(Stomp.NEWLINE);
-
-		// Output the headers.
-		for (Iterator iter = stomp.getHeaders().entrySet().iterator(); iter.hasNext();) {
-			Map.Entry entry = (Map.Entry) iter.next();
-			buffer.append(entry.getKey());
-			buffer.append(Stomp.Headers.SEPERATOR);
-			buffer.append(entry.getValue());
-			buffer.append(Stomp.NEWLINE);
-		}
-
-		// Add a newline to seperate the headers from the content.
-		buffer.append(Stomp.NEWLINE);
-
-		os.write(buffer.toString().getBytes("UTF-8"));
-		os.write(stomp.getContent());
-		os.write(END_OF_FRAME);
-	}
-    
+        // Add a newline to separate the headers from the content.
+        buffer.append(Stomp.NEWLINE);
+
+        os.write(buffer.toString().getBytes("UTF-8"));
+        os.write(stomp.getContent());
+        os.write(END_OF_FRAME);
+    }
 
     public Object unmarshal(DataInput in) throws IOException {
-        	
+
         try {
-			String action = null;
-			
-			// skip white space to next real action line
-			while (true) {
-				action = readLine(in, MAX_COMMAND_LENGTH, "The maximum command length was exceeded");
-				if (action == null) {
-					throw new IOException("connection was closed");
-				} else {
-					action = action.trim();
-					if (action.length() > 0) {
-						break;
-					}
-				}
-			}
-			
-			// Parse the headers
-			HashMap headers = new HashMap(25);
-			while (true) {
-			    String line = readLine(in, MAX_HEADER_LENGTH, "The maximum header length was exceeded");
-			    if (line != null && line.trim().length() > 0) {
-			    	
-			    	if( headers.size() > MAX_HEADERS )
-			    		throw new ProtocolException("The maximum number of headers was exceeded", true);
-			    	
-			        try {
-			            int seperator_index = line.indexOf(Stomp.Headers.SEPERATOR);
-			            String name = line.substring(0, seperator_index).trim();
-			            String value = line.substring(seperator_index + 1, line.length()).trim();
-			            headers.put(name, value);
-			        }
-			        catch (Exception e) {
-			            throw new ProtocolException("Unable to parser header line [" + line + "]", true);
-			        }
-			    }
-			    else {
-			        break;
-			    }
-			}
-			
-			// Read in the data part.
-			byte[] data = NO_DATA;
-			String contentLength = (String)headers.get(Stomp.Headers.CONTENT_LENGTH);
-			if (contentLength!=null) {
-			    
-				// Bless the client, he's telling us how much data to read in.        	
-				int length;
-				try {
-					length = Integer.parseInt(contentLength.trim());
-				} catch (NumberFormatException e) {
-					throw new ProtocolException("Specified content-length is not a valid integer", true);
-				}
-
-				if( length > MAX_DATA_LENGTH )
-					throw new ProtocolException("The maximum data length was exceeded", true);
-				
-			    data = new byte[length];
-			    in.readFully(data);
-			    
-			    if (in.readByte() != 0) {
-			        throw new ProtocolException(Stomp.Headers.CONTENT_LENGTH+" bytes were read and " + "there was no trailing null byte", true);
-			    }
-			
-			} else {
-
-				// We don't know how much to read.. data ends when we hit a 0
-			    byte b;
-			    ByteArrayOutputStream baos=null;
-			    while ((b = in.readByte()) != 0) {
-					
-					if( baos == null ) {
-			    		baos = new ByteArrayOutputStream();
-			    	} else if( baos.size() > MAX_DATA_LENGTH ) {
-			    		throw new ProtocolException("The maximum data length was exceeded", true);
-			    	}
-			    
-			        baos.write(b);
-			    }
-			    
-			    if( baos!=null ) {
-			        baos.close();
-			        data = baos.toByteArray();
-			    }
-			    
-			}
-			
-			return new StompFrame(action, headers, data);
-			
-		} catch (ProtocolException e) {
-			return new StompFrameError(e);
-		} 
+            String action = null;
+
+            // skip white space to next real action line
+            while (true) {
+                action = readLine(in, MAX_COMMAND_LENGTH, "The maximum command length was exceeded");
+                if (action == null) {
+                    throw new IOException("connection was closed");
+                } else {
+                    action = action.trim();
+                    if (action.length() > 0) {
+                        break;
+                    }
+                }
+            }
+
+            // Parse the headers
+            HashMap headers = new HashMap(25);
+            while (true) {
+                String line = readLine(in, MAX_HEADER_LENGTH, "The maximum header length was exceeded");
+                if (line != null && line.trim().length() > 0) {
+
+                    if (headers.size() > MAX_HEADERS)
+                        throw new ProtocolException("The maximum number of headers was exceeded", true);
+
+                    try {
+                        int seperator_index = line.indexOf(Stomp.Headers.SEPERATOR);
+                        String name = line.substring(0, seperator_index).trim();
+                        String value = line.substring(seperator_index + 1, line.length()).trim();
+                        headers.put(name, value);
+                    } catch (Exception e) {
+                        throw new ProtocolException("Unable to parser header line [" + line + "]", true);
+                    }
+                } else {
+                    break;
+                }
+            }
+
+            // Read in the data part.
+            byte[] data = NO_DATA;
+            String contentLength = (String)headers.get(Stomp.Headers.CONTENT_LENGTH);
+            if (contentLength != null) {
+
+                // Bless the client, he's telling us how much data to read in.
+                int length;
+                try {
+                    length = Integer.parseInt(contentLength.trim());
+                } catch (NumberFormatException e) {
+                    throw new ProtocolException("Specified content-length is not a valid integer", true);
+                }
+
+                if (length > MAX_DATA_LENGTH)
+                    throw new ProtocolException("The maximum data length was exceeded", true);
+
+                data = new byte[length];
+                in.readFully(data);
+
+                if (in.readByte() != 0) {
+                    throw new ProtocolException(Stomp.Headers.CONTENT_LENGTH + " bytes were read and " + "there was no trailing null byte", true);
+                }
+
+            } else {
+
+                // We don't know how much to read.. data ends when we hit a 0
+                byte b;
+                ByteArrayOutputStream baos = null;
+                while ((b = in.readByte()) != 0) {
+
+                    if (baos == null) {
+                        baos = new ByteArrayOutputStream();
+                    } else if (baos.size() > MAX_DATA_LENGTH) {
+                        throw new ProtocolException("The maximum data length was exceeded", true);
+                    }
+
+                    baos.write(b);
+                }
+
+                if (baos != null) {
+                    baos.close();
+                    data = baos.toByteArray();
+                }
+
+            }
+
+            return new StompFrame(action, headers, data);
+
+        } catch (ProtocolException e) {
+            return new StompFrameError(e);
+        }
 
     }
 
     private String readLine(DataInput in, int maxLength, String errorMessage) throws IOException {
         byte b;
-        ByteArrayOutputStream baos=new ByteArrayOutputStream(maxLength);
+        ByteArrayOutputStream baos = new ByteArrayOutputStream(maxLength);
         while ((b = in.readByte()) != '\n') {
-        	if( baos.size() > maxLength )
-        		throw new ProtocolException(errorMessage, true);
+            if (baos.size() > maxLength)
+                throw new ProtocolException(errorMessage, true);
             baos.write(b);
         }
         baos.close();
         ByteSequence sequence = baos.toByteSequence();
-		return new String(sequence.getData(),sequence.getOffset(),sequence.getLength(),"UTF-8");
-	}
+        return new String(sequence.getData(), sequence.getOffset(), sequence.getLength(), "UTF-8");
+    }
 
-	public int getVersion() {
+    public int getVersion() {
         return version;
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/SslTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/SslTransport.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/SslTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/SslTransport.java Wed Aug  8 11:56:59 2007
@@ -33,13 +33,13 @@
 
 /**
  * A Transport class that uses SSL and client-side certificate authentication.
- *
- * Client-side certificate authentication must be enabled through the constructor.
- * By default, this class will have the same client authentication behavior as the socket it is passed.
- * This class will set ConnectionInfo's transportContext to the SSL certificates of the client.
- * NOTE: Accessor method for needClientAuth was not provided on purpose. This is because needClientAuth's value must be
- *      set before the socket is connected. Otherwise, unexpected situations may occur.
- * 
+ * Client-side certificate authentication must be enabled through the
+ * constructor. By default, this class will have the same client authentication
+ * behavior as the socket it is passed. This class will set ConnectionInfo's
+ * transportContext to the SSL certificates of the client. NOTE: Accessor method
+ * for needClientAuth was not provided on purpose. This is because
+ * needClientAuth's value must be set before the socket is connected. Otherwise,
+ * unexpected situations may occur.
  */
 public class SslTransport extends TcpTransport {
     /**
@@ -47,11 +47,11 @@
      * 
      * @param wireFormat The WireFormat to be used.
      * @param socketFactory The socket factory to be used. Forcing SSLSockets
-     *      for obvious reasons.
+     *                for obvious reasons.
      * @param remoteLocation The remote location.
      * @param localLocation The local location.
      * @param needClientAuth If set to true, the underlying socket will need
-     *      client certificate authentication.
+     *                client certificate authentication.
      * @throws UnknownHostException If TcpTransport throws.
      * @throws IOException If TcpTransport throws.
      */
@@ -61,12 +61,10 @@
             ((SSLSocket)this.socket).setNeedClientAuth(needClientAuth);
         }
     }
-    
+
     /**
-     * Initialize from a ServerSocket.
-     * 
-     * No access to needClientAuth is given since it is already set within the
-     *      provided socket.
+     * Initialize from a ServerSocket. No access to needClientAuth is given
+     * since it is already set within the provided socket.
      * 
      * @param wireFormat The WireFormat to be used.
      * @param socket The Socket to be used. Forcing SSL.
@@ -75,31 +73,31 @@
     public SslTransport(WireFormat wireFormat, SSLSocket socket) throws IOException {
         super(wireFormat, socket);
     }
-    
+
     /**
-     * Overriding in order to add the client's certificates to ConnectionInfo Commmands. 
+     * Overriding in order to add the client's certificates to ConnectionInfo
+     * Commands.
      * 
      * @param command The Command coming in.
      */
     public void doConsume(Command command) {
         // The instanceof can be avoided, but that would require modifying the
-        //      Command clas tree and that would require too much effort right
-        //      now.
-        if ( command instanceof ConnectionInfo ) {
+        // Command class tree and that would require too much effort right
+        // now.
+        if (command instanceof ConnectionInfo) {
             ConnectionInfo connectionInfo = (ConnectionInfo)command;
-            
+
             SSLSocket sslSocket = (SSLSocket)this.socket;
-            
+
             SSLSession sslSession = sslSocket.getSession();
-            
+
             X509Certificate[] clientCertChain;
             try {
-                clientCertChain =
-                    (X509Certificate[]) sslSession.getPeerCertificates();
-            } catch(SSLPeerUnverifiedException e) {
+                clientCertChain = (X509Certificate[])sslSession.getPeerCertificates();
+            } catch (SSLPeerUnverifiedException e) {
                 clientCertChain = null;
             }
-            
+
             connectionInfo.setTransportContext(clientCertChain);
         }
 
@@ -110,8 +108,7 @@
      * @return pretty print of 'this'
      */
     public String toString() {
-        return "ssl://"+socket.getInetAddress()+":"+socket.getPort();
+        return "ssl://" + socket.getInetAddress() + ":" + socket.getPort();
     }
 
 }
-

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/SslTransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/SslTransportFactory.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/SslTransportFactory.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/SslTransportFactory.java Wed Aug  8 11:56:59 2007
@@ -49,10 +49,10 @@
 import javax.net.ssl.TrustManager;
 
 /**
- * An implementation of the TcpTransportFactory using SSL.
- * 
- * The major contribution from this class is that it is aware of SslTransportServer and SslTransport classes.
- * All Transports and TransportServers created from this factory will have their needClientAuth option set to false.
+ * An implementation of the TcpTransportFactory using SSL. The major
+ * contribution from this class is that it is aware of SslTransportServer and
+ * SslTransport classes. All Transports and TransportServers created from this
+ * factory will have their needClientAuth option set to false.
  * 
  * @author sepandm@gmail.com (Sepand)
  * @version $Revision: $
@@ -60,17 +60,16 @@
 public class SslTransportFactory extends TcpTransportFactory {
     // The context used to creat ssl sockets.
     private SSLContext sslContext = null;
-    
+
     // The log this uses.,
     private static final Log log = LogFactory.getLog(SslTransportFactory.class);
-    
+
     /**
      * Constructor. Nothing special.
-     *
      */
     public SslTransportFactory() {
     }
-    
+
     /**
      * Overriding to use SslTransportServer and allow for proper reflection.
      */
@@ -79,31 +78,29 @@
             Map options = new HashMap(URISupport.parseParamters(location));
 
             ServerSocketFactory serverSocketFactory = createServerSocketFactory();
-            SslTransportServer server =
-                new SslTransportServer(this, location, (SSLServerSocketFactory)serverSocketFactory);  
+            SslTransportServer server = new SslTransportServer(this, location, (SSLServerSocketFactory)serverSocketFactory);
             server.setWireFormatFactory(createWireFormatFactory(options));
             IntrospectionSupport.setProperties(server, options);
             Map transportOptions = IntrospectionSupport.extractProperties(options, "transport.");
             server.setTransportOption(transportOptions);
             server.bind();
-            
+
             return server;
-        }
-        catch (URISyntaxException e) {
+        } catch (URISyntaxException e) {
             throw IOExceptionSupport.create(e);
         }
     }
-    
+
     /**
      * Overriding to allow for proper configuration through reflection.
      */
     public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
-        
-        SslTransport sslTransport = (SslTransport) transport.narrow(SslTransport.class);
+
+        SslTransport sslTransport = (SslTransport)transport.narrow(SslTransport.class);
         IntrospectionSupport.setProperties(sslTransport, options);
-        
+
         Map socketOptions = IntrospectionSupport.extractProperties(options, "socket.");
-        
+
         sslTransport.setSocketOptions(socketOptions);
 
         if (sslTransport.isTrace()) {
@@ -116,14 +113,14 @@
         if (format instanceof OpenWireFormat) {
             transport = new WireFormatNegotiator(transport, (OpenWireFormat)format, sslTransport.getMinmumWireFormatVersion());
         }
-        
+
         return transport;
     }
-    
+
     /**
      * Overriding to use SslTransports.
      */
-    protected Transport createTransport(URI location,WireFormat wf) throws UnknownHostException,IOException{
+    protected Transport createTransport(URI location, WireFormat wf) throws UnknownHostException, IOException {
         URI localLocation = null;
         String path = location.getPath();
         // see if the path is a local URI location
@@ -133,18 +130,16 @@
                 Integer.parseInt(path.substring((localPortIndex + 1), path.length()));
                 String localString = location.getScheme() + ":/" + path;
                 localLocation = new URI(localString);
-            }
-            catch (Exception e) {
+            } catch (Exception e) {
                 log.warn("path isn't a valid local location for SslTransport to use", e);
             }
         }
         SocketFactory socketFactory = createSocketFactory();
         return new SslTransport(wf, (SSLSocketFactory)socketFactory, location, localLocation, false);
     }
-    
+
     /**
      * Sets the key and trust managers used in constructed socket factories.
-     * 
      * Passes given arguments to SSLContext.init(...).
      * 
      * @param km The sources of authentication keys or null.
@@ -161,36 +156,31 @@
         }
         sslContext.init(km, tm, random);
     }
-    
+
     /**
-     * Creates a new SSL ServerSocketFactory.
-     * 
-     * The given factory will use user-provided key and trust managers (if the user provided them).
+     * Creates a new SSL ServerSocketFactory. The given factory will use
+     * user-provided key and trust managers (if the user provided them).
      * 
      * @return Newly created (Ssl)ServerSocketFactory.
      */
     protected ServerSocketFactory createServerSocketFactory() {
         if (sslContext == null) {
             return SSLServerSocketFactory.getDefault();
-        }
-        else
+        } else
             return sslContext.getServerSocketFactory();
     }
 
     /**
-     * Creates a new SSL SocketFactory.
-     * 
-     * The given factory will use user-provided key and trust managers (if the user provided them).
+     * Creates a new SSL SocketFactory. The given factory will use user-provided
+     * key and trust managers (if the user provided them).
      * 
      * @return Newly created (Ssl)SocketFactory.
      */
     protected SocketFactory createSocketFactory() {
-        if ( sslContext == null ) {
+        if (sslContext == null) {
             return SSLSocketFactory.getDefault();
-        }
-        else
+        } else
             return sslContext.getSocketFactory();
     }
-    
 
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpBufferedInputStream.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpBufferedInputStream.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpBufferedInputStream.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpBufferedInputStream.java Wed Aug  8 11:56:59 2007
@@ -19,107 +19,108 @@
 import java.io.FilterInputStream;
 import java.io.IOException;
 import java.io.InputStream;
+
 /**
  * An optimized buffered input stream for Tcp
  * 
  * @version $Revision: 1.1.1.1 $
  */
-public class TcpBufferedInputStream extends FilterInputStream{
-    private static final int DEFAULT_BUFFER_SIZE=8192;
+public class TcpBufferedInputStream extends FilterInputStream {
+    private static final int DEFAULT_BUFFER_SIZE = 8192;
     protected byte internalBuffer[];
     protected int count;
     protected int position;
 
-    public TcpBufferedInputStream(InputStream in){
-        this(in,DEFAULT_BUFFER_SIZE);
+    public TcpBufferedInputStream(InputStream in) {
+        this(in, DEFAULT_BUFFER_SIZE);
     }
 
-    public TcpBufferedInputStream(InputStream in,int size){
+    public TcpBufferedInputStream(InputStream in, int size) {
         super(in);
-        if(size<=0){
+        if (size <= 0) {
             throw new IllegalArgumentException("Buffer size <= 0");
         }
-        internalBuffer=new byte[size];
+        internalBuffer = new byte[size];
     }
 
-    private void fill() throws IOException{
-        byte[] buffer=internalBuffer;
-        count=position=0;
-        int n=in.read(buffer,position,buffer.length-position);
-        if(n>0)
-            count=n+position;
+    private void fill() throws IOException {
+        byte[] buffer = internalBuffer;
+        count = position = 0;
+        int n = in.read(buffer, position, buffer.length - position);
+        if (n > 0)
+            count = n + position;
     }
 
-    public int read() throws IOException{
-        if(position>=count){
+    public int read() throws IOException {
+        if (position >= count) {
             fill();
-            if(position>=count)
+            if (position >= count)
                 return -1;
         }
-        return internalBuffer[position++]&0xff;
+        return internalBuffer[position++] & 0xff;
     }
 
-    private int readStream(byte[] b,int off,int len) throws IOException{
-        int avail=count-position;
-        if(avail<=0){
-            if(len>=internalBuffer.length){
-                return in.read(b,off,len);
+    private int readStream(byte[] b, int off, int len) throws IOException {
+        int avail = count - position;
+        if (avail <= 0) {
+            if (len >= internalBuffer.length) {
+                return in.read(b, off, len);
             }
             fill();
-            avail=count-position;
-            if(avail<=0)
+            avail = count - position;
+            if (avail <= 0)
                 return -1;
         }
-        int cnt=(avail<len)?avail:len;
-        System.arraycopy(internalBuffer,position,b,off,cnt);
-        position+=cnt;
+        int cnt = (avail < len) ? avail : len;
+        System.arraycopy(internalBuffer, position, b, off, cnt);
+        position += cnt;
         return cnt;
     }
 
-    public int read(byte b[],int off,int len) throws IOException{
-        if((off|len|(off+len)|(b.length-(off+len)))<0){
+    public int read(byte b[], int off, int len) throws IOException {
+        if ((off | len | (off + len) | (b.length - (off + len))) < 0) {
             throw new IndexOutOfBoundsException();
-        }else if(len==0){
+        } else if (len == 0) {
             return 0;
         }
-        int n=0;
-        for(;;){
-            int nread=readStream(b,off+n,len-n);
-            if(nread<=0)
-                return (n==0)?nread:n;
-            n+=nread;
-            if(n>=len)
+        int n = 0;
+        for (;;) {
+            int nread = readStream(b, off + n, len - n);
+            if (nread <= 0)
+                return (n == 0) ? nread : n;
+            n += nread;
+            if (n >= len)
                 return n;
             // if not closed but no bytes available, return
-            InputStream input=in;
-            if(input!=null&&input.available()<=0)
+            InputStream input = in;
+            if (input != null && input.available() <= 0)
                 return n;
         }
     }
 
-    public long skip(long n) throws IOException{
-        if(n<=0){
+    public long skip(long n) throws IOException {
+        if (n <= 0) {
             return 0;
         }
-        long avail=count-position;
-        if(avail<=0){
+        long avail = count - position;
+        if (avail <= 0) {
             return in.skip(n);
         }
-        long skipped=(avail<n)?avail:n;
-        position+=skipped;
+        long skipped = (avail < n) ? avail : n;
+        position += skipped;
         return skipped;
     }
 
-    public int available() throws IOException{
-        return in.available()+(count-position);
+    public int available() throws IOException {
+        return in.available() + (count - position);
     }
 
-    public boolean markSupported(){
+    public boolean markSupported() {
         return false;
     }
 
-    public void close() throws IOException{
-        if(in!=null)
+    public void close() throws IOException {
+        if (in != null)
             in.close();
     }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpBufferedOutputStream.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpBufferedOutputStream.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpBufferedOutputStream.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpBufferedOutputStream.java Wed Aug  8 11:56:59 2007
@@ -24,7 +24,7 @@
 
 /**
  * An optimized buffered outputstream for Tcp
- *
+ * 
  * @version $Revision: 1.1.1.1 $
  */
 
@@ -37,7 +37,7 @@
 
     /**
      * Constructor
-     *
+     * 
      * @param out
      */
     public TcpBufferedOutputStream(OutputStream out) {
@@ -45,10 +45,10 @@
     }
 
     /**
-     * Creates a new buffered output stream to write data to the specified underlying output stream with the specified
-     * buffer size.
-     *
-     * @param out  the underlying output stream.
+     * Creates a new buffered output stream to write data to the specified
+     * underlying output stream with the specified buffer size.
+     * 
+     * @param out the underlying output stream.
      * @param size the buffer size.
      * @throws IllegalArgumentException if size <= 0.
      */
@@ -58,49 +58,47 @@
             throw new IllegalArgumentException("Buffer size <= 0");
         }
         buffer = new byte[size];
-        bufferlen=size;
+        bufferlen = size;
     }
 
     /**
      * write a byte on to the stream
-     *
+     * 
      * @param b - byte to write
      * @throws IOException
      */
     public void write(int b) throws IOException {
-        if ((bufferlen-count) < 1) {
+        if ((bufferlen - count) < 1) {
             flush();
         }
-        buffer[count++] = (byte) b;
+        buffer[count++] = (byte)b;
     }
 
-
     /**
      * write a byte array to the stream
-     *
-     * @param b   the byte buffer
+     * 
+     * @param b the byte buffer
      * @param off the offset into the buffer
      * @param len the length of data to write
      * @throws IOException
      */
     public void write(byte b[], int off, int len) throws IOException {
-        if ((bufferlen-count) < len) {
+        if ((bufferlen - count) < len) {
             flush();
         }
         if (buffer.length >= len) {
             System.arraycopy(b, off, buffer, count, len);
             count += len;
-        }
-        else {
+        } else {
             out.write(b, off, len);
         }
     }
 
     /**
-     * flush the data to the output stream
-     * This doesn't call flush on the underlying outputstream, because
-     * Tcp is particularly efficent at doing this itself ....
-     *
+     * flush the data to the output stream. This doesn't call flush on the
+     * underlying outputstream, because Tcp is particularly efficient at doing
+     * this itself ....
+     * 
      * @throws IOException
      */
     public void flush() throws IOException {
@@ -112,7 +110,7 @@
 
     /**
      * close this stream
-     *
+     * 
      * @throws IOException
      */
     public void close() throws IOException {
@@ -120,10 +118,9 @@
         closed = true;
     }
 
-
     /**
      * Checks that the stream has not been closed
-     *
+     * 
      * @throws IOException
      */
     private final void checkClosed() throws IOException {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java Wed Aug  8 11:56:59 2007
@@ -45,7 +45,7 @@
 
 /**
  * An implementation of the {@link Transport} interface using raw tcp/ip
- *
+ * 
  * @version $Revision$
  */
 public class TcpTransport extends TransportThreadSupport implements Transport, Service, Runnable {
@@ -74,12 +74,11 @@
 
     /**
      * Connect to a remote Node - e.g. a Broker
-     *
+     * 
      * @param wireFormat
      * @param socketFactory
      * @param remoteLocation
-     * @param localLocation  -
-     *                       e.g. local InetAddress and local port
+     * @param localLocation - e.g. local InetAddress and local port
      * @throws IOException
      * @throws UnknownHostException
      */
@@ -88,8 +87,7 @@
         this.socketFactory = socketFactory;
         try {
             this.socket = socketFactory.createSocket();
-        }
-        catch (SocketException e) {
+        } catch (SocketException e) {
             this.socket = null;
         }
         this.remoteLocation = remoteLocation;
@@ -97,10 +95,9 @@
         setDaemon(false);
     }
 
-
     /**
      * Initialize from a server Socket
-     *
+     * 
      * @param wireFormat
      * @param socket
      * @throws IOException
@@ -135,28 +132,25 @@
     public void run() {
         log.trace("TCP consumer thread starting");
         try {
-	        while (!isStopped()) {
-	            doRun();
-	        }
+            while (!isStopped()) {
+                doRun();
+            }
         } catch (IOException e) {
-        	stoppedLatch.get().countDown();
+            stoppedLatch.get().countDown();
             onException(e);
         } finally {
-        	stoppedLatch.get().countDown();
+            stoppedLatch.get().countDown();
         }
     }
 
-
-	protected void doRun() throws IOException {
-		try {
-		    Object command = readCommand();
-		    doConsume(command);
-		}
-		catch (SocketTimeoutException e) {
-		}
-		catch (InterruptedIOException e) {
-		}
-	}
+    protected void doRun() throws IOException {
+        try {
+            Object command = readCommand();
+            doConsume(command);
+        } catch (SocketTimeoutException e) {
+        } catch (InterruptedIOException e) {
+        }
+    }
 
     protected Object readCommand() throws IOException {
         return wireFormat.unmarshal(dataIn);
@@ -248,22 +242,21 @@
     public void setTcpNoDelay(Boolean tcpNoDelay) {
         this.tcpNoDelay = tcpNoDelay;
     }
-    
+
     /**
      * @return the ioBufferSize
      */
-    public int getIoBufferSize(){
+    public int getIoBufferSize() {
         return this.ioBufferSize;
     }
 
     /**
      * @param ioBufferSize the ioBufferSize to set
      */
-    public void setIoBufferSize(int ioBufferSize){
-        this.ioBufferSize=ioBufferSize;
+    public void setIoBufferSize(int ioBufferSize) {
+        this.ioBufferSize = ioBufferSize;
     }
 
-
     // Implementation methods
     // -------------------------------------------------------------------------
     protected String resolveHostName(String host) throws UnknownHostException {
@@ -278,7 +271,7 @@
 
     /**
      * Configures the socket for use
-     *
+     * 
      * @param sock
      * @throws SocketException
      */
@@ -290,8 +283,7 @@
         try {
             sock.setReceiveBufferSize(socketBufferSize);
             sock.setSendBufferSize(socketBufferSize);
-        }
-        catch (SocketException se) {
+        } catch (SocketException se) {
             log.warn("Cannot set socket buffer size = " + socketBufferSize);
             log.debug("Cannot set socket buffer size. Reason: " + se, se);
         }
@@ -340,20 +332,17 @@
             if (remoteAddress != null) {
                 if (connectionTimeout >= 0) {
                     socket.connect(remoteAddress, connectionTimeout);
-                }
-                else {
+                } else {
                     socket.connect(remoteAddress);
                 }
             }
 
-        }
-        else {
+        } else {
             // For SSL sockets.. you can't create an unconnected socket :(
             // This means the timout option are not supported either.
             if (localAddress != null) {
                 socket = socketFactory.createSocket(remoteAddress.getAddress(), remoteAddress.getPort(), localAddress.getAddress(), localAddress.getPort());
-            }
-            else {
+            } else {
                 socket = socketFactory.createSocket(remoteAddress.getAddress(), remoteAddress.getPort());
             }
         }
@@ -362,7 +351,6 @@
         initializeStreams();
     }
 
-    
     protected void doStop(ServiceStopper stopper) throws Exception {
         if (log.isDebugEnabled()) {
             log.debug("Stopping transport " + this);
@@ -375,18 +363,17 @@
             socket.close();
         }
     }
-    
-    
+
     /**
      * Override so that stop() blocks until the run thread is no longer running.
      */
     @Override
     public void stop() throws Exception {
-    	super.stop();
-    	CountDownLatch countDownLatch = stoppedLatch.get();
-    	if( countDownLatch!=null ) {
-    		countDownLatch.await();
-    	}
+        super.stop();
+        CountDownLatch countDownLatch = stoppedLatch.get();
+        if (countDownLatch != null) {
+            countDownLatch.await();
+        }
     }
 
     protected void initializeStreams() throws Exception {
@@ -416,7 +403,4 @@
         return null;
     }
 
-
-    
-  
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java Wed Aug  8 11:56:59 2007
@@ -42,6 +42,7 @@
 
 public class TcpTransportFactory extends TransportFactory {
     private static final Log log = LogFactory.getLog(TcpTransportFactory.class);
+
     public TransportServer doBind(String brokerId, final URI location) throws IOException {
         try {
             Map options = new HashMap(URISupport.parseParamters(location));
@@ -53,16 +54,16 @@
             Map transportOptions = IntrospectionSupport.extractProperties(options, "transport.");
             server.setTransportOption(transportOptions);
             server.bind();
-            
+
             return server;
-        }
-        catch (URISyntaxException e) {
+        } catch (URISyntaxException e) {
             throw IOExceptionSupport.create(e);
         }
     }
 
     /**
-     * Allows subclasses of TcpTransportFactory to create custom instances of TcpTransportServer.
+     * Allows subclasses of TcpTransportFactory to create custom instances of
+     * TcpTransportServer.
      * 
      * @param location
      * @param serverSocketFactory
@@ -70,16 +71,16 @@
      * @throws IOException
      * @throws URISyntaxException
      */
-	protected TcpTransportServer createTcpTransportServer(final URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException {
-		return new TcpTransportServer(this, location, serverSocketFactory);
-	}
+    protected TcpTransportServer createTcpTransportServer(final URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException {
+        return new TcpTransportServer(this, location, serverSocketFactory);
+    }
 
     public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
-        
-        TcpTransport tcpTransport = (TcpTransport) transport.narrow(TcpTransport.class);
+
+        TcpTransport tcpTransport = (TcpTransport)transport.narrow(TcpTransport.class);
         IntrospectionSupport.setProperties(tcpTransport, options);
-        
-        Map socketOptions = IntrospectionSupport.extractProperties(options, "socket.");        
+
+        Map socketOptions = IntrospectionSupport.extractProperties(options, "socket.");
         tcpTransport.setSocketOptions(socketOptions);
 
         if (tcpTransport.isTrace()) {
@@ -91,10 +92,10 @@
         }
 
         // Only need the WireFormatNegotiator if using openwire
-        if( format instanceof OpenWireFormat ) {
-        	transport = new WireFormatNegotiator(transport, (OpenWireFormat)format, tcpTransport.getMinmumWireFormatVersion());
+        if (format instanceof OpenWireFormat) {
+            transport = new WireFormatNegotiator(transport, (OpenWireFormat)format, tcpTransport.getMinmumWireFormatVersion());
         }
-        
+
         return transport;
     }
 
@@ -105,9 +106,9 @@
         return true;
     }
 
-    protected Transport createTransport(URI location,WireFormat wf) throws UnknownHostException,IOException{
-        URI localLocation=null;
-        String path=location.getPath();
+    protected Transport createTransport(URI location, WireFormat wf) throws UnknownHostException, IOException {
+        URI localLocation = null;
+        String path = location.getPath();
         // see if the path is a local URI location
         if (path != null && path.length() > 0) {
             int localPortIndex = path.indexOf(':');
@@ -115,8 +116,7 @@
                 Integer.parseInt(path.substring((localPortIndex + 1), path.length()));
                 String localString = location.getScheme() + ":/" + path;
                 localLocation = new URI(localString);
-            }
-            catch (Exception e) {
+            } catch (Exception e) {
                 log.warn("path isn't a valid local location for TcpTransport to use", e);
             }
         }
@@ -125,19 +125,20 @@
     }
 
     /**
-     * Allows subclasses of TcpTransportFactory to provide a create custom TcpTransport intances. 
+     * Allows subclasses of TcpTransportFactory to create custom
+     * TcpTransport instances.
      * 
      * @param location
      * @param wf
      * @param socketFactory
-     * @param localLocation 
+     * @param localLocation
      * @return
      * @throws UnknownHostException
      * @throws IOException
      */
-	protected TcpTransport createTcpTransport(WireFormat wf, SocketFactory socketFactory, URI location, URI localLocation) throws UnknownHostException, IOException {
-		return new TcpTransport(wf, socketFactory, location, localLocation);
-	}
+    protected TcpTransport createTcpTransport(WireFormat wf, SocketFactory socketFactory, URI location, URI localLocation) throws UnknownHostException, IOException {
+        return new TcpTransport(wf, socketFactory, location, localLocation);
+    }
 
     protected ServerSocketFactory createServerSocketFactory() {
         return ServerSocketFactory.getDefault();

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java Wed Aug  8 11:56:59 2007
@@ -49,7 +49,7 @@
  */
 
 public class TcpTransportServer extends TransportServerThreadSupport {
-	
+
     private static final Log log = LogFactory.getLog(TcpTransportServer.class);
     protected ServerSocket serverSocket;
     protected int backlog = 5000;
@@ -60,16 +60,16 @@
     protected boolean trace;
     protected Map transportOptions;
     protected final ServerSocketFactory serverSocketFactory;
-    
+
     public TcpTransportServer(TcpTransportFactory transportFactory, URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException {
         super(location);
-        this.transportFactory=transportFactory;
-		this.serverSocketFactory = serverSocketFactory;
+        this.transportFactory = transportFactory;
+        this.serverSocketFactory = serverSocketFactory;
     }
 
     public void bind() throws IOException {
-    	URI bind = getBindLocation();
-    	
+        URI bind = getBindLocation();
+
         String host = bind.getHost();
         host = (host == null || host.length() == 0) ? "localhost" : host;
         InetAddress addr = InetAddress.getByName(host);
@@ -77,31 +77,29 @@
         try {
             if (host.trim().equals("localhost") || addr.equals(InetAddress.getLocalHost())) {
                 this.serverSocket = serverSocketFactory.createServerSocket(bind.getPort(), backlog);
-            }
-            else {
+            } else {
                 this.serverSocket = serverSocketFactory.createServerSocket(bind.getPort(), backlog, addr);
             }
             this.serverSocket.setSoTimeout(2000);
-        }
-        catch (IOException e) {
+        } catch (IOException e) {
             throw IOExceptionSupport.create("Failed to bind to server socket: " + bind + " due to: " + e, e);
         }
         try {
-			setConnectURI(new URI(bind.getScheme(), bind.getUserInfo(), resolveHostName(bind.getHost()), serverSocket.getLocalPort(), bind.getPath(),
-					bind.getQuery(), bind.getFragment()));
-		} catch (URISyntaxException e) {
+            setConnectURI(new URI(bind.getScheme(), bind.getUserInfo(), resolveHostName(bind.getHost()), serverSocket.getLocalPort(), bind.getPath(), bind.getQuery(), bind
+                .getFragment()));
+        } catch (URISyntaxException e) {
 
-            // it could be that the host name contains invalid characters such as _ on unix platforms
+            // it could be that the host name contains invalid characters such
+            // as _ on unix platforms
             // so lets try use the IP address instead
             try {
-                setConnectURI(new URI(bind.getScheme(), bind.getUserInfo(), addr.getHostAddress(), serverSocket.getLocalPort(), bind.getPath(),
-                        bind.getQuery(), bind.getFragment()));
+                setConnectURI(new URI(bind.getScheme(), bind.getUserInfo(), addr.getHostAddress(), serverSocket.getLocalPort(), bind.getPath(), bind.getQuery(), bind.getFragment()));
             } catch (URISyntaxException e2) {
                 throw IOExceptionSupport.create(e2);
             }
         }
     }
-    
+
     /**
      * @return Returns the wireFormatFactory.
      */
@@ -110,8 +108,7 @@
     }
 
     /**
-     * @param wireFormatFactory
-     *            The wireFormatFactory to set.
+     * @param wireFormatFactory The wireFormatFactory to set.
      */
     public void setWireFormatFactory(WireFormatFactory wireFormatFactory) {
         this.wireFormatFactory = wireFormatFactory;
@@ -161,8 +158,7 @@
                 if (socket != null) {
                     if (isStopped() || getAcceptListener() == null) {
                         socket.close();
-                    }
-                    else {
+                    } else {
                         HashMap options = new HashMap();
                         options.put("maxInactivityDuration", Long.valueOf(maxInactivityDuration));
                         options.put("minmumWireFormatVersion", Integer.valueOf(minmumWireFormatVersion));
@@ -174,13 +170,11 @@
                         getAcceptListener().onAccept(configuredTransport);
                     }
                 }
-            }
-            catch (SocketTimeoutException ste) {
+            } catch (SocketTimeoutException ste) {
                 // expect this to happen
-            }
-            catch (Exception e) {
+            } catch (Exception e) {
                 if (!isStopping()) {
-                    onAcceptError(e); 
+                    onAcceptError(e);
                 } else if (!isStopped()) {
                     log.warn("run()", e);
                     onAcceptError(e);
@@ -190,25 +184,26 @@
     }
 
     /**
-     * Allow derived classes to override the Transport implementation that this transport server creates.
+     * Allow derived classes to override the Transport implementation that this
+     * transport server creates.
+     * 
      * @param socket
      * @param format
      * @return
      * @throws IOException
      */
-	protected Transport createTransport(Socket socket, WireFormat format) throws IOException {
-		return new TcpTransport(format, socket);
-	}
+    protected Transport createTransport(Socket socket, WireFormat format) throws IOException {
+        return new TcpTransport(format, socket);
+    }
 
     /**
      * @return pretty print of this
      */
     public String toString() {
-        return ""+getBindLocation();
+        return "" + getBindLocation();
     }
 
     /**
-     * 
      * @param hostName
      * @return real hostName
      * @throws UnknownHostException

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandDatagramChannel.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandDatagramChannel.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandDatagramChannel.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandDatagramChannel.java Wed Aug  8 11:56:59 2007
@@ -56,8 +56,8 @@
     private int defaultMarshalBufferSize = 64 * 1024;
 
     public CommandDatagramChannel(UdpTransport transport, OpenWireFormat wireFormat, int datagramSize,
-            SocketAddress targetAddress, DatagramHeaderMarshaller headerMarshaller, DatagramChannel channel,
-            ByteBufferPool bufferPool) {
+                                  SocketAddress targetAddress, DatagramHeaderMarshaller headerMarshaller,
+                                  DatagramChannel channel, ByteBufferPool bufferPool) {
         super(transport, wireFormat, datagramSize, targetAddress, headerMarshaller);
         this.channel = channel;
         this.bufferPool = bufferPool;
@@ -97,7 +97,7 @@
                 // the ByteBuffer to avoid object allocation and unnecessary
                 // buffering?
                 DataInputStream dataIn = new DataInputStream(new ByteArrayInputStream(data));
-                answer = (Command) wireFormat.unmarshal(dataIn);
+                answer = (Command)wireFormat.unmarshal(dataIn);
                 break;
             }
         }
@@ -151,15 +151,14 @@
                     // lets remove the header of the partial command
                     // which is the byte for the type and an int for the size of
                     // the byte[]
-                    chunkSize -= 1 // the data type
-                    + 4 // the command ID
-                    + 4; // the size of the partial data
+
+                    // data type + the command ID + size of the partial data
+                    chunkSize -= 1 + 4 + 4;
 
                     // the boolean flags
                     if (bs != null) {
                         chunkSize -= bs.marshalledSize();
-                    }
-                    else {
+                    } else {
                         chunkSize -= 1;
                     }
 
@@ -176,8 +175,7 @@
 
                     if (lastFragment) {
                         writeBuffer.put(LastPartialCommand.DATA_STRUCTURE_TYPE);
-                    }
-                    else {
+                    } else {
                         writeBuffer.put(PartialCommand.DATA_STRUCTURE_TYPE);
                     }
 
@@ -191,7 +189,7 @@
                     }
                     writeBuffer.putInt(commandId);
                     if (bs == null) {
-                        writeBuffer.put((byte) 1);
+                        writeBuffer.put((byte)1);
                     }
 
                     // size of byte array
@@ -203,8 +201,7 @@
                     offset += chunkSize;
                     sendWriteBuffer(commandId, address, writeBuffer, false);
                 }
-            }
-            else {
+            } else {
                 writeBuffer.put(data);
                 sendWriteBuffer(command.getCommandId(), address, writeBuffer, false);
             }
@@ -227,15 +224,15 @@
 
     // Implementation methods
     // -------------------------------------------------------------------------
-    protected void sendWriteBuffer(int commandId, SocketAddress address, ByteBuffer writeBuffer, boolean redelivery)
-            throws IOException {
+    protected void sendWriteBuffer(int commandId, SocketAddress address, ByteBuffer writeBuffer,
+                                   boolean redelivery) throws IOException {
         // lets put the datagram into the replay buffer first to prevent timing
         // issues
         ReplayBuffer bufferCache = getReplayBuffer();
         if (bufferCache != null && !redelivery) {
             bufferCache.addBuffer(commandId, writeBuffer);
         }
-        
+
         writeBuffer.flip();
 
         if (log.isDebugEnabled()) {
@@ -247,10 +244,9 @@
 
     public void sendBuffer(int commandId, Object buffer) throws IOException {
         if (buffer != null) {
-            ByteBuffer writeBuffer = (ByteBuffer) buffer;
+            ByteBuffer writeBuffer = (ByteBuffer)buffer;
             sendWriteBuffer(commandId, getReplayAddress(), writeBuffer, true);
-        }
-        else {
+        } else {
             if (log.isWarnEnabled()) {
                 log.warn("Request for buffer: " + commandId + " is no longer present");
             }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandDatagramSocket.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandDatagramSocket.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandDatagramSocket.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandDatagramSocket.java Wed Aug  8 11:56:59 2007
@@ -48,8 +48,8 @@
     private Object readLock = new Object();
     private Object writeLock = new Object();
 
-    public CommandDatagramSocket(UdpTransport transport, OpenWireFormat wireFormat, int datagramSize,
-            SocketAddress targetAddress, DatagramHeaderMarshaller headerMarshaller, DatagramSocket channel) {
+    public CommandDatagramSocket(UdpTransport transport, OpenWireFormat wireFormat, int datagramSize, SocketAddress targetAddress, DatagramHeaderMarshaller headerMarshaller,
+                                 DatagramSocket channel) {
         super(transport, wireFormat, datagramSize, targetAddress, headerMarshaller);
         this.channel = channel;
     }
@@ -73,7 +73,7 @@
                 DataInputStream dataIn = new DataInputStream(new ByteArrayInputStream(datagram.getData()));
 
                 from = headerMarshaller.createEndpoint(datagram, dataIn);
-                answer = (Command) wireFormat.unmarshal(dataIn);
+                answer = (Command)wireFormat.unmarshal(dataIn);
                 break;
             }
         }
@@ -100,8 +100,7 @@
 
             if (remaining(writeBuffer) >= 0) {
                 sendWriteBuffer(address, writeBuffer, command.getCommandId());
-            }
-            else {
+            } else {
                 // lets split the command up into chunks
                 byte[] data = writeBuffer.toByteArray();
                 boolean lastFragment = false;
@@ -125,15 +124,14 @@
                     // lets remove the header of the partial command
                     // which is the byte for the type and an int for the size of
                     // the byte[]
-                    chunkSize -= 1 // the data type
-                    + 4 // the command ID
-                    + 4; // the size of the partial data
+
+                    // data type + the command ID + size of the partial data
+                    chunkSize -= 1 + 4 + 4;
 
                     // the boolean flags
                     if (bs != null) {
                         chunkSize -= bs.marshalledSize();
-                    }
-                    else {
+                    } else {
                         chunkSize -= 1;
                     }
 
@@ -150,8 +148,7 @@
 
                     if (lastFragment) {
                         dataOut.write(LastPartialCommand.DATA_STRUCTURE_TYPE);
-                    }
-                    else {
+                    } else {
                         dataOut.write(PartialCommand.DATA_STRUCTURE_TYPE);
                     }
 
@@ -165,7 +162,7 @@
                     }
                     dataOut.writeInt(commandId);
                     if (bs == null) {
-                        dataOut.write((byte) 1);
+                        dataOut.write((byte)1);
                     }
 
                     // size of byte array
@@ -191,14 +188,12 @@
 
     // Implementation methods
     // -------------------------------------------------------------------------
-    protected void sendWriteBuffer(SocketAddress address, ByteArrayOutputStream writeBuffer, int commandId)
-            throws IOException {
+    protected void sendWriteBuffer(SocketAddress address, ByteArrayOutputStream writeBuffer, int commandId) throws IOException {
         byte[] data = writeBuffer.toByteArray();
         sendWriteBuffer(commandId, address, data, false);
     }
 
-    protected void sendWriteBuffer(int commandId, SocketAddress address, byte[] data, boolean redelivery)
-            throws IOException {
+    protected void sendWriteBuffer(int commandId, SocketAddress address, byte[] data, boolean redelivery) throws IOException {
         // lets put the datagram into the replay buffer first to prevent timing
         // issues
         ReplayBuffer bufferCache = getReplayBuffer();
@@ -216,10 +211,9 @@
 
     public void sendBuffer(int commandId, Object buffer) throws IOException {
         if (buffer != null) {
-            byte[] data = (byte[]) buffer;
+            byte[] data = (byte[])buffer;
             sendWriteBuffer(commandId, replayAddress, data, true);
-        }
-        else {
+        } else {
             if (log.isWarnEnabled()) {
                 log.warn("Request for buffer: " + commandId + " is no longer present");
             }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java Wed Aug  8 11:56:59 2007
@@ -52,8 +52,8 @@
 public class UdpTransport extends TransportThreadSupport implements Transport, Service, Runnable {
     private static final Log log = LogFactory.getLog(UdpTransport.class);
 
-	private static final int MAX_BIND_ATTEMPTS = 50;
-	private static final long BIND_ATTEMPT_DELAY = 100;
+    private static final int MAX_BIND_ATTEMPTS = 50;
+    private static final long BIND_ATTEMPT_DELAY = 100;
 
     private CommandChannel commandChannel;
     private OpenWireFormat wireFormat;
@@ -98,12 +98,11 @@
         this.description = getProtocolName() + "Server@";
     }
 
-
     /**
      * Creates a replayer for working with the reliable transport
      */
     public Replayer createReplayer() throws IOException {
-        if (replayEnabled ) {
+        if (replayEnabled) {
             return getCommandChannel();
         }
         return null;
@@ -124,7 +123,7 @@
             log.debug("Sending oneway from: " + this + " to target: " + targetAddress + " command: " + command);
         }
         checkStarted();
-        commandChannel.write((Command) command, address);
+        commandChannel.write((Command)command, address);
     }
 
     /**
@@ -133,8 +132,7 @@
     public String toString() {
         if (description != null) {
             return description + port;
-        }
-        else {
+        } else {
             return getProtocolUriScheme() + targetAddress + "@" + port;
         }
     }
@@ -148,47 +146,38 @@
             try {
                 Command command = commandChannel.read();
                 doConsume(command);
-            }
-            catch (AsynchronousCloseException e) {
+            } catch (AsynchronousCloseException e) {
                 // DatagramChannel closed
                 try {
                     stop();
-                }
-                catch (Exception e2) {
+                } catch (Exception e2) {
                     log.warn("Caught in: " + this + " while closing: " + e2 + ". Now Closed", e2);
                 }
-            }
-            catch (SocketException e) {
+            } catch (SocketException e) {
                 // DatagramSocket closed
                 log.debug("Socket closed: " + e, e);
                 try {
                     stop();
-                }
-                catch (Exception e2) {
+                } catch (Exception e2) {
                     log.warn("Caught in: " + this + " while closing: " + e2 + ". Now Closed", e2);
                 }
-            }
-            catch (EOFException e) {
+            } catch (EOFException e) {
                 // DataInputStream closed
                 log.debug("Socket closed: " + e, e);
                 try {
                     stop();
-                }
-                catch (Exception e2) {
+                } catch (Exception e2) {
                     log.warn("Caught in: " + this + " while closing: " + e2 + ". Now Closed", e2);
                 }
-            }
-            catch (Exception e) {
+            } catch (Exception e) {
                 try {
                     stop();
-                }
-                catch (Exception e2) {
+                } catch (Exception e2) {
                     log.warn("Caught in: " + this + " while closing: " + e2 + ". Now Closed", e2);
                 }
                 if (e instanceof IOException) {
-                    onException((IOException) e);
-                }
-                else {
+                    onException((IOException)e);
+                } else {
                     log.error("Caught: " + e, e);
                     e.printStackTrace();
                 }
@@ -204,7 +193,7 @@
      */
     public void setTargetEndpoint(Endpoint newTarget) {
         if (newTarget instanceof DatagramEndpoint) {
-            DatagramEndpoint endpoint = (DatagramEndpoint) newTarget;
+            DatagramEndpoint endpoint = (DatagramEndpoint)newTarget;
             SocketAddress address = endpoint.getAddress();
             if (address != null) {
                 if (originalTargetAddress == null) {
@@ -305,14 +294,15 @@
     public void setSequenceGenerator(IntSequenceGenerator sequenceGenerator) {
         this.sequenceGenerator = sequenceGenerator;
     }
-    
+
     public boolean isReplayEnabled() {
         return replayEnabled;
     }
 
     /**
-     * Sets whether or not replay should be enabled when using the reliable transport.
-     * i.e. should we maintain a buffer of messages that can be replayed?
+     * Sets whether or not replay should be enabled when using the reliable
+     * transport. i.e. should we maintain a buffer of messages that can be
+     * replayed?
      */
     public void setReplayEnabled(boolean replayEnabled) {
         this.replayEnabled = replayEnabled;
@@ -328,7 +318,7 @@
     public void setBufferPool(ByteBufferPool bufferPool) {
         this.bufferPool = bufferPool;
     }
-    
+
     public ReplayBuffer getReplayBuffer() {
         return replayBuffer;
     }
@@ -338,7 +328,6 @@
         getCommandChannel().setReplayBuffer(replayBuffer);
     }
 
-    
     // Implementation methods
     // -------------------------------------------------------------------------
 
@@ -391,26 +380,28 @@
         if (log.isDebugEnabled()) {
             log.debug("Binding to address: " + localAddress);
         }
-        
+
         //
-        // We have noticed that on some platfoms like linux, after you close down
-        // a previously bound socket, it can take a little while before we can bind it again.
+        // We have noticed that on some platforms, such as Linux, after you
+        // close down a previously bound socket, it can take a little while
+        // before it can be bound again, so we retry the bind a few times
+        // (pausing between attempts) before giving up.
         // 
-        for(int i=0; i < MAX_BIND_ATTEMPTS; i++){
-			try {
-				socket.bind(localAddress);
-				return;
-			} catch (BindException e) {
-				if ( i+1 == MAX_BIND_ATTEMPTS )
-					throw e;
-				try {
-					Thread.sleep(BIND_ATTEMPT_DELAY);
-				} catch (InterruptedException e1) {
+        for (int i = 0; i < MAX_BIND_ATTEMPTS; i++) {
+            try {
+                socket.bind(localAddress);
+                return;
+            } catch (BindException e) {
+                if (i + 1 == MAX_BIND_ATTEMPTS)
+                    throw e;
+                try {
+                    Thread.sleep(BIND_ATTEMPT_DELAY);
+                } catch (InterruptedException e1) {
                     Thread.currentThread().interrupt();
-					throw e;
-				}
-			}			
-    	}
+                    throw e;
+                }
+            }
+        }
 
     }
 
@@ -457,17 +448,17 @@
     }
 
     public InetSocketAddress getLocalSocketAddress() {
-        if( channel==null ) {
+        if (channel == null) {
             return null;
         } else {
             return (InetSocketAddress)channel.socket().getLocalSocketAddress();
         }
     }
 
-	public String getRemoteAddress() {
-		if(targetAddress != null){
-			return "" + targetAddress;
-		}
-		return null;
-	}
+    public String getRemoteAddress() {
+        if (targetAddress != null) {
+            return "" + targetAddress;
+        }
+        return null;
+    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java Wed Aug  8 11:56:59 2007
@@ -41,178 +41,176 @@
  * 
  * @version $Revision$
  */
-public class VMTransport implements Transport,Task{
+public class VMTransport implements Transport, Task {
 
-    private static final Log log=LogFactory.getLog(VMTransport.class);
-    private static final AtomicLong nextId=new AtomicLong(0);
-    private static final TaskRunnerFactory taskRunnerFactory=new TaskRunnerFactory("VMTransport",Thread.NORM_PRIORITY,
-            true,1000);
+    private static final Log log = LogFactory.getLog(VMTransport.class);
+    private static final AtomicLong nextId = new AtomicLong(0);
+    private static final TaskRunnerFactory taskRunnerFactory = new TaskRunnerFactory("VMTransport", Thread.NORM_PRIORITY, true, 1000);
     protected VMTransport peer;
     protected TransportListener transportListener;
     protected boolean disposed;
     protected boolean marshal;
     protected boolean network;
-    protected boolean async=true;
-    protected int asyncQueueDepth=2000;
-    protected LinkedBlockingQueue messageQueue=null;
+    protected boolean async = true;
+    protected int asyncQueueDepth = 2000;
+    protected LinkedBlockingQueue messageQueue = null;
     protected boolean started;
     protected final URI location;
     protected final long id;
     private TaskRunner taskRunner;
-    private final Object mutex=new Object();
+    private final Object mutex = new Object();
 
-    public VMTransport(URI location){
-        this.location=location;
-        this.id=nextId.getAndIncrement();
+    public VMTransport(URI location) {
+        this.location = location;
+        this.id = nextId.getAndIncrement();
     }
 
-    public VMTransport getPeer(){
-        synchronized(mutex){
+    public VMTransport getPeer() {
+        synchronized (mutex) {
             return peer;
         }
     }
 
-    public void setPeer(VMTransport peer){
-        synchronized(mutex){
-            this.peer=peer;
+    public void setPeer(VMTransport peer) {
+        synchronized (mutex) {
+            this.peer = peer;
         }
     }
 
-    public void oneway(Object command) throws IOException{
-        if(disposed){
+    public void oneway(Object command) throws IOException {
+        if (disposed) {
             throw new TransportDisposedIOException("Transport disposed.");
         }
-        if(peer==null)
+        if (peer == null)
             throw new IOException("Peer not connected.");
 
-    	TransportListener tl=null;
-    	synchronized(peer.mutex) {
-    		if( peer.disposed ) {
-    			throw new TransportDisposedIOException("Peer ("+peer.toString()+") disposed.");
-    		}
-        	if( peer.started ) {
-                if(peer.async){
+        TransportListener tl = null;
+        synchronized (peer.mutex) {
+            if (peer.disposed) {
+                throw new TransportDisposedIOException("Peer (" + peer.toString() + ") disposed.");
+            }
+            if (peer.started) {
+                if (peer.async) {
                     peer.enqueue(command);
-        		    peer.wakeup();
+                    peer.wakeup();
                 } else {
-                    tl = peer.transportListener;                   
+                    tl = peer.transportListener;
                 }
-        	} else {
-        		peer.enqueue(command);
-        	}
-    	}
-    	
-    	if( tl!=null ) {
-    		tl.onCommand(command);
-    	}
-        
-    }
-
-	private void enqueue(Object command) throws IOException {
-		try{
-			getMessageQueue().put(command);
-		}catch(final InterruptedException e){
-		    throw IOExceptionSupport.create(e);
-		}
-	}
+            } else {
+                peer.enqueue(command);
+            }
+        }
+
+        if (tl != null) {
+            tl.onCommand(command);
+        }
+
+    }
+
+    private void enqueue(Object command) throws IOException {
+        try {
+            getMessageQueue().put(command);
+        } catch (final InterruptedException e) {
+            throw IOExceptionSupport.create(e);
+        }
+    }
 
-    public FutureResponse asyncRequest(Object command,ResponseCallback responseCallback) throws IOException{
+    public FutureResponse asyncRequest(Object command, ResponseCallback responseCallback) throws IOException {
         throw new AssertionError("Unsupported Method");
     }
 
-    public Object request(Object command) throws IOException{
+    public Object request(Object command) throws IOException {
         throw new AssertionError("Unsupported Method");
     }
 
-    public Object request(Object command,int timeout) throws IOException{
+    public Object request(Object command, int timeout) throws IOException {
         throw new AssertionError("Unsupported Method");
     }
 
-    public TransportListener getTransportListener(){
-        synchronized(mutex){
+    public TransportListener getTransportListener() {
+        synchronized (mutex) {
             return transportListener;
         }
     }
 
-    public void setTransportListener(TransportListener commandListener){
-        synchronized(mutex){
-            this.transportListener=commandListener;
+    public void setTransportListener(TransportListener commandListener) {
+        synchronized (mutex) {
+            this.transportListener = commandListener;
             wakeup();
         }
     }
 
     private LinkedBlockingQueue getMessageQueue() {
-    	synchronized(mutex) {
-	        if( messageQueue==null ) {
-	            messageQueue=new LinkedBlockingQueue(this.asyncQueueDepth);
-	        }
-	        return messageQueue;
-    	}
-    }
-    
-    
-    public void start() throws Exception{
-        if(transportListener==null)
+        synchronized (mutex) {
+            if (messageQueue == null) {
+                messageQueue = new LinkedBlockingQueue(this.asyncQueueDepth);
+            }
+            return messageQueue;
+        }
+    }
+
+    public void start() throws Exception {
+        if (transportListener == null)
             throw new IOException("TransportListener not set.");
-        
-        synchronized(mutex) {
-        	if( messageQueue!=null ) {
-	           Object command;
-	           while( (command = messageQueue.poll()) !=null ) {
-	        	   transportListener.onCommand(command);
-	           }
-        	}
-	        started = true;
+
+        synchronized (mutex) {
+            if (messageQueue != null) {
+                Object command;
+                while ((command = messageQueue.poll()) != null) {
+                    transportListener.onCommand(command);
+                }
+            }
+            started = true;
             wakeup();
         }
     }
 
-    public void stop() throws Exception{
-    	TaskRunner tr=null;
-    	synchronized(mutex) {
-            if(!disposed){
-    	        started=false;
-                disposed=true;
-                if(taskRunner!=null){
-                	tr = taskRunner;
-                    taskRunner=null;
+    public void stop() throws Exception {
+        TaskRunner tr = null;
+        synchronized (mutex) {
+            if (!disposed) {
+                started = false;
+                disposed = true;
+                if (taskRunner != null) {
+                    tr = taskRunner;
+                    taskRunner = null;
                 }
             }
         }
-    	if( tr !=null ) {
-    		tr.shutdown(1000);
-    	}
+        if (tr != null) {
+            tr.shutdown(1000);
+        }
     }
 
-    public Object narrow(Class target){
-        if(target.isAssignableFrom(getClass())){
+    public Object narrow(Class target) {
+        if (target.isAssignableFrom(getClass())) {
             return this;
         }
         return null;
     }
 
-    public boolean isMarshal(){
+    public boolean isMarshal() {
         return marshal;
     }
 
-    public void setMarshal(boolean marshal){
-        this.marshal=marshal;
+    public void setMarshal(boolean marshal) {
+        this.marshal = marshal;
     }
 
-    public boolean isNetwork(){
+    public boolean isNetwork() {
         return network;
     }
 
-    public void setNetwork(boolean network){
-        this.network=network;
+    public void setNetwork(boolean network) {
+        this.network = network;
     }
 
-    public String toString(){
-        return location+"#"+id;
+    public String toString() {
+        return location + "#" + id;
     }
 
-    public String getRemoteAddress(){
-        if(peer!=null){
+    public String getRemoteAddress() {
+        if (peer != null) {
             return peer.toString();
         }
         return null;
@@ -221,68 +219,68 @@
     /**
      * @see org.apache.activemq.thread.Task#iterate()
      */
-    public boolean iterate(){
+    public boolean iterate() {
         final TransportListener tl;
-        synchronized(mutex){
-        	tl = transportListener;
-        	if( !started || disposed || tl==null )
-        		return false;
+        synchronized (mutex) {
+            tl = transportListener;
+            if (!started || disposed || tl == null)
+                return false;
         }
-        
+
         LinkedBlockingQueue mq = getMessageQueue();
-        final Command command = (Command)mq.poll();                
-        if( command!=null ) {
+        final Command command = (Command)mq.poll();
+        if (command != null) {
             tl.onCommand(command);
             return !mq.isEmpty();
         } else {
-        	return false;
-    	}        
+            return false;
+        }
     }
 
     /**
      * @return the async
      */
-    public boolean isAsync(){
+    public boolean isAsync() {
         return async;
     }
 
     /**
      * @param async the async to set
      */
-    public void setAsync(boolean async){
-        this.async=async;
+    public void setAsync(boolean async) {
+        this.async = async;
     }
 
     /**
      * @return the asyncQueueDepth
      */
-    public int getAsyncQueueDepth(){
+    public int getAsyncQueueDepth() {
         return asyncQueueDepth;
     }
 
     /**
      * @param asyncQueueDepth the asyncQueueDepth to set
      */
-    public void setAsyncQueueDepth(int asyncQueueDepth){
-        this.asyncQueueDepth=asyncQueueDepth;
+    public void setAsyncQueueDepth(int asyncQueueDepth) {
+        this.asyncQueueDepth = asyncQueueDepth;
     }
 
-    protected void wakeup(){
-        if(async){
-            synchronized(mutex){
-                if(taskRunner==null){
-                    taskRunner=taskRunnerFactory.createTaskRunner(this,"VMTransport: "+toString());
+    protected void wakeup() {
+        if (async) {
+            synchronized (mutex) {
+                if (taskRunner == null) {
+                    taskRunner = taskRunnerFactory.createTaskRunner(this, "VMTransport: " + toString());
                 }
             }
-            try{
+            try {
                 taskRunner.wakeup();
-            }catch(InterruptedException e){
+            } catch (InterruptedException e) {
                 Thread.currentThread().interrupt();
             }
         }
     }
 
-    public boolean isFaultTolerant(){
+    public boolean isFaultTolerant() {
         return false;
     }
 }