You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2011/06/08 16:13:17 UTC

svn commit: r1133409 - in /activemq/activemq-apollo/trunk: apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/ apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/ apollo-dto/src/main/java/org/apache/activemq/apollo/dto/...

Author: chirino
Date: Wed Jun  8 14:13:16 2011
New Revision: 1133409

URL: http://svn.apache.org/viewvc?rev=1133409&view=rev
Log:
Support getting stats about how big the last io operation was on a connection.  This should help figure out how efficiently we are doing socket level system calls.

Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/AnyProtocol.scala
    activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/ConnectionStatusDTO.java
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala
    activemq/activemq-apollo/trunk/apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp/TcpTransport.java
    activemq/activemq-apollo/trunk/apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp/TcpTransportFactory.java
    activemq/activemq-apollo/trunk/apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp/TcpTransportServer.java
    activemq/activemq-apollo/trunk/apollo-transport/src/main/java/org/apache/activemq/apollo/transport/ProtocolCodec.java

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala?rev=1133409&r1=1133408&r2=1133409&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala Wed Jun  8 14:13:16 2011
@@ -146,6 +146,8 @@ class BrokerConnection(var connector: Co
     if( wf!=null ) {
       result.write_counter = wf.getWriteCounter
       result.read_counter = wf.getReadCounter
+      result.last_read_size = wf.getLastReadSize
+      result.last_write_size = wf.getLastWriteSize
     }
     result
   }

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/AnyProtocol.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/AnyProtocol.scala?rev=1133409&r1=1133408&r2=1133409&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/AnyProtocol.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/AnyProtocol.scala Wed Jun  8 14:13:16 2011
@@ -133,6 +133,9 @@ class AnyProtocolCodec(val protocols: Ar
 
   def protocol = "any"
 
+  def getLastWriteSize = 0
+
+  def getLastReadSize = 0
 }
 
 /**

Modified: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/ConnectionStatusDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/ConnectionStatusDTO.java?rev=1133409&r1=1133408&r2=1133409&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/ConnectionStatusDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/ConnectionStatusDTO.java Wed Jun  8 14:13:16 2011
@@ -36,6 +36,18 @@ public class ConnectionStatusDTO extends
     /**
      * The number of bytes that have been read from the connection.
      */
