You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/08/10 05:10:59 UTC
svn commit: r1371551 - in /activemq/activemq-apollo/trunk: ./
apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/
apollo-dto/src/main/java/org/apache/activemq/apollo/dto/
apollo-website/src/documentation/
Author: chirino
Date: Fri Aug 10 03:10:59 2012
New Revision: 1371551
URL: http://svn.apache.org/viewvc?rev=1371551&view=rev
Log:
Fixes APLO-163: Consider auto-tuning the per-connection buffers (receive_buffer_size and send_buffer_size).
As a broker gets loaded up with connections now, we automatically start using smaller buffer sizes the new connections that it accepts. The first 1024 connections will still use a 64k buffer but additional connections will start using smaller and smaller buffers.
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connector.scala
activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/AcceptingConnectorDTO.java
activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md
activemq/activemq-apollo/trunk/pom.xml
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala?rev=1371551&r1=1371550&r2=1371551&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala Fri Aug 10 03:10:59 2012
@@ -40,6 +40,7 @@ import org.fusesource.hawtdispatch.util.
import org.apache.activemq.apollo.filter.{Filterable, XPathExpression, XalanXPathEvaluator}
import org.xml.sax.InputSource
import java.util
+import javax.management.openmbean.CompositeData
/**
* <p>
@@ -155,6 +156,15 @@ object BrokerRegistry extends Log {
object Broker extends Log {
+ val mbean_server = ManagementFactory.getPlatformMBeanServer()
+
+ val MAX_JVM_HEAP_SIZE = try {
+ val data = mbean_server.getAttribute(new ObjectName("java.lang:type=Memory"), "HeapMemoryUsage").asInstanceOf[CompositeData]
+ data.get("max").asInstanceOf[java.lang.Long].longValue()
+ } catch {
+ case _ => 1024L * 1024 * 1024 // assume it's 1 GIG (that's the default apollo ships with)
+ }
+
val BLOCKABLE_THREAD_POOL = ApolloThreadPool.INSTANCE
private val SERVICE_TIMEOUT = 1000*5;
val buffer_pools = new BufferPools
@@ -221,7 +231,6 @@ object Broker extends Log {
if( System.getProperty("os.name").toLowerCase().startsWith("windows") ) {
None
} else {
- val mbean_server = ManagementFactory.getPlatformMBeanServer()
mbean_server.getAttribute(new ObjectName("java.lang:type=OperatingSystem"), "MaxFileDescriptorCount") match {
case x:java.lang.Long=> Some(x.longValue)
case _ => None
@@ -270,6 +279,27 @@ class Broker() extends BaseService with
val connectors = LinkedHashMap[String, Connector]()
val connections = LinkedHashMap[Long, BrokerConnection]()
+ // Each period is 1 second long..
+ object PeriodStat {
+ def apply(values:Seq[PeriodStat]) = {
+ val rc = new PeriodStat
+ for( s <- values ) {
+ rc.max_connections = rc.max_connections.max(s.max_connections)
+ }
+ rc
+ }
+ }
+
+ class PeriodStat {
+ // yeah just tracking max connections for now.. but we might add more later.
+ var max_connections = 0
+ }
+
+ var current_period:PeriodStat = new PeriodStat
+ val stats_of_5min = new CircularBuffer[PeriodStat](5*60) // collects 5 min stats.
+ var max_connections_in_5min = 0
+ var auto_tuned_send_receiver_buffer_size = 64*1024
+
val dispatch_queue = createQueue("broker")
def id = "default"
@@ -324,6 +354,8 @@ class Broker() extends BaseService with
}
schedule_reoccurring(1, SECONDS) {
virtualhost_maintenance
+ roll_current_period
+ tune_send_receive_buffers
}
val tracker = new LoggingTracker("broker startup", console_log, SERVICE_TIMEOUT)
@@ -366,6 +398,44 @@ class Broker() extends BaseService with
}
+ def roll_current_period = {
+ stats_of_5min.add(current_period)
+ current_period = new PeriodStat
+ current_period.max_connections = connections.size
+ max_connections_in_5min = PeriodStat(stats_of_5min).max_connections
+ }
+
+ def tune_send_receive_buffers = {
+
+ max_connections_in_5min = max_connections_in_5min.max(current_period.max_connections)
+ if ( max_connections_in_5min == 0 ) {
+ auto_tuned_send_receiver_buffer_size = 64*1024
+ } else {
+ // We start with the JVM heap.
+ var x = MAX_JVM_HEAP_SIZE
+ // Lets only use 1/8th of the heap for connection buffers.
+ x = x / 8
+ // 1/2 for send buffers, the other 1/2 for receive buffers.
+ x = x / 2
+ // Ok, how much space can we use per connection?
+ x = x / max_connections_in_5min
+ // Drop the bottom bits so that we are working /w 1k increments.
+ x = x & 0xFFFFFF00
+
+ // Constrain the result to be between a 2k and a 64k buffer.
+ auto_tuned_send_receiver_buffer_size = x.toInt.max(1024*2).min(1024*64)
+ }
+
+ // Basically this means that we will use a 64k send/receive buffer
+ // for the first 1024 connections established and then the buffer
+ // size will start getting reduced down until it gets to 2k buffers.
+ // Which will occur when you get to about 32,000 connections.
+
+ for( connector <- connectors.values ) {
+ connector.update_buffer_settings
+ }
+ }
+
def virtualhost_maintenance = {
val active_sessions = connections.values.flatMap(_.session_id).toSet
virtual_hosts.values.foreach { host=>
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connector.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connector.scala?rev=1371551&r1=1371550&r2=1371551&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connector.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Connector.scala Fri Aug 10 03:10:59 2012
@@ -48,6 +48,8 @@ trait Connector extends BaseService with
def socket_address:SocketAddress
def status:ServiceStatusDTO
def resource_kind = SecuredResource.ConnectorKind
+ def update_buffer_settings = {}
+
}
trait ConnectorFactory {
@@ -156,6 +158,12 @@ class AcceptingConnector(val broker:Brok
connection.transport = transport
broker.connections.put(connection.id, connection)
+ broker.current_period.max_connections = broker.current_period.max_connections.max(broker.connections.size)
+ if ( broker.current_period.max_connections > broker.max_connections_in_5min ) {
+ // re-tune the buffer settings if max is getting bumped.
+ broker.tune_send_receive_buffers
+ }
+
try {
connection.start(NOOP)
} catch {
@@ -192,9 +200,15 @@ class AcceptingConnector(val broker:Brok
}
}
+ var send_buffer_size:Option[Int] = None
+ var receive_buffer_size:Option[Int] = None
override def _start(on_completed:Task) = {
+ def mem_size(value:String) = Option(value).map(MemoryPropertyEditor.parse(_).toInt)
+
assert(config!=null, "Connector must be configured before it is started.")
+ send_buffer_size = mem_size(config.send_buffer_size)
+ receive_buffer_size = mem_size(config.receive_buffer_size)
accepted.set(0)
connected.set(0)
@@ -206,6 +220,10 @@ class AcceptingConnector(val broker:Brok
transport_server match {
case transport_server:BrokerAware =>
transport_server.set_broker(broker)
+ case _ =>
+ }
+
+ transport_server match {
case transport_server:SslTransportServer =>
transport_server.setBlockingExecutor(Broker.BLOCKABLE_THREAD_POOL);
if( broker.key_storage!=null ) {
@@ -217,12 +235,45 @@ class AcceptingConnector(val broker:Brok
case _ =>
}
+ update_buffer_settings
+
transport_server.start(^{
broker.console_log.info("Accepting connections at: "+transport_server.getBoundAddress)
on_completed.run
})
}
+ var last_receive_buffer_size = 0
+ var last_send_buffer_size = 0
+
+ //
+ // This method get call once a second to re-tune the socket buffer sizes if needed.
+ //
+ override
+ def update_buffer_settings {
+ transport_server match {
+ case transport_server: TcpTransportServer =>
+
+ val next_receive_buffer_size = receive_buffer_size.getOrElse(broker.auto_tuned_send_receiver_buffer_size)
+ if( next_receive_buffer_size!=last_receive_buffer_size ) {
+ debug("%s connector receive_buffer_size set to: %d", id, next_receive_buffer_size)
+
+ // lets avoid updating the socket settings each period.
+ transport_server.setReceiveBufferSize(next_receive_buffer_size)
+ last_receive_buffer_size = next_receive_buffer_size
+ }
+
+ val next_send_buffer_size = send_buffer_size.getOrElse(broker.auto_tuned_send_receiver_buffer_size)
+ if( next_send_buffer_size!=last_send_buffer_size ) {
+ debug("%s connector send_buffer_size set to: %d", id, next_send_buffer_size)
+ // lets avoid updating the socket settings each period.
+ transport_server.setSendBufferSize(next_send_buffer_size)
+ last_send_buffer_size = next_send_buffer_size
+ }
+
+ case _ =>
+ }
+ }
override def _stop(on_completed:Task): Unit = {
transport_server.stop(^{
Modified: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/AcceptingConnectorDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/AcceptingConnectorDTO.java?rev=1371551&r1=1371550&r2=1371551&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/AcceptingConnectorDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/AcceptingConnectorDTO.java Fri Aug 10 03:10:59 2012
@@ -34,49 +34,67 @@ import java.util.List;
@JsonIgnoreProperties(ignoreUnknown = true)
public class AcceptingConnectorDTO extends ConnectorTypeDTO {
- /**
- * The transport that the connector will listen on, it includes the ip address and port that it will bind to.
- * Transports are specified using a URI syntax.
- */
- @XmlAttribute
- public String bind;
-
- /**
- * Defaults to 'any' which means that any of the broker's supported protocols can connect via this transport.
- */
- @XmlAttribute
- public String protocol;
-
- /**
- * A broker accepts connections via it's configured connectors.
- */
- @XmlElementRef
- public List<ProtocolDTO> protocols = new ArrayList<ProtocolDTO>();
-
- @Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (!(o instanceof AcceptingConnectorDTO)) return false;
- if (!super.equals(o)) return false;
-
- AcceptingConnectorDTO that = (AcceptingConnectorDTO) o;
-
- if (bind != null ? !bind.equals(that.bind) : that.bind != null)
- return false;
- if (protocol != null ? !protocol.equals(that.protocol) : that.protocol != null)
- return false;
- if (protocols != null ? !protocols.equals(that.protocols) : that.protocols != null)
- return false;
-
- return true;
- }
-
- @Override
- public int hashCode() {
- int result = super.hashCode();
- result = 31 * result + (bind != null ? bind.hashCode() : 0);
- result = 31 * result + (protocol != null ? protocol.hashCode() : 0);
- result = 31 * result + (protocols != null ? protocols.hashCode() : 0);
- return result;
- }
+ /**
+ * The transport that the connector will listen on, it includes the ip address and port that it will bind to.
+ * Transports are specified using a URI syntax.
+ */
+ @XmlAttribute
+ public String bind;
+
+ /**
+ * Defaults to 'any' which means that any of the broker's supported protocols can connect via this transport.
+ */
+ @XmlAttribute
+ public String protocol;
+
+ /**
+ * Sets the size of the internal socket receive buffer (aka setting the socket's SO_RCVBUF). Defaults to 64k
+ */
+ @XmlAttribute(name="receive_buffer_size")
+ public String receive_buffer_size;
+
+ /**
+ * Sets the size of the internal socket send buffer (aka setting the socket's SO_SNDBUF). Defaults to 64k
+ */
+ @XmlAttribute(name="send_buffer_size")
+ public String send_buffer_size;
+
+ /**
+ * A broker accepts connections via it's configured connectors.
+ */
+ @XmlElementRef
+ public List<ProtocolDTO> protocols = new ArrayList<ProtocolDTO>();
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (!(o instanceof AcceptingConnectorDTO)) return false;
+ if (!super.equals(o)) return false;
+
+ AcceptingConnectorDTO that = (AcceptingConnectorDTO) o;
+
+ if (bind != null ? !bind.equals(that.bind) : that.bind != null)
+ return false;
+ if (protocol != null ? !protocol.equals(that.protocol) : that.protocol != null)
+ return false;
+ if (protocols != null ? !protocols.equals(that.protocols) : that.protocols != null)
+ return false;
+ if (receive_buffer_size != null ? !receive_buffer_size.equals(that.receive_buffer_size) : that.receive_buffer_size != null)
+ return false;
+ if (send_buffer_size != null ? !send_buffer_size.equals(that.send_buffer_size) : that.send_buffer_size != null)
+ return false;
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ int result = super.hashCode();
+ result = 31 * result + (bind != null ? bind.hashCode() : 0);
+ result = 31 * result + (protocol != null ? protocol.hashCode() : 0);
+ result = 31 * result + (receive_buffer_size != null ? receive_buffer_size.hashCode() : 0);
+ result = 31 * result + (send_buffer_size != null ? send_buffer_size.hashCode() : 0);
+ result = 31 * result + (protocols != null ? protocols.hashCode() : 0);
+ return result;
+ }
}
\ No newline at end of file
Modified: activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md?rev=1371551&r1=1371550&r2=1371551&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md (original)
+++ activemq/activemq-apollo/trunk/apollo-website/src/documentation/user-manual.md Fri Aug 10 03:10:59 2012
@@ -160,6 +160,17 @@ A `connector` element can be configured
* `protocol` : Defaults to `any` which means that any of the broker's
supported protocols can connect via this transport.
+* `receive_buffer_size` : Sets the size of the internal socket receive
+ buffer (aka setting the socket's SO_RCVBUF).
+
+* `send_buffer_size` : Sets the size of the internal socket send buffer
+ (aka setting the socket's SO_SNDBUF).
+
+When the `receive_buffer_size` or `send_buffer_size` attributes are not set,
+then the broker will 'auto-tune' them to be between '64k' and '2k' based on the
+max number of connections established against the broker in the last 5 minutes
+and the size of the JVM heap.
+
Furthermore, the connector element may contain protocol specific
configuration elements. For example, to have the broker set the `user_id`
header of messages to the id of user that sent the message, you would
@@ -204,12 +215,6 @@ settings used on the socket. The suppor
* `backlog` : Sets the listen backlog size. Defaults to 100.
-* `receive_buffer_size` : Sets the size of the internal socket receive
- buffer (aka setting the socket's SO_RCVBUF). Defaults to 65536 (64k)
-
-* `send_buffer_size` : Sets the size of the internal socket send buffer
- (aka setting the socket's SO_SNDBUF). Defaults to 65536 (64k)
-
* `keep_alive` : Enable or disable the SO_KEEPALIVE socket option
(aka setting the socket's SO_KEEPALIVE). Defaults to true.
Modified: activemq/activemq-apollo/trunk/pom.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/pom.xml?rev=1371551&r1=1371550&r2=1371551&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/pom.xml (original)
+++ activemq/activemq-apollo/trunk/pom.xml Fri Aug 10 03:10:59 2012
@@ -96,7 +96,7 @@
<xbean-version>3.4</xbean-version>
<felix-version>1.0.0</felix-version>
- <hawtdispatch-version>1.11</hawtdispatch-version>
+ <hawtdispatch-version>1.12-SNAPSHOT</hawtdispatch-version>
<hawtbuf-version>1.9</hawtbuf-version>
<stompjms-version>1.13-SNAPSHOT</stompjms-version>