You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2011/12/18 15:55:04 UTC

svn commit: r1220417 - in /activemq/activemq-apollo/trunk: apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/ apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/ apollo-stomp/src/main/scala/org/apache/activemq/apol...

Author: chirino
Date: Sun Dec 18 14:55:04 2011
New Revision: 1220417

URL: http://svn.apache.org/viewvc?rev=1220417&view=rev
Log:
Fixes APLO-117: HeartBeatMonitor should support disabling its read-side checking while the transport's reads are suspended.

Removed:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/HeartBeatMonitor.scala
Modified:
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala

Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala?rev=1220417&r1=1220416&r2=1220417&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala Sun Dec 18 14:55:04 2011
@@ -105,7 +105,7 @@ class OpenwireProtocolHandler extends Pr
   val security_context = new SecurityContext
   var config:OpenwireDTO = _
 
-  var heart_beat_monitor: HeartBeatMonitor = new HeartBeatMonitor
+  var heart_beat_monitor = new HeartBeatMonitor
 
   var waiting_on: String = "client request"
   var current_command: Object = _
@@ -149,14 +149,16 @@ class OpenwireProtocolHandler extends Pr
     }
   }
 
-  def suspendRead(reason: String) = {
+  def suspend_read(reason: String) = {
     waiting_on = reason
     connection.transport.suspendRead
+    heart_beat_monitor.suspendRead
   }
 
-  def resumeRead() = {
+  def resume_read() = {
     waiting_on = "client request"
     connection.transport.resumeRead
+    heart_beat_monitor.resumeRead
   }
 
   def ack(command: Command):Unit = {
@@ -170,7 +172,7 @@ class OpenwireProtocolHandler extends Pr
   override def on_transport_failure(error: IOException) = {
     if (!connection.stopped) {
       error.printStackTrace
-      suspendRead("shutdown")
+      suspend_read("shutdown")
       debug(error, "Shutting connection down due to: %s", error)
       connection.stop
     }
@@ -191,11 +193,11 @@ class OpenwireProtocolHandler extends Pr
     // Send our preferred wire format settings..
     connection.transport.offer(preferred_wireformat_settings)
 
-    resumeRead
+    resume_read
     reset {
-      suspendRead("virtual host lookup")
+      suspend_read("virtual host lookup")
       this.host = broker.get_default_virtual_host
-      resumeRead
+      resume_read
       if(host==null) {
         async_die("Could not find default virtual host")
       }
@@ -384,26 +386,25 @@ class OpenwireProtocolHandler extends Pr
     val initial_delay = preferred_wireformat_settings.getMaxInactivityDurationInitalDelay().min(info.getMaxInactivityDurationInitalDelay())
 
     if (inactive_time > 0) {
-      heart_beat_monitor.read_interval = inactive_time
-      // lets be a little forgiving to account to packet transmission latency.
-      heart_beat_monitor.read_interval += inactive_time.min(5000)
+      heart_beat_monitor.setReadInterval((inactive_time.min(5000)*1.5).toLong)
 
-      heart_beat_monitor.on_dead = () => {
+      heart_beat_monitor.setOnDead(^{
         async_die("Stale connection.  Missed heartbeat.")
-      }
+      })
 
-      heart_beat_monitor.write_interval = inactive_time
-      heart_beat_monitor.on_keep_alive = () => {
+      heart_beat_monitor.setWriteInterval(inactive_time)
+      heart_beat_monitor.setOnKeepAlive(^{
         // we don't care if the offer gets rejected.. since that just
         // means there is other traffic getting transmitted.
         connection.transport.offer(new KeepAliveInfo)
-      }
+      })
     }
 
-    heart_beat_monitor.initial_read_check_delay = initial_delay
-    heart_beat_monitor.initial_write_check_delay = initial_delay
+    heart_beat_monitor.setInitialReadCheckDelay(initial_delay)
+    heart_beat_monitor.setInitialWriteCheckDelay(initial_delay)
 
-    heart_beat_monitor.transport = connection.transport
+    heart_beat_monitor.suspendRead()
+    heart_beat_monitor.setTransport(connection.transport)
     heart_beat_monitor.start
 
     // Give the client some info about this broker.
@@ -428,7 +429,7 @@ class OpenwireProtocolHandler extends Pr
 
       reset {
         if( host.authenticator!=null &&  host.authorizer!=null ) {
-          suspendRead("authenticating and authorizing connect")
+          suspend_read("authenticating and authorizing connect")
           if( !host.authenticator.authenticate(security_context) ) {
             async_die("Authentication failed. Credentials="+security_context.credential_dump)
             noop // to make the cps compiler plugin happy.
@@ -439,7 +440,7 @@ class OpenwireProtocolHandler extends Pr
             async_die("Not authorized to connect to virtual host '%s'. Principals=".format(this.host.id, security_context.principal_dump))
             noop // to make the cps compiler plugin happy.
           } else {
-            resumeRead
+            resume_read
             ack(info);
             noop
           }
@@ -599,7 +600,7 @@ class OpenwireProtocolHandler extends Pr
           override def connection = Some(OpenwireProtocolHandler.this.connection)
           override def dispatch_queue = queue
           refiller = ^ {
-            resumeRead
+            resume_read
           }
         }
 
@@ -612,7 +613,7 @@ class OpenwireProtocolHandler extends Pr
               async_die(failure, msg)
             case None =>
               if (!connection.stopped) {
-                resumeRead
+                resume_read
                 producerRoutes.put(key, route)
                 send_via_route(route, msg, uow)
               }
@@ -649,7 +650,7 @@ class OpenwireProtocolHandler extends Pr
       if( route.full ) {
         // but once it gets full.. suspend, so that we get more messages
         // until it's not full anymore.
-        suspendRead("blocked destination: "+route.overflowSessions.mkString(", "))
+        suspend_read("blocked destination: "+route.overflowSessions.mkString(", "))
       }
 
     } else {

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1220417&r1=1220416&r2=1220417&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala Sun Dec 18 14:55:04 2011
@@ -23,7 +23,7 @@ import org.fusesource.hawtdispatch._
 import org.apache.activemq.apollo.broker._
 import Buffer._
 import java.lang.String
-import protocol.{ProtocolFilter, HeartBeatMonitor, ProtocolHandler}
+import protocol.{ProtocolFilter, ProtocolHandler}
 import security.SecurityContext
 import Stomp._
 import org.apache.activemq.apollo.selector.SelectorParser
@@ -34,12 +34,11 @@ import java.util.concurrent.TimeUnit
 import java.util.Map.Entry
 import path.PathParser
 import scala.util.continuations._
-import org.fusesource.hawtdispatch.transport.SslTransport
-import org.fusesource.hawtdispatch.transport.SslTransport
 import java.security.cert.X509Certificate
 import collection.mutable.{ListBuffer, HashMap}
 import java.io.IOException
 import org.apache.activemq.apollo.dto._
+import org.fusesource.hawtdispatch.transport.{HeartBeatMonitor, SslTransport}
 
 
 case class RichBuffer(self:Buffer) extends Proxy {
@@ -542,7 +541,7 @@ class StompProtocolHandler extends Proto
 
   var protocol_version:AsciiBuffer = _
 
-  var heart_beat_monitor:HeartBeatMonitor = new HeartBeatMonitor
+  var heart_beat_monitor = new HeartBeatMonitor
   val security_context = new SecurityContext
   var waiting_on:String = "client request"
   var config:StompDTO = _
@@ -681,7 +680,7 @@ class StompProtocolHandler extends Proto
       x
     })
     connection_sink = new OverflowSink(sink_manager.open());
-    resumeRead
+    resume_read
   }
 
   override def on_transport_disconnected() = {
@@ -789,14 +788,15 @@ class StompProtocolHandler extends Proto
     }
   }
 
-
-  def suspendRead(reason:String) = {
+  def suspend_read(reason:String) = {
     waiting_on = reason
     connection.transport.suspendRead
+    heart_beat_monitor.suspendRead
   }
-  def resumeRead() = {
+  def resume_read() = {
     waiting_on = "client request"
     connection.transport.resumeRead
+    heart_beat_monitor.resumeRead
   }
 
   def on_stomp_connect(headers:HeaderMap):Unit = {
@@ -836,23 +836,21 @@ class StompProtocolHandler extends Proto
           val please_send = cy.toString.toLong
 
           if( inbound_heartbeat>=0 && can_send > 0 ) {
-            heart_beat_monitor.read_interval = inbound_heartbeat.max(can_send)
-
-            // lets be a little forgiving to account to packet transmission latency.
-            heart_beat_monitor.read_interval += heart_beat_monitor.read_interval.min(5000)
+            heart_beat_monitor.setReadInterval((inbound_heartbeat.max(can_send)*1.5).toLong)
 
-            heart_beat_monitor.on_dead = () => {
+            heart_beat_monitor.setOnDead(^{
               async_die("Stale connection.  Missed heartbeat.")
-            }
+            });
           }
           if( outbound_heartbeat>=0 && please_send > 0 ) {
-            heart_beat_monitor.write_interval = outbound_heartbeat.max(please_send)
-            heart_beat_monitor.on_keep_alive = () => {
+            heart_beat_monitor.setWriteInterval(outbound_heartbeat.max(please_send))
+            heart_beat_monitor.setOnKeepAlive(^{
               connection.transport.offer(NEWLINE_BUFFER)
-            }
+            })
           }
 
-          heart_beat_monitor.transport = connection.transport
+          heart_beat_monitor.suspendRead()
+          heart_beat_monitor.setTransport(connection.transport)
           heart_beat_monitor.start
 
         } catch {
@@ -886,7 +884,7 @@ class StompProtocolHandler extends Proto
     }
 
     reset {
-      suspendRead("virtual host lookup")
+      suspend_read("virtual host lookup")
       val host_header = get(headers, HOST)
       val host = host_header match {
         case None=>
@@ -894,7 +892,7 @@ class StompProtocolHandler extends Proto
         case Some(host)=>
           connection.connector.broker.get_virtual_host(host)
       }
-      resumeRead
+      resume_read
 
       if(host==null) {
         async_die("Invalid virtual host: "+host_header.get)
@@ -908,7 +906,7 @@ class StompProtocolHandler extends Proto
         this.host=host
         connection_log = host.connection_log
         if( host.authenticator!=null &&  host.authorizer!=null ) {
-          suspendRead("authenticating and authorizing connect")
+          suspend_read("authenticating and authorizing connect")
           if( !host.authenticator.authenticate(security_context) ) {
             async_die("Authentication failed. Credentials="+security_context.credential_dump)
             noop // to make the cps compiler plugin happy.
@@ -919,7 +917,7 @@ class StompProtocolHandler extends Proto
             async_die("Not authorized to connect to virtual host '%s'. Principals=".format(this.host.id, security_context.principal_dump))
             noop // to make the cps compiler plugin happy.
           } else {
-            resumeRead
+            resume_read
             send_connected
             noop // to make the cps compiler plugin happy.
           }
@@ -982,7 +980,7 @@ class StompProtocolHandler extends Proto
           override def dispatch_queue = queue
 
           refiller = ^{
-            resumeRead
+            resume_read
           }
         }
 
@@ -995,7 +993,7 @@ class StompProtocolHandler extends Proto
               async_die(failure)
             case None =>
               if (!connection.stopped) {
-                resumeRead
+                resume_read
                 producerRoutes.put(key, route)
                 send_via_route(destination, route, frame, uow)
               }
@@ -1102,7 +1100,7 @@ class StompProtocolHandler extends Proto
       if( route.full ) {
         // but once it gets full.. suspend, so that we get more stomp messages
         // until it's not full anymore.
-        suspendRead("blocked sending to: "+route.overflowSessions.mkString(", "))
+        suspend_read("blocked sending to: "+route.overflowSessions.mkString(", "))
       }
 
     } else {
@@ -1321,7 +1319,7 @@ class StompProtocolHandler extends Proto
 
   override def on_transport_failure(error: IOException) = {
     if( !connection.stopped ) {
-      suspendRead("shutdown")
+      suspend_read("shutdown")
       connection_log.info(error, "Shutting connection '%s'  down due to: %s", security_context.remote_address, error)
       super.on_transport_failure(error);
     }