You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2012/06/20 15:08:11 UTC
svn commit: r1352081 -
/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3274Test.java
Author: tabish
Date: Wed Jun 20 13:08:10 2012
New Revision: 1352081
URL: http://svn.apache.org/viewvc?rev=1352081&view=rev
Log:
Don't use fixed ports for broker instances, let the connector find an open port.
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3274Test.java
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3274Test.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3274Test.java?rev=1352081&r1=1352080&r2=1352081&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3274Test.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3274Test.java Wed Jun 20 13:08:10 2012
@@ -17,14 +17,14 @@
package org.apache.activemq.bugs;
-import org.apache.activemq.ActiveMQConnection;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.network.DiscoveryNetworkConnector;
-import org.apache.activemq.network.NetworkConnector;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.junit.Test;
+import static org.junit.Assert.assertTrue;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Enumeration;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
import javax.jms.Destination;
@@ -34,952 +34,701 @@ import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.ArrayList;
-import java.util.Enumeration;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import static org.junit.Assert.assertTrue;
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.network.DiscoveryNetworkConnector;
+import org.apache.activemq.network.NetworkConnector;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+public class AMQ3274Test {
+ private static final transient Logger LOG = LoggerFactory.getLogger(AMQ3274Test.class);
-/**
- *
- */
-public class AMQ3274Test
-{
- protected static int Next_broker_num = 0;
- protected EmbeddedTcpBroker broker1;
- protected EmbeddedTcpBroker broker2;
-
- protected int nextEchoId = 0;
- protected boolean testError = false;
-
- protected int echoResponseFill = 0; // Number of "filler" response messages per request
-
- protected static Log LOG;
-
- static
- {
- LOG = LogFactory.getLog(AMQ3274Test.class);
- }
-
- public AMQ3274Test ()
- throws Exception
- {
- broker1 = new EmbeddedTcpBroker();
- broker2 = new EmbeddedTcpBroker();
-
- broker1.coreConnectTo(broker2, true);
- broker2.coreConnectTo(broker1, true);
- }
-
- public void logMessage (String msg)
- {
- System.out.println(msg);
- System.out.flush();
- }
-
-
- /**
- *
- */
-
- public void testMessages (Session sess, MessageProducer req_prod, Destination resp_dest, int num_msg)
- throws Exception
- {
- MessageConsumer resp_cons;
- TextMessage msg;
- MessageClient cons_client;
- int cur;
- int tot_expected;
-
- resp_cons = sess.createConsumer(resp_dest);
-
- cons_client = new MessageClient(resp_cons, num_msg);
- cons_client.start();
-
- cur = 0;
- while ( ( cur < num_msg ) && ( ! testError ) )
- {
- msg = sess.createTextMessage("MSG AAAA " + cur);
- msg.setIntProperty("SEQ", 100 + cur);
- msg.setStringProperty("TEST", "TOPO");
- msg.setJMSReplyTo(resp_dest);
-
- if ( cur == ( num_msg - 1 ) )
- msg.setBooleanProperty("end-of-response", true);
-
- req_prod.send(msg);
-
- cur++;
- }
-
- //
- // Give the consumer some time to receive the response.
- //
- cons_client.waitShutdown(5000);
-
- //
- // Now shutdown the consumer if it's still running.
- //
- if ( cons_client.shutdown() )
- LOG.debug("Consumer client shutdown complete");
- else
- LOG.debug("Consumer client shutdown incomplete!!!");
-
-
- //
- // Check that the correct number of messages was received.
- //
- tot_expected = num_msg * ( echoResponseFill + 1 );
-
- if ( cons_client.getNumMsgReceived() == tot_expected )
- {
- LOG.info("Have " + tot_expected + " messages, as-expected");
- }
- else
- {
- testError = true;
- LOG.info("Have " + cons_client.getNumMsgReceived() + " messages; expected " + tot_expected);
- }
-
- resp_cons.close();
- }
-
-
- /**
- * Test one destination between the given "producer broker" and "consumer broker" specified.
- */
- public void testOneDest (Connection conn, Session sess, Destination cons_dest, String prod_broker_url,
- String cons_broker_url, int num_msg)
- throws Exception
- {
- int echo_id;
-
- EchoService echo_svc;
- String echo_queue_name;
- Destination prod_dest;
- MessageProducer msg_prod;
-
- synchronized ( this )
- {
- echo_id = this.nextEchoId;
- this.nextEchoId++;
- }
-
- echo_queue_name = "echo.queue." + echo_id;
-
- //
- // Remove any previously-created echo queue with the same name.
- //
- LOG.trace("destroying the echo queue in case an old one exists");
- removeQueue(conn, echo_queue_name);
-
-
- //
- // Now start the echo service with that queue.
- //
- echo_svc = new EchoService(echo_queue_name, prod_broker_url);
- echo_svc.start();
-
-
- //
- // Create the Producer to the echo request Queue
- //
- LOG.trace("Creating echo queue and producer");
- prod_dest = sess.createQueue(echo_queue_name);
- msg_prod = sess.createProducer(prod_dest);
-
-
- //
- // Pass messages around.
- //
- testMessages(sess, msg_prod, cons_dest, num_msg);
-
-
- //
- //
- //
-
- echo_svc.shutdown();
- msg_prod.close();
- }
-
-
- /**
- * TEST TEMPORARY TOPICS
- */
- public void testTempTopic (String prod_broker_url, String cons_broker_url)
- throws Exception
- {
- Connection conn;
- Session sess;
- Destination cons_dest;
- int echo_id;
- int num_msg;
-
- num_msg = 5;
-
- LOG.info("TESTING TEMP TOPICS " + prod_broker_url + " -> " + cons_broker_url + " (" + num_msg +
- " messages)");
-
-
- //
- // Connect to the bus.
- //
-
- conn = createConnection(cons_broker_url);
- conn.start();
- sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
-
- //
- // Create the destination on which messages are being tested.
- //
-
- LOG.trace("Creating destination");
- cons_dest = sess.createTemporaryTopic();
-
- testOneDest(conn, sess, cons_dest, prod_broker_url, cons_broker_url, num_msg);
-
-
- //
- // Cleanup
- //
-
- sess.close();
- conn.close();
- }
-
-
- /**
- * TEST TOPICS
- */
- public void testTopic (String prod_broker_url, String cons_broker_url)
- throws Exception
- {
- int num_msg;
-
- Connection conn;
- Session sess;
- String topic_name;
-
- Destination cons_dest;
-
- num_msg = 5;
-
- LOG.info("TESTING TOPICS " + prod_broker_url + " -> " + cons_broker_url + " (" + num_msg +
- " messages)");
-
-
- //
- // Connect to the bus.
- //
-
- conn = createConnection(cons_broker_url);
- conn.start();
- sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
-
- //
- // Create the destination on which messages are being tested.
- //
-
- topic_name = "topotest2.perm.topic";
- LOG.trace("Removing existing Topic");
- removeTopic(conn, topic_name);
- LOG.trace("Creating Topic, " + topic_name);
- cons_dest = sess.createTopic(topic_name);
-
- testOneDest(conn, sess, cons_dest, prod_broker_url, cons_broker_url, num_msg);
-
-
- //
- // Cleanup
- //
-
- removeTopic(conn, topic_name);
- sess.close();
- conn.close();
- }
-
-
- /**
- * TEST TEMPORARY QUEUES
- */
- public void testTempQueue (String prod_broker_url, String cons_broker_url)
- throws Exception
- {
- int echo_id;
- int num_msg;
-
- Connection conn;
- Session sess;
-
- Destination cons_dest;
-
- num_msg = 5;
-
- LOG.info("TESTING TEMP QUEUES " + prod_broker_url + " -> " + cons_broker_url + " (" + num_msg +
- " messages)");
-
-
- //
- // Connect to the bus.
- //
-
- conn = createConnection(cons_broker_url);
- conn.start();
- sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
-
- //
- // Create the destination on which messages are being tested.
- //
-
- LOG.trace("Creating destination");
- cons_dest = sess.createTemporaryQueue();
-
- testOneDest(conn, sess, cons_dest, prod_broker_url, cons_broker_url, num_msg);
-
-
- //
- // Cleanup
- //
-
- sess.close();
- conn.close();
- }
-
-
- /**
- * TEST QUEUES
- */
- public void testQueue (String prod_broker_url, String cons_broker_url)
- throws Exception
- {
- int num_msg;
-
- Connection conn;
- Session sess;
- String queue_name;
-
- Destination cons_dest;
-
- num_msg = 5;
-
- LOG.info("TESTING QUEUES " + prod_broker_url + " -> " + cons_broker_url + " (" + num_msg +
- " messages)");
-
-
- //
- // Connect to the bus.
- //
-
- conn = createConnection(cons_broker_url);
- conn.start();
- sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
-
- //
- // Create the destination on which messages are being tested.
- //
-
- queue_name = "topotest2.perm.queue";
- LOG.trace("Removing existing Queue");
- removeQueue(conn, queue_name);
- LOG.trace("Creating Queue, " + queue_name);
- cons_dest = sess.createQueue(queue_name);
-
- testOneDest(conn, sess, cons_dest, prod_broker_url, cons_broker_url, num_msg);
-
-
- //
- // Cleanup
- //
-
- removeQueue(conn, queue_name);
- sess.close();
- conn.close();
- }
-
- @Test
- public void run ()
- throws Exception
- {
- Thread start1;
- Thread start2;
-
- testError = false;
-
- // Use threads to avoid startup deadlock since the first broker started waits until
- // it knows the name of the remote broker before finishing its startup, which means
- // the remote must already be running.
-
- start1 = new Thread() {
- public void run()
- {
- try {
- broker1.start();
- } catch (Exception ex) {
- LOG.error(null, ex);
- }
- }
- };
-
- start2 = new Thread() {
- public void run()
- {
- try {
- broker2.start();
- } catch (Exception ex) {
- LOG.error(null, ex);
- }
- }
- };
-
- start1.start();
- start2.start();
-
- start1.join();
- start2.join();
-
- if ( ! testError )
- this.testTempTopic(broker1.getConnectionUrl(), broker2.getConnectionUrl());
-
- if ( ! testError )
- this.testTempQueue(broker1.getConnectionUrl(), broker2.getConnectionUrl());
-
- if ( ! testError )
- this.testTopic(broker1.getConnectionUrl(), broker2.getConnectionUrl());
-
- if ( ! testError )
- this.testQueue(broker1.getConnectionUrl(), broker2.getConnectionUrl());
-
- Thread.sleep(100);
-
- shutdown();
-
- assertTrue(! testError);
- }
-
- public void shutdown ()
- throws Exception
- {
- broker1.stop();
- broker2.stop();
- }
-
- /**
- * @param args the command line arguments
- */
- public static void main(String[] args)
- {
- AMQ3274Test main_obj;
-
- try
- {
- main_obj = new AMQ3274Test();
- main_obj.run();
- }
- catch (Exception ex)
- {
- ex.printStackTrace();
-
- LOG.error(null, ex);
-
- System.exit(0);
- }
- }
-
- protected Connection createConnection (String url)
- throws Exception
- {
- return org.apache.activemq.ActiveMQConnection.makeConnection(url);
- }
-
- protected static void removeQueue (Connection conn, String dest_name)
- throws java.lang.Exception
- {
- org.apache.activemq.command.ActiveMQDestination dest;
-
- if ( conn instanceof org.apache.activemq.ActiveMQConnection )
- {
- dest = org.apache.activemq.command.ActiveMQDestination.
- createDestination(dest_name, (byte) org.apache.activemq.command.ActiveMQDestination.QUEUE_TYPE);
- ((org.apache.activemq.ActiveMQConnection)conn).destroyDestination(dest);
- }
- }
-
- protected static void removeTopic (Connection conn, String dest_name)
- throws java.lang.Exception
- {
- org.apache.activemq.command.ActiveMQDestination dest;
-
- if ( conn instanceof org.apache.activemq.ActiveMQConnection )
- {
- dest = org.apache.activemq.command.ActiveMQDestination.
- createDestination(dest_name, (byte) org.apache.activemq.command.ActiveMQDestination.TOPIC_TYPE);
- ((org.apache.activemq.ActiveMQConnection)conn).destroyDestination(dest);
- }
- }
-
- public static String fmtMsgInfo (Message msg)
- throws Exception
- {
- StringBuilder msg_desc;
- String prop;
- Enumeration prop_enum;
-
- msg_desc = new StringBuilder();
- msg_desc = new StringBuilder();
-
- if ( msg instanceof TextMessage )
- {
- msg_desc.append(((TextMessage) msg).getText());
- }
- else
- {
- msg_desc.append("[");
- msg_desc.append(msg.getClass().getName());
- msg_desc.append("]");
- }
-
- prop_enum = msg.getPropertyNames();
- while ( prop_enum.hasMoreElements() )
- {
- prop = (String) prop_enum.nextElement();
- msg_desc.append("; ");
- msg_desc.append(prop);
- msg_desc.append("=");
- msg_desc.append(msg.getStringProperty(prop));
- }
-
- return msg_desc.toString();
- }
-
-//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
-///////////////////////////////////////////////// INTERNAL CLASSES /////////////////////////////////////////////////
-//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
-
- protected class EmbeddedTcpBroker
- {
- protected BrokerService brokerSvc;
- protected int brokerNum;
- protected String brokerName;
- protected String brokerId;
- protected int port;
- protected String tcpUrl;
-
- public EmbeddedTcpBroker ()
- throws Exception
- {
- brokerSvc = new BrokerService();
-
- synchronized ( this.getClass() )
- {
- brokerNum = Next_broker_num;
- Next_broker_num++;
- }
-
- brokerName = "broker" + brokerNum;
- brokerId = "b" + brokerNum;
-
- brokerSvc.setBrokerName(brokerName);
- brokerSvc.setBrokerId(brokerId);
-
- brokerSvc.setPersistent(false);
- brokerSvc.setUseJmx(false); // TBD
-
- port = 60000 + ( brokerNum * 10 );
-
- // Configure the transport connector (TCP)
- tcpUrl = "tcp://127.0.0.1:" + Integer.toString(port);
- brokerSvc.addConnector(tcpUrl);
- }
-
- public Connection createConnection ()
- throws URISyntaxException, JMSException
- {
- Connection result;
-
- result = org.apache.activemq.ActiveMQConnection.makeConnection(this.tcpUrl);
-
- return result;
- }
-
- public String getConnectionUrl ()
- {
- return this.tcpUrl;
- }
-
-
- /**
- * Create network connections to the given broker using the network-connector
- * configuration of CORE brokers (e.g. core1.bus.dev1.coresys.tmcs)
- *
- * @param other
- * @param duplex_f
- */
- public void coreConnectTo (EmbeddedTcpBroker other, boolean duplex_f)
- throws Exception
- {
- this.makeConnectionTo(other, duplex_f, true);
- this.makeConnectionTo(other, duplex_f, false);
- }
-
- public void start ()
- throws Exception
- {
- brokerSvc.start();
- //brokerSvc.waitUntilStarted();
- }
-
- public void stop ()
- throws Exception
- {
- brokerSvc.stop();
- }
-
-
- /**
- * Make one connection to the other embedded broker, of the specified type (queue or topic)
- * using the standard CORE broker networking.
- *
- * @param other
- * @param duplex_f
- * @param queue_f
- * @throws Exception
- */
- protected void makeConnectionTo (EmbeddedTcpBroker other, boolean duplex_f, boolean queue_f)
- throws Exception
- {
- NetworkConnector nw_conn;
- String prefix;
- ActiveMQDestination excl_dest;
- ArrayList excludes;
-
- nw_conn = new DiscoveryNetworkConnector(new URI("static:(" + other.tcpUrl + ")"));
- nw_conn.setDuplex(duplex_f);
-
- if ( queue_f )
- nw_conn.setConduitSubscriptions(false);
- else
- nw_conn.setConduitSubscriptions(true);
-
- nw_conn.setNetworkTTL(5);
- nw_conn.setSuppressDuplicateQueueSubscriptions(true);
- nw_conn.setDecreaseNetworkConsumerPriority(true);
- nw_conn.setBridgeTempDestinations(true);
-
- if ( queue_f )
- {
- prefix = "queue";
- excl_dest = ActiveMQDestination.createDestination(">", ActiveMQDestination.QUEUE_TYPE);
- }
- else
- {
- prefix = "topic";
- excl_dest = ActiveMQDestination.createDestination(">", ActiveMQDestination.TOPIC_TYPE);
- }
-
- excludes = new ArrayList();
- excludes.add(excl_dest);
- nw_conn.setExcludedDestinations(excludes);
-
- if ( duplex_f )
- nw_conn.setName(this.brokerId + "<-" + prefix + "->" + other.brokerId);
- else
- nw_conn.setName(this.brokerId + "-" + prefix + "->" + other.brokerId);
-
- brokerSvc.addNetworkConnector(nw_conn);
- }
- }
-
- protected class MessageClient extends java.lang.Thread
- {
- protected MessageConsumer msgCons;
- protected boolean shutdownInd;
- protected int expectedCount;
- protected int lastSeq = 0;
- protected int msgCount = 0;
- protected boolean haveFirstSeq;
- protected CountDownLatch shutdownLatch;
-
- public MessageClient (MessageConsumer cons, int num_to_expect)
- {
- msgCons = cons;
- expectedCount = ( num_to_expect * ( echoResponseFill + 1 ) );
- shutdownLatch = new CountDownLatch(1);
- }
-
- public void run ()
- {
- CountDownLatch latch;
-
- try
- {
- synchronized ( this )
- {
- latch = shutdownLatch;
- }
-
- shutdownInd = false;
- processMessages();
-
- latch.countDown();
- }
- catch ( Exception exc )
- {
- LOG.error("message client error", exc);
- }
- }
-
- public void waitShutdown (long timeout)
- {
- CountDownLatch latch;
-
- try
- {
- synchronized ( this )
- {
- latch = shutdownLatch;
- }
-
- if ( latch != null )
- latch.await(timeout, TimeUnit.MILLISECONDS);
- else
- LOG.info("echo client shutdown: client does not appear to be active");
- }
- catch ( InterruptedException int_exc )
- {
- LOG.warn("wait for message client shutdown interrupted", int_exc);
- }
- }
-
- public boolean shutdown ()
- {
- boolean down_ind;
-
- if ( ! shutdownInd )
- {
- shutdownInd = true;
- }
-
- waitShutdown(200);
-
- synchronized ( this )
- {
- if ( ( shutdownLatch == null ) || ( shutdownLatch.getCount() == 0 ) )
- down_ind = true;
- else
- down_ind = false;
- }
-
- return down_ind;
- }
-
- public int getNumMsgReceived ()
- {
- return msgCount;
- }
-
- protected void processMessages ()
- throws Exception
- {
- Message in_msg;
-
- haveFirstSeq = false;
-
- //
- // Stop at shutdown time or after any test error is detected.
- //
-
- while ( ( ! shutdownInd ) && ( ! testError ) )
- {
- in_msg = msgCons.receive(100);
-
- if ( in_msg != null )
- {
- msgCount++;
- checkMessage(in_msg);
- }
- }
- }
-
- protected void checkMessage (Message in_msg)
- throws Exception
- {
- int seq;
-
- LOG.debug("received message " + fmtMsgInfo(in_msg));
-
- //
- // Only check messages with a sequence number.
- //
-
- if ( in_msg.propertyExists("SEQ") )
- {
- seq = in_msg.getIntProperty("SEQ");
-
- if ( ( haveFirstSeq ) && ( seq != ( lastSeq + 1 ) ) )
- {
- LOG.error("***ERROR*** incorrect sequence number; expected " +
- Integer.toString(lastSeq + 1) + " but have " +
- Integer.toString(seq));
-
- testError = true;
- }
-
- lastSeq = seq;
-
- if ( msgCount > expectedCount )
- {
- LOG.warn("*** have more messages than expected; have " + msgCount +
- "; expect " + expectedCount);
-
- testError = true;
- }
- }
-
- if ( in_msg.propertyExists("end-of-response") )
- {
- LOG.trace("received end-of-response message");
- shutdownInd = true;
- }
- }
- }
-
- /**
- *
- */
- protected class EchoService extends java.lang.Thread
- {
- protected String destName;
- protected Connection jmsConn;
- protected Session sess;
- protected MessageConsumer msg_cons;
- protected boolean Shutdown_ind;
-
- protected Destination req_dest;
- protected Destination resp_dest;
- protected MessageProducer msg_prod;
-
- protected CountDownLatch waitShutdown;
-
- public EchoService (String dest, Connection broker_conn)
- throws Exception
- {
- destName = dest;
- jmsConn = broker_conn;
-
- Shutdown_ind = false;
-
- sess = jmsConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
- req_dest = sess.createQueue(destName);
- msg_cons = sess.createConsumer(req_dest);
-
- jmsConn.start();
-
- waitShutdown = new CountDownLatch(1);
- }
-
- public EchoService (String dest, String broker_url)
- throws Exception
- {
- this(dest, ActiveMQConnection.makeConnection(broker_url));
- }
-
- public void run ()
- {
- Message req;
-
- try
- {
- LOG.info("STARTING ECHO SERVICE");
-
- while ( ! Shutdown_ind )
- {
- req = msg_cons.receive(100);
- if ( req != null )
- {
- if ( LOG.isDebugEnabled() )
- LOG.debug("ECHO request message " + req.toString());
-
- resp_dest = req.getJMSReplyTo();
- if ( resp_dest != null )
- {
- msg_prod = sess.createProducer(resp_dest);
- msg_prod.send(req);
- msg_prod.close();
- msg_prod = null;
- }
- else
- {
- LOG.warn("invalid request: no reply-to destination given");
- }
- }
- }
- }
- catch (Exception ex)
- {
- LOG.error(null, ex);
- }
- finally
- {
- LOG.info("shutting down test echo service");
-
- try
- {
- jmsConn.stop();
- }
- catch ( javax.jms.JMSException jms_exc )
- {
- LOG.warn("error on shutting down JMS connection", jms_exc);
- }
-
- synchronized ( this )
- {
- waitShutdown.countDown();
- }
- }
- }
-
-
- /**
- * Shut down the service, waiting up to 3 seconds for the service to terminate.
- */
- public void shutdown ()
- {
- CountDownLatch wait_l;
-
- synchronized ( this )
- {
- wait_l = waitShutdown;
- }
-
- Shutdown_ind = true;
-
- try
- {
- if ( wait_l != null )
- {
- if ( wait_l.await(3000, TimeUnit.MILLISECONDS) )
- LOG.info("echo service shutdown complete");
- else
- LOG.warn("timeout waiting for echo service shutdown");
- }
- else
- {
- LOG.info("echo service shutdown: service does not appear to be active");
- }
- }
- catch ( InterruptedException int_exc )
- {
- LOG.warn("interrupted while waiting for echo service shutdown");
- }
- }
- }
+ protected static int Next_broker_num = 0;
+ protected EmbeddedTcpBroker broker1;
+ protected EmbeddedTcpBroker broker2;
+
+ protected int nextEchoId = 0;
+ protected boolean testError = false;
+
+ protected int echoResponseFill = 0; // Number of "filler" response messages per request
+
+ public AMQ3274Test() throws Exception {
+ broker1 = new EmbeddedTcpBroker();
+ broker2 = new EmbeddedTcpBroker();
+
+ broker1.coreConnectTo(broker2, true);
+ broker2.coreConnectTo(broker1, true);
+ }
+
+ public void logMessage(String msg) {
+ System.out.println(msg);
+ System.out.flush();
+ }
+
+ public void testMessages(Session sess, MessageProducer req_prod, Destination resp_dest, int num_msg) throws Exception {
+ MessageConsumer resp_cons;
+ TextMessage msg;
+ MessageClient cons_client;
+ int cur;
+ int tot_expected;
+
+ resp_cons = sess.createConsumer(resp_dest);
+
+ cons_client = new MessageClient(resp_cons, num_msg);
+ cons_client.start();
+
+ cur = 0;
+ while ((cur < num_msg) && (!testError)) {
+ msg = sess.createTextMessage("MSG AAAA " + cur);
+ msg.setIntProperty("SEQ", 100 + cur);
+ msg.setStringProperty("TEST", "TOPO");
+ msg.setJMSReplyTo(resp_dest);
+
+ if (cur == (num_msg - 1))
+ msg.setBooleanProperty("end-of-response", true);
+
+ req_prod.send(msg);
+
+ cur++;
+ }
+
+ cons_client.waitShutdown(5000);
+
+ if (cons_client.shutdown()) {
+ LOG.debug("Consumer client shutdown complete");
+ } else {
+ LOG.debug("Consumer client shutdown incomplete!!!");
+ }
+
+ tot_expected = num_msg * (echoResponseFill + 1);
+
+ if (cons_client.getNumMsgReceived() == tot_expected) {
+ LOG.info("Have " + tot_expected + " messages, as-expected");
+ } else {
+ testError = true;
+ LOG.info("Have " + cons_client.getNumMsgReceived() + " messages; expected " + tot_expected);
+ }
+
+ resp_cons.close();
+ }
+
+ /**
+ * Test one destination between the given "producer broker" and
+ * "consumer broker" specified.
+ */
+ public void testOneDest(Connection conn, Session sess, Destination cons_dest, String prod_broker_url, String cons_broker_url, int num_msg) throws Exception {
+ int echo_id;
+
+ EchoService echo_svc;
+ String echo_queue_name;
+ Destination prod_dest;
+ MessageProducer msg_prod;
+
+ synchronized (this) {
+ echo_id = this.nextEchoId;
+ this.nextEchoId++;
+ }
+
+ echo_queue_name = "echo.queue." + echo_id;
+
+ LOG.trace("destroying the echo queue in case an old one exists");
+ removeQueue(conn, echo_queue_name);
+
+ echo_svc = new EchoService(echo_queue_name, prod_broker_url);
+ echo_svc.start();
+
+ LOG.trace("Creating echo queue and producer");
+ prod_dest = sess.createQueue(echo_queue_name);
+ msg_prod = sess.createProducer(prod_dest);
+
+ testMessages(sess, msg_prod, cons_dest, num_msg);
+
+ echo_svc.shutdown();
+ msg_prod.close();
+ }
+
+ /**
+ * TEST TEMPORARY TOPICS
+ */
+ public void testTempTopic(String prod_broker_url, String cons_broker_url) throws Exception {
+ Connection conn;
+ Session sess;
+ Destination cons_dest;
+ int num_msg;
+
+ num_msg = 5;
+
+ LOG.info("TESTING TEMP TOPICS " + prod_broker_url + " -> " + cons_broker_url + " (" + num_msg + " messages)");
+
+ conn = createConnection(cons_broker_url);
+ conn.start();
+ sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ LOG.trace("Creating destination");
+ cons_dest = sess.createTemporaryTopic();
+
+ testOneDest(conn, sess, cons_dest, prod_broker_url, cons_broker_url, num_msg);
+
+ sess.close();
+ conn.close();
+ }
+
+ /**
+ * TEST TOPICS
+ */
+ public void testTopic(String prod_broker_url, String cons_broker_url) throws Exception {
+ int num_msg;
+
+ Connection conn;
+ Session sess;
+ String topic_name;
+
+ Destination cons_dest;
+
+ num_msg = 5;
+
+ LOG.info("TESTING TOPICS " + prod_broker_url + " -> " + cons_broker_url + " (" + num_msg + " messages)");
+
+ conn = createConnection(cons_broker_url);
+ conn.start();
+ sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ topic_name = "topotest2.perm.topic";
+ LOG.trace("Removing existing Topic");
+ removeTopic(conn, topic_name);
+ LOG.trace("Creating Topic, " + topic_name);
+ cons_dest = sess.createTopic(topic_name);
+
+ testOneDest(conn, sess, cons_dest, prod_broker_url, cons_broker_url, num_msg);
+
+ removeTopic(conn, topic_name);
+ sess.close();
+ conn.close();
+ }
+
+ /**
+ * TEST TEMPORARY QUEUES
+ */
+ public void testTempQueue(String prod_broker_url, String cons_broker_url) throws Exception {
+ int num_msg;
+
+ Connection conn;
+ Session sess;
+
+ Destination cons_dest;
+
+ num_msg = 5;
+
+ LOG.info("TESTING TEMP QUEUES " + prod_broker_url + " -> " + cons_broker_url + " (" + num_msg + " messages)");
+
+ conn = createConnection(cons_broker_url);
+ conn.start();
+ sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ LOG.trace("Creating destination");
+ cons_dest = sess.createTemporaryQueue();
+
+ testOneDest(conn, sess, cons_dest, prod_broker_url, cons_broker_url, num_msg);
+
+ sess.close();
+ conn.close();
+ }
+
+ /**
+ * TEST QUEUES
+ */
+ public void testQueue(String prod_broker_url, String cons_broker_url) throws Exception {
+ int num_msg;
+
+ Connection conn;
+ Session sess;
+ String queue_name;
+
+ Destination cons_dest;
+
+ num_msg = 5;
+
+ LOG.info("TESTING QUEUES " + prod_broker_url + " -> " + cons_broker_url + " (" + num_msg + " messages)");
+
+ conn = createConnection(cons_broker_url);
+ conn.start();
+ sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ queue_name = "topotest2.perm.queue";
+ LOG.trace("Removing existing Queue");
+ removeQueue(conn, queue_name);
+ LOG.trace("Creating Queue, " + queue_name);
+ cons_dest = sess.createQueue(queue_name);
+
+ testOneDest(conn, sess, cons_dest, prod_broker_url, cons_broker_url, num_msg);
+
+ removeQueue(conn, queue_name);
+ sess.close();
+ conn.close();
+ }
+
+ @Test
+ public void run() throws Exception {
+ Thread start1;
+ Thread start2;
+
+ testError = false;
+
+ // Use threads to avoid startup deadlock since the first broker started waits until
+ // it knows the name of the remote broker before finishing its startup, which means
+ // the remote must already be running.
+
+ start1 = new Thread() {
+ public void run() {
+ try {
+ broker1.start();
+ } catch (Exception ex) {
+ LOG.error(null, ex);
+ }
+ }
+ };
+
+ start2 = new Thread() {
+ public void run() {
+ try {
+ broker2.start();
+ } catch (Exception ex) {
+ LOG.error(null, ex);
+ }
+ }
+ };
+
+ start1.start();
+ start2.start();
+
+ start1.join();
+ start2.join();
+
+ if (!testError) {
+ this.testTempTopic(broker1.getConnectionUrl(), broker2.getConnectionUrl());
+ }
+ if (!testError) {
+ this.testTempQueue(broker1.getConnectionUrl(), broker2.getConnectionUrl());
+ }
+ if (!testError) {
+ this.testTopic(broker1.getConnectionUrl(), broker2.getConnectionUrl());
+ }
+ if (!testError) {
+ this.testQueue(broker1.getConnectionUrl(), broker2.getConnectionUrl());
+ }
+ Thread.sleep(100);
+
+ shutdown();
+
+ assertTrue(!testError);
+ }
+
+ public void shutdown() throws Exception {
+ broker1.stop();
+ broker2.stop();
+ }
+
+ /**
+ * @param args
+ * the command line arguments
+ */
+ public static void main(String[] args) {
+ AMQ3274Test main_obj;
+
+ try {
+ main_obj = new AMQ3274Test();
+ main_obj.run();
+ } catch (Exception ex) {
+ ex.printStackTrace();
+ LOG.error(null, ex);
+ System.exit(0);
+ }
+ }
+
+ protected Connection createConnection(String url) throws Exception {
+ return org.apache.activemq.ActiveMQConnection.makeConnection(url);
+ }
+
+ protected static void removeQueue(Connection conn, String dest_name) throws java.lang.Exception {
+ org.apache.activemq.command.ActiveMQDestination dest;
+
+ if (conn instanceof org.apache.activemq.ActiveMQConnection) {
+ dest = org.apache.activemq.command.ActiveMQDestination.createDestination(dest_name,
+ (byte) org.apache.activemq.command.ActiveMQDestination.QUEUE_TYPE);
+ ((org.apache.activemq.ActiveMQConnection) conn).destroyDestination(dest);
+ }
+ }
+
+ protected static void removeTopic(Connection conn, String dest_name) throws java.lang.Exception {
+ org.apache.activemq.command.ActiveMQDestination dest;
+
+ if (conn instanceof org.apache.activemq.ActiveMQConnection) {
+ dest = org.apache.activemq.command.ActiveMQDestination.createDestination(dest_name,
+ (byte) org.apache.activemq.command.ActiveMQDestination.TOPIC_TYPE);
+ ((org.apache.activemq.ActiveMQConnection) conn).destroyDestination(dest);
+ }
+ }
+
+ @SuppressWarnings("rawtypes")
+ public static String fmtMsgInfo(Message msg) throws Exception {
+ StringBuilder msg_desc;
+ String prop;
+ Enumeration prop_enum;
+
+ msg_desc = new StringBuilder();
+ msg_desc = new StringBuilder();
+
+ if (msg instanceof TextMessage) {
+ msg_desc.append(((TextMessage) msg).getText());
+ } else {
+ msg_desc.append("[");
+ msg_desc.append(msg.getClass().getName());
+ msg_desc.append("]");
+ }
+
+ prop_enum = msg.getPropertyNames();
+ while (prop_enum.hasMoreElements()) {
+ prop = (String) prop_enum.nextElement();
+ msg_desc.append("; ");
+ msg_desc.append(prop);
+ msg_desc.append("=");
+ msg_desc.append(msg.getStringProperty(prop));
+ }
+
+ return msg_desc.toString();
+ }
+
+ // ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+ // /////////////////////////////////////////////// INTERNAL CLASSES
+ // /////////////////////////////////////////////////
+ // ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+
+ protected class EmbeddedTcpBroker {
+ protected BrokerService brokerSvc;
+ protected int brokerNum;
+ protected String brokerName;
+ protected String brokerId;
+ protected int port;
+ protected String tcpUrl;
+
+ public EmbeddedTcpBroker() throws Exception {
+ brokerSvc = new BrokerService();
+
+ synchronized (this.getClass()) {
+ brokerNum = Next_broker_num;
+ Next_broker_num++;
+ }
+
+ brokerName = "broker" + brokerNum;
+ brokerId = "b" + brokerNum;
+
+ brokerSvc.setBrokerName(brokerName);
+ brokerSvc.setBrokerId(brokerId);
+ brokerSvc.setPersistent(false);
+ brokerSvc.setUseJmx(false);
+ tcpUrl = brokerSvc.addConnector("tcp://localhost:0").getPublishableConnectString();
+ }
+
+ public Connection createConnection() throws URISyntaxException, JMSException {
+ Connection result;
+
+ result = org.apache.activemq.ActiveMQConnection.makeConnection(this.tcpUrl);
+
+ return result;
+ }
+
+ public String getConnectionUrl() {
+ return this.tcpUrl;
+ }
+
+ /**
+ * Create network connections to the given broker using the
+ * network-connector configuration of CORE brokers (e.g.
+ * core1.bus.dev1.coresys.tmcs)
+ *
+ * @param other
+ * @param duplex_f
+ */
+ public void coreConnectTo(EmbeddedTcpBroker other, boolean duplex_f) throws Exception {
+ this.makeConnectionTo(other, duplex_f, true);
+ this.makeConnectionTo(other, duplex_f, false);
+ }
+
+ public void start() throws Exception {
+ brokerSvc.start();
+ }
+
+ public void stop() throws Exception {
+ brokerSvc.stop();
+ }
+
+ /**
+ * Make one connection to the other embedded broker, of the specified
+ * type (queue or topic) using the standard CORE broker networking.
+ *
+ * @param other
+ * @param duplex_f
+ * @param queue_f
+ * @throws Exception
+ */
+ protected void makeConnectionTo(EmbeddedTcpBroker other, boolean duplex_f, boolean queue_f) throws Exception {
+ NetworkConnector nw_conn;
+ String prefix;
+ ActiveMQDestination excl_dest;
+ ArrayList<ActiveMQDestination> excludes;
+
+ nw_conn = new DiscoveryNetworkConnector(new URI("static:(" + other.tcpUrl + ")"));
+ nw_conn.setDuplex(duplex_f);
+
+ if (queue_f)
+ nw_conn.setConduitSubscriptions(false);
+ else
+ nw_conn.setConduitSubscriptions(true);
+
+ nw_conn.setNetworkTTL(5);
+ nw_conn.setSuppressDuplicateQueueSubscriptions(true);
+ nw_conn.setDecreaseNetworkConsumerPriority(true);
+ nw_conn.setBridgeTempDestinations(true);
+
+ if (queue_f) {
+ prefix = "queue";
+ excl_dest = ActiveMQDestination.createDestination(">", ActiveMQDestination.QUEUE_TYPE);
+ } else {
+ prefix = "topic";
+ excl_dest = ActiveMQDestination.createDestination(">", ActiveMQDestination.TOPIC_TYPE);
+ }
+
+ excludes = new ArrayList<ActiveMQDestination>();
+ excludes.add(excl_dest);
+ nw_conn.setExcludedDestinations(excludes);
+
+ if (duplex_f)
+ nw_conn.setName(this.brokerId + "<-" + prefix + "->" + other.brokerId);
+ else
+ nw_conn.setName(this.brokerId + "-" + prefix + "->" + other.brokerId);
+
+ brokerSvc.addNetworkConnector(nw_conn);
+ }
+ }
+
+ protected class MessageClient extends java.lang.Thread {
+ protected MessageConsumer msgCons;
+ protected boolean shutdownInd;
+ protected int expectedCount;
+ protected int lastSeq = 0;
+ protected int msgCount = 0;
+ protected boolean haveFirstSeq;
+ protected CountDownLatch shutdownLatch;
+
+ public MessageClient(MessageConsumer cons, int num_to_expect) {
+ msgCons = cons;
+ expectedCount = (num_to_expect * (echoResponseFill + 1));
+ shutdownLatch = new CountDownLatch(1);
+ }
+
+ public void run() {
+ CountDownLatch latch;
+
+ try {
+ synchronized (this) {
+ latch = shutdownLatch;
+ }
+
+ shutdownInd = false;
+ processMessages();
+
+ latch.countDown();
+ } catch (Exception exc) {
+ LOG.error("message client error", exc);
+ }
+ }
+
+ public void waitShutdown(long timeout) {
+ CountDownLatch latch;
+
+ try {
+ synchronized (this) {
+ latch = shutdownLatch;
+ }
+
+ if (latch != null)
+ latch.await(timeout, TimeUnit.MILLISECONDS);
+ else
+ LOG.info("echo client shutdown: client does not appear to be active");
+ } catch (InterruptedException int_exc) {
+ LOG.warn("wait for message client shutdown interrupted", int_exc);
+ }
+ }
+
+ public boolean shutdown() {
+ boolean down_ind;
+
+ if (!shutdownInd) {
+ shutdownInd = true;
+ }
+
+ waitShutdown(200);
+
+ synchronized (this) {
+ if ((shutdownLatch == null) || (shutdownLatch.getCount() == 0))
+ down_ind = true;
+ else
+ down_ind = false;
+ }
+
+ return down_ind;
+ }
+
+ public int getNumMsgReceived() {
+ return msgCount;
+ }
+
+ protected void processMessages() throws Exception {
+ Message in_msg;
+
+ haveFirstSeq = false;
+ while ((!shutdownInd) && (!testError)) {
+ in_msg = msgCons.receive(100);
+
+ if (in_msg != null) {
+ msgCount++;
+ checkMessage(in_msg);
+ }
+ }
+ }
+
+ protected void checkMessage(Message in_msg) throws Exception {
+ int seq;
+
+ LOG.debug("received message " + fmtMsgInfo(in_msg));
+
+ if (in_msg.propertyExists("SEQ")) {
+ seq = in_msg.getIntProperty("SEQ");
+
+ if ((haveFirstSeq) && (seq != (lastSeq + 1))) {
+ LOG.error("***ERROR*** incorrect sequence number; expected " + Integer.toString(lastSeq + 1) + " but have " + Integer.toString(seq));
+
+ testError = true;
+ }
+
+ lastSeq = seq;
+
+ if (msgCount > expectedCount) {
+ LOG.warn("*** have more messages than expected; have " + msgCount + "; expect " + expectedCount);
+
+ testError = true;
+ }
+ }
+
+ if (in_msg.propertyExists("end-of-response")) {
+ LOG.trace("received end-of-response message");
+ shutdownInd = true;
+ }
+ }
+ }
+
+ protected class EchoService extends java.lang.Thread {
+ protected String destName;
+ protected Connection jmsConn;
+ protected Session sess;
+ protected MessageConsumer msg_cons;
+ protected boolean Shutdown_ind;
+
+ protected Destination req_dest;
+ protected Destination resp_dest;
+ protected MessageProducer msg_prod;
+
+ protected CountDownLatch waitShutdown;
+
+ public EchoService(String dest, Connection broker_conn) throws Exception {
+ destName = dest;
+ jmsConn = broker_conn;
+
+ Shutdown_ind = false;
+
+ sess = jmsConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ req_dest = sess.createQueue(destName);
+ msg_cons = sess.createConsumer(req_dest);
+
+ jmsConn.start();
+
+ waitShutdown = new CountDownLatch(1);
+ }
+
+ public EchoService(String dest, String broker_url) throws Exception {
+ this(dest, ActiveMQConnection.makeConnection(broker_url));
+ }
+
+ public void run() {
+ Message req;
+
+ try {
+ LOG.info("STARTING ECHO SERVICE");
+
+ while (!Shutdown_ind) {
+ req = msg_cons.receive(100);
+ if (req != null) {
+ if (LOG.isDebugEnabled())
+ LOG.debug("ECHO request message " + req.toString());
+
+ resp_dest = req.getJMSReplyTo();
+ if (resp_dest != null) {
+ msg_prod = sess.createProducer(resp_dest);
+ msg_prod.send(req);
+ msg_prod.close();
+ msg_prod = null;
+ } else {
+ LOG.warn("invalid request: no reply-to destination given");
+ }
+ }
+ }
+ } catch (Exception ex) {
+ LOG.error(null, ex);
+ } finally {
+ LOG.info("shutting down test echo service");
+
+ try {
+ jmsConn.stop();
+ } catch (javax.jms.JMSException jms_exc) {
+ LOG.warn("error on shutting down JMS connection", jms_exc);
+ }
+
+ synchronized (this) {
+ waitShutdown.countDown();
+ }
+ }
+ }
+
+ /**
+ * Shut down the service, waiting up to 3 seconds for the service to
+ * terminate.
+ */
+ public void shutdown() {
+ CountDownLatch wait_l;
+
+ synchronized (this) {
+ wait_l = waitShutdown;
+ }
+
+ Shutdown_ind = true;
+
+ try {
+ if (wait_l != null) {
+ if (wait_l.await(3000, TimeUnit.MILLISECONDS)) {
+ LOG.info("echo service shutdown complete");
+ } else {
+ LOG.warn("timeout waiting for echo service shutdown");
+ }
+ } else {
+ LOG.info("echo service shutdown: service does not appear to be active");
+ }
+ } catch (InterruptedException int_exc) {
+ LOG.warn("interrupted while waiting for echo service shutdown");
+ }
+ }
+ }
}