You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2011/12/18 15:55:04 UTC
svn commit: r1220417 - in /activemq/activemq-apollo/trunk:
apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/
apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/
apollo-stomp/src/main/scala/org/apache/activemq/apol...
Author: chirino
Date: Sun Dec 18 14:55:04 2011
New Revision: 1220417
URL: http://svn.apache.org/viewvc?rev=1220417&view=rev
Log:
Fixes APLO-117 : HeartBeatMonitor should be support having the read side checking disable for when the transport reads are suspended.
Removed:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/HeartBeatMonitor.scala
Modified:
activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala?rev=1220417&r1=1220416&r2=1220417&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala Sun Dec 18 14:55:04 2011
@@ -105,7 +105,7 @@ class OpenwireProtocolHandler extends Pr
val security_context = new SecurityContext
var config:OpenwireDTO = _
- var heart_beat_monitor: HeartBeatMonitor = new HeartBeatMonitor
+ var heart_beat_monitor = new HeartBeatMonitor
var waiting_on: String = "client request"
var current_command: Object = _
@@ -149,14 +149,16 @@ class OpenwireProtocolHandler extends Pr
}
}
- def suspendRead(reason: String) = {
+ def suspend_read(reason: String) = {
waiting_on = reason
connection.transport.suspendRead
+ heart_beat_monitor.suspendRead
}
- def resumeRead() = {
+ def resume_read() = {
waiting_on = "client request"
connection.transport.resumeRead
+ heart_beat_monitor.resumeRead
}
def ack(command: Command):Unit = {
@@ -170,7 +172,7 @@ class OpenwireProtocolHandler extends Pr
override def on_transport_failure(error: IOException) = {
if (!connection.stopped) {
error.printStackTrace
- suspendRead("shutdown")
+ suspend_read("shutdown")
debug(error, "Shutting connection down due to: %s", error)
connection.stop
}
@@ -191,11 +193,11 @@ class OpenwireProtocolHandler extends Pr
// Send our preferred wire format settings..
connection.transport.offer(preferred_wireformat_settings)
- resumeRead
+ resume_read
reset {
- suspendRead("virtual host lookup")
+ suspend_read("virtual host lookup")
this.host = broker.get_default_virtual_host
- resumeRead
+ resume_read
if(host==null) {
async_die("Could not find default virtual host")
}
@@ -384,26 +386,25 @@ class OpenwireProtocolHandler extends Pr
val initial_delay = preferred_wireformat_settings.getMaxInactivityDurationInitalDelay().min(info.getMaxInactivityDurationInitalDelay())
if (inactive_time > 0) {
- heart_beat_monitor.read_interval = inactive_time
- // lets be a little forgiving to account to packet transmission latency.
- heart_beat_monitor.read_interval += inactive_time.min(5000)
+ heart_beat_monitor.setReadInterval((inactive_time.min(5000)*1.5).toLong)
- heart_beat_monitor.on_dead = () => {
+ heart_beat_monitor.setOnDead(^{
async_die("Stale connection. Missed heartbeat.")
- }
+ })
- heart_beat_monitor.write_interval = inactive_time
- heart_beat_monitor.on_keep_alive = () => {
+ heart_beat_monitor.setWriteInterval(inactive_time)
+ heart_beat_monitor.setOnKeepAlive(^{
// we don't care if the offer gets rejected.. since that just
// means there is other traffic getting transmitted.
connection.transport.offer(new KeepAliveInfo)
- }
+ })
}
- heart_beat_monitor.initial_read_check_delay = initial_delay
- heart_beat_monitor.initial_write_check_delay = initial_delay
+ heart_beat_monitor.setInitialReadCheckDelay(initial_delay)
+ heart_beat_monitor.setInitialWriteCheckDelay(initial_delay)
- heart_beat_monitor.transport = connection.transport
+ heart_beat_monitor.suspendRead()
+ heart_beat_monitor.setTransport(connection.transport)
heart_beat_monitor.start
// Give the client some info about this broker.
@@ -428,7 +429,7 @@ class OpenwireProtocolHandler extends Pr
reset {
if( host.authenticator!=null && host.authorizer!=null ) {
- suspendRead("authenticating and authorizing connect")
+ suspend_read("authenticating and authorizing connect")
if( !host.authenticator.authenticate(security_context) ) {
async_die("Authentication failed. Credentials="+security_context.credential_dump)
noop // to make the cps compiler plugin happy.
@@ -439,7 +440,7 @@ class OpenwireProtocolHandler extends Pr
async_die("Not authorized to connect to virtual host '%s'. Principals=".format(this.host.id, security_context.principal_dump))
noop // to make the cps compiler plugin happy.
} else {
- resumeRead
+ resume_read
ack(info);
noop
}
@@ -599,7 +600,7 @@ class OpenwireProtocolHandler extends Pr
override def connection = Some(OpenwireProtocolHandler.this.connection)
override def dispatch_queue = queue
refiller = ^ {
- resumeRead
+ resume_read
}
}
@@ -612,7 +613,7 @@ class OpenwireProtocolHandler extends Pr
async_die(failure, msg)
case None =>
if (!connection.stopped) {
- resumeRead
+ resume_read
producerRoutes.put(key, route)
send_via_route(route, msg, uow)
}
@@ -649,7 +650,7 @@ class OpenwireProtocolHandler extends Pr
if( route.full ) {
// but once it gets full.. suspend, so that we get more messages
// until it's not full anymore.
- suspendRead("blocked destination: "+route.overflowSessions.mkString(", "))
+ suspend_read("blocked destination: "+route.overflowSessions.mkString(", "))
}
} else {
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1220417&r1=1220416&r2=1220417&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala Sun Dec 18 14:55:04 2011
@@ -23,7 +23,7 @@ import org.fusesource.hawtdispatch._
import org.apache.activemq.apollo.broker._
import Buffer._
import java.lang.String
-import protocol.{ProtocolFilter, HeartBeatMonitor, ProtocolHandler}
+import protocol.{ProtocolFilter, ProtocolHandler}
import security.SecurityContext
import Stomp._
import org.apache.activemq.apollo.selector.SelectorParser
@@ -34,12 +34,11 @@ import java.util.concurrent.TimeUnit
import java.util.Map.Entry
import path.PathParser
import scala.util.continuations._
-import org.fusesource.hawtdispatch.transport.SslTransport
-import org.fusesource.hawtdispatch.transport.SslTransport
import java.security.cert.X509Certificate
import collection.mutable.{ListBuffer, HashMap}
import java.io.IOException
import org.apache.activemq.apollo.dto._
+import org.fusesource.hawtdispatch.transport.{HeartBeatMonitor, SslTransport}
case class RichBuffer(self:Buffer) extends Proxy {
@@ -542,7 +541,7 @@ class StompProtocolHandler extends Proto
var protocol_version:AsciiBuffer = _
- var heart_beat_monitor:HeartBeatMonitor = new HeartBeatMonitor
+ var heart_beat_monitor = new HeartBeatMonitor
val security_context = new SecurityContext
var waiting_on:String = "client request"
var config:StompDTO = _
@@ -681,7 +680,7 @@ class StompProtocolHandler extends Proto
x
})
connection_sink = new OverflowSink(sink_manager.open());
- resumeRead
+ resume_read
}
override def on_transport_disconnected() = {
@@ -789,14 +788,15 @@ class StompProtocolHandler extends Proto
}
}
-
- def suspendRead(reason:String) = {
+ def suspend_read(reason:String) = {
waiting_on = reason
connection.transport.suspendRead
+ heart_beat_monitor.suspendRead
}
- def resumeRead() = {
+ def resume_read() = {
waiting_on = "client request"
connection.transport.resumeRead
+ heart_beat_monitor.resumeRead
}
def on_stomp_connect(headers:HeaderMap):Unit = {
@@ -836,23 +836,21 @@ class StompProtocolHandler extends Proto
val please_send = cy.toString.toLong
if( inbound_heartbeat>=0 && can_send > 0 ) {
- heart_beat_monitor.read_interval = inbound_heartbeat.max(can_send)
-
- // lets be a little forgiving to account to packet transmission latency.
- heart_beat_monitor.read_interval += heart_beat_monitor.read_interval.min(5000)
+ heart_beat_monitor.setReadInterval((inbound_heartbeat.max(can_send)*1.5).toLong)
- heart_beat_monitor.on_dead = () => {
+ heart_beat_monitor.setOnDead(^{
async_die("Stale connection. Missed heartbeat.")
- }
+ });
}
if( outbound_heartbeat>=0 && please_send > 0 ) {
- heart_beat_monitor.write_interval = outbound_heartbeat.max(please_send)
- heart_beat_monitor.on_keep_alive = () => {
+ heart_beat_monitor.setWriteInterval(outbound_heartbeat.max(please_send))
+ heart_beat_monitor.setOnKeepAlive(^{
connection.transport.offer(NEWLINE_BUFFER)
- }
+ })
}
- heart_beat_monitor.transport = connection.transport
+ heart_beat_monitor.suspendRead()
+ heart_beat_monitor.setTransport(connection.transport)
heart_beat_monitor.start
} catch {
@@ -886,7 +884,7 @@ class StompProtocolHandler extends Proto
}
reset {
- suspendRead("virtual host lookup")
+ suspend_read("virtual host lookup")
val host_header = get(headers, HOST)
val host = host_header match {
case None=>
@@ -894,7 +892,7 @@ class StompProtocolHandler extends Proto
case Some(host)=>
connection.connector.broker.get_virtual_host(host)
}
- resumeRead
+ resume_read
if(host==null) {
async_die("Invalid virtual host: "+host_header.get)
@@ -908,7 +906,7 @@ class StompProtocolHandler extends Proto
this.host=host
connection_log = host.connection_log
if( host.authenticator!=null && host.authorizer!=null ) {
- suspendRead("authenticating and authorizing connect")
+ suspend_read("authenticating and authorizing connect")
if( !host.authenticator.authenticate(security_context) ) {
async_die("Authentication failed. Credentials="+security_context.credential_dump)
noop // to make the cps compiler plugin happy.
@@ -919,7 +917,7 @@ class StompProtocolHandler extends Proto
async_die("Not authorized to connect to virtual host '%s'. Principals=".format(this.host.id, security_context.principal_dump))
noop // to make the cps compiler plugin happy.
} else {
- resumeRead
+ resume_read
send_connected
noop // to make the cps compiler plugin happy.
}
@@ -982,7 +980,7 @@ class StompProtocolHandler extends Proto
override def dispatch_queue = queue
refiller = ^{
- resumeRead
+ resume_read
}
}
@@ -995,7 +993,7 @@ class StompProtocolHandler extends Proto
async_die(failure)
case None =>
if (!connection.stopped) {
- resumeRead
+ resume_read
producerRoutes.put(key, route)
send_via_route(destination, route, frame, uow)
}
@@ -1102,7 +1100,7 @@ class StompProtocolHandler extends Proto
if( route.full ) {
// but once it gets full.. suspend, so that we get more stomp messages
// until it's not full anymore.
- suspendRead("blocked sending to: "+route.overflowSessions.mkString(", "))
+ suspend_read("blocked sending to: "+route.overflowSessions.mkString(", "))
}
} else {
@@ -1321,7 +1319,7 @@ class StompProtocolHandler extends Proto
override def on_transport_failure(error: IOException) = {
if( !connection.stopped ) {
- suspendRead("shutdown")
+ suspend_read("shutdown")
connection_log.info(error, "Shutting connection '%s' down due to: %s", security_context.remote_address, error)
super.on_transport_failure(error);
}