You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2010/09/07 16:40:59 UTC

svn commit: r993382 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/ main/java/org/apache/activemq/state/ main/java/org/apache/activemq/transport/failover/ test/java/org/apache/activemq/network/

Author: gtully
Date: Tue Sep  7 14:40:58 2010
New Revision: 993382

URL: http://svn.apache.org/viewvc?rev=993382&view=rev
Log:
resolve: https://issues.apache.org/activemq/browse/AMQ-2904 - fix issue with network connector failover recreating consumer in pull mode, add test. related to: https://issues.apache.org/activemq/browse/AMQ-2579

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/FailoverStaticNetworkTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java?rev=993382&r1=993381&r2=993382&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java Tue Sep  7 14:40:58 2010
@@ -1872,6 +1872,8 @@ public class ActiveMQConnection implemen
         if (LOG.isDebugEnabled()) {
             LOG.debug("transport interrupted, dispatchers: " + transportInterruptionProcessingComplete.getCount());
         }
+        signalInterruptionProcessingNeeded();
+
         for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
             ActiveMQSession s = i.next();
             s.clearMessagesInProgress();
@@ -2318,6 +2320,17 @@ public class ActiveMQConnection implemen
         }
     }
 
+    private void signalInterruptionProcessingNeeded() {
+        FailoverTransport failoverTransport = transport.narrow(FailoverTransport.class);
+        if (failoverTransport != null) {
+            failoverTransport.getStateTracker().transportInterrupted(this.getConnectionInfo().getConnectionId());
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("notified failover transport (" + failoverTransport
+                        + ") of pending interruption processing for: " + this.getConnectionInfo().getConnectionId());
+            }
+        }
+    }
+
     /*
      * specify the amount of time in milliseconds that a consumer with a transaction pending recovery
      * will wait to receive re dispatched messages.

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java?rev=993382&r1=993381&r2=993382&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java Tue Sep  7 14:40:58 2010
@@ -674,8 +674,9 @@ public class ConnectionStateTracker exte
         }
     }
 
-    public void transportInterrupted() {
-        for (ConnectionState connectionState : connectionStates.values()) {
+    public void transportInterrupted(ConnectionId connectionId) {
+        ConnectionState connectionState = connectionStates.get(connectionId);
+        if (connectionState != null) {
             connectionState.setConnectionInterruptProcessingComplete(false);
         }
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java?rev=993382&r1=993381&r2=993382&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java Tue Sep  7 14:40:58 2010
@@ -233,8 +233,6 @@ public class FailoverTransport implement
                 connectedTransportURI = null;
                 connected = false;
 
-                stateTracker.transportInterrupted();
-
                 // notify before any reconnect attempt so ack state can be
                 // whacked
                 if (transportListener != null) {

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/FailoverStaticNetworkTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/FailoverStaticNetworkTest.java?rev=993382&r1=993381&r2=993382&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/FailoverStaticNetworkTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/FailoverStaticNetworkTest.java Tue Sep  7 14:40:58 2010
@@ -51,6 +51,8 @@ public class FailoverStaticNetworkTest {
     
     protected BrokerService createBroker(String scheme, String listenPort, String[] networkToPorts) throws Exception {
         BrokerService broker = new BrokerService();
+        broker.setUseJmx(true);
+        broker.getManagementContext().setCreateConnector(false);
         broker.setSslContext(sslContext);
         broker.setDeleteAllMessagesOnStartup(true);
         broker.setBrokerName("Broker_" + listenPort);
@@ -68,14 +70,14 @@ public class FailoverStaticNetworkTest {
     }
   
     @Before
-    public void init() throws Exception {
+    public void setUp() throws Exception {
         KeyManager[] km = SslBrokerServiceTest.getKeyManager();
         TrustManager[] tm = SslBrokerServiceTest.getTrustManager();
         sslContext = new SslContext(km, tm, null);
     }
     
     @After
-    public void cleanup() throws Exception {
+    public void tearDown() throws Exception {
         brokerB.stop();
         brokerB.waitUntilStopped();
         
@@ -83,6 +85,44 @@ public class FailoverStaticNetworkTest {
         brokerA.waitUntilStopped();
     }
 
+    @Test
+    public void testSendReceiveAfterReconnect() throws Exception {
+        brokerA = createBroker("tcp", "61617", null);
+        brokerA.start();
+        brokerB = createBroker("tcp", "62617", new String[]{"61617"});
+        brokerB.start();
+        doTestNetworkSendReceive();
+
+        LOG.info("stopping brokerA");
+        brokerA.stop();
+        brokerA.waitUntilStopped();
+
+        LOG.info("restarting brokerA");
+        brokerA = createBroker("tcp", "61617", null);
+        brokerA.start();
+
+        doTestNetworkSendReceive();
+    }
+
+    @Test
+    public void testSendReceiveFailover() throws Exception {
+        brokerA = createBroker("tcp", "61617", null);
+        brokerA.start();
+        brokerB = createBroker("tcp", "62617", new String[]{"61617", "63617"});
+        brokerB.start();
+        doTestNetworkSendReceive();
+
+        LOG.info("stopping brokerA");
+        brokerA.stop();
+        brokerA.waitUntilStopped();
+
+        LOG.info("restarting brokerA");
+        brokerA = createBroker("tcp", "63617", null);
+        brokerA.start();
+
+        doTestNetworkSendReceive();
+    }
+
     /**
      * networked broker started after target so first connect attempt succeeds
      * start order is important
@@ -95,7 +135,7 @@ public class FailoverStaticNetworkTest {
         brokerB = createBroker("tcp", "62617", new String[]{"61617","1111"});
         brokerB.start();
   
-        testNetworkSendReceive();
+        doTestNetworkSendReceive();
     }
 
     @Test
@@ -106,28 +146,35 @@ public class FailoverStaticNetworkTest {
         brokerB = createBroker("ssl", "62617", new String[]{"61617", "1111"});
         brokerB.start();
   
-        testNetworkSendReceive();
+        doTestNetworkSendReceive();
     }
 
-    private void testNetworkSendReceive() throws Exception, JMSException {
-        LOG.info("Creating Consumer on the networked broker ...");
+    private void doTestNetworkSendReceive() throws Exception, JMSException {
+        LOG.info("Creating Consumer on the networked brokerA ...");
         
         SslContext.setCurrentSslContext(sslContext);
-        // Create a consumer on brokerB
+        // Create a consumer on brokerA
         ConnectionFactory consFactory = createConnectionFactory(brokerA);
         Connection consConn = consFactory.createConnection();
         consConn.start();
         Session consSession = consConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
         ActiveMQDestination destination = (ActiveMQDestination) consSession.createQueue(DESTINATION_NAME);
         final MessageConsumer consumer = consSession.createConsumer(destination);
-        
+
+        LOG.info("publishing to brokerB");
+
         sendMessageTo(destination, brokerB);
         
-        assertTrue("consumer got message", Wait.waitFor(new Wait.Condition() {
+        boolean gotMessage = Wait.waitFor(new Wait.Condition() {
             public boolean isSatisified() throws Exception {
                 return consumer.receive(1000) != null;
             }      
-        }));
+        });
+        try {
+            consConn.close();
+        } catch (JMSException ignored) {
+        }
+        assertTrue("consumer on A got message", gotMessage);
     }
 
     private void sendMessageTo(ActiveMQDestination destination, BrokerService brokerService) throws Exception {