You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/06/19 17:28:57 UTC
svn commit: r786544 - in /activemq/sandbox/activemq-flow:
activemq-broker/src/main/java/org/apache/activemq/apollo/broker/
activemq-broker/src/main/java/org/apache/activemq/apollo/broker/path/
activemq-openwire/src/test/java/org/apache/activemq/legacy/...
Author: chirino
Date: Fri Jun 19 15:28:56 2009
New Revision: 786544
URL: http://svn.apache.org/viewvc?rev=786544&view=rev
Log:
Putting the Domain class back in, since it is going to have to do some synchronization.
Added:
activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Domain.java
Modified:
activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Router.java
activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/VirtualHost.java
activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/path/PathFilter.java
activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/legacy/openwireprotocol/BrokerTest.java
Added: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Domain.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Domain.java?rev=786544&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Domain.java (added)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Domain.java Fri Jun 19 15:28:56 2009
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.broker;
+
+import java.util.Collection;
+
+import org.apache.activemq.apollo.broker.path.PathMap;
+import org.apache.activemq.protobuf.AsciiBuffer;
+
+public class Domain {
+
+ private final PathMap<DeliveryTarget> targets = new PathMap<DeliveryTarget>();
+
+ synchronized public void bind(AsciiBuffer name, DeliveryTarget queue) {
+ targets.put(name, queue);
+ }
+
+ synchronized public void unbind(AsciiBuffer name, DeliveryTarget queue) {
+ targets.remove(name, queue);
+ }
+
+ synchronized public Collection<DeliveryTarget> route(AsciiBuffer name, MessageDelivery delivery) {
+ return targets.get(name);
+ }
+
+}
Modified: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Router.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Router.java?rev=786544&r1=786543&r2=786544&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Router.java (original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Router.java Fri Jun 19 15:28:56 2009
@@ -22,7 +22,6 @@
import java.util.HashMap;
import java.util.HashSet;
-import org.apache.activemq.apollo.broker.path.PathMap;
import org.apache.activemq.flow.ISourceController;
import org.apache.activemq.protobuf.AsciiBuffer;
import org.apache.commons.logging.Log;
@@ -36,36 +35,40 @@
public static final AsciiBuffer TEMP_TOPIC_DOMAIN = new AsciiBuffer("temp-topic");
public static final AsciiBuffer TEMP_QUEUE_DOMAIN = new AsciiBuffer("temp-queue");
- private final HashMap<AsciiBuffer, PathMap<DeliveryTarget>> domains = new HashMap<AsciiBuffer, PathMap<DeliveryTarget>>();
+ private final HashMap<AsciiBuffer, Domain> domains = new HashMap<AsciiBuffer, Domain>();
private VirtualHost virtualHost;
private BrokerDatabase database;
public Router() {
- domains.put(QUEUE_DOMAIN, new PathMap<DeliveryTarget>());
- domains.put(TOPIC_DOMAIN, new PathMap<DeliveryTarget>());
- domains.put(TEMP_QUEUE_DOMAIN, new PathMap<DeliveryTarget>());
- domains.put(TEMP_TOPIC_DOMAIN, new PathMap<DeliveryTarget>());
+ domains.put(QUEUE_DOMAIN, new Domain());
+ domains.put(TOPIC_DOMAIN, new Domain());
+ domains.put(TEMP_QUEUE_DOMAIN, new Domain());
+ domains.put(TEMP_TOPIC_DOMAIN, new Domain());
}
- public PathMap<DeliveryTarget> getDomain(AsciiBuffer name) {
+ public Domain getDomain(Destination destination) {
+ return getDomain(destination.getDomain());
+ }
+
+ public Domain getDomain(AsciiBuffer name) {
return domains.get(name);
}
- public PathMap<DeliveryTarget> putDomain(AsciiBuffer name, PathMap<DeliveryTarget> domain) {
+ public Domain putDomain(AsciiBuffer name, Domain domain) {
return domains.put(name, domain);
}
- public PathMap<DeliveryTarget> removeDomain(AsciiBuffer name) {
+ public Domain removeDomain(AsciiBuffer name) {
return domains.remove(name);
}
public synchronized void bind(Destination destination, DeliveryTarget target) {
Collection<Destination> destinationList = destination.getDestinations();
if (destinationList == null) {
- PathMap<DeliveryTarget> domain = domains.get(destination.getDomain());
- domain.put(destination.getName(), target);
+ Domain domain = getDomain(destination);
+ domain.bind(destination.getName(), target);
} else {
for (Destination d : destinationList) {
bind(d, target);
@@ -76,8 +79,8 @@
public synchronized void unbind(Destination destination, DeliveryTarget target) {
Collection<Destination> destinationList = destination.getDestinations();
if (destinationList == null) {
- PathMap<DeliveryTarget> domain = domains.get(destination.getDomain());
- domain.remove(destination.getName(), target);
+ Domain domain = getDomain(destination);
+ domain.unbind(destination.getName(), target);
} else {
for (Destination d : destinationList) {
unbind(d, target);
@@ -121,8 +124,8 @@
// Handles routing to composite/multi destinations.
Collection<Destination> destinationList = destination.getDestinations();
if (destinationList == null) {
- PathMap<DeliveryTarget> domain = domains.get(destination.getDomain());
- Collection<DeliveryTarget> rc = domain.get(destination.getName());
+ Domain domain = getDomain(destination);
+ Collection<DeliveryTarget> rc = domain.route(destination.getName(), msg);
// We can auto create queues in the queue domain..
if(rc.isEmpty() && autoCreate && destination.getDomain().equals(Router.QUEUE_DOMAIN) ) {
try {
Modified: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/VirtualHost.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/VirtualHost.java?rev=786544&r1=786543&r2=786544&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/VirtualHost.java (original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/VirtualHost.java Fri Jun 19 15:28:56 2009
@@ -117,10 +117,10 @@
// Create Queue instances
for (IQueue<Long, MessageDelivery> iQueue : queueStore.getSharedQueues()) {
Queue queue = new Queue(iQueue);
- PathMap<DeliveryTarget> domain = router.getDomain(Router.QUEUE_DOMAIN);
+ Domain domain = router.getDomain(Router.QUEUE_DOMAIN);
Destination dest = new Destination.SingleDestination(Router.QUEUE_DOMAIN, iQueue.getDescriptor().getQueueName());
queue.setDestination(dest);
- domain.put(dest.getName(), queue);
+ domain.bind(dest.getName(), queue);
queues.put(dest.getName(), queue);
}
for (Queue queue : queues.values()) {
@@ -157,8 +157,8 @@
IQueue<Long, MessageDelivery> iQueue = queueStore.createSharedQueue(dest.getName().toString());
queue = new Queue(iQueue);
queue.setDestination(dest);
- PathMap<DeliveryTarget> domain = router.getDomain(dest.getDomain());
- domain.put(dest.getName(), queue);
+ Domain domain = router.getDomain(dest.getDomain());
+ domain.bind(dest.getName(), queue);
queues.put(dest.getName(), queue);
}
queue.start();
Modified: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/path/PathFilter.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/path/PathFilter.java?rev=786544&r1=786543&r2=786544&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/path/PathFilter.java (original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/path/PathFilter.java Fri Jun 19 15:28:56 2009
@@ -48,21 +48,23 @@
public abstract boolean matches(AsciiBuffer path);
public static PathFilter parseFilter(AsciiBuffer path) {
- ArrayList<AsciiBuffer> paths = PathSupport.parse(path);
- int idx = paths.size() - 1;
- if (idx >= 0) {
- AsciiBuffer lastPath = paths.get(idx);
- if (lastPath.equals(ANY_DESCENDENT)) {
- return new PrefixPathFilter(paths);
- } else {
- while (idx >= 0) {
- lastPath = paths.get(idx--);
- if (lastPath.equals(ANY_CHILD)) {
- return new WildcardPathFilter(paths);
- }
- }
- }
- }
+ if( containsWildCards(path) ) {
+ ArrayList<AsciiBuffer> paths = PathSupport.parse(path);
+ int idx = paths.size() - 1;
+ if (idx >= 0) {
+ AsciiBuffer lastPath = paths.get(idx);
+ if (lastPath.equals(ANY_DESCENDENT)) {
+ return new PrefixPathFilter(paths);
+ } else {
+ while (idx >= 0) {
+ lastPath = paths.get(idx--);
+ if (lastPath.equals(ANY_CHILD)) {
+ return new WildcardPathFilter(paths);
+ }
+ }
+ }
+ }
+ }
// if none of the paths contain a wildcard then use equality
return new SimplePathFilter(path);
Modified: activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/legacy/openwireprotocol/BrokerTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/legacy/openwireprotocol/BrokerTest.java?rev=786544&r1=786543&r2=786544&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/legacy/openwireprotocol/BrokerTest.java (original)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/legacy/openwireprotocol/BrokerTest.java Fri Jun 19 15:28:56 2009
@@ -43,6 +43,73 @@
public byte destinationType;
public boolean durableConsumer;
protected static final int MAX_NULL_WAIT=500;
+
+ public void initCombosForTestTopicNoLocal() {
+ addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT),
+ Integer.valueOf(DeliveryMode.PERSISTENT)});
+ }
+
+ public void testTopicNoLocal() throws Exception {
+
+ ActiveMQDestination destination = new ActiveMQTopic("TEST");
+
+ // Setup a first connection
+ StubConnection connection1 = createConnection();
+ ConnectionInfo connectionInfo1 = createConnectionInfo();
+ SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
+ ProducerInfo producerInfo1 = createProducerInfo(sessionInfo1);
+ connection1.send(connectionInfo1);
+ connection1.send(sessionInfo1);
+ connection1.send(producerInfo1);
+
+ ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination);
+ consumerInfo1.setRetroactive(true);
+ consumerInfo1.setPrefetchSize(100);
+ consumerInfo1.setNoLocal(true);
+ connection1.send(consumerInfo1);
+
+ // Setup a second connection
+ StubConnection connection2 = createConnection();
+ ConnectionInfo connectionInfo2 = createConnectionInfo();
+ SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2);
+ ProducerInfo producerInfo2 = createProducerInfo(sessionInfo2);
+ connection2.send(connectionInfo2);
+ connection2.send(sessionInfo2);
+ connection2.send(producerInfo2);
+
+ ConsumerInfo consumerInfo2 = createConsumerInfo(sessionInfo2, destination);
+ consumerInfo2.setRetroactive(true);
+ consumerInfo2.setPrefetchSize(100);
+ consumerInfo2.setNoLocal(true);
+ connection2.send(consumerInfo2);
+
+ // Send the messages
+ connection1.send(createMessage(producerInfo1, destination, deliveryMode));
+ connection1.send(createMessage(producerInfo1, destination, deliveryMode));
+ connection1.send(createMessage(producerInfo1, destination, deliveryMode));
+ connection1.send(createMessage(producerInfo1, destination, deliveryMode));
+
+ // The 2nd connection should get the messages.
+ for (int i = 0; i < 4; i++) {
+ Message m1 = receiveMessage(connection2);
+ assertNotNull(m1);
+ }
+
+ // Send a message with the 2nd connection
+ Message message = createMessage(producerInfo2, destination, deliveryMode);
+ connection2.send(message);
+
+ // The first connection should not see the initial 4 local messages sent
+ // but should
+ // see the messages from connection 2.
+ Message m = receiveMessage(connection1);
+ assertNotNull(m);
+ assertEquals(message.getMessageId(), m.getMessageId());
+
+ assertNoMessagesLeft(connection1);
+ assertNoMessagesLeft(connection2);
+ }
+
public void initCombosForTestQueueSendThenAddConsumer() {
addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT),
Integer.valueOf(DeliveryMode.PERSISTENT)});
@@ -1070,71 +1137,6 @@
assertNull(connection1.getDispatchQueue().poll(MAX_NULL_WAIT, TimeUnit.MILLISECONDS));
}
- public void initCombosForTestTopicNoLocal() {
- addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT),
- Integer.valueOf(DeliveryMode.PERSISTENT)});
- }
-
- public void testTopicNoLocal() throws Exception {
-
- ActiveMQDestination destination = new ActiveMQTopic("TEST");
-
- // Setup a first connection
- StubConnection connection1 = createConnection();
- ConnectionInfo connectionInfo1 = createConnectionInfo();
- SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
- ProducerInfo producerInfo1 = createProducerInfo(sessionInfo1);
- connection1.send(connectionInfo1);
- connection1.send(sessionInfo1);
- connection1.send(producerInfo1);
-
- ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination);
- consumerInfo1.setRetroactive(true);
- consumerInfo1.setPrefetchSize(100);
- consumerInfo1.setNoLocal(true);
- connection1.send(consumerInfo1);
-
- // Setup a second connection
- StubConnection connection2 = createConnection();
- ConnectionInfo connectionInfo2 = createConnectionInfo();
- SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2);
- ProducerInfo producerInfo2 = createProducerInfo(sessionInfo2);
- connection2.send(connectionInfo2);
- connection2.send(sessionInfo2);
- connection2.send(producerInfo2);
-
- ConsumerInfo consumerInfo2 = createConsumerInfo(sessionInfo2, destination);
- consumerInfo2.setRetroactive(true);
- consumerInfo2.setPrefetchSize(100);
- consumerInfo2.setNoLocal(true);
- connection2.send(consumerInfo2);
-
- // Send the messages
- connection1.send(createMessage(producerInfo1, destination, deliveryMode));
- connection1.send(createMessage(producerInfo1, destination, deliveryMode));
- connection1.send(createMessage(producerInfo1, destination, deliveryMode));
- connection1.send(createMessage(producerInfo1, destination, deliveryMode));
-
- // The 2nd connection should get the messages.
- for (int i = 0; i < 4; i++) {
- Message m1 = receiveMessage(connection2);
- assertNotNull(m1);
- }
-
- // Send a message with the 2nd connection
- Message message = createMessage(producerInfo2, destination, deliveryMode);
- connection2.send(message);
-
- // The first connection should not see the initial 4 local messages sent
- // but should
- // see the messages from connection 2.
- Message m = receiveMessage(connection1);
- assertNotNull(m);
- assertEquals(message.getMessageId(), m.getMessageId());
-
- assertNoMessagesLeft(connection1);
- assertNoMessagesLeft(connection2);
- }
public void initCombosForTopicDispatchIsBroadcast() {
addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT),