You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2013/01/31 15:41:35 UTC
svn commit: r1440988 - in /activemq/trunk:
activemq-broker/src/main/java/org/apache/activemq/broker/region/
activemq-unit-tests/src/test/java/org/apache/activemq/network/
Author: gtully
Date: Thu Jan 31 14:41:35 2013
New Revision: 1440988
URL: http://svn.apache.org/viewvc?rev=1440988&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-4285 - tidied up test to be tolerant of pitfalls of request/reply with temps over a network
Modified:
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/TempQueue.java
activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/network/NetworkFailoverTest.java
Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java?rev=1440988&r1=1440987&r2=1440988&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java Thu Jan 31 14:41:35 2013
@@ -589,7 +589,7 @@ public abstract class AbstractRegion imp
try {
lookup(consumerExchange.getConnectionContext(), control.getDestination(),false).wakeup();
} catch (Exception e) {
- LOG.warn("failed to deliver consumerControl to destination: " + control.getDestination(), e);
+ LOG.warn("failed to deliver post consumerControl dispatch-wakeup, to destination: " + control.getDestination(), e);
}
}
}
Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/TempQueue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/TempQueue.java?rev=1440988&r1=1440987&r2=1440988&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/TempQueue.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/TempQueue.java Thu Jan 31 14:41:35 2013
@@ -85,6 +85,12 @@ public class TempQueue extends Queue{
@Override
public void dispose(ConnectionContext context) throws IOException {
+ if (this.destinationStatistics.getMessages().getCount() > 0) {
+ LOG.info(getActiveMQDestination().getQualifiedName()
+ + " on dispose, purge of "
+ + this.destinationStatistics.getMessages().getCount() + " pending messages: " + messages);
+ // we may want to capture these message ids in an advisory
+ }
try {
purge();
} catch (Exception e) {
Modified: activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/network/NetworkFailoverTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/network/NetworkFailoverTest.java?rev=1440988&r1=1440987&r2=1440988&view=diff
==============================================================================
--- activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/network/NetworkFailoverTest.java (original)
+++ activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/network/NetworkFailoverTest.java Thu Jan 31 14:41:35 2013
@@ -17,10 +17,12 @@
package org.apache.activemq.network;
import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
+import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
@@ -33,7 +35,10 @@ import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.DestinationDoesNotExistException;
import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.AbstractDeadLetterStrategy;
+import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.transport.TransportFilter;
import org.apache.activemq.transport.failover.FailoverTransport;
@@ -56,8 +61,10 @@ public class NetworkFailoverTest extends
protected BrokerService remoteBroker;
protected Session localSession;
protected Session remoteSession;
- protected ActiveMQQueue included = new ActiveMQQueue("include.test.foo");
- protected String consumerName = "durableSubs";
+ protected ActiveMQQueue included=new ActiveMQQueue("include.test.foo");
+ private AtomicInteger replyToNonExistDest = new AtomicInteger(0);
+ private AtomicInteger roundTripComplete = new AtomicInteger(0);
+ private AtomicInteger remoteDLQCount = new AtomicInteger(0);
public void testRequestReply() throws Exception {
final MessageProducer remoteProducer = remoteSession.createProducer(null);
@@ -65,15 +72,25 @@ public class NetworkFailoverTest extends
remoteConsumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message msg) {
+ final TextMessage textMsg = (TextMessage)msg;
try {
- TextMessage textMsg = (TextMessage) msg;
- String payload = "REPLY: " + textMsg.getText();
+ String payload = "REPLY: " + textMsg.getText() + ", " + textMsg.getJMSMessageID();
Destination replyTo;
replyTo = msg.getJMSReplyTo();
textMsg.clearBody();
textMsg.setText(payload);
LOG.info("*** Sending response: {}", textMsg.getText());
remoteProducer.send(replyTo, textMsg);
+ LOG.info("replied with: " + textMsg.getJMSMessageID());
+
+ } catch (DestinationDoesNotExistException expected) {
+ // been removed but not yet recreated
+ replyToNonExistDest.incrementAndGet();
+ try {
+ LOG.info("NED: " + textMsg.getJMSMessageID());
+ } catch (JMSException e) {
+ e.printStackTrace();
+ };
} catch (Exception e) {
LOG.warn("*** Responder listener caught exception: ", e);
e.printStackTrace();
@@ -86,20 +103,52 @@ public class NetworkFailoverTest extends
requestProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
MessageConsumer requestConsumer = localSession.createConsumer(tempQueue);
+ // track remote dlq for forward failures
+ MessageConsumer dlqconsumer = remoteSession.createConsumer(new ActiveMQQueue(SharedDeadLetterStrategy.DEFAULT_DEAD_LETTER_QUEUE_NAME));
+ dlqconsumer.setMessageListener(new MessageListener() {
+ @Override
+ public void onMessage(Message message) {
+ try {
+ LOG.info("dlq " + message.getJMSMessageID());
+ } catch (JMSException e) {
+ e.printStackTrace();
+ }
+ remoteDLQCount.incrementAndGet();
+ }
+ });
+
// allow for consumer infos to perculate arround
Thread.sleep(2000);
- for (int i = 0; i < MESSAGE_COUNT; i++) {
- String payload = "test msg " + i;
- TextMessage msg = localSession.createTextMessage(payload);
- msg.setJMSReplyTo(tempQueue);
- requestProducer.send(msg);
- LOG.info("*** Failing over for iteration: #{}", i);
- ((FailoverTransport) ((TransportFilter) ((TransportFilter) ((ActiveMQConnection) localConnection).getTransport()).getNext()).getNext())
- .handleTransportFailure(new IOException("Forcing failover from test"));
- TextMessage result = (TextMessage) requestConsumer.receive(10000);
- assertNotNull(result);
- LOG.info("*** Iteration #{} got response: {}", i, result.getText());
+ long done = System.currentTimeMillis() + (MESSAGE_COUNT * 6000);
+ int i = 0;
+ while (MESSAGE_COUNT > roundTripComplete.get() + remoteDLQCount.get() + replyToNonExistDest.get()
+ && done > System.currentTimeMillis()) {
+ if ( i < MESSAGE_COUNT) {
+ String payload = "test msg " + i;
+ i++;
+ TextMessage msg = localSession.createTextMessage(payload);
+ msg.setJMSReplyTo(tempQueue);
+ requestProducer.send(msg);
+ LOG.info("Sent: " + msg.getJMSMessageID() +", Failing over");
+ ((FailoverTransport) ((TransportFilter) ((TransportFilter)
+ ((ActiveMQConnection) localConnection)
+ .getTransport()).getNext()).getNext())
+ .handleTransportFailure(new IOException("Forcing failover from test"));
+ }
+ TextMessage result = (TextMessage)requestConsumer.receive(5000);
+ if (result != null) {
+ LOG.info("Got reply: " + result.getJMSMessageID() + ", " + result.getText());
+ roundTripComplete.incrementAndGet();
+ }
}
+
+ LOG.info("complete: " + roundTripComplete.get()
+ + ", remoteDLQCount: " + remoteDLQCount.get()
+ + ", replyToNonExistDest: " + replyToNonExistDest.get());
+ assertEquals("complete:" + roundTripComplete.get()
+ + ", remoteDLQCount: " + remoteDLQCount.get()
+ + ", replyToNonExistDest: " + replyToNonExistDest.get(),
+ MESSAGE_COUNT, roundTripComplete.get() + remoteDLQCount.get() + replyToNonExistDest.get() );
}
@Override
@@ -132,21 +181,21 @@ public class NetworkFailoverTest extends
remoteBroker = createRemoteBroker();
remoteBroker.setDeleteAllMessagesOnStartup(deleteAllMessages);
+ remoteBroker.setCacheTempDestinations(true);
remoteBroker.start();
localBroker = createLocalBroker();
localBroker.setDeleteAllMessagesOnStartup(deleteAllMessages);
+ localBroker.setCacheTempDestinations(true);
localBroker.start();
String localURI = "tcp://localhost:61616";
String remoteURI = "tcp://localhost:61617";
- ActiveMQConnectionFactory fac = new ActiveMQConnectionFactory("failover:(" + localURI + "," + remoteURI
- + ")?randomize=false&backup=true&trackMessages=true");
- // ActiveMQConnectionFactory fac = new ActiveMQConnectionFactory(localURI);
+ ActiveMQConnectionFactory fac = new ActiveMQConnectionFactory("failover:("+localURI+","+remoteURI+")?randomize=false&backup=false&trackMessages=true");
localConnection = fac.createConnection();
localConnection.setClientID("local");
localConnection.start();
- fac = new ActiveMQConnectionFactory("failover:(" + remoteURI + "," + localURI + ")?randomize=false&backup=true&trackMessages=true");
+ fac = new ActiveMQConnectionFactory("failover:("+remoteURI + ","+localURI+")?randomize=false&backup=false&trackMessages=true");
fac.setWatchTopicAdvisories(false);
remoteConnection = fac.createConnection();
remoteConnection.setClientID("remote");