You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ai...@apache.org on 2008/02/12 18:25:49 UTC

svn commit: r620871 - in /incubator/qpid/branches/thegreatmerge/qpid: ./ dotnet/Qpid.Client/Client/ dotnet/Qpid.Integration.Tests/testcases/ java/broker/src/main/java/org/apache/qpid/server/ java/broker/src/main/java/org/apache/qpid/server/handler/ jav...

Author: aidan
Date: Tue Feb 12 09:25:44 2008
New Revision: 620871

URL: http://svn.apache.org/viewvc?rev=620871&view=rev
Log:
Additonal fixes to perftests as well were necesary, plus these merges:

Merged revisions 619868,620495-620496 via svnmerge from 
https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1

........
  r619868 | aidan | 2008-02-08 12:52:54 +0000 (Fri, 08 Feb 2008) | 4 lines
  
  QPID-588: change instances of trace() and isTraceEnabled to debug equivalent to support older versions of log4j
........
  r620495 | rupertlssmith | 2008-02-11 14:48:35 +0000 (Mon, 11 Feb 2008) | 1 line
  
  QPID-730 : Changed durable subscription test, to ensure re-connection happens under the same client name.
........
  r620496 | rupertlssmith | 2008-02-11 14:50:18 +0000 (Mon, 11 Feb 2008) | 1 line
  
  QPID-729 : Added explicit list of unacked messages, acked on commit, rejected on roll-back.
........

Modified:
    incubator/qpid/branches/thegreatmerge/qpid/   (props changed)
    incubator/qpid/branches/thegreatmerge/qpid/dotnet/Qpid.Client/Client/AmqChannel.cs
    incubator/qpid/branches/thegreatmerge/qpid/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs
    incubator/qpid/branches/thegreatmerge/qpid/dotnet/Qpid.Integration.Tests/testcases/DurableSubscriptionTest.cs
    incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
    incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
    incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java
    incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
    incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
    incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
    incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
    incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java
    incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java
    incubator/qpid/branches/thegreatmerge/qpid/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedClientTestCase.java
    incubator/qpid/branches/thegreatmerge/qpid/java/perftests/distribution/pom.xml
    incubator/qpid/branches/thegreatmerge/qpid/java/perftests/src/main/java/org/apache/qpid/client/message/TestMessageFactory.java
    incubator/qpid/branches/thegreatmerge/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingDurableClient.java
    incubator/qpid/branches/thegreatmerge/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingSendOnlyClient.java
    incubator/qpid/branches/thegreatmerge/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongBouncer.java
    incubator/qpid/branches/thegreatmerge/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
    incubator/qpid/branches/thegreatmerge/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongTestPerf.java

Propchange: incubator/qpid/branches/thegreatmerge/qpid/
------------------------------------------------------------------------------
Binary property 'svnmerge-integrated' - no diff available.

Modified: incubator/qpid/branches/thegreatmerge/qpid/dotnet/Qpid.Client/Client/AmqChannel.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/dotnet/Qpid.Client/Client/AmqChannel.cs?rev=620871&r1=620870&r2=620871&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/dotnet/Qpid.Client/Client/AmqChannel.cs (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/dotnet/Qpid.Client/Client/AmqChannel.cs Tue Feb 12 09:25:44 2008
@@ -457,7 +457,7 @@
                 foreach (BasicMessageConsumer consumer  in _consumers.Values)
                 {
                     // Sends acknowledgement to server.
-                    consumer.AcknowledgeLastDelivered();
+                    consumer.AcknowledgeDelivered();
                 }
 
                 // Commits outstanding messages sent and outstanding acknowledgements.
@@ -485,13 +485,16 @@
                     {
                         Suspend(true);
                     }
-                 
-                    // todo: rollback dispatcher when TX support is added
-                    //if ( _dispatcher != null )
-                    //   _dispatcher.Rollback();
 
-                    _connection.ConvenientProtocolWriter.SyncWrite(
-                                                                   TxRollbackBody.CreateAMQFrame(_channelId), typeof(TxRollbackOkBody));
+                    // Reject up to message last delivered (if any) for each consumer.
+                    // Need to send reject for messages delivered to consumers so far.
+                    foreach (BasicMessageConsumer consumer  in _consumers.Values)
+                    {
+                        // Sends acknowledgement to server.
+                        consumer.RejectUnacked();
+                    }
+
+                    _connection.ConvenientProtocolWriter.SyncWrite(TxRollbackBody.CreateAMQFrame(_channelId), typeof(TxRollbackOkBody));
 
                     if ( !suspended )
                     {
@@ -1012,6 +1015,15 @@
             _connection.ProtocolWriter.Write(ackFrame);
         }
 
+        public void RejectMessage(ulong deliveryTag, bool requeue)
+        {
+            if ((_acknowledgeMode == AcknowledgeMode.ClientAcknowledge) || (_acknowledgeMode == AcknowledgeMode.SessionTransacted))
+            {
+                AMQFrame rejectFrame = BasicRejectBody.CreateAMQFrame(_channelId, deliveryTag, requeue);
+                _connection.ProtocolWriter.Write(rejectFrame);
+            }
+        }
+        
         /// <summary>
         /// Handle a message that bounced from the server, creating
         /// the corresponding exception and notifying the connection about it
@@ -1104,8 +1116,8 @@
         /// Placing stop check after consume may also be wrong as it may cause a message to be thrown away. Seems more correct to use interupt on 
         /// the block thread to cause it to prematurely return from its wait, whereupon it can be made to re-check the stop flag.</remarks>
         ///
-        /// <remarks>Exception swalled, if there is an exception whilst notifying the connection on bounced messages. Unhandled excetpion should
-        /// fall through and termiante the loop, as it is a bug if it occurrs.</remarks>
+        /// <remarks>Exception swallowed, if there is an exception whilst notifying the connection on bounced messages. Unhandled excetpion should
+        /// fall through and terminate the loop, as it is a bug if it occurrs.</remarks>
         private class Dispatcher
         {            
             /// <summary> Flag used to indicate when this dispatcher is to be stopped (0=go, 1=stop). </summary>

Modified: incubator/qpid/branches/thegreatmerge/qpid/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs?rev=620871&r1=620870&r2=620871&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs Tue Feb 12 09:25:44 2008
@@ -20,6 +20,8 @@
  */
 using System;
 using System.Threading;
+using System.Collections;
+using System.Collections.Generic;
 using log4net;
 using Apache.Qpid.Client.Message;
 using Apache.Qpid.Collections;
@@ -106,10 +108,15 @@
 
         private AmqChannel _channel;
 
+        // <summary>
+        // Tag of last message delievered, whoch should be acknowledged on commit in transaction mode.
+        // </summary>
+        //private long _lastDeliveryTag;
+
         /// <summary>
-        /// Tag of last message delievered, whoch should be acknowledged on commit in transaction mode.
+        /// Explicit list of all received but un-acked messages in a transaction. Used to ensure acking is completed when transaction is committed.
         /// </summary>
-        private long _lastDeliveryTag;
+        private LinkedList<long> _receivedDeliveryTags;
 
         /// <summary>
         /// Number of messages unacknowledged in DUPS_OK_ACKNOWLEDGE mode
@@ -135,6 +142,11 @@
             _prefetchHigh = prefetchHigh;
             _prefetchLow = prefetchLow;
             _exclusive = exclusive;
+
+            if (_acknowledgeMode == AcknowledgeMode.SessionTransacted)
+            {
+                _receivedDeliveryTags = new LinkedList<long>();
+            }
         }
 
         #region IMessageConsumer Members
@@ -391,13 +403,24 @@
         /// <summary>
         /// Acknowledge up to last message delivered (if any). Used when commiting.
         /// </summary>
-        internal void AcknowledgeLastDelivered()
+        internal void AcknowledgeDelivered()
         {
-            if (_lastDeliveryTag > 0)
+            foreach (long tag in _receivedDeliveryTags)
             {
-                _channel.AcknowledgeMessage((ulong)_lastDeliveryTag, true); // XXX evil cast
-                _lastDeliveryTag = -1;
+                _channel.AcknowledgeMessage((ulong)tag, false);
             }
+
+            _receivedDeliveryTags.Clear();
+        }
+
+        internal void RejectUnacked()
+        {
+            foreach (long tag in _receivedDeliveryTags)
+            {
+                _channel.RejectMessage((ulong)tag, true);
+            }
+
+            _receivedDeliveryTags.Clear();
         }
 
         private void PreDeliver(AbstractQmsMessage msg)
@@ -442,7 +465,7 @@
                     break;
                 
                 case AcknowledgeMode.SessionTransacted:
-                    _lastDeliveryTag = msg.DeliveryTag;
+                    _receivedDeliveryTags.AddLast(msg.DeliveryTag);
                     break;
             }
         }

Modified: incubator/qpid/branches/thegreatmerge/qpid/dotnet/Qpid.Integration.Tests/testcases/DurableSubscriptionTest.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/dotnet/Qpid.Integration.Tests/testcases/DurableSubscriptionTest.cs?rev=620871&r1=620870&r2=620871&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/dotnet/Qpid.Integration.Tests/testcases/DurableSubscriptionTest.cs (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/dotnet/Qpid.Integration.Tests/testcases/DurableSubscriptionTest.cs Tue Feb 12 09:25:44 2008
@@ -78,6 +78,8 @@
             SetUpEndPoint(2, false, true, TEST_ROUTING_KEY + testId, ackMode, false, ExchangeNameDefaults.TOPIC, true,
                           true, "TestSubscription" + testId);
 
+            Thread.Sleep(500);
+
             // Send messages and receive on both consumers.
             testProducer[0].Send(testChannel[0].CreateTextMessage("A"));
 
@@ -93,15 +95,15 @@
             ConsumeNMessagesOnly(1, "B", testConsumer[1]);
 
             // Re-attach consumer, check that it gets the messages that it missed.
-            SetUpEndPoint(3, false, true, TEST_ROUTING_KEY + testId, ackMode, false, ExchangeNameDefaults.TOPIC, true,
+            SetUpEndPoint(2, false, true, TEST_ROUTING_KEY + testId, ackMode, false, ExchangeNameDefaults.TOPIC, true,
                           true, "TestSubscription" + testId);
 
-            ConsumeNMessagesOnly(1, "B", testConsumer[3]);
+            ConsumeNMessagesOnly(1, "B", testConsumer[2]);
 
             // Clean up any open consumers at the end of the test.
-            CloseEndPoint(0);
+            CloseEndPoint(2);
             CloseEndPoint(1);
-            CloseEndPoint(3);
+            CloseEndPoint(0);
         }
 
         /// <summary> Check that an uncommitted receive can be re-received, on re-consume from the same durable subscription. </summary>
@@ -122,13 +124,13 @@
 
             // Close end-point 1 without committing the message, then re-open the subscription to consume again.
             CloseEndPoint(1);
-            SetUpEndPoint(2, false, true, TEST_ROUTING_KEY + testId, AcknowledgeMode.AutoAcknowledge, true, ExchangeNameDefaults.DIRECT, 
+            SetUpEndPoint(1, false, true, TEST_ROUTING_KEY + testId, AcknowledgeMode.AutoAcknowledge, true, ExchangeNameDefaults.DIRECT, 
                           true, false, null);
 
             // Check that the message was released from the rolled back end-point an can be received on the alternative one instead.
-            ConsumeNMessagesOnly(1, "A", testConsumer[2]);
+            ConsumeNMessagesOnly(1, "A", testConsumer[1]);
 
-            CloseEndPoint(2);
+            CloseEndPoint(1);
             CloseEndPoint(0);
         }
 
@@ -151,13 +153,13 @@
 
             // Close end-point 1 without committing the message, then re-open the subscription to consume again.
             CloseEndPoint(1);
-            SetUpEndPoint(2, false, true, TEST_ROUTING_KEY + testId, AcknowledgeMode.AutoAcknowledge, true, ExchangeNameDefaults.DIRECT, 
+            SetUpEndPoint(1, false, true, TEST_ROUTING_KEY + testId, AcknowledgeMode.AutoAcknowledge, true, ExchangeNameDefaults.DIRECT, 
                           true, false, null);
 
             // Check that the message was released from the rolled back end-point an can be received on the alternative one instead.
-            ConsumeNMessagesOnly(1, "A", testConsumer[2]);
+            ConsumeNMessagesOnly(1, "A", testConsumer[1]);
 
-            CloseEndPoint(2);
+            CloseEndPoint(1);
             CloseEndPoint(0);
         }
     }

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?rev=620871&r1=620870&r2=620871&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java Tue Feb 12 09:25:44 2008
@@ -218,9 +218,9 @@
         }
         else
         {
-            if (_log.isTraceEnabled())
+            if (_log.isDebugEnabled())
             {
-                _log.trace(debugIdentity() + "Content header received on channel " + _channelId);
+                _log.debug(debugIdentity() + "Content header received on channel " + _channelId);
             }
 
             if (ENABLE_JMSXUserID)
@@ -255,9 +255,9 @@
             throw new AMQException(null, "Received content body without previously receiving a JmsPublishBody", null);
         }
 
-        if (_log.isTraceEnabled())
+        if (_log.isDebugEnabled())
         {
-            _log.trace(debugIdentity() + "Content body received on channel " + _channelId);
+            _log.debug(debugIdentity() + "Content body received on channel " + _channelId);
         }
 
         try

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java?rev=620871&r1=620870&r2=620871&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java Tue Feb 12 09:25:44 2008
@@ -79,9 +79,9 @@
 
             if (queue == null)
             {
-                if (_log.isTraceEnabled())
+                if (_log.isDebugEnabled())
                 {
-                    _log.trace("No queue for '" + body.getQueue() + "'");
+                    _log.debug("No queue for '" + body.getQueue() + "'");
                 }
                 if (body.getQueue() != null)
                 {

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java?rev=620871&r1=620870&r2=620871&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java Tue Feb 12 09:25:44 2008
@@ -99,9 +99,9 @@
             }
 
 
-            if (_logger.isTraceEnabled())
+            if (_logger.isDebugEnabled())
             {
-                _logger.trace("Rejecting: DT:" + deliveryTag + "-" + message.getMessage().debugIdentity() +
+                _logger.debug("Rejecting: DT:" + deliveryTag + "-" + message.getMessage().debugIdentity() +
                               ": Requeue:" + body.getRequeue() +
                               //": Resend:" + evt.getMethod().resend +
                               " on channel:" + channel.debugIdentity());

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java?rev=620871&r1=620870&r2=620871&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java Tue Feb 12 09:25:44 2008
@@ -302,9 +302,9 @@
 
     public void populatePreDeliveryQueue(Subscription subscription)
     {
-        if (_log.isTraceEnabled())
+        if (_log.isDebugEnabled())
         {
-            _log.trace("Populating PreDeliveryQueue for Subscription(" + System.identityHashCode(subscription) + ")");
+            _log.debug("Populating PreDeliveryQueue for Subscription(" + System.identityHashCode(subscription) + ")");
         }
 
         Iterator<QueueEntry> currentQueue = _messages.iterator();
@@ -532,9 +532,9 @@
             //else the clean up is not required as the message has already been taken for this queue therefore
             // it was the responsibility of the code that took the message to ensure the _totalMessageSize was updated.
 
-            if (_log.isTraceEnabled())
+            if (_log.isDebugEnabled())
             {
-                _log.trace("Removed taken message:" + message.debugIdentity());
+                _log.debug("Removed taken message:" + message.debugIdentity());
             }
 
             // try the next message
@@ -627,9 +627,9 @@
 
         Queue<QueueEntry> messageQueue = sub.getNextQueue(_messages);
 
-        if (_log.isTraceEnabled())
+        if (_log.isDebugEnabled())
         {
-            _log.trace(debugIdentity() + "Async sendNextMessage for sub (" + System.identityHashCode(sub) +
+            _log.debug(debugIdentity() + "Async sendNextMessage for sub (" + System.identityHashCode(sub) +
                        ") from queue (" + System.identityHashCode(messageQueue) +
                        ") AMQQueue (" + System.identityHashCode(queue) + ")");
         }
@@ -655,9 +655,9 @@
                 // message will be null if we have no messages in the messageQueue.
                 if (entry == null)
                 {
-                    if (_log.isTraceEnabled())
+                    if (_log.isDebugEnabled())
                     {
-                        _log.trace(debugIdentity() + "No messages for Subscriber(" + System.identityHashCode(sub) + ") from queue; (" + System.identityHashCode(messageQueue) + ")");
+                        _log.debug(debugIdentity() + "No messages for Subscriber(" + System.identityHashCode(sub) + ") from queue; (" + System.identityHashCode(messageQueue) + ")");
                     }
                     return;
                 }
@@ -696,9 +696,9 @@
 
             if (messageQueue == sub.getResendQueue())
             {
-                if (_log.isTraceEnabled())
+                if (_log.isDebugEnabled())
                 {
-                    _log.trace(debugIdentity() + "All messages sent from resendQueue for " + sub);
+                    _log.debug(debugIdentity() + "All messages sent from resendQueue for " + sub);
                 }
                 if (messageQueue.isEmpty())
                 {
@@ -884,9 +884,9 @@
                 {
                     if (!s.isSuspended())
                     {
-                        if (_log.isTraceEnabled())
+                        if (_log.isDebugEnabled())
                         {
-                            _log.trace(debugIdentity() + "Delivering Message:" + entry.getMessage().debugIdentity() + " to(" +
+                            _log.debug(debugIdentity() + "Delivering Message:" + entry.getMessage().debugIdentity() + " to(" +
                                        System.identityHashCode(s) + ") :" + s);
                         }
 

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java?rev=620871&r1=620870&r2=620871&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java Tue Feb 12 09:25:44 2008
@@ -416,9 +416,9 @@
         }
 
 
-        if (_logger.isTraceEnabled())
+        if (_logger.isDebugEnabled())
         {
-            _logger.trace("(" + debugIdentity() + ") checking filters for message (" + entry.debugIdentity());
+            _logger.debug("(" + debugIdentity() + ") checking filters for message (" + entry.debugIdentity());
         }
         return checkFilters(entry);
 
@@ -563,9 +563,9 @@
             {
                 QueueEntry resent = _resendQueue.poll();
 
-                if (_logger.isTraceEnabled())
+                if (_logger.isDebugEnabled())
                 {
-                    _logger.trace("Removed for resending:" + resent.debugIdentity());
+                    _logger.debug("Removed for resending:" + resent.debugIdentity());
                 }
 
                 resent.release();

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=620871&r1=620870&r2=620871&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Tue Feb 12 09:25:44 2008
@@ -1369,9 +1369,9 @@
     public void rejectMessage(UnprocessedMessage message, boolean requeue)
     {
 
-        if (_logger.isTraceEnabled())
+        if (_logger.isDebugEnabled())
         {
-            _logger.trace("Rejecting Unacked message:" + message.getDeliveryTag());
+            _logger.debug("Rejecting Unacked message:" + message.getDeliveryTag());
         }
 
         rejectMessage(message.getDeliveryTag(), requeue);
@@ -1379,9 +1379,9 @@
 
     public void rejectMessage(AbstractJMSMessage message, boolean requeue)
     {
-        if (_logger.isTraceEnabled())
+        if (_logger.isDebugEnabled())
         {
-            _logger.trace("Rejecting Abstract message:" + message.getDeliveryTag());
+            _logger.debug("Rejecting Abstract message:" + message.getDeliveryTag());
         }
 
         rejectMessage(message.getDeliveryTag(), requeue);

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?rev=620871&r1=620870&r2=620871&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java Tue Feb 12 09:25:44 2008
@@ -567,12 +567,12 @@
         {
             if (!_closed.getAndSet(true))
             {
-                if (_logger.isTraceEnabled())
+                if (_logger.isDebugEnabled())
                 {
                     StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
                     if (_closedStack != null)
                     {
-                        _logger.trace(_consumerTag + " previously:" + _closedStack.toString());
+                        _logger.debug(_consumerTag + " previously:" + _closedStack.toString());
                     }
                     else
                     {
@@ -639,13 +639,14 @@
         {
             _closed.set(true);
 
-            if (_logger.isTraceEnabled())
+            if (_logger.isDebugEnabled())
             {
                 if (_closedStack != null)
                 {
-                    _logger.trace(_consumerTag + " markClosed():" + Arrays
-                            .asList(Thread.currentThread().getStackTrace()).subList(3, 8));
-                    _logger.trace(_consumerTag + " previously:" + _closedStack.toString());
+                    StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
+                    _logger.debug(_consumerTag + " markClosed():"
+                                  + Arrays.asList(stackTrace).subList(3, stackTrace.length - 1));
+                    _logger.debug(_consumerTag + " previously:" + _closedStack.toString());
                 }
                 else
                 {
@@ -881,14 +882,14 @@
         // synchronized (_closed)
         {
             _closed.set(true);
-            if (_logger.isTraceEnabled())
+            if (_logger.isDebugEnabled())
             {
                 StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
                 if (_closedStack != null)
                 {
-                    _logger.trace(_consumerTag + " notifyError():"
+                    _logger.debug(_consumerTag + " notifyError():"
                                   + Arrays.asList(stackTrace).subList(3, stackTrace.length - 1));
-                    _logger.trace(_consumerTag + " previously" + _closedStack.toString());
+                    _logger.debug(_consumerTag + " previously" + _closedStack.toString());
                 }
                 else
                 {
@@ -1035,9 +1036,9 @@
                 {
                     _session.rejectMessage(((AbstractJMSMessage) o), true);
 
-                    if (_logger.isTraceEnabled())
+                    if (_logger.isDebugEnabled())
                     {
-                        _logger.trace("Rejected message:" + ((AbstractJMSMessage) o).getDeliveryTag());
+                        _logger.debug("Rejected message:" + ((AbstractJMSMessage) o).getDeliveryTag());
                     }
 
                     iterator.remove();

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java?rev=620871&r1=620870&r2=620871&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java Tue Feb 12 09:25:44 2008
@@ -171,7 +171,7 @@
             m.setJMSReplyTo(q);
             m.setStringProperty("TempQueue", q.toString());
 
-            _logger.trace("Message:" + m);
+            _logger.debug("Message:" + m);
 
             Assert.assertEquals("Check temp queue has been set correctly", m.getJMSReplyTo().toString(),
                 m.getStringProperty("TempQueue"));

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java?rev=620871&r1=620870&r2=620871&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java Tue Feb 12 09:25:44 2008
@@ -706,12 +706,15 @@
 
     public void writeToBuffer(ByteBuffer buffer)
     {
-        final boolean trace = _logger.isTraceEnabled();
+        final boolean trace = _logger.isDebugEnabled();
 
         if (trace)
         {
-            _logger.trace("FieldTable::writeToBuffer: Writing encoded length of " + getEncodedSize() + "...");
-            _logger.trace(_properties.toString());
+            _logger.debug("FieldTable::writeToBuffer: Writing encoded length of " + getEncodedSize() + "...");
+            if (_properties != null)
+            {
+                _logger.debug(_properties.toString());
+            }
         }
 
         EncodingUtils.writeUnsignedInteger(buffer, getEncodedSize());
@@ -915,11 +918,11 @@
                 final Map.Entry<AMQShortString, AMQTypedValue> me = it.next();
                 try
                 {
-                    if (_logger.isTraceEnabled())
+                    if (_logger.isDebugEnabled())
                     {
-                        _logger.trace("Writing Property:" + me.getKey() + " Type:" + me.getValue().getType() + " Value:"
+                        _logger.debug("Writing Property:" + me.getKey() + " Type:" + me.getValue().getType() + " Value:"
                             + me.getValue().getValue());
-                        _logger.trace("Buffer Position:" + buffer.position() + " Remaining:" + buffer.remaining());
+                        _logger.debug("Buffer Position:" + buffer.position() + " Remaining:" + buffer.remaining());
                     }
 
                     // Write the actual parameter name
@@ -928,12 +931,12 @@
                 }
                 catch (Exception e)
                 {
-                    if (_logger.isTraceEnabled())
+                    if (_logger.isDebugEnabled())
                     {
-                        _logger.trace("Exception thrown:" + e);
-                        _logger.trace("Writing Property:" + me.getKey() + " Type:" + me.getValue().getType() + " Value:"
+                        _logger.debug("Exception thrown:" + e);
+                        _logger.debug("Writing Property:" + me.getKey() + " Type:" + me.getValue().getType() + " Value:"
                             + me.getValue().getValue());
-                        _logger.trace("Buffer Position:" + buffer.position() + " Remaining:" + buffer.remaining());
+                        _logger.debug("Buffer Position:" + buffer.position() + " Remaining:" + buffer.remaining());
                     }
 
                     throw new RuntimeException(e);
@@ -945,7 +948,7 @@
     private void setFromBuffer(ByteBuffer buffer, long length) throws AMQFrameDecodingException
     {
 
-        final boolean trace = _logger.isTraceEnabled();
+        final boolean trace = _logger.isDebugEnabled();
         if (length > 0)
         {
 
@@ -961,7 +964,7 @@
 
                 if (trace)
                 {
-                    _logger.trace("FieldTable::PropFieldTable(buffer," + length + "): Read type '" + value.getType()
+                    _logger.debug("FieldTable::PropFieldTable(buffer," + length + "): Read type '" + value.getType()
                         + "', key '" + key + "', value '" + value.getValue() + "'");
                 }
 
@@ -976,7 +979,7 @@
 
         if (trace)
         {
-            _logger.trace("FieldTable::FieldTable(buffer," + length + "): Done.");
+            _logger.debug("FieldTable::FieldTable(buffer," + length + "): Done.");
         }
     }
 

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedClientTestCase.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedClientTestCase.java?rev=620871&r1=620870&r2=620871&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedClientTestCase.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedClientTestCase.java Tue Feb 12 09:25:44 2008
@@ -335,9 +335,9 @@
 
         public void onMessage(Message message)
         {
-            if (log.isTraceEnabled())
+            if (log.isDebugEnabled())
             {
-                log.trace("Message " + _received + "received in listener");
+                log.debug("Message " + _received + "received in listener");
             }
 
             if (message instanceof TextMessage)

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/perftests/distribution/pom.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/perftests/distribution/pom.xml?rev=620871&r1=620870&r2=620871&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/perftests/distribution/pom.xml (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/perftests/distribution/pom.xml Tue Feb 12 09:25:44 2008
@@ -56,13 +56,11 @@
         <dependency>
             <groupId>uk.co.thebadgerset</groupId>
             <artifactId>junit-toolkit</artifactId>
-            <version>0.6-SNAPSHOT</version>
             <scope>runtime</scope>
             </dependency>
         <dependency>
             <groupId>uk.co.thebadgerset</groupId>
             <artifactId>junit-toolkit-maven-plugin</artifactId>
-            <version>0.6-SNAPSHOT</version>
             <scope>runtime</scope>
         </dependency>
     </dependencies>

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/perftests/src/main/java/org/apache/qpid/client/message/TestMessageFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/perftests/src/main/java/org/apache/qpid/client/message/TestMessageFactory.java?rev=620871&r1=620870&r2=620871&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/perftests/src/main/java/org/apache/qpid/client/message/TestMessageFactory.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/perftests/src/main/java/org/apache/qpid/client/message/TestMessageFactory.java Tue Feb 12 09:25:44 2008
@@ -20,12 +20,16 @@
  */
 package org.apache.qpid.client.message;
 
+import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.SimpleByteBufferAllocator;
+
 import javax.jms.JMSException;
 import javax.jms.Session;
 import javax.jms.ObjectMessage;
 import javax.jms.StreamMessage;
 import javax.jms.BytesMessage;
 import javax.jms.TextMessage;
+import javax.jms.Queue;
 import javax.jms.DeliveryMode;
 import javax.jms.Destination;
 
@@ -36,6 +40,15 @@
     public static TextMessage newTextMessage(Session session, int size) throws JMSException
     {
         return session.createTextMessage(createMessagePayload(size));
+    }
+
+    public static JMSTextMessage newJMSTextMessage(int size, String encoding) throws JMSException
+    {
+        ByteBuffer byteBuffer = (new SimpleByteBufferAllocator()).allocate(size, true);
+        JMSTextMessage message = new JMSTextMessage(byteBuffer, encoding);
+        message.clearBody();
+        message.setText(createMessagePayload(size));
+        return message;
     }
 
     public static BytesMessage newBytesMessage(Session session, int size) throws JMSException

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingDurableClient.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingDurableClient.java?rev=620871&r1=620870&r2=620871&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingDurableClient.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingDurableClient.java Tue Feb 12 09:25:44 2008
@@ -116,6 +116,7 @@
         defaults.setProperty(PERSISTENT_MODE_PROPNAME, "true");
         defaults.setProperty(TX_BATCH_SIZE_PROPNAME, "10");
         defaults.setProperty(RATE_PROPNAME, "20");
+        defaults.setProperty(DURABLE_DESTS_PROPNAME, "true");
         defaults.setProperty(NUM_MESSAGES_TO_ACTION_PROPNAME, NUM_MESSAGES_TO_ACTION_DEFAULT);
     }
 

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingSendOnlyClient.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingSendOnlyClient.java?rev=620871&r1=620870&r2=620871&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingSendOnlyClient.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingSendOnlyClient.java Tue Feb 12 09:25:44 2008
@@ -26,6 +26,7 @@
 import javax.jms.Destination;
 import javax.jms.JMSException;
 import javax.jms.Message;
+import javax.jms.ObjectMessage;
 
 import org.apache.log4j.Logger;
 

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongBouncer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongBouncer.java?rev=620871&r1=620870&r2=620871&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongBouncer.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongBouncer.java Tue Feb 12 09:25:44 2008
@@ -26,23 +26,30 @@
 import java.util.Date;
 
 import javax.jms.*;
-import javax.naming.Context;
 
 import org.apache.log4j.Logger;
 
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQTopic;
+import org.apache.qpid.jms.ConnectionListener;
+import org.apache.qpid.jms.Session;
+import org.apache.qpid.topic.Config;
+import org.apache.qpid.exchange.ExchangeDefaults;
+
 /**
  * PingPongBouncer is a message listener the bounces back messages to their reply to destination. This is used to return
  * ping messages generated by {@link org.apache.qpid.requestreply.PingPongProducer} but could be used for other purposes
  * too.
- * <p/>
+ *
  * <p/>The correlation id from the received message is extracted, and placed into the reply as the correlation id. Messages
  * are bounced back to their reply-to destination. The original sender of the message has the option to use either a unique
  * temporary queue or the correlation id to correlate the original message to the reply.
- * <p/>
+ *
  * <p/>There is a verbose mode flag which causes information about each ping to be output to the console
  * (info level logging, so usually console). This can be helpfull to check the bounce backs are happening but should
  * be disabled for real timing tests as writing to the console will slow things down.
- * <p/>
+ *
  * <p><table id="crc"><caption>CRC Card</caption>
  * <tr><th> Responsibilities <th> Collaborations
  * <tr><td> Bounce back messages to their reply to destination.
@@ -50,74 +57,51 @@
  * </table>
  *
  * @todo Replace the command line parsing with a neater tool.
+ *
  * @todo Make verbose accept a number of messages, only prints to console every X messages.
  */
 public class PingPongBouncer implements MessageListener
 {
     private static final Logger _logger = Logger.getLogger(PingPongBouncer.class);
 
-    /**
-     * The default prefetch size for the message consumer.
-     */
+    /** The default prefetch size for the message consumer. */
     private static final int PREFETCH = 1;
 
-    /**
-     * The default no local flag for the message consumer.
-     */
+    /** The default no local flag for the message consumer. */
     private static final boolean NO_LOCAL = true;
 
     private static final String DEFAULT_DESTINATION_NAME = "ping";
 
-    /**
-     * The default exclusive flag for the message consumer.
-     */
+    /** The default exclusive flag for the message consumer. */
     private static final boolean EXCLUSIVE = false;
 
-    /**
-     * A convenient formatter to use when time stamping output.
-     */
+    /** A convenient formatter to use when time stamping output. */
     protected static final SimpleDateFormat timestampFormatter = new SimpleDateFormat("hh:mm:ss:SS");
 
-    /**
-     * Used to indicate that the reply generator should log timing info to the console (logger info level).
-     */
+    /** Used to indicate that the reply generator should log timing info to the console (logger info level). */
     private boolean _verbose = false;
 
-    /**
-     * Determines whether this bounce back client bounces back messages persistently.
-     */
+    /** Determines whether this bounce back client bounces back messages persistently. */
     private boolean _persistent = false;
 
     private Destination _consumerDestination;
 
-    /**
-     * Keeps track of the response destination of the previous message for the last reply to producer cache.
-     */
+    /** Keeps track of the response destination of the previous message for the last reply to producer cache. */
     private Destination _lastResponseDest;
 
-    /**
-     * The producer for sending replies with.
-     */
+    /** The producer for sending replies with. */
     private MessageProducer _replyProducer;
 
-    /**
-     * The consumer controlSession.
-     */
+    /** The consumer controlSession. */
     private Session _consumerSession;
 
-    /**
-     * The producer controlSession.
-     */
+    /** The producer controlSession. */
     private Session _producerSession;
 
-    /**
-     * Holds the connection to the broker.
-     */
-    private Connection _connection;
+    /** Holds the connection to the broker. */
+    private AMQConnection _connection;
 
-    /**
-     * Flag used to indicate if this is a point to point or pub/sub ping client.
-     */
+    /** Flag used to indicate if this is a point to point or pub/sub ping client. */
     private boolean _isPubSub = false;
 
     /**
@@ -135,21 +119,22 @@
     /**
      * Creates a PingPongBouncer on the specified producer and consumer sessions.
      *
-     * @param fileProperties  The path to the file properties
-     * @param factoryName     The factory name
+     * @param brokerDetails The addresses of the brokers to connect to.
      * @param username        The broker username.
      * @param password        The broker password.
+     * @param virtualpath     The virtual host name within the broker.
      * @param destinationName The name of the queue to receive pings on
      *                        (or root of the queue name where many queues are generated).
      * @param persistent      A flag to indicate that persistent message should be used.
      * @param transacted      A flag to indicate that pings should be sent within transactions.
      * @param selector        A message selector to filter received pings with.
      * @param verbose         A flag to indicate that message timings should be sent to the console.
+     *
      * @throws Exception All underlying exceptions allowed to fall through. This is only test code...
      */
-    public PingPongBouncer(String fileProperties, String factoryName, String username, String password,
-                           String destinationName, boolean persistent, boolean transacted,
-                           String selector, boolean verbose, boolean pubsub) throws Exception
+    public PingPongBouncer(String brokerDetails, String username, String password, String virtualpath,
+                           String destinationName, boolean persistent, boolean transacted, String selector, boolean verbose,
+                           boolean pubsub) throws Exception
     {
         // Create a client id to uniquely identify this client.
         InetAddress address = InetAddress.getLocalHost();
@@ -158,9 +143,11 @@
         _persistent = persistent;
         setPubSub(pubsub);
         // Connect to the broker.
-        Context context = InitialContextHelper.getInitialContext(fileProperties);
-        ConnectionFactory factory = (ConnectionFactory) context.lookup(factoryName);
-        setConnection(factory.createConnection(username, password));
+        setConnection(new AMQConnection(brokerDetails, username, password, clientId, virtualpath));
+        _logger.info("Connected with URL:" + getConnection().toURL());
+
+        // Set up the failover notifier.
+        getConnection().setConnectionListener(new FailoverNotifier());
 
         // Create a controlSession to listen for messages on and one to send replies on, transactional depending on the
         // command line option.
@@ -169,7 +156,8 @@
 
         // Create the queue to listen for message on.
         createConsumerDestination(destinationName);
-        MessageConsumer consumer = _consumerSession.createConsumer(_consumerDestination, selector, NO_LOCAL);
+        MessageConsumer consumer =
+            _consumerSession.createConsumer(_consumerDestination, PREFETCH, NO_LOCAL, EXCLUSIVE, selector);
 
         // Create a producer for the replies, without a default destination.
         _replyProducer = _producerSession.createProducer(null);
@@ -180,10 +168,57 @@
         consumer.setMessageListener(this);
     }
 
+    /**
+     * Starts a stand alone ping-pong client running in verbose mode.
+     *
+     * @param args
+     */
+    public static void main(String[] args) throws Exception
+    {
+        System.out.println("Starting...");
+
+        // Display help on the command line.
+        if (args.length == 0)
+        {
+            _logger.info("Running test with default values...");
+            //usage();
+            //System.exit(0);
+        }
+
+        // Extract all command line parameters.
+        Config config = new Config();
+        config.setOptions(args);
+        String brokerDetails = config.getHost() + ":" + config.getPort();
+        String virtualpath = "test";
+        String destinationName = config.getDestination();
+        if (destinationName == null)
+        {
+            destinationName = DEFAULT_DESTINATION_NAME;
+        }
+
+        String selector = config.getSelector();
+        boolean transacted = config.isTransacted();
+        boolean persistent = config.usePersistentMessages();
+        boolean pubsub = config.isPubSub();
+        boolean verbose = true;
+
+        //String selector = null;
+
+        // Instantiate the ping pong client with the command line options and start it running.
+        PingPongBouncer pingBouncer =
+            new PingPongBouncer(brokerDetails, "guest", "guest", virtualpath, destinationName, persistent, transacted,
+                                selector, verbose, pubsub);
+        pingBouncer.getConnection().start();
+
+        System.out.println("Waiting...");
+    }
+
     private static void usage()
     {
-        System.err.println(
-                "Usage: PingPongBouncer \n" + "-host : broker host\n" + "-port : broker port\n" + "-destinationname : queue/topic name\n" + "-transacted : (true/false). Default is false\n" + "-persistent : (true/false). Default is false\n" + "-pubsub     : (true/false). Default is false\n" + "-selector   : selector string\n");
+        System.err.println("Usage: PingPongBouncer \n" + "-host : broker host\n" + "-port : broker port\n"
+                           + "-destinationname : queue/topic name\n" + "-transacted : (true/false). Default is false\n"
+                           + "-persistent : (true/false). Default is false\n"
+                           + "-pubsub     : (true/false). Default is false\n" + "-selector   : selector string\n");
     }
 
     /**
@@ -200,8 +235,8 @@
             String messageCorrelationId = message.getJMSCorrelationID();
             if (_verbose)
             {
-                _logger.info(timestampFormatter
-                        .format(new Date()) + ": Got ping with correlation id, " + messageCorrelationId);
+                _logger.info(timestampFormatter.format(new Date()) + ": Got ping with correlation id, "
+                             + messageCorrelationId);
             }
 
             // Get the reply to destination from the message and check it is set.
@@ -234,8 +269,8 @@
 
             if (_verbose)
             {
-                _logger.info(timestampFormatter
-                        .format(new Date()) + ": Sent reply with correlation id, " + messageCorrelationId);
+                _logger.info(timestampFormatter.format(new Date()) + ": Sent reply with correlation id, "
+                             + messageCorrelationId);
             }
 
             // Commit the transaction if running in transactional mode.
@@ -252,7 +287,7 @@
      *
      * @return The underlying connection that this ping client is running on.
      */
-    public Connection getConnection()
+    public AMQConnection getConnection()
     {
         return _connection;
     }
@@ -262,7 +297,7 @@
      *
      * @param connection The ping connection.
      */
-    public void setConnection(Connection connection)
+    public void setConnection(AMQConnection connection)
     {
         this._connection = connection;
     }
@@ -290,7 +325,7 @@
     /**
      * Convenience method to commit the transaction on the specified controlSession. If the controlSession to commit on is not
      * a transactional controlSession, this method does nothing.
-     * <p/>
+     *
      * <p/>If the {@link #_failBeforeCommit} flag is set, this will prompt the user to kill the broker before the
      * commit is applied. If the {@link #_failAfterCommit} flag is set, this will prompt the user to kill the broker
      * after the commit is applied.
@@ -305,7 +340,7 @@
             {
                 if (_failBeforeCommit)
                 {
-                    _logger.trace("Failing Before Commit");
+                    _logger.debug("Failing Before Commit");
                     doFailover();
                 }
 
@@ -313,11 +348,11 @@
 
                 if (_failAfterCommit)
                 {
-                    _logger.trace("Failing After Commit");
+                    _logger.debug("Failing After Commit");
                     doFailover();
                 }
 
-                _logger.trace("Session Commited.");
+                _logger.debug("Session Commited.");
             }
             catch (JMSException e)
             {
@@ -353,8 +388,7 @@
             System.in.read();
         }
         catch (IOException e)
-        {
-        }
+        { }
 
         System.out.println("Continuing.");
     }
@@ -371,22 +405,49 @@
             System.in.read();
         }
         catch (IOException e)
-        {
-        }
+        { }
 
         System.out.println("Continuing.");
 
     }
 
-    private void createConsumerDestination(String name) throws JMSException
+    private void createConsumerDestination(String name)
     {
         if (isPubSub())
         {
-            _consumerDestination = _consumerSession.createTopic(name);
+            _consumerDestination = new AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_NAME, name);
         }
         else
         {
-            _consumerDestination = _consumerSession.createQueue(name);
+            _consumerDestination = new AMQQueue(ExchangeDefaults.DIRECT_EXCHANGE_NAME, name);
+        }
+    }
+
+    /**
+     * A connection listener that logs out any failover complete events. Could do more interesting things with this
+     * at some point...
+     */
+    public static class FailoverNotifier implements ConnectionListener
+    {
+        public void bytesSent(long count)
+        { }
+
+        public void bytesReceived(long count)
+        { }
+
+        public boolean preFailover(boolean redirect)
+        {
+            return true;
+        }
+
+        public boolean preResubscribe()
+        {
+            return true;
+        }
+
+        public void failoverComplete()
+        {
+            _logger.info("App got failover complete callback.");
         }
     }
 }

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java?rev=620871&r1=620870&r2=620871&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java Tue Feb 12 09:25:44 2008
@@ -669,9 +669,12 @@
         // _log.debug("protected void createConnection(String clientID = " + clientID + "): called");
 
         // _log.debug("Creating a connection for the message producer.");
-      
+        File propsFile = new File(_fileProperties);
+        InputStream is = new FileInputStream(propsFile);
+        Properties properties = new Properties();
+        properties.load(is);
 
-        Context context = InitialContextHelper.getInitialContext(_fileProperties);
+        Context context = new InitialContext(properties);
         ConnectionFactory factory = (ConnectionFactory) context.lookup(_factoryName);
         _connection = factory.createConnection(_username, _password);
 

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongTestPerf.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongTestPerf.java?rev=620871&r1=620870&r2=620871&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongTestPerf.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongTestPerf.java Tue Feb 12 09:25:44 2008
@@ -172,10 +172,10 @@
             PerThreadSetup perThreadSetup = new PerThreadSetup();
 
             // Extract the test set up paramaeters.
-            String fileProperties = testParameters.getProperty(PingPongProducer.FILE_PROPERTIES_PROPNAME);
-            String factoryName = testParameters.getProperty(PingPongProducer.FACTORY_NAME_PROPNAME);
+            String brokerDetails = testParameters.getProperty(PingPongProducer.BROKER_PROPNAME);
             String username = testParameters.getProperty(PingPongProducer.USERNAME_PROPNAME);
             String password = testParameters.getProperty(PingPongProducer.PASSWORD_PROPNAME);
+            String virtualPath = testParameters.getProperty(PingPongProducer.VIRTUAL_HOST_PROPNAME);
             String destinationName = testParameters.getProperty(PingPongProducer.PING_QUEUE_NAME_PROPNAME);
             boolean persistent = testParameters.getPropertyAsBoolean(PingPongProducer.PERSISTENT_MODE_PROPNAME);
             boolean transacted = testParameters.getPropertyAsBoolean(PingPongProducer.TRANSACTED_PROPNAME);
@@ -187,7 +187,7 @@
             {
                 // Establish a bounce back client on the ping queue to bounce back the pings.
                 perThreadSetup._testPingBouncer =
-                    new PingPongBouncer(fileProperties, factoryName, username, password, destinationName, persistent,
+                    new PingPongBouncer(brokerDetails, username, password, virtualPath, destinationName, persistent,
                         transacted, selector, verbose, pubsub);
 
                 // Start the connections for client and producer running.