You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2007/08/08 20:58:13 UTC
svn commit: r563982 [25/32] - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/ main/java/org/apache/activemq/advisory/
main/java/org/apache/activemq/blob/ main/java/org/apache/activemq/broker/
main/java/org/apache/activemq/broker/jm...
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java Wed Aug 8 11:56:59 2007
@@ -31,21 +31,22 @@
import org.apache.activemq.wireformat.WireFormat;
/**
- * Implements marshalling and unmarsalling the <a href="http://stomp.codehaus.org/">Stomp</a> protocol.
+ * Implements marshalling and unmarshalling the <a
+ * href="http://stomp.codehaus.org/">Stomp</a> protocol.
*/
public class StompWireFormat implements WireFormat {
- private static final byte[] NO_DATA = new byte[]{};
- private static final byte[] END_OF_FRAME = new byte[]{0,'\n'};
-
- private static final int MAX_COMMAND_LENGTH = 1024;
- private static final int MAX_HEADER_LENGTH = 1024*10;
- private static final int MAX_HEADERS = 1000;
- private static final int MAX_DATA_LENGTH = 1024*1024*100;
-
- private int version=1;
+ private static final byte[] NO_DATA = new byte[] {};
+ private static final byte[] END_OF_FRAME = new byte[] {0, '\n'};
- public ByteSequence marshal(Object command) throws IOException {
+ private static final int MAX_COMMAND_LENGTH = 1024;
+ private static final int MAX_HEADER_LENGTH = 1024 * 10;
+ private static final int MAX_HEADERS = 1000;
+ private static final int MAX_DATA_LENGTH = 1024 * 1024 * 100;
+
+ private int version = 1;
+
+ public ByteSequence marshal(Object command) throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(baos);
marshal(command, dos);
@@ -60,140 +61,137 @@
}
public void marshal(Object command, DataOutput os) throws IOException {
- StompFrame stomp = (org.apache.activemq.transport.stomp.StompFrame) command;
+ StompFrame stomp = (org.apache.activemq.transport.stomp.StompFrame)command;
+
+ StringBuffer buffer = new StringBuffer();
+ buffer.append(stomp.getAction());
+ buffer.append(Stomp.NEWLINE);
+
+ // Output the headers.
+ for (Iterator iter = stomp.getHeaders().entrySet().iterator(); iter.hasNext();) {
+ Map.Entry entry = (Map.Entry)iter.next();
+ buffer.append(entry.getKey());
+ buffer.append(Stomp.Headers.SEPERATOR);
+ buffer.append(entry.getValue());
+ buffer.append(Stomp.NEWLINE);
+ }
- StringBuffer buffer = new StringBuffer();
- buffer.append(stomp.getAction());
- buffer.append(Stomp.NEWLINE);
-
- // Output the headers.
- for (Iterator iter = stomp.getHeaders().entrySet().iterator(); iter.hasNext();) {
- Map.Entry entry = (Map.Entry) iter.next();
- buffer.append(entry.getKey());
- buffer.append(Stomp.Headers.SEPERATOR);
- buffer.append(entry.getValue());
- buffer.append(Stomp.NEWLINE);
- }
-
- // Add a newline to seperate the headers from the content.
- buffer.append(Stomp.NEWLINE);
-
- os.write(buffer.toString().getBytes("UTF-8"));
- os.write(stomp.getContent());
- os.write(END_OF_FRAME);
- }
-
+ // Add a newline to separate the headers from the content.
+ buffer.append(Stomp.NEWLINE);
+
+ os.write(buffer.toString().getBytes("UTF-8"));
+ os.write(stomp.getContent());
+ os.write(END_OF_FRAME);
+ }
public Object unmarshal(DataInput in) throws IOException {
-
+
try {
- String action = null;
-
- // skip white space to next real action line
- while (true) {
- action = readLine(in, MAX_COMMAND_LENGTH, "The maximum command length was exceeded");
- if (action == null) {
- throw new IOException("connection was closed");
- } else {
- action = action.trim();
- if (action.length() > 0) {
- break;
- }
- }
- }
-
- // Parse the headers
- HashMap headers = new HashMap(25);
- while (true) {
- String line = readLine(in, MAX_HEADER_LENGTH, "The maximum header length was exceeded");
- if (line != null && line.trim().length() > 0) {
-
- if( headers.size() > MAX_HEADERS )
- throw new ProtocolException("The maximum number of headers was exceeded", true);
-
- try {
- int seperator_index = line.indexOf(Stomp.Headers.SEPERATOR);
- String name = line.substring(0, seperator_index).trim();
- String value = line.substring(seperator_index + 1, line.length()).trim();
- headers.put(name, value);
- }
- catch (Exception e) {
- throw new ProtocolException("Unable to parser header line [" + line + "]", true);
- }
- }
- else {
- break;
- }
- }
-
- // Read in the data part.
- byte[] data = NO_DATA;
- String contentLength = (String)headers.get(Stomp.Headers.CONTENT_LENGTH);
- if (contentLength!=null) {
-
- // Bless the client, he's telling us how much data to read in.
- int length;
- try {
- length = Integer.parseInt(contentLength.trim());
- } catch (NumberFormatException e) {
- throw new ProtocolException("Specified content-length is not a valid integer", true);
- }
-
- if( length > MAX_DATA_LENGTH )
- throw new ProtocolException("The maximum data length was exceeded", true);
-
- data = new byte[length];
- in.readFully(data);
-
- if (in.readByte() != 0) {
- throw new ProtocolException(Stomp.Headers.CONTENT_LENGTH+" bytes were read and " + "there was no trailing null byte", true);
- }
-
- } else {
-
- // We don't know how much to read.. data ends when we hit a 0
- byte b;
- ByteArrayOutputStream baos=null;
- while ((b = in.readByte()) != 0) {
-
- if( baos == null ) {
- baos = new ByteArrayOutputStream();
- } else if( baos.size() > MAX_DATA_LENGTH ) {
- throw new ProtocolException("The maximum data length was exceeded", true);
- }
-
- baos.write(b);
- }
-
- if( baos!=null ) {
- baos.close();
- data = baos.toByteArray();
- }
-
- }
-
- return new StompFrame(action, headers, data);
-
- } catch (ProtocolException e) {
- return new StompFrameError(e);
- }
+ String action = null;
+
+ // skip white space to next real action line
+ while (true) {
+ action = readLine(in, MAX_COMMAND_LENGTH, "The maximum command length was exceeded");
+ if (action == null) {
+ throw new IOException("connection was closed");
+ } else {
+ action = action.trim();
+ if (action.length() > 0) {
+ break;
+ }
+ }
+ }
+
+ // Parse the headers
+ HashMap headers = new HashMap(25);
+ while (true) {
+ String line = readLine(in, MAX_HEADER_LENGTH, "The maximum header length was exceeded");
+ if (line != null && line.trim().length() > 0) {
+
+ if (headers.size() > MAX_HEADERS)
+ throw new ProtocolException("The maximum number of headers was exceeded", true);
+
+ try {
+ int seperator_index = line.indexOf(Stomp.Headers.SEPERATOR);
+ String name = line.substring(0, seperator_index).trim();
+ String value = line.substring(seperator_index + 1, line.length()).trim();
+ headers.put(name, value);
+ } catch (Exception e) {
+ throw new ProtocolException("Unable to parser header line [" + line + "]", true);
+ }
+ } else {
+ break;
+ }
+ }
+
+ // Read in the data part.
+ byte[] data = NO_DATA;
+ String contentLength = (String)headers.get(Stomp.Headers.CONTENT_LENGTH);
+ if (contentLength != null) {
+
+ // Bless the client, he's telling us how much data to read in.
+ int length;
+ try {
+ length = Integer.parseInt(contentLength.trim());
+ } catch (NumberFormatException e) {
+ throw new ProtocolException("Specified content-length is not a valid integer", true);
+ }
+
+ if (length > MAX_DATA_LENGTH)
+ throw new ProtocolException("The maximum data length was exceeded", true);
+
+ data = new byte[length];
+ in.readFully(data);
+
+ if (in.readByte() != 0) {
+ throw new ProtocolException(Stomp.Headers.CONTENT_LENGTH + " bytes were read and " + "there was no trailing null byte", true);
+ }
+
+ } else {
+
+ // We don't know how much to read.. data ends when we hit a 0
+ byte b;
+ ByteArrayOutputStream baos = null;
+ while ((b = in.readByte()) != 0) {
+
+ if (baos == null) {
+ baos = new ByteArrayOutputStream();
+ } else if (baos.size() > MAX_DATA_LENGTH) {
+ throw new ProtocolException("The maximum data length was exceeded", true);
+ }
+
+ baos.write(b);
+ }
+
+ if (baos != null) {
+ baos.close();
+ data = baos.toByteArray();
+ }
+
+ }
+
+ return new StompFrame(action, headers, data);
+
+ } catch (ProtocolException e) {
+ return new StompFrameError(e);
+ }
}
private String readLine(DataInput in, int maxLength, String errorMessage) throws IOException {
byte b;
- ByteArrayOutputStream baos=new ByteArrayOutputStream(maxLength);
+ ByteArrayOutputStream baos = new ByteArrayOutputStream(maxLength);
while ((b = in.readByte()) != '\n') {
- if( baos.size() > maxLength )
- throw new ProtocolException(errorMessage, true);
+ if (baos.size() > maxLength)
+ throw new ProtocolException(errorMessage, true);
baos.write(b);
}
baos.close();
ByteSequence sequence = baos.toByteSequence();
- return new String(sequence.getData(),sequence.getOffset(),sequence.getLength(),"UTF-8");
- }
+ return new String(sequence.getData(), sequence.getOffset(), sequence.getLength(), "UTF-8");
+ }
- public int getVersion() {
+ public int getVersion() {
return version;
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/SslTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/SslTransport.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/SslTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/SslTransport.java Wed Aug 8 11:56:59 2007
@@ -33,13 +33,13 @@
/**
* A Transport class that uses SSL and client-side certificate authentication.
- *
- * Client-side certificate authentication must be enabled through the constructor.
- * By default, this class will have the same client authentication behavior as the socket it is passed.
- * This class will set ConnectionInfo's transportContext to the SSL certificates of the client.
- * NOTE: Accessor method for needClientAuth was not provided on purpose. This is because needClientAuth's value must be
- * set before the socket is connected. Otherwise, unexpected situations may occur.
- *
+ * Client-side certificate authentication must be enabled through the
+ * constructor. By default, this class will have the same client authentication
+ * behavior as the socket it is passed. This class will set ConnectionInfo's
+ * transportContext to the SSL certificates of the client. NOTE: Accessor method
+ * for needClientAuth was not provided on purpose. This is because
+ * needClientAuth's value must be set before the socket is connected. Otherwise,
+ * unexpected situations may occur.
*/
public class SslTransport extends TcpTransport {
/**
@@ -47,11 +47,11 @@
*
* @param wireFormat The WireFormat to be used.
* @param socketFactory The socket factory to be used. Forcing SSLSockets
- * for obvious reasons.
+ * for obvious reasons.
* @param remoteLocation The remote location.
* @param localLocation The local location.
* @param needClientAuth If set to true, the underlying socket will need
- * client certificate authentication.
+ * client certificate authentication.
* @throws UnknownHostException If TcpTransport throws.
* @throws IOException If TcpTransport throws.
*/
@@ -61,12 +61,10 @@
((SSLSocket)this.socket).setNeedClientAuth(needClientAuth);
}
}
-
+
/**
- * Initialize from a ServerSocket.
- *
- * No access to needClientAuth is given since it is already set within the
- * provided socket.
+ * Initialize from a ServerSocket. No access to needClientAuth is given
+ * since it is already set within the provided socket.
*
* @param wireFormat The WireFormat to be used.
* @param socket The Socket to be used. Forcing SSL.
@@ -75,31 +73,31 @@
public SslTransport(WireFormat wireFormat, SSLSocket socket) throws IOException {
super(wireFormat, socket);
}
-
+
/**
- * Overriding in order to add the client's certificates to ConnectionInfo Commmands.
+ * Overriding in order to add the client's certificates to ConnectionInfo
+ * Commands.
*
* @param command The Command coming in.
*/
public void doConsume(Command command) {
// The instanceof can be avoided, but that would require modifying the
- // Command clas tree and that would require too much effort right
- // now.
- if ( command instanceof ConnectionInfo ) {
+ // Command class tree and that would require too much effort right
+ // now.
+ if (command instanceof ConnectionInfo) {
ConnectionInfo connectionInfo = (ConnectionInfo)command;
-
+
SSLSocket sslSocket = (SSLSocket)this.socket;
-
+
SSLSession sslSession = sslSocket.getSession();
-
+
X509Certificate[] clientCertChain;
try {
- clientCertChain =
- (X509Certificate[]) sslSession.getPeerCertificates();
- } catch(SSLPeerUnverifiedException e) {
+ clientCertChain = (X509Certificate[])sslSession.getPeerCertificates();
+ } catch (SSLPeerUnverifiedException e) {
clientCertChain = null;
}
-
+
connectionInfo.setTransportContext(clientCertChain);
}
@@ -110,8 +108,7 @@
* @return pretty print of 'this'
*/
public String toString() {
- return "ssl://"+socket.getInetAddress()+":"+socket.getPort();
+ return "ssl://" + socket.getInetAddress() + ":" + socket.getPort();
}
}
-
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/SslTransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/SslTransportFactory.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/SslTransportFactory.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/SslTransportFactory.java Wed Aug 8 11:56:59 2007
@@ -49,10 +49,10 @@
import javax.net.ssl.TrustManager;
/**
- * An implementation of the TcpTransportFactory using SSL.
- *
- * The major contribution from this class is that it is aware of SslTransportServer and SslTransport classes.
- * All Transports and TransportServers created from this factory will have their needClientAuth option set to false.
+ * An implementation of the TcpTransportFactory using SSL. The major
+ * contribution from this class is that it is aware of SslTransportServer and
+ * SslTransport classes. All Transports and TransportServers created from this
+ * factory will have their needClientAuth option set to false.
*
* @author sepandm@gmail.com (Sepand)
* @version $Revision: $
@@ -60,17 +60,16 @@
public class SslTransportFactory extends TcpTransportFactory {
// The context used to creat ssl sockets.
private SSLContext sslContext = null;
-
+
// The log this uses.,
private static final Log log = LogFactory.getLog(SslTransportFactory.class);
-
+
/**
* Constructor. Nothing special.
- *
*/
public SslTransportFactory() {
}
-
+
/**
* Overriding to use SslTransportServer and allow for proper reflection.
*/
@@ -79,31 +78,29 @@
Map options = new HashMap(URISupport.parseParamters(location));
ServerSocketFactory serverSocketFactory = createServerSocketFactory();
- SslTransportServer server =
- new SslTransportServer(this, location, (SSLServerSocketFactory)serverSocketFactory);
+ SslTransportServer server = new SslTransportServer(this, location, (SSLServerSocketFactory)serverSocketFactory);
server.setWireFormatFactory(createWireFormatFactory(options));
IntrospectionSupport.setProperties(server, options);
Map transportOptions = IntrospectionSupport.extractProperties(options, "transport.");
server.setTransportOption(transportOptions);
server.bind();
-
+
return server;
- }
- catch (URISyntaxException e) {
+ } catch (URISyntaxException e) {
throw IOExceptionSupport.create(e);
}
}
-
+
/**
* Overriding to allow for proper configuration through reflection.
*/
public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
-
- SslTransport sslTransport = (SslTransport) transport.narrow(SslTransport.class);
+
+ SslTransport sslTransport = (SslTransport)transport.narrow(SslTransport.class);
IntrospectionSupport.setProperties(sslTransport, options);
-
+
Map socketOptions = IntrospectionSupport.extractProperties(options, "socket.");
-
+
sslTransport.setSocketOptions(socketOptions);
if (sslTransport.isTrace()) {
@@ -116,14 +113,14 @@
if (format instanceof OpenWireFormat) {
transport = new WireFormatNegotiator(transport, (OpenWireFormat)format, sslTransport.getMinmumWireFormatVersion());
}
-
+
return transport;
}
-
+
/**
* Overriding to use SslTransports.
*/
- protected Transport createTransport(URI location,WireFormat wf) throws UnknownHostException,IOException{
+ protected Transport createTransport(URI location, WireFormat wf) throws UnknownHostException, IOException {
URI localLocation = null;
String path = location.getPath();
// see if the path is a local URI location
@@ -133,18 +130,16 @@
Integer.parseInt(path.substring((localPortIndex + 1), path.length()));
String localString = location.getScheme() + ":/" + path;
localLocation = new URI(localString);
- }
- catch (Exception e) {
+ } catch (Exception e) {
log.warn("path isn't a valid local location for SslTransport to use", e);
}
}
SocketFactory socketFactory = createSocketFactory();
return new SslTransport(wf, (SSLSocketFactory)socketFactory, location, localLocation, false);
}
-
+
/**
* Sets the key and trust managers used in constructed socket factories.
- *
* Passes given arguments to SSLContext.init(...).
*
* @param km The sources of authentication keys or null.
@@ -161,36 +156,31 @@
}
sslContext.init(km, tm, random);
}
-
+
/**
- * Creates a new SSL ServerSocketFactory.
- *
- * The given factory will use user-provided key and trust managers (if the user provided them).
+ * Creates a new SSL ServerSocketFactory. The given factory will use
+ * user-provided key and trust managers (if the user provided them).
*
* @return Newly created (Ssl)ServerSocketFactory.
*/
protected ServerSocketFactory createServerSocketFactory() {
if (sslContext == null) {
return SSLServerSocketFactory.getDefault();
- }
- else
+ } else
return sslContext.getServerSocketFactory();
}
/**
- * Creates a new SSL SocketFactory.
- *
- * The given factory will use user-provided key and trust managers (if the user provided them).
+ * Creates a new SSL SocketFactory. The given factory will use user-provided
+ * key and trust managers (if the user provided them).
*
* @return Newly created (Ssl)SocketFactory.
*/
protected SocketFactory createSocketFactory() {
- if ( sslContext == null ) {
+ if (sslContext == null) {
return SSLSocketFactory.getDefault();
- }
- else
+ } else
return sslContext.getSocketFactory();
}
-
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpBufferedInputStream.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpBufferedInputStream.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpBufferedInputStream.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpBufferedInputStream.java Wed Aug 8 11:56:59 2007
@@ -19,107 +19,108 @@
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
+
/**
* An optimized buffered input stream for Tcp
*
* @version $Revision: 1.1.1.1 $
*/
-public class TcpBufferedInputStream extends FilterInputStream{
- private static final int DEFAULT_BUFFER_SIZE=8192;
+public class TcpBufferedInputStream extends FilterInputStream {
+ private static final int DEFAULT_BUFFER_SIZE = 8192;
protected byte internalBuffer[];
protected int count;
protected int position;
- public TcpBufferedInputStream(InputStream in){
- this(in,DEFAULT_BUFFER_SIZE);
+ public TcpBufferedInputStream(InputStream in) {
+ this(in, DEFAULT_BUFFER_SIZE);
}
- public TcpBufferedInputStream(InputStream in,int size){
+ public TcpBufferedInputStream(InputStream in, int size) {
super(in);
- if(size<=0){
+ if (size <= 0) {
throw new IllegalArgumentException("Buffer size <= 0");
}
- internalBuffer=new byte[size];
+ internalBuffer = new byte[size];
}
- private void fill() throws IOException{
- byte[] buffer=internalBuffer;
- count=position=0;
- int n=in.read(buffer,position,buffer.length-position);
- if(n>0)
- count=n+position;
+ private void fill() throws IOException {
+ byte[] buffer = internalBuffer;
+ count = position = 0;
+ int n = in.read(buffer, position, buffer.length - position);
+ if (n > 0)
+ count = n + position;
}
- public int read() throws IOException{
- if(position>=count){
+ public int read() throws IOException {
+ if (position >= count) {
fill();
- if(position>=count)
+ if (position >= count)
return -1;
}
- return internalBuffer[position++]&0xff;
+ return internalBuffer[position++] & 0xff;
}
- private int readStream(byte[] b,int off,int len) throws IOException{
- int avail=count-position;
- if(avail<=0){
- if(len>=internalBuffer.length){
- return in.read(b,off,len);
+ private int readStream(byte[] b, int off, int len) throws IOException {
+ int avail = count - position;
+ if (avail <= 0) {
+ if (len >= internalBuffer.length) {
+ return in.read(b, off, len);
}
fill();
- avail=count-position;
- if(avail<=0)
+ avail = count - position;
+ if (avail <= 0)
return -1;
}
- int cnt=(avail<len)?avail:len;
- System.arraycopy(internalBuffer,position,b,off,cnt);
- position+=cnt;
+ int cnt = (avail < len) ? avail : len;
+ System.arraycopy(internalBuffer, position, b, off, cnt);
+ position += cnt;
return cnt;
}
- public int read(byte b[],int off,int len) throws IOException{
- if((off|len|(off+len)|(b.length-(off+len)))<0){
+ public int read(byte b[], int off, int len) throws IOException {
+ if ((off | len | (off + len) | (b.length - (off + len))) < 0) {
throw new IndexOutOfBoundsException();
- }else if(len==0){
+ } else if (len == 0) {
return 0;
}
- int n=0;
- for(;;){
- int nread=readStream(b,off+n,len-n);
- if(nread<=0)
- return (n==0)?nread:n;
- n+=nread;
- if(n>=len)
+ int n = 0;
+ for (;;) {
+ int nread = readStream(b, off + n, len - n);
+ if (nread <= 0)
+ return (n == 0) ? nread : n;
+ n += nread;
+ if (n >= len)
return n;
// if not closed but no bytes available, return
- InputStream input=in;
- if(input!=null&&input.available()<=0)
+ InputStream input = in;
+ if (input != null && input.available() <= 0)
return n;
}
}
- public long skip(long n) throws IOException{
- if(n<=0){
+ public long skip(long n) throws IOException {
+ if (n <= 0) {
return 0;
}
- long avail=count-position;
- if(avail<=0){
+ long avail = count - position;
+ if (avail <= 0) {
return in.skip(n);
}
- long skipped=(avail<n)?avail:n;
- position+=skipped;
+ long skipped = (avail < n) ? avail : n;
+ position += skipped;
return skipped;
}
- public int available() throws IOException{
- return in.available()+(count-position);
+ public int available() throws IOException {
+ return in.available() + (count - position);
}
- public boolean markSupported(){
+ public boolean markSupported() {
return false;
}
- public void close() throws IOException{
- if(in!=null)
+ public void close() throws IOException {
+ if (in != null)
in.close();
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpBufferedOutputStream.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpBufferedOutputStream.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpBufferedOutputStream.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpBufferedOutputStream.java Wed Aug 8 11:56:59 2007
@@ -24,7 +24,7 @@
/**
* An optimized buffered outputstream for Tcp
- *
+ *
* @version $Revision: 1.1.1.1 $
*/
@@ -37,7 +37,7 @@
/**
* Constructor
- *
+ *
* @param out
*/
public TcpBufferedOutputStream(OutputStream out) {
@@ -45,10 +45,10 @@
}
/**
- * Creates a new buffered output stream to write data to the specified underlying output stream with the specified
- * buffer size.
- *
- * @param out the underlying output stream.
+ * Creates a new buffered output stream to write data to the specified
+ * underlying output stream with the specified buffer size.
+ *
+ * @param out the underlying output stream.
* @param size the buffer size.
* @throws IllegalArgumentException if size <= 0.
*/
@@ -58,49 +58,47 @@
throw new IllegalArgumentException("Buffer size <= 0");
}
buffer = new byte[size];
- bufferlen=size;
+ bufferlen = size;
}
/**
* write a byte on to the stream
- *
+ *
* @param b - byte to write
* @throws IOException
*/
public void write(int b) throws IOException {
- if ((bufferlen-count) < 1) {
+ if ((bufferlen - count) < 1) {
flush();
}
- buffer[count++] = (byte) b;
+ buffer[count++] = (byte)b;
}
-
/**
* write a byte array to the stream
- *
- * @param b the byte buffer
+ *
+ * @param b the byte buffer
* @param off the offset into the buffer
* @param len the length of data to write
* @throws IOException
*/
public void write(byte b[], int off, int len) throws IOException {
- if ((bufferlen-count) < len) {
+ if ((bufferlen - count) < len) {
flush();
}
if (buffer.length >= len) {
System.arraycopy(b, off, buffer, count, len);
count += len;
- }
- else {
+ } else {
out.write(b, off, len);
}
}
/**
- * flush the data to the output stream
- * This doesn't call flush on the underlying outputstream, because
- * Tcp is particularly efficent at doing this itself ....
- *
+ * flush the data to the output stream. This doesn't call flush on the
+ * underlying outputstream, because Tcp is particularly efficient at doing
+ * this itself ....
+ *
* @throws IOException
*/
public void flush() throws IOException {
@@ -112,7 +110,7 @@
/**
* close this stream
- *
+ *
* @throws IOException
*/
public void close() throws IOException {
@@ -120,10 +118,9 @@
closed = true;
}
-
/**
* Checks that the stream has not been closed
- *
+ *
* @throws IOException
*/
private final void checkClosed() throws IOException {
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java Wed Aug 8 11:56:59 2007
@@ -45,7 +45,7 @@
/**
* An implementation of the {@link Transport} interface using raw tcp/ip
- *
+ *
* @version $Revision$
*/
public class TcpTransport extends TransportThreadSupport implements Transport, Service, Runnable {
@@ -74,12 +74,11 @@
/**
* Connect to a remote Node - e.g. a Broker
- *
+ *
* @param wireFormat
* @param socketFactory
* @param remoteLocation
- * @param localLocation -
- * e.g. local InetAddress and local port
+ * @param localLocation - e.g. local InetAddress and local port
* @throws IOException
* @throws UnknownHostException
*/
@@ -88,8 +87,7 @@
this.socketFactory = socketFactory;
try {
this.socket = socketFactory.createSocket();
- }
- catch (SocketException e) {
+ } catch (SocketException e) {
this.socket = null;
}
this.remoteLocation = remoteLocation;
@@ -97,10 +95,9 @@
setDaemon(false);
}
-
/**
* Initialize from a server Socket
- *
+ *
* @param wireFormat
* @param socket
* @throws IOException
@@ -135,28 +132,25 @@
public void run() {
log.trace("TCP consumer thread starting");
try {
- while (!isStopped()) {
- doRun();
- }
+ while (!isStopped()) {
+ doRun();
+ }
} catch (IOException e) {
- stoppedLatch.get().countDown();
+ stoppedLatch.get().countDown();
onException(e);
} finally {
- stoppedLatch.get().countDown();
+ stoppedLatch.get().countDown();
}
}
-
- protected void doRun() throws IOException {
- try {
- Object command = readCommand();
- doConsume(command);
- }
- catch (SocketTimeoutException e) {
- }
- catch (InterruptedIOException e) {
- }
- }
+ protected void doRun() throws IOException {
+ try {
+ Object command = readCommand();
+ doConsume(command);
+ } catch (SocketTimeoutException e) {
+ } catch (InterruptedIOException e) {
+ }
+ }
protected Object readCommand() throws IOException {
return wireFormat.unmarshal(dataIn);
@@ -248,22 +242,21 @@
public void setTcpNoDelay(Boolean tcpNoDelay) {
this.tcpNoDelay = tcpNoDelay;
}
-
+
/**
* @return the ioBufferSize
*/
- public int getIoBufferSize(){
+ public int getIoBufferSize() {
return this.ioBufferSize;
}
/**
* @param ioBufferSize the ioBufferSize to set
*/
- public void setIoBufferSize(int ioBufferSize){
- this.ioBufferSize=ioBufferSize;
+ public void setIoBufferSize(int ioBufferSize) {
+ this.ioBufferSize = ioBufferSize;
}
-
// Implementation methods
// -------------------------------------------------------------------------
protected String resolveHostName(String host) throws UnknownHostException {
@@ -278,7 +271,7 @@
/**
* Configures the socket for use
- *
+ *
* @param sock
* @throws SocketException
*/
@@ -290,8 +283,7 @@
try {
sock.setReceiveBufferSize(socketBufferSize);
sock.setSendBufferSize(socketBufferSize);
- }
- catch (SocketException se) {
+ } catch (SocketException se) {
log.warn("Cannot set socket buffer size = " + socketBufferSize);
log.debug("Cannot set socket buffer size. Reason: " + se, se);
}
@@ -340,20 +332,17 @@
if (remoteAddress != null) {
if (connectionTimeout >= 0) {
socket.connect(remoteAddress, connectionTimeout);
- }
- else {
+ } else {
socket.connect(remoteAddress);
}
}
- }
- else {
+ } else {
// For SSL sockets.. you can't create an unconnected socket :(
// This means the timout option are not supported either.
if (localAddress != null) {
socket = socketFactory.createSocket(remoteAddress.getAddress(), remoteAddress.getPort(), localAddress.getAddress(), localAddress.getPort());
- }
- else {
+ } else {
socket = socketFactory.createSocket(remoteAddress.getAddress(), remoteAddress.getPort());
}
}
@@ -362,7 +351,6 @@
initializeStreams();
}
-
protected void doStop(ServiceStopper stopper) throws Exception {
if (log.isDebugEnabled()) {
log.debug("Stopping transport " + this);
@@ -375,18 +363,17 @@
socket.close();
}
}
-
-
+
/**
* Override so that stop() blocks until the run thread is no longer running.
*/
@Override
public void stop() throws Exception {
- super.stop();
- CountDownLatch countDownLatch = stoppedLatch.get();
- if( countDownLatch!=null ) {
- countDownLatch.await();
- }
+ super.stop();
+ CountDownLatch countDownLatch = stoppedLatch.get();
+ if (countDownLatch != null) {
+ countDownLatch.await();
+ }
}
protected void initializeStreams() throws Exception {
@@ -416,7 +403,4 @@
return null;
}
-
-
-
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java Wed Aug 8 11:56:59 2007
@@ -42,6 +42,7 @@
public class TcpTransportFactory extends TransportFactory {
private static final Log log = LogFactory.getLog(TcpTransportFactory.class);
+
public TransportServer doBind(String brokerId, final URI location) throws IOException {
try {
Map options = new HashMap(URISupport.parseParamters(location));
@@ -53,16 +54,16 @@
Map transportOptions = IntrospectionSupport.extractProperties(options, "transport.");
server.setTransportOption(transportOptions);
server.bind();
-
+
return server;
- }
- catch (URISyntaxException e) {
+ } catch (URISyntaxException e) {
throw IOExceptionSupport.create(e);
}
}
/**
- * Allows subclasses of TcpTransportFactory to create custom instances of TcpTransportServer.
+ * Allows subclasses of TcpTransportFactory to create custom instances of
+ * TcpTransportServer.
*
* @param location
* @param serverSocketFactory
@@ -70,16 +71,16 @@
* @throws IOException
* @throws URISyntaxException
*/
- protected TcpTransportServer createTcpTransportServer(final URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException {
- return new TcpTransportServer(this, location, serverSocketFactory);
- }
+ protected TcpTransportServer createTcpTransportServer(final URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException {
+ return new TcpTransportServer(this, location, serverSocketFactory);
+ }
public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
-
- TcpTransport tcpTransport = (TcpTransport) transport.narrow(TcpTransport.class);
+
+ TcpTransport tcpTransport = (TcpTransport)transport.narrow(TcpTransport.class);
IntrospectionSupport.setProperties(tcpTransport, options);
-
- Map socketOptions = IntrospectionSupport.extractProperties(options, "socket.");
+
+ Map socketOptions = IntrospectionSupport.extractProperties(options, "socket.");
tcpTransport.setSocketOptions(socketOptions);
if (tcpTransport.isTrace()) {
@@ -91,10 +92,10 @@
}
// Only need the WireFormatNegotiator if using openwire
- if( format instanceof OpenWireFormat ) {
- transport = new WireFormatNegotiator(transport, (OpenWireFormat)format, tcpTransport.getMinmumWireFormatVersion());
+ if (format instanceof OpenWireFormat) {
+ transport = new WireFormatNegotiator(transport, (OpenWireFormat)format, tcpTransport.getMinmumWireFormatVersion());
}
-
+
return transport;
}
@@ -105,9 +106,9 @@
return true;
}
- protected Transport createTransport(URI location,WireFormat wf) throws UnknownHostException,IOException{
- URI localLocation=null;
- String path=location.getPath();
+ protected Transport createTransport(URI location, WireFormat wf) throws UnknownHostException, IOException {
+ URI localLocation = null;
+ String path = location.getPath();
// see if the path is a local URI location
if (path != null && path.length() > 0) {
int localPortIndex = path.indexOf(':');
@@ -115,8 +116,7 @@
Integer.parseInt(path.substring((localPortIndex + 1), path.length()));
String localString = location.getScheme() + ":/" + path;
localLocation = new URI(localString);
- }
- catch (Exception e) {
+ } catch (Exception e) {
log.warn("path isn't a valid local location for TcpTransport to use", e);
}
}
@@ -125,19 +125,20 @@
}
/**
- * Allows subclasses of TcpTransportFactory to provide a create custom TcpTransport intances.
+ * Allows subclasses of TcpTransportFactory to create custom
+ * TcpTransport instances.
*
* @param location
* @param wf
* @param socketFactory
- * @param localLocation
+ * @param localLocation
* @return
* @throws UnknownHostException
* @throws IOException
*/
- protected TcpTransport createTcpTransport(WireFormat wf, SocketFactory socketFactory, URI location, URI localLocation) throws UnknownHostException, IOException {
- return new TcpTransport(wf, socketFactory, location, localLocation);
- }
+ protected TcpTransport createTcpTransport(WireFormat wf, SocketFactory socketFactory, URI location, URI localLocation) throws UnknownHostException, IOException {
+ return new TcpTransport(wf, socketFactory, location, localLocation);
+ }
protected ServerSocketFactory createServerSocketFactory() {
return ServerSocketFactory.getDefault();
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java Wed Aug 8 11:56:59 2007
@@ -49,7 +49,7 @@
*/
public class TcpTransportServer extends TransportServerThreadSupport {
-
+
private static final Log log = LogFactory.getLog(TcpTransportServer.class);
protected ServerSocket serverSocket;
protected int backlog = 5000;
@@ -60,16 +60,16 @@
protected boolean trace;
protected Map transportOptions;
protected final ServerSocketFactory serverSocketFactory;
-
+
public TcpTransportServer(TcpTransportFactory transportFactory, URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException {
super(location);
- this.transportFactory=transportFactory;
- this.serverSocketFactory = serverSocketFactory;
+ this.transportFactory = transportFactory;
+ this.serverSocketFactory = serverSocketFactory;
}
public void bind() throws IOException {
- URI bind = getBindLocation();
-
+ URI bind = getBindLocation();
+
String host = bind.getHost();
host = (host == null || host.length() == 0) ? "localhost" : host;
InetAddress addr = InetAddress.getByName(host);
@@ -77,31 +77,29 @@
try {
if (host.trim().equals("localhost") || addr.equals(InetAddress.getLocalHost())) {
this.serverSocket = serverSocketFactory.createServerSocket(bind.getPort(), backlog);
- }
- else {
+ } else {
this.serverSocket = serverSocketFactory.createServerSocket(bind.getPort(), backlog, addr);
}
this.serverSocket.setSoTimeout(2000);
- }
- catch (IOException e) {
+ } catch (IOException e) {
throw IOExceptionSupport.create("Failed to bind to server socket: " + bind + " due to: " + e, e);
}
try {
- setConnectURI(new URI(bind.getScheme(), bind.getUserInfo(), resolveHostName(bind.getHost()), serverSocket.getLocalPort(), bind.getPath(),
- bind.getQuery(), bind.getFragment()));
- } catch (URISyntaxException e) {
+ setConnectURI(new URI(bind.getScheme(), bind.getUserInfo(), resolveHostName(bind.getHost()), serverSocket.getLocalPort(), bind.getPath(), bind.getQuery(), bind
+ .getFragment()));
+ } catch (URISyntaxException e) {
- // it could be that the host name contains invalid characters such as _ on unix platforms
+ // it could be that the host name contains invalid characters such
+ // as _ on unix platforms
// so lets try use the IP address instead
try {
- setConnectURI(new URI(bind.getScheme(), bind.getUserInfo(), addr.getHostAddress(), serverSocket.getLocalPort(), bind.getPath(),
- bind.getQuery(), bind.getFragment()));
+ setConnectURI(new URI(bind.getScheme(), bind.getUserInfo(), addr.getHostAddress(), serverSocket.getLocalPort(), bind.getPath(), bind.getQuery(), bind.getFragment()));
} catch (URISyntaxException e2) {
throw IOExceptionSupport.create(e2);
}
}
}
-
+
/**
* @return Returns the wireFormatFactory.
*/
@@ -110,8 +108,7 @@
}
/**
- * @param wireFormatFactory
- * The wireFormatFactory to set.
+ * @param wireFormatFactory The wireFormatFactory to set.
*/
public void setWireFormatFactory(WireFormatFactory wireFormatFactory) {
this.wireFormatFactory = wireFormatFactory;
@@ -161,8 +158,7 @@
if (socket != null) {
if (isStopped() || getAcceptListener() == null) {
socket.close();
- }
- else {
+ } else {
HashMap options = new HashMap();
options.put("maxInactivityDuration", Long.valueOf(maxInactivityDuration));
options.put("minmumWireFormatVersion", Integer.valueOf(minmumWireFormatVersion));
@@ -174,13 +170,11 @@
getAcceptListener().onAccept(configuredTransport);
}
}
- }
- catch (SocketTimeoutException ste) {
+ } catch (SocketTimeoutException ste) {
// expect this to happen
- }
- catch (Exception e) {
+ } catch (Exception e) {
if (!isStopping()) {
- onAcceptError(e);
+ onAcceptError(e);
} else if (!isStopped()) {
log.warn("run()", e);
onAcceptError(e);
@@ -190,25 +184,26 @@
}
/**
- * Allow derived classes to override the Transport implementation that this transport server creates.
+ * Allow derived classes to override the Transport implementation that this
+ * transport server creates.
+ *
* @param socket
* @param format
* @return
* @throws IOException
*/
- protected Transport createTransport(Socket socket, WireFormat format) throws IOException {
- return new TcpTransport(format, socket);
- }
+ protected Transport createTransport(Socket socket, WireFormat format) throws IOException {
+ return new TcpTransport(format, socket);
+ }
/**
* @return pretty print of this
*/
public String toString() {
- return ""+getBindLocation();
+ return "" + getBindLocation();
}
/**
- *
* @param hostName
* @return real hostName
* @throws UnknownHostException
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandDatagramChannel.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandDatagramChannel.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandDatagramChannel.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandDatagramChannel.java Wed Aug 8 11:56:59 2007
@@ -56,8 +56,8 @@
private int defaultMarshalBufferSize = 64 * 1024;
public CommandDatagramChannel(UdpTransport transport, OpenWireFormat wireFormat, int datagramSize,
- SocketAddress targetAddress, DatagramHeaderMarshaller headerMarshaller, DatagramChannel channel,
- ByteBufferPool bufferPool) {
+ SocketAddress targetAddress, DatagramHeaderMarshaller headerMarshaller,
+ DatagramChannel channel, ByteBufferPool bufferPool) {
super(transport, wireFormat, datagramSize, targetAddress, headerMarshaller);
this.channel = channel;
this.bufferPool = bufferPool;
@@ -97,7 +97,7 @@
// the ByteBuffer to avoid object allocation and unnecessary
// buffering?
DataInputStream dataIn = new DataInputStream(new ByteArrayInputStream(data));
- answer = (Command) wireFormat.unmarshal(dataIn);
+ answer = (Command)wireFormat.unmarshal(dataIn);
break;
}
}
@@ -151,15 +151,14 @@
// lets remove the header of the partial command
// which is the byte for the type and an int for the size of
// the byte[]
- chunkSize -= 1 // the data type
- + 4 // the command ID
- + 4; // the size of the partial data
+
+ // data type + the command ID + size of the partial data
+ chunkSize -= 1 + 4 + 4;
// the boolean flags
if (bs != null) {
chunkSize -= bs.marshalledSize();
- }
- else {
+ } else {
chunkSize -= 1;
}
@@ -176,8 +175,7 @@
if (lastFragment) {
writeBuffer.put(LastPartialCommand.DATA_STRUCTURE_TYPE);
- }
- else {
+ } else {
writeBuffer.put(PartialCommand.DATA_STRUCTURE_TYPE);
}
@@ -191,7 +189,7 @@
}
writeBuffer.putInt(commandId);
if (bs == null) {
- writeBuffer.put((byte) 1);
+ writeBuffer.put((byte)1);
}
// size of byte array
@@ -203,8 +201,7 @@
offset += chunkSize;
sendWriteBuffer(commandId, address, writeBuffer, false);
}
- }
- else {
+ } else {
writeBuffer.put(data);
sendWriteBuffer(command.getCommandId(), address, writeBuffer, false);
}
@@ -227,15 +224,15 @@
// Implementation methods
// -------------------------------------------------------------------------
- protected void sendWriteBuffer(int commandId, SocketAddress address, ByteBuffer writeBuffer, boolean redelivery)
- throws IOException {
+ protected void sendWriteBuffer(int commandId, SocketAddress address, ByteBuffer writeBuffer,
+ boolean redelivery) throws IOException {
// lets put the datagram into the replay buffer first to prevent timing
// issues
ReplayBuffer bufferCache = getReplayBuffer();
if (bufferCache != null && !redelivery) {
bufferCache.addBuffer(commandId, writeBuffer);
}
-
+
writeBuffer.flip();
if (log.isDebugEnabled()) {
@@ -247,10 +244,9 @@
public void sendBuffer(int commandId, Object buffer) throws IOException {
if (buffer != null) {
- ByteBuffer writeBuffer = (ByteBuffer) buffer;
+ ByteBuffer writeBuffer = (ByteBuffer)buffer;
sendWriteBuffer(commandId, getReplayAddress(), writeBuffer, true);
- }
- else {
+ } else {
if (log.isWarnEnabled()) {
log.warn("Request for buffer: " + commandId + " is no longer present");
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandDatagramSocket.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandDatagramSocket.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandDatagramSocket.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandDatagramSocket.java Wed Aug 8 11:56:59 2007
@@ -48,8 +48,8 @@
private Object readLock = new Object();
private Object writeLock = new Object();
- public CommandDatagramSocket(UdpTransport transport, OpenWireFormat wireFormat, int datagramSize,
- SocketAddress targetAddress, DatagramHeaderMarshaller headerMarshaller, DatagramSocket channel) {
+ public CommandDatagramSocket(UdpTransport transport, OpenWireFormat wireFormat, int datagramSize, SocketAddress targetAddress, DatagramHeaderMarshaller headerMarshaller,
+ DatagramSocket channel) {
super(transport, wireFormat, datagramSize, targetAddress, headerMarshaller);
this.channel = channel;
}
@@ -73,7 +73,7 @@
DataInputStream dataIn = new DataInputStream(new ByteArrayInputStream(datagram.getData()));
from = headerMarshaller.createEndpoint(datagram, dataIn);
- answer = (Command) wireFormat.unmarshal(dataIn);
+ answer = (Command)wireFormat.unmarshal(dataIn);
break;
}
}
@@ -100,8 +100,7 @@
if (remaining(writeBuffer) >= 0) {
sendWriteBuffer(address, writeBuffer, command.getCommandId());
- }
- else {
+ } else {
// lets split the command up into chunks
byte[] data = writeBuffer.toByteArray();
boolean lastFragment = false;
@@ -125,15 +124,14 @@
// lets remove the header of the partial command
// which is the byte for the type and an int for the size of
// the byte[]
- chunkSize -= 1 // the data type
- + 4 // the command ID
- + 4; // the size of the partial data
+
+ // data type + the command ID + size of the partial data
+ chunkSize -= 1 + 4 + 4;
// the boolean flags
if (bs != null) {
chunkSize -= bs.marshalledSize();
- }
- else {
+ } else {
chunkSize -= 1;
}
@@ -150,8 +148,7 @@
if (lastFragment) {
dataOut.write(LastPartialCommand.DATA_STRUCTURE_TYPE);
- }
- else {
+ } else {
dataOut.write(PartialCommand.DATA_STRUCTURE_TYPE);
}
@@ -165,7 +162,7 @@
}
dataOut.writeInt(commandId);
if (bs == null) {
- dataOut.write((byte) 1);
+ dataOut.write((byte)1);
}
// size of byte array
@@ -191,14 +188,12 @@
// Implementation methods
// -------------------------------------------------------------------------
- protected void sendWriteBuffer(SocketAddress address, ByteArrayOutputStream writeBuffer, int commandId)
- throws IOException {
+ protected void sendWriteBuffer(SocketAddress address, ByteArrayOutputStream writeBuffer, int commandId) throws IOException {
byte[] data = writeBuffer.toByteArray();
sendWriteBuffer(commandId, address, data, false);
}
- protected void sendWriteBuffer(int commandId, SocketAddress address, byte[] data, boolean redelivery)
- throws IOException {
+ protected void sendWriteBuffer(int commandId, SocketAddress address, byte[] data, boolean redelivery) throws IOException {
// lets put the datagram into the replay buffer first to prevent timing
// issues
ReplayBuffer bufferCache = getReplayBuffer();
@@ -216,10 +211,9 @@
public void sendBuffer(int commandId, Object buffer) throws IOException {
if (buffer != null) {
- byte[] data = (byte[]) buffer;
+ byte[] data = (byte[])buffer;
sendWriteBuffer(commandId, replayAddress, data, true);
- }
- else {
+ } else {
if (log.isWarnEnabled()) {
log.warn("Request for buffer: " + commandId + " is no longer present");
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java Wed Aug 8 11:56:59 2007
@@ -52,8 +52,8 @@
public class UdpTransport extends TransportThreadSupport implements Transport, Service, Runnable {
private static final Log log = LogFactory.getLog(UdpTransport.class);
- private static final int MAX_BIND_ATTEMPTS = 50;
- private static final long BIND_ATTEMPT_DELAY = 100;
+ private static final int MAX_BIND_ATTEMPTS = 50;
+ private static final long BIND_ATTEMPT_DELAY = 100;
private CommandChannel commandChannel;
private OpenWireFormat wireFormat;
@@ -98,12 +98,11 @@
this.description = getProtocolName() + "Server@";
}
-
/**
* Creates a replayer for working with the reliable transport
*/
public Replayer createReplayer() throws IOException {
- if (replayEnabled ) {
+ if (replayEnabled) {
return getCommandChannel();
}
return null;
@@ -124,7 +123,7 @@
log.debug("Sending oneway from: " + this + " to target: " + targetAddress + " command: " + command);
}
checkStarted();
- commandChannel.write((Command) command, address);
+ commandChannel.write((Command)command, address);
}
/**
@@ -133,8 +132,7 @@
public String toString() {
if (description != null) {
return description + port;
- }
- else {
+ } else {
return getProtocolUriScheme() + targetAddress + "@" + port;
}
}
@@ -148,47 +146,38 @@
try {
Command command = commandChannel.read();
doConsume(command);
- }
- catch (AsynchronousCloseException e) {
+ } catch (AsynchronousCloseException e) {
// DatagramChannel closed
try {
stop();
- }
- catch (Exception e2) {
+ } catch (Exception e2) {
log.warn("Caught in: " + this + " while closing: " + e2 + ". Now Closed", e2);
}
- }
- catch (SocketException e) {
+ } catch (SocketException e) {
// DatagramSocket closed
log.debug("Socket closed: " + e, e);
try {
stop();
- }
- catch (Exception e2) {
+ } catch (Exception e2) {
log.warn("Caught in: " + this + " while closing: " + e2 + ". Now Closed", e2);
}
- }
- catch (EOFException e) {
+ } catch (EOFException e) {
// DataInputStream closed
log.debug("Socket closed: " + e, e);
try {
stop();
- }
- catch (Exception e2) {
+ } catch (Exception e2) {
log.warn("Caught in: " + this + " while closing: " + e2 + ". Now Closed", e2);
}
- }
- catch (Exception e) {
+ } catch (Exception e) {
try {
stop();
- }
- catch (Exception e2) {
+ } catch (Exception e2) {
log.warn("Caught in: " + this + " while closing: " + e2 + ". Now Closed", e2);
}
if (e instanceof IOException) {
- onException((IOException) e);
- }
- else {
+ onException((IOException)e);
+ } else {
log.error("Caught: " + e, e);
e.printStackTrace();
}
@@ -204,7 +193,7 @@
*/
public void setTargetEndpoint(Endpoint newTarget) {
if (newTarget instanceof DatagramEndpoint) {
- DatagramEndpoint endpoint = (DatagramEndpoint) newTarget;
+ DatagramEndpoint endpoint = (DatagramEndpoint)newTarget;
SocketAddress address = endpoint.getAddress();
if (address != null) {
if (originalTargetAddress == null) {
@@ -305,14 +294,15 @@
public void setSequenceGenerator(IntSequenceGenerator sequenceGenerator) {
this.sequenceGenerator = sequenceGenerator;
}
-
+
public boolean isReplayEnabled() {
return replayEnabled;
}
/**
- * Sets whether or not replay should be enabled when using the reliable transport.
- * i.e. should we maintain a buffer of messages that can be replayed?
+ * Sets whether or not replay should be enabled when using the reliable
+ * transport. i.e. should we maintain a buffer of messages that can be
+ * replayed?
*/
public void setReplayEnabled(boolean replayEnabled) {
this.replayEnabled = replayEnabled;
@@ -328,7 +318,7 @@
public void setBufferPool(ByteBufferPool bufferPool) {
this.bufferPool = bufferPool;
}
-
+
public ReplayBuffer getReplayBuffer() {
return replayBuffer;
}
@@ -338,7 +328,6 @@
getCommandChannel().setReplayBuffer(replayBuffer);
}
-
// Implementation methods
// -------------------------------------------------------------------------
@@ -391,26 +380,28 @@
if (log.isDebugEnabled()) {
log.debug("Binding to address: " + localAddress);
}
-
+
//
- // We have noticed that on some platfoms like linux, after you close down
- // a previously bound socket, it can take a little while before we can bind it again.
+ // We have noticed that on some platforms like linux, after you close
+ // down
+ // a previously bound socket, it can take a little while before we can
+ // bind it again.
//
- for(int i=0; i < MAX_BIND_ATTEMPTS; i++){
- try {
- socket.bind(localAddress);
- return;
- } catch (BindException e) {
- if ( i+1 == MAX_BIND_ATTEMPTS )
- throw e;
- try {
- Thread.sleep(BIND_ATTEMPT_DELAY);
- } catch (InterruptedException e1) {
+ for (int i = 0; i < MAX_BIND_ATTEMPTS; i++) {
+ try {
+ socket.bind(localAddress);
+ return;
+ } catch (BindException e) {
+ if (i + 1 == MAX_BIND_ATTEMPTS)
+ throw e;
+ try {
+ Thread.sleep(BIND_ATTEMPT_DELAY);
+ } catch (InterruptedException e1) {
Thread.currentThread().interrupt();
- throw e;
- }
- }
- }
+ throw e;
+ }
+ }
+ }
}
@@ -457,17 +448,17 @@
}
public InetSocketAddress getLocalSocketAddress() {
- if( channel==null ) {
+ if (channel == null) {
return null;
} else {
return (InetSocketAddress)channel.socket().getLocalSocketAddress();
}
}
- public String getRemoteAddress() {
- if(targetAddress != null){
- return "" + targetAddress;
- }
- return null;
- }
+ public String getRemoteAddress() {
+ if (targetAddress != null) {
+ return "" + targetAddress;
+ }
+ return null;
+ }
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java Wed Aug 8 11:56:59 2007
@@ -41,178 +41,176 @@
*
* @version $Revision$
*/
-public class VMTransport implements Transport,Task{
+public class VMTransport implements Transport, Task {
- private static final Log log=LogFactory.getLog(VMTransport.class);
- private static final AtomicLong nextId=new AtomicLong(0);
- private static final TaskRunnerFactory taskRunnerFactory=new TaskRunnerFactory("VMTransport",Thread.NORM_PRIORITY,
- true,1000);
+ private static final Log log = LogFactory.getLog(VMTransport.class);
+ private static final AtomicLong nextId = new AtomicLong(0);
+ private static final TaskRunnerFactory taskRunnerFactory = new TaskRunnerFactory("VMTransport", Thread.NORM_PRIORITY, true, 1000);
protected VMTransport peer;
protected TransportListener transportListener;
protected boolean disposed;
protected boolean marshal;
protected boolean network;
- protected boolean async=true;
- protected int asyncQueueDepth=2000;
- protected LinkedBlockingQueue messageQueue=null;
+ protected boolean async = true;
+ protected int asyncQueueDepth = 2000;
+ protected LinkedBlockingQueue messageQueue = null;
protected boolean started;
protected final URI location;
protected final long id;
private TaskRunner taskRunner;
- private final Object mutex=new Object();
+ private final Object mutex = new Object();
- public VMTransport(URI location){
- this.location=location;
- this.id=nextId.getAndIncrement();
+ public VMTransport(URI location) {
+ this.location = location;
+ this.id = nextId.getAndIncrement();
}
- public VMTransport getPeer(){
- synchronized(mutex){
+ public VMTransport getPeer() {
+ synchronized (mutex) {
return peer;
}
}
- public void setPeer(VMTransport peer){
- synchronized(mutex){
- this.peer=peer;
+ public void setPeer(VMTransport peer) {
+ synchronized (mutex) {
+ this.peer = peer;
}
}
- public void oneway(Object command) throws IOException{
- if(disposed){
+ public void oneway(Object command) throws IOException {
+ if (disposed) {
throw new TransportDisposedIOException("Transport disposed.");
}
- if(peer==null)
+ if (peer == null)
throw new IOException("Peer not connected.");
- TransportListener tl=null;
- synchronized(peer.mutex) {
- if( peer.disposed ) {
- throw new TransportDisposedIOException("Peer ("+peer.toString()+") disposed.");
- }
- if( peer.started ) {
- if(peer.async){
+ TransportListener tl = null;
+ synchronized (peer.mutex) {
+ if (peer.disposed) {
+ throw new TransportDisposedIOException("Peer (" + peer.toString() + ") disposed.");
+ }
+ if (peer.started) {
+ if (peer.async) {
peer.enqueue(command);
- peer.wakeup();
+ peer.wakeup();
} else {
- tl = peer.transportListener;
+ tl = peer.transportListener;
}
- } else {
- peer.enqueue(command);
- }
- }
-
- if( tl!=null ) {
- tl.onCommand(command);
- }
-
- }
-
- private void enqueue(Object command) throws IOException {
- try{
- getMessageQueue().put(command);
- }catch(final InterruptedException e){
- throw IOExceptionSupport.create(e);
- }
- }
+ } else {
+ peer.enqueue(command);
+ }
+ }
+
+ if (tl != null) {
+ tl.onCommand(command);
+ }
+
+ }
+
+ private void enqueue(Object command) throws IOException {
+ try {
+ getMessageQueue().put(command);
+ } catch (final InterruptedException e) {
+ throw IOExceptionSupport.create(e);
+ }
+ }
- public FutureResponse asyncRequest(Object command,ResponseCallback responseCallback) throws IOException{
+ public FutureResponse asyncRequest(Object command, ResponseCallback responseCallback) throws IOException {
throw new AssertionError("Unsupported Method");
}
- public Object request(Object command) throws IOException{
+ public Object request(Object command) throws IOException {
throw new AssertionError("Unsupported Method");
}
- public Object request(Object command,int timeout) throws IOException{
+ public Object request(Object command, int timeout) throws IOException {
throw new AssertionError("Unsupported Method");
}
- public TransportListener getTransportListener(){
- synchronized(mutex){
+ public TransportListener getTransportListener() {
+ synchronized (mutex) {
return transportListener;
}
}
- public void setTransportListener(TransportListener commandListener){
- synchronized(mutex){
- this.transportListener=commandListener;
+ public void setTransportListener(TransportListener commandListener) {
+ synchronized (mutex) {
+ this.transportListener = commandListener;
wakeup();
}
}
private LinkedBlockingQueue getMessageQueue() {
- synchronized(mutex) {
- if( messageQueue==null ) {
- messageQueue=new LinkedBlockingQueue(this.asyncQueueDepth);
- }
- return messageQueue;
- }
- }
-
-
- public void start() throws Exception{
- if(transportListener==null)
+ synchronized (mutex) {
+ if (messageQueue == null) {
+ messageQueue = new LinkedBlockingQueue(this.asyncQueueDepth);
+ }
+ return messageQueue;
+ }
+ }
+
+ public void start() throws Exception {
+ if (transportListener == null)
throw new IOException("TransportListener not set.");
-
- synchronized(mutex) {
- if( messageQueue!=null ) {
- Object command;
- while( (command = messageQueue.poll()) !=null ) {
- transportListener.onCommand(command);
- }
- }
- started = true;
+
+ synchronized (mutex) {
+ if (messageQueue != null) {
+ Object command;
+ while ((command = messageQueue.poll()) != null) {
+ transportListener.onCommand(command);
+ }
+ }
+ started = true;
wakeup();
}
}
- public void stop() throws Exception{
- TaskRunner tr=null;
- synchronized(mutex) {
- if(!disposed){
- started=false;
- disposed=true;
- if(taskRunner!=null){
- tr = taskRunner;
- taskRunner=null;
+ public void stop() throws Exception {
+ TaskRunner tr = null;
+ synchronized (mutex) {
+ if (!disposed) {
+ started = false;
+ disposed = true;
+ if (taskRunner != null) {
+ tr = taskRunner;
+ taskRunner = null;
}
}
}
- if( tr !=null ) {
- tr.shutdown(1000);
- }
+ if (tr != null) {
+ tr.shutdown(1000);
+ }
}
- public Object narrow(Class target){
- if(target.isAssignableFrom(getClass())){
+ public Object narrow(Class target) {
+ if (target.isAssignableFrom(getClass())) {
return this;
}
return null;
}
- public boolean isMarshal(){
+ public boolean isMarshal() {
return marshal;
}
- public void setMarshal(boolean marshal){
- this.marshal=marshal;
+ public void setMarshal(boolean marshal) {
+ this.marshal = marshal;
}
- public boolean isNetwork(){
+ public boolean isNetwork() {
return network;
}
- public void setNetwork(boolean network){
- this.network=network;
+ public void setNetwork(boolean network) {
+ this.network = network;
}
- public String toString(){
- return location+"#"+id;
+ public String toString() {
+ return location + "#" + id;
}
- public String getRemoteAddress(){
- if(peer!=null){
+ public String getRemoteAddress() {
+ if (peer != null) {
return peer.toString();
}
return null;
@@ -221,68 +219,68 @@
/**
* @see org.apache.activemq.thread.Task#iterate()
*/
- public boolean iterate(){
+ public boolean iterate() {
final TransportListener tl;
- synchronized(mutex){
- tl = transportListener;
- if( !started || disposed || tl==null )
- return false;
+ synchronized (mutex) {
+ tl = transportListener;
+ if (!started || disposed || tl == null)
+ return false;
}
-
+
LinkedBlockingQueue mq = getMessageQueue();
- final Command command = (Command)mq.poll();
- if( command!=null ) {
+ final Command command = (Command)mq.poll();
+ if (command != null) {
tl.onCommand(command);
return !mq.isEmpty();
} else {
- return false;
- }
+ return false;
+ }
}
/**
* @return the async
*/
- public boolean isAsync(){
+ public boolean isAsync() {
return async;
}
/**
* @param async the async to set
*/
- public void setAsync(boolean async){
- this.async=async;
+ public void setAsync(boolean async) {
+ this.async = async;
}
/**
* @return the asyncQueueDepth
*/
- public int getAsyncQueueDepth(){
+ public int getAsyncQueueDepth() {
return asyncQueueDepth;
}
/**
* @param asyncQueueDepth the asyncQueueDepth to set
*/
- public void setAsyncQueueDepth(int asyncQueueDepth){
- this.asyncQueueDepth=asyncQueueDepth;
+ public void setAsyncQueueDepth(int asyncQueueDepth) {
+ this.asyncQueueDepth = asyncQueueDepth;
}
- protected void wakeup(){
- if(async){
- synchronized(mutex){
- if(taskRunner==null){
- taskRunner=taskRunnerFactory.createTaskRunner(this,"VMTransport: "+toString());
+ protected void wakeup() {
+ if (async) {
+ synchronized (mutex) {
+ if (taskRunner == null) {
+ taskRunner = taskRunnerFactory.createTaskRunner(this, "VMTransport: " + toString());
}
}
- try{
+ try {
taskRunner.wakeup();
- }catch(InterruptedException e){
+ } catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
- public boolean isFaultTolerant(){
+ public boolean isFaultTolerant() {
return false;
}
}