+	@XmlAttribute(name="last_read_size")
+	public long last_read_size;
+
+    /**
+     * The number of bytes that have been written to the connection.
+     */
+	@XmlAttribute(name="last_write_size")
+	public long last_write_size;
+
+    /**
+     * The number of bytes that have been read from the connection.
+     */
 	@XmlAttribute(name="read_counter")
 	public long read_counter;
 

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala?rev=1133409&r1=1133408&r2=1133409&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompCodec.scala Wed Jun  8 14:13:16 2011
@@ -169,6 +169,7 @@ class StompCodec extends ProtocolCodec {
   var write_buffer = ByteBuffer.allocate(0)
   var write_direct:ZeroCopyBuffer = null
   var write_direct_pos = 0
+  var last_write_io_size = 0
 
   def full = next_write_direct!=null || next_write_buffer.size >= (write_buffer_size >> 1)
   def is_empty = write_buffer.remaining == 0 && write_direct==null
@@ -176,12 +177,13 @@ class StompCodec extends ProtocolCodec {
   def setWritableByteChannel(channel: WritableByteChannel) = {
     this.write_channel = channel
     if( this.write_channel.isInstanceOf[SocketChannel] ) {
-      this.write_channel.asInstanceOf[SocketChannel].socket().setSendBufferSize(write_buffer_size);
+      write_buffer_size = this.write_channel.asInstanceOf[SocketChannel].socket().getSendBufferSize
     }
   }
 
   def getWriteCounter = write_counter
 
+  def getLastWriteSize = last_write_io_size
 
   def write(command: Any):ProtocolCodec.BufferState =  {
     if ( full) {
@@ -253,7 +255,8 @@ class StompCodec extends ProtocolCodec {
 
     // if we have a pending write that is being sent over the socket...
     if ( write_buffer.remaining() != 0 ) {
-      write_counter += write_channel.write(write_buffer)
+      last_write_io_size = write_channel.write(write_buffer)
+      write_counter += last_write_io_size
     }
     if ( write_buffer.remaining() == 0 && write_direct!=null ) {
       val count = write_direct.read(write_direct_pos, write_channel)
@@ -305,6 +308,8 @@ class StompCodec extends ProtocolCodec {
   var read_end = 0
   var read_start = 0
 
+  var last_read_io_size = 0
+
   var read_direct:ZeroCopyBuffer = null
   var read_direct_pos = 0
 
@@ -326,6 +331,8 @@ class StompCodec extends ProtocolCodec {
 
   def getReadCounter = read_counter
 
+  def getLastReadSize = last_read_io_size
+
   override def read():Object = {
 
     var command:Object = null
@@ -371,13 +378,13 @@ class StompCodec extends ProtocolCodec {
 
           // Try to fill the buffer with data from the socket..
           var p = read_buffer.position()
-          var count = read_channel.read(read_buffer)
-          if (count == -1) {
+          last_read_io_size = read_channel.read(read_buffer)
+          if (last_read_io_size == -1) {
               throw new EOFException("Peer disconnected")
-          } else if (count == 0) {
+          } else if (last_read_io_size == 0) {
               return null
           }
-          read_counter += count
+          read_counter += last_read_io_size
       }
 
       command = next_action(read_buffer)

Modified: activemq/activemq-apollo/trunk/apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp/TcpTransport.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp/TcpTransport.java?rev=1133409&r1=1133408&r2=1133409&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp/TcpTransport.java (original)
+++ activemq/activemq-apollo/trunk/apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp/TcpTransport.java Wed Jun  8 14:13:16 2011
@@ -47,8 +47,6 @@ public class TcpTransport extends JavaBa
 
     private static final Logger LOG = LoggerFactory.getLogger(TcpTransport.class);
 
-    protected Map<String, Object> socketOptions;
-
     abstract static class SocketState {
         void onStop(Runnable onCompleted) {
         }
@@ -186,6 +184,7 @@ public class TcpTransport extends JavaBa
     int max_read_rate;
     int max_write_rate;
     int receive_buffer_size = 1024*64;
+    int send_buffer_size = 1024*64;
 
 
     public static final int IPTOS_LOWCOST = 0x02;
@@ -357,6 +356,12 @@ public class TcpTransport extends JavaBa
         try {
             socket.setReceiveBufferSize(receive_buffer_size);
         } catch (SocketException e) {
+            e.printStackTrace();
+        }
+        try {
+            socket.setSendBufferSize(send_buffer_size);
+        } catch (SocketException e) {
+            e.printStackTrace();
         }
     }
 
@@ -708,10 +713,6 @@ public class TcpTransport extends JavaBa
         return false;
     }
 
-    public void setSocketOptions(Map<String, Object> socketOptions) {
-        this.socketOptions = socketOptions;
-    }
-
     public boolean isUseLocalHost() {
         return useLocalHost;
     }
@@ -788,4 +789,12 @@ public class TcpTransport extends JavaBa
     public void setReceive_buffer_size(int receive_buffer_size) {
         this.receive_buffer_size = receive_buffer_size;
     }
+
+    public int getSend_buffer_size() {
+        return send_buffer_size;
+    }
+
+    public void setSend_buffer_size(int send_buffer_size) {
+        this.send_buffer_size = send_buffer_size;
+    }
 }

Modified: activemq/activemq-apollo/trunk/apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp/TcpTransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp/TcpTransportFactory.java?rev=1133409&r1=1133408&r2=1133409&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp/TcpTransportFactory.java (original)
+++ activemq/activemq-apollo/trunk/apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp/TcpTransportFactory.java Wed Jun  8 14:13:16 2011
@@ -63,14 +63,12 @@ public class TcpTransportFactory impleme
 
         Map<String, String> options = new HashMap<String, String>(URISupport.parseParamters(uri));
         URI localLocation = getLocalLocation(uri);
+        configure(transport, options);
+        verify(transport, options);
 
         transport.connecting(uri, localLocation);
 
-        Map<String, Object> socketOptions = IntrospectionSupport.extractProperties(options, "socket.");
-        transport.setSocketOptions(socketOptions);
-
-        configure(transport, options);
-        return verify(transport, options);
+        return transport;
     }
 
     /**

Modified: activemq/activemq-apollo/trunk/apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp/TcpTransportServer.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp/TcpTransportServer.java?rev=1133409&r1=1133408&r2=1133409&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp/TcpTransportServer.java (original)
+++ activemq/activemq-apollo/trunk/apollo-tcp/src/main/java/org/apache/activemq/apollo/transport/tcp/TcpTransportServer.java Wed Jun  8 14:13:16 2011
@@ -31,6 +31,7 @@ import java.net.*;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.ServerSocketChannel;
 import java.nio.channels.SocketChannel;
+import java.util.HashMap;
 import java.util.Map;
 
 /**
@@ -192,7 +193,7 @@ public class TcpTransportServer implemen
     protected final void handleSocket(SocketChannel socket) throws Exception {
         TcpTransport transport = createTransport();
         if (transportOptions != null) {
-            IntrospectionSupport.setProperties(transport, transportOptions);
+            IntrospectionSupport.setProperties(transport, new HashMap<String,String>(transportOptions) );
         }
         transport.connected(socket);
         listener.onAccept(transport);

Modified: activemq/activemq-apollo/trunk/apollo-transport/src/main/java/org/apache/activemq/apollo/transport/ProtocolCodec.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-transport/src/main/java/org/apache/activemq/apollo/transport/ProtocolCodec.java?rev=1133409&r1=1133408&r2=1133409&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-transport/src/main/java/org/apache/activemq/apollo/transport/ProtocolCodec.java (original)
+++ activemq/activemq-apollo/trunk/apollo-transport/src/main/java/org/apache/activemq/apollo/transport/ProtocolCodec.java Wed Jun  8 14:13:16 2011
@@ -67,6 +67,11 @@ public interface ProtocolCodec {
      */
     public long getReadCounter();
 
+    /**
+     * @return The number of bytes read in the last read io performed.
+     */
+    public int getLastReadSize();
+
 
     ///////////////////////////////////////////////////////////////////
     //
@@ -110,5 +115,10 @@ public interface ProtocolCodec {
      */
     public long getWriteCounter();
 
+    /**
+     * @return The number of bytes read in the last write io performed.
+     */
+    public int getLastWriteSize();
+
 
 }