You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2007/03/06 15:12:49 UTC
svn commit: r515127 [2/2] - in /incubator/qpid/trunk/qpid/java:
broker/src/main/java/org/apache/qpid/server/
broker/src/main/java/org/apache/qpid/server/ack/
broker/src/main/java/org/apache/qpid/server/handler/
broker/src/main/java/org/apache/qpid/serv...
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?view=diff&rev=515127&r1=515126&r2=515127
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Tue Mar 6 06:12:47 2007
@@ -198,9 +198,10 @@
private final Object _suspensionLock = new Object();
-
/** Responsible for decoding a message fragment and passing it to the appropriate message consumer. */
+ private static final Logger _dispatcherLogger = Logger.getLogger(Dispatcher.class);
+
private class Dispatcher extends Thread
{
@@ -212,12 +213,37 @@
public Dispatcher()
{
super("Dispatcher-Channel-" + _channelId);
+ if (_dispatcherLogger.isInfoEnabled())
+ {
+ _dispatcherLogger.info(getName() + " created");
+ }
}
public void run()
{
+ if (_dispatcherLogger.isInfoEnabled())
+ {
+ _dispatcherLogger.info(getName() + " started");
+ }
+
UnprocessedMessage message;
+ // Allow dispatcher to start stopped
+ synchronized (_lock)
+ {
+ while (connectionStopped())
+ {
+ try
+ {
+ _lock.wait();
+ }
+ catch (InterruptedException e)
+ {
+ // ignore
+ }
+ }
+ }
+
try
{
while (!_closed.get() && (message = (UnprocessedMessage) _queue.take()) != null)
@@ -243,10 +269,12 @@
}
catch (InterruptedException e)
{
- ;
+ //ignore
+ }
+ if (_dispatcherLogger.isInfoEnabled())
+ {
+ _dispatcherLogger.info(getName() + " thread terminating for channel " + _channelId);
}
-
- _logger.info("Dispatcher thread terminating for channel " + _channelId);
}
// only call while holding lock
@@ -263,6 +291,12 @@
currently = _connectionStopped;
_connectionStopped = connectionStopped;
_lock.notify();
+
+ if (_dispatcherLogger.isDebugEnabled())
+ {
+ _dispatcherLogger.debug("Dispatcher Connection " + (connectionStopped ? "Started" : "Stopped") +
+ ": Currently " + (currently ? "Started" : "Stopped"));
+ }
}
return currently;
}
@@ -275,9 +309,14 @@
if (consumer == null)
{
- _logger.warn("Received a message from queue " + message.getDeliverBody().consumerTag + " without a handler - ignoring...");
- _logger.warn("Consumers that exist: " + _consumers);
- _logger.warn("Session hashcode: " + System.identityHashCode(this));
+ if (_dispatcherLogger.isInfoEnabled())
+ {
+ _dispatcherLogger.info("Received a message(" + System.identityHashCode(message) + ")" +
+ "[" + message.getDeliverBody().deliveryTag + "] from queue "
+ + message.getDeliverBody().consumerTag + " without a handler - rejecting(requeue)...");
+ }
+
+ rejectMessage(message, true);
}
else
{
@@ -311,7 +350,7 @@
rejectAllMessages(true);
- _logger.debug("Session Pre Dispatch Queue cleared");
+ _dispatcherLogger.debug("Session Pre Dispatch Queue cleared");
for (BasicMessageConsumer consumer : _consumers.values())
{
@@ -323,20 +362,28 @@
}
- public void rejectPending(AMQShortString consumerTag)
+ public void rejectPending(BasicMessageConsumer consumer)
{
synchronized (_lock)
{
- boolean stopped = connectionStopped();
+ boolean stopped = _dispatcher.connectionStopped();
- _dispatcher.setConnectionStopped(false);
-
- rejectMessagesForConsumerTag(consumerTag, true);
-
- if (stopped)
+ if (!stopped)
{
- _dispatcher.setConnectionStopped(stopped);
+ _dispatcher.setConnectionStopped(true);
}
+
+ // Reject messages on pre-receive queue
+ consumer.rollback();
+
+ // Reject messages on pre-dispatch queue
+ rejectMessagesForConsumerTag(consumer.getConsumerTag(), true);
+
+ // Remove consumer from map.
+ deregisterConsumer(consumer);
+
+ _dispatcher.setConnectionStopped(stopped);
+
}
}
}
@@ -549,14 +596,15 @@
suspendChannel(true);
}
- _connection.getProtocolHandler().syncWrite(
- TxRollbackBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion()), TxRollbackOkBody.class);
-
if (_dispatcher != null)
{
_dispatcher.rollback();
}
+ _connection.getProtocolHandler().syncWrite(
+ TxRollbackBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion()), TxRollbackOkBody.class);
+
+
if (!isSuspended)
{
suspendChannel(false);
@@ -663,14 +711,10 @@
jmse = e;
}
}
- finally
+ if (jmse != null)
{
- if (jmse != null)
- {
- throw jmse;
- }
+ throw jmse;
}
-
}
@@ -835,6 +879,11 @@
consumer.clearUnackedMessages();
}
+ if (_dispatcher != null)
+ {
+ _dispatcher.rollback();
+ }
+
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
@@ -844,11 +893,6 @@
false) // requeue
, BasicRecoverOkBody.class);
- if (_dispatcher != null)
- {
- _dispatcher.rollback();
- }
-
if (!isSuspended)
{
suspendChannel(false);
@@ -1223,35 +1267,17 @@
return (counter != null) && (counter.get() != 0);
}
-
- public void declareExchange(AMQShortString name, AMQShortString type) throws AMQException
+ public void declareExchange(AMQShortString name, AMQShortString type, boolean nowait) throws AMQException
{
- declareExchange(name, type, getProtocolHandler());
+ declareExchange(name, type, getProtocolHandler(), nowait);
}
- public void declareExchangeSynch(AMQShortString name, AMQShortString type) throws AMQException
+ private void declareExchange(AMQDestination amqd, AMQProtocolHandler protocolHandler, boolean nowait) throws AMQException
{
- // TODO: Be aware of possible changes to parameter order as versions change.
- AMQFrame frame = ExchangeDeclareBody.createAMQFrame(_channelId,
- getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor)
- null, // arguments
- false, // autoDelete
- false, // durable
- name, // exchange
- false, // internal
- false, // nowait
- false, // passive
- getTicket(), // ticket
- type); // type
- getProtocolHandler().syncWrite(frame, ExchangeDeclareOkBody.class);
+ declareExchange(amqd.getExchangeName(), amqd.getExchangeClass(), protocolHandler, nowait);
}
- private void declareExchange(AMQDestination amqd, AMQProtocolHandler protocolHandler) throws AMQException
- {
- declareExchange(amqd.getExchangeName(), amqd.getExchangeClass(), protocolHandler);
- }
-
- private void declareExchange(AMQShortString name, AMQShortString type, AMQProtocolHandler protocolHandler) throws AMQException
+ private void declareExchange(AMQShortString name, AMQShortString type, AMQProtocolHandler protocolHandler, boolean nowait) throws AMQException
{
// TODO: Be aware of possible changes to parameter order as versions change.
AMQFrame exchangeDeclare = ExchangeDeclareBody.createAMQFrame(_channelId,
@@ -1261,7 +1287,7 @@
false, // durable
name, // exchange
false, // internal
- false, // nowait
+ nowait, // nowait
false, // passive
getTicket(), // ticket
type); // type
@@ -1874,15 +1900,21 @@
synchronized void startDistpatcherIfNecessary()
{
+ startDistpatcherIfNecessary(false);
+ }
+
+ synchronized void startDistpatcherIfNecessary(boolean initiallyStopped)
+ {
if (_dispatcher == null)
{
_dispatcher = new Dispatcher();
_dispatcher.setDaemon(true);
+ _dispatcher.setConnectionStopped(initiallyStopped);
_dispatcher.start();
}
else
{
- _dispatcher.setConnectionStopped(false);
+ _dispatcher.setConnectionStopped(initiallyStopped);
}
}
@@ -1910,7 +1942,7 @@
AMQProtocolHandler protocolHandler = getProtocolHandler();
- declareExchange(amqd, protocolHandler);
+ declareExchange(amqd, protocolHandler, false);
AMQShortString queueName = declareQueue(amqd, protocolHandler);
@@ -1950,12 +1982,6 @@
_destinationConsumerCount.remove(dest);
}
}
-
- //ensure we remove the messages from the consumer even if the dispatcher hasn't started
- if (_dispatcher == null)
- {
- rejectMessagesForConsumerTag(consumer.getConsumerTag(), true);
- }// if the dispatcher is running we have to do the clean up in the Ok Handler.
}
}
@@ -2033,6 +2059,8 @@
public void confirmConsumerCancelled(AMQShortString consumerTag)
{
+
+ // Remove the consumer from the map
BasicMessageConsumer consumer = (BasicMessageConsumer) _consumers.get(consumerTag);
if (consumer != null)
{
@@ -2040,26 +2068,33 @@
{
consumer.closeWhenNoMessages(true);
}
+
+ //Clean the Maps up first
+ //Flush any pending messages for this consumerTag
+ if (_dispatcher != null)
+ {
+ _logger.info("Dispatcher is not null");
+ }
else
{
- consumer.rollback();
+ _logger.info("Dispatcher is null so created stopped dispatcher");
+
+ startDistpatcherIfNecessary(true);
}
- }
- //Flush any pending messages for this consumerTag
- if (_dispatcher != null)
- {
- _dispatcher.rejectPending(consumerTag);
+ _dispatcher.rejectPending(consumer);
}
else
{
- rejectMessagesForConsumerTag(consumerTag, true);
+ _logger.warn("Unable to confirm cancellation of consumer (" + consumerTag + "). Not found in consumer map.");
}
+
+
}
/*
- * I could have combined the last 3 methods, but this way it improves readability
- */
+ * I could have combined the last 3 methods, but this way it improves readability
+ */
private AMQTopic checkValidTopic(Topic topic) throws JMSException
{
if (topic == null)
@@ -2189,16 +2224,20 @@
if (consumerTag == null || message.getDeliverBody().consumerTag.equals(consumerTag))
{
- if (_logger.isTraceEnabled())
+ if (_logger.isDebugEnabled())
{
- _logger.trace("Removing message from _queue:" + message);
+ _logger.debug("Removing message(" + System.identityHashCode(message) +
+ ") from _queue DT:" + message.getDeliverBody().deliveryTag);
}
messages.remove();
- rejectMessage(message.getDeliverBody().deliveryTag, requeue);
+ rejectMessage(message, requeue);
- _logger.debug("Rejected the message(" + message.getDeliverBody() + ") for consumer :" + consumerTag);
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Rejected the message(" + message.getDeliverBody() + ") for consumer :" + consumerTag);
+ }
}
else
{
@@ -2207,15 +2246,45 @@
}
}
+
+ public void rejectMessage(UnprocessedMessage message, boolean requeue)
+ {
+
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Rejecting Unacked message:" + message.getDeliverBody().deliveryTag);
+ }
+
+ rejectMessage(message.getDeliverBody().deliveryTag, requeue);
+ }
+
+ public void rejectMessage(AbstractJMSMessage message, boolean requeue)
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Rejecting Abstract message:" + message.getDeliveryTag());
+ }
+ rejectMessage(message.getDeliveryTag(), requeue);
+
+ }
+
public void rejectMessage(long deliveryTag, boolean requeue)
{
- AMQFrame basicRejectBody = BasicRejectBody.createAMQFrame(_channelId,
- getProtocolMajorVersion(),
- getProtocolMinorVersion(),
- deliveryTag,
- requeue);
+ if (_acknowledgeMode == CLIENT_ACKNOWLEDGE ||
+ _acknowledgeMode == SESSION_TRANSACTED)
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Rejecting delivery tag:" + deliveryTag);
+ }
+ AMQFrame basicRejectBody = BasicRejectBody.createAMQFrame(_channelId,
+ getProtocolMajorVersion(),
+ getProtocolMinorVersion(),
+ deliveryTag,
+ requeue);
- _connection.getProtocolHandler().writeFrame(basicRejectBody);
+ _connection.getProtocolHandler().writeFrame(basicRejectBody);
+ }
}
}
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?view=diff&rev=515127&r1=515126&r2=515127
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java Tue Mar 6 06:12:47 2007
@@ -21,6 +21,7 @@
package org.apache.qpid.client;
import java.util.Iterator;
+import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
@@ -109,9 +110,6 @@
/** Number of messages unacknowledged in DUPS_OK_ACKNOWLEDGE mode */
private int _outstanding;
- /** Tag of last message delievered, whoch should be acknowledged on commit in transaction mode. */
- private long _lastDeliveryTag;
-
/**
* Switch to enable sending of acknowledgements when using DUPS_OK_ACKNOWLEDGE mode. Enabled when _outstannding
* number of msgs >= _prefetchHigh and disabled at < _prefetchLow
@@ -120,6 +118,9 @@
private ConcurrentLinkedQueue<Long> _unacknowledgedDeliveryTags = new ConcurrentLinkedQueue<Long>();
+ /** List of tags delivered, the last of which should be acknowledged on commit in transaction mode. */
+ private ConcurrentLinkedQueue<Long> _receivedDeliveryTags = new ConcurrentLinkedQueue<Long>();
+
/**
* The thread that was used to call receive(). This is important for being able to interrupt that thread if a
* receive() is in progress.
@@ -432,6 +433,11 @@
public void close(boolean sendClose) throws JMSException
{
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Closing consumer:" + debugIdentity());
+ }
+
synchronized (_connection.getFailoverMutex())
{
if (!_closed.getAndSet(true))
@@ -448,6 +454,12 @@
try
{
_protocolHandler.syncWrite(cancelFrame, BasicCancelOkBody.class);
+
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("CancelOk'd for consumer:" + debugIdentity());
+ }
+
}
catch (AMQException e)
{
@@ -456,11 +468,14 @@
}
}
- deregisterConsumer();
- _unacknowledgedDeliveryTags.clear();
+ //done in BasicCancelOK Handler
+ //deregisterConsumer();
if (_messageListener != null && _receiving.get())
{
- _logger.info("Interrupting thread: " + _receivingThread);
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Interrupting thread: " + _receivingThread);
+ }
_receivingThread.interrupt();
}
}
@@ -616,7 +631,7 @@
}
else
{
- _lastDeliveryTag = msg.getDeliveryTag();
+ _receivedDeliveryTags.add(msg.getDeliveryTag());
}
break;
}
@@ -625,10 +640,16 @@
/** Acknowledge up to last message delivered (if any). Used when commiting. */
void acknowledgeLastDelivered()
{
- if (_lastDeliveryTag > 0)
+ if (!_receivedDeliveryTags.isEmpty())
{
- _session.acknowledgeMessage(_lastDeliveryTag, true);
- _lastDeliveryTag = -1;
+ long lastDeliveryTag = _receivedDeliveryTags.poll();
+
+ while (!_receivedDeliveryTags.isEmpty())
+ {
+ lastDeliveryTag = _receivedDeliveryTags.poll();
+ }
+
+ _session.acknowledgeMessage(lastDeliveryTag, true);
}
}
@@ -738,43 +759,76 @@
public void rollback()
{
+ clearUnackedMessages();
+
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Rejecting received messages");
+ }
+ //rollback received but not committed messages
+ while (!_receivedDeliveryTags.isEmpty())
+ {
+ Long tag = _receivedDeliveryTags.poll();
+
+ if (tag != null)
+ {
+ if (_logger.isTraceEnabled())
+ {
+ _logger.trace("Rejecting tag from _receivedDTs:" + tag);
+ }
+
+ _session.rejectMessage(tag, true);
+ }
+ }
+
+ //rollback pending messages
if (_synchronousQueue.size() > 0)
{
if (_logger.isDebugEnabled())
{
- _logger.debug("Rejecting the messages for consumer with tag:" + _consumerTag);
+ _logger.debug("Rejecting the messages(" + _synchronousQueue.size() + ")" +
+ "for consumer with tag:" + _consumerTag);
}
Iterator iterator = _synchronousQueue.iterator();
+
while (iterator.hasNext())
{
- Object o = iterator.next();
+ Object o = iterator.next();
if (o instanceof AbstractJMSMessage)
{
- _session.rejectMessage(((AbstractJMSMessage) o).getDeliveryTag(), true);
+ _session.rejectMessage(((AbstractJMSMessage) o), true);
if (_logger.isTraceEnabled())
{
- _logger.trace("Rejected message" + o);
- iterator.remove();
+ _logger.trace("Rejected message:" + ((AbstractJMSMessage) o).getDeliveryTag());
}
+ iterator.remove();
}
else
{
_logger.error("Queue contained a :" + o.getClass() +
" unable to reject as it is not an AbstractJMSMessage. Will be cleared");
+ iterator.remove();
}
}
if (_synchronousQueue.size() != 0)
{
_logger.warn("Queue was not empty after rejecting all messages Remaining:" + _synchronousQueue.size());
+ rollback();
}
_synchronousQueue.clear();
}
+ }
+
+
+ public String debugIdentity()
+ {
+ return String.valueOf(_consumerTag);
}
}
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicCancelOkMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicCancelOkMethodHandler.java?view=diff&rev=515127&r1=515126&r2=515127
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicCancelOkMethodHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicCancelOkMethodHandler.java Tue Mar 6 06:12:47 2007
@@ -28,27 +28,29 @@
import org.apache.qpid.framing.BasicCancelOkBody;
import org.apache.qpid.protocol.AMQMethodEvent;
-/**
- * @author Apache Software Foundation
- */
public class BasicCancelOkMethodHandler implements StateAwareMethodListener
{
- private static final Logger _logger = Logger.getLogger(BasicCancelOkMethodHandler.class);
- private static final BasicCancelOkMethodHandler _instance = new BasicCancelOkMethodHandler();
+ private static final Logger _logger = Logger.getLogger(BasicCancelOkMethodHandler.class);
+ private static final BasicCancelOkMethodHandler _instance = new BasicCancelOkMethodHandler();
- public static BasicCancelOkMethodHandler getInstance()
- {
- return _instance;
- }
+ public static BasicCancelOkMethodHandler getInstance()
+ {
+ return _instance;
+ }
- private BasicCancelOkMethodHandler()
- {
- }
+ private BasicCancelOkMethodHandler()
+ {
+ }
- public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException
- {
- _logger.debug("New BasicCancelOk method received");
- BasicCancelOkBody body = (BasicCancelOkBody) evt.getMethod();
- protocolSession.confirmConsumerCancelled(evt.getChannelId(), body.consumerTag);
- }
+ public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException
+ {
+ BasicCancelOkBody body = (BasicCancelOkBody) evt.getMethod();
+
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("New BasicCancelOk method received for consumer:" + body.consumerTag);
+ }
+
+ protocolSession.confirmConsumerCancelled(evt.getChannelId(), body.consumerTag);
+ }
}
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java?view=diff&rev=515127&r1=515126&r2=515127
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java Tue Mar 6 06:12:47 2007
@@ -30,13 +30,11 @@
import org.apache.qpid.framing.ContentHeaderBody;
/**
- * This class contains everything needed to process a JMS message. It assembles the
- * deliver body, the content header and the content body/ies.
- *
- * Note that the actual work of creating a JMS message for the client code's use is done
- * outside of the MINA dispatcher thread in order to minimise the amount of work done in
- * the MINA dispatcher thread.
+ * This class contains everything needed to process a JMS message. It assembles the deliver body, the content header and
+ * the content body/ies.
*
+ * Note that the actual work of creating a JMS message for the client code's use is done outside of the MINA dispatcher
+ * thread in order to minimise the amount of work done in the MINA dispatcher thread.
*/
public class UnprocessedMessage
{
@@ -47,9 +45,7 @@
private final int _channelId;
private ContentHeaderBody _contentHeader;
- /**
- * List of ContentBody instances. Due to fragmentation you don't know how big this will be in general
- */
+ /** List of ContentBody instances. Due to fragmentation you don't know how big this will be in general */
private List<ContentBody> _bodies;
public UnprocessedMessage(int channelId, BasicDeliverBody deliverBody)
@@ -74,9 +70,9 @@
{
final long payloadSize = body.payload.remaining();
- if(_bodies == null)
+ if (_bodies == null)
{
- if(payloadSize == getContentHeader().bodySize)
+ if (payloadSize == getContentHeader().bodySize)
{
_bodies = Collections.singletonList(body);
}
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java?view=diff&rev=515127&r1=515126&r2=515127
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java Tue Mar 6 06:12:47 2007
@@ -58,6 +58,7 @@
{
_logger.debug("State " + _state + " not achieved so waiting...");
_monitor.wait(TIME_OUT);
+ //fixme this won't cause the timeout to exit the loop. need to set _throwable
}
catch (InterruptedException e)
{
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java?view=diff&rev=515127&r1=515126&r2=515127
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java Tue Mar 6 06:12:47 2007
@@ -20,9 +20,6 @@
*/
package org.apache.qpid.client.util;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.client.message.UnprocessedMessage;
-import org.apache.log4j.Logger;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
Modified: incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java?view=diff&rev=515127&r1=515126&r2=515127
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java Tue Mar 6 06:12:47 2007
@@ -73,7 +73,8 @@
Queue queue = new AMQQueue(consumerSession.getDefaultQueueExchangeName(),new AMQShortString("someQ"), new AMQShortString("someQ"), false, true);
MessageConsumer consumer = consumerSession.createConsumer(queue);
//force synch to ensure the consumer has resulted in a bound queue
- ((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.DIRECT_EXCHANGE_NAME, ExchangeDefaults.DIRECT_EXCHANGE_CLASS);
+ //((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.DIRECT_EXCHANGE_NAME, ExchangeDefaults.DIRECT_EXCHANGE_CLASS);
+ // This is the default now
AMQConnection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "test");
Session producerSession = con2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
@@ -130,7 +131,8 @@
Queue queue = new AMQQueue(consumerSession.getDefaultQueueExchangeName(), new AMQShortString("someQ"), new AMQShortString("someQ"), false, true);
MessageConsumer consumer = consumerSession.createConsumer(queue);
//force synch to ensure the consumer has resulted in a bound queue
- ((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.DIRECT_EXCHANGE_NAME, ExchangeDefaults.DIRECT_EXCHANGE_CLASS);
+ //((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.DIRECT_EXCHANGE_NAME, ExchangeDefaults.DIRECT_EXCHANGE_CLASS);
+ // This is the default now
AMQConnection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "test");
Session producerSession = con2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Modified: incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java?view=diff&rev=515127&r1=515126&r2=515127
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java Tue Mar 6 06:12:47 2007
@@ -109,6 +109,10 @@
}
catch (AMQException e)
{
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Exception occured was:" + e.getErrorCode());
+ }
assertEquals("Connection should be closed", AMQConstant.CHANNEL_ERROR, e.getErrorCode());
_connection = newConnection();
@@ -315,15 +319,15 @@
}
catch (JMSException e)
{
- fail("Creating new connection when:"+e.getMessage());
+ fail("Creating new connection when:" + e.getMessage());
}
catch (AMQException e)
{
- fail("Creating new connection when:"+e.getMessage());
+ fail("Creating new connection when:" + e.getMessage());
}
catch (URLSyntaxException e)
{
- fail("Creating new connection when:"+e.getMessage());
+ fail("Creating new connection when:" + e.getMessage());
}
Added: incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java?view=auto&rev=515127
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java Tue Mar 6 06:12:47 2007
@@ -0,0 +1,603 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.test.unit.close;
+
+import junit.framework.TestCase;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+
+import javax.jms.ExceptionListener;
+import javax.jms.Session;
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Queue;
+import javax.jms.MessageProducer;
+import javax.jms.Message;
+import javax.jms.TextMessage;
+import javax.jms.MessageConsumer;
+
+import org.apache.qpid.client.AMQConnectionFactory;
+import org.apache.qpid.client.AMQConnectionURL;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.message.AbstractJMSMessage;
+import org.apache.qpid.client.transport.TransportConnection;
+import org.apache.qpid.url.URLSyntaxException;
+import org.apache.qpid.AMQException;
+import org.apache.log4j.Logger;
+import org.apache.log4j.Level;
+
+public class MessageRequeueTest extends TestCase
+{
+
+ private static final Logger _logger = Logger.getLogger(MessageRequeueTest.class);
+
+ protected static AtomicInteger consumerIds = new AtomicInteger(0); // source of the ids handed to Consumer instances
+ protected final Integer numTestMessages = 150; // number of messages setUp() publishes to the test queue
+
+ protected final int consumeTimeout = 3000; // timeout passed to the receive calls made by consume()/getNextMessage()
+
+ protected final String queue = "direct://amq.direct//queue"; // name handed to Session.createQueue() for the test queue
+ protected String payload = "Message:"; // text prefix; put() appends the zero-based copy number
+
+ protected final String BROKER = "vm://:1"; // broker list used by QpidClientConnection
+ private boolean testReception = true; // when true, Consumer records each delivery tag in receieved[]
+
+ private long[] receieved = new long[numTestMessages + 1]; // delivery tag per "index" property; slot 0 is unused because indexes start at 1
+ private boolean passed=false; // set by testTwoCompetingConsumers on success; tearDown() drains the queue while it is false
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ TransportConnection.createVMBroker(1);
+
+ QpidClientConnection conn = new QpidClientConnection();
+
+ conn.connect();
+ // clear queue
+ conn.consume(queue, consumeTimeout);
+ // load test data
+ _logger.info("creating test data, " + numTestMessages + " messages");
+ conn.put(queue, payload, numTestMessages);
+ // close this connection
+ conn.disconnect();
+ }
+
+ /**
+  * Drains the test queue when no test has flagged success via {@code passed}, then kills in-VM broker 1.
+  *
+  * The cleanup connection is always disconnected, and the broker is killed even if the drain fails, so a
+  * failing test cannot leave a connection or a broker behind for the next test.
+  *
+  * @throws Exception if draining the queue or disconnecting fails
+  */
+ protected void tearDown() throws Exception
+ {
+     super.tearDown();
+
+     try
+     {
+         if (!passed)
+         {
+             QpidClientConnection conn = new QpidClientConnection();
+
+             conn.connect();
+             try
+             {
+                 // clear queue
+                 conn.consume(queue, consumeTimeout);
+             }
+             finally
+             {
+                 // Previously this connection was never closed before the broker was killed.
+                 conn.disconnect();
+             }
+         }
+     }
+     finally
+     {
+         TransportConnection.killVMBroker(1);
+     }
+ }
+
+ /**
+ * Drains the test queue with a single consumer. Each message's delivery tag is stored against its "index"
+ * property, the session is committed, and the test then asserts that exactly numTestMessages messages
+ * arrived and that no index from 1 upwards is missing.
+ *
+ * @throws JMSException if connecting, receiving or committing fails
+ * @throws InterruptedException declared for callers; nothing in this method body throws it
+ */
+ public void testDrain() throws JMSException, InterruptedException
+ {
+ QpidClientConnection conn = new QpidClientConnection();
+
+ conn.connect();
+
+ _logger.info("consuming queue " + queue);
+ Queue q = conn.getSession().createQueue(queue);
+
+ final MessageConsumer consumer = conn.getSession().createConsumer(q);
+ int messagesReceived = 0;
+
+ long messageLog[] = new long[numTestMessages + 1]; // slot 0 stays unused; indexes start at 1
+
+ _logger.info("consuming...");
+ Message msg = consumer.receive(1000);
+ while (msg != null)
+ {
+ messagesReceived++;
+
+ long dt = ((AbstractJMSMessage) msg).getDeliveryTag();
+
+ int msgindex = msg.getIntProperty("index");
+ if (messageLog[msgindex] != 0)
+ {
+ _logger.error("Received Message(" + msgindex + ":" + ((AbstractJMSMessage) msg).getDeliveryTag() +
+ ") more than once.");
+ }
+
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Received Message(" + System.identityHashCode(msgindex) + ") " + // NOTE(review): msgindex is an int, so this hashes a boxed Integer, not the message - confirm intent
+ "DT:" + dt +
+ "IN:" + msgindex);
+ }
+
+ if (dt == 0)
+ {
+ _logger.error("DT is zero for msg:" + msgindex);
+ }
+
+ messageLog[msgindex] = dt;
+
+ //get Next message
+ msg = consumer.receive(1000);
+ }
+
+ conn.getSession().commit();
+ consumer.close();
+ assertEquals("number of consumed messages does not match initial data", (int) numTestMessages, messagesReceived);
+
+ int index = 0;
+ StringBuilder list = new StringBuilder();
+ list.append("Failed to receive:");
+ int failed = 0;
+
+ for (long b : messageLog)
+ {
+ if (b == 0 && index != 0) //delivery tag of zero shouldn't exist
+ {
+ _logger.error("Index: " + index + " was not received.");
+ list.append(" ");
+ list.append(index);
+ list.append(":");
+ list.append(b);
+ failed++;
+ }
+
+ index++;
+ }
+ assertEquals(list.toString(), 0, failed);
+ _logger.info("consumed: " + messagesReceived);
+ conn.disconnect(); // not reached when an assertion above fails
+ }
+
+ /**
+ * Creates four Consumer runnables and their threads, but starts only the first one; the start() calls for
+ * the other three are commented out. All four threads are joined (joining a thread that was never started
+ * returns immediately), the per-consumer counts are logged, and the test asserts that every index from 1
+ * upwards has a delivery tag in receieved[] and that the total consumed equals numTestMessages. On
+ * success it sets the passed flag.
+ *
+ * NOTE(review): despite the method name only one consumer actually runs - confirm whether the disabled
+ * start() calls are intentional.
+ */
+ public void testTwoCompetingConsumers()
+ {
+ Consumer c1 = new Consumer();
+ Consumer c2 = new Consumer();
+ Consumer c3 = new Consumer();
+ Consumer c4 = new Consumer();
+
+ Thread t1 = new Thread(c1);
+ Thread t2 = new Thread(c2);
+ Thread t3 = new Thread(c3);
+ Thread t4 = new Thread(c4);
+
+ t1.start();
+// t2.start();
+// t3.start();
+// t4.start();
+
+ try
+ {
+ t1.join();
+ t2.join();
+ t3.join();
+ t4.join();
+ }
+ catch (InterruptedException e)
+ {
+ fail("Uanble to join to Consumer theads");
+ }
+
+ _logger.info("consumer 1 count is " + c1.getCount());
+ _logger.info("consumer 2 count is " + c2.getCount());
+ _logger.info("consumer 3 count is " + c3.getCount());
+ _logger.info("consumer 4 count is " + c4.getCount());
+
+ Integer totalConsumed = c1.getCount() + c2.getCount() + c3.getCount() + c4.getCount();
+
+ // Check all messages were correctly delivered
+ int index = 0;
+ StringBuilder list = new StringBuilder();
+ list.append("Failed to receive:");
+ int failed = 0;
+
+ for (long b : receieved)
+ {
+ if (b == 0 && index != 0) //delivery tag of zero shouldn't exist (and we don't have msg 0)
+ {
+ _logger.error("Index: " + index + " was not received.");
+ list.append(" ");
+ list.append(index);
+ list.append(":");
+ list.append(b);
+ failed++;
+ }
+ index++;
+ }
+ assertEquals(list.toString() + "-" + numTestMessages + "-" + totalConsumed, 0, failed);
+ assertEquals("number of consumed messages does not match initial data", numTestMessages, totalConsumed);
+ passed=true; // tells tearDown() the queue does not need draining
+ }
+
+ class Consumer implements Runnable
+ {
+ private Integer count = 0;
+ private Integer id;
+
+ public Consumer()
+ {
+ id = consumerIds.addAndGet(1);
+ }
+
+ public void run()
+ {
+ try
+ {
+ _logger.info("consumer-" + id + ": starting");
+ QpidClientConnection conn = new QpidClientConnection();
+
+ conn.connect();
+
+ _logger.info("consumer-" + id + ": connected, consuming...");
+ Message result;
+ do
+ {
+ result = conn.getNextMessage(queue, consumeTimeout);
+ if (result != null)
+ {
+
+ long dt = ((AbstractJMSMessage) result).getDeliveryTag();
+
+ if (testReception)
+ {
+ int msgindex = result.getIntProperty("index");
+ if (receieved[msgindex] != 0)
+ {
+ _logger.error("Received Message(" + msgindex + ":" + ((AbstractJMSMessage) result).getDeliveryTag() +
+ ") more than once.");
+ }
+
+ if (_logger.isInfoEnabled())
+ {
+ _logger.info("Received Message(" + System.identityHashCode(msgindex) + ") " +
+ "DT:" + dt +
+ "IN:" + msgindex);
+ }
+
+ if (dt == 0)
+ {
+ _logger.error("DT is zero for msg:" + msgindex);
+ }
+
+ receieved[msgindex] = dt;
+ }
+
+
+ count++;
+ if (count % 100 == 0)
+ {
+ _logger.info("consumer-" + id + ": got " + result + ", new count is " + count);
+ }
+ }
+ }
+ while (result != null);
+
+ _logger.info("consumer-" + id + ": complete");
+ conn.disconnect();
+
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ }
+ }
+
+ public Integer getCount()
+ {
+ return count;
+ }
+
+ public Integer getId()
+ {
+ return id;
+ }
+ }
+
+
+ public class QpidClientConnection implements ExceptionListener
+ {
+ private boolean transacted = true;
+ private int ackMode = Session.CLIENT_ACKNOWLEDGE;
+ private Connection connection;
+
+ private String virtualHost;
+ private String brokerlist;
+ private int prefetch;
+ protected Session session;
+ protected boolean connected;
+
+ public QpidClientConnection()
+ {
+ super();
+ setVirtualHost("/test");
+ setBrokerList(BROKER);
+ setPrefetch(5000);
+ }
+
+
+ public void connect() throws JMSException
+ {
+ if (!connected)
+ {
+ /*
+ * amqp://[user:pass@][clientid]/virtualhost?
+ * brokerlist='[transport://]host[:port][?option='value'[&option='value']];'
+ * [&failover='method[?option='value'[&option='value']]']
+ * [&option='value']"
+ */
+ String brokerUrl = "amqp://guest:guest@" + virtualHost + "?brokerlist='" + brokerlist + "'";
+ try
+ {
+ AMQConnectionFactory factory = new AMQConnectionFactory(new AMQConnectionURL(brokerUrl));
+ _logger.info("connecting to Qpid :" + brokerUrl);
+ connection = factory.createConnection();
+
+ // register exception listener
+ connection.setExceptionListener(this);
+
+ session = ((AMQConnection) connection).createSession(transacted, ackMode, prefetch);
+
+
+ _logger.info("starting connection");
+ connection.start();
+
+ connected = true;
+ }
+ catch (URLSyntaxException e)
+ {
+ throw new JMSException("URL syntax error in [" + brokerUrl + "]: " + e.getMessage());
+ }
+ }
+ }
+
+ public void disconnect() throws JMSException
+ {
+ if (connected)
+ {
+ session.commit();
+ session.close();
+ connection.close();
+ connected = false;
+ _logger.info("disconnected");
+ }
+ }
+
+ public void disconnectWithoutCommit() throws JMSException
+ {
+ if (connected)
+ {
+ session.close();
+ connection.close();
+ connected = false;
+ _logger.info("disconnected without commit");
+ }
+ }
+
+ public String getBrokerList()
+ {
+ return brokerlist;
+ }
+
+ public void setBrokerList(String brokerlist)
+ {
+ this.brokerlist = brokerlist;
+ }
+
+ public String getVirtualHost()
+ {
+ return virtualHost;
+ }
+
+ public void setVirtualHost(String virtualHost)
+ {
+ this.virtualHost = virtualHost;
+ }
+
+ public void setPrefetch(int prefetch)
+ {
+ this.prefetch = prefetch;
+ }
+
+
+ /** override as necessary */
+ public void onException(JMSException exception)
+ {
+ _logger.info("ExceptionListener event: error " + exception.getErrorCode() + ", message: " + exception.getMessage());
+ }
+
+ public boolean isConnected()
+ {
+ return connected;
+ }
+
+ public Session getSession()
+ {
+ return session;
+ }
+
+ /**
+ * Put a String as a text messages, repeat n times. A null payload will result in a null message.
+ *
+ * @param queueName The queue name to put to
+ * @param payload the content of the payload
+ * @param copies the number of messages to put
+ *
+ * @throws javax.jms.JMSException any exception that occurs
+ */
+ public void put(String queueName, String payload, int copies) throws JMSException
+ {
+ if (!connected)
+ {
+ connect();
+ }
+
+ _logger.info("putting to queue " + queueName);
+ Queue queue = session.createQueue(queueName);
+
+ final MessageProducer sender = session.createProducer(queue);
+
+ for (int i = 0; i < copies; i++)
+ {
+ Message m = session.createTextMessage(payload + i);
+ m.setIntProperty("index", i + 1);
+ sender.send(m);
+ }
+
+ session.commit();
+ sender.close();
+ _logger.info("put " + copies + " copies");
+ }
+
+ /**
+ * GET the top message on a queue. Consumes the message. Accepts timeout value.
+ *
+ * @param queueName The quename to get from
+ * @param readTimeout The timeout to use
+ *
+ * @return the content of the text message if any
+ *
+ * @throws javax.jms.JMSException any exception that occured
+ */
+ public Message getNextMessage(String queueName, long readTimeout) throws JMSException
+ {
+ if (!connected)
+ {
+ connect();
+ }
+
+ Queue queue = session.createQueue(queueName);
+
+ final MessageConsumer consumer = session.createConsumer(queue);
+
+ Message message = consumer.receive(readTimeout);
+ session.commit();
+ consumer.close();
+
+ Message result;
+
+ // all messages we consume should be TextMessages
+ if (message instanceof TextMessage)
+ {
+ result = ((TextMessage) message);
+ }
+ else if (null == message)
+ {
+ result = null;
+ }
+ else
+ {
+ _logger.info("warning: received non-text message");
+ result = message;
+ }
+
+ return result;
+ }
+
+ /**
+ * GET the top message on a queue. Consumes the message.
+ *
+ * @param queueName The Queuename to get from
+ *
+ * @return The string content of the text message, if any received
+ *
+ * @throws javax.jms.JMSException any exception that occurs
+ */
+ public Message getNextMessage(String queueName) throws JMSException
+ {
+ return getNextMessage(queueName, 0);
+ }
+
+ /**
+ * Completely clears a queue. For readTimeout behaviour see Javadocs for javax.jms.MessageConsumer.
+ *
+ * @param queueName The Queue name to consume from
+ * @param readTimeout The timeout for each consume
+ *
+ * @throws javax.jms.JMSException Any exception that occurs during the consume
+ * @throws InterruptedException If the consume thread was interrupted during a consume.
+ */
+ public void consume(String queueName, int readTimeout) throws JMSException, InterruptedException
+ {
+ if (!connected)
+ {
+ connect();
+ }
+
+ _logger.info("consuming queue " + queueName);
+ Queue queue = session.createQueue(queueName);
+
+ final MessageConsumer consumer = session.createConsumer(queue);
+ int messagesReceived = 0;
+
+ _logger.info("consuming...");
+ while ((consumer.receive(readTimeout)) != null)
+ {
+ messagesReceived++;
+ }
+
+ session.commit();
+ consumer.close();
+ _logger.info("consumed: " + messagesReceived);
+ }
+ }
+
+
+ /**
+  * Opens a non-transacted AUTO_ACKNOWLEDGE session on the test queue, receives one message, then closes
+  * the consumer and the connection.
+  *
+  * The connection is started before receiving: a JMS connection is created stopped and delivers nothing
+  * until start() is called, so the previous untimed receive() could block forever. The receive is now
+  * bounded by {@code consumeTimeout}, and the connection is closed even when the assertion fails.
+  *
+  * @throws JMSException       if any JMS operation fails
+  * @throws AMQException       if the connection cannot be created
+  * @throws URLSyntaxException if the broker URL is malformed
+  */
+ public void testRequeue() throws JMSException, AMQException, URLSyntaxException
+ {
+     String virtualHost = "/test";
+     String brokerlist = "vm://:1";
+     String brokerUrl = "amqp://guest:guest@" + virtualHost + "?brokerlist='" + brokerlist + "'";
+
+     Connection conn = new AMQConnection(brokerUrl);
+     try
+     {
+         Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         Queue q = session.createQueue(queue);
+
+         _logger.info("Create Consumer");
+         MessageConsumer consumer = session.createConsumer(q);
+
+         // Without this no message is ever handed to the consumer.
+         conn.start();
+
+         try
+         {
+             Thread.sleep(2000);
+         }
+         catch (InterruptedException e)
+         {
+             // Preserve the interrupt for whoever is running the test instead of swallowing it.
+             Thread.currentThread().interrupt();
+         }
+
+         _logger.info("Receiving msg");
+         Message msg = consumer.receive(consumeTimeout);
+
+         assertNotNull("Message should not be null", msg);
+
+         _logger.info("Close Consumer");
+         consumer.close();
+     }
+     finally
+     {
+         _logger.info("Close Connection");
+         conn.close();
+     }
+ }
+
+}
\ No newline at end of file
Propchange: incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/StreamMessageTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/StreamMessageTest.java?view=diff&rev=515127&r1=515126&r2=515127
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/StreamMessageTest.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/StreamMessageTest.java Tue Mar 6 06:12:47 2007
@@ -80,7 +80,8 @@
//force synch to ensure the consumer has resulted in a bound queue
- ((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.HEADERS_EXCHANGE_NAME, ExchangeDefaults.HEADERS_EXCHANGE_CLASS);
+ //((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.HEADERS_EXCHANGE_NAME, ExchangeDefaults.HEADERS_EXCHANGE_CLASS);
+ // This is the default now
Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "test");
Modified: incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java?view=diff&rev=515127&r1=515126&r2=515127
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java Tue Mar 6 06:12:47 2007
@@ -43,7 +43,8 @@
public class CommitRollbackTest extends TestCase
{
protected AMQConnection conn;
- protected final String queue = "direct://amq.direct//Qpid.Client.Transacted.CommitRollback.queue";
+ protected String queue = "direct://amq.direct//Qpid.Client.Transacted.CommitRollback.queue";
+ protected static int testMethod = 0;
protected String payload = "xyzzy";
private Session _session;
private MessageProducer _publisher;
@@ -57,6 +58,11 @@
{
super.setUp();
TransportConnection.createVMBroker(1);
+
+ testMethod++;
+ queue += testMethod;
+
+
newConnection();
}
@@ -84,7 +90,11 @@
TransportConnection.killVMBroker(1);
}
- /** PUT a text message, disconnect before commit, confirm it is gone. */
+ /**
+ * PUT a text message, disconnect before commit, confirm it is gone.
+ *
+ * @throws Exception On error
+ */
public void testPutThenDisconnect() throws Exception
{
assertTrue("session is not transacted", _session.getTransacted());
@@ -109,7 +119,11 @@
assertNull("test message was put and disconnected before commit, but is still present", result);
}
- /** PUT a text message, disconnect before commit, confirm it is gone. */
+ /**
+ * PUT a text message, disconnect before commit, confirm it is gone.
+ *
+ * @throws Exception On error
+ */
public void testPutThenCloseDisconnect() throws Exception
{
assertTrue("session is not transacted", _session.getTransacted());
@@ -140,6 +154,8 @@
/**
* PUT a text message, rollback, confirm message is gone. The consumer is on the same connection but different
* session as producer
+ *
+ * @throws Exception On error
*/
public void testPutThenRollback() throws Exception
{
@@ -160,7 +176,11 @@
assertNull("test message was put and rolled back, but is still present", result);
}
- /** GET a text message, disconnect before commit, confirm it is still there. The consumer is on a new connection */
+ /**
+ * GET a text message, disconnect before commit, confirm it is still there. The consumer is on a new connection
+ *
+ * @throws Exception On error
+ */
public void testGetThenDisconnect() throws Exception
{
assertTrue("session is not transacted", _session.getTransacted());
@@ -194,6 +214,8 @@
/**
* GET a text message, close consumer, disconnect before commit, confirm it is still there. The consumer is on the
* same connection but different session as producer
+ *
+ * @throws Exception On error
*/
public void testGetThenCloseDisconnect() throws Exception
{
@@ -230,6 +252,8 @@
/**
* GET a text message, rollback, confirm it is still there. The consumer is on the same connection but differnt
* session to the producer
+ *
+ * @throws Exception On error
*/
public void testGetThenRollback() throws Exception
{
@@ -266,6 +290,8 @@
/**
* GET a text message, close message producer, rollback, confirm it is still there. The consumer is on the same
* connection but different session as producer
+ *
+ * @throws Exception On error
*/
public void testGetThenCloseRollback() throws Exception
{
@@ -304,7 +330,11 @@
}
- /** Test that rolling back a session purges the dispatcher queue, and the messages arrive in the correct order */
+ /**
+ * Test that rolling back a session purges the dispatcher queue, and the messages arrive in the correct order
+ *
+ * @throws Exception On error
+ */
public void testSend2ThenRollback() throws Exception
{
assertTrue("session is not transacted", _session.getTransacted());
@@ -339,37 +369,41 @@
public void testSend2ThenCloseAfter1andTryAgain() throws Exception
{
-// assertTrue("session is not transacted", _session.getTransacted());
-// assertTrue("session is not transacted", _pubSession.getTransacted());
-//
-// _logger.info("sending two test messages");
-// _publisher.send(_pubSession.createTextMessage("1"));
-// _publisher.send(_pubSession.createTextMessage("2"));
-// _pubSession.commit();
-//
-// _logger.info("getting test message");
-// assertEquals("1", ((TextMessage) _consumer.receive(1000)).getText());
-//
-// _consumer.close();
-//
-// _consumer = _session.createConsumer(_jmsQueue);
-//
-// _logger.info("receiving result");
-// Message result = _consumer.receive(1000);
-// _logger.error("1:" + result);
-//// assertNotNull("test message was consumed and rolled back, but is gone", result);
-//// assertEquals("1" , ((TextMessage) result).getText());
-//// assertTrue("Messasge is not marked as redelivered" + result, result.getJMSRedelivered());
-//
-// result = _consumer.receive(1000);
-// _logger.error("2" + result);
-//// assertNotNull("test message was consumed and rolled back, but is gone", result);
-//// assertEquals("2", ((TextMessage) result).getText());
-//// assertTrue("Messasge is marked as redelivered" + result, !result.getJMSRedelivered());
-//
-// result = _consumer.receive(1000);
-// _logger.error("3" + result);
-// assertNull("test message should be null:" + result, result);
+ assertTrue("session is not transacted", _session.getTransacted());
+ assertTrue("session is not transacted", _pubSession.getTransacted());
+
+ _logger.info("sending two test messages");
+ _publisher.send(_pubSession.createTextMessage("1"));
+ _publisher.send(_pubSession.createTextMessage("2"));
+ _pubSession.commit();
+
+ _logger.info("getting test message");
+ Message result = _consumer.receive(1000);
+
+ assertNotNull("Message received should not be null", result);
+ assertEquals("1", ((TextMessage) result).getText());
+ assertTrue("Messasge is marked as redelivered" + result, !result.getJMSRedelivered());
+
+
+ _logger.info("Closing Consumer");
+ _consumer.close();
+
+ _logger.info("Creating New consumer");
+ _consumer = _session.createConsumer(_jmsQueue);
+
+ _logger.info("receiving result");
+ result = _consumer.receive(1000);
+ assertNotNull("test message was consumed and rolled back, but is gone", result);
+ assertEquals("1", ((TextMessage) result).getText());
+ assertTrue("Messasge is not marked as redelivered" + result, result.getJMSRedelivered());
+
+ result = _consumer.receive(1000);
+ assertNotNull("test message was consumed and rolled back, but is gone", result);
+ assertEquals("2", ((TextMessage) result).getText());
+ assertTrue("Messasge is not marked as redelivered" + result, result.getJMSRedelivered());
+
+ result = _consumer.receive(1000);
+ assertNull("test message should be null:" + result, result);
}
}
Modified: incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java?view=diff&rev=515127&r1=515126&r2=515127
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java Tue Mar 6 06:12:47 2007
@@ -62,69 +62,125 @@
{
super.setUp();
TransportConnection.createVMBroker(1);
+ _logger.info("Create Connection");
con = new AMQConnection("vm://:1", "guest", "guest", "TransactedTest", "test");
+
+ _logger.info("Create Session");
session = con.createSession(true, Session.SESSION_TRANSACTED);
+ _logger.info("Create Q1");
queue1 = new AMQQueue(session.getDefaultQueueExchangeName(), new AMQShortString("Q1"), new AMQShortString("Q1"), false, true);
+ _logger.info("Create Q2");
queue2 = new AMQQueue(session.getDefaultQueueExchangeName(), new AMQShortString("Q2"), false);
-
+ _logger.info("Create Consumer of Q1");
consumer1 = session.createConsumer(queue1);
- //Dummy just to create the queue.
+ //Dummy just to create the queue.
+ _logger.info("Create Consumer of Q2");
MessageConsumer consumer2 = session.createConsumer(queue2);
+ _logger.info("Close Consumer of Q2");
consumer2.close();
+
+ _logger.info("Create producer to Q2");
producer2 = session.createProducer(queue2);
+
+ _logger.info("Start Connection");
con.start();
+ _logger.info("Create prep connection");
prepCon = new AMQConnection("vm://:1", "guest", "guest", "PrepConnection", "test");
+
+ _logger.info("Create prep session");
prepSession = prepCon.createSession(false, AMQSession.NO_ACKNOWLEDGE);
+
+ _logger.info("Create prep producer to Q1");
prepProducer1 = prepSession.createProducer(queue1);
+
+ _logger.info("Create prep connection start");
prepCon.start();
- //add some messages
- prepProducer1.send(prepSession.createTextMessage("A"));
- prepProducer1.send(prepSession.createTextMessage("B"));
- prepProducer1.send(prepSession.createTextMessage("C"));
+ _logger.info("Create test connection");
testCon = new AMQConnection("vm://:1", "guest", "guest", "TestConnection", "test");
+ _logger.info("Create test session");
testSession = testCon.createSession(false, AMQSession.NO_ACKNOWLEDGE);
+ _logger.info("Create test consumer of q2");
testConsumer2 = testSession.createConsumer(queue2);
-
}
protected void tearDown() throws Exception
{
+ _logger.info("Close connection");
con.close();
+ _logger.info("Close test connection");
testCon.close();
+ _logger.info("Close prep connection");
prepCon.close();
+ _logger.info("Kill broker");
TransportConnection.killAllVMBrokers();
super.tearDown();
}
public void testCommit() throws Exception
{
+ //add some messages
+ _logger.info("Send prep A");
+ prepProducer1.send(prepSession.createTextMessage("A"));
+ _logger.info("Send prep B");
+ prepProducer1.send(prepSession.createTextMessage("B"));
+ _logger.info("Send prep C");
+ prepProducer1.send(prepSession.createTextMessage("C"));
+
//send and receive some messages
+ _logger.info("Send X to Q2");
producer2.send(session.createTextMessage("X"));
+ _logger.info("Send Y to Q2");
producer2.send(session.createTextMessage("Y"));
+ _logger.info("Send Z to Q2");
producer2.send(session.createTextMessage("Z"));
+
+
+ _logger.info("Read A from Q1");
expect("A", consumer1.receive(1000));
+ _logger.info("Read B from Q1");
expect("B", consumer1.receive(1000));
+ _logger.info("Read C from Q1");
expect("C", consumer1.receive(1000));
//commit
+ _logger.info("session commit");
session.commit();
+ _logger.info("Start test Connection");
testCon.start();
+
//ensure sent messages can be received and received messages are gone
+ _logger.info("Read X from Q2");
expect("X", testConsumer2.receive(1000));
+ _logger.info("Read Y from Q2");
expect("Y", testConsumer2.receive(1000));
+ _logger.info("Read Z from Q2");
expect("Z", testConsumer2.receive(1000));
+ _logger.info("create test session on Q1");
testConsumer1 = testSession.createConsumer(queue1);
+ _logger.info("Read null from Q1");
assertTrue(null == testConsumer1.receive(1000));
+ _logger.info("Read null from Q2");
assertTrue(null == testConsumer2.receive(1000));
}
public void testRollback() throws Exception
{
+ //add some messages
+ _logger.info("Send prep A");
+ prepProducer1.send(prepSession.createTextMessage("A"));
+ _logger.info("Send prep B");
+ prepProducer1.send(prepSession.createTextMessage("B"));
+ _logger.info("Send prep C");
+ prepProducer1.send(prepSession.createTextMessage("C"));
+
+ //Quick sleep to ensure all three get pre-fetched
+ Thread.sleep(500);
+
_logger.info("Sending X Y Z");
producer2.send(session.createTextMessage("X"));
producer2.send(session.createTextMessage("Y"));
@@ -140,9 +196,9 @@
_logger.info("Receiving A B C");
//ensure sent messages are not visible and received messages are requeued
- expect("A", consumer1.receive(1000));
- expect("B", consumer1.receive(1000));
- expect("C", consumer1.receive(1000));
+ expect("A", consumer1.receive(1000), true);
+ expect("B", consumer1.receive(1000), true);
+ expect("C", consumer1.receive(1000), true);
_logger.info("Starting new connection");
testCon.start();
@@ -152,20 +208,22 @@
assertTrue(null == testConsumer2.receive(1000));
session.commit();
+
+ _logger.info("Testing we have no messages left after commit");
+ assertTrue(null == testConsumer1.receive(1000));
+ assertTrue(null == testConsumer2.receive(1000));
}
public void testResendsMsgsAfterSessionClose() throws Exception
{
AMQConnection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "test");
- Session consumerSession = con.createSession(true, Session.CLIENT_ACKNOWLEDGE);
+ Session consumerSession = con.createSession(true, Session.SESSION_TRANSACTED);
AMQQueue queue3 = new AMQQueue(consumerSession.getDefaultQueueExchangeName(), new AMQShortString("Q3"), false);
MessageConsumer consumer = consumerSession.createConsumer(queue3);
- //force synch to ensure the consumer has resulted in a bound queue
- ((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.DIRECT_EXCHANGE_NAME, ExchangeDefaults.DIRECT_EXCHANGE_CLASS);
AMQConnection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "test");
- Session producerSession = con2.createSession(true, Session.CLIENT_ACKNOWLEDGE);
+ Session producerSession = con2.createSession(true, Session.SESSION_TRANSACTED);
MessageProducer producer = producerSession.createProducer(queue3);
_logger.info("Sending four messages");
@@ -176,65 +234,77 @@
producerSession.commit();
-
_logger.info("Starting connection");
con.start();
TextMessage tm = (TextMessage) consumer.receive();
+ assertNotNull(tm);
+ assertEquals("msg1", tm.getText());
- tm.acknowledge();
consumerSession.commit();
- _logger.info("Received and acknowledged first message");
+ _logger.info("Received and committed first message");
tm = (TextMessage) consumer.receive(1000);
assertNotNull(tm);
+ assertEquals("msg2", tm.getText());
+
tm = (TextMessage) consumer.receive(1000);
assertNotNull(tm);
+ assertEquals("msg3", tm.getText());
+
tm = (TextMessage) consumer.receive(1000);
assertNotNull(tm);
+ assertEquals("msg4", tm.getText());
+
_logger.info("Received all four messages. Closing connection with three outstanding messages");
consumerSession.close();
- consumerSession = con.createSession(true, Session.CLIENT_ACKNOWLEDGE);
+ consumerSession = con.createSession(true, Session.SESSION_TRANSACTED);
consumer = consumerSession.createConsumer(queue3);
// no ack for last three messages so when I call recover I expect to get three messages back
-
tm = (TextMessage) consumer.receive(3000);
assertNotNull(tm);
assertEquals("msg2", tm.getText());
+ assertTrue("Message is not redelivered", tm.getJMSRedelivered());
tm = (TextMessage) consumer.receive(3000);
assertNotNull(tm);
assertEquals("msg3", tm.getText());
+ assertTrue("Message is not redelivered", tm.getJMSRedelivered());
tm = (TextMessage) consumer.receive(3000);
assertNotNull(tm);
assertEquals("msg4", tm.getText());
+ assertTrue("Message is not redelivered", tm.getJMSRedelivered());
+
+ _logger.info("Received redelivery of three messages. Committing");
- _logger.info("Received redelivery of three messages. Acknowledging last message");
- tm.acknowledge();
consumerSession.commit();
- _logger.info("Calling acknowledge with no outstanding messages");
- // all acked so no messages to be delivered
+ _logger.info("Called commit");
- tm = (TextMessage) consumer.receiveNoWait();
+ tm = (TextMessage) consumer.receive(1000);
assertNull(tm);
+
_logger.info("No messages redelivered as is expected");
con.close();
con2.close();
-
}
-
private void expect(String text, Message msg) throws JMSException
{
+ expect(text, msg, false);
+ }
+
+ private void expect(String text, Message msg, boolean requeued) throws JMSException
+ {
assertNotNull("Message should not be null", msg);
assertTrue("Message should be a text message", msg instanceof TextMessage);
assertEquals("Message content does not match expected", text, ((TextMessage) msg).getText());
+ assertEquals("Message should " + (requeued ? "" : "not") + " be requeued", requeued, msg.getJMSRedelivered());
}
public static junit.framework.Test suite()
Modified: incubator/qpid/trunk/qpid/java/cluster/src/test/java/org/apache/qpid/server/cluster/SimpleClusterTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/cluster/src/test/java/org/apache/qpid/server/cluster/SimpleClusterTest.java?view=diff&rev=515127&r1=515126&r2=515127
==============================================================================
--- incubator/qpid/trunk/qpid/java/cluster/src/test/java/org/apache/qpid/server/cluster/SimpleClusterTest.java (original)
+++ incubator/qpid/trunk/qpid/java/cluster/src/test/java/org/apache/qpid/server/cluster/SimpleClusterTest.java Tue Mar 6 06:12:47 2007
@@ -37,7 +37,7 @@
AMQConnection con = new AMQConnection("localhost:9000", "guest", "guest", "test", "/test");
AMQSession session = (AMQSession) con.createSession(false, AMQSession.NO_ACKNOWLEDGE);
System.out.println("Session created");
- session.declareExchange(new AMQShortString("my_exchange"), new AMQShortString("direct"));
+ session.declareExchange(new AMQShortString("my_exchange"), new AMQShortString("direct"), true);
System.out.println("Exchange declared");
con.close();
System.out.println("Connection closed");
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java?view=diff&rev=515127&r1=515126&r2=515127
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java Tue Mar 6 06:12:47 2007
@@ -100,7 +100,7 @@
public static final AMQConstant RESOURCE_ERROR = new AMQConstant(506, "resource error", true);
- public static final AMQConstant NOT_ALLOWED = new AMQConstant(507, "not allowed", true);
+ public static final AMQConstant NOT_ALLOWED = new AMQConstant(530, "not allowed", true);
public static final AMQConstant NOT_IMPLEMENTED = new AMQConstant(540, "not implemented", true);
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/util/ConcurrentLinkedMessageQueueAtomicSize.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/util/ConcurrentLinkedMessageQueueAtomicSize.java?view=diff&rev=515127&r1=515126&r2=515127
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/util/ConcurrentLinkedMessageQueueAtomicSize.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/util/ConcurrentLinkedMessageQueueAtomicSize.java Tue Mar 6 06:12:47 2007
@@ -41,6 +41,11 @@
return super.size() + _messageHeadSize.get();
}
+ public int headSize()
+ {
+ return _messageHeadSize.get();
+ }
+
@Override
public E poll()
{
@@ -50,10 +55,14 @@
}
else
{
- _logger.debug("Providing item from message head");
-
E e = _messageHead.poll();
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Providing item(" + e + ")from message head");
+ }
+
+
if (e != null)
{
_messageHeadSize.decrementAndGet();
@@ -159,8 +168,12 @@
}
else
{
- _logger.debug("Providing item from message head");
- return _messageHead.peek();
+ E o = _messageHead.peek();
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Peeking item (" + o + ") from message head");
+ }
+ return o;
}
}
@@ -186,7 +199,10 @@
public boolean pushHead(E o)
{
- _logger.debug("Adding item to head of queue");
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Adding item(" + o + ") to head of queue");
+ }
if (_messageHead.offer(o))
{
_messageHeadSize.incrementAndGet();
Modified: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java?view=diff&rev=515127&r1=515126&r2=515127
==============================================================================
--- incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java (original)
+++ incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java Tue Mar 6 06:12:47 2007
@@ -67,7 +67,8 @@
MessageConsumer consumer = consumerSession.createConsumer(queue, AMQSession.DEFAULT_PREFETCH_LOW_MARK, AMQSession.DEFAULT_PREFETCH_HIGH_MARK, false, false, (String) null, ft);
//force synch to ensure the consumer has resulted in a bound queue
- ((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.HEADERS_EXCHANGE_NAME, ExchangeDefaults.HEADERS_EXCHANGE_CLASS);
+ //((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.HEADERS_EXCHANGE_NAME, ExchangeDefaults.HEADERS_EXCHANGE_CLASS);
+ // This is the default now
Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "/test");