You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/07/28 18:27:07 UTC

svn commit: r1366706 - /activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala

Author: chirino
Date: Sat Jul 28 16:27:07 2012
New Revision: 1366706

URL: http://svn.apache.org/viewvc?rev=1366706&view=rev
Log:
Fix problem where if you did not have access to send to a destination on openwire, the connection would get terminated.

Modified:
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala

Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala?rev=1366706&r1=1366705&r2=1366706&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala Sat Jul 28 16:27:07 2012
@@ -317,13 +317,7 @@ class OpenwireProtocolHandler extends Pr
   class ProtocolException(msg:String) extends RuntimeException(msg)
   class Break extends RuntimeException
 
-  def async_fail(msg: String, actual:Command=null):Unit = try {
-    fail(msg, actual)
-  } catch {
-    case x:Break=>
-  }
-
-  def fail[T](msg: String, actual:Command=null):T = {
+  def fail[T](msg: String, actual:Command=null):Unit = {
     def respond(command:Command) = {
       if(command.isResponseRequired()) {
         val e = new ProtocolException(msg)
@@ -356,7 +350,6 @@ class OpenwireProtocolHandler extends Pr
        case (command:Command, command2:Command)=>
          respond(command)
     }
-    throw new Break()
   }
 
   def async_die(msg: String, actual:Command=null):Unit = try {
@@ -643,16 +636,16 @@ class OpenwireProtocolHandler extends Pr
         val route = OpenwireDeliveryProducerRoute(addresses)
 
         // don't process frames until producer is connected...
-        connection.transport.suspendRead
+        suspend_read("connecting producer route")
         host.dispatch_queue {
           val rc = host.router.connect(addresses, route, security_context)
           dispatchQueue {
+            resume_read
             rc match {
               case Some(failure) =>
-                async_die(failure, msg)
+                fail(failure, msg)
               case None =>
                 if (!connection.stopped) {
-                  resume_read
                   producerRoutes.put(msg.getDestination, route)
                   send_via_route(route, msg, uow)
                 }
@@ -861,7 +854,10 @@ class OpenwireProtocolHandler extends Pr
 
     def attach = {
 
-      if( info.getDestination == null ) fail("destination was not set")
+      if( info.getDestination == null ) {
+        fail("destination was not set", info)
+        throw new Break()
+      }
       addresses = to_destination_dto(info.getDestination, OpenwireProtocolHandler.this)
 
       // if they are temp dests.. attach our owner id so that we don't
@@ -881,7 +877,8 @@ class OpenwireProtocolHandler extends Pr
             SelectorParser.parse(x.toString)
           } catch {
             case e:FilterException =>
-              fail("Invalid selector expression: "+e.getMessage)
+              fail("Invalid selector expression: "+e.getMessage, info)
+              throw new Break()
           }
       }
 
@@ -908,7 +905,7 @@ class OpenwireProtocolHandler extends Pr
             case None =>
               ack(info)
             case Some(reason) =>
-              async_fail(reason, info)
+              fail(reason, info)
           }
         }
       }