You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2011/06/16 18:40:37 UTC

svn commit: r1136537 - in /activemq/activemq-apollo/trunk: apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/ apollo-dto/src/main/java/org/apache/activemq/apollo/dto/ apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/ apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/

Author: chirino
Date: Thu Jun 16 16:40:37 2011
New Revision: 1136537

URL: http://svn.apache.org/viewvc?rev=1136537&view=rev
Log:
Fixes https://issues.apache.org/jira/browse/APLO-50 - Durable subscriptions can now be created on broker startup.

Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueBinding.scala
    activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DurableSubscriptionDTO.java
    activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DurableSubscriptionDestinationDTO.java
    activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
    activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/QueueStatusDTO.jade

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala?rev=1136537&r1=1136536&r2=1136537&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala Thu Jun 16 16:40:37 2011
@@ -23,8 +23,6 @@ import org.apache.activemq.apollo.util._
 import collection.mutable.HashMap
 import org.apache.activemq.apollo.broker.store.QueueRecord
 import Buffer._
-import java.util.ArrayList
-import org.apache.activemq.apollo.dto._
 import path._
 import security.SecurityContext
 import java.util.concurrent.TimeUnit
@@ -32,6 +30,8 @@ import collection.mutable.ListBuffer._
 import collection.mutable.HashMap._
 import scala.Array
 import tools.nsc.doc.model.ProtectedInInstance
+import org.apache.activemq.apollo.dto._
+import java.util.{Arrays, ArrayList}
 
 trait DomainDestination {
 
@@ -645,21 +645,35 @@ class LocalRouter(val virtual_host:Virtu
   /////////////////////////////////////////////////////////////////////////////
 
   protected def create_configure_destinations {
+    import collection.JavaConversions._
     def create_configured_dests(list: ArrayList[_ <: StringIdDTO], d: Domain[_], f: (Array[String]) => DestinationDTO) = {
-      import collection.JavaConversions._
-      list.foreach {
-        dto =>
-          if (dto.id != null) {
-            val parts = destination_parser.parts(dto.id)
-            val path = destination_parser.decode_path(parts)
-            if (!PathParser.containsWildCards(path)) {
-              d.get_or_create_destination(path, f(parts), null)
-            }
+      list.foreach { dto =>
+        if (dto.id != null) {
+          val parts = destination_parser.parts(dto.id)
+          val path = destination_parser.decode_path(parts)
+          if (!PathParser.containsWildCards(path)) {
+            d.get_or_create_destination(path, f(parts), null)
           }
+        }
       }
     }
     create_configured_dests(virtual_host.config.queues, queue_domain, (parts) => new QueueDestinationDTO(parts))
     create_configured_dests(virtual_host.config.topics, topic_domain, (parts) => new TopicDestinationDTO(parts))
+
+    virtual_host.config.dsubs.foreach { dto =>
+      if (dto.id != null && dto.topic!=null ) {
+
+        // We will create the durable sub if it does not exist yet..
+        if( !topic_domain.durable_subscriptions_by_id.contains(dto.id) ) {
+          val destination = new DurableSubscriptionDestinationDTO()
+          destination.subscription_id = dto.id
+          destination.path = Arrays.asList(destination_parser.parts(dto.topic) : _ *)
+          destination.selector = dto.selector
+          _create_queue(QueueBinding.create(destination))
+        }
+
+      }
+    }
   }
 
   protected def _start(on_completed: Runnable) = {

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueBinding.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueBinding.scala?rev=1136537&r1=1136536&r2=1136537&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueBinding.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueBinding.scala Thu Jun 16 16:40:37 2011
@@ -239,10 +239,10 @@ class DurableSubscriptionQueueBinding(va
   }
 
   override def message_filter = {
-    if ( binding_dto.filter==null ) {
+    if ( binding_dto.selector==null ) {
       ConstantExpression.TRUE
     } else {
-      SelectorParser.parse(binding_dto.filter)
+      SelectorParser.parse(binding_dto.selector)
     }
   }
 

Modified: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DurableSubscriptionDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DurableSubscriptionDTO.java?rev=1136537&r1=1136536&r2=1136537&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DurableSubscriptionDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DurableSubscriptionDTO.java Thu Jun 16 16:40:37 2011
@@ -33,5 +33,19 @@ public class DurableSubscriptionDTO exte
     @XmlAttribute(name="id_regex")
     public String id_regex;
 
+    /**
+     * If the topic and id are specified, then the durable subscription
+     * can be eagerly created.
+     */
+    @XmlAttribute
+    public String topic;
+
+    /**
+     * An optional selector that the durable subscription will be created
+     * with when it's first eagerly created.
+     */
+    @XmlAttribute
+    public String selector;
+
 
 }

Modified: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DurableSubscriptionDestinationDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DurableSubscriptionDestinationDTO.java?rev=1136537&r1=1136536&r2=1136537&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DurableSubscriptionDestinationDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DurableSubscriptionDestinationDTO.java Thu Jun 16 16:40:37 2011
@@ -20,7 +20,6 @@ import javax.xml.bind.annotation.XmlAcce
 import javax.xml.bind.annotation.XmlAccessorType;
 import javax.xml.bind.annotation.XmlAttribute;
 import javax.xml.bind.annotation.XmlRootElement;
-import java.util.List;
 
 /**
  * <p>
@@ -33,7 +32,7 @@ import java.util.List;
 public class DurableSubscriptionDestinationDTO extends TopicDestinationDTO {
 
     @XmlAttribute
-    public String filter;
+    public String selector;
 
     @XmlAttribute(name="subscription_id")
     public String subscription_id;
@@ -54,7 +53,7 @@ public class DurableSubscriptionDestinat
 
         DurableSubscriptionDestinationDTO that = (DurableSubscriptionDestinationDTO) o;
 
-        if (filter != null ? !filter.equals(that.filter) : that.filter != null) return false;
+        if (selector != null ? !selector.equals(that.selector) : that.selector != null) return false;
         if (subscription_id != null ? !subscription_id.equals(that.subscription_id) : that.subscription_id != null)
             return false;
 
@@ -64,7 +63,7 @@ public class DurableSubscriptionDestinat
     @Override
     public int hashCode() {
         int result = super.hashCode();
-        result = 31 * result + (filter != null ? filter.hashCode() : 0);
+        result = 31 * result + (selector != null ? selector.hashCode() : 0);
         result = 31 * result + (subscription_id != null ? subscription_id.hashCode() : 0);
         return result;
     }

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala?rev=1136537&r1=1136536&r2=1136537&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocolHandler.scala Thu Jun 16 16:40:37 2011
@@ -935,7 +935,7 @@ class StompProtocolHandler extends Proto
           val rc = new DurableSubscriptionDestinationDTO()
           rc.path = x.path
           rc.subscription_id = decode_header(id)
-          rc.filter = if (selector == null) null else selector._1
+          rc.selector = if (selector == null) null else selector._1
           rc
         case _ => die("A persistent subscription can only be used on a topic destination")
         }

Modified: activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/QueueStatusDTO.jade
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/QueueStatusDTO.jade?rev=1136537&r1=1136536&r2=1136537&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/QueueStatusDTO.jade (original)
+++ activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/QueueStatusDTO.jade Thu Jun 16 16:40:37 2011
@@ -31,8 +31,8 @@
     h1 Queue #{id}
   - case x:DurableSubscriptionDestinationDTO =>
     h1 Durable Subscription on #{id}
-    - if( x.filter != null )
-      p filter: ${x.filter}
+    - if( x.selector != null )
+      p selector: ${x.selector}
   - case _ =>
     h1 Temporary Queue