You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2013/02/05 16:16:37 UTC
svn commit: r1442619 -
/activemq/trunk/activemq-ra/src/test/java/org/apache/activemq/ra/FailoverManagedClusterTest.java
Author: tabish
Date: Tue Feb 5 15:16:37 2013
New Revision: 1442619
URL: http://svn.apache.org/viewvc?rev=1442619&view=rev
Log:
Fix test failure caused by the hard-coded ports used in the test.
Modified:
activemq/trunk/activemq-ra/src/test/java/org/apache/activemq/ra/FailoverManagedClusterTest.java
Modified: activemq/trunk/activemq-ra/src/test/java/org/apache/activemq/ra/FailoverManagedClusterTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-ra/src/test/java/org/apache/activemq/ra/FailoverManagedClusterTest.java?rev=1442619&r1=1442618&r2=1442619&view=diff
==============================================================================
--- activemq/trunk/activemq-ra/src/test/java/org/apache/activemq/ra/FailoverManagedClusterTest.java (original)
+++ activemq/trunk/activemq-ra/src/test/java/org/apache/activemq/ra/FailoverManagedClusterTest.java Tue Feb 5 15:16:37 2013
@@ -54,42 +54,46 @@ import org.slf4j.LoggerFactory;
public class FailoverManagedClusterTest extends TestCase {
private static final Logger LOG = LoggerFactory.getLogger(FailoverManagedClusterTest.class);
-
+
long txGenerator = System.currentTimeMillis();
-
+
private static final String MASTER_BIND_ADDRESS = "tcp://0.0.0.0:61616";
private static final String SLAVE_BIND_ADDRESS = "tcp://0.0.0.0:61617";
- private static final String BROKER_URL = "failover://(" + MASTER_BIND_ADDRESS + "," + SLAVE_BIND_ADDRESS + ")?randomize=false";
-
+ private String masterConnectionUri;
+ private String slaveConnectionUri;
+
+ private String brokerUri;
+
private BrokerService master;
private BrokerService slave;
- private CountDownLatch slaveThreadStarted = new CountDownLatch(1);
+ private final CountDownLatch slaveThreadStarted = new CountDownLatch(1);
@Override
protected void setUp() throws Exception {
createAndStartMaster();
- createAndStartSlave();
+ createAndStartSlave();
+
+ brokerUri = "failover://(" + masterConnectionUri + "," + slaveConnectionUri + ")?randomize=false";
}
-
+
@Override
protected void tearDown() throws Exception {
if (slave != null) {
slave.stop();
}
+
if (master != null) {
master.stop();
}
}
-
-
private void createAndStartMaster() throws Exception {
master = new BrokerService();
master.setDeleteAllMessagesOnStartup(true);
master.setUseJmx(false);
master.setBrokerName("BROKER");
- master.addConnector(MASTER_BIND_ADDRESS);
+ masterConnectionUri = master.addConnector(MASTER_BIND_ADDRESS).getPublishableConnectString();
master.start();
master.waitUntilStarted();
}
@@ -98,10 +102,11 @@ public class FailoverManagedClusterTest
slave = new BrokerService();
slave.setUseJmx(false);
slave.setBrokerName("BROKER");
- slave.addConnector(SLAVE_BIND_ADDRESS);
+ slaveConnectionUri = slave.addConnector(SLAVE_BIND_ADDRESS).getPublishableConnectString();
// Start the slave asynchronously, since this will block
new Thread(new Runnable() {
+ @Override
public void run() {
try {
slaveThreadStarted.countDown();
@@ -116,18 +121,19 @@ public class FailoverManagedClusterTest
}
public void testFailover() throws Exception {
-
- ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(BROKER_URL);
+
+ ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUri);
Connection connection = factory.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
ActiveMQResourceAdapter adapter = new ActiveMQResourceAdapter();
- adapter.setServerUrl(BROKER_URL);
+ adapter.setServerUrl(brokerUri);
adapter.start(new StubBootstrapContext());
final CountDownLatch messageDelivered = new CountDownLatch(1);
final StubMessageEndpoint endpoint = new StubMessageEndpoint() {
+ @Override
public void onMessage(Message message) {
LOG.info("Received message " + message);
super.onMessage(message);
@@ -142,11 +148,13 @@ public class FailoverManagedClusterTest
activationSpec.validate();
MessageEndpointFactory messageEndpointFactory = new MessageEndpointFactory() {
+ @Override
public MessageEndpoint createEndpoint(XAResource resource) throws UnavailableException {
endpoint.xaresource = resource;
return endpoint;
}
+ @Override
public boolean isDeliveryTransacted(Method method) throws NoSuchMethodException {
return true;
}
@@ -163,7 +171,7 @@ public class FailoverManagedClusterTest
MessageProducer producer = session.createProducer(new ActiveMQQueue("TEST"));
slaveThreadStarted.await(10, TimeUnit.SECONDS);
-
+
// force a failover before send
LOG.info("Stopping master to force failover..");
master.stop();
@@ -173,45 +181,54 @@ public class FailoverManagedClusterTest
producer.send(session.createTextMessage("Hello, again!"));
// Wait for the message to be delivered.
- assertTrue(messageDelivered.await(5000, TimeUnit.MILLISECONDS));
+ assertTrue(messageDelivered.await(5000, TimeUnit.MILLISECONDS));
}
private static final class StubBootstrapContext implements BootstrapContext {
+ @Override
public WorkManager getWorkManager() {
return new WorkManager() {
+ @Override
public void doWork(Work work) throws WorkException {
new Thread(work).start();
}
+ @Override
public void doWork(Work work, long arg1, ExecutionContext arg2, WorkListener arg3) throws WorkException {
new Thread(work).start();
}
+ @Override
public long startWork(Work work) throws WorkException {
new Thread(work).start();
return 0;
}
+ @Override
public long startWork(Work work, long arg1, ExecutionContext arg2, WorkListener arg3) throws WorkException {
new Thread(work).start();
return 0;
}
+ @Override
public void scheduleWork(Work work) throws WorkException {
new Thread(work).start();
}
+ @Override
public void scheduleWork(Work work, long arg1, ExecutionContext arg2, WorkListener arg3) throws WorkException {
new Thread(work).start();
}
};
}
+ @Override
public XATerminator getXATerminator() {
return null;
}
+ @Override
public Timer createTimer() throws UnavailableException {
return null;
}
@@ -222,6 +239,7 @@ public class FailoverManagedClusterTest
public XAResource xaresource;
public Xid xid;
+ @Override
public void beforeDelivery(Method method) throws NoSuchMethodException, ResourceException {
try {
if (xid == null) {
@@ -233,6 +251,7 @@ public class FailoverManagedClusterTest
}
}
+ @Override
public void afterDelivery() throws ResourceException {
try {
xaresource.end(xid, XAResource.TMSUCCESS);
@@ -244,15 +263,17 @@ public class FailoverManagedClusterTest
}
}
+ @Override
public void release() {
}
+ @Override
public void onMessage(Message message) {
messageCount++;
}
}
-
+
public Xid createXid() throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream os = new DataOutputStream(baos);
@@ -261,14 +282,17 @@ public class FailoverManagedClusterTest
final byte[] bs = baos.toByteArray();
return new Xid() {
+ @Override
public int getFormatId() {
return 86;
}
+ @Override
public byte[] getGlobalTransactionId() {
return bs;
}
+ @Override
public byte[] getBranchQualifier() {
return bs;
}