You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2012/04/16 08:23:15 UTC
svn commit: r1326502 - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
test/java/org/apache/activemq/bugs/AMQ3274Test.java
Author: rajdavies
Date: Mon Apr 16 06:23:14 2012
New Revision: 1326502
URL: http://svn.apache.org/viewvc?rev=1326502&view=rev
Log:
Fix for https://issues.apache.org/jira/browse/AMQ-3274
Added:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3274Test.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java?rev=1326502&r1=1326501&r2=1326502&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java Mon Apr 16 06:23:14 2012
@@ -30,7 +30,6 @@ import java.util.concurrent.atomic.Atomi
import java.util.concurrent.atomic.AtomicLong;
import javax.management.ObjectName;
-
import org.apache.activemq.Service;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.BrokerService;
@@ -38,35 +37,11 @@ import org.apache.activemq.broker.Broker
import org.apache.activemq.broker.TransportConnection;
import org.apache.activemq.broker.region.AbstractRegion;
import org.apache.activemq.broker.region.DurableTopicSubscription;
+import org.apache.activemq.broker.region.Region;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.policy.PolicyEntry;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.ActiveMQMessage;
-import org.apache.activemq.command.ActiveMQTempDestination;
-import org.apache.activemq.command.ActiveMQTopic;
-import org.apache.activemq.command.BrokerId;
-import org.apache.activemq.command.BrokerInfo;
-import org.apache.activemq.command.Command;
-import org.apache.activemq.command.ConnectionError;
-import org.apache.activemq.command.ConnectionId;
-import org.apache.activemq.command.ConnectionInfo;
-import org.apache.activemq.command.ConsumerId;
-import org.apache.activemq.command.ConsumerInfo;
-import org.apache.activemq.command.DataStructure;
-import org.apache.activemq.command.DestinationInfo;
-import org.apache.activemq.command.ExceptionResponse;
-import org.apache.activemq.command.KeepAliveInfo;
-import org.apache.activemq.command.Message;
-import org.apache.activemq.command.MessageAck;
-import org.apache.activemq.command.MessageDispatch;
-import org.apache.activemq.command.NetworkBridgeFilter;
-import org.apache.activemq.command.ProducerInfo;
-import org.apache.activemq.command.RemoveInfo;
-import org.apache.activemq.command.Response;
-import org.apache.activemq.command.SessionInfo;
-import org.apache.activemq.command.ShutdownInfo;
-import org.apache.activemq.command.WireFormatInfo;
+import org.apache.activemq.command.*;
import org.apache.activemq.filter.DestinationFilter;
import org.apache.activemq.filter.MessageEvaluationContext;
import org.apache.activemq.thread.DefaultThreadPools;
@@ -1011,7 +986,7 @@ public abstract class DemandForwardingBr
List<ConsumerId> candidateConsumers = consumerInfo.getNetworkConsumerIds();
Collection<Subscription> currentSubs =
- getRegionSubscriptions(consumerInfo.getDestination().isTopic());
+ getRegionSubscriptions(consumerInfo.getDestination());
for (Subscription sub : currentSubs) {
List<ConsumerId> networkConsumers = sub.getConsumerInfo().getNetworkConsumerIds();
if (!networkConsumers.isEmpty()) {
@@ -1079,11 +1054,37 @@ public abstract class DemandForwardingBr
return found;
}
- private final Collection<Subscription> getRegionSubscriptions(boolean isTopic) {
- RegionBroker region = (RegionBroker) brokerService.getRegionBroker();
- AbstractRegion abstractRegion = (AbstractRegion)
- (isTopic ? region.getTopicRegion() : region.getQueueRegion());
- return abstractRegion.getSubscriptions().values();
+ private final Collection<Subscription> getRegionSubscriptions(ActiveMQDestination dest) {
+ RegionBroker region_broker = (RegionBroker) brokerService.getRegionBroker();
+ Region region;
+ Collection<Subscription> subs;
+
+ region = null;
+ switch ( dest.getDestinationType() )
+ {
+ case ActiveMQDestination.QUEUE_TYPE:
+ region = region_broker.getQueueRegion();
+ break;
+
+ case ActiveMQDestination.TOPIC_TYPE:
+ region = region_broker.getTopicRegion();
+ break;
+
+ case ActiveMQDestination.TEMP_QUEUE_TYPE:
+ region = region_broker.getTempQueueRegion();
+ break;
+
+ case ActiveMQDestination.TEMP_TOPIC_TYPE:
+ region = region_broker.getTempTopicRegion();
+ break;
+ }
+
+ if ( region instanceof AbstractRegion )
+ subs = ((AbstractRegion) region).getSubscriptions().values();
+ else
+ subs = null;
+
+ return subs;
}
protected DemandSubscription createDemandSubscription(ConsumerInfo info) throws IOException {
Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3274Test.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3274Test.java?rev=1326502&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3274Test.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3274Test.java Mon Apr 16 06:23:14 2012
@@ -0,0 +1,975 @@
+/*
+ */
+
+package org.apache.activemq.bugs;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Enumeration;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import javax.jms.MessageProducer;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+
+import org.junit.*;
+import static org.junit.Assert.*;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.network.DiscoveryNetworkConnector;
+import org.apache.activemq.network.NetworkConnector;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+
+/**
+ *
+ */
+public class AMQ3274Test
+{
+ protected static int Next_broker_num = 0;
+ protected EmbeddedTcpBroker broker1;
+ protected EmbeddedTcpBroker broker2;
+
+ protected int nextEchoId = 0;
+ protected boolean testError = false;
+
+ protected int echoResponseFill = 0; // Number of "filler" response messages per request
+
+    // Logger shared by the test and its nested helper classes.
+    protected static Log LOG = LogFactory.getLog(AMQ3274Test.class);
+
+ public AMQ3274Test ()
+ throws Exception
+ {
+ broker1 = new EmbeddedTcpBroker();
+ broker2 = new EmbeddedTcpBroker();
+
+ broker1.coreConnectTo(broker2, true);
+ broker2.coreConnectTo(broker1, true);
+ }
+
+    /**
+     * Prints the message on standard output and flushes the stream.
+     *
+     * @param msg text to print
+     */
+    public void logMessage(String msg) {
+        final java.io.PrintStream out = System.out;
+        out.println(msg);
+        out.flush();
+    }
+
+
+ /**
+ *
+ */
+
+ public void testMessages (Session sess, MessageProducer req_prod, Destination resp_dest, int num_msg)
+ throws Exception
+ {
+ MessageConsumer resp_cons;
+ TextMessage msg;
+ MessageClient cons_client;
+ int cur;
+ int tot_expected;
+
+ resp_cons = sess.createConsumer(resp_dest);
+
+ cons_client = new MessageClient(resp_cons, num_msg);
+ cons_client.start();
+
+ cur = 0;
+ while ( ( cur < num_msg ) && ( ! testError ) )
+ {
+ msg = sess.createTextMessage("MSG AAAA " + cur);
+ msg.setIntProperty("SEQ", 100 + cur);
+ msg.setStringProperty("TEST", "TOPO");
+ msg.setJMSReplyTo(resp_dest);
+
+ if ( cur == ( num_msg - 1 ) )
+ msg.setBooleanProperty("end-of-response", true);
+
+ req_prod.send(msg);
+
+ cur++;
+ }
+
+ //
+ // Give the consumer some time to receive the response.
+ //
+ cons_client.waitShutdown(5000);
+
+ //
+ // Now shutdown the consumer if it's still running.
+ //
+ if ( cons_client.shutdown() )
+ LOG.debug("Consumer client shutdown complete");
+ else
+ LOG.debug("Consumer client shutdown incomplete!!!");
+
+
+ //
+ // Check that the correct number of messages was received.
+ //
+ tot_expected = num_msg * ( echoResponseFill + 1 );
+
+ if ( cons_client.getNumMsgReceived() == tot_expected )
+ {
+ LOG.info("Have " + tot_expected + " messages, as-expected");
+ }
+ else
+ {
+ testError = true;
+ LOG.info("Have " + cons_client.getNumMsgReceived() + " messages; expected " + tot_expected);
+ }
+
+ resp_cons.close();
+ }
+
+
+    /**
+     * Test one destination between the given "producer broker" and "consumer broker".
+     * Starts an {@link EchoService} connected to {@code prod_broker_url} on a fresh
+     * "echo.queue.N" queue, sends requests to that queue through {@code sess}, and
+     * expects the echoed replies on {@code cons_dest}.
+     *
+     * @param conn            connection used to remove a stale echo queue
+     * @param sess            session used for the request producer and response consumer
+     * @param cons_dest       destination on which the replies are expected
+     * @param prod_broker_url URL of the broker the echo service connects to
+     * @param cons_broker_url not used by this method
+     * @param num_msg         number of request messages to send
+     * @throws Exception if a JMS operation fails
+     */
+    public void testOneDest(Connection conn, Session sess, Destination cons_dest, String prod_broker_url,
+                            String cons_broker_url, int num_msg)
+        throws Exception {
+        int echo_id;
+        synchronized (this) {
+            echo_id = this.nextEchoId;
+            this.nextEchoId++;
+        }
+
+        String echo_queue_name = "echo.queue." + echo_id;
+
+        // Remove any previously-created echo queue with the same name.
+        LOG.trace("destroying the echo queue in case an old one exists");
+        removeQueue(conn, echo_queue_name);
+
+        // Now start the echo service with that queue.
+        EchoService echo_svc = new EchoService(echo_queue_name, prod_broker_url);
+        echo_svc.start();
+
+        MessageProducer msg_prod = null;
+        try {
+            // Create the Producer to the echo request Queue
+            LOG.trace("Creating echo queue and producer");
+            Destination prod_dest = sess.createQueue(echo_queue_name);
+            msg_prod = sess.createProducer(prod_dest);
+
+            // Pass messages around.
+            testMessages(sess, msg_prod, cons_dest, num_msg);
+        } finally {
+            // Stop the echo thread and release the producer even when the exchange failed
+            echo_svc.shutdown();
+            if (msg_prod != null) {
+                msg_prod.close();
+            }
+        }
+    }
+
+
+    /**
+     * TEST TEMPORARY TOPICS
+     *
+     * Connects to {@code cons_broker_url}, creates a temporary topic as the reply
+     * destination and runs the echo exchange with five messages.
+     *
+     * @param prod_broker_url URL of the broker the echo service connects to
+     * @param cons_broker_url URL of the broker the response consumer connects to
+     * @throws Exception if a JMS operation fails
+     */
+    public void testTempTopic(String prod_broker_url, String cons_broker_url)
+        throws Exception {
+        final int num_msg = 5;
+
+        LOG.info("TESTING TEMP TOPICS " + prod_broker_url + " -> " + cons_broker_url + " (" + num_msg +
+                 " messages)");
+
+        // Connect to the bus.
+        Connection conn = createConnection(cons_broker_url);
+        try {
+            conn.start();
+            Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            // Create the destination on which messages are being tested.
+            LOG.trace("Creating destination");
+            Destination cons_dest = sess.createTemporaryTopic();
+
+            testOneDest(conn, sess, cons_dest, prod_broker_url, cons_broker_url, num_msg);
+
+            sess.close();
+        } finally {
+            // Cleanup: closing the connection also closes its session if the test failed
+            conn.close();
+        }
+    }
+
+
+    /**
+     * TEST TOPICS
+     *
+     * Connects to {@code cons_broker_url}, re-creates the topic
+     * "topotest2.perm.topic" as the reply destination and runs the echo exchange
+     * with five messages.
+     *
+     * @param prod_broker_url URL of the broker the echo service connects to
+     * @param cons_broker_url URL of the broker the response consumer connects to
+     * @throws Exception if a JMS operation fails
+     */
+    public void testTopic(String prod_broker_url, String cons_broker_url)
+        throws Exception {
+        final int num_msg = 5;
+        final String topic_name = "topotest2.perm.topic";
+
+        LOG.info("TESTING TOPICS " + prod_broker_url + " -> " + cons_broker_url + " (" + num_msg +
+                 " messages)");
+
+        // Connect to the bus.
+        Connection conn = createConnection(cons_broker_url);
+        try {
+            conn.start();
+            Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            // Create the destination on which messages are being tested.
+            LOG.trace("Removing existing Topic");
+            removeTopic(conn, topic_name);
+            LOG.trace("Creating Topic, " + topic_name);
+            Destination cons_dest = sess.createTopic(topic_name);
+
+            testOneDest(conn, sess, cons_dest, prod_broker_url, cons_broker_url, num_msg);
+
+            // Cleanup
+            removeTopic(conn, topic_name);
+            sess.close();
+        } finally {
+            // Closing the connection also closes its session if the test failed
+            conn.close();
+        }
+    }
+
+
+    /**
+     * TEST TEMPORARY QUEUES
+     *
+     * Connects to {@code cons_broker_url}, creates a temporary queue as the reply
+     * destination and runs the echo exchange with five messages.
+     *
+     * @param prod_broker_url URL of the broker the echo service connects to
+     * @param cons_broker_url URL of the broker the response consumer connects to
+     * @throws Exception if a JMS operation fails
+     */
+    public void testTempQueue(String prod_broker_url, String cons_broker_url)
+        throws Exception {
+        final int num_msg = 5;
+
+        LOG.info("TESTING TEMP QUEUES " + prod_broker_url + " -> " + cons_broker_url + " (" + num_msg +
+                 " messages)");
+
+        // Connect to the bus.
+        Connection conn = createConnection(cons_broker_url);
+        try {
+            conn.start();
+            Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            // Create the destination on which messages are being tested.
+            LOG.trace("Creating destination");
+            Destination cons_dest = sess.createTemporaryQueue();
+
+            testOneDest(conn, sess, cons_dest, prod_broker_url, cons_broker_url, num_msg);
+
+            sess.close();
+        } finally {
+            // Cleanup: closing the connection also closes its session if the test failed
+            conn.close();
+        }
+    }
+
+
+    /**
+     * TEST QUEUES
+     *
+     * Connects to {@code cons_broker_url}, re-creates the queue
+     * "topotest2.perm.queue" as the reply destination and runs the echo exchange
+     * with five messages.
+     *
+     * @param prod_broker_url URL of the broker the echo service connects to
+     * @param cons_broker_url URL of the broker the response consumer connects to
+     * @throws Exception if a JMS operation fails
+     */
+    public void testQueue(String prod_broker_url, String cons_broker_url)
+        throws Exception {
+        final int num_msg = 5;
+        final String queue_name = "topotest2.perm.queue";
+
+        LOG.info("TESTING QUEUES " + prod_broker_url + " -> " + cons_broker_url + " (" + num_msg +
+                 " messages)");
+
+        // Connect to the bus.
+        Connection conn = createConnection(cons_broker_url);
+        try {
+            conn.start();
+            Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            // Create the destination on which messages are being tested.
+            LOG.trace("Removing existing Queue");
+            removeQueue(conn, queue_name);
+            LOG.trace("Creating Queue, " + queue_name);
+            Destination cons_dest = sess.createQueue(queue_name);
+
+            testOneDest(conn, sess, cons_dest, prod_broker_url, cons_broker_url, num_msg);
+
+            // Cleanup
+            removeQueue(conn, queue_name);
+            sess.close();
+        } finally {
+            // Closing the connection also closes its session if the test failed
+            conn.close();
+        }
+    }
+
+    /**
+     * Starts both brokers, runs the temp-topic, temp-queue, topic and queue
+     * exchanges in that order (skipping the rest once an error is flagged), stops
+     * the brokers and asserts that no error was flagged.
+     *
+     * @throws Exception if a sub-test or the broker shutdown fails
+     */
+    @Test
+    public void run()
+        throws Exception {
+        testError = false;
+
+        // Use threads to avoid startup deadlock since the first broker started waits until
+        // it knows the name of the remote broker before finishing its startup, which means
+        // the remote must already be running.
+        Thread start1 = new Thread() {
+            public void run() {
+                try {
+                    broker1.start();
+                } catch (Exception ex) {
+                    LOG.error(null, ex);
+                    // A broker that failed to start makes the run invalid
+                    testError = true;
+                }
+            }
+        };
+
+        Thread start2 = new Thread() {
+            public void run() {
+                try {
+                    broker2.start();
+                } catch (Exception ex) {
+                    LOG.error(null, ex);
+                    testError = true;
+                }
+            }
+        };
+
+        try {
+            start1.start();
+            start2.start();
+
+            start1.join();
+            start2.join();
+
+            String url1 = broker1.getConnectionUrl();
+            String url2 = broker2.getConnectionUrl();
+
+            if (!testError) {
+                this.testTempTopic(url1, url2);
+            }
+            if (!testError) {
+                this.testTempQueue(url1, url2);
+            }
+            if (!testError) {
+                this.testTopic(url1, url2);
+            }
+            if (!testError) {
+                this.testQueue(url1, url2);
+            }
+
+            Thread.sleep(100);
+        } finally {
+            // Stop the brokers even when a sub-test threw, so the TCP ports are released
+            shutdown();
+        }
+
+        assertTrue(!testError);
+    }
+
+    /**
+     * Stops both embedded brokers. The second broker is stopped even when stopping
+     * the first one fails.
+     *
+     * @throws Exception if stopping either broker fails
+     */
+    public void shutdown()
+        throws Exception {
+        try {
+            broker1.stop();
+        } finally {
+            broker2.stop();
+        }
+    }
+
+ /**
+ * @param args the command line arguments
+ */
+ public static void main(String[] args)
+ {
+ AMQ3274Test main_obj;
+
+ try
+ {
+ main_obj = new AMQ3274Test();
+ main_obj.run();
+ }
+ catch (Exception ex)
+ {
+ ex.printStackTrace();
+
+ LOG.error(null, ex);
+
+ System.exit(0);
+ }
+ }
+
+ protected Connection createConnection (String url)
+ throws Exception
+ {
+ return org.apache.activemq.ActiveMQConnection.makeConnection(url);
+ }
+
+    /**
+     * Destroys the named queue through the given connection. Does nothing when the
+     * connection is not an {@code ActiveMQConnection}.
+     *
+     * @param conn      connection used to issue the destroy request
+     * @param dest_name name of the queue to remove
+     * @throws Exception if the destination cannot be destroyed
+     */
+    protected static void removeQueue(Connection conn, String dest_name)
+        throws Exception {
+        if (!(conn instanceof ActiveMQConnection)) {
+            return;
+        }
+
+        ActiveMQDestination queue =
+            ActiveMQDestination.createDestination(dest_name, ActiveMQDestination.QUEUE_TYPE);
+        ((ActiveMQConnection) conn).destroyDestination(queue);
+    }
+
+    /**
+     * Destroys the named topic through the given connection. Does nothing when the
+     * connection is not an {@code ActiveMQConnection}.
+     *
+     * @param conn      connection used to issue the destroy request
+     * @param dest_name name of the topic to remove
+     * @throws Exception if the destination cannot be destroyed
+     */
+    protected static void removeTopic(Connection conn, String dest_name)
+        throws Exception {
+        if (!(conn instanceof ActiveMQConnection)) {
+            return;
+        }
+
+        ActiveMQDestination topic =
+            ActiveMQDestination.createDestination(dest_name, ActiveMQDestination.TOPIC_TYPE);
+        ((ActiveMQConnection) conn).destroyDestination(topic);
+    }
+
+ public static String fmtMsgInfo (Message msg)
+ throws Exception
+ {
+ StringBuilder msg_desc;
+ String prop;
+ Enumeration prop_enum;
+
+ msg_desc = new StringBuilder();
+ msg_desc = new StringBuilder();
+
+ if ( msg instanceof TextMessage )
+ {
+ msg_desc.append(((TextMessage) msg).getText());
+ }
+ else
+ {
+ msg_desc.append("[");
+ msg_desc.append(msg.getClass().getName());
+ msg_desc.append("]");
+ }
+
+ prop_enum = msg.getPropertyNames();
+ while ( prop_enum.hasMoreElements() )
+ {
+ prop = (String) prop_enum.nextElement();
+ msg_desc.append("; ");
+ msg_desc.append(prop);
+ msg_desc.append("=");
+ msg_desc.append(msg.getStringProperty(prop));
+ }
+
+ return msg_desc.toString();
+ }
+
+//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+///////////////////////////////////////////////// INTERNAL CLASSES /////////////////////////////////////////////////
+//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+
+ protected class EmbeddedTcpBroker
+ {
+ protected BrokerService brokerSvc;
+ protected int brokerNum;
+ protected String brokerName;
+ protected String brokerId;
+ protected int port;
+ protected String tcpUrl;
+
+ public EmbeddedTcpBroker ()
+ throws Exception
+ {
+ brokerSvc = new BrokerService();
+
+ synchronized ( this.getClass() )
+ {
+ brokerNum = Next_broker_num;
+ Next_broker_num++;
+ }
+
+ brokerName = "broker" + brokerNum;
+ brokerId = "b" + brokerNum;
+
+ brokerSvc.setBrokerName(brokerName);
+ brokerSvc.setBrokerId(brokerId);
+
+ brokerSvc.setPersistent(false);
+ brokerSvc.setUseJmx(false); // TBD
+
+ port = 60000 + ( brokerNum * 10 );
+
+ // Configure the transport connector (TCP)
+ tcpUrl = "tcp://127.0.0.1:" + Integer.toString(port);
+ brokerSvc.addConnector(tcpUrl);
+ }
+
+        /**
+         * Opens a JMS connection to this broker's TCP URL.
+         *
+         * @return the new connection
+         * @throws URISyntaxException if the broker URL cannot be parsed
+         * @throws JMSException if the connection cannot be created
+         */
+        public Connection createConnection()
+            throws URISyntaxException, JMSException {
+            return ActiveMQConnection.makeConnection(this.tcpUrl);
+        }
+
+        /**
+         * @return the TCP URL this broker's transport connector was configured with
+         */
+        public String getConnectionUrl() {
+            final String url = this.tcpUrl;
+            return url;
+        }
+
+
+        /**
+         * Adds two network connectors from this broker to the given broker: one
+         * configured with the "queue" settings and one with the "topic" settings of
+         * {@link #makeConnectionTo}.
+         *
+         * @param other    broker to connect to
+         * @param duplex_f whether the connectors are created as duplex
+         * @throws Exception if a connector cannot be created or added
+         */
+        public void coreConnectTo(EmbeddedTcpBroker other, boolean duplex_f)
+            throws Exception {
+            final boolean[] queueFlags = { true, false };
+            for (boolean queueFlag : queueFlags) {
+                this.makeConnectionTo(other, duplex_f, queueFlag);
+            }
+        }
+
+        /**
+         * Starts the wrapped broker service. The waitUntilStarted() call is left
+         * disabled.
+         *
+         * @throws Exception if the broker fails to start
+         */
+        public void start()
+            throws Exception {
+            this.brokerSvc.start();
+            //brokerSvc.waitUntilStarted();
+        }
+
+        /**
+         * Stops the wrapped broker service.
+         *
+         * @throws Exception if the broker fails to stop
+         */
+        public void stop()
+            throws Exception {
+            this.brokerSvc.stop();
+        }
+
+
+        /**
+         * Adds one static network connector from this broker to the other embedded
+         * broker. The {@code queue_f} flag selects the connector's name prefix
+         * ("queue" or "topic"), its conduit-subscription setting (off for "queue",
+         * on for "topic") and the destination type placed on its exclusion list.
+         *
+         * @param other    broker to connect to
+         * @param duplex_f whether the connector is duplex
+         * @param queue_f  {@code true} for the "queue" connector, {@code false} for the "topic" one
+         * @throws Exception if the connector cannot be created or added
+         */
+        protected void makeConnectionTo(EmbeddedTcpBroker other, boolean duplex_f, boolean queue_f)
+            throws Exception {
+            NetworkConnector nw_conn =
+                new DiscoveryNetworkConnector(new URI("static:(" + other.tcpUrl + ")"));
+            nw_conn.setDuplex(duplex_f);
+            nw_conn.setConduitSubscriptions(!queue_f);
+
+            nw_conn.setNetworkTTL(5);
+            nw_conn.setSuppressDuplicateQueueSubscriptions(true);
+            nw_conn.setDecreaseNetworkConsumerPriority(true);
+            nw_conn.setBridgeTempDestinations(true);
+
+            // NOTE(review): the connector labelled "queue" excludes all queue
+            // destinations (and "topic" excludes all topics) - confirm this is intended.
+            String prefix;
+            ActiveMQDestination excl_dest;
+            if (queue_f) {
+                prefix = "queue";
+                excl_dest = ActiveMQDestination.createDestination(">", ActiveMQDestination.QUEUE_TYPE);
+            } else {
+                prefix = "topic";
+                excl_dest = ActiveMQDestination.createDestination(">", ActiveMQDestination.TOPIC_TYPE);
+            }
+
+            ArrayList<ActiveMQDestination> excludes = new ArrayList<ActiveMQDestination>();
+            excludes.add(excl_dest);
+            nw_conn.setExcludedDestinations(excludes);
+
+            // The connector name shows the direction: "<-x->" for duplex, "-x->" otherwise
+            String arrow = duplex_f ? "<-" : "-";
+            nw_conn.setName(this.brokerId + arrow + prefix + "->" + other.brokerId);
+
+            brokerSvc.addNetworkConnector(nw_conn);
+        }
+ }
+
+ protected class MessageClient extends java.lang.Thread
+ {
+        protected MessageConsumer msgCons;
+        // Written by the test thread (shutdown) and read by the receive loop, so volatile
+        protected volatile boolean shutdownInd;
+        protected int expectedCount;
+        protected int lastSeq = 0;
+        // Incremented by the receive loop and read by the test thread, so volatile
+        protected volatile int msgCount = 0;
+        protected boolean haveFirstSeq;
+        protected CountDownLatch shutdownLatch;
+
+ public MessageClient (MessageConsumer cons, int num_to_expect)
+ {
+ msgCons = cons;
+ expectedCount = ( num_to_expect * ( echoResponseFill + 1 ) );
+ shutdownLatch = new CountDownLatch(1);
+ }
+
+ public void run ()
+ {
+ CountDownLatch latch;
+
+ try
+ {
+ synchronized ( this )
+ {
+ latch = shutdownLatch;
+ }
+
+ shutdownInd = false;
+ processMessages();
+
+ latch.countDown();
+ }
+ catch ( Exception exc )
+ {
+ LOG.error("message client error", exc);
+ }
+ }
+
+ public void waitShutdown (long timeout)
+ {
+ CountDownLatch latch;
+
+ try
+ {
+ synchronized ( this )
+ {
+ latch = shutdownLatch;
+ }
+
+ if ( latch != null )
+ latch.await(timeout, TimeUnit.MILLISECONDS);
+ else
+ LOG.info("echo client shutdown: client does not appear to be active");
+ }
+ catch ( InterruptedException int_exc )
+ {
+ LOG.warn("wait for message client shutdown interrupted", int_exc);
+ }
+ }
+
+ public boolean shutdown ()
+ {
+ boolean down_ind;
+
+ if ( ! shutdownInd )
+ {
+ shutdownInd = true;
+ }
+
+ waitShutdown(200);
+
+ synchronized ( this )
+ {
+ if ( ( shutdownLatch == null ) || ( shutdownLatch.getCount() == 0 ) )
+ down_ind = true;
+ else
+ down_ind = false;
+ }
+
+ return down_ind;
+ }
+
+        /**
+         * @return the number of messages received so far
+         */
+        public int getNumMsgReceived() {
+            final int received = this.msgCount;
+            return received;
+        }
+
+ protected void processMessages ()
+ throws Exception
+ {
+ Message in_msg;
+
+ haveFirstSeq = false;
+
+ //
+ // Stop at shutdown time or after any test error is detected.
+ //
+
+ while ( ( ! shutdownInd ) && ( ! testError ) )
+ {
+ in_msg = msgCons.receive(100);
+
+ if ( in_msg != null )
+ {
+ msgCount++;
+ checkMessage(in_msg);
+ }
+ }
+ }
+
+        /**
+         * Validates one received message. For messages carrying a "SEQ" property,
+         * flags a test error when the sequence number does not follow the previous
+         * one, or when more messages than expected have arrived. A message carrying
+         * the "end-of-response" property ends the receive loop.
+         *
+         * @param in_msg message just received
+         * @throws Exception if a message property cannot be read
+         */
+        protected void checkMessage(Message in_msg)
+            throws Exception {
+            LOG.debug("received message " + fmtMsgInfo(in_msg));
+
+            // Only check messages with a sequence number.
+            if (in_msg.propertyExists("SEQ")) {
+                int seq = in_msg.getIntProperty("SEQ");
+
+                if ((haveFirstSeq) && (seq != (lastSeq + 1))) {
+                    LOG.error("***ERROR*** incorrect sequence number; expected " +
+                              Integer.toString(lastSeq + 1) + " but have " +
+                              Integer.toString(seq));
+
+                    testError = true;
+                }
+
+                lastSeq = seq;
+                // Record that a baseline exists; without this the ordering check
+                // above can never fire.
+                haveFirstSeq = true;
+
+                if (msgCount > expectedCount) {
+                    LOG.warn("*** have more messages than expected; have " + msgCount +
+                             "; expect " + expectedCount);
+
+                    testError = true;
+                }
+            }
+
+            if (in_msg.propertyExists("end-of-response")) {
+                LOG.trace("received end-of-response message");
+                shutdownInd = true;
+            }
+        }
+ }
+
+ /**
+ *
+ */
+ protected class EchoService extends java.lang.Thread
+ {
+        protected String destName;
+        protected Connection jmsConn;
+        protected Session sess;
+        protected MessageConsumer msg_cons;
+        // Set by shutdown() on the caller's thread and polled by run(), so volatile
+        protected volatile boolean Shutdown_ind;
+
+        protected Destination req_dest;
+        protected Destination resp_dest;
+        protected MessageProducer msg_prod;
+
+        protected CountDownLatch waitShutdown;
+
+        /**
+         * Creates an echo service that consumes requests from the named queue over
+         * the given connection, and starts the connection.
+         *
+         * @param dest        name of the request queue
+         * @param broker_conn connection used for the session, consumer and replies
+         * @throws Exception if the session, queue or consumer cannot be created
+         */
+        public EchoService(String dest, Connection broker_conn)
+            throws Exception {
+            this.destName = dest;
+            this.jmsConn = broker_conn;
+            this.Shutdown_ind = false;
+
+            this.sess = this.jmsConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            this.req_dest = this.sess.createQueue(this.destName);
+            this.msg_cons = this.sess.createConsumer(this.req_dest);
+
+            this.jmsConn.start();
+
+            this.waitShutdown = new CountDownLatch(1);
+        }
+
+        /**
+         * Creates an echo service on a new connection to the broker at the given URL.
+         *
+         * @param dest       name of the request queue
+         * @param broker_url URL of the broker to connect to
+         * @throws Exception if the connection or the service cannot be created
+         */
+        public EchoService(String dest, String broker_url)
+            throws Exception {
+            this(
+                dest,
+                ActiveMQConnection.makeConnection(broker_url)
+            );
+        }
+
+ public void run ()
+ {
+ Message req;
+
+ try
+ {
+ LOG.info("STARTING ECHO SERVICE");
+
+ while ( ! Shutdown_ind )
+ {
+ req = msg_cons.receive(100);
+ if ( req != null )
+ {
+ if ( LOG.isDebugEnabled() )
+ LOG.debug("ECHO request message " + req.toString());
+
+ resp_dest = req.getJMSReplyTo();
+ if ( resp_dest != null )
+ {
+ msg_prod = sess.createProducer(resp_dest);
+ msg_prod.send(req);
+ msg_prod.close();
+ msg_prod = null;
+ }
+ else
+ {
+ LOG.warn("invalid request: no reply-to destination given");
+ }
+ }
+ }
+ }
+ catch (Exception ex)
+ {
+ LOG.error(null, ex);
+ }
+ finally
+ {
+ LOG.info("shutting down test echo service");
+
+ try
+ {
+ jmsConn.stop();
+ }
+ catch ( javax.jms.JMSException jms_exc )
+ {
+ LOG.warn("error on shutting down JMS connection", jms_exc);
+ }
+
+ synchronized ( this )
+ {
+ waitShutdown.countDown();
+ }
+ }
+ }
+
+
+ /**
+ * Shut down the service, waiting up to 3 seconds for the service to terminate.
+ */
+ public void shutdown ()
+ {
+ CountDownLatch wait_l;
+
+ synchronized ( this )
+ {
+ wait_l = waitShutdown;
+ }
+
+ Shutdown_ind = true;
+
+ try
+ {
+ if ( wait_l != null )
+ {
+ if ( wait_l.await(3000, TimeUnit.MILLISECONDS) )
+ LOG.info("echo service shutdown complete");
+ else
+ LOG.warn("timeout waiting for echo service shutdown");
+ }
+ else
+ {
+ LOG.info("echo service shutdown: service does not appear to be active");
+ }
+ }
+ catch ( InterruptedException int_exc )
+ {
+ LOG.warn("interrupted while waiting for echo service shutdown");
+ }
+ }
+ }
+}