You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2010/09/07 16:40:59 UTC
svn commit: r993382 - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/ main/java/org/apache/activemq/state/
main/java/org/apache/activemq/transport/failover/
test/java/org/apache/activemq/network/
Author: gtully
Date: Tue Sep 7 14:40:58 2010
New Revision: 993382
URL: http://svn.apache.org/viewvc?rev=993382&view=rev
Log:
resolve: https://issues.apache.org/activemq/browse/AMQ-2904 - fix issue where network connector failover recreated the consumer in pull mode; add test. Related to: https://issues.apache.org/activemq/browse/AMQ-2579
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/FailoverStaticNetworkTest.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java?rev=993382&r1=993381&r2=993382&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java Tue Sep 7 14:40:58 2010
@@ -1872,6 +1872,8 @@ public class ActiveMQConnection implemen
if (LOG.isDebugEnabled()) {
LOG.debug("transport interrupted, dispatchers: " + transportInterruptionProcessingComplete.getCount());
}
+ signalInterruptionProcessingNeeded();
+
for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
ActiveMQSession s = i.next();
s.clearMessagesInProgress();
@@ -2318,6 +2320,17 @@ public class ActiveMQConnection implemen
}
}
+ private void signalInterruptionProcessingNeeded() {
+ FailoverTransport failoverTransport = transport.narrow(FailoverTransport.class);
+ if (failoverTransport != null) {
+ failoverTransport.getStateTracker().transportInterrupted(this.getConnectionInfo().getConnectionId());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("notified failover transport (" + failoverTransport
+ + ") of pending interruption processing for: " + this.getConnectionInfo().getConnectionId());
+ }
+ }
+ }
+
/*
* specify the amount of time in milliseconds that a consumer with a transaction pending recovery
* will wait to receive re dispatched messages.
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java?rev=993382&r1=993381&r2=993382&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java Tue Sep 7 14:40:58 2010
@@ -674,8 +674,9 @@ public class ConnectionStateTracker exte
}
}
- public void transportInterrupted() {
- for (ConnectionState connectionState : connectionStates.values()) {
+ public void transportInterrupted(ConnectionId connectionId) {
+ ConnectionState connectionState = connectionStates.get(connectionId);
+ if (connectionState != null) {
connectionState.setConnectionInterruptProcessingComplete(false);
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java?rev=993382&r1=993381&r2=993382&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java Tue Sep 7 14:40:58 2010
@@ -233,8 +233,6 @@ public class FailoverTransport implement
connectedTransportURI = null;
connected = false;
- stateTracker.transportInterrupted();
-
// notify before any reconnect attempt so ack state can be
// whacked
if (transportListener != null) {
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/FailoverStaticNetworkTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/FailoverStaticNetworkTest.java?rev=993382&r1=993381&r2=993382&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/FailoverStaticNetworkTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/FailoverStaticNetworkTest.java Tue Sep 7 14:40:58 2010
@@ -51,6 +51,8 @@ public class FailoverStaticNetworkTest {
protected BrokerService createBroker(String scheme, String listenPort, String[] networkToPorts) throws Exception {
BrokerService broker = new BrokerService();
+ broker.setUseJmx(true);
+ broker.getManagementContext().setCreateConnector(false);
broker.setSslContext(sslContext);
broker.setDeleteAllMessagesOnStartup(true);
broker.setBrokerName("Broker_" + listenPort);
@@ -68,14 +70,14 @@ public class FailoverStaticNetworkTest {
}
@Before
- public void init() throws Exception {
+ public void setUp() throws Exception {
KeyManager[] km = SslBrokerServiceTest.getKeyManager();
TrustManager[] tm = SslBrokerServiceTest.getTrustManager();
sslContext = new SslContext(km, tm, null);
}
@After
- public void cleanup() throws Exception {
+ public void tearDown() throws Exception {
brokerB.stop();
brokerB.waitUntilStopped();
@@ -83,6 +85,44 @@ public class FailoverStaticNetworkTest {
brokerA.waitUntilStopped();
}
+ @Test
+ public void testSendReceiveAfterReconnect() throws Exception {
+ brokerA = createBroker("tcp", "61617", null);
+ brokerA.start();
+ brokerB = createBroker("tcp", "62617", new String[]{"61617"});
+ brokerB.start();
+ doTestNetworkSendReceive();
+
+ LOG.info("stopping brokerA");
+ brokerA.stop();
+ brokerA.waitUntilStopped();
+
+ LOG.info("restarting brokerA");
+ brokerA = createBroker("tcp", "61617", null);
+ brokerA.start();
+
+ doTestNetworkSendReceive();
+ }
+
+ @Test
+ public void testSendReceiveFailover() throws Exception {
+ brokerA = createBroker("tcp", "61617", null);
+ brokerA.start();
+ brokerB = createBroker("tcp", "62617", new String[]{"61617", "63617"});
+ brokerB.start();
+ doTestNetworkSendReceive();
+
+ LOG.info("stopping brokerA");
+ brokerA.stop();
+ brokerA.waitUntilStopped();
+
+ LOG.info("restarting brokerA");
+ brokerA = createBroker("tcp", "63617", null);
+ brokerA.start();
+
+ doTestNetworkSendReceive();
+ }
+
/**
* networked broker started after target so first connect attempt succeeds
* start order is important
@@ -95,7 +135,7 @@ public class FailoverStaticNetworkTest {
brokerB = createBroker("tcp", "62617", new String[]{"61617","1111"});
brokerB.start();
- testNetworkSendReceive();
+ doTestNetworkSendReceive();
}
@Test
@@ -106,28 +146,35 @@ public class FailoverStaticNetworkTest {
brokerB = createBroker("ssl", "62617", new String[]{"61617", "1111"});
brokerB.start();
- testNetworkSendReceive();
+ doTestNetworkSendReceive();
}
- private void testNetworkSendReceive() throws Exception, JMSException {
- LOG.info("Creating Consumer on the networked broker ...");
+ private void doTestNetworkSendReceive() throws Exception, JMSException {
+ LOG.info("Creating Consumer on the networked brokerA ...");
SslContext.setCurrentSslContext(sslContext);
- // Create a consumer on brokerB
+ // Create a consumer on brokerA
ConnectionFactory consFactory = createConnectionFactory(brokerA);
Connection consConn = consFactory.createConnection();
consConn.start();
Session consSession = consConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
ActiveMQDestination destination = (ActiveMQDestination) consSession.createQueue(DESTINATION_NAME);
final MessageConsumer consumer = consSession.createConsumer(destination);
-
+
+ LOG.info("publishing to brokerB");
+
sendMessageTo(destination, brokerB);
- assertTrue("consumer got message", Wait.waitFor(new Wait.Condition() {
+ boolean gotMessage = Wait.waitFor(new Wait.Condition() {
public boolean isSatisified() throws Exception {
return consumer.receive(1000) != null;
}
- }));
+ });
+ try {
+ consConn.close();
+ } catch (JMSException ignored) {
+ }
+ assertTrue("consumer on A got message", gotMessage);
}
private void sendMessageTo(ActiveMQDestination destination, BrokerService brokerService) throws Exception {