You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2011/05/25 17:15:50 UTC
svn commit: r1127551 - in /activemq/activemq-apollo/trunk:
apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/
apollo-dto/src/main/java/org/apache/activemq/apollo/dto/
apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/
Author: chirino
Date: Wed May 25 15:15:50 2011
New Revision: 1127551
URL: http://svn.apache.org/viewvc?rev=1127551&view=rev
Log:
Simplifying the subscription model a bit by removing the client_id field. Protocols like openwire can encode the client id into the durable subscription_id.
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueBinding.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala
activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DurableSubscriptionDTO.java
activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DurableSubscriptionDestinationDTO.java
activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala?rev=1127551&r1=1127550&r2=1127551&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala Wed May 25 15:15:50 2011
@@ -277,10 +277,10 @@ class LocalRouter(val virtual_host:Virtu
// Stores durable subscription queues.
val durable_subscriptions_by_path = new PathMap[Queue]()
- val durable_subscriptions_by_id = HashMap[(String,String), Queue]()
+ val durable_subscriptions_by_id = HashMap[String, Queue]()
def get_or_create_durable_subscription(destination:DurableSubscriptionDestinationDTO):Queue = {
- val key = (destination.client_id, destination.subscription_id)
+ val key = destination.subscription_id
durable_subscriptions_by_id.get( key ).getOrElse {
val queue = _create_queue(QueueBinding.create(destination))
durable_subscriptions_by_id.put(key, queue)
@@ -290,7 +290,7 @@ class LocalRouter(val virtual_host:Virtu
def destroy_durable_subscription(queue:Queue):Unit = {
val destination = queue.binding.binding_dto.asInstanceOf[DurableSubscriptionDestinationDTO]
- if( durable_subscriptions_by_id.remove( (destination.client_id, destination.subscription_id) ).isDefined ) {
+ if( durable_subscriptions_by_id.remove( destination.subscription_id ).isDefined ) {
val path = queue.binding.destination
durable_subscriptions_by_path.remove(path, queue)
var matches = get_destination_matches(path)
@@ -327,7 +327,7 @@ class LocalRouter(val virtual_host:Virtu
}
durable_subscriptions_by_path.put(path, queue)
- durable_subscriptions_by_id.put((destination.client_id, destination.subscription_id), queue)
+ durable_subscriptions_by_id.put(destination.subscription_id, queue)
matches.foreach( _.bind_durable_subscription(destination, queue) )
}
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueBinding.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueBinding.scala?rev=1127551&r1=1127550&r2=1127551&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueBinding.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueBinding.scala Wed May 25 15:15:50 2011
@@ -214,9 +214,6 @@ class DurableSubscriptionQueueBinding(va
if( binding_dto.filter!=null ) {
rc += " filtering '"+binding_dto.filter+"'"
}
- if( binding_dto.client_id!=null ) {
- rc += " for client '"+binding_dto.client_id+"'"
- }
rc
}
@@ -244,10 +241,7 @@ class DurableSubscriptionQueueBinding(va
if( x.name != null && !decode_filter(x.name).matches(destination)) {
return false
}
- if( x.client_id != null && x.client_id!=x.client_id ) {
- return false
- }
- if( x.subscription_id != null && x.subscription_id!=x.subscription_id ) {
+ if( x.subscription_id != null && x.subscription_id!=binding_dto.subscription_id ) {
return false
}
true
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala?rev=1127551&r1=1127550&r2=1127551&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala Wed May 25 15:15:50 2011
@@ -43,10 +43,6 @@ class Topic(val router:LocalRouter, val
def slow_consumer_policy = config.slow_consumer_policy.getOrElse("block")
- def is_same_ds(sub1:DurableSubscriptionDestinationDTO, sub2:DurableSubscriptionDestinationDTO) = {
- (sub1.client_id, sub1.subscription_id) == (sub2.client_id, sub2.subscription_id)
- }
-
def bind (destination: DestinationDTO, consumer:DeliveryConsumer) = {
destination match {
case null=> // unified queue case
Modified: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DurableSubscriptionDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DurableSubscriptionDTO.java?rev=1127551&r1=1127550&r2=1127551&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DurableSubscriptionDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DurableSubscriptionDTO.java Wed May 25 15:15:50 2011
@@ -28,12 +28,6 @@ import javax.xml.bind.annotation.*;
public class DurableSubscriptionDTO extends QueueDTO {
/**
- * To narrow down matches to a client id
- */
- @XmlAttribute(name="client_id")
- public String client_id;
-
- /**
* To narrow down matches to a subscription id
*/
@XmlAttribute(name="subscription_id")
Modified: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DurableSubscriptionDestinationDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DurableSubscriptionDestinationDTO.java?rev=1127551&r1=1127550&r2=1127551&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DurableSubscriptionDestinationDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/DurableSubscriptionDestinationDTO.java Wed May 25 15:15:50 2011
@@ -35,9 +35,6 @@ public class DurableSubscriptionDestinat
@XmlAttribute
public String filter;
- @XmlAttribute(name="client_id")
- public String client_id;
-
@XmlAttribute(name="subscription_id")
public String subscription_id;
@@ -54,12 +51,11 @@ public class DurableSubscriptionDestinat
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
+ if (!super.equals(o)) return false;
DurableSubscriptionDestinationDTO that = (DurableSubscriptionDestinationDTO) o;
- if (client_id != null ? !client_id.equals(that.client_id) : that.client_id != null) return false;
if (filter != null ? !filter.equals(that.filter) : that.filter != null) return false;
- if (parts != null ? !parts.equals(that.parts) : that.parts != null) return false;
if (subscription_id != null ? !subscription_id.equals(that.subscription_id) : that.subscription_id != null)
return false;
@@ -68,9 +64,8 @@ public class DurableSubscriptionDestinat
@Override
public int hashCode() {
- int result = parts != null ? parts.hashCode() : 0;
+ int result = super.hashCode();
result = 31 * result + (filter != null ? filter.hashCode() : 0);
- result = 31 * result + (client_id != null ? client_id.hashCode() : 0);
result = 31 * result + (subscription_id != null ? subscription_id.hashCode() : 0);
return result;
}
Modified: activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala?rev=1127551&r1=1127550&r2=1127551&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/src/main/scala/org/apache/activemq/apollo/openwire/OpenwireProtocolHandler.scala Wed May 25 15:15:50 2011
@@ -468,8 +468,13 @@ class OpenwireProtocolHandler extends Pr
destination = destination.map { _ match {
case x:TopicDestinationDTO=>
val rc = new DurableSubscriptionDestinationDTO(x.parts)
- rc.client_id = parent.parent.info.getClientId
- rc.subscription_id = if( is_durable_sub ) info.getSubscriptionName else null
+ if( is_durable_sub ) {
+ rc.subscription_id = ""
+ if( parent.parent.info.getClientId != null ) {
+ rc.subscription_id += parent.parent.info.getClientId + ":"
+ }
+ rc.subscription_id += info.getSubscriptionName
+ }
rc.filter = info.getSelector
rc
case _ => die("A durable subscription can only be used on a topic destination")