You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2012/09/27 22:43:03 UTC

svn commit: r1391204 - /activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/RequestReplyToTopicViaThreeNetworkHopsTest.java

Author: gtully
Date: Thu Sep 27 20:43:03 2012
New Revision: 1391204

URL: http://svn.apache.org/viewvc?rev=1391204&view=rev
Log:
add validation of request/reply over network with temp topic replyto, fine with duplex network connectors

Added:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/RequestReplyToTopicViaThreeNetworkHopsTest.java   (with props)

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/RequestReplyToTopicViaThreeNetworkHopsTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/RequestReplyToTopicViaThreeNetworkHopsTest.java?rev=1391204&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/RequestReplyToTopicViaThreeNetworkHopsTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/RequestReplyToTopicViaThreeNetworkHopsTest.java Thu Sep 27 20:43:03 2012
@@ -0,0 +1,1037 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.usecases;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Enumeration;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQSession;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.network.DiscoveryNetworkConnector;
+import org.apache.activemq.network.NetworkConnector;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+
+
+public class RequestReplyToTopicViaThreeNetworkHopsTest {
+    protected static final int CONCURRENT_CLIENT_COUNT = 5;
+    protected static final int CONCURRENT_SERVER_COUNT = 5;
+    protected static final int TOTAL_CLIENT_ITER = 10;
+
+    protected static int Next_broker_num = 0;
+    protected EmbeddedTcpBroker edge1;
+    protected EmbeddedTcpBroker edge2;
+    protected EmbeddedTcpBroker core1;
+    protected EmbeddedTcpBroker core2;
+
+    protected boolean testError = false;
+    protected boolean fatalTestError = false;
+
+    protected int echoResponseFill = 0;   // Number of "filler" response messages per request
+
+    protected static Log LOG;
+
+    static {
+        LOG = LogFactory.getLog(RequestReplyToTopicViaThreeNetworkHopsTest.class);
+    }
+
+    public RequestReplyToTopicViaThreeNetworkHopsTest()
+            throws Exception {
+        edge1 = new EmbeddedTcpBroker("edge", 1);
+        edge2 = new EmbeddedTcpBroker("edge", 2);
+        core1 = new EmbeddedTcpBroker("core", 1);
+        core2 = new EmbeddedTcpBroker("core", 2);
+
+        // duplex is necessary to serialise sends with consumer/destination creation
+        edge1.coreConnectTo(core1, true);
+        edge2.coreConnectTo(core2, true);
+        core1.coreConnectTo(core2, true);
+
+    }
+
+    public void logMessage(String msg) {
+        System.out.println(msg);
+        System.out.flush();
+    }
+
+    public void testMessages(Session sess, MessageProducer req_prod, Destination resp_dest, int num_msg)
+            throws Exception {
+        MessageConsumer resp_cons;
+        TextMessage msg;
+        MessageClient cons_client;
+        int cur;
+        int tot_expected;
+
+        resp_cons = sess.createConsumer(resp_dest);
+
+        cons_client = new MessageClient(resp_cons, num_msg);
+        cons_client.start();
+
+        cur = 0;
+        while ((cur < num_msg) && (!fatalTestError)) {
+            msg = sess.createTextMessage("MSG AAAA " + cur);
+            msg.setIntProperty("SEQ", 100 + cur);
+            msg.setStringProperty("TEST", "TOPO");
+            msg.setJMSReplyTo(resp_dest);
+
+            if (cur == (num_msg - 1))
+                msg.setBooleanProperty("end-of-response", true);
+
+            sendWithRetryOnDeletedDest(req_prod, msg);
+            LOG.debug("Sent:" + msg);
+
+            cur++;
+        }
+
+        //
+        // Give the consumer some time to receive the response.
+        //
+        cons_client.waitShutdown(5000);
+
+        //
+        // Now shutdown the consumer if it's still running.
+        //
+        if (cons_client.shutdown())
+            LOG.debug("Consumer client shutdown complete");
+        else
+            LOG.debug("Consumer client shutdown incomplete!!!");
+
+
+        //
+        // Check that the correct number of messages was received.
+        //
+        tot_expected = num_msg * (echoResponseFill + 1);
+
+        if (cons_client.getNumMsgReceived() == tot_expected) {
+            LOG.debug("Have " + tot_expected + " messages, as-expected");
+        } else {
+            testError = true;
+
+            if (cons_client.getNumMsgReceived() == 0)
+                fatalTestError = true;
+
+            LOG.error("Have " + cons_client.getNumMsgReceived() + " messages; expected " + tot_expected +
+                    " on destination " + resp_dest);
+        }
+
+        resp_cons.close();
+    }
+
+    protected void sendWithRetryOnDeletedDest(MessageProducer prod, Message msg)
+            throws JMSException {
+        try {
+            if (LOG.isDebugEnabled())
+                LOG.debug("SENDING REQUEST message " + msg);
+
+            prod.send(msg);
+        } catch (JMSException jms_exc) {
+            System.out.println("AAA: " + jms_exc.getMessage());
+            throw jms_exc;
+        }
+    }
+
+    /**
+     * Test one destination between the given "producer broker" and "consumer broker" specified.
+     */
+    public void testOneDest(Connection conn, Session sess, Destination cons_dest, int num_msg)
+            throws Exception {
+        Destination prod_dest;
+        MessageProducer msg_prod;
+
+
+        //
+        // Create the Producer to the echo request Queue
+        //
+        LOG.trace("Creating echo queue and producer");
+        prod_dest = sess.createQueue("echo");
+        msg_prod = sess.createProducer(prod_dest);
+
+
+        //
+        // Pass messages around.
+        //
+        testMessages(sess, msg_prod, cons_dest, num_msg);
+
+        msg_prod.close();
+    }
+
+
+    /**
+     * TEST TEMPORARY TOPICS
+     */
+    public void testTempTopic(String prod_broker_url, String cons_broker_url)
+            throws Exception {
+        Connection conn;
+        Session sess;
+        Destination cons_dest;
+        int echo_id;
+        int num_msg;
+
+        num_msg = 5;
+
+        LOG.debug("TESTING TEMP TOPICS " + prod_broker_url + " -> " + cons_broker_url + " (" + num_msg +
+                " messages)");
+
+
+        //
+        // Connect to the bus.
+        //
+
+        conn = createConnection(cons_broker_url);
+        conn.start();
+        sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+
+        //
+        // Create the destination on which messages are being tested.
+        //
+
+        LOG.trace("Creating destination");
+        cons_dest = sess.createTemporaryTopic();
+
+        testOneDest(conn, sess, cons_dest, num_msg);
+
+
+        //
+        // Cleanup
+        //
+
+        sess.close();
+        conn.close();
+    }
+
+
+    /**
+     * TEST TOPICS
+     */
+    public void testTopic(String prod_broker_url, String cons_broker_url)
+            throws Exception {
+        int num_msg;
+
+        Connection conn;
+        Session sess;
+        String topic_name;
+
+        Destination cons_dest;
+
+        num_msg = 5;
+
+        LOG.info("TESTING TOPICS " + prod_broker_url + " -> " + cons_broker_url + " (" + num_msg +
+                " messages)");
+
+
+        conn = createConnection(cons_broker_url);
+        conn.start();
+        sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+
+        //
+        // Create the destination on which messages are being tested.
+        //
+
+        topic_name = "topotest2.perm.topic";
+        LOG.trace("Removing existing Topic");
+        removeTopic(conn, topic_name);
+        LOG.trace("Creating Topic, " + topic_name);
+        cons_dest = sess.createTopic(topic_name);
+
+        testOneDest(conn, sess, cons_dest, num_msg);
+
+
+        //
+        // Cleanup
+        //
+
+        removeTopic(conn, topic_name);
+        sess.close();
+        conn.close();
+    }
+
+
+    /**
+     * TEST TEMPORARY QUEUES
+     */
+    public void testTempQueue(String prod_broker_url, String cons_broker_url)
+            throws Exception {
+        int echo_id;
+        int num_msg;
+
+        Connection conn;
+        Session sess;
+
+        Destination cons_dest;
+
+        num_msg = 5;
+
+        LOG.info("TESTING TEMP QUEUES " + prod_broker_url + " -> " + cons_broker_url + " (" + num_msg +
+                " messages)");
+
+
+        //
+        // Connect to the bus.
+        //
+
+        conn = createConnection(cons_broker_url);
+        conn.start();
+        sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+
+        //
+        // Create the destination on which messages are being tested.
+        //
+
+        LOG.trace("Creating destination");
+        cons_dest = sess.createTemporaryQueue();
+
+        testOneDest(conn, sess, cons_dest, num_msg);
+
+
+        //
+        // Cleanup
+        //
+
+        sess.close();
+        conn.close();
+    }
+
+
+    /**
+     * TEST QUEUES
+     */
+    public void testQueue(String prod_broker_url, String cons_broker_url)
+            throws Exception {
+        int num_msg;
+
+        Connection conn;
+        Session sess;
+        String queue_name;
+
+        Destination cons_dest;
+
+        num_msg = 5;
+
+        LOG.info("TESTING QUEUES " + prod_broker_url + " -> " + cons_broker_url + " (" + num_msg +
+                " messages)");
+
+
+        conn = createConnection(cons_broker_url);
+        conn.start();
+        sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+
+        //
+        // Create the destination on which messages are being tested.
+        //
+        queue_name = "topotest2.perm.queue";
+        LOG.trace("Removing existing Queue");
+        removeQueue(conn, queue_name);
+        LOG.trace("Creating Queue, " + queue_name);
+        cons_dest = sess.createQueue(queue_name);
+
+        testOneDest(conn, sess, cons_dest, num_msg);
+
+
+        removeQueue(conn, queue_name);
+        sess.close();
+        conn.close();
+    }
+
+    @Test
+    public void runWithTempTopicReplyTo()
+            throws Exception {
+        EchoService echo_svc;
+        TopicTrafficGenerator traffic_gen;
+        Thread start1;
+        Thread start2;
+        Thread start3;
+        Thread start4;
+        ThreadPoolExecutor clientExecPool;
+        final CountDownLatch clientCompletionLatch;
+        int iter;
+
+        fatalTestError = false;
+        testError = false;
+
+        //
+        // Execute up to 20 clients at a time to simulate that load.
+        //
+
+        clientExecPool = new ThreadPoolExecutor(CONCURRENT_CLIENT_COUNT, CONCURRENT_CLIENT_COUNT,
+                0, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(10000));
+        clientCompletionLatch = new CountDownLatch(TOTAL_CLIENT_ITER);
+
+
+        // Use threads to avoid startup deadlock since the first broker started waits until
+        //	it knows the name of the remote broker before finishing its startup, which means
+        //	the remote must already be running.
+
+        start1 = new Thread() {
+            public void run() {
+                try {
+                    edge1.start();
+                } catch (Exception ex) {
+                    LOG.error(null, ex);
+                }
+            }
+        };
+
+        start2 = new Thread() {
+            public void run() {
+                try {
+                    edge2.start();
+                } catch (Exception ex) {
+                    LOG.error(null, ex);
+                }
+            }
+        };
+
+        start3 = new Thread() {
+            public void run() {
+                try {
+                    core1.start();
+                } catch (Exception ex) {
+                    LOG.error(null, ex);
+                }
+            }
+        };
+
+        start4 = new Thread() {
+            public void run() {
+                try {
+                    core2.start();
+                } catch (Exception ex) {
+                    LOG.error(null, ex);
+                }
+            }
+        };
+
+        start1.start();
+        start2.start();
+        start3.start();
+        start4.start();
+
+        start1.join();
+        start2.join();
+        start3.join();
+        start4.join();
+
+        traffic_gen = new TopicTrafficGenerator(edge1.getConnectionUrl(), edge2.getConnectionUrl());
+        traffic_gen.start();
+
+
+        //
+        // Now start the echo service with that queue.
+        //
+        echo_svc = new EchoService("echo", edge1.getConnectionUrl());
+        echo_svc.start();
+
+
+        //
+        // Run the tests on Temp Topics.
+        //
+
+        LOG.info("** STARTING TEMP TOPIC TESTS");
+        iter = 0;
+        while ((iter < TOTAL_CLIENT_ITER) && (!fatalTestError)) {
+            clientExecPool.execute(new Runnable() {
+                public void run() {
+                    try {
+                        RequestReplyToTopicViaThreeNetworkHopsTest.this.testTempTopic(edge1.getConnectionUrl(),
+                                edge2.getConnectionUrl());
+                    } catch (Exception exc) {
+                        LOG.error("test exception", exc);
+                        fatalTestError = true;
+                        testError = true;
+                    }
+
+                    clientCompletionLatch.countDown();
+                }
+            });
+
+            iter++;
+        }
+
+        boolean allDoneOnTime = clientCompletionLatch.await(20, TimeUnit.MINUTES);
+
+        LOG.info("** FINISHED TEMP TOPIC TESTS AFTER " + iter + " ITERATIONS, testError:" + testError + ", fatal: " + fatalTestError + ", onTime:" + allDoneOnTime);
+
+        Thread.sleep(100);
+
+        echo_svc.shutdown();
+        traffic_gen.shutdown();
+
+        shutdown();
+
+        assertTrue("test completed in time", allDoneOnTime);
+        assertTrue("no errors", !testError);
+    }
+
+    public void shutdown()
+            throws Exception {
+        edge1.stop();
+        edge2.stop();
+        core1.stop();
+        core2.stop();
+    }
+
+    protected Connection createConnection(String url)
+            throws Exception {
+        return org.apache.activemq.ActiveMQConnection.makeConnection(url);
+    }
+
+    protected static void removeQueue(Connection conn, String dest_name)
+            throws java.lang.Exception {
+        org.apache.activemq.command.ActiveMQDestination dest;
+
+        if (conn instanceof org.apache.activemq.ActiveMQConnection) {
+            dest = org.apache.activemq.command.ActiveMQDestination.
+                    createDestination(dest_name, (byte) org.apache.activemq.command.ActiveMQDestination.QUEUE_TYPE);
+            ((org.apache.activemq.ActiveMQConnection) conn).destroyDestination(dest);
+        }
+    }
+
+    protected static void removeTopic(Connection conn, String dest_name)
+            throws java.lang.Exception {
+        org.apache.activemq.command.ActiveMQDestination dest;
+
+        if (conn instanceof org.apache.activemq.ActiveMQConnection) {
+            dest = org.apache.activemq.command.ActiveMQDestination.
+                    createDestination(dest_name, (byte) org.apache.activemq.command.ActiveMQDestination.TOPIC_TYPE);
+            ((org.apache.activemq.ActiveMQConnection) conn).destroyDestination(dest);
+        }
+    }
+
+    public static String fmtMsgInfo(Message msg)
+            throws Exception {
+        StringBuilder msg_desc;
+        String prop;
+        Enumeration prop_enum;
+
+        msg_desc = new StringBuilder();
+        msg_desc = new StringBuilder();
+
+        if (msg instanceof TextMessage) {
+            msg_desc.append(((TextMessage) msg).getText());
+        } else {
+            msg_desc.append("[");
+            msg_desc.append(msg.getClass().getName());
+            msg_desc.append("]");
+        }
+
+        prop_enum = msg.getPropertyNames();
+        while (prop_enum.hasMoreElements()) {
+            prop = (String) prop_enum.nextElement();
+            msg_desc.append("; ");
+            msg_desc.append(prop);
+            msg_desc.append("=");
+            msg_desc.append(msg.getStringProperty(prop));
+        }
+
+        return msg_desc.toString();
+    }
+
+    protected class EmbeddedTcpBroker {
+        protected BrokerService brokerSvc;
+        protected int brokerNum;
+        protected String brokerName;
+        protected String brokerId;
+        protected int port;
+        protected String tcpUrl;
+        protected String fullUrl;
+
+        public EmbeddedTcpBroker(String name, int number)
+                throws Exception {
+            brokerSvc = new BrokerService();
+
+            synchronized (this.getClass()) {
+                brokerNum = Next_broker_num;
+                Next_broker_num++;
+            }
+
+            brokerName = name + number;
+            brokerId = brokerName;
+
+            brokerSvc.setBrokerName(brokerName);
+            brokerSvc.setBrokerId(brokerId);
+
+            brokerSvc.setPersistent(false);
+            brokerSvc.setUseJmx(false);
+
+            port = 60000 + (brokerNum * 10);
+
+            tcpUrl = "tcp://127.0.0.1:" + Integer.toString(port);
+            fullUrl = tcpUrl + "?jms.watchTopicAdvisories=false";
+
+            brokerSvc.addConnector(tcpUrl);
+        }
+
+        public Connection createConnection()
+                throws URISyntaxException, JMSException {
+            Connection result;
+
+            result = org.apache.activemq.ActiveMQConnection.makeConnection(this.fullUrl);
+
+            return result;
+        }
+
+        public String getConnectionUrl() {
+            return this.fullUrl;
+        }
+
+
+        public void coreConnectTo(EmbeddedTcpBroker other, boolean duplex_f)
+                throws Exception {
+            this.makeConnectionTo(other, duplex_f, true);
+            this.makeConnectionTo(other, duplex_f, false);
+        }
+
+        public void start()
+                throws Exception {
+            brokerSvc.start();
+            brokerSvc.waitUntilStarted();
+        }
+
+        public void stop()
+                throws Exception {
+            brokerSvc.stop();
+        }
+
+
+        protected void makeConnectionTo(EmbeddedTcpBroker other, boolean duplex_f, boolean queue_f)
+                throws Exception {
+            NetworkConnector nw_conn;
+            String prefix;
+            ActiveMQDestination excl_dest;
+            ArrayList excludes;
+
+            nw_conn = new DiscoveryNetworkConnector(new URI("static:(" + other.tcpUrl + ")"));
+            nw_conn.setDuplex(duplex_f);
+
+            if (queue_f)
+                nw_conn.setConduitSubscriptions(false);
+            else
+                nw_conn.setConduitSubscriptions(true);
+
+            nw_conn.setNetworkTTL(3);
+            nw_conn.setSuppressDuplicateQueueSubscriptions(true);
+            nw_conn.setDecreaseNetworkConsumerPriority(true);
+            nw_conn.setBridgeTempDestinations(queue_f);
+
+            if (queue_f) {
+                prefix = "queue";
+                excl_dest = ActiveMQDestination.createDestination(">", ActiveMQDestination.TOPIC_TYPE);
+            } else {
+                prefix = "topic";
+                excl_dest = ActiveMQDestination.createDestination(">", ActiveMQDestination.QUEUE_TYPE);
+            }
+
+            excludes = new ArrayList();
+            excludes.add(excl_dest);
+            nw_conn.setExcludedDestinations(excludes);
+
+            if (duplex_f)
+                nw_conn.setName(this.brokerId + "<-" + prefix + "->" + other.brokerId);
+            else
+                nw_conn.setName(this.brokerId + "-" + prefix + "->" + other.brokerId);
+
+            brokerSvc.addNetworkConnector(nw_conn);
+        }
+    }
+
+    protected class MessageClient extends java.lang.Thread {
+        protected MessageConsumer msgCons;
+        protected boolean shutdownInd;
+        protected int expectedCount;
+        protected int lastSeq = 0;
+        protected int msgCount = 0;
+        protected boolean haveFirstSeq;
+        protected CountDownLatch shutdownLatch;
+
+        public MessageClient(MessageConsumer cons, int num_to_expect) {
+            msgCons = cons;
+            expectedCount = (num_to_expect * (echoResponseFill + 1));
+            shutdownLatch = new CountDownLatch(1);
+        }
+
+        public void run() {
+            CountDownLatch latch;
+
+            try {
+                synchronized (this) {
+                    latch = shutdownLatch;
+                }
+
+                shutdownInd = false;
+                processMessages();
+
+                latch.countDown();
+            } catch (Exception exc) {
+                LOG.error("message client error", exc);
+            }
+        }
+
+        public void waitShutdown(long timeout) {
+            CountDownLatch latch;
+
+            try {
+                synchronized (this) {
+                    latch = shutdownLatch;
+                }
+
+                if (latch != null)
+                    latch.await(timeout, TimeUnit.MILLISECONDS);
+                else
+                    LOG.info("echo client shutdown: client does not appear to be active");
+            } catch (InterruptedException int_exc) {
+                LOG.warn("wait for message client shutdown interrupted", int_exc);
+            }
+        }
+
+        public boolean shutdown() {
+            boolean down_ind;
+
+            if (!shutdownInd) {
+                shutdownInd = true;
+            }
+
+            waitShutdown(200);
+
+            synchronized (this) {
+                if ((shutdownLatch == null) || (shutdownLatch.getCount() == 0))
+                    down_ind = true;
+                else
+                    down_ind = false;
+            }
+
+            return down_ind;
+        }
+
+        public int getNumMsgReceived() {
+            return msgCount;
+        }
+
+        protected void processMessages()
+                throws Exception {
+            Message in_msg;
+
+            haveFirstSeq = false;
+
+            //
+            // Stop at shutdown time or after any test error is detected.
+            //
+
+            while ((!shutdownInd) && (!fatalTestError)) {
+                in_msg = msgCons.receive(100);
+
+                if (in_msg != null) {
+                    msgCount++;
+                    checkMessage(in_msg);
+                }
+            }
+
+            msgCons.close();
+        }
+
+        protected void checkMessage(Message in_msg)
+                throws Exception {
+            int seq;
+
+            LOG.debug("received message " + fmtMsgInfo(in_msg) + " from " + in_msg.getJMSDestination());
+
+            //
+            // Only check messages with a sequence number.
+            //
+
+            if (in_msg.propertyExists("SEQ")) {
+                seq = in_msg.getIntProperty("SEQ");
+
+                if ((haveFirstSeq) && (seq != (lastSeq + 1))) {
+                    LOG.error("***ERROR*** incorrect sequence number; expected " +
+                            Integer.toString(lastSeq + 1) + " but have " +
+                            Integer.toString(seq));
+
+                    testError = true;
+                }
+
+                lastSeq = seq;
+
+                if (msgCount > expectedCount) {
+                    LOG.error("*** have more messages than expected; have " + msgCount +
+                            "; expect " + expectedCount);
+
+                    testError = true;
+                }
+            }
+
+            if (in_msg.propertyExists("end-of-response")) {
+                LOG.trace("received end-of-response message");
+            }
+        }
+    }
+
+    /**
+     *
+     */
+    protected class EchoService extends java.lang.Thread {
+        protected String destName;
+        protected Connection jmsConn;
+        protected Session sess;
+        protected MessageConsumer msg_cons;
+        protected boolean Shutdown_ind;
+
+        protected Destination req_dest;
+
+        protected CountDownLatch waitShutdown;
+
+        protected ThreadPoolExecutor processorPool;
+
+        public EchoService(String dest, Connection broker_conn)
+                throws Exception {
+            destName = dest;
+            jmsConn = broker_conn;
+
+            Shutdown_ind = false;
+
+            sess = jmsConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            req_dest = sess.createQueue(destName);
+            msg_cons = sess.createConsumer(req_dest);
+
+            jmsConn.start();
+
+            waitShutdown = new CountDownLatch(1);
+
+            processorPool = new ThreadPoolExecutor(CONCURRENT_SERVER_COUNT, CONCURRENT_SERVER_COUNT,
+                    0, TimeUnit.SECONDS,
+                    new ArrayBlockingQueue<Runnable>(10000));
+        }
+
+        public EchoService(String dest, String broker_url)
+                throws Exception {
+            this(dest, ActiveMQConnection.makeConnection(broker_url));
+        }
+
+        public void run() {
+            Message req;
+
+            try {
+                LOG.info("STARTING ECHO SERVICE");
+
+                while (!Shutdown_ind) {
+                    req = msg_cons.receive(100);
+                    if (req != null) {
+                        processorPool.execute(new EchoRequestProcessor(sess, req));
+                    }
+                }
+            } catch (Exception ex) {
+                LOG.error("error processing echo service requests", ex);
+            } finally {
+                LOG.info("shutting down test echo service");
+
+                try {
+                    jmsConn.stop();
+                } catch (javax.jms.JMSException jms_exc) {
+                    LOG.warn("error on shutting down JMS connection", jms_exc);
+                }
+
+                synchronized (this) {
+                    waitShutdown.countDown();
+                }
+            }
+        }
+
+
+        /**
+         * Shut down the service, waiting up to 3 seconds for the service to terminate.
+         */
+        public void shutdown() {
+            CountDownLatch wait_l;
+
+            synchronized (this) {
+                wait_l = waitShutdown;
+            }
+
+            Shutdown_ind = true;
+
+            try {
+                if (wait_l != null) {
+                    if (wait_l.await(3000, TimeUnit.MILLISECONDS))
+                        LOG.info("echo service shutdown complete");
+                    else
+                        LOG.warn("timeout waiting for echo service shutdown");
+                } else {
+                    LOG.info("echo service shutdown: service does not appear to be active");
+                }
+            } catch (InterruptedException int_exc) {
+                LOG.warn("interrupted while waiting for echo service shutdown");
+            }
+        }
+    }
+
+    /**
+     *
+     */
+    protected class EchoRequestProcessor implements Runnable {
+        protected Session session;
+
+        protected Destination resp_dest;
+        protected MessageProducer msg_prod;
+
+        protected Message request;
+
+        public EchoRequestProcessor(Session sess, Message req)
+                throws Exception {
+            this.session = sess;
+            this.request = req;
+
+            this.resp_dest = req.getJMSReplyTo();
+
+            if (resp_dest == null) {
+                throw new Exception("invalid request: no reply-to destination given");
+            }
+
+            this.msg_prod = session.createProducer(this.resp_dest);
+        }
+
+        public void run() {
+            try {
+                this.processRequest(this.request);
+            } catch (Exception ex) {
+                LOG.error("Failed to process request", ex);
+            }
+        }
+
+        /**
+         * Process one request for the Echo Service.
+         */
+        protected void processRequest(Message req)
+                throws Exception {
+            if (LOG.isDebugEnabled())
+                LOG.debug("ECHO request message " + req.toString());
+
+            resp_dest = req.getJMSReplyTo();
+            if (resp_dest != null) {
+                msg_prod = session.createProducer(resp_dest);
+
+                LOG.debug("SENDING ECHO RESPONSE to:" + resp_dest);
+
+                msg_prod.send(req);
+
+                LOG.debug((((ActiveMQSession) session).getConnection()).getBrokerName() + " SENT ECHO RESPONSE to " + resp_dest);
+
+                msg_prod.close();
+                msg_prod = null;
+            } else {
+                LOG.warn("invalid request: no reply-to destination given");
+            }
+        }
+    }
+
+    protected class TopicTrafficGenerator extends java.lang.Thread {
+        protected Connection conn1;
+        protected Connection conn2;
+        protected Session sess1;
+        protected Session sess2;
+        protected Destination dest;
+        protected MessageProducer prod;
+        protected MessageConsumer cons;
+        protected boolean Shutdown_ind;
+        protected int send_count;
+
+        public TopicTrafficGenerator(String url1, String url2)
+                throws Exception {
+            conn1 = createConnection(url1);
+            conn2 = createConnection(url2);
+
+            sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            conn1.start();
+            conn2.start();
+
+            dest = sess1.createTopic("traffic");
+            prod = sess1.createProducer(dest);
+
+            dest = sess2.createTopic("traffic");
+            cons = sess2.createConsumer(dest);
+        }
+
+        public void shutdown() {
+            Shutdown_ind = true;
+        }
+
+        public void run() {
+            Message msg;
+
+            try {
+                LOG.info("Starting Topic Traffic Generator");
+
+                while (!Shutdown_ind) {
+                    msg = sess1.createTextMessage("TRAFFIC");
+
+                    prod.send(msg);
+
+                    send_count++;
+
+                    //
+                    // Time out the receipt; early messages may not make it.
+                    //
+
+                    msg = cons.receive(250);
+                }
+            } catch (JMSException jms_exc) {
+                LOG.warn("traffic generator failed on jms exception", jms_exc);
+            } finally {
+                LOG.info("Shutdown of Topic Traffic Generator; send count = " + send_count);
+
+                if (conn1 != null) {
+                    try {
+                        conn1.stop();
+                    } catch (JMSException jms_exc) {
+                        LOG.warn("failed to shutdown connection", jms_exc);
+                    }
+                }
+
+                if (conn2 != null) {
+                    try {
+                        conn2.stop();
+                    } catch (JMSException jms_exc) {
+                        LOG.warn("failed to shutdown connection", jms_exc);
+                    }
+                }
+            }
+        }
+    }
+}
\ No newline at end of file

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/RequestReplyToTopicViaThreeNetworkHopsTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/RequestReplyToTopicViaThreeNetworkHopsTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date