You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2008/11/17 12:49:28 UTC
svn commit: r718224 - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/ main/java/org/apache/activemq/broker/region/
main/java/org/apache/activemq/network/
main/java/org/apache/activemq/transport/discovery/multicast/
test/java/org/ap...
Author: gtully
Date: Mon Nov 17 03:49:28 2008
New Revision: 718224
URL: http://svn.apache.org/viewvc?rev=718224&view=rev
Log:
test case and logging improvements relating to cyclic/multicast discovery network of size 3 and topic message duplication with nteworkTTL > 1
Added:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/NoDuplicateOnTopicNetworkTest.java (with props)
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/multicast/MulticastDiscoveryAgent.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java?rev=718224&r1=718223&r2=718224&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java Mon Nov 17 03:49:28 2008
@@ -1739,7 +1739,7 @@
/**
* Handles async client internal exceptions.
* A client internal exception is usually one that has been thrown
- * by a container runtie component during asynchronous processing of a
+ * by a container runtime component during asynchronous processing of a
* message that does not affect the connection itself.
* This method notifies the <code>ClientInternalExceptionListener</code> by invoking
* its <code>onException</code> method, if one has been registered with this connection.
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=718224&r1=718223&r2=718224&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Mon Nov 17 03:49:28 2008
@@ -199,7 +199,7 @@
inAckRange = true;
}
if (inAckRange) {
- // Don't remove the nodes until we are committed.
+ // Don't remove the nodes until we are committed.
if (!context.isInTransaction()) {
dequeueCounter++;
if (!this.getConsumerInfo().isBrowser()) {
@@ -282,9 +282,6 @@
}else if (ack.isDeliveredAck()) {
// Message was delivered but not acknowledged: update pre-fetch
// counters.
- // Acknowledge all dispatched messages up till the message id of
- // the
- // acknowledgment.
int index = 0;
for (Iterator<MessageReference> iter = dispatched.iterator(); iter.hasNext(); index++) {
final MessageReference node = iter.next();
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java?rev=718224&r1=718223&r2=718224&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java Mon Nov 17 03:49:28 2008
@@ -500,7 +500,7 @@
BrokerId[] path = info.getBrokerPath();
if (path != null && path.length >= networkTTL) {
if (LOG.isDebugEnabled()) {
- LOG.debug(configuration.getBrokerName() + " Ignoring Subscription " + info + " restricted to " + networkTTL + " network hops only");
+ LOG.debug(configuration.getBrokerName() + " Ignoring sub from " + remoteBrokerName + ", restricted to " + networkTTL + " network hops only : " + info);
}
return;
}
@@ -508,24 +508,24 @@
// Ignore this consumer as it's a consumer we locally sent to
// the broker.
if (LOG.isDebugEnabled()) {
- LOG.debug(configuration.getBrokerName() + " Ignoring sub " + info + " already routed through this broker once");
+ LOG.debug(configuration.getBrokerName() + " Ignoring sub from " + remoteBrokerName + ", already routed through this broker once : " + info);
}
return;
}
if (!isPermissableDestination(info.getDestination())) {
- // ignore if not in the permited or in the excluded list
+ // ignore if not in the permitted or in the excluded list
if (LOG.isDebugEnabled()) {
- LOG.debug(configuration.getBrokerName() + " Ignoring sub " + info + " destination " + info.getDestination() + " is not permiited");
+ LOG.debug(configuration.getBrokerName() + " Ignoring sub from " + remoteBrokerName + ", destination " + info.getDestination() + " is not permiited :" + info);
}
return;
}
if (addConsumerInfo(info)) {
if (LOG.isDebugEnabled()) {
- LOG.debug(configuration.getBrokerName() + " Forwarding sub on " + localBroker + " from " + remoteBrokerName + " : " + info);
+ LOG.debug(configuration.getBrokerName() + " Forwarding sub on " + localBroker + " from " + remoteBrokerName + " : " + info);
}
} else {
if (LOG.isDebugEnabled()) {
- LOG.debug(configuration.getBrokerName() + " Ignoring sub " + info + " already subscribed to matching destination");
+ LOG.debug(configuration.getBrokerName() + " Ignoring sub from " + remoteBrokerName + " as already subscribed to matching destination : " + info);
}
}
} else if (data.getClass() == DestinationInfo.class) {
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java?rev=718224&r1=718223&r2=718224&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java Mon Nov 17 03:49:28 2008
@@ -79,10 +79,11 @@
return;
}
if ( localURI.equals(uri) || (connectionFilter != null && !connectionFilter.connectTo(uri))) {
+ LOG.debug("not connecting loopback: " + uri);
return;
}
URI connectUri = uri;
- LOG.info("Establishing network connection between from " + localURIName + " to " + connectUri);
+ LOG.info("Establishing network connection from " + localURIName + " to " + connectUri);
Transport remoteTransport;
Transport localTransport;
@@ -213,10 +214,13 @@
String name = super.getName();
if (name == null) {
name = discoveryAgent.toString();
- ;
super.setName(name);
}
return name;
}
+ @Override
+ public String toString() {
+ return "DiscoveryNetworkConnector:" + getName() + ":" + getBrokerService();
+ }
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/multicast/MulticastDiscoveryAgent.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/multicast/MulticastDiscoveryAgent.java?rev=718224&r1=718223&r2=718224&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/multicast/MulticastDiscoveryAgent.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/multicast/MulticastDiscoveryAgent.java Mon Nov 17 03:49:28 2008
@@ -176,14 +176,7 @@
private long lastAdvertizeTime;
private AtomicBoolean started = new AtomicBoolean(false);
private boolean reportAdvertizeFailed = true;
-
- private final Executor executor = new ThreadPoolExecutor(1, 1, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
- public Thread newThread(Runnable runable) {
- Thread t = new Thread(runable, "Multicast Discovery Agent Notifier");
- t.setDaemon(true);
- return t;
- }
- });
+ private Executor executor = null;
/**
* Set the discovery listener
@@ -304,7 +297,7 @@
mcast.joinGroup(inetAddress);
mcast.setSoTimeout((int)keepAliveInterval);
runner = new Thread(this);
- runner.setName("MulticastDiscovery: " + selfService);
+ runner.setName(this.toString() + ":" + runner.getName());
runner.setDaemon(true);
runner.start();
doAdvertizeSelf();
@@ -409,11 +402,9 @@
RemoteBrokerData data = brokersByService.get(service);
if (data == null) {
data = new RemoteBrokerData(brokerName, service);
- brokersByService.put(service, data);
-
+ brokersByService.put(service, data);
fireServiceAddEvent(data);
doAdvertizeSelf();
-
} else {
data.updateHeartBeat();
if (data.doRecovery()) {
@@ -433,7 +424,7 @@
}
private void doExpireOldServices() {
- long expireTime = System.currentTimeMillis() - (keepAliveInterval * HEARTBEAT_MISS_BEFORE_DEATH);
+ long expireTime = System.currentTimeMillis() - (keepAliveInterval * HEARTBEAT_MISS_BEFORE_DEATH);
for (Iterator<RemoteBrokerData> i = brokersByService.values().iterator(); i.hasNext();) {
RemoteBrokerData data = i.next();
if (data.getLastHeartBeat() < expireTime) {
@@ -467,7 +458,7 @@
// Have the listener process the event async so that
// he does not block this thread since we are doing time sensitive
// processing of events.
- executor.execute(new Runnable() {
+ getExecutor().execute(new Runnable() {
public void run() {
DiscoveryListener discoveryListener = MulticastDiscoveryAgent.this.discoveryListener;
if (discoveryListener != null) {
@@ -482,11 +473,11 @@
if (discoveryListener != null) {
final DiscoveryEvent event = new DiscoveryEvent(data.service);
event.setBrokerName(data.brokerName);
-
+
// Have the listener process the event async so that
// he does not block this thread since we are doing time sensitive
// processing of events.
- executor.execute(new Runnable() {
+ getExecutor().execute(new Runnable() {
public void run() {
DiscoveryListener discoveryListener = MulticastDiscoveryAgent.this.discoveryListener;
if (discoveryListener != null) {
@@ -497,6 +488,20 @@
}
}
+ private Executor getExecutor() {
+ if (executor == null) {
+ final String threadName = "Notifier-" + this.toString();
+ executor = new ThreadPoolExecutor(1, 1, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
+ public Thread newThread(Runnable runable) {
+ Thread t = new Thread(runable, threadName);
+ t.setDaemon(true);
+ return t;
+ }
+ });
+ }
+ return executor;
+ }
+
public long getBackOffMultiplier() {
return backOffMultiplier;
}
@@ -540,4 +545,10 @@
public void setGroup(String group) {
this.group = group;
}
+
+ @Override
+ public String toString() {
+ return "MulticastDiscoveryAgent-"
+ + (selfService != null ? "advertise:" + selfService : "listener:" + this.discoveryListener);
+ }
}
Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/NoDuplicateOnTopicNetworkTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/NoDuplicateOnTopicNetworkTest.java?rev=718224&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/NoDuplicateOnTopicNetworkTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/NoDuplicateOnTopicNetworkTest.java Mon Nov 17 03:49:28 2008
@@ -0,0 +1,229 @@
+package org.apache.activemq.usecases;
+
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.network.NetworkConnector;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import junit.framework.TestCase;
+
+public class NoDuplicateOnTopicNetworkTest extends TestCase {
+ private static final Log LOG = LogFactory
+ .getLog(NoDuplicateOnTopicNetworkTest.class);
+
+ private static final String MULTICAST_DEFAULT = "multicast://default";
+ private static final String BROKER_1 = "tcp://localhost:61626";
+ private static final String BROKER_2 = "tcp://localhost:61636";
+ private static final String BROKER_3 = "tcp://localhost:61646";
+ private BrokerService broker1;
+ private BrokerService broker2;
+ private BrokerService broker3;
+
+ private boolean dynamicOnly = false;
+ // no duplicates in cyclic network if networkTTL <=1
+ // when > 1, subscriptions perculate around resulting in duplicates as there is no
+ // memory of the original subscription.
+ // solution for 6.0 using org.apache.activemq.command.ConsumerInfo.getNetworkConsumerIds()
+ private int ttl = 1;
+
+ @Override
+ protected void setUp() throws Exception {
+ super.setUp();
+
+ broker3 = createAndStartBroker("broker3", BROKER_3);
+ Thread.sleep(3000);
+ broker2 = createAndStartBroker("broker2", BROKER_2);
+ Thread.sleep(3000);
+ broker1 = createAndStartBroker("broker1", BROKER_1);
+ Thread.sleep(1000);
+ }
+
+ private BrokerService createAndStartBroker(String name, String addr)
+ throws Exception {
+ BrokerService broker = new BrokerService();
+ broker.setBrokerName(name);
+ broker.addConnector(addr).setDiscoveryUri(new URI(MULTICAST_DEFAULT));
+ broker.setUseJmx(false);
+
+ NetworkConnector networkConnector = broker
+ .addNetworkConnector(MULTICAST_DEFAULT);
+ networkConnector.setDecreaseNetworkConsumerPriority(true);
+ networkConnector.setDynamicOnly(dynamicOnly);
+ networkConnector.setNetworkTTL(ttl);
+
+ broker.start();
+
+ return broker;
+ }
+
+ @Override
+ protected void tearDown() throws Exception {
+ broker1.stop();
+ broker2.stop();
+ broker3.stop();
+ super.tearDown();
+ }
+
+ public void testProducerConsumerTopic() throws Exception {
+ final String topicName = "broadcast";
+ Thread producerThread = new Thread(new Runnable() {
+ public void run() {
+ TopicWithDuplicateMessages producer = new TopicWithDuplicateMessages();
+ producer.setBrokerURL(BROKER_1);
+ producer.setTopicName(topicName);
+ try {
+ producer.produce();
+ } catch (JMSException e) {
+ fail("Unexpected " + e);
+ }
+ }
+ });
+
+ final TopicWithDuplicateMessages consumer = new TopicWithDuplicateMessages();
+ Thread consumerThread = new Thread(new Runnable() {
+ public void run() {
+ consumer.setBrokerURL(BROKER_2);
+ consumer.setTopicName(topicName);
+ try {
+ consumer.consumer();
+ consumer.getLatch().await(60, TimeUnit.SECONDS);
+ } catch (Exception e) {
+ fail("Unexpected " + e);
+ }
+ }
+ });
+
+ consumerThread.start();
+ Thread.sleep(1000);
+ producerThread.start();
+ producerThread.join();
+ consumerThread.join();
+
+ Map<String, String> map = new HashMap<String, String>();
+ for (String msg : consumer.getMessageStrings()) {
+ assertTrue("is not a duplicate: " + msg, !map.containsKey(msg));
+ map.put(msg, msg);
+ }
+ assertEquals("got all required messages: " + map.size(), consumer
+ .getNumMessages(), map.size());
+ }
+
+ class TopicWithDuplicateMessages {
+ private String brokerURL;
+ private String topicName;
+ private Connection connection;
+ private Session session;
+ private Topic topic;
+ private MessageProducer producer;
+ private MessageConsumer consumer;
+
+ private List<String> receivedStrings = new ArrayList<String>();
+ private int numMessages = 10;
+ private CountDownLatch recievedLatch = new CountDownLatch(numMessages);
+
+ public CountDownLatch getLatch() {
+ return recievedLatch;
+ }
+
+ public List<String> getMessageStrings() {
+ return receivedStrings;
+ }
+
+ public String getBrokerURL() {
+ return brokerURL;
+ }
+
+ public void setBrokerURL(String brokerURL) {
+ this.brokerURL = brokerURL;
+ }
+
+ public String getTopicName() {
+ return topicName;
+ }
+
+ public void setTopicName(String topicName) {
+ this.topicName = topicName;
+ }
+
+ private void createConnection() throws JMSException {
+ ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(
+ brokerURL);
+ connection = factory.createConnection();
+ }
+
+ private void createTopic() throws JMSException {
+ session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ topic = session.createTopic(topicName);
+ }
+
+ private void createProducer() throws JMSException {
+ producer = session.createProducer(topic);
+ }
+
+ private void createConsumer() throws JMSException {
+ consumer = session.createConsumer(topic);
+ consumer.setMessageListener(new MessageListener() {
+
+ public void onMessage(Message arg0) {
+ TextMessage msg = (TextMessage) arg0;
+ try {
+ LOG.debug("Received message [" + msg.getText() + "]");
+ receivedStrings.add(msg.getText());
+ recievedLatch.countDown();
+ } catch (JMSException e) {
+ fail("Unexpected :" + e);
+ }
+ }
+
+ });
+ }
+
+ private void publish() throws JMSException {
+ for (int i = 0; i < numMessages; i++) {
+ TextMessage textMessage = session.createTextMessage();
+ String message = "message: " + i;
+ LOG.debug("Sending message[" + message + "]");
+ textMessage.setText(message);
+ producer.send(textMessage);
+ }
+ }
+
+ public void produce() throws JMSException {
+ createConnection();
+ createTopic();
+ createProducer();
+ connection.start();
+ publish();
+ }
+
+ public void consumer() throws JMSException {
+ createConnection();
+ createTopic();
+ createConsumer();
+ connection.start();
+ }
+
+ public int getNumMessages() {
+ return numMessages;
+ }
+ }
+}
Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/NoDuplicateOnTopicNetworkTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/NoDuplicateOnTopicNetworkTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date