You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/04/09 22:36:59 UTC
svn commit: r1311428 -
/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/DestinationAdvisoryRouterListener.scala
Author: chirino
Date: Mon Apr 9 20:36:59 2012
New Revision: 1311428
URL: http://svn.apache.org/viewvc?rev=1311428&view=rev
Log:
Don't create the advisory topics until an openwire connection needs them. Identified in issue APLO-185. This hides the fact that Apollo supports openwire.
Modified:
activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/DestinationAdvisoryRouterListener.scala
Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/DestinationAdvisoryRouterListener.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/DestinationAdvisoryRouterListener.scala?rev=1311428&r1=1311427&r2=1311428&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/DestinationAdvisoryRouterListener.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/DestinationAdvisoryRouterListener.scala Mon Apr 9 20:36:59 2012
@@ -58,6 +58,7 @@ class DestinationAdvisoryRouterListener(
final val messageIdGenerator = new LongSequenceGenerator
advisoryProducerId.setConnectionId(new UTF8Buffer(ID_GENERATOR.generateId))
+ var enabled = false
class ProducerRoute extends DeliveryProducerRoute(router) {
@@ -103,25 +104,28 @@ class DestinationAdvisoryRouterListener(
def on_bind(dest: DomainDestination, consumer: DeliveryConsumer, security: SecurityContext) = {
val destination = to_activemq_destination(Array(dest.address))
- if (destination!=null && AdvisorySupport.isDestinationAdvisoryTopic(destination) && !destination_advisories.isEmpty) {
+ if (destination!=null && AdvisorySupport.isDestinationAdvisoryTopic(destination)) {
// replay the destination advisories..
- val producer = new ProducerRoute {
- override def on_connected = {
- overflow_sink.refiller = ^{
- // once the sink is not overflowed.. then we can disconnect
- if(!overflow_sink.overflowed) {
- unbind(consumer::Nil)
- overflow_sink.refiller = NOOP
+ enabled = true
+ if( !destination_advisories.isEmpty ) {
+ val producer = new ProducerRoute {
+ override def on_connected = {
+ overflow_sink.refiller = ^{
+ // once the sink is not overflowed.. then we can disconnect
+ if(!overflow_sink.overflowed) {
+ unbind(consumer::Nil)
+ overflow_sink.refiller = NOOP
+ }
}
+ overflow_sink.refiller.run()
+ super.on_connected
}
- overflow_sink.refiller.run()
- super.on_connected
}
- }
- producer.bind(consumer::Nil)
- producer.connected()
- for( info <- destination_advisories.values ) {
- producer.overflow_sink.offer(info)
+ producer.bind(consumer::Nil)
+ producer.connected()
+ for( info <- destination_advisories.values ) {
+ producer.overflow_sink.offer(info)
+ }
}
}
}
@@ -176,7 +180,7 @@ class DestinationAdvisoryRouterListener(
val message = delivery.message.asInstanceOf[OpenwireMessage].message
val dest = to_destination_dto(message.getDestination,null)
val key = dest.toList
- if( router.virtual_host.service_state.is_started ) {
+ if( enabled && router.virtual_host.service_state.is_started ) {
val route = producerRoutes.get(key) match {
case null =>
// create the producer route...