You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2009/06/19 17:28:57 UTC

svn commit: r786544 - in /activemq/sandbox/activemq-flow: activemq-broker/src/main/java/org/apache/activemq/apollo/broker/ activemq-broker/src/main/java/org/apache/activemq/apollo/broker/path/ activemq-openwire/src/test/java/org/apache/activemq/legacy/...

Author: chirino
Date: Fri Jun 19 15:28:56 2009
New Revision: 786544

URL: http://svn.apache.org/viewvc?rev=786544&view=rev
Log:
Putting the Domain class back in, since it's going to have to do some synchronization.


Added:
    activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Domain.java
Modified:
    activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Router.java
    activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/VirtualHost.java
    activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/path/PathFilter.java
    activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/legacy/openwireprotocol/BrokerTest.java

Added: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Domain.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Domain.java?rev=786544&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Domain.java (added)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Domain.java Fri Jun 19 15:28:56 2009
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.broker;
+
+import java.util.Collection;
+
+import org.apache.activemq.apollo.broker.path.PathMap;
+import org.apache.activemq.protobuf.AsciiBuffer;
+
+public class Domain {
+
+    private final PathMap<DeliveryTarget> targets = new PathMap<DeliveryTarget>();
+
+    synchronized public void bind(AsciiBuffer name, DeliveryTarget queue) {
+        targets.put(name, queue);
+    }
+    
+    synchronized public void unbind(AsciiBuffer name, DeliveryTarget queue) {
+        targets.remove(name, queue);
+    }
+
+    synchronized public Collection<DeliveryTarget> route(AsciiBuffer name, MessageDelivery delivery) {
+        return targets.get(name);
+    }
+
+}

