You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/07/28 18:27:07 UTC
svn commit: r1366706 -
/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
Author: chirino
Date: Sat Jul 28 16:27:07 2012
New Revision: 1366706
URL: http://svn.apache.org/viewvc?rev=1366706&view=rev
Log:
Fix a problem where the connection would get terminated if you did not have access to send to a destination on openwire.
Modified:
activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala?rev=1366706&r1=1366705&r2=1366706&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala Sat Jul 28 16:27:07 2012
@@ -317,13 +317,7 @@ class OpenwireProtocolHandler extends Pr
class ProtocolException(msg:String) extends RuntimeException(msg)
class Break extends RuntimeException
- def async_fail(msg: String, actual:Command=null):Unit = try {
- fail(msg, actual)
- } catch {
- case x:Break=>
- }
-
- def fail[T](msg: String, actual:Command=null):T = {
+ def fail[T](msg: String, actual:Command=null):Unit = {
def respond(command:Command) = {
if(command.isResponseRequired()) {
val e = new ProtocolException(msg)
@@ -356,7 +350,6 @@ class OpenwireProtocolHandler extends Pr
case (command:Command, command2:Command)=>
respond(command)
}
- throw new Break()
}
def async_die(msg: String, actual:Command=null):Unit = try {
@@ -643,16 +636,16 @@ class OpenwireProtocolHandler extends Pr
val route = OpenwireDeliveryProducerRoute(addresses)
// don't process frames until producer is connected...
- connection.transport.suspendRead
+ suspend_read("connecting producer route")
host.dispatch_queue {
val rc = host.router.connect(addresses, route, security_context)
dispatchQueue {
+ resume_read
rc match {
case Some(failure) =>
- async_die(failure, msg)
+ fail(failure, msg)
case None =>
if (!connection.stopped) {
- resume_read
producerRoutes.put(msg.getDestination, route)
send_via_route(route, msg, uow)
}
@@ -861,7 +854,10 @@ class OpenwireProtocolHandler extends Pr
def attach = {
- if( info.getDestination == null ) fail("destination was not set")
+ if( info.getDestination == null ) {
+ fail("destination was not set", info)
+ throw new Break()
+ }
addresses = to_destination_dto(info.getDestination, OpenwireProtocolHandler.this)
// if they are temp dests.. attach our owner id so that we don't
@@ -881,7 +877,8 @@ class OpenwireProtocolHandler extends Pr
SelectorParser.parse(x.toString)
} catch {
case e:FilterException =>
- fail("Invalid selector expression: "+e.getMessage)
+ fail("Invalid selector expression: "+e.getMessage, info)
+ throw new Break()
}
}
@@ -908,7 +905,7 @@ class OpenwireProtocolHandler extends Pr
case None =>
ack(info)
case Some(reason) =>
- async_fail(reason, info)
+ fail(reason, info)
}
}
}