You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2013/01/31 15:41:35 UTC

svn commit: r1440988 - in /activemq/trunk: activemq-broker/src/main/java/org/apache/activemq/broker/region/ activemq-unit-tests/src/test/java/org/apache/activemq/network/

Author: gtully
Date: Thu Jan 31 14:41:35 2013
New Revision: 1440988

URL: http://svn.apache.org/viewvc?rev=1440988&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-4285 - tidied up test to be tolerant of pitfalls of request/reply with temps over a network

Modified:
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/TempQueue.java
    activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/network/NetworkFailoverTest.java

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java?rev=1440988&r1=1440987&r2=1440988&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java Thu Jan 31 14:41:35 2013
@@ -589,7 +589,7 @@ public abstract class AbstractRegion imp
             try {
                 lookup(consumerExchange.getConnectionContext(), control.getDestination(),false).wakeup();
             } catch (Exception e) {
-                LOG.warn("failed to deliver consumerControl to destination: " + control.getDestination(), e);
+                LOG.warn("failed to deliver post consumerControl dispatch-wakeup, to destination: " + control.getDestination(), e);
             }
         }
     }

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/TempQueue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/TempQueue.java?rev=1440988&r1=1440987&r2=1440988&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/TempQueue.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/TempQueue.java Thu Jan 31 14:41:35 2013
@@ -85,6 +85,12 @@ public class TempQueue extends Queue{
     
     @Override
     public void dispose(ConnectionContext context) throws IOException {
+        if (this.destinationStatistics.getMessages().getCount() > 0) {
+            LOG.info(getActiveMQDestination().getQualifiedName()
+                            + " on dispose, purge of "
+                            + this.destinationStatistics.getMessages().getCount() + " pending messages: " + messages);
+            // we may want to capture these message ids in an advisory
+        }
         try {
            purge();
         } catch (Exception e) {

Modified: activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/network/NetworkFailoverTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/network/NetworkFailoverTest.java?rev=1440988&r1=1440987&r2=1440988&view=diff
==============================================================================
--- activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/network/NetworkFailoverTest.java (original)
+++ activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/network/NetworkFailoverTest.java Thu Jan 31 14:41:35 2013
@@ -17,10 +17,12 @@
 package org.apache.activemq.network;
 
 import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.jms.Connection;
 import javax.jms.DeliveryMode;
 import javax.jms.Destination;
+import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageListener;
@@ -33,7 +35,10 @@ import junit.framework.TestCase;
 
 import org.apache.activemq.ActiveMQConnection;
 import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.DestinationDoesNotExistException;
 import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.AbstractDeadLetterStrategy;
+import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.transport.TransportFilter;
 import org.apache.activemq.transport.failover.FailoverTransport;
@@ -56,8 +61,10 @@ public class NetworkFailoverTest extends
     protected BrokerService remoteBroker;
     protected Session localSession;
     protected Session remoteSession;
-    protected ActiveMQQueue included = new ActiveMQQueue("include.test.foo");
-    protected String consumerName = "durableSubs";
+    protected ActiveMQQueue included=new ActiveMQQueue("include.test.foo");
+    private AtomicInteger replyToNonExistDest = new AtomicInteger(0);
+    private AtomicInteger roundTripComplete = new AtomicInteger(0);
+    private AtomicInteger remoteDLQCount = new AtomicInteger(0);
 
     public void testRequestReply() throws Exception {
         final MessageProducer remoteProducer = remoteSession.createProducer(null);
@@ -65,15 +72,25 @@ public class NetworkFailoverTest extends
         remoteConsumer.setMessageListener(new MessageListener() {
             @Override
             public void onMessage(Message msg) {
+                final TextMessage textMsg = (TextMessage)msg;
                 try {
-                    TextMessage textMsg = (TextMessage) msg;
-                    String payload = "REPLY: " + textMsg.getText();
+                    String payload = "REPLY: " + textMsg.getText() + ", " + textMsg.getJMSMessageID();
                     Destination replyTo;
                     replyTo = msg.getJMSReplyTo();
                     textMsg.clearBody();
                     textMsg.setText(payload);
                     LOG.info("*** Sending response: {}", textMsg.getText());
                     remoteProducer.send(replyTo, textMsg);
+                    LOG.info("replied with: " + textMsg.getJMSMessageID());
+
+                } catch (DestinationDoesNotExistException expected) {
+                    // been removed but not yet recreated
+                    replyToNonExistDest.incrementAndGet();
+                    try {
+                        LOG.info("NED: " + textMsg.getJMSMessageID());
+                    } catch (JMSException e) {
+                        e.printStackTrace();
+                    };
                 } catch (Exception e) {
                     LOG.warn("*** Responder listener caught exception: ", e);
                     e.printStackTrace();
@@ -86,20 +103,52 @@ public class NetworkFailoverTest extends
         requestProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
         MessageConsumer requestConsumer = localSession.createConsumer(tempQueue);
 
+        // track remote dlq for forward failures
+        MessageConsumer dlqconsumer = remoteSession.createConsumer(new ActiveMQQueue(SharedDeadLetterStrategy.DEFAULT_DEAD_LETTER_QUEUE_NAME));
+        dlqconsumer.setMessageListener(new MessageListener() {
+            @Override
+            public void onMessage(Message message) {
+                try {
+                    LOG.info("dlq " + message.getJMSMessageID());
+                } catch (JMSException e) {
+                    e.printStackTrace();
+                }
+                remoteDLQCount.incrementAndGet();
+            }
+        });
+       
         // allow for consumer infos to perculate arround
         Thread.sleep(2000);
-        for (int i = 0; i < MESSAGE_COUNT; i++) {
-            String payload = "test msg " + i;
-            TextMessage msg = localSession.createTextMessage(payload);
-            msg.setJMSReplyTo(tempQueue);
-            requestProducer.send(msg);
-            LOG.info("*** Failing over for iteration: #{}", i);
-            ((FailoverTransport) ((TransportFilter) ((TransportFilter) ((ActiveMQConnection) localConnection).getTransport()).getNext()).getNext())
-                .handleTransportFailure(new IOException("Forcing failover from test"));
-            TextMessage result = (TextMessage) requestConsumer.receive(10000);
-            assertNotNull(result);
-            LOG.info("*** Iteration #{} got response: {}", i, result.getText());
+        long done = System.currentTimeMillis() + (MESSAGE_COUNT * 6000);
+        int i = 0;
+        while (MESSAGE_COUNT > roundTripComplete.get() + remoteDLQCount.get() + replyToNonExistDest.get()
+                && done > System.currentTimeMillis()) {
+            if  ( i < MESSAGE_COUNT) {
+                String payload = "test msg " + i;
+                i++;
+                TextMessage msg = localSession.createTextMessage(payload);
+                msg.setJMSReplyTo(tempQueue);
+                requestProducer.send(msg);
+                LOG.info("Sent: " + msg.getJMSMessageID() +", Failing over");
+                ((FailoverTransport) ((TransportFilter) ((TransportFilter)
+                        ((ActiveMQConnection) localConnection)
+                                .getTransport()).getNext()).getNext())
+                        .handleTransportFailure(new IOException("Forcing failover from test"));
+            }
+            TextMessage result = (TextMessage)requestConsumer.receive(5000);
+            if (result != null) {
+                LOG.info("Got reply: " + result.getJMSMessageID() + ", " + result.getText());
+                roundTripComplete.incrementAndGet();
+            }
         }
+
+        LOG.info("complete: " + roundTripComplete.get()
+                        + ", remoteDLQCount: " + remoteDLQCount.get()
+                        + ", replyToNonExistDest: " + replyToNonExistDest.get());
+        assertEquals("complete:" + roundTripComplete.get()
+                + ", remoteDLQCount: " + remoteDLQCount.get()
+                + ", replyToNonExistDest: " + replyToNonExistDest.get(),
+                MESSAGE_COUNT, roundTripComplete.get() + remoteDLQCount.get() + replyToNonExistDest.get() );
     }
 
     @Override
@@ -132,21 +181,21 @@ public class NetworkFailoverTest extends
 
         remoteBroker = createRemoteBroker();
         remoteBroker.setDeleteAllMessagesOnStartup(deleteAllMessages);
+        remoteBroker.setCacheTempDestinations(true);
         remoteBroker.start();
 
         localBroker = createLocalBroker();
         localBroker.setDeleteAllMessagesOnStartup(deleteAllMessages);
+        localBroker.setCacheTempDestinations(true);
         localBroker.start();
 
         String localURI = "tcp://localhost:61616";
         String remoteURI = "tcp://localhost:61617";
-        ActiveMQConnectionFactory fac = new ActiveMQConnectionFactory("failover:(" + localURI + "," + remoteURI
-            + ")?randomize=false&backup=true&trackMessages=true");
-        // ActiveMQConnectionFactory fac = new ActiveMQConnectionFactory(localURI);
+        ActiveMQConnectionFactory fac = new ActiveMQConnectionFactory("failover:("+localURI+","+remoteURI+")?randomize=false&backup=false&trackMessages=true");
         localConnection = fac.createConnection();
         localConnection.setClientID("local");
         localConnection.start();
-        fac = new ActiveMQConnectionFactory("failover:(" + remoteURI + "," + localURI + ")?randomize=false&backup=true&trackMessages=true");
+        fac = new ActiveMQConnectionFactory("failover:("+remoteURI + ","+localURI+")?randomize=false&backup=false&trackMessages=true");
         fac.setWatchTopicAdvisories(false);
         remoteConnection = fac.createConnection();
         remoteConnection.setClientID("remote");