You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2011/05/25 17:15:50 UTC

svn commit: r1127551 - in /activemq/activemq-apollo/trunk: apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/ apollo-dto/src/main/java/org/apache/activemq/apollo/dto/ apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/

Author: chirino
Date: Wed May 25 15:15:50 2011
New Revision: 1127551

URL: http://svn.apache.org/viewvc?rev=1127551&view=rev
Log:
Simplifying the subscription model a bit by removing the client_id field.  Protocols like openwire can encode the client id into the durable subscription_id.

Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueBinding.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala
    activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DurableSubscriptionDTO.java
    activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DurableSubscriptionDestinationDTO.java
    activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala?rev=1127551&r1=1127550&r2=1127551&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala Wed May 25 15:15:50 2011
@@ -277,10 +277,10 @@ class LocalRouter(val virtual_host:Virtu
 
     // Stores durable subscription queues.
     val durable_subscriptions_by_path = new PathMap[Queue]()
-    val durable_subscriptions_by_id = HashMap[(String,String), Queue]()
+    val durable_subscriptions_by_id = HashMap[String, Queue]()
 
     def get_or_create_durable_subscription(destination:DurableSubscriptionDestinationDTO):Queue = {
-      val key = (destination.client_id, destination.subscription_id)
+      val key = destination.subscription_id
       durable_subscriptions_by_id.get( key ).getOrElse {
         val queue = _create_queue(QueueBinding.create(destination))
         durable_subscriptions_by_id.put(key, queue)
@@ -290,7 +290,7 @@ class LocalRouter(val virtual_host:Virtu
 
     def destroy_durable_subscription(queue:Queue):Unit = {
       val destination = queue.binding.binding_dto.asInstanceOf[DurableSubscriptionDestinationDTO]
-      if( durable_subscriptions_by_id.remove( (destination.client_id, destination.subscription_id) ).isDefined ) {
+      if( durable_subscriptions_by_id.remove( destination.subscription_id ).isDefined ) {
         val path = queue.binding.destination
         durable_subscriptions_by_path.remove(path, queue)
         var matches = get_destination_matches(path)
@@ -327,7 +327,7 @@ class LocalRouter(val virtual_host:Virtu
       }
 
       durable_subscriptions_by_path.put(path, queue)
-      durable_subscriptions_by_id.put((destination.client_id, destination.subscription_id), queue)
+      durable_subscriptions_by_id.put(destination.subscription_id, queue)
 
       matches.foreach( _.bind_durable_subscription(destination, queue) )
     }

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueBinding.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueBinding.scala?rev=1127551&r1=1127550&r2=1127551&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueBinding.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueBinding.scala Wed May 25 15:15:50 2011
@@ -214,9 +214,6 @@ class DurableSubscriptionQueueBinding(va
     if( binding_dto.filter!=null ) {
       rc += " filtering '"+binding_dto.filter+"'"
     }
-    if( binding_dto.client_id!=null ) {
-      rc += " for client '"+binding_dto.client_id+"'"
-    }
     rc
   }
 
@@ -244,10 +241,7 @@ class DurableSubscriptionQueueBinding(va
         if( x.name != null && !decode_filter(x.name).matches(destination)) {
           return false
         }
-        if( x.client_id != null && x.client_id!=x.client_id ) {
-          return false
-        }
-        if( x.subscription_id != null && x.subscription_id!=x.subscription_id ) {
+        if( x.subscription_id != null && x.subscription_id!=binding_dto.subscription_id ) {
           return false
         }
         true

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala?rev=1127551&r1=1127550&r2=1127551&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala Wed May 25 15:15:50 2011
@@ -43,10 +43,6 @@ class Topic(val router:LocalRouter, val 
 
   def slow_consumer_policy = config.slow_consumer_policy.getOrElse("block")
 
-  def is_same_ds(sub1:DurableSubscriptionDestinationDTO, sub2:DurableSubscriptionDestinationDTO) = {
-    (sub1.client_id, sub1.subscription_id) == (sub2.client_id, sub2.subscription_id)
-  }
-
   def bind (destination: DestinationDTO, consumer:DeliveryConsumer) = {
     destination match {
       case null=> // unified queue case

Modified: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DurableSubscriptionDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DurableSubscriptionDTO.java?rev=1127551&r1=1127550&r2=1127551&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DurableSubscriptionDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DurableSubscriptionDTO.java Wed May 25 15:15:50 2011
@@ -28,12 +28,6 @@ import javax.xml.bind.annotation.*;
 public class DurableSubscriptionDTO extends QueueDTO {
 
     /**
-     * To narrow down matches to a client id
-     */
-    @XmlAttribute(name="client_id")
-    public String client_id;
-
-    /**
      * To narrow down matches to a subscription id
      */
     @XmlAttribute(name="subscription_id")

Modified: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DurableSubscriptionDestinationDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DurableSubscriptionDestinationDTO.java?rev=1127551&r1=1127550&r2=1127551&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DurableSubscriptionDestinationDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DurableSubscriptionDestinationDTO.java Wed May 25 15:15:50 2011
@@ -35,9 +35,6 @@ public class DurableSubscriptionDestinat
     @XmlAttribute
     public String filter;
 
-    @XmlAttribute(name="client_id")
-    public String client_id;
-
     @XmlAttribute(name="subscription_id")
     public String subscription_id;
 
@@ -54,12 +51,11 @@ public class DurableSubscriptionDestinat
     public boolean equals(Object o) {
         if (this == o) return true;
         if (o == null || getClass() != o.getClass()) return false;
+        if (!super.equals(o)) return false;
 
         DurableSubscriptionDestinationDTO that = (DurableSubscriptionDestinationDTO) o;
 
-        if (client_id != null ? !client_id.equals(that.client_id) : that.client_id != null) return false;
         if (filter != null ? !filter.equals(that.filter) : that.filter != null) return false;
-        if (parts != null ? !parts.equals(that.parts) : that.parts != null) return false;
         if (subscription_id != null ? !subscription_id.equals(that.subscription_id) : that.subscription_id != null)
             return false;
 
@@ -68,9 +64,8 @@ public class DurableSubscriptionDestinat
 
     @Override
     public int hashCode() {
-        int result = parts != null ? parts.hashCode() : 0;
+        int result = super.hashCode();
         result = 31 * result + (filter != null ? filter.hashCode() : 0);
-        result = 31 * result + (client_id != null ? client_id.hashCode() : 0);
         result = 31 * result + (subscription_id != null ? subscription_id.hashCode() : 0);
         return result;
     }

Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala?rev=1127551&r1=1127550&r2=1127551&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala Wed May 25 15:15:50 2011
@@ -468,8 +468,13 @@ class OpenwireProtocolHandler extends Pr
         destination = destination.map { _ match {
           case x:TopicDestinationDTO=>
             val rc = new DurableSubscriptionDestinationDTO(x.parts)
-            rc.client_id = parent.parent.info.getClientId
-            rc.subscription_id = if( is_durable_sub ) info.getSubscriptionName else null
+            if( is_durable_sub ) {
+              rc.subscription_id = ""
+              if( parent.parent.info.getClientId != null ) {
+                rc.subscription_id += parent.parent.info.getClientId + ":"
+              }
+              rc.subscription_id += info.getSubscriptionName
+            }
             rc.filter = info.getSelector
             rc
           case _ => die("A durable subscription can only be used on a topic destination")