You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2008/11/17 12:49:28 UTC

svn commit: r718224 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/ main/java/org/apache/activemq/broker/region/ main/java/org/apache/activemq/network/ main/java/org/apache/activemq/transport/discovery/multicast/ test/java/org/ap...

Author: gtully
Date: Mon Nov 17 03:49:28 2008
New Revision: 718224

URL: http://svn.apache.org/viewvc?rev=718224&view=rev
Log:
test case and logging improvements relating to cyclic/multicast discovery network of size 3 and topic message duplication with networkTTL > 1

Added:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/NoDuplicateOnTopicNetworkTest.java   (with props)
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/multicast/MulticastDiscoveryAgent.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java?rev=718224&r1=718223&r2=718224&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java Mon Nov 17 03:49:28 2008
@@ -1739,7 +1739,7 @@
     /**
      * Handles async client internal exceptions.
      * A client internal exception is usually one that has been thrown
-     * by a container runtie component during asynchronous processing of a
+     * by a container runtime component during asynchronous processing of a
      * message that does not affect the connection itself.
      * This method notifies the <code>ClientInternalExceptionListener</code> by invoking
      * its <code>onException</code> method, if one has been registered with this connection.

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=718224&r1=718223&r2=718224&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Mon Nov 17 03:49:28 2008
@@ -199,7 +199,7 @@
                         inAckRange = true;
                     }
                     if (inAckRange) {
-                        // Don't remove the nodes until we are committed.
+                        // Don't remove the nodes until we are committed.  
                         if (!context.isInTransaction()) {
                             dequeueCounter++;
                             if (!this.getConsumerInfo().isBrowser()) {
@@ -282,9 +282,6 @@
             }else if (ack.isDeliveredAck()) {
                 // Message was delivered but not acknowledged: update pre-fetch
                 // counters.
-                // Acknowledge all dispatched messages up till the message id of
-                // the
-                // acknowledgment.
                 int index = 0;
                 for (Iterator<MessageReference> iter = dispatched.iterator(); iter.hasNext(); index++) {
                     final MessageReference node = iter.next();

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java?rev=718224&r1=718223&r2=718224&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java Mon Nov 17 03:49:28 2008
@@ -500,7 +500,7 @@
             BrokerId[] path = info.getBrokerPath();
             if (path != null && path.length >= networkTTL) {
                 if (LOG.isDebugEnabled()) {
-                    LOG.debug(configuration.getBrokerName() + " Ignoring Subscription " + info + " restricted to " + networkTTL + " network hops only");
+                    LOG.debug(configuration.getBrokerName() + " Ignoring sub  from " + remoteBrokerName + ", restricted to " + networkTTL + " network hops only : " + info);
                 }
                 return;
             }
@@ -508,24 +508,24 @@
                 // Ignore this consumer as it's a consumer we locally sent to
                 // the broker.
                 if (LOG.isDebugEnabled()) {
-                    LOG.debug(configuration.getBrokerName() + " Ignoring sub " + info + " already routed through this broker once");
+                    LOG.debug(configuration.getBrokerName() + " Ignoring sub from " + remoteBrokerName + ", already routed through this broker once : " + info);
                 }
                 return;
             }
             if (!isPermissableDestination(info.getDestination())) {
-                // ignore if not in the permited or in the excluded list
+                // ignore if not in the permitted or in the excluded list
                 if (LOG.isDebugEnabled()) {
-                    LOG.debug(configuration.getBrokerName() + " Ignoring sub " + info + " destination " + info.getDestination() + " is not permiited");
+                    LOG.debug(configuration.getBrokerName() + " Ignoring sub from " + remoteBrokerName + ", destination " + info.getDestination() + " is not permiited :" + info);
                 }
                 return;
             }
             if (addConsumerInfo(info)) {
                 if (LOG.isDebugEnabled()) {
-                    LOG.debug(configuration.getBrokerName() + " Forwarding sub on " + localBroker + " from " + remoteBrokerName + " :  " + info);
+                    LOG.debug(configuration.getBrokerName() + " Forwarding sub on " + localBroker + " from " + remoteBrokerName + " : " + info);
                 }
             } else {
                 if (LOG.isDebugEnabled()) {
-                    LOG.debug(configuration.getBrokerName() + " Ignoring sub " + info + " already subscribed to matching destination");
+                    LOG.debug(configuration.getBrokerName() + " Ignoring sub from " + remoteBrokerName + " as already subscribed to matching destination : " + info);
                 }
             }
         } else if (data.getClass() == DestinationInfo.class) {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java?rev=718224&r1=718223&r2=718224&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java Mon Nov 17 03:49:28 2008
@@ -79,10 +79,11 @@
                 return;
             }
             if ( localURI.equals(uri) || (connectionFilter != null && !connectionFilter.connectTo(uri))) {
+                LOG.debug("not connecting loopback: " + uri);
                 return;
             }
             URI connectUri = uri;
-            LOG.info("Establishing network connection between from " + localURIName + " to " + connectUri);
+            LOG.info("Establishing network connection from " + localURIName + " to " + connectUri);
 
             Transport remoteTransport;
             Transport localTransport;
@@ -213,10 +214,13 @@
         String name = super.getName();
         if (name == null) {
             name = discoveryAgent.toString();
-            ;
             super.setName(name);
         }
         return name;
     }
 
+    @Override
+    public String toString() {
+        return "DiscoveryNetworkConnector:" + getName() + ":" + getBrokerService();
+    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/multicast/MulticastDiscoveryAgent.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/multicast/MulticastDiscoveryAgent.java?rev=718224&r1=718223&r2=718224&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/multicast/MulticastDiscoveryAgent.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/multicast/MulticastDiscoveryAgent.java Mon Nov 17 03:49:28 2008
@@ -176,14 +176,7 @@
     private long lastAdvertizeTime;
     private AtomicBoolean started = new AtomicBoolean(false);
     private boolean reportAdvertizeFailed = true;
-
-    private final Executor executor = new ThreadPoolExecutor(1, 1, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
-        public Thread newThread(Runnable runable) {
-            Thread t = new Thread(runable, "Multicast Discovery Agent Notifier");
-            t.setDaemon(true);
-            return t;
-        }
-    });
+    private Executor executor = null;
 
     /**
      * Set the discovery listener
@@ -304,7 +297,7 @@
             mcast.joinGroup(inetAddress);
             mcast.setSoTimeout((int)keepAliveInterval);
             runner = new Thread(this);
-            runner.setName("MulticastDiscovery: " + selfService);
+            runner.setName(this.toString() + ":" + runner.getName());
             runner.setDaemon(true);
             runner.start();
             doAdvertizeSelf();
@@ -409,11 +402,9 @@
             RemoteBrokerData data = brokersByService.get(service);
             if (data == null) {
                 data = new RemoteBrokerData(brokerName, service);
-                brokersByService.put(service, data);
-
+                brokersByService.put(service, data);      
                 fireServiceAddEvent(data);
                 doAdvertizeSelf();
-
             } else {
                 data.updateHeartBeat();
                 if (data.doRecovery()) {
@@ -433,7 +424,7 @@
     }
 
     private void doExpireOldServices() {
-        long expireTime = System.currentTimeMillis() - (keepAliveInterval * HEARTBEAT_MISS_BEFORE_DEATH);
+        long expireTime = System.currentTimeMillis() - (keepAliveInterval * HEARTBEAT_MISS_BEFORE_DEATH); 
         for (Iterator<RemoteBrokerData> i = brokersByService.values().iterator(); i.hasNext();) {
             RemoteBrokerData data = i.next();
             if (data.getLastHeartBeat() < expireTime) {
@@ -467,7 +458,7 @@
             // Have the listener process the event async so that
             // he does not block this thread since we are doing time sensitive
             // processing of events.
-            executor.execute(new Runnable() {
+            getExecutor().execute(new Runnable() {
                 public void run() {
                     DiscoveryListener discoveryListener = MulticastDiscoveryAgent.this.discoveryListener;
                     if (discoveryListener != null) {
@@ -482,11 +473,11 @@
         if (discoveryListener != null) {
             final DiscoveryEvent event = new DiscoveryEvent(data.service);
             event.setBrokerName(data.brokerName);
-
+            
             // Have the listener process the event async so that
             // he does not block this thread since we are doing time sensitive
             // processing of events.
-            executor.execute(new Runnable() {
+            getExecutor().execute(new Runnable() {
                 public void run() {
                     DiscoveryListener discoveryListener = MulticastDiscoveryAgent.this.discoveryListener;
                     if (discoveryListener != null) {
@@ -497,6 +488,20 @@
         }
     }
 
+    private Executor getExecutor() {
+        if (executor == null) {
+            final String threadName = "Notifier-" + this.toString();
+            executor = new ThreadPoolExecutor(1, 1, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
+                public Thread newThread(Runnable runable) {
+                    Thread t = new Thread(runable,  threadName);
+                    t.setDaemon(true);
+                    return t;
+                }
+            });
+        }
+        return executor;
+    }
+
     public long getBackOffMultiplier() {
         return backOffMultiplier;
     }
@@ -540,4 +545,10 @@
     public void setGroup(String group) {
         this.group = group;
     }
+    
+    @Override
+    public String toString() {
+        return  "MulticastDiscoveryAgent-"
+            + (selfService != null ? "advertise:" + selfService : "listener:" + this.discoveryListener);
+    }
 }

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/NoDuplicateOnTopicNetworkTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/NoDuplicateOnTopicNetworkTest.java?rev=718224&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/NoDuplicateOnTopicNetworkTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/NoDuplicateOnTopicNetworkTest.java Mon Nov 17 03:49:28 2008
@@ -0,0 +1,229 @@
+package org.apache.activemq.usecases;
+
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.network.NetworkConnector;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import junit.framework.TestCase;
+
+public class NoDuplicateOnTopicNetworkTest extends TestCase {
+    private static final Log LOG = LogFactory
+            .getLog(NoDuplicateOnTopicNetworkTest.class);
+
+    private static final String MULTICAST_DEFAULT = "multicast://default";
+    private static final String BROKER_1 = "tcp://localhost:61626";
+    private static final String BROKER_2 = "tcp://localhost:61636";
+    private static final String BROKER_3 = "tcp://localhost:61646";
+    private BrokerService broker1;
+    private BrokerService broker2;
+    private BrokerService broker3;
+
+    private boolean dynamicOnly = false;
+    // no duplicates in cyclic network if networkTTL <=1
+    // when > 1, subscriptions percolate around resulting in duplicates as there is no
+    // memory of the original subscription.
+    // solution for 6.0 using org.apache.activemq.command.ConsumerInfo.getNetworkConsumerIds()
+    private int ttl = 1;
+
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+
+        broker3 = createAndStartBroker("broker3", BROKER_3);
+        Thread.sleep(3000);
+        broker2 = createAndStartBroker("broker2", BROKER_2);
+        Thread.sleep(3000);
+        broker1 = createAndStartBroker("broker1", BROKER_1);
+        Thread.sleep(1000);
+    }
+
+    private BrokerService createAndStartBroker(String name, String addr)
+            throws Exception {
+        BrokerService broker = new BrokerService();
+        broker.setBrokerName(name);
+        broker.addConnector(addr).setDiscoveryUri(new URI(MULTICAST_DEFAULT));
+        broker.setUseJmx(false);
+
+        NetworkConnector networkConnector = broker
+                .addNetworkConnector(MULTICAST_DEFAULT);
+        networkConnector.setDecreaseNetworkConsumerPriority(true);
+        networkConnector.setDynamicOnly(dynamicOnly);
+        networkConnector.setNetworkTTL(ttl);
+
+        broker.start();
+       
+        return broker;
+    }
+
+    @Override
+    protected void tearDown() throws Exception {
+        broker1.stop();
+        broker2.stop();
+        broker3.stop();
+        super.tearDown();
+    }
+
+    public void testProducerConsumerTopic() throws Exception {
+        final String topicName = "broadcast";
+        Thread producerThread = new Thread(new Runnable() {
+            public void run() {
+                TopicWithDuplicateMessages producer = new TopicWithDuplicateMessages();
+                producer.setBrokerURL(BROKER_1);
+                producer.setTopicName(topicName);
+                try {
+                    producer.produce();
+                } catch (JMSException e) {
+                    fail("Unexpected " + e);
+                }
+            }
+        });
+
+        final TopicWithDuplicateMessages consumer = new TopicWithDuplicateMessages();
+        Thread consumerThread = new Thread(new Runnable() {
+            public void run() {
+                consumer.setBrokerURL(BROKER_2);
+                consumer.setTopicName(topicName);
+                try {
+                    consumer.consumer();
+                    consumer.getLatch().await(60, TimeUnit.SECONDS);
+                } catch (Exception e) {
+                    fail("Unexpected " + e);
+                }
+            }
+        });
+
+        consumerThread.start();
+        Thread.sleep(1000);
+        producerThread.start();
+        producerThread.join();
+        consumerThread.join();
+
+        Map<String, String> map = new HashMap<String, String>();
+        for (String msg : consumer.getMessageStrings()) {
+            assertTrue("is not a duplicate: " + msg, !map.containsKey(msg));
+            map.put(msg, msg);
+        }
+        assertEquals("got all required messages: " + map.size(), consumer
+                .getNumMessages(), map.size());
+    }
+
+    class TopicWithDuplicateMessages {
+        private String brokerURL;
+        private String topicName;
+        private Connection connection;
+        private Session session;
+        private Topic topic;
+        private MessageProducer producer;
+        private MessageConsumer consumer;
+
+        private List<String> receivedStrings = new ArrayList<String>();
+        private int numMessages = 10;
+        private CountDownLatch recievedLatch = new CountDownLatch(numMessages);
+
+        public CountDownLatch getLatch() {
+            return recievedLatch;
+        }
+
+        public List<String> getMessageStrings() {
+            return receivedStrings;
+        }
+
+        public String getBrokerURL() {
+            return brokerURL;
+        }
+
+        public void setBrokerURL(String brokerURL) {
+            this.brokerURL = brokerURL;
+        }
+
+        public String getTopicName() {
+            return topicName;
+        }
+
+        public void setTopicName(String topicName) {
+            this.topicName = topicName;
+        }
+
+        private void createConnection() throws JMSException {
+            ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(
+                    brokerURL);
+            connection = factory.createConnection();
+        }
+
+        private void createTopic() throws JMSException {
+            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            topic = session.createTopic(topicName);
+        }
+
+        private void createProducer() throws JMSException {
+            producer = session.createProducer(topic);
+        }
+
+        private void createConsumer() throws JMSException {
+            consumer = session.createConsumer(topic);
+            consumer.setMessageListener(new MessageListener() {
+
+                public void onMessage(Message arg0) {
+                    TextMessage msg = (TextMessage) arg0;
+                    try {
+                        LOG.debug("Received message [" + msg.getText() + "]");
+                        receivedStrings.add(msg.getText());
+                        recievedLatch.countDown();
+                    } catch (JMSException e) {
+                        fail("Unexpected :" + e);
+                    }
+                }
+
+            });
+        }
+
+        private void publish() throws JMSException {
+            for (int i = 0; i < numMessages; i++) {
+                TextMessage textMessage = session.createTextMessage();
+                String message = "message: " + i;
+                LOG.debug("Sending message[" + message + "]");
+                textMessage.setText(message);
+                producer.send(textMessage);
+            }
+        }
+
+        public void produce() throws JMSException {
+            createConnection();
+            createTopic();
+            createProducer();
+            connection.start();
+            publish();
+        }
+
+        public void consumer() throws JMSException {
+            createConnection();
+            createTopic();
+            createConsumer();
+            connection.start();
+        }
+
+        public int getNumMessages() {
+            return numMessages;
+        }
+    }
+}

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/NoDuplicateOnTopicNetworkTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/NoDuplicateOnTopicNetworkTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date