Modified: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Router.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Router.java?rev=786544&r1=786543&r2=786544&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Router.java (original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/Router.java Fri Jun 19 15:28:56 2009
@@ -22,7 +22,6 @@
 import java.util.HashMap;
 import java.util.HashSet;
 
-import org.apache.activemq.apollo.broker.path.PathMap;
 import org.apache.activemq.flow.ISourceController;
 import org.apache.activemq.protobuf.AsciiBuffer;
 import org.apache.commons.logging.Log;
@@ -36,36 +35,40 @@
     public static final AsciiBuffer TEMP_TOPIC_DOMAIN = new AsciiBuffer("temp-topic");
     public static final AsciiBuffer TEMP_QUEUE_DOMAIN = new AsciiBuffer("temp-queue");
 
-    private final HashMap<AsciiBuffer, PathMap<DeliveryTarget>> domains = new HashMap<AsciiBuffer, PathMap<DeliveryTarget>>();
+    private final HashMap<AsciiBuffer, Domain> domains = new HashMap<AsciiBuffer, Domain>();
     
     private VirtualHost virtualHost;
     private BrokerDatabase database;
     
 
     public Router() {
-        domains.put(QUEUE_DOMAIN, new PathMap<DeliveryTarget>());
-        domains.put(TOPIC_DOMAIN, new PathMap<DeliveryTarget>());
-        domains.put(TEMP_QUEUE_DOMAIN, new PathMap<DeliveryTarget>());
-        domains.put(TEMP_TOPIC_DOMAIN, new PathMap<DeliveryTarget>());
+        domains.put(QUEUE_DOMAIN, new Domain());
+        domains.put(TOPIC_DOMAIN, new Domain());
+        domains.put(TEMP_QUEUE_DOMAIN, new Domain());
+        domains.put(TEMP_TOPIC_DOMAIN, new Domain());
     }
 
-    public PathMap<DeliveryTarget> getDomain(AsciiBuffer name) {
+    public Domain getDomain(Destination destination) {
+        return getDomain(destination.getDomain());
+    }
+
+    public Domain getDomain(AsciiBuffer name) {
         return domains.get(name);
     }
 
-    public PathMap<DeliveryTarget> putDomain(AsciiBuffer name, PathMap<DeliveryTarget> domain) {
+    public Domain putDomain(AsciiBuffer name, Domain domain) {
         return domains.put(name, domain);
     }
 
-    public PathMap<DeliveryTarget> removeDomain(AsciiBuffer name) {
+    public Domain removeDomain(AsciiBuffer name) {
         return domains.remove(name);
     }
 
     public synchronized void bind(Destination destination, DeliveryTarget target) {
         Collection<Destination> destinationList = destination.getDestinations();
         if (destinationList == null) {
-        	PathMap<DeliveryTarget> domain = domains.get(destination.getDomain());
-            domain.put(destination.getName(), target);
+        	Domain domain = getDomain(destination);
+            domain.bind(destination.getName(), target);
         } else {
             for (Destination d : destinationList) {
                 bind(d, target);
@@ -76,8 +79,8 @@
     public synchronized void unbind(Destination destination, DeliveryTarget target) {
         Collection<Destination> destinationList = destination.getDestinations();
         if (destinationList == null) {
-        	PathMap<DeliveryTarget> domain = domains.get(destination.getDomain());
-            domain.remove(destination.getName(), target);
+        	Domain domain = getDomain(destination);
+            domain.unbind(destination.getName(), target);
         } else {
             for (Destination d : destinationList) {
                 unbind(d, target);
@@ -121,8 +124,8 @@
         // Handles routing to composite/multi destinations.
         Collection<Destination> destinationList = destination.getDestinations();
         if (destinationList == null) {
-        	PathMap<DeliveryTarget> domain = domains.get(destination.getDomain());
-            Collection<DeliveryTarget> rc = domain.get(destination.getName());
+        	Domain domain = getDomain(destination);
+            Collection<DeliveryTarget> rc = domain.route(destination.getName(), msg);
             // We can auto create queues in the queue domain..
             if(rc.isEmpty() && autoCreate && destination.getDomain().equals(Router.QUEUE_DOMAIN) ) {
             	try {

Modified: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/VirtualHost.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/VirtualHost.java?rev=786544&r1=786543&r2=786544&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/VirtualHost.java (original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/VirtualHost.java Fri Jun 19 15:28:56 2009
@@ -117,10 +117,10 @@
         // Create Queue instances
         for (IQueue<Long, MessageDelivery> iQueue : queueStore.getSharedQueues()) {
             Queue queue = new Queue(iQueue);
-            PathMap<DeliveryTarget> domain = router.getDomain(Router.QUEUE_DOMAIN);
+            Domain domain = router.getDomain(Router.QUEUE_DOMAIN);
             Destination dest = new Destination.SingleDestination(Router.QUEUE_DOMAIN, iQueue.getDescriptor().getQueueName());
             queue.setDestination(dest);
-            domain.put(dest.getName(), queue);
+            domain.bind(dest.getName(), queue);
             queues.put(dest.getName(), queue);
         }
         for (Queue queue : queues.values()) {
@@ -157,8 +157,8 @@
             IQueue<Long, MessageDelivery> iQueue = queueStore.createSharedQueue(dest.getName().toString());
             queue = new Queue(iQueue);
             queue.setDestination(dest);
-            PathMap<DeliveryTarget> domain = router.getDomain(dest.getDomain());
-            domain.put(dest.getName(), queue);
+            Domain domain = router.getDomain(dest.getDomain());
+            domain.bind(dest.getName(), queue);
             queues.put(dest.getName(), queue);
         }
         queue.start();

Modified: activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/path/PathFilter.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/path/PathFilter.java?rev=786544&r1=786543&r2=786544&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/path/PathFilter.java (original)
+++ activemq/sandbox/activemq-flow/activemq-broker/src/main/java/org/apache/activemq/apollo/broker/path/PathFilter.java Fri Jun 19 15:28:56 2009
@@ -48,21 +48,23 @@
     public abstract boolean matches(AsciiBuffer path);
 
     public static PathFilter parseFilter(AsciiBuffer path) {
-        ArrayList<AsciiBuffer> paths = PathSupport.parse(path);
-        int idx = paths.size() - 1;
-        if (idx >= 0) {
-        	AsciiBuffer lastPath = paths.get(idx);
-            if (lastPath.equals(ANY_DESCENDENT)) {
-                return new PrefixPathFilter(paths);
-            } else {
-                while (idx >= 0) {
-                    lastPath = paths.get(idx--);
-                    if (lastPath.equals(ANY_CHILD)) {
-                        return new WildcardPathFilter(paths);
-                    }
-                }
-            }
-        }
+    	if( containsWildCards(path) ) { 
+	        ArrayList<AsciiBuffer> paths = PathSupport.parse(path);
+	        int idx = paths.size() - 1;
+	        if (idx >= 0) {
+	        	AsciiBuffer lastPath = paths.get(idx);
+	            if (lastPath.equals(ANY_DESCENDENT)) {
+	                return new PrefixPathFilter(paths);
+	            } else {
+	                while (idx >= 0) {
+	                    lastPath = paths.get(idx--);
+	                    if (lastPath.equals(ANY_CHILD)) {
+	                        return new WildcardPathFilter(paths);
+	                    }
+	                }
+	            }
+	        }
+    	}
 
         // if none of the paths contain a wildcard then use equality
         return new SimplePathFilter(path);

Modified: activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/legacy/openwireprotocol/BrokerTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/legacy/openwireprotocol/BrokerTest.java?rev=786544&r1=786543&r2=786544&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/legacy/openwireprotocol/BrokerTest.java (original)
+++ activemq/sandbox/activemq-flow/activemq-openwire/src/test/java/org/apache/activemq/legacy/openwireprotocol/BrokerTest.java Fri Jun 19 15:28:56 2009
@@ -43,6 +43,73 @@
     public byte destinationType;
     public boolean durableConsumer;
     protected static final int MAX_NULL_WAIT=500;
+    
+    public void initCombosForTestTopicNoLocal() {
+        addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT),
+                                                           Integer.valueOf(DeliveryMode.PERSISTENT)});
+    }
+
+    public void testTopicNoLocal() throws Exception {
+
+        ActiveMQDestination destination = new ActiveMQTopic("TEST");
+
+        // Setup a first connection
+        StubConnection connection1 = createConnection();
+        ConnectionInfo connectionInfo1 = createConnectionInfo();
+        SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
+        ProducerInfo producerInfo1 = createProducerInfo(sessionInfo1);
+        connection1.send(connectionInfo1);
+        connection1.send(sessionInfo1);
+        connection1.send(producerInfo1);
+
+        ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination);
+        consumerInfo1.setRetroactive(true);
+        consumerInfo1.setPrefetchSize(100);
+        consumerInfo1.setNoLocal(true);
+        connection1.send(consumerInfo1);
+
+        // Setup a second connection
+        StubConnection connection2 = createConnection();
+        ConnectionInfo connectionInfo2 = createConnectionInfo();
+        SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2);
+        ProducerInfo producerInfo2 = createProducerInfo(sessionInfo2);
+        connection2.send(connectionInfo2);
+        connection2.send(sessionInfo2);
+        connection2.send(producerInfo2);
+
+        ConsumerInfo consumerInfo2 = createConsumerInfo(sessionInfo2, destination);
+        consumerInfo2.setRetroactive(true);
+        consumerInfo2.setPrefetchSize(100);
+        consumerInfo2.setNoLocal(true);
+        connection2.send(consumerInfo2);
+
+        // Send the messages
+        connection1.send(createMessage(producerInfo1, destination, deliveryMode));
+        connection1.send(createMessage(producerInfo1, destination, deliveryMode));
+        connection1.send(createMessage(producerInfo1, destination, deliveryMode));
+        connection1.send(createMessage(producerInfo1, destination, deliveryMode));
+
+        // The 2nd connection should get the messages.
+        for (int i = 0; i < 4; i++) {
+            Message m1 = receiveMessage(connection2);
+            assertNotNull(m1);
+        }
+
+        // Send a message with the 2nd connection
+        Message message = createMessage(producerInfo2, destination, deliveryMode);
+        connection2.send(message);
+
+        // The first connection should not see the initial 4 local messages sent
+        // but should
+        // see the messages from connection 2.
+        Message m = receiveMessage(connection1);
+        assertNotNull(m);
+        assertEquals(message.getMessageId(), m.getMessageId());
+
+        assertNoMessagesLeft(connection1);
+        assertNoMessagesLeft(connection2);
+    }
+    
     public void initCombosForTestQueueSendThenAddConsumer() {
         addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT),
                                                            Integer.valueOf(DeliveryMode.PERSISTENT)});
@@ -1070,71 +1137,6 @@
         assertNull(connection1.getDispatchQueue().poll(MAX_NULL_WAIT, TimeUnit.MILLISECONDS));
     }
 
-    public void initCombosForTestTopicNoLocal() {
-        addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT),
-                                                           Integer.valueOf(DeliveryMode.PERSISTENT)});
-    }
-
-    public void testTopicNoLocal() throws Exception {
-
-        ActiveMQDestination destination = new ActiveMQTopic("TEST");
-
-        // Setup a first connection
-        StubConnection connection1 = createConnection();
-        ConnectionInfo connectionInfo1 = createConnectionInfo();
-        SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
-        ProducerInfo producerInfo1 = createProducerInfo(sessionInfo1);
-        connection1.send(connectionInfo1);
-        connection1.send(sessionInfo1);
-        connection1.send(producerInfo1);
-
-        ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination);
-        consumerInfo1.setRetroactive(true);
-        consumerInfo1.setPrefetchSize(100);
-        consumerInfo1.setNoLocal(true);
-        connection1.send(consumerInfo1);
-
-        // Setup a second connection
-        StubConnection connection2 = createConnection();
-        ConnectionInfo connectionInfo2 = createConnectionInfo();
-        SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2);
-        ProducerInfo producerInfo2 = createProducerInfo(sessionInfo2);
-        connection2.send(connectionInfo2);
-        connection2.send(sessionInfo2);
-        connection2.send(producerInfo2);
-
-        ConsumerInfo consumerInfo2 = createConsumerInfo(sessionInfo2, destination);
-        consumerInfo2.setRetroactive(true);
-        consumerInfo2.setPrefetchSize(100);
-        consumerInfo2.setNoLocal(true);
-        connection2.send(consumerInfo2);
-
-        // Send the messages
-        connection1.send(createMessage(producerInfo1, destination, deliveryMode));
-        connection1.send(createMessage(producerInfo1, destination, deliveryMode));
-        connection1.send(createMessage(producerInfo1, destination, deliveryMode));
-        connection1.send(createMessage(producerInfo1, destination, deliveryMode));
-
-        // The 2nd connection should get the messages.
-        for (int i = 0; i < 4; i++) {
-            Message m1 = receiveMessage(connection2);
-            assertNotNull(m1);
-        }
-
-        // Send a message with the 2nd connection
-        Message message = createMessage(producerInfo2, destination, deliveryMode);
-        connection2.send(message);
-
-        // The first connection should not see the initial 4 local messages sent
-        // but should
-        // see the messages from connection 2.
-        Message m = receiveMessage(connection1);
-        assertNotNull(m);
-        assertEquals(message.getMessageId(), m.getMessageId());
-
-        assertNoMessagesLeft(connection1);
-        assertNoMessagesLeft(connection2);
-    }
 
     public void initCombosForTopicDispatchIsBroadcast() {
         addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT),