You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2011/06/08 16:13:17 UTC
svn commit: r1133409 - in /activemq/activemq-apollo/trunk:
apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/
apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/
apollo-dto/src/main/java/org/apache/activemq/apollo/dto/...
Author: chirino
Date: Wed Jun 8 14:13:16 2011
New Revision: 1133409
URL: http://svn.apache.org/viewvc?rev=1133409&view=rev
Log:
Support getting stats about how big the last io operation was on a connection. This should help figure out how efficiently we are doing socket level system calls.
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/AnyProtocol.scala
activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/ConnectionStatusDTO.java
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala
activemq/activemq-apollo/trunk/apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp/TcpTransport.java
activemq/activemq-apollo/trunk/apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp/TcpTransportFactory.java
activemq/activemq-apollo/trunk/apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp/TcpTransportServer.java
activemq/activemq-apollo/trunk/apollo-transport/src/main/java/org/apache/activemq/apollo/transport/ProtocolCodec.java
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala?rev=1133409&r1=1133408&r2=1133409&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala Wed Jun 8 14:13:16 2011
@@ -146,6 +146,8 @@ class BrokerConnection(var connector: Co
if( wf!=null ) {
result.write_counter = wf.getWriteCounter
result.read_counter = wf.getReadCounter
+ result.last_read_size = wf.getLastReadSize
+ result.last_write_size = wf.getLastWriteSize
}
result
}
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/AnyProtocol.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/AnyProtocol.scala?rev=1133409&r1=1133408&r2=1133409&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/AnyProtocol.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/AnyProtocol.scala Wed Jun 8 14:13:16 2011
@@ -133,6 +133,9 @@ class AnyProtocolCodec(val protocols: Ar
def protocol = "any"
+ def getLastWriteSize = 0
+
+ def getLastReadSize = 0
}
/**
Modified: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/ConnectionStatusDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/ConnectionStatusDTO.java?rev=1133409&r1=1133408&r2=1133409&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/ConnectionStatusDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/ConnectionStatusDTO.java Wed Jun 8 14:13:16 2011
@@ -36,6 +36,18 @@ public class ConnectionStatusDTO extends
/**
* The number of bytes that have been read from the connection.
*/
+ @XmlAttribute(name="last_read_size")
+ public long last_read_size;
+
+ /**
+ * The number of bytes written by the last write io operation performed on the connection.
+ */
+ @XmlAttribute(name="last_write_size")
+ public long last_write_size;
+
+ /**
+ * The number of bytes that have been read from the connection.
+ */
@XmlAttribute(name="read_counter")
public long read_counter;
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala?rev=1133409&r1=1133408&r2=1133409&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala Wed Jun 8 14:13:16 2011
@@ -169,6 +169,7 @@ class StompCodec extends ProtocolCodec {
var write_buffer = ByteBuffer.allocate(0)
var write_direct:ZeroCopyBuffer = null
var write_direct_pos = 0
+ var last_write_io_size = 0
def full = next_write_direct!=null || next_write_buffer.size >= (write_buffer_size >> 1)
def is_empty = write_buffer.remaining == 0 && write_direct==null
@@ -176,12 +177,13 @@ class StompCodec extends ProtocolCodec {
def setWritableByteChannel(channel: WritableByteChannel) = {
this.write_channel = channel
if( this.write_channel.isInstanceOf[SocketChannel] ) {
- this.write_channel.asInstanceOf[SocketChannel].socket().setSendBufferSize(write_buffer_size);
+ write_buffer_size = this.write_channel.asInstanceOf[SocketChannel].socket().getSendBufferSize
}
}
def getWriteCounter = write_counter
+ def getLastWriteSize = last_write_io_size
def write(command: Any):ProtocolCodec.BufferState = {
if ( full) {
@@ -253,7 +255,8 @@ class StompCodec extends ProtocolCodec {
// if we have a pending write that is being sent over the socket...
if ( write_buffer.remaining() != 0 ) {
- write_counter += write_channel.write(write_buffer)
+ last_write_io_size = write_channel.write(write_buffer)
+ write_counter += last_write_io_size
}
if ( write_buffer.remaining() == 0 && write_direct!=null ) {
val count = write_direct.read(write_direct_pos, write_channel)
@@ -305,6 +308,8 @@ class StompCodec extends ProtocolCodec {
var read_end = 0
var read_start = 0
+ var last_read_io_size = 0
+
var read_direct:ZeroCopyBuffer = null
var read_direct_pos = 0
@@ -326,6 +331,8 @@ class StompCodec extends ProtocolCodec {
def getReadCounter = read_counter
+ def getLastReadSize = last_read_io_size
+
override def read():Object = {
var command:Object = null
@@ -371,13 +378,13 @@ class StompCodec extends ProtocolCodec {
// Try to fill the buffer with data from the socket..
var p = read_buffer.position()
- var count = read_channel.read(read_buffer)
- if (count == -1) {
+ last_read_io_size = read_channel.read(read_buffer)
+ if (last_read_io_size == -1) {
throw new EOFException("Peer disconnected")
- } else if (count == 0) {
+ } else if (last_read_io_size == 0) {
return null
}
- read_counter += count
+ read_counter += last_read_io_size
}
command = next_action(read_buffer)
Modified: activemq/activemq-apollo/trunk/apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp/TcpTransport.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp/TcpTransport.java?rev=1133409&r1=1133408&r2=1133409&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp/TcpTransport.java (original)
+++ activemq/activemq-apollo/trunk/apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp/TcpTransport.java Wed Jun 8 14:13:16 2011
@@ -47,8 +47,6 @@ public class TcpTransport extends JavaBa
private static final Logger LOG = LoggerFactory.getLogger(TcpTransport.class);
- protected Map<String, Object> socketOptions;
-
abstract static class SocketState {
void onStop(Runnable onCompleted) {
}
@@ -186,6 +184,7 @@ public class TcpTransport extends JavaBa
int max_read_rate;
int max_write_rate;
int receive_buffer_size = 1024*64;
+ int send_buffer_size = 1024*64;
public static final int IPTOS_LOWCOST = 0x02;
@@ -357,6 +356,12 @@ public class TcpTransport extends JavaBa
try {
socket.setReceiveBufferSize(receive_buffer_size);
} catch (SocketException e) {
+ e.printStackTrace();
+ }
+ try {
+ socket.setSendBufferSize(send_buffer_size);
+ } catch (SocketException e) {
+ e.printStackTrace();
}
}
@@ -708,10 +713,6 @@ public class TcpTransport extends JavaBa
return false;
}
- public void setSocketOptions(Map<String, Object> socketOptions) {
- this.socketOptions = socketOptions;
- }
-
public boolean isUseLocalHost() {
return useLocalHost;
}
@@ -788,4 +789,12 @@ public class TcpTransport extends JavaBa
public void setReceive_buffer_size(int receive_buffer_size) {
this.receive_buffer_size = receive_buffer_size;
}
+
+ public int getSend_buffer_size() {
+ return send_buffer_size;
+ }
+
+ public void setSend_buffer_size(int send_buffer_size) {
+ this.send_buffer_size = send_buffer_size;
+ }
}
Modified: activemq/activemq-apollo/trunk/apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp/TcpTransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp/TcpTransportFactory.java?rev=1133409&r1=1133408&r2=1133409&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp/TcpTransportFactory.java (original)
+++ activemq/activemq-apollo/trunk/apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp/TcpTransportFactory.java Wed Jun 8 14:13:16 2011
@@ -63,14 +63,12 @@ public class TcpTransportFactory impleme
Map<String, String> options = new HashMap<String, String>(URISupport.parseParamters(uri));
URI localLocation = getLocalLocation(uri);
+ configure(transport, options);
+ verify(transport, options);
transport.connecting(uri, localLocation);
- Map<String, Object> socketOptions = IntrospectionSupport.extractProperties(options, "socket.");
- transport.setSocketOptions(socketOptions);
-
- configure(transport, options);
- return verify(transport, options);
+ return transport;
}
/**
Modified: activemq/activemq-apollo/trunk/apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp/TcpTransportServer.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp/TcpTransportServer.java?rev=1133409&r1=1133408&r2=1133409&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp/TcpTransportServer.java (original)
+++ activemq/activemq-apollo/trunk/apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp/TcpTransportServer.java Wed Jun 8 14:13:16 2011
@@ -31,6 +31,7 @@ import java.net.*;
import java.nio.channels.SelectionKey;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
+import java.util.HashMap;
import java.util.Map;
/**
@@ -192,7 +193,7 @@ public class TcpTransportServer implemen
protected final void handleSocket(SocketChannel socket) throws Exception {
TcpTransport transport = createTransport();
if (transportOptions != null) {
- IntrospectionSupport.setProperties(transport, transportOptions);
+ IntrospectionSupport.setProperties(transport, new HashMap<String,String>(transportOptions) );
}
transport.connected(socket);
listener.onAccept(transport);
Modified: activemq/activemq-apollo/trunk/apollo-transport/src/main/java/org/apache/activemq/apollo/transport/ProtocolCodec.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-transport/src/main/java/org/apache/activemq/apollo/transport/ProtocolCodec.java?rev=1133409&r1=1133408&r2=1133409&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-transport/src/main/java/org/apache/activemq/apollo/transport/ProtocolCodec.java (original)
+++ activemq/activemq-apollo/trunk/apollo-transport/src/main/java/org/apache/activemq/apollo/transport/ProtocolCodec.java Wed Jun 8 14:13:16 2011
@@ -67,6 +67,11 @@ public interface ProtocolCodec {
*/
public long getReadCounter();
+ /**
+ * @return The number of bytes read in the last read io performed.
+ */
+ public int getLastReadSize();
+
///////////////////////////////////////////////////////////////////
//
@@ -110,5 +115,10 @@ public interface ProtocolCodec {
*/
public long getWriteCounter();
+ /**
+ * @return The number of bytes written in the last write io performed.
+ */
+ public int getLastWriteSize();
+
}