You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2013/01/02 21:59:18 UTC
svn commit: r1428029 -
/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/NetworkBrokerDetachTest.java
Author: tabish
Date: Wed Jan 2 20:59:18 2013
New Revision: 1428029
URL: http://svn.apache.org/viewvc?rev=1428029&view=rev
Log:
Fix test cases after the changes in https://issues.apache.org/jira/browse/AMQ-4237 broke the tests' queue MBean lookup
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/NetworkBrokerDetachTest.java
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/NetworkBrokerDetachTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/NetworkBrokerDetachTest.java?rev=1428029&r1=1428028&r2=1428029&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/NetworkBrokerDetachTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/NetworkBrokerDetachTest.java Wed Jan 2 20:59:18 2013
@@ -17,11 +17,8 @@
package org.apache.activemq.network;
import static org.junit.Assert.assertTrue;
-import static org.junit.Assume.assumeNotNull;
import java.io.File;
-import java.net.MalformedURLException;
-import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -31,38 +28,35 @@ import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TopicSubscriber;
-import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
-import javax.management.remote.JMXConnector;
-import javax.management.remote.JMXConnectorFactory;
-import javax.management.remote.JMXServiceURL;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQPrefetchPolicy;
import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.broker.jmx.BrokerView;
+import org.apache.activemq.broker.jmx.DestinationViewMBean;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.apache.activemq.util.Wait;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class NetworkBrokerDetachTest {
- private final static String BROKER_NAME = "broker";
- private final static String REM_BROKER_NAME = "networkedBroker";
- private final static String DESTINATION_NAME = "testQ";
- private final static int NUM_CONSUMERS = 1;
-
+ private final static String BROKER_NAME = "broker";
+ private final static String REM_BROKER_NAME = "networkedBroker";
+ private final static String DESTINATION_NAME = "testQ";
+ private final static int NUM_CONSUMERS = 1;
+
protected static final Logger LOG = LoggerFactory.getLogger(NetworkBrokerDetachTest.class);
protected final int numRestarts = 3;
protected final int networkTTL = 2;
protected final boolean dynamicOnly = false;
-
+
protected BrokerService broker;
protected BrokerService networkedBroker;
@@ -86,42 +80,36 @@ public class NetworkBrokerDetachTest {
configureNetworkConnector(networkConnector);
return broker;
}
-
+
private void configureNetworkConnector(NetworkConnector networkConnector) {
networkConnector.setDuplex(false);
networkConnector.setNetworkTTL(networkTTL);
networkConnector.setDynamicOnly(dynamicOnly);
}
-
+
// variants for each store....
protected void configureBroker(BrokerService broker) throws Exception {
- //KahaPersistenceAdapter persistenceAdapter = new KahaPersistenceAdapter();
- //persistenceAdapter.setDirectory(new File("target/activemq-data/kaha/" + broker.getBrokerName() + "/NetworBrokerDetatchTest"));
- //broker.setPersistenceAdapter(persistenceAdapter);
-
KahaDBPersistenceAdapter persistenceAdapter = new KahaDBPersistenceAdapter();
persistenceAdapter.setDirectory(new File("target/activemq-data/kahadb/" + broker.getBrokerName() + "NetworBrokerDetatchTest"));
broker.setPersistenceAdapter(persistenceAdapter);
-
- // default AMQ
}
-
+
@Before
public void init() throws Exception {
broker = createBroker();
broker.setDeleteAllMessagesOnStartup(true);
broker.start();
-
+
networkedBroker = createNetworkedBroker();
networkedBroker.setDeleteAllMessagesOnStartup(true);
networkedBroker.start();
}
-
+
@After
public void cleanup() throws Exception {
networkedBroker.stop();
networkedBroker.waitUntilStopped();
-
+
broker.stop();
broker.waitUntilStopped();
}
@@ -129,7 +117,7 @@ public class NetworkBrokerDetachTest {
@Test
public void testNetworkedBrokerDetach() throws Exception {
LOG.info("Creating Consumer on the networked broker ...");
- // Create a consumer on the networked broker
+ // Create a consumer on the networked broker
ConnectionFactory consFactory = createConnectionFactory(networkedBroker);
Connection consConn = consFactory.createConnection();
Session consSession = consConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -137,64 +125,63 @@ public class NetworkBrokerDetachTest {
for(int i=0; i<NUM_CONSUMERS; i++) {
consSession.createConsumer(destination);
}
-
- assertTrue("got expected consumer count from mbean within time limit",
- verifyConsumerCount(1, destination, broker));
-
-
+
+ assertTrue("got expected consumer count from mbean within time limit",
+ verifyConsumerCount(1, destination, broker));
+
LOG.info("Stopping Consumer on the networked broker ...");
- // Closing the connection will also close the consumer
+ // Closing the connection will also close the consumer
consConn.close();
-
+
// We should have 0 consumer for the queue on the local broker
assertTrue("got expected 0 count from mbean within time limit", verifyConsumerCount(0, destination, broker));
}
-
@Test
public void testNetworkedBrokerDurableSubAfterRestart() throws Exception {
-
+
final AtomicInteger count = new AtomicInteger(0);
MessageListener counter = new MessageListener() {
+ @Override
public void onMessage(Message message) {
count.incrementAndGet();
}
};
-
+
LOG.info("Creating durable consumer on each broker ...");
ActiveMQTopic destination = registerDurableConsumer(networkedBroker, counter);
registerDurableConsumer(broker, counter);
-
+
assertTrue("got expected consumer count from local broker mbean within time limit",
verifyConsumerCount(2, destination, broker));
-
+
assertTrue("got expected consumer count from network broker mbean within time limit",
verifyConsumerCount(2, destination, networkedBroker));
-
+
sendMessageTo(destination, broker);
-
+
assertTrue("Got one message on each", verifyMessageCount(2, count));
-
+
LOG.info("Stopping brokerTwo...");
networkedBroker.stop();
- networkedBroker.waitUntilStopped();
-
+ networkedBroker.waitUntilStopped();
+
LOG.info("restarting broker Two...");
networkedBroker = createNetworkedBroker();
networkedBroker.start();
-
+
LOG.info("Recreating durable Consumer on the broker after restart...");
registerDurableConsumer(networkedBroker, counter);
-
+
// give advisories a chance to percolate
TimeUnit.SECONDS.sleep(5);
-
+
sendMessageTo(destination, broker);
-
+
// expect similar after restart
assertTrue("got expected consumer count from local broker mbean within time limit",
verifyConsumerCount(2, destination, broker));
-
+
// a durable sub is auto bridged on restart unless dynamicOnly=true
assertTrue("got expected consumer count from network broker mbean within time limit",
verifyConsumerCount(2, destination, networkedBroker));
@@ -209,9 +196,10 @@ public class NetworkBrokerDetachTest {
private boolean verifyMessageCount(final int i, final AtomicInteger count) throws Exception {
return Wait.waitFor(new Wait.Condition() {
+ @Override
public boolean isSatisified() throws Exception {
return i == count.get();
- }
+ }
});
}
@@ -237,10 +225,9 @@ public class NetworkBrokerDetachTest {
session.createProducer(destination).send(session.createTextMessage("Hi"));
conn.close();
}
-
+
protected ConnectionFactory createConnectionFactory(final BrokerService broker) throws Exception {
-
- String url = ((TransportConnector) broker.getTransportConnectors().get(0)).getServer().getConnectURI().toString();
+ String url = broker.getTransportConnectors().get(0).getServer().getConnectURI().toString();
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
connectionFactory.setOptimizedMessageDispatch(true);
connectionFactory.setCopyMessageOnSend(false);
@@ -256,70 +243,61 @@ public class NetworkBrokerDetachTest {
connectionFactory.setAlwaysSyncSend(true);
return connectionFactory;
}
-
- // JMX Helper Methods
+
+ // JMX Helper Methods
private boolean verifyConsumerCount(final long expectedCount, final ActiveMQDestination destination, final BrokerService broker) throws Exception {
return Wait.waitFor(new Wait.Condition() {
+ @Override
public boolean isSatisified() throws Exception {
boolean result = false;
try {
+
+ ObjectName[] destinations;
+
+ if (destination.isQueue()) {
+ destinations = broker.getAdminView().getQueues();
+ } else {
+ destinations = broker.getAdminView().getTopics();
+ }
+
// We should have 1 consumer for the queue on the local broker
- Object consumers = broker.getManagementContext().getAttribute(getObjectName(broker.getBrokerName(), destination.isQueue() ? "Queue" : "Topic", "Destination=" + destination.getPhysicalName()), "ConsumerCount");
- if (consumers != null) {
- LOG.info("Consumers for " + destination.getPhysicalName() + " on " + broker + " : " + consumers);
- if (expectedCount == ((Long)consumers).longValue()) {
- result = true;
+ for (ObjectName name : destinations) {
+ DestinationViewMBean view = (DestinationViewMBean)
+ broker.getManagementContext().newProxyInstance(name, DestinationViewMBean.class, true);
+
+ if (view.getName().equals(destination.getPhysicalName())) {
+ LOG.info("Consumers for " + destination.getPhysicalName() + " on " + broker + " : " + view.getConsumerCount());
+ if (expectedCount == view.getConsumerCount()) {
+ result = true;
+ }
}
}
+
} catch (Exception ignoreAndRetry) {
}
return result;
- }
+ }
});
}
-
-
+
private boolean verifyDurableConsumerCount(final long expectedCount, final BrokerService broker) throws Exception {
return Wait.waitFor(new Wait.Condition() {
+ @Override
public boolean isSatisified() throws Exception {
boolean result = false;
- MBeanServerConnection mbsc = getMBeanServerConnection();
- if (mbsc != null) {
- Set subs = broker.getManagementContext().queryNames(getObjectName(broker.getBrokerName(), "Subscription", "active=false,*"), null);
+ BrokerView view = broker.getAdminView();
+
+ if (view != null) {
+ ObjectName[] subs = broker.getAdminView().getInactiveDurableTopicSubscribers();
if (subs != null) {
LOG.info("inactive durable subs on " + broker + " : " + subs);
- if (expectedCount == subs.size()) {
+ if (expectedCount == subs.length) {
result = true;
}
}
}
return result;
- }
+ }
});
}
-
- private MBeanServerConnection getMBeanServerConnection() throws MalformedURLException {
- final JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:1099/jmxrmi");
- MBeanServerConnection mbsc = null;
- try {
- JMXConnector jmxc = JMXConnectorFactory.connect(url, null);
- mbsc = jmxc.getMBeanServerConnection();
- } catch (Exception ignored) {
- LOG.warn("getMBeanServer ex: " + ignored);
- }
- // If port 1099 is in use when the Broker starts, starting the jmx
- // connector will fail. So, if we have no mbsc to query, skip the
- // test.
- assumeNotNull(mbsc);
- return mbsc;
- }
-
-
- private ObjectName getObjectName(String brokerName, String type, String pattern) throws Exception {
- ObjectName beanName = new ObjectName(
- "org.apache.activemq:BrokerName=" + brokerName + ",Type=" + type +"," + pattern
- );
-
- return beanName;
- }
}