You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/08/10 05:10:59 UTC

svn commit: r1371551 - in /activemq/activemq-apollo/trunk: ./ apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/ apollo-dto/src/main/java/org/apache/activemq/apollo/dto/ apollo-website/src/documentation/

Author: chirino
Date: Fri Aug 10 03:10:59 2012
New Revision: 1371551

URL: http://svn.apache.org/viewvc?rev=1371551&view=rev
Log:
Fixes  APLO-163: Consider auto-tuning the per-connection buffers (receive_buffer_size and send_buffer_size).

As a broker gets loaded up with connections now, we automatically start using smaller buffer sizes for the new connections that it accepts.  The first 1024 connections will still use a 64k buffer but additional connections will start using smaller and smaller buffers.

Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connector.scala
    activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/AcceptingConnectorDTO.java
    activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md
    activemq/activemq-apollo/trunk/pom.xml

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala?rev=1371551&r1=1371550&r2=1371551&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala Fri Aug 10 03:10:59 2012
@@ -40,6 +40,7 @@ import org.fusesource.hawtdispatch.util.
 import org.apache.activemq.apollo.filter.{Filterable, XPathExpression, XalanXPathEvaluator}
 import org.xml.sax.InputSource
 import java.util
+import javax.management.openmbean.CompositeData
 
 /**
  * <p>
@@ -155,6 +156,15 @@ object BrokerRegistry extends Log {
 
 object Broker extends Log {
 
+  val mbean_server = ManagementFactory.getPlatformMBeanServer()
+
+  // Maximum heap size of this JVM in bytes, as reported by the platform Memory MXBean.
+  // Falls back to 1 GIG when the attribute can't be read, has an unexpected type,
+  // or the JVM reports the max as undefined.
+  val MAX_JVM_HEAP_SIZE = {
+    val fallback = 1024L * 1024 * 1024 // assume it's 1 GIG (that's the default apollo ships with)
+    try {
+      mbean_server.getAttribute(new ObjectName("java.lang:type=Memory"), "HeapMemoryUsage") match {
+        case data:CompositeData =>
+          data.get("max") match {
+            // The JVM reports -1 when the max is undefined; treating that as a real
+            // heap size would push the auto tuned buffers down to their minimum.
+            case max:java.lang.Long if max.longValue() > 0 => max.longValue()
+            case _ => fallback
+          }
+        case _ => fallback
+      }
+    } catch {
+      // JMX lookups fail with exceptions; fatal JVM errors are left to propagate.
+      case _:Exception => fallback
+    }
+  }
+
   val BLOCKABLE_THREAD_POOL = ApolloThreadPool.INSTANCE
   private val SERVICE_TIMEOUT = 1000*5;
   val buffer_pools = new BufferPools
@@ -221,7 +231,6 @@ object Broker extends Log {
     if( System.getProperty("os.name").toLowerCase().startsWith("windows") ) {
       None
     } else {
-      val mbean_server = ManagementFactory.getPlatformMBeanServer()
       mbean_server.getAttribute(new ObjectName("java.lang:type=OperatingSystem"), "MaxFileDescriptorCount") match {
         case x:java.lang.Long=> Some(x.longValue)
         case _ => None
@@ -270,6 +279,27 @@ class Broker() extends BaseService with 
   val connectors = LinkedHashMap[String, Connector]()
   val connections = LinkedHashMap[Long, BrokerConnection]()
 
+  // Each period is 1 second long..
+  object PeriodStat {
+    // Summarizes a run of periods into one stat carrying the largest
+    // max_connections value found among them.
+    def apply(values:Seq[PeriodStat]) = {
+      val summary = new PeriodStat
+      summary.max_connections = values.foldLeft(summary.max_connections) { (peak, stat) =>
+        peak.max(stat.max_connections)
+      }
+      summary
+    }
+  }
+
+  class PeriodStat {
+    // yeah just tracking max connections for now.. but we might add more later.
+    var max_connections = 0
+  }
+
+  // The period that is currently being recorded.
+  var current_period:PeriodStat = new PeriodStat
+  // collects 5 min stats (5*60 one second periods).
+  val stats_of_5min = new CircularBuffer[PeriodStat](5*60)
+  // Highest max_connections seen across the collected periods.
+  var max_connections_in_5min = 0
+  // Buffer size used by connectors that don't configure one themselves; starts out at 64k.
+  var auto_tuned_send_receiver_buffer_size = 64*1024
+
   val dispatch_queue = createQueue("broker")
 
   def id = "default"
@@ -324,6 +354,8 @@ class Broker() extends BaseService with 
     }
     schedule_reoccurring(1, SECONDS) {
       virtualhost_maintenance
+      roll_current_period
+      tune_send_receive_buffers
     }
 
     val tracker = new LoggingTracker("broker startup", console_log, SERVICE_TIMEOUT)
@@ -366,6 +398,44 @@ class Broker() extends BaseService with 
 
   }
 
+  // Archives the period that just ended, opens a new one seeded with the
+  // current connection count, and recomputes the peak over the archived periods.
+  def roll_current_period = {
+    stats_of_5min.add(current_period)
+
+    val next_period = new PeriodStat
+    next_period.max_connections = connections.size
+    current_period = next_period
+
+    max_connections_in_5min = PeriodStat(stats_of_5min).max_connections
+  }
+
+  // Recomputes auto_tuned_send_receiver_buffer_size from the JVM heap size and
+  // the peak connection count, then has every connector re-apply its buffer settings.
+  def tune_send_receive_buffers = {
+
+    // The current period may already have a higher peak than the archived ones.
+    max_connections_in_5min = max_connections_in_5min.max(current_period.max_connections)
+    if ( max_connections_in_5min == 0 ) {
+      auto_tuned_send_receiver_buffer_size = 64*1024
+    } else {
+      // We start with the JVM heap.
+      var x = MAX_JVM_HEAP_SIZE
+      // Lets only use 1/8th of the heap for connection buffers.
+      x = x / 8
+      // 1/2 for send buffers, the other 1/2 for receive buffers.
+      x = x / 2
+      // Ok, how much space can we use per connection?
+      x = x / max_connections_in_5min
+      // Drop the bottom 10 bits so that we are working /w 1k increments.
+      x = x & ~0x3FFL
+
+      // Constrain the result to be between a 2k and a 64k buffer.  The clamp is
+      // done on the Long value: with a big heap and few connections x can exceed
+      // Int.MaxValue, and converting first would wrap it around and end up
+      // selecting the 2k minimum instead of the 64k maximum.
+      auto_tuned_send_receiver_buffer_size = x.max(1024L*2).min(1024L*64).toInt
+    }
+
+    // With a 1 GIG heap this basically means that we will use a 64k
+    // send/receive buffer for the first 1024 connections established and
+    // then the buffer size will start getting reduced down until it gets
+    // to 2k buffers.  Which will occur when you get to about 32,000 connections.
+
+    for( connector <- connectors.values ) {
+      connector.update_buffer_settings
+    }
+  }
+
   def virtualhost_maintenance = {
     val active_sessions = connections.values.flatMap(_.session_id).toSet
     virtual_hosts.values.foreach { host=>

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connector.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connector.scala?rev=1371551&r1=1371550&r2=1371551&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connector.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connector.scala Fri Aug 10 03:10:59 2012
@@ -48,6 +48,8 @@ trait Connector extends BaseService with
   def socket_address:SocketAddress
   def status:ServiceStatusDTO
   def resource_kind = SecuredResource.ConnectorKind
+  def update_buffer_settings = {} // no-op by default; connectors with tunable buffers override this
+
 }
 
 trait ConnectorFactory {
@@ -156,6 +158,12 @@ class AcceptingConnector(val broker:Brok
       connection.transport = transport
 
       broker.connections.put(connection.id, connection)
+      broker.current_period.max_connections = broker.current_period.max_connections.max(broker.connections.size)
+      if ( broker.current_period.max_connections > broker.max_connections_in_5min ) {
+        // re-tune the buffer settings if max is getting bumped.
+        broker.tune_send_receive_buffers
+      }
+
       try {
         connection.start(NOOP)
       } catch {
@@ -192,9 +200,15 @@ class AcceptingConnector(val broker:Brok
     }
   }
 
+  var send_buffer_size:Option[Int] = None
+  var receive_buffer_size:Option[Int] = None
 
   override def _start(on_completed:Task) = {
+    def mem_size(value:String) = Option(value).map(MemoryPropertyEditor.parse(_).toInt)
+
     assert(config!=null, "Connector must be configured before it is started.")
+    send_buffer_size = mem_size(config.send_buffer_size)
+    receive_buffer_size = mem_size(config.receive_buffer_size)
 
     accepted.set(0)
     connected.set(0)
@@ -206,6 +220,10 @@ class AcceptingConnector(val broker:Brok
     transport_server match {
       case transport_server:BrokerAware =>
         transport_server.set_broker(broker)
+      case _ =>
+    }
+
+    transport_server match {
       case transport_server:SslTransportServer =>
         transport_server.setBlockingExecutor(Broker.BLOCKABLE_THREAD_POOL);
         if( broker.key_storage!=null ) {
@@ -217,12 +235,45 @@ class AcceptingConnector(val broker:Brok
       case _ =>
     }
 
+    update_buffer_settings
+
     transport_server.start(^{
       broker.console_log.info("Accepting connections at: "+transport_server.getBoundAddress)
       on_completed.run
     })
   }
 
+  // Sizes most recently applied to the transport server (0 until the first update).
+  var last_receive_buffer_size = 0
+  var last_send_buffer_size = 0
+
+  //
+  // Re-applies the socket buffer sizes to the transport server.  This runs when
+  // the connector starts and each time the broker re-tunes its buffers.  A size
+  // configured on the connector takes precedence over the broker's auto tuned
+  // value.  Only TcpTransportServer instances are updated.
+  //
+  override
+  def update_buffer_settings {
+    transport_server match {
+      case tcp_server: TcpTransportServer =>
+
+        val wanted_receive_size = receive_buffer_size match {
+          case Some(configured) => configured
+          case None => broker.auto_tuned_send_receiver_buffer_size
+        }
+        // only touch the socket settings when the size actually changes.
+        if( last_receive_buffer_size != wanted_receive_size ) {
+          debug("%s connector receive_buffer_size set to: %d", id, wanted_receive_size)
+          tcp_server.setReceiveBufferSize(wanted_receive_size)
+          last_receive_buffer_size = wanted_receive_size
+        }
+
+        val wanted_send_size = send_buffer_size match {
+          case Some(configured) => configured
+          case None => broker.auto_tuned_send_receiver_buffer_size
+        }
+        // only touch the socket settings when the size actually changes.
+        if( last_send_buffer_size != wanted_send_size ) {
+          debug("%s connector send_buffer_size set to: %d", id, wanted_send_size)
+          tcp_server.setSendBufferSize(wanted_send_size)
+          last_send_buffer_size = wanted_send_size
+        }
+
+      case _ =>
+    }
+  }
 
   override def _stop(on_completed:Task): Unit = {
     transport_server.stop(^{

Modified: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/AcceptingConnectorDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/AcceptingConnectorDTO.java?rev=1371551&r1=1371550&r2=1371551&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/AcceptingConnectorDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/AcceptingConnectorDTO.java Fri Aug 10 03:10:59 2012
@@ -34,49 +34,67 @@ import java.util.List;
 @JsonIgnoreProperties(ignoreUnknown = true)
 public class AcceptingConnectorDTO extends ConnectorTypeDTO {
 
-  /**
-   * The transport that the connector will listen on, it includes the ip address and port that it will bind to.
-   * Transports are specified using a URI syntax.
-   */
-  @XmlAttribute
-  public String bind;
-
-  /**
-   * Defaults to 'any' which means that any of the broker's supported protocols can connect via this transport.
-   */
-  @XmlAttribute
-  public String protocol;
-
-  /**
-   * A broker accepts connections via it's configured connectors.
-   */
-  @XmlElementRef
-  public List<ProtocolDTO> protocols = new ArrayList<ProtocolDTO>();
-
-  @Override
-  public boolean equals(Object o) {
-    if (this == o) return true;
-    if (!(o instanceof AcceptingConnectorDTO)) return false;
-    if (!super.equals(o)) return false;
-
-    AcceptingConnectorDTO that = (AcceptingConnectorDTO) o;
-
-    if (bind != null ? !bind.equals(that.bind) : that.bind != null)
-      return false;
-    if (protocol != null ? !protocol.equals(that.protocol) : that.protocol != null)
-      return false;
-    if (protocols != null ? !protocols.equals(that.protocols) : that.protocols != null)
-      return false;
-
-    return true;
-  }
-
-  @Override
-  public int hashCode() {
-    int result = super.hashCode();
-    result = 31 * result + (bind != null ? bind.hashCode() : 0);
-    result = 31 * result + (protocol != null ? protocol.hashCode() : 0);
-    result = 31 * result + (protocols != null ? protocols.hashCode() : 0);
-    return result;
-  }
+    /**
+     * The transport that the connector will listen on, it includes the ip address and port that it will bind to.
+     * Transports are specified using a URI syntax.
+     */
+    @XmlAttribute
+    public String bind;
+
+    /**
+     * Defaults to 'any' which means that any of the broker's supported protocols can connect via this transport.
+     */
+    @XmlAttribute
+    public String protocol;
+
+    /**
+     * Sets the size of the internal socket receive buffer (aka setting the socket's SO_RCVBUF). When not set, the broker auto-tunes it to be between 64k and 2k.
+     */
+    @XmlAttribute(name="receive_buffer_size")
+    public String receive_buffer_size;
+
+    /**
+     * Sets the size of the internal socket send buffer (aka setting the socket's SO_SNDBUF). When not set, the broker auto-tunes it to be between 64k and 2k.
+     */
+    @XmlAttribute(name="send_buffer_size")
+    public String send_buffer_size;
+
+    /**
+     * A broker accepts connections via it's configured connectors.
+     */
+    @XmlElementRef
+    public List<ProtocolDTO> protocols = new ArrayList<ProtocolDTO>();
+
+    /**
+     * Two accepting connectors are equal when the inherited state matches and
+     * bind, protocol, protocols, receive_buffer_size and send_buffer_size are
+     * all equal (null only matches null).
+     */
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (!(o instanceof AcceptingConnectorDTO) || !super.equals(o)) {
+            return false;
+        }
+
+        AcceptingConnectorDTO other = (AcceptingConnectorDTO) o;
+
+        if (bind == null ? other.bind != null : !bind.equals(other.bind)) {
+            return false;
+        }
+        if (protocol == null ? other.protocol != null : !protocol.equals(other.protocol)) {
+            return false;
+        }
+        if (protocols == null ? other.protocols != null : !protocols.equals(other.protocols)) {
+            return false;
+        }
+        if (receive_buffer_size == null ? other.receive_buffer_size != null : !receive_buffer_size.equals(other.receive_buffer_size)) {
+            return false;
+        }
+        if (send_buffer_size == null ? other.send_buffer_size != null : !send_buffer_size.equals(other.send_buffer_size)) {
+            return false;
+        }
+
+        return true;
+    }
+
+    @Override
+    public int hashCode() {
+        int result = super.hashCode();
+        result = 31 * result + (bind != null ? bind.hashCode() : 0);
+        result = 31 * result + (protocol != null ? protocol.hashCode() : 0);
+        result = 31 * result + (receive_buffer_size != null ? receive_buffer_size.hashCode() : 0);
+        result = 31 * result + (send_buffer_size != null ? send_buffer_size.hashCode() : 0);
+        result = 31 * result + (protocols != null ? protocols.hashCode() : 0);
+        return result;
+    }
 }
\ No newline at end of file

Modified: activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md?rev=1371551&r1=1371550&r2=1371551&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md (original)
+++ activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md Fri Aug 10 03:10:59 2012
@@ -160,6 +160,17 @@ A `connector` element can be configured 
 * `protocol` : Defaults to `any` which means that any of the broker's 
    supported protocols can connect via this transport.
 
+* `receive_buffer_size` : Sets the size of the internal socket receive 
+   buffer (aka setting the socket's SO_RCVBUF).  
+
+* `send_buffer_size` : Sets the size of the internal socket send buffer
+  (aka setting the socket's SO_SNDBUF).  
+
+When the `receive_buffer_size` or `send_buffer_size` attributes are not set, 
+then the broker will 'auto-tune' them to be between '64k' and '2k' based on the
+max number of connections established against the broker in the last 5 minutes 
+and the size of the JVM heap.
+
 Furthermore, the connector element may contain protocol specific
 configuration elements. For example, to have the broker set the `user_id`
 header of messages to the id of user that sent the message, you would
@@ -204,12 +215,6 @@ settings used on the socket.  The suppor
 
 * `backlog` : Sets the listen backlog size.  Defaults to 100.
 
-* `receive_buffer_size` : Sets the size of the internal socket receive 
-   buffer (aka setting the socket's SO_RCVBUF).  Defaults to 65536 (64k)
-
-* `send_buffer_size` : Sets the size of the internal socket send buffer
-  (aka setting the socket's SO_SNDBUF).  Defaults to 65536 (64k)
-
 * `keep_alive` : Enable or disable the SO_KEEPALIVE socket option  
    (aka setting the socket's SO_KEEPALIVE). Defaults to true.
 

Modified: activemq/activemq-apollo/trunk/pom.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/pom.xml?rev=1371551&r1=1371550&r2=1371551&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/pom.xml (original)
+++ activemq/activemq-apollo/trunk/pom.xml Fri Aug 10 03:10:59 2012
@@ -96,7 +96,7 @@
     <xbean-version>3.4</xbean-version>
     <felix-version>1.0.0</felix-version>
 
-    <hawtdispatch-version>1.11</hawtdispatch-version>
+    <hawtdispatch-version>1.12-SNAPSHOT</hawtdispatch-version>
     <hawtbuf-version>1.9</hawtbuf-version>
     <stompjms-version>1.13-SNAPSHOT</stompjms-version>