You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2011/06/16 18:40:37 UTC
svn commit: r1136537 - in /activemq/activemq-apollo/trunk:
apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/
apollo-dto/src/main/java/org/apache/activemq/apollo/dto/
apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/ apollo-web...
Author: chirino
Date: Thu Jun 16 16:40:37 2011
New Revision: 1136537
URL: http://svn.apache.org/viewvc?rev=1136537&view=rev
Log:
Fixes https://issues.apache.org/jira/browse/APLO-50 - Durable subscriptions can now be created on broker startup.
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueBinding.scala
activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DurableSubscriptionDTO.java
activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DurableSubscriptionDestinationDTO.java
activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/QueueStatusDTO.jade
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala?rev=1136537&r1=1136536&r2=1136537&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala Thu Jun 16 16:40:37 2011
@@ -23,8 +23,6 @@ import org.apache.activemq.apollo.util._
import collection.mutable.HashMap
import org.apache.activemq.apollo.broker.store.QueueRecord
import Buffer._
-import java.util.ArrayList
-import org.apache.activemq.apollo.dto._
import path._
import security.SecurityContext
import java.util.concurrent.TimeUnit
@@ -32,6 +30,8 @@ import collection.mutable.ListBuffer._
import collection.mutable.HashMap._
import scala.Array
import tools.nsc.doc.model.ProtectedInInstance
+import org.apache.activemq.apollo.dto._
+import java.util.{Arrays, ArrayList}
trait DomainDestination {
@@ -645,21 +645,35 @@ class LocalRouter(val virtual_host:Virtu
/////////////////////////////////////////////////////////////////////////////
protected def create_configure_destinations {
+ import collection.JavaConversions._
def create_configured_dests(list: ArrayList[_ <: StringIdDTO], d: Domain[_], f: (Array[String]) => DestinationDTO) = {
- import collection.JavaConversions._
- list.foreach {
- dto =>
- if (dto.id != null) {
- val parts = destination_parser.parts(dto.id)
- val path = destination_parser.decode_path(parts)
- if (!PathParser.containsWildCards(path)) {
- d.get_or_create_destination(path, f(parts), null)
- }
+ list.foreach { dto =>
+ if (dto.id != null) {
+ val parts = destination_parser.parts(dto.id)
+ val path = destination_parser.decode_path(parts)
+ if (!PathParser.containsWildCards(path)) {
+ d.get_or_create_destination(path, f(parts), null)
}
+ }
}
}
create_configured_dests(virtual_host.config.queues, queue_domain, (parts) => new QueueDestinationDTO(parts))
create_configured_dests(virtual_host.config.topics, topic_domain, (parts) => new TopicDestinationDTO(parts))
+
+ virtual_host.config.dsubs.foreach { dto =>
+ if (dto.id != null && dto.topic!=null ) {
+
+ // We will create the durable subscription if it does not exist yet.
+ if( !topic_domain.durable_subscriptions_by_id.contains(dto.id) ) {
+ val destination = new DurableSubscriptionDestinationDTO()
+ destination.subscription_id = dto.id
+ destination.path = Arrays.asList(destination_parser.parts(dto.topic) : _ *)
+ destination.selector = dto.selector
+ _create_queue(QueueBinding.create(destination))
+ }
+
+ }
+ }
}
protected def _start(on_completed: Runnable) = {
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueBinding.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueBinding.scala?rev=1136537&r1=1136536&r2=1136537&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueBinding.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueBinding.scala Thu Jun 16 16:40:37 2011
@@ -239,10 +239,10 @@ class DurableSubscriptionQueueBinding(va
}
override def message_filter = {
- if ( binding_dto.filter==null ) {
+ if ( binding_dto.selector==null ) {
ConstantExpression.TRUE
} else {
- SelectorParser.parse(binding_dto.filter)
+ SelectorParser.parse(binding_dto.selector)
}
}
Modified: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DurableSubscriptionDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DurableSubscriptionDTO.java?rev=1136537&r1=1136536&r2=1136537&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DurableSubscriptionDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DurableSubscriptionDTO.java Thu Jun 16 16:40:37 2011
@@ -33,5 +33,19 @@ public class DurableSubscriptionDTO exte
@XmlAttribute(name="id_regex")
public String id_regex;
+ /**
+ * If the topic and id are specified, then the durable subscription
+ * can be eagerly created.
+ */
+ @XmlAttribute
+ public String topic;
+
+ /**
+ * An optional selector that the durable subscription will be created
+ * with when it's first eagerly created.
+ */
+ @XmlAttribute
+ public String selector;
+
}
Modified: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DurableSubscriptionDestinationDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DurableSubscriptionDestinationDTO.java?rev=1136537&r1=1136536&r2=1136537&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DurableSubscriptionDestinationDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DurableSubscriptionDestinationDTO.java Thu Jun 16 16:40:37 2011
@@ -20,7 +20,6 @@ import javax.xml.bind.annotation.XmlAcce
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlAttribute;
import javax.xml.bind.annotation.XmlRootElement;
-import java.util.List;
/**
* <p>
@@ -33,7 +32,7 @@ import java.util.List;
public class DurableSubscriptionDestinationDTO extends TopicDestinationDTO {
@XmlAttribute
- public String filter;
+ public String selector;
@XmlAttribute(name="subscription_id")
public String subscription_id;
@@ -54,7 +53,7 @@ public class DurableSubscriptionDestinat
DurableSubscriptionDestinationDTO that = (DurableSubscriptionDestinationDTO) o;
- if (filter != null ? !filter.equals(that.filter) : that.filter != null) return false;
+ if (selector != null ? !selector.equals(that.selector) : that.selector != null) return false;
if (subscription_id != null ? !subscription_id.equals(that.subscription_id) : that.subscription_id != null)
return false;
@@ -64,7 +63,7 @@ public class DurableSubscriptionDestinat
@Override
public int hashCode() {
int result = super.hashCode();
- result = 31 * result + (filter != null ? filter.hashCode() : 0);
+ result = 31 * result + (selector != null ? selector.hashCode() : 0);
result = 31 * result + (subscription_id != null ? subscription_id.hashCode() : 0);
return result;
}
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1136537&r1=1136536&r2=1136537&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala Thu Jun 16 16:40:37 2011
@@ -935,7 +935,7 @@ class StompProtocolHandler extends Proto
val rc = new DurableSubscriptionDestinationDTO()
rc.path = x.path
rc.subscription_id = decode_header(id)
- rc.filter = if (selector == null) null else selector._1
+ rc.selector = if (selector == null) null else selector._1
rc
case _ => die("A persistent subscription can only be used on a topic destination")
}
Modified: activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/QueueStatusDTO.jade
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/QueueStatusDTO.jade?rev=1136537&r1=1136536&r2=1136537&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/QueueStatusDTO.jade (original)
+++ activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/QueueStatusDTO.jade Thu Jun 16 16:40:37 2011
@@ -31,8 +31,8 @@
h1 Queue #{id}
- case x:DurableSubscriptionDestinationDTO =>
h1 Durable Subscription on #{id}
- - if( x.filter != null )
- p filter: ${x.filter}
+ - if( x.selector != null )
+ p selector: ${x.selector}
- case _ =>
h1 Temporary Queue