You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ai...@apache.org on 2008/02/12 18:25:49 UTC
svn commit: r620871 - in /incubator/qpid/branches/thegreatmerge/qpid: ./
dotnet/Qpid.Client/Client/ dotnet/Qpid.Integration.Tests/testcases/
java/broker/src/main/java/org/apache/qpid/server/
java/broker/src/main/java/org/apache/qpid/server/handler/ jav...
Author: aidan
Date: Tue Feb 12 09:25:44 2008
New Revision: 620871
URL: http://svn.apache.org/viewvc?rev=620871&view=rev
Log:
Additonal fixes to perftests as well were necesary, plus these merges:
Merged revisions 619868,620495-620496 via svnmerge from
https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1
........
r619868 | aidan | 2008-02-08 12:52:54 +0000 (Fri, 08 Feb 2008) | 4 lines
QPID-588: change instances of trace() and isTraceEnabled to debug equivalent to support older versions of log4j
........
r620495 | rupertlssmith | 2008-02-11 14:48:35 +0000 (Mon, 11 Feb 2008) | 1 line
QPID-730 : Changed durable subscription test, to ensure re-connection happens under the same client name.
........
r620496 | rupertlssmith | 2008-02-11 14:50:18 +0000 (Mon, 11 Feb 2008) | 1 line
QPID-729 : Added explicit list of unacked messages, acked on commit, rejected on roll-back.
........
Modified:
incubator/qpid/branches/thegreatmerge/qpid/ (props changed)
incubator/qpid/branches/thegreatmerge/qpid/dotnet/Qpid.Client/Client/AmqChannel.cs
incubator/qpid/branches/thegreatmerge/qpid/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs
incubator/qpid/branches/thegreatmerge/qpid/dotnet/Qpid.Integration.Tests/testcases/DurableSubscriptionTest.cs
incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java
incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java
incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java
incubator/qpid/branches/thegreatmerge/qpid/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedClientTestCase.java
incubator/qpid/branches/thegreatmerge/qpid/java/perftests/distribution/pom.xml
incubator/qpid/branches/thegreatmerge/qpid/java/perftests/src/main/java/org/apache/qpid/client/message/TestMessageFactory.java
incubator/qpid/branches/thegreatmerge/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingDurableClient.java
incubator/qpid/branches/thegreatmerge/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingSendOnlyClient.java
incubator/qpid/branches/thegreatmerge/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongBouncer.java
incubator/qpid/branches/thegreatmerge/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
incubator/qpid/branches/thegreatmerge/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongTestPerf.java
Propchange: incubator/qpid/branches/thegreatmerge/qpid/
------------------------------------------------------------------------------
Binary property 'svnmerge-integrated' - no diff available.
Modified: incubator/qpid/branches/thegreatmerge/qpid/dotnet/Qpid.Client/Client/AmqChannel.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/dotnet/Qpid.Client/Client/AmqChannel.cs?rev=620871&r1=620870&r2=620871&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/dotnet/Qpid.Client/Client/AmqChannel.cs (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/dotnet/Qpid.Client/Client/AmqChannel.cs Tue Feb 12 09:25:44 2008
@@ -457,7 +457,7 @@
foreach (BasicMessageConsumer consumer in _consumers.Values)
{
// Sends acknowledgement to server.
- consumer.AcknowledgeLastDelivered();
+ consumer.AcknowledgeDelivered();
}
// Commits outstanding messages sent and outstanding acknowledgements.
@@ -485,13 +485,16 @@
{
Suspend(true);
}
-
- // todo: rollback dispatcher when TX support is added
- //if ( _dispatcher != null )
- // _dispatcher.Rollback();
- _connection.ConvenientProtocolWriter.SyncWrite(
- TxRollbackBody.CreateAMQFrame(_channelId), typeof(TxRollbackOkBody));
+ // Reject up to message last delivered (if any) for each consumer.
+ // Need to send reject for messages delivered to consumers so far.
+ foreach (BasicMessageConsumer consumer in _consumers.Values)
+ {
+ // Sends acknowledgement to server.
+ consumer.RejectUnacked();
+ }
+
+ _connection.ConvenientProtocolWriter.SyncWrite(TxRollbackBody.CreateAMQFrame(_channelId), typeof(TxRollbackOkBody));
if ( !suspended )
{
@@ -1012,6 +1015,15 @@
_connection.ProtocolWriter.Write(ackFrame);
}
+ public void RejectMessage(ulong deliveryTag, bool requeue)
+ {
+ if ((_acknowledgeMode == AcknowledgeMode.ClientAcknowledge) || (_acknowledgeMode == AcknowledgeMode.SessionTransacted))
+ {
+ AMQFrame rejectFrame = BasicRejectBody.CreateAMQFrame(_channelId, deliveryTag, requeue);
+ _connection.ProtocolWriter.Write(rejectFrame);
+ }
+ }
+
/// <summary>
/// Handle a message that bounced from the server, creating
/// the corresponding exception and notifying the connection about it
@@ -1104,8 +1116,8 @@
/// Placing stop check after consume may also be wrong as it may cause a message to be thrown away. Seems more correct to use interupt on
/// the block thread to cause it to prematurely return from its wait, whereupon it can be made to re-check the stop flag.</remarks>
///
- /// <remarks>Exception swalled, if there is an exception whilst notifying the connection on bounced messages. Unhandled excetpion should
- /// fall through and termiante the loop, as it is a bug if it occurrs.</remarks>
+ /// <remarks>Exception swallowed, if there is an exception whilst notifying the connection on bounced messages. Unhandled excetpion should
+ /// fall through and terminate the loop, as it is a bug if it occurrs.</remarks>
private class Dispatcher
{
/// <summary> Flag used to indicate when this dispatcher is to be stopped (0=go, 1=stop). </summary>
Modified: incubator/qpid/branches/thegreatmerge/qpid/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs?rev=620871&r1=620870&r2=620871&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs Tue Feb 12 09:25:44 2008
@@ -20,6 +20,8 @@
*/
using System;
using System.Threading;
+using System.Collections;
+using System.Collections.Generic;
using log4net;
using Apache.Qpid.Client.Message;
using Apache.Qpid.Collections;
@@ -106,10 +108,15 @@
private AmqChannel _channel;
+ // <summary>
+ // Tag of last message delievered, whoch should be acknowledged on commit in transaction mode.
+ // </summary>
+ //private long _lastDeliveryTag;
+
/// <summary>
- /// Tag of last message delievered, whoch should be acknowledged on commit in transaction mode.
+ /// Explicit list of all received but un-acked messages in a transaction. Used to ensure acking is completed when transaction is committed.
/// </summary>
- private long _lastDeliveryTag;
+ private LinkedList<long> _receivedDeliveryTags;
/// <summary>
/// Number of messages unacknowledged in DUPS_OK_ACKNOWLEDGE mode
@@ -135,6 +142,11 @@
_prefetchHigh = prefetchHigh;
_prefetchLow = prefetchLow;
_exclusive = exclusive;
+
+ if (_acknowledgeMode == AcknowledgeMode.SessionTransacted)
+ {
+ _receivedDeliveryTags = new LinkedList<long>();
+ }
}
#region IMessageConsumer Members
@@ -391,13 +403,24 @@
/// <summary>
/// Acknowledge up to last message delivered (if any). Used when commiting.
/// </summary>
- internal void AcknowledgeLastDelivered()
+ internal void AcknowledgeDelivered()
{
- if (_lastDeliveryTag > 0)
+ foreach (long tag in _receivedDeliveryTags)
{
- _channel.AcknowledgeMessage((ulong)_lastDeliveryTag, true); // XXX evil cast
- _lastDeliveryTag = -1;
+ _channel.AcknowledgeMessage((ulong)tag, false);
}
+
+ _receivedDeliveryTags.Clear();
+ }
+
+ internal void RejectUnacked()
+ {
+ foreach (long tag in _receivedDeliveryTags)
+ {
+ _channel.RejectMessage((ulong)tag, true);
+ }
+
+ _receivedDeliveryTags.Clear();
}
private void PreDeliver(AbstractQmsMessage msg)
@@ -442,7 +465,7 @@
break;
case AcknowledgeMode.SessionTransacted:
- _lastDeliveryTag = msg.DeliveryTag;
+ _receivedDeliveryTags.AddLast(msg.DeliveryTag);
break;
}
}
Modified: incubator/qpid/branches/thegreatmerge/qpid/dotnet/Qpid.Integration.Tests/testcases/DurableSubscriptionTest.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/dotnet/Qpid.Integration.Tests/testcases/DurableSubscriptionTest.cs?rev=620871&r1=620870&r2=620871&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/dotnet/Qpid.Integration.Tests/testcases/DurableSubscriptionTest.cs (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/dotnet/Qpid.Integration.Tests/testcases/DurableSubscriptionTest.cs Tue Feb 12 09:25:44 2008
@@ -78,6 +78,8 @@
SetUpEndPoint(2, false, true, TEST_ROUTING_KEY + testId, ackMode, false, ExchangeNameDefaults.TOPIC, true,
true, "TestSubscription" + testId);
+ Thread.Sleep(500);
+
// Send messages and receive on both consumers.
testProducer[0].Send(testChannel[0].CreateTextMessage("A"));
@@ -93,15 +95,15 @@
ConsumeNMessagesOnly(1, "B", testConsumer[1]);
// Re-attach consumer, check that it gets the messages that it missed.
- SetUpEndPoint(3, false, true, TEST_ROUTING_KEY + testId, ackMode, false, ExchangeNameDefaults.TOPIC, true,
+ SetUpEndPoint(2, false, true, TEST_ROUTING_KEY + testId, ackMode, false, ExchangeNameDefaults.TOPIC, true,
true, "TestSubscription" + testId);
- ConsumeNMessagesOnly(1, "B", testConsumer[3]);
+ ConsumeNMessagesOnly(1, "B", testConsumer[2]);
// Clean up any open consumers at the end of the test.
- CloseEndPoint(0);
+ CloseEndPoint(2);
CloseEndPoint(1);
- CloseEndPoint(3);
+ CloseEndPoint(0);
}
/// <summary> Check that an uncommitted receive can be re-received, on re-consume from the same durable subscription. </summary>
@@ -122,13 +124,13 @@
// Close end-point 1 without committing the message, then re-open the subscription to consume again.
CloseEndPoint(1);
- SetUpEndPoint(2, false, true, TEST_ROUTING_KEY + testId, AcknowledgeMode.AutoAcknowledge, true, ExchangeNameDefaults.DIRECT,
+ SetUpEndPoint(1, false, true, TEST_ROUTING_KEY + testId, AcknowledgeMode.AutoAcknowledge, true, ExchangeNameDefaults.DIRECT,
true, false, null);
// Check that the message was released from the rolled back end-point an can be received on the alternative one instead.
- ConsumeNMessagesOnly(1, "A", testConsumer[2]);
+ ConsumeNMessagesOnly(1, "A", testConsumer[1]);
- CloseEndPoint(2);
+ CloseEndPoint(1);
CloseEndPoint(0);
}
@@ -151,13 +153,13 @@
// Close end-point 1 without committing the message, then re-open the subscription to consume again.
CloseEndPoint(1);
- SetUpEndPoint(2, false, true, TEST_ROUTING_KEY + testId, AcknowledgeMode.AutoAcknowledge, true, ExchangeNameDefaults.DIRECT,
+ SetUpEndPoint(1, false, true, TEST_ROUTING_KEY + testId, AcknowledgeMode.AutoAcknowledge, true, ExchangeNameDefaults.DIRECT,
true, false, null);
// Check that the message was released from the rolled back end-point an can be received on the alternative one instead.
- ConsumeNMessagesOnly(1, "A", testConsumer[2]);
+ ConsumeNMessagesOnly(1, "A", testConsumer[1]);
- CloseEndPoint(2);
+ CloseEndPoint(1);
CloseEndPoint(0);
}
}
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?rev=620871&r1=620870&r2=620871&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java Tue Feb 12 09:25:44 2008
@@ -218,9 +218,9 @@
}
else
{
- if (_log.isTraceEnabled())
+ if (_log.isDebugEnabled())
{
- _log.trace(debugIdentity() + "Content header received on channel " + _channelId);
+ _log.debug(debugIdentity() + "Content header received on channel " + _channelId);
}
if (ENABLE_JMSXUserID)
@@ -255,9 +255,9 @@
throw new AMQException(null, "Received content body without previously receiving a JmsPublishBody", null);
}
- if (_log.isTraceEnabled())
+ if (_log.isDebugEnabled())
{
- _log.trace(debugIdentity() + "Content body received on channel " + _channelId);
+ _log.debug(debugIdentity() + "Content body received on channel " + _channelId);
}
try
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java?rev=620871&r1=620870&r2=620871&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java Tue Feb 12 09:25:44 2008
@@ -79,9 +79,9 @@
if (queue == null)
{
- if (_log.isTraceEnabled())
+ if (_log.isDebugEnabled())
{
- _log.trace("No queue for '" + body.getQueue() + "'");
+ _log.debug("No queue for '" + body.getQueue() + "'");
}
if (body.getQueue() != null)
{
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java?rev=620871&r1=620870&r2=620871&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java Tue Feb 12 09:25:44 2008
@@ -99,9 +99,9 @@
}
- if (_logger.isTraceEnabled())
+ if (_logger.isDebugEnabled())
{
- _logger.trace("Rejecting: DT:" + deliveryTag + "-" + message.getMessage().debugIdentity() +
+ _logger.debug("Rejecting: DT:" + deliveryTag + "-" + message.getMessage().debugIdentity() +
": Requeue:" + body.getRequeue() +
//": Resend:" + evt.getMethod().resend +
" on channel:" + channel.debugIdentity());
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java?rev=620871&r1=620870&r2=620871&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java Tue Feb 12 09:25:44 2008
@@ -302,9 +302,9 @@
public void populatePreDeliveryQueue(Subscription subscription)
{
- if (_log.isTraceEnabled())
+ if (_log.isDebugEnabled())
{
- _log.trace("Populating PreDeliveryQueue for Subscription(" + System.identityHashCode(subscription) + ")");
+ _log.debug("Populating PreDeliveryQueue for Subscription(" + System.identityHashCode(subscription) + ")");
}
Iterator<QueueEntry> currentQueue = _messages.iterator();
@@ -532,9 +532,9 @@
//else the clean up is not required as the message has already been taken for this queue therefore
// it was the responsibility of the code that took the message to ensure the _totalMessageSize was updated.
- if (_log.isTraceEnabled())
+ if (_log.isDebugEnabled())
{
- _log.trace("Removed taken message:" + message.debugIdentity());
+ _log.debug("Removed taken message:" + message.debugIdentity());
}
// try the next message
@@ -627,9 +627,9 @@
Queue<QueueEntry> messageQueue = sub.getNextQueue(_messages);
- if (_log.isTraceEnabled())
+ if (_log.isDebugEnabled())
{
- _log.trace(debugIdentity() + "Async sendNextMessage for sub (" + System.identityHashCode(sub) +
+ _log.debug(debugIdentity() + "Async sendNextMessage for sub (" + System.identityHashCode(sub) +
") from queue (" + System.identityHashCode(messageQueue) +
") AMQQueue (" + System.identityHashCode(queue) + ")");
}
@@ -655,9 +655,9 @@
// message will be null if we have no messages in the messageQueue.
if (entry == null)
{
- if (_log.isTraceEnabled())
+ if (_log.isDebugEnabled())
{
- _log.trace(debugIdentity() + "No messages for Subscriber(" + System.identityHashCode(sub) + ") from queue; (" + System.identityHashCode(messageQueue) + ")");
+ _log.debug(debugIdentity() + "No messages for Subscriber(" + System.identityHashCode(sub) + ") from queue; (" + System.identityHashCode(messageQueue) + ")");
}
return;
}
@@ -696,9 +696,9 @@
if (messageQueue == sub.getResendQueue())
{
- if (_log.isTraceEnabled())
+ if (_log.isDebugEnabled())
{
- _log.trace(debugIdentity() + "All messages sent from resendQueue for " + sub);
+ _log.debug(debugIdentity() + "All messages sent from resendQueue for " + sub);
}
if (messageQueue.isEmpty())
{
@@ -884,9 +884,9 @@
{
if (!s.isSuspended())
{
- if (_log.isTraceEnabled())
+ if (_log.isDebugEnabled())
{
- _log.trace(debugIdentity() + "Delivering Message:" + entry.getMessage().debugIdentity() + " to(" +
+ _log.debug(debugIdentity() + "Delivering Message:" + entry.getMessage().debugIdentity() + " to(" +
System.identityHashCode(s) + ") :" + s);
}
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java?rev=620871&r1=620870&r2=620871&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java Tue Feb 12 09:25:44 2008
@@ -416,9 +416,9 @@
}
- if (_logger.isTraceEnabled())
+ if (_logger.isDebugEnabled())
{
- _logger.trace("(" + debugIdentity() + ") checking filters for message (" + entry.debugIdentity());
+ _logger.debug("(" + debugIdentity() + ") checking filters for message (" + entry.debugIdentity());
}
return checkFilters(entry);
@@ -563,9 +563,9 @@
{
QueueEntry resent = _resendQueue.poll();
- if (_logger.isTraceEnabled())
+ if (_logger.isDebugEnabled())
{
- _logger.trace("Removed for resending:" + resent.debugIdentity());
+ _logger.debug("Removed for resending:" + resent.debugIdentity());
}
resent.release();
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=620871&r1=620870&r2=620871&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Tue Feb 12 09:25:44 2008
@@ -1369,9 +1369,9 @@
public void rejectMessage(UnprocessedMessage message, boolean requeue)
{
- if (_logger.isTraceEnabled())
+ if (_logger.isDebugEnabled())
{
- _logger.trace("Rejecting Unacked message:" + message.getDeliveryTag());
+ _logger.debug("Rejecting Unacked message:" + message.getDeliveryTag());
}
rejectMessage(message.getDeliveryTag(), requeue);
@@ -1379,9 +1379,9 @@
public void rejectMessage(AbstractJMSMessage message, boolean requeue)
{
- if (_logger.isTraceEnabled())
+ if (_logger.isDebugEnabled())
{
- _logger.trace("Rejecting Abstract message:" + message.getDeliveryTag());
+ _logger.debug("Rejecting Abstract message:" + message.getDeliveryTag());
}
rejectMessage(message.getDeliveryTag(), requeue);
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?rev=620871&r1=620870&r2=620871&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java Tue Feb 12 09:25:44 2008
@@ -567,12 +567,12 @@
{
if (!_closed.getAndSet(true))
{
- if (_logger.isTraceEnabled())
+ if (_logger.isDebugEnabled())
{
StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
if (_closedStack != null)
{
- _logger.trace(_consumerTag + " previously:" + _closedStack.toString());
+ _logger.debug(_consumerTag + " previously:" + _closedStack.toString());
}
else
{
@@ -639,13 +639,14 @@
{
_closed.set(true);
- if (_logger.isTraceEnabled())
+ if (_logger.isDebugEnabled())
{
if (_closedStack != null)
{
- _logger.trace(_consumerTag + " markClosed():" + Arrays
- .asList(Thread.currentThread().getStackTrace()).subList(3, 8));
- _logger.trace(_consumerTag + " previously:" + _closedStack.toString());
+ StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
+ _logger.debug(_consumerTag + " markClosed():"
+ + Arrays.asList(stackTrace).subList(3, stackTrace.length - 1));
+ _logger.debug(_consumerTag + " previously:" + _closedStack.toString());
}
else
{
@@ -881,14 +882,14 @@
// synchronized (_closed)
{
_closed.set(true);
- if (_logger.isTraceEnabled())
+ if (_logger.isDebugEnabled())
{
StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
if (_closedStack != null)
{
- _logger.trace(_consumerTag + " notifyError():"
+ _logger.debug(_consumerTag + " notifyError():"
+ Arrays.asList(stackTrace).subList(3, stackTrace.length - 1));
- _logger.trace(_consumerTag + " previously" + _closedStack.toString());
+ _logger.debug(_consumerTag + " previously" + _closedStack.toString());
}
else
{
@@ -1035,9 +1036,9 @@
{
_session.rejectMessage(((AbstractJMSMessage) o), true);
- if (_logger.isTraceEnabled())
+ if (_logger.isDebugEnabled())
{
- _logger.trace("Rejected message:" + ((AbstractJMSMessage) o).getDeliveryTag());
+ _logger.debug("Rejected message:" + ((AbstractJMSMessage) o).getDeliveryTag());
}
iterator.remove();
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java?rev=620871&r1=620870&r2=620871&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/basic/PropertyValueTest.java Tue Feb 12 09:25:44 2008
@@ -171,7 +171,7 @@
m.setJMSReplyTo(q);
m.setStringProperty("TempQueue", q.toString());
- _logger.trace("Message:" + m);
+ _logger.debug("Message:" + m);
Assert.assertEquals("Check temp queue has been set correctly", m.getJMSReplyTo().toString(),
m.getStringProperty("TempQueue"));
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java?rev=620871&r1=620870&r2=620871&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java Tue Feb 12 09:25:44 2008
@@ -706,12 +706,15 @@
public void writeToBuffer(ByteBuffer buffer)
{
- final boolean trace = _logger.isTraceEnabled();
+ final boolean trace = _logger.isDebugEnabled();
if (trace)
{
- _logger.trace("FieldTable::writeToBuffer: Writing encoded length of " + getEncodedSize() + "...");
- _logger.trace(_properties.toString());
+ _logger.debug("FieldTable::writeToBuffer: Writing encoded length of " + getEncodedSize() + "...");
+ if (_properties != null)
+ {
+ _logger.debug(_properties.toString());
+ }
}
EncodingUtils.writeUnsignedInteger(buffer, getEncodedSize());
@@ -915,11 +918,11 @@
final Map.Entry<AMQShortString, AMQTypedValue> me = it.next();
try
{
- if (_logger.isTraceEnabled())
+ if (_logger.isDebugEnabled())
{
- _logger.trace("Writing Property:" + me.getKey() + " Type:" + me.getValue().getType() + " Value:"
+ _logger.debug("Writing Property:" + me.getKey() + " Type:" + me.getValue().getType() + " Value:"
+ me.getValue().getValue());
- _logger.trace("Buffer Position:" + buffer.position() + " Remaining:" + buffer.remaining());
+ _logger.debug("Buffer Position:" + buffer.position() + " Remaining:" + buffer.remaining());
}
// Write the actual parameter name
@@ -928,12 +931,12 @@
}
catch (Exception e)
{
- if (_logger.isTraceEnabled())
+ if (_logger.isDebugEnabled())
{
- _logger.trace("Exception thrown:" + e);
- _logger.trace("Writing Property:" + me.getKey() + " Type:" + me.getValue().getType() + " Value:"
+ _logger.debug("Exception thrown:" + e);
+ _logger.debug("Writing Property:" + me.getKey() + " Type:" + me.getValue().getType() + " Value:"
+ me.getValue().getValue());
- _logger.trace("Buffer Position:" + buffer.position() + " Remaining:" + buffer.remaining());
+ _logger.debug("Buffer Position:" + buffer.position() + " Remaining:" + buffer.remaining());
}
throw new RuntimeException(e);
@@ -945,7 +948,7 @@
private void setFromBuffer(ByteBuffer buffer, long length) throws AMQFrameDecodingException
{
- final boolean trace = _logger.isTraceEnabled();
+ final boolean trace = _logger.isDebugEnabled();
if (length > 0)
{
@@ -961,7 +964,7 @@
if (trace)
{
- _logger.trace("FieldTable::PropFieldTable(buffer," + length + "): Read type '" + value.getType()
+ _logger.debug("FieldTable::PropFieldTable(buffer," + length + "): Read type '" + value.getType()
+ "', key '" + key + "', value '" + value.getValue() + "'");
}
@@ -976,7 +979,7 @@
if (trace)
{
- _logger.trace("FieldTable::FieldTable(buffer," + length + "): Done.");
+ _logger.debug("FieldTable::FieldTable(buffer," + length + "): Done.");
}
}
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedClientTestCase.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedClientTestCase.java?rev=620871&r1=620870&r2=620871&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedClientTestCase.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/integrationtests/src/main/java/org/apache/qpid/sustained/SustainedClientTestCase.java Tue Feb 12 09:25:44 2008
@@ -335,9 +335,9 @@
public void onMessage(Message message)
{
- if (log.isTraceEnabled())
+ if (log.isDebugEnabled())
{
- log.trace("Message " + _received + "received in listener");
+ log.debug("Message " + _received + "received in listener");
}
if (message instanceof TextMessage)
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/perftests/distribution/pom.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/perftests/distribution/pom.xml?rev=620871&r1=620870&r2=620871&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/perftests/distribution/pom.xml (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/perftests/distribution/pom.xml Tue Feb 12 09:25:44 2008
@@ -56,13 +56,11 @@
<dependency>
<groupId>uk.co.thebadgerset</groupId>
<artifactId>junit-toolkit</artifactId>
- <version>0.6-SNAPSHOT</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>uk.co.thebadgerset</groupId>
<artifactId>junit-toolkit-maven-plugin</artifactId>
- <version>0.6-SNAPSHOT</version>
<scope>runtime</scope>
</dependency>
</dependencies>
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/perftests/src/main/java/org/apache/qpid/client/message/TestMessageFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/perftests/src/main/java/org/apache/qpid/client/message/TestMessageFactory.java?rev=620871&r1=620870&r2=620871&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/perftests/src/main/java/org/apache/qpid/client/message/TestMessageFactory.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/perftests/src/main/java/org/apache/qpid/client/message/TestMessageFactory.java Tue Feb 12 09:25:44 2008
@@ -20,12 +20,16 @@
*/
package org.apache.qpid.client.message;
+import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.SimpleByteBufferAllocator;
+
import javax.jms.JMSException;
import javax.jms.Session;
import javax.jms.ObjectMessage;
import javax.jms.StreamMessage;
import javax.jms.BytesMessage;
import javax.jms.TextMessage;
+import javax.jms.Queue;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
@@ -36,6 +40,15 @@
public static TextMessage newTextMessage(Session session, int size) throws JMSException
{
return session.createTextMessage(createMessagePayload(size));
+ }
+
+ public static JMSTextMessage newJMSTextMessage(int size, String encoding) throws JMSException
+ {
+ ByteBuffer byteBuffer = (new SimpleByteBufferAllocator()).allocate(size, true);
+ JMSTextMessage message = new JMSTextMessage(byteBuffer, encoding);
+ message.clearBody();
+ message.setText(createMessagePayload(size));
+ return message;
}
public static BytesMessage newBytesMessage(Session session, int size) throws JMSException
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingDurableClient.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingDurableClient.java?rev=620871&r1=620870&r2=620871&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingDurableClient.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingDurableClient.java Tue Feb 12 09:25:44 2008
@@ -116,6 +116,7 @@
defaults.setProperty(PERSISTENT_MODE_PROPNAME, "true");
defaults.setProperty(TX_BATCH_SIZE_PROPNAME, "10");
defaults.setProperty(RATE_PROPNAME, "20");
+ defaults.setProperty(DURABLE_DESTS_PROPNAME, "true");
defaults.setProperty(NUM_MESSAGES_TO_ACTION_PROPNAME, NUM_MESSAGES_TO_ACTION_DEFAULT);
}
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingSendOnlyClient.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingSendOnlyClient.java?rev=620871&r1=620870&r2=620871&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingSendOnlyClient.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/perftests/src/main/java/org/apache/qpid/ping/PingSendOnlyClient.java Tue Feb 12 09:25:44 2008
@@ -26,6 +26,7 @@
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
+import javax.jms.ObjectMessage;
import org.apache.log4j.Logger;
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongBouncer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongBouncer.java?rev=620871&r1=620870&r2=620871&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongBouncer.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongBouncer.java Tue Feb 12 09:25:44 2008
@@ -26,23 +26,30 @@
import java.util.Date;
import javax.jms.*;
-import javax.naming.Context;
import org.apache.log4j.Logger;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQQueue;
+import org.apache.qpid.client.AMQTopic;
+import org.apache.qpid.jms.ConnectionListener;
+import org.apache.qpid.jms.Session;
+import org.apache.qpid.topic.Config;
+import org.apache.qpid.exchange.ExchangeDefaults;
+
/**
* PingPongBouncer is a message listener the bounces back messages to their reply to destination. This is used to return
* ping messages generated by {@link org.apache.qpid.requestreply.PingPongProducer} but could be used for other purposes
* too.
- * <p/>
+ *
* <p/>The correlation id from the received message is extracted, and placed into the reply as the correlation id. Messages
* are bounced back to their reply-to destination. The original sender of the message has the option to use either a unique
* temporary queue or the correlation id to correlate the original message to the reply.
- * <p/>
+ *
* <p/>There is a verbose mode flag which causes information about each ping to be output to the console
* (info level logging, so usually console). This can be helpfull to check the bounce backs are happening but should
* be disabled for real timing tests as writing to the console will slow things down.
- * <p/>
+ *
* <p><table id="crc"><caption>CRC Card</caption>
* <tr><th> Responsibilities <th> Collaborations
* <tr><td> Bounce back messages to their reply to destination.
@@ -50,74 +57,51 @@
* </table>
*
* @todo Replace the command line parsing with a neater tool.
+ *
* @todo Make verbose accept a number of messages, only prints to console every X messages.
*/
public class PingPongBouncer implements MessageListener
{
private static final Logger _logger = Logger.getLogger(PingPongBouncer.class);
- /**
- * The default prefetch size for the message consumer.
- */
+ /** The default prefetch size for the message consumer. */
private static final int PREFETCH = 1;
- /**
- * The default no local flag for the message consumer.
- */
+ /** The default no local flag for the message consumer. */
private static final boolean NO_LOCAL = true;
private static final String DEFAULT_DESTINATION_NAME = "ping";
- /**
- * The default exclusive flag for the message consumer.
- */
+ /** The default exclusive flag for the message consumer. */
private static final boolean EXCLUSIVE = false;
- /**
- * A convenient formatter to use when time stamping output.
- */
+ /** A convenient formatter to use when time stamping output. */
protected static final SimpleDateFormat timestampFormatter = new SimpleDateFormat("hh:mm:ss:SS");
- /**
- * Used to indicate that the reply generator should log timing info to the console (logger info level).
- */
+ /** Used to indicate that the reply generator should log timing info to the console (logger info level). */
private boolean _verbose = false;
- /**
- * Determines whether this bounce back client bounces back messages persistently.
- */
+ /** Determines whether this bounce back client bounces back messages persistently. */
private boolean _persistent = false;
private Destination _consumerDestination;
- /**
- * Keeps track of the response destination of the previous message for the last reply to producer cache.
- */
+ /** Keeps track of the response destination of the previous message for the last reply to producer cache. */
private Destination _lastResponseDest;
- /**
- * The producer for sending replies with.
- */
+ /** The producer for sending replies with. */
private MessageProducer _replyProducer;
- /**
- * The consumer controlSession.
- */
+ /** The consumer controlSession. */
private Session _consumerSession;
- /**
- * The producer controlSession.
- */
+ /** The producer controlSession. */
private Session _producerSession;
- /**
- * Holds the connection to the broker.
- */
- private Connection _connection;
+ /** Holds the connection to the broker. */
+ private AMQConnection _connection;
- /**
- * Flag used to indicate if this is a point to point or pub/sub ping client.
- */
+ /** Flag used to indicate if this is a point to point or pub/sub ping client. */
private boolean _isPubSub = false;
/**
@@ -135,21 +119,22 @@
/**
* Creates a PingPongBouncer on the specified producer and consumer sessions.
*
- * @param fileProperties The path to the file properties
- * @param factoryName The factory name
+ * @param brokerDetails The addresses of the brokers to connect to.
* @param username The broker username.
* @param password The broker password.
+ * @param virtualpath The virtual host name within the broker.
* @param destinationName The name of the queue to receive pings on
* (or root of the queue name where many queues are generated).
* @param persistent A flag to indicate that persistent message should be used.
* @param transacted A flag to indicate that pings should be sent within transactions.
* @param selector A message selector to filter received pings with.
* @param verbose A flag to indicate that message timings should be sent to the console.
+ *
* @throws Exception All underlying exceptions allowed to fall through. This is only test code...
*/
- public PingPongBouncer(String fileProperties, String factoryName, String username, String password,
- String destinationName, boolean persistent, boolean transacted,
- String selector, boolean verbose, boolean pubsub) throws Exception
+ public PingPongBouncer(String brokerDetails, String username, String password, String virtualpath,
+ String destinationName, boolean persistent, boolean transacted, String selector, boolean verbose,
+ boolean pubsub) throws Exception
{
// Create a client id to uniquely identify this client.
InetAddress address = InetAddress.getLocalHost();
@@ -158,9 +143,11 @@
_persistent = persistent;
setPubSub(pubsub);
// Connect to the broker.
- Context context = InitialContextHelper.getInitialContext(fileProperties);
- ConnectionFactory factory = (ConnectionFactory) context.lookup(factoryName);
- setConnection(factory.createConnection(username, password));
+ setConnection(new AMQConnection(brokerDetails, username, password, clientId, virtualpath));
+ _logger.info("Connected with URL:" + getConnection().toURL());
+
+ // Set up the failover notifier.
+ getConnection().setConnectionListener(new FailoverNotifier());
// Create a controlSession to listen for messages on and one to send replies on, transactional depending on the
// command line option.
@@ -169,7 +156,8 @@
// Create the queue to listen for message on.
createConsumerDestination(destinationName);
- MessageConsumer consumer = _consumerSession.createConsumer(_consumerDestination, selector, NO_LOCAL);
+ MessageConsumer consumer =
+ _consumerSession.createConsumer(_consumerDestination, PREFETCH, NO_LOCAL, EXCLUSIVE, selector);
// Create a producer for the replies, without a default destination.
_replyProducer = _producerSession.createProducer(null);
@@ -180,10 +168,57 @@
consumer.setMessageListener(this);
}
+ /**
+ * Starts a stand alone ping-pong client running in verbose mode.
+ *
+ * @param args
+ */
+ public static void main(String[] args) throws Exception
+ {
+ System.out.println("Starting...");
+
+ // Display help on the command line.
+ if (args.length == 0)
+ {
+ _logger.info("Running test with default values...");
+ //usage();
+ //System.exit(0);
+ }
+
+ // Extract all command line parameters.
+ Config config = new Config();
+ config.setOptions(args);
+ String brokerDetails = config.getHost() + ":" + config.getPort();
+ String virtualpath = "test";
+ String destinationName = config.getDestination();
+ if (destinationName == null)
+ {
+ destinationName = DEFAULT_DESTINATION_NAME;
+ }
+
+ String selector = config.getSelector();
+ boolean transacted = config.isTransacted();
+ boolean persistent = config.usePersistentMessages();
+ boolean pubsub = config.isPubSub();
+ boolean verbose = true;
+
+ //String selector = null;
+
+ // Instantiate the ping pong client with the command line options and start it running.
+ PingPongBouncer pingBouncer =
+ new PingPongBouncer(brokerDetails, "guest", "guest", virtualpath, destinationName, persistent, transacted,
+ selector, verbose, pubsub);
+ pingBouncer.getConnection().start();
+
+ System.out.println("Waiting...");
+ }
+
private static void usage()
{
- System.err.println(
- "Usage: PingPongBouncer \n" + "-host : broker host\n" + "-port : broker port\n" + "-destinationname : queue/topic name\n" + "-transacted : (true/false). Default is false\n" + "-persistent : (true/false). Default is false\n" + "-pubsub : (true/false). Default is false\n" + "-selector : selector string\n");
+ System.err.println("Usage: PingPongBouncer \n" + "-host : broker host\n" + "-port : broker port\n"
+ + "-destinationname : queue/topic name\n" + "-transacted : (true/false). Default is false\n"
+ + "-persistent : (true/false). Default is false\n"
+ + "-pubsub : (true/false). Default is false\n" + "-selector : selector string\n");
}
/**
@@ -200,8 +235,8 @@
String messageCorrelationId = message.getJMSCorrelationID();
if (_verbose)
{
- _logger.info(timestampFormatter
- .format(new Date()) + ": Got ping with correlation id, " + messageCorrelationId);
+ _logger.info(timestampFormatter.format(new Date()) + ": Got ping with correlation id, "
+ + messageCorrelationId);
}
// Get the reply to destination from the message and check it is set.
@@ -234,8 +269,8 @@
if (_verbose)
{
- _logger.info(timestampFormatter
- .format(new Date()) + ": Sent reply with correlation id, " + messageCorrelationId);
+ _logger.info(timestampFormatter.format(new Date()) + ": Sent reply with correlation id, "
+ + messageCorrelationId);
}
// Commit the transaction if running in transactional mode.
@@ -252,7 +287,7 @@
*
* @return The underlying connection that this ping client is running on.
*/
- public Connection getConnection()
+ public AMQConnection getConnection()
{
return _connection;
}
@@ -262,7 +297,7 @@
*
* @param connection The ping connection.
*/
- public void setConnection(Connection connection)
+ public void setConnection(AMQConnection connection)
{
this._connection = connection;
}
@@ -290,7 +325,7 @@
/**
* Convenience method to commit the transaction on the specified controlSession. If the controlSession to commit on is not
* a transactional controlSession, this method does nothing.
- * <p/>
+ *
* <p/>If the {@link #_failBeforeCommit} flag is set, this will prompt the user to kill the broker before the
* commit is applied. If the {@link #_failAfterCommit} flag is set, this will prompt the user to kill the broker
* after the commit is applied.
@@ -305,7 +340,7 @@
{
if (_failBeforeCommit)
{
- _logger.trace("Failing Before Commit");
+ _logger.debug("Failing Before Commit");
doFailover();
}
@@ -313,11 +348,11 @@
if (_failAfterCommit)
{
- _logger.trace("Failing After Commit");
+ _logger.debug("Failing After Commit");
doFailover();
}
- _logger.trace("Session Commited.");
+ _logger.debug("Session Commited.");
}
catch (JMSException e)
{
@@ -353,8 +388,7 @@
System.in.read();
}
catch (IOException e)
- {
- }
+ { }
System.out.println("Continuing.");
}
@@ -371,22 +405,49 @@
System.in.read();
}
catch (IOException e)
- {
- }
+ { }
System.out.println("Continuing.");
}
- private void createConsumerDestination(String name) throws JMSException
+ private void createConsumerDestination(String name)
{
if (isPubSub())
{
- _consumerDestination = _consumerSession.createTopic(name);
+ _consumerDestination = new AMQTopic(ExchangeDefaults.TOPIC_EXCHANGE_NAME, name);
}
else
{
- _consumerDestination = _consumerSession.createQueue(name);
+ _consumerDestination = new AMQQueue(ExchangeDefaults.DIRECT_EXCHANGE_NAME, name);
+ }
+ }
+
+ /**
+ * A connection listener that logs out any failover complete events. Could do more interesting things with this
+ * at some point...
+ */
+ public static class FailoverNotifier implements ConnectionListener
+ {
+ public void bytesSent(long count)
+ { }
+
+ public void bytesReceived(long count)
+ { }
+
+ public boolean preFailover(boolean redirect)
+ {
+ return true;
+ }
+
+ public boolean preResubscribe()
+ {
+ return true;
+ }
+
+ public void failoverComplete()
+ {
+ _logger.info("App got failover complete callback.");
}
}
}
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java?rev=620871&r1=620870&r2=620871&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongProducer.java Tue Feb 12 09:25:44 2008
@@ -669,9 +669,12 @@
// _log.debug("protected void createConnection(String clientID = " + clientID + "): called");
// _log.debug("Creating a connection for the message producer.");
-
+ File propsFile = new File(_fileProperties);
+ InputStream is = new FileInputStream(propsFile);
+ Properties properties = new Properties();
+ properties.load(is);
- Context context = InitialContextHelper.getInitialContext(_fileProperties);
+ Context context = new InitialContext(properties);
ConnectionFactory factory = (ConnectionFactory) context.lookup(_factoryName);
_connection = factory.createConnection(_username, _password);
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongTestPerf.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongTestPerf.java?rev=620871&r1=620870&r2=620871&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongTestPerf.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/perftests/src/main/java/org/apache/qpid/requestreply/PingPongTestPerf.java Tue Feb 12 09:25:44 2008
@@ -172,10 +172,10 @@
PerThreadSetup perThreadSetup = new PerThreadSetup();
// Extract the test set up paramaeters.
- String fileProperties = testParameters.getProperty(PingPongProducer.FILE_PROPERTIES_PROPNAME);
- String factoryName = testParameters.getProperty(PingPongProducer.FACTORY_NAME_PROPNAME);
+ String brokerDetails = testParameters.getProperty(PingPongProducer.BROKER_PROPNAME);
String username = testParameters.getProperty(PingPongProducer.USERNAME_PROPNAME);
String password = testParameters.getProperty(PingPongProducer.PASSWORD_PROPNAME);
+ String virtualPath = testParameters.getProperty(PingPongProducer.VIRTUAL_HOST_PROPNAME);
String destinationName = testParameters.getProperty(PingPongProducer.PING_QUEUE_NAME_PROPNAME);
boolean persistent = testParameters.getPropertyAsBoolean(PingPongProducer.PERSISTENT_MODE_PROPNAME);
boolean transacted = testParameters.getPropertyAsBoolean(PingPongProducer.TRANSACTED_PROPNAME);
@@ -187,7 +187,7 @@
{
// Establish a bounce back client on the ping queue to bounce back the pings.
perThreadSetup._testPingBouncer =
- new PingPongBouncer(fileProperties, factoryName, username, password, destinationName, persistent,
+ new PingPongBouncer(brokerDetails, username, password, virtualPath, destinationName, persistent,
transacted, selector, verbose, pubsub);
// Start the connections for client and producer running.