You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2007/05/09 10:48:21 UTC
svn commit: r536458 [1/2] - in /incubator/qpid/trunk/qpid/java:
broker/src/main/java/org/apache/qpid/server/
broker/src/main/java/org/apache/qpid/server/messageStore/
broker/src/main/java/org/apache/qpid/server/queue/
broker/src/main/java/org/apache/qp...
Author: gsim
Date: Wed May 9 01:48:18 2007
New Revision: 536458
URL: http://svn.apache.org/viewvc?view=rev&rev=536458
Log:
Patch from Arnaud Simon (asimon@redhat.com) to fix tests broken by earlier changes.
Added:
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/MemoryDequeueRecord.java (with props)
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/MemoryEnqueueRecord.java (with props)
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/MemoryTransaction.java (with props)
Modified:
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/messageStore/MemoryMessageStore.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/StorableMessageHandle.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DequeueRecord.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DistributedTransactionalContext.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/MemoryTransactionManager.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/Transaction.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionManager.java
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/channel/MaxChannelsTest.java
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/failure/HeapExhaustion.java
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ConcurrencyTestDisabled.java
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/testutil/QpidClientConnection.java
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java?view=diff&rev=536458&r1=536457&r2=536458
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java Wed May 9 01:48:18 2007
@@ -228,10 +228,14 @@
try
{
queue.delete();
- _messageStore.destroyQueue(queue);
+ if( queue.isDurable() )
+ {
+ _messageStore.destroyQueue(queue);
+ }
}
catch (Exception ex)
{
+ ex.printStackTrace();
JMException jme = new JMException(ex.getMessage());
jme.initCause(ex);
throw new MBeanException(jme, "Error in deleting queue " + queueName);
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?view=diff&rev=536458&r1=536457&r2=536458
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java Wed May 9 01:48:18 2007
@@ -73,10 +73,14 @@
*/
private AtomicLong _deliveryTag = new AtomicLong(0);
- /** A channel has a default queue (the last declared) that is used when no queue name is explictily set */
+ /**
+ * A channel has a default queue (the last declared) that is used when no queue name is explicitly set
+ */
private AMQQueue _defaultQueue;
- /** This tag is unique per subscription to a queue. The server returns this in response to a basic.consume request. */
+ /**
+ * This tag is unique per subscription to a queue. The server returns this in response to a basic.consume request.
+ */
private int _consumerTag;
/**
@@ -86,7 +90,9 @@
*/
private AMQMessage _currentMessage;
- /** Maps from consumer tag to queue instance. Allows us to unsubscribe from a queue. */
+ /**
+ * Maps from consumer tag to queue instance. Allows us to unsubscribe from a queue.
+ */
private final Map<AMQShortString, AMQQueue> _consumerTag2QueueMap = new HashMap<AMQShortString, AMQQueue>();
private final MessageStore _messageStore;
@@ -118,7 +124,8 @@
public AMQChannel(AMQProtocolSession session, int channelId, TransactionManager transactionManager, MessageStore messageStore, MessageRouter exchanges)
- throws AMQException
+ throws
+ AMQException
{
_session = session;
_channelId = channelId;
@@ -132,7 +139,9 @@
_txnContext = new NonTransactionalContext(_messageStore, _storeContext, this, _returnMessages, _browsedAcks);
}
- /** Sets this channel to be part of a local transaction */
+ /**
+ * Sets this channel to be part of a local transaction
+ */
public void setLocalTransactional()
{
_txnContext = new DistributedTransactionalContext(_transactionManager, _messageStore, _storeContext, _returnMessages);
@@ -193,23 +202,25 @@
}
- public void setPublishFrame(MessagePublishInfo info, AMQProtocolSession publisher) throws AMQException
+ public void setPublishFrame(MessagePublishInfo info, AMQProtocolSession publisher)
+ throws
+ AMQException
{
_currentMessage = new AMQMessage(_messageStore.getNewMessageId(), info,
- _txnContext);
+ _txnContext);
_currentMessage.setPublisher(publisher);
}
public void publishContentHeader(ContentHeaderBody contentHeaderBody)
- throws AMQException
+ throws
+ AMQException
{
if (_currentMessage == null)
{
throw new AMQException("Received content header without previously receiving a BasicPublish frame");
- }
- else
+ } else
{
if (_log.isTraceEnabled())
{
@@ -228,7 +239,8 @@
}
public void publishContentBody(ContentBody contentBody, AMQProtocolSession protocolSession)
- throws AMQException
+ throws
+ AMQException
{
if (_currentMessage == null)
{
@@ -261,7 +273,9 @@
}
}
- protected void routeCurrentMessage() throws AMQException
+ protected void routeCurrentMessage()
+ throws
+ AMQException
{
try
{
@@ -294,14 +308,15 @@
* @param exclusive Flag requesting exclusive access to the queue
* @param acks Are acks enabled for this subscriber
* @param filters Filters to apply to this subscriber
- *
* @return the consumer tag. This is returned to the subscriber and used in subsequent unsubscribe requests
- *
* @throws ConsumerTagNotUniqueException if the tag is not unique
* @throws AMQException if something goes wrong
*/
public AMQShortString subscribeToQueue(AMQShortString tag, AMQQueue queue, AMQProtocolSession session, boolean acks,
- FieldTable filters, boolean noLocal, boolean exclusive) throws AMQException, ConsumerTagNotUniqueException
+ FieldTable filters, boolean noLocal, boolean exclusive)
+ throws
+ AMQException,
+ ConsumerTagNotUniqueException
{
if (tag == null)
{
@@ -318,28 +333,11 @@
}
- public void unsubscribeConsumer(AMQProtocolSession session, AMQShortString consumerTag) throws AMQException
+ public void unsubscribeConsumer(AMQProtocolSession session, final AMQShortString consumerTag)
+ throws
+ AMQException
{
- if (_log.isDebugEnabled())
- {
- _log.debug("Unacked Map Dump size:" + _unacknowledgedMessageMap.size());
- _unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor()
- {
-
- public boolean callback(UnacknowledgedMessage message) throws AMQException
- {
- _log.debug(message);
-
- return true;
- }
-
- public void visitComplete()
- {
- }
- });
- }
-
- AMQQueue q = _consumerTag2QueueMap.remove(consumerTag);
+ final AMQQueue q = _consumerTag2QueueMap.remove(consumerTag);
if (q != null)
{
q.unregisterProtocolSession(session, _channelId, consumerTag);
@@ -350,25 +348,27 @@
* Called from the protocol session to close this channel and clean up. T
*
* @param session The session to close
- *
* @throws AMQException if there is an error during closure
*/
- public void close(AMQProtocolSession session) throws AMQException
+ public void close(AMQProtocolSession session)
+ throws
+ AMQException
{
_txnContext.rollback();
unsubscribeAllConsumers(session);
requeue();
}
- private void unsubscribeAllConsumers(AMQProtocolSession session) throws AMQException
+ private void unsubscribeAllConsumers(AMQProtocolSession session)
+ throws
+ AMQException
{
if (_log.isInfoEnabled())
{
if (!_consumerTag2QueueMap.isEmpty())
{
_log.info("Unsubscribing all consumers on channel " + toString());
- }
- else
+ } else
{
_log.info("No consumers to unsubscribe on channel " + toString());
}
@@ -400,13 +400,12 @@
if (queue == null)
{
_log.debug("Adding unacked message with a null queue:" + message.debugIdentity());
- }
- else
+ } else
{
if (_log.isDebugEnabled())
{
_log.debug(debugIdentity() + " Adding unacked message(" + message.toString() + " DT:" + deliveryTag +
- ") with a queue(" + queue + ") for " + consumerTag);
+ ") with a queue(" + queue + ") for " + consumerTag);
}
}
}
@@ -431,7 +430,9 @@
*
* @throws org.apache.qpid.AMQException if the requeue fails
*/
- public void requeue() throws AMQException
+ public void requeue()
+ throws
+ AMQException
{
// we must create a new map since all the messages will get a new delivery tag when they are redelivered
Collection<UnacknowledgedMessage> messagesToBeDelivered = _unacknowledgedMessageMap.cancelAllMessages();
@@ -452,12 +453,11 @@
// if (_nonTransactedContext == null)
{
_nonTransactedContext = new NonTransactionalContext(_messageStore, _storeContext, this,
- _returnMessages, _browsedAcks);
+ _returnMessages, _browsedAcks);
}
deliveryContext = _nonTransactedContext;
- }
- else
+ } else
{
deliveryContext = _txnContext;
}
@@ -489,10 +489,11 @@
* Requeue a single message
*
* @param deliveryTag The message to requeue
- *
* @throws AMQException If something goes wrong.
*/
- public void requeue(long deliveryTag) throws AMQException
+ public void requeue(long deliveryTag)
+ throws
+ AMQException
{
UnacknowledgedMessage unacked = _unacknowledgedMessageMap.remove(deliveryTag);
@@ -516,12 +517,11 @@
// if (_nonTransactedContext == null)
{
_nonTransactedContext = new NonTransactionalContext(_messageStore, _storeContext, this,
- _returnMessages, _browsedAcks);
+ _returnMessages, _browsedAcks);
}
deliveryContext = _nonTransactedContext;
- }
- else
+ } else
{
deliveryContext = _txnContext;
}
@@ -532,19 +532,17 @@
deliveryContext.deliver(unacked.message, unacked.queue, true);
//Deliver increments the message count but we have already deliverted this once so don't increment it again
// this was because deliver did an increment changed this.
- }
- else
+ } else
{
_log.warn(System.identityHashCode(this) + " Requested requeue of message(" + unacked.message.debugIdentity() + "):" + deliveryTag +
- " but no queue defined and no DeadLetter queue so DROPPING message.");
+ " but no queue defined and no DeadLetter queue so DROPPING message.");
// _log.error("Requested requeue of message:" + deliveryTag +
// " but no queue defined using DeadLetter queue:" + getDeadLetterQueue());
//
// deliveryContext.deliver(unacked.message, getDeadLetterQueue(), false);
//
}
- }
- else
+ } else
{
_log.warn("Requested requeue of message:" + deliveryTag + " but no such delivery tag exists." + _unacknowledgedMessageMap.size());
@@ -554,10 +552,12 @@
{
int count = 0;
- public boolean callback(UnacknowledgedMessage message) throws AMQException
+ public boolean callback(UnacknowledgedMessage message)
+ throws
+ AMQException
{
_log.debug((count++) + ": (" + message.message.debugIdentity() + ")" +
- "[" + message.deliveryTag + "]");
+ "[" + message.deliveryTag + "]");
return false; // Continue
}
@@ -577,10 +577,11 @@
* Called to resend all outstanding unacknowledged messages to this same channel.
*
* @param requeue Are the messages to be requeued or dropped.
- *
* @throws AMQException When something goes wrong.
*/
- public void resend(final boolean requeue) throws AMQException
+ public void resend(final boolean requeue)
+ throws
+ AMQException
{
final List<UnacknowledgedMessage> msgToRequeue = new LinkedList<UnacknowledgedMessage>();
final List<UnacknowledgedMessage> msgToResend = new LinkedList<UnacknowledgedMessage>();
@@ -595,7 +596,9 @@
// and those that don't to be requeued.
_unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor()
{
- public boolean callback(UnacknowledgedMessage message) throws AMQException
+ public boolean callback(UnacknowledgedMessage message)
+ throws
+ AMQException
{
AMQShortString consumerTag = message.consumerTag;
AMQMessage msg = message.message;
@@ -606,13 +609,11 @@
if (_consumerTag2QueueMap.containsKey(consumerTag))
{
msgToResend.add(message);
- }
- else // consumer has gone
+ } else // consumer has gone
{
msgToRequeue.add(message);
}
- }
- else
+ } else
{
// Message has no consumer tag, so was "delivered" to a GET
// or consumer no longer registered
@@ -622,13 +623,11 @@
if (requeue)
{
msgToRequeue.add(message);
- }
- else
+ } else
{
_log.info("No DeadLetter Queue and requeue not requested so dropping message:" + message);
}
- }
- else
+ } else
{
_log.info("Message.queue is null and no DeadLetter Queue so dropping message:" + message);
}
@@ -649,8 +648,7 @@
if (!msgToResend.isEmpty())
{
_log.info("Preparing (" + msgToResend.size() + ") message to resend.");
- }
- else
+ } else
{
_log.info("No message to resend.");
}
@@ -699,8 +697,7 @@
}
//move this message to requeue
msgToRequeue.add(message);
- }
- else
+ } else
{
if (_log.isDebugEnabled())
{
@@ -710,8 +707,7 @@
_unacknowledgedMessageMap.remove(message.deliveryTag);
}
} // sync(sub.getSendLock)
- }
- else
+ } else
{
if (_log.isInfoEnabled())
@@ -740,12 +736,11 @@
if (_nonTransactedContext == null)
{
_nonTransactedContext = new NonTransactionalContext(_messageStore, _storeContext, this,
- _returnMessages, _browsedAcks);
+ _returnMessages, _browsedAcks);
}
deliveryContext = _nonTransactedContext;
- }
- else
+ } else
{
deliveryContext = _txnContext;
}
@@ -768,14 +763,17 @@
* since we may get an ack for a delivery tag that was generated from the deleted queue.
*
* @param queue the queue that has been deleted
- *
* @throws org.apache.qpid.AMQException if there is an error processing the unacked messages
*/
- public void queueDeleted(final AMQQueue queue) throws AMQException
+ public void queueDeleted(final AMQQueue queue)
+ throws
+ AMQException
{
_unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor()
{
- public boolean callback(UnacknowledgedMessage message) throws AMQException
+ public boolean callback(UnacknowledgedMessage message)
+ throws
+ AMQException
{
if (message.queue == queue)
{
@@ -787,7 +785,7 @@
catch (AMQException e)
{
_log.error("Error decrementing ref count on message " + message.message.getMessageId() + ": " +
- e, e);
+ e, e);
}
}
return false;
@@ -805,10 +803,11 @@
* @param deliveryTag the last delivery tag
* @param multiple if true will acknowledge all messages up to an including the delivery tag. if false only
* acknowledges the single message specified by the delivery tag
- *
* @throws AMQException if the delivery tag is unknown (e.g. not outstanding) on this channel
*/
- public void acknowledgeMessage(long deliveryTag, boolean multiple) throws AMQException
+ public void acknowledgeMessage(long deliveryTag, boolean multiple)
+ throws
+ AMQException
{
synchronized (_unacknowledgedMessageMap.getLock())
{
@@ -842,7 +841,7 @@
boolean suspend;
suspend = ((_prefetch_HighWaterMark != 0) && _unacknowledgedMessageMap.size() >= _prefetch_HighWaterMark)
- || ((_prefetchSize != 0) && _prefetchSize < _unacknowledgedMessageMap.getUnacknowledgeBytes());
+ || ((_prefetchSize != 0) && _prefetchSize < _unacknowledgedMessageMap.getUnacknowledgeBytes());
setSuspended(suspend);
}
@@ -868,8 +867,7 @@
{
q.deliverAsync();
}
- }
- else
+ } else
{
_log.debug("Suspending channel " + this);
}
@@ -881,16 +879,20 @@
return _suspended.get();
}
- public void commit() throws AMQException
+ public void commit()
+ throws
+ AMQException
{
if (!isTransactional())
{
throw new AMQException("Fatal error: commit called on non-transactional channel");
}
- _txnContext.commit();
+ _txnContext.commit();
}
- public void rollback() throws AMQException
+ public void rollback()
+ throws
+ AMQException
{
_txnContext.rollback();
}
@@ -919,14 +921,16 @@
return _storeContext;
}
- public void processReturns(AMQProtocolSession session) throws AMQException
+ public void processReturns(AMQProtocolSession session)
+ throws
+ AMQException
{
for (RequiredDeliveryException bouncedMessage : _returnMessages)
{
AMQMessage message = bouncedMessage.getAMQMessage();
session.getProtocolOutputConverter().writeReturn(message, _channelId,
- bouncedMessage.getReplyCode().getCode(),
- new AMQShortString(bouncedMessage.getMessage()));
+ bouncedMessage.getReplyCode().getCode(),
+ new AMQShortString(bouncedMessage.getMessage()));
}
_returnMessages.clear();
}
@@ -937,8 +941,7 @@
if (isSuspended())
{
return true;
- }
- else
+ } else
{
boolean willSuspend = ((_prefetch_HighWaterMark != 0) && _unacknowledgedMessageMap.size() + 1 > _prefetch_HighWaterMark);
if (!willSuspend)
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/messageStore/MemoryMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/messageStore/MemoryMessageStore.java?view=diff&rev=536458&r1=536457&r2=536458
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/messageStore/MemoryMessageStore.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/messageStore/MemoryMessageStore.java Wed May 9 01:48:18 2007
@@ -19,49 +19,75 @@
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.txn.TransactionManager;
+import org.apache.qpid.server.txn.TransactionRecord;
+import org.apache.qpid.server.txn.MemoryEnqueueRecord;
+import org.apache.qpid.server.txn.MemoryDequeueRecord;
import org.apache.qpid.server.exception.*;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.commons.configuration.Configuration;
+import org.apache.log4j.Logger;
import javax.transaction.xa.Xid;
-import java.util.Collection;
+import java.util.*;
+import java.io.ByteArrayOutputStream;
+import java.nio.ByteBuffer;
/**
+ * This is a simple in-memory implementation of a message store, i.e. nothing is persisted
+ * <p/>
* Created by Arnaud Simon
* Date: 26-Apr-2007
* Time: 08:23:45
*/
-public class MemoryMessageStore implements MessageStore
+public class MemoryMessageStore implements MessageStore
{
+ //========================================================================
+ // Static Constants
+ //========================================================================
+ // The logger for this class
+ private static final Logger _log = Logger.getLogger(MemoryMessageStore.class);
+
+ // Maps each staged message to the stream containing its message body
+ private Map<StorableMessage, ByteArrayOutputStream> _stagedMessages;
+ // The queue/messages association
+ private Map<StorableQueue, List<StorableMessage>> _queueMap;
+ // the message ID
+ private long _messageID = 0;
+ // The transaction manager
+ private TransactionManager _txm;
+
+ //========================================================================
+ // Interface MessageStore
+ //========================================================================
public void removeExchange(Exchange exchange)
throws
InternalErrorException
{
- //To change body of implemented methods use File | Settings | File Templates.
+ // do nothing: this store is in-memory
}
public void unbindQueue(Exchange exchange, AMQShortString routingKey, StorableQueue queue, FieldTable args)
- throws
- InternalErrorException
+ throws
+ InternalErrorException
{
- //To change body of implemented methods use File | Settings | File Templates.
+ // do nothing: this store is in-memory
}
public void createExchange(Exchange exchange)
throws
InternalErrorException
{
- //To change body of implemented methods use File | Settings | File Templates.
+ // do nothing: this store is in-memory
}
public void bindQueue(Exchange exchange, AMQShortString routingKey, StorableQueue queue, FieldTable args)
throws
InternalErrorException
{
- //To change body of implemented methods use File | Settings | File Templates.
+ // do nothing: this store is in-memory
}
public void configure(VirtualHost virtualHost, TransactionManager tm, String base, Configuration config)
@@ -69,14 +95,21 @@
InternalErrorException,
IllegalArgumentException
{
- //To change body of implemented methods use File | Settings | File Templates.
+ _log.info("Configuring memory message store");
+ // Initialise the maps
+ _stagedMessages = new HashMap<StorableMessage, ByteArrayOutputStream>();
+ _queueMap = new HashMap<StorableQueue, List<StorableMessage>>();
+ _txm = tm;
+ _txm.configure(this, "txn", config);
}
public void close()
throws
InternalErrorException
{
- //To change body of implemented methods use File | Settings | File Templates.
+ _log.info("Closing memory message store");
+ _stagedMessages.clear();
+ _queueMap.clear();
}
public void createQueue(StorableQueue queue)
@@ -84,7 +117,12 @@
InternalErrorException,
QueueAlreadyExistsException
{
- //To change body of implemented methods use File | Settings | File Templates.
+ if (_queueMap.containsKey(queue))
+ {
+ throw new QueueAlreadyExistsException("queue " + queue + " already exists");
+ }
+ // add this queue into the map
+ _queueMap.put(queue, new LinkedList<StorableMessage>());
}
public void destroyQueue(StorableQueue queue)
@@ -92,7 +130,12 @@
InternalErrorException,
QueueDoesntExistException
{
- //To change body of implemented methods use File | Settings | File Templates.
+ if (!_queueMap.containsKey(queue))
+ {
+ throw new QueueDoesntExistException("queue " + queue + " does not exist");
+ }
+ // remove this queue from the map
+ _queueMap.remove(queue);
}
public void stage(StorableMessage m)
@@ -100,7 +143,12 @@
InternalErrorException,
MessageAlreadyStagedException
{
- //To change body of implemented methods use File | Settings | File Templates.
+ if (_stagedMessages.containsKey(m))
+ {
+ throw new MessageAlreadyStagedException("message " + m + " already staged");
+ }
+ _stagedMessages.put(m, new ByteArrayOutputStream());
+ m.staged();
}
public void appendContent(StorableMessage m, byte[] data, int offset, int size)
@@ -108,7 +156,11 @@
InternalErrorException,
MessageDoesntExistException
{
- //To change body of implemented methods use File | Settings | File Templates.
+ if (!_stagedMessages.containsKey(m))
+ {
+ throw new MessageDoesntExistException("message " + m + " has not been staged");
+ }
+ _stagedMessages.get(m).write(data, offset, size);
}
public byte[] loadContent(StorableMessage m, int offset, int size)
@@ -116,7 +168,15 @@
InternalErrorException,
MessageDoesntExistException
{
- return new byte[0]; //To change body of implemented methods use File | Settings | File Templates.
+ if (!_stagedMessages.containsKey(m))
+ {
+ throw new MessageDoesntExistException("message " + m + " has not been staged");
+ }
+ byte[] result = new byte[size];
+ ByteBuffer buf = ByteBuffer.allocate(size);
+ buf.put(_stagedMessages.get(m).toByteArray(), offset, size);
+ buf.get(result);
+ return result;
}
public void destroy(StorableMessage m)
@@ -124,7 +184,11 @@
InternalErrorException,
MessageDoesntExistException
{
- //To change body of implemented methods use File | Settings | File Templates.
+ if (!_stagedMessages.containsKey(m))
+ {
+ throw new MessageDoesntExistException("message " + m + " has not been staged");
+ }
+ _stagedMessages.remove(m);
}
public void enqueue(Xid xid, StorableMessage m, StorableQueue queue)
@@ -135,7 +199,31 @@
UnknownXidException,
MessageDoesntExistException
{
- //To change body of implemented methods use File | Settings | File Templates.
+ if (xid != null)
+ {
+ // this is a tx operation
+ TransactionRecord enqueueRecord = new MemoryEnqueueRecord(m, queue);
+ _txm.getTransaction(xid).addRecord(enqueueRecord);
+ } else
+ {
+ if (!_stagedMessages.containsKey(m))
+ {
+ try
+ {
+ stage(m);
+ } catch (MessageAlreadyStagedException e)
+ {
+ throw new InternalErrorException(e);
+ }
+ appendContent(m, m.getData(), 0, m.getPayloadSize());
+ }
+ if (!_queueMap.containsKey(queue))
+ {
+ throw new QueueDoesntExistException("queue " + queue + " dos not exist");
+ }
+ _queueMap.get(queue).add(m);
+ m.enqueue(queue);
+ }
}
public void dequeue(Xid xid, StorableMessage m, StorableQueue queue)
@@ -145,25 +233,43 @@
InvalidXidException,
UnknownXidException
{
- //To change body of implemented methods use File | Settings | File Templates.
+ if (xid != null)
+ {
+ // this is a tx operation
+ TransactionRecord dequeueRecord = new MemoryDequeueRecord(m, queue);
+ _txm.getTransaction(xid).addRecord(dequeueRecord);
+ } else
+ {
+ if (!_queueMap.containsKey(queue))
+ {
+ throw new QueueDoesntExistException("queue " + queue + " dos not exist");
+ }
+ m.dequeue(queue);
+ _queueMap.get(queue).remove(m);
+ if (!m.isEnqueued())
+ {
+ // we can delete this message
+ _stagedMessages.remove(m);
+ }
+ }
}
public Collection<StorableQueue> getAllQueues()
throws
InternalErrorException
{
- return null; //To change body of implemented methods use File | Settings | File Templates.
+ return _queueMap.keySet();
}
public Collection<StorableMessage> getAllMessages(StorableQueue queue)
throws
InternalErrorException
{
- return null; //To change body of implemented methods use File | Settings | File Templates.
+ return _queueMap.get(queue);
}
public long getNewMessageId()
{
- return 0; //To change body of implemented methods use File | Settings | File Templates.
+ return _messageID++;
}
}
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java?view=diff&rev=536458&r1=536457&r2=536458
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java Wed May 9 01:48:18 2007
@@ -432,10 +432,10 @@
// enqueuing the messages ensure that if required the destinations are recorded to a
// persistent store
- for (AMQQueue q : _transientMessageData.getDestinationQueues())
- {
- _messageHandle.enqueue(storeContext, _messageId, q);
- }
+ // for (AMQQueue q : _transientMessageData.getDestinationQueues())
+ // {
+ // _messageHandle.enqueue(storeContext, _messageId, q);
+ // }
if (_transientMessageData.getContentHeaderBody().bodySize == 0)
{
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/StorableMessageHandle.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/StorableMessageHandle.java?view=diff&rev=536458&r1=536457&r2=536458
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/StorableMessageHandle.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/StorableMessageHandle.java Wed May 9 01:48:18 2007
@@ -169,7 +169,10 @@
{
try
{
- _messageStore.enqueue((Xid) storeContext.getPayload(), _message, queue);
+ if (queue.isDurable())
+ {
+ _messageStore.enqueue((Xid) storeContext.getPayload(), _message, queue);
+ }
} catch (Exception e)
{
throw new AMQException("PRoblem during message enqueue", e);
@@ -182,7 +185,10 @@
{
try
{
- _messageStore.dequeue((Xid) storeContext.getPayload(), _message, queue);
+ if (queue.isDurable())
+ {
+ _messageStore.dequeue((Xid) storeContext.getPayload(), _message, queue);
+ }
} catch (Exception e)
{
throw new AMQException("PRoblem during message dequeue", e);
@@ -199,8 +205,8 @@
if (_payload == null)
{
int bodySize = (int) _contentHeaderBody.bodySize;
- _buffer = ByteBuffer.allocate(bodySize);
_payload = new byte[bodySize];
+ _buffer = ByteBuffer.wrap(_payload);
for (ContentChunk contentBody : _chunks)
{
int chunkSize = contentBody.getSize();
@@ -208,7 +214,6 @@
contentBody.getData().get(chunk);
_buffer.put(chunk);
}
- _buffer.get(_payload);
}
return _payload;
}
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DequeueRecord.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DequeueRecord.java?view=diff&rev=536458&r1=536457&r2=536458
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DequeueRecord.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DequeueRecord.java Wed May 9 01:48:18 2007
@@ -39,7 +39,7 @@
UnknownXidException,
MessageDoesntExistException
{
- // do nothing
+ // nothing
}
public void rollback(MessageStore store)
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DistributedTransactionalContext.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DistributedTransactionalContext.java?view=diff&rev=536458&r1=536457&r2=536458
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DistributedTransactionalContext.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DistributedTransactionalContext.java Wed May 9 01:48:18 2007
@@ -46,7 +46,8 @@
//========================================================================
// The logger for this class
private static final Logger _log = Logger.getLogger(DistributedTransactionalContext.class);
-
+ private static final Object _lockXID = new Object();
+ private static int _count = 0;
//========================================================================
// Instance Fields
//========================================================================
@@ -60,7 +61,6 @@
final private List<RequiredDeliveryException> _returnMessages;
// for generating xids
private byte[] _txId = ("txid").getBytes();
- private int _count = 0;
public DistributedTransactionalContext(TransactionManager transactionManager, MessageStore messageStore, StoreContext storeContext,
List<RequiredDeliveryException> returnMessages)
@@ -75,15 +75,21 @@
throws
AMQException
{
- // begin the transaction and pass the XID through the context
- Xid xid = new XidImpl(("branch" + _count++).getBytes(), 1, _txId);
- try
- {
- _transactionManager.begin(xid);
- _storeContext.setPayload(xid);
- } catch (Exception e)
+ if (_storeContext.getPayload() == null)
{
- throw new AMQException("Problem during transaction begin", e);
+ synchronized (_lockXID)
+ {
+ // begin the transaction and pass the XID through the context
+ Xid xid = new XidImpl(("branch" + _count++).getBytes(), 1, _txId);
+ try
+ {
+ _transactionManager.begin(xid);
+ _storeContext.setPayload(xid);
+ } catch (Exception e)
+ {
+ throw new AMQException("Problem during transaction begin", e);
+ }
+ }
}
}
@@ -93,11 +99,18 @@
{
try
{
- _transactionManager.commit_one_phase((Xid) _storeContext.getPayload());
+ if (_storeContext.getPayload() != null)
+ {
+ _transactionManager.commit_one_phase((Xid) _storeContext.getPayload());
+ }
} catch (Exception e)
{
throw new AMQException("Problem during transaction commit", e);
}
+ finally
+ {
+ _storeContext.setPayload(null);
+ }
}
public void rollback()
@@ -106,11 +119,18 @@
{
try
{
- _transactionManager.rollback((Xid) _storeContext.getPayload());
+ if (_storeContext.getPayload() != null) // a null payload means no Xid was stored, so there is nothing to roll back
+ {
+ _transactionManager.rollback((Xid) _storeContext.getPayload());
+ }
} catch (Exception e)
{
throw new AMQException("Problem during transaction rollback", e);
}
+ finally
+ {
+ _storeContext.setPayload(null); // cleared whatever the outcome; the begin logic only starts a transaction when the payload is null
+ }
}
public void messageFullyReceived(boolean persistent)
@@ -142,6 +162,7 @@
throws
AMQException
{
+ beginTranIfNecessary();
if (multiple)
{
if (deliveryTag == 0)
Added: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/MemoryDequeueRecord.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/MemoryDequeueRecord.java?view=auto&rev=536458
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/MemoryDequeueRecord.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/MemoryDequeueRecord.java Wed May 9 01:48:18 2007
@@ -0,0 +1,82 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.txn;
+
+import org.apache.qpid.server.messageStore.MessageStore;
+import org.apache.qpid.server.messageStore.StorableMessage;
+import org.apache.qpid.server.messageStore.StorableQueue;
+import org.apache.qpid.server.exception.*;
+import org.apache.log4j.Logger;
+
+import javax.transaction.xa.Xid;
+
+/**
+ * In-memory transaction record describing the removal of a message from a queue.
+ * The dequeue is applied to the message store when the record is committed;
+ * prepare and rollback have nothing to do.
+ *
+ * Created by Arnaud Simon
+ * Date: 03-May-2007
+ * Time: 13:59:47
+ */
+public class MemoryDequeueRecord implements TransactionRecord
+{
+    //========================================================================
+    // Static Constants
+    //========================================================================
+    // Class-wide log4j logger
+    private static final Logger _log = Logger.getLogger(MemoryDequeueRecord.class);
+
+    //========================================================================
+    // Instance Fields
+    //========================================================================
+    // Queue the message is removed from on commit
+    StorableQueue _queue;
+    // Message that is removed from the queue on commit
+    StorableMessage _message;
+
+    //========================================================================
+    // Constructor
+    //========================================================================
+    /**
+     * Create a record for the dequeue of a message.
+     *
+     * @param m     The message to dequeue
+     * @param queue The queue the message is dequeued from
+     */
+    public MemoryDequeueRecord(StorableMessage m, StorableQueue queue)
+    {
+        this._message = m;
+        this._queue = queue;
+    }
+
+    //========================================================================
+    // Interface TransactionRecord
+    //========================================================================
+
+    /**
+     * Apply this record by dequeuing the message from the queue in the given store.
+     * The store is called with null as its first argument and the xid is not used.
+     *
+     * @param store The message store the dequeue is applied to
+     * @param xid   The xid of the committing transaction (unused here)
+     */
+    public void commit(MessageStore store, Xid xid)
+            throws InternalErrorException, QueueDoesntExistException, InvalidXidException,
+                   UnknownXidException, MessageDoesntExistException
+    {
+        store.dequeue(null, _message, _queue);
+    }
+
+    /**
+     * Nothing was applied before commit, so there is nothing to undo.
+     */
+    public void rollback(MessageStore store) throws InternalErrorException
+    {
+        // intentionally empty
+    }
+
+    /**
+     * No preparation work is needed for an in-memory dequeue.
+     */
+    public void prepare(MessageStore store) throws InternalErrorException
+    {
+        // intentionally empty
+    }
+}
Propchange: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/MemoryDequeueRecord.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/MemoryDequeueRecord.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/MemoryEnqueueRecord.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/MemoryEnqueueRecord.java?view=auto&rev=536458
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/MemoryEnqueueRecord.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/MemoryEnqueueRecord.java Wed May 9 01:48:18 2007
@@ -0,0 +1,82 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.txn;
+
+import org.apache.qpid.server.messageStore.MessageStore;
+import org.apache.qpid.server.messageStore.StorableMessage;
+import org.apache.qpid.server.messageStore.StorableQueue;
+import org.apache.qpid.server.exception.*;
+import org.apache.log4j.Logger;
+
+import javax.transaction.xa.Xid;
+
+/**
+ * In-memory transaction record describing the addition of a message to a queue.
+ * The enqueue is applied to the message store when the record is committed;
+ * prepare and rollback have nothing to do.
+ *
+ * Created by Arnaud Simon
+ * Date: 03-May-2007
+ * Time: 14:00:04
+ */
+public class MemoryEnqueueRecord implements TransactionRecord
+{
+    //========================================================================
+    // Static Constants
+    //========================================================================
+    // The logger for this class (must name this class so log output is attributed correctly)
+    private static final Logger _log = Logger.getLogger(MemoryEnqueueRecord.class);
+
+    //========================================================================
+    // Instance Fields
+    //========================================================================
+    // Queue the message is added to on commit
+    StorableQueue _queue;
+    // Message that is added to the queue on commit
+    StorableMessage _message;
+
+    //========================================================================
+    // Constructor
+    //========================================================================
+    /**
+     * Create a record for the enqueue of a message.
+     *
+     * @param m     The message to enqueue
+     * @param queue The queue the message is enqueued in
+     */
+    public MemoryEnqueueRecord(StorableMessage m, StorableQueue queue)
+    {
+        _queue = queue;
+        _message = m;
+    }
+
+    //========================================================================
+    // Interface TransactionRecord
+    //========================================================================
+
+    /**
+     * Apply this record by enqueuing the message in the queue of the given store.
+     * The store is called with null as its first argument and the xid is not used.
+     *
+     * @param store The message store the enqueue is applied to
+     * @param xid   The xid of the committing transaction (unused here)
+     */
+    public void commit(MessageStore store, Xid xid)
+            throws
+            InternalErrorException,
+            QueueDoesntExistException,
+            InvalidXidException,
+            UnknownXidException,
+            MessageDoesntExistException
+    {
+        store.enqueue(null, _message, _queue);
+    }
+
+    /**
+     * Nothing was applied before commit, so there is nothing to undo.
+     */
+    public void rollback(MessageStore store)
+            throws
+            InternalErrorException
+    {
+        // do nothing
+    }
+
+    /**
+     * No preparation work is needed for an in-memory enqueue.
+     */
+    public void prepare(MessageStore store)
+            throws
+            InternalErrorException
+    {
+        // do nothing
+    }
+}
Propchange: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/MemoryEnqueueRecord.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/MemoryEnqueueRecord.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/MemoryTransaction.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/MemoryTransaction.java?view=auto&rev=536458
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/MemoryTransaction.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/MemoryTransaction.java Wed May 9 01:48:18 2007
@@ -0,0 +1,158 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.txn;
+
+import org.apache.log4j.Logger;
+
+import java.util.List;
+import java.util.LinkedList;
+
+/**
+ * In-memory transaction: holds the records of a transaction branch together with
+ * its prepared / heuristic-rollback flags, creation time and timeout.
+ *
+ * Created by Arnaud Simon
+ * Date: 03-May-2007
+ * Time: 14:30:41
+ */
+public class MemoryTransaction implements Transaction
+{
+    //========================================================================
+    // Static Constants
+    //========================================================================
+    // The logger for this class
+    private static final Logger _log = Logger.getLogger(MemoryTransaction.class);
+
+    //========================================================================
+    // Instance Fields
+    //========================================================================
+    // Indicates whether this transaction is prepared
+    private boolean _prepared = false;
+    // Indicates that this transaction has heuristically rolled back
+    private boolean _heurRollBack = false;
+    // The list of records associated with this tx, in the order they were added
+    private final List<TransactionRecord> _records = new LinkedList<TransactionRecord>();
+    // Creation time of this tx, in milliseconds (System.currentTimeMillis())
+    private final long _dateCreated;
+    // The timeout in seconds
+    private long _timeout;
+
+    //=========================================================
+    // Constructors
+    //=========================================================
+    /**
+     * Create an in-memory transaction and set its creation date to the current time.
+     */
+    public MemoryTransaction()
+    {
+        _dateCreated = System.currentTimeMillis();
+    }
+
+    //=========================================================
+    // Getter and Setter methods
+    //=========================================================
+    /**
+     * Notify that this tx has been prepared
+     */
+    public void prepare()
+    {
+        _prepared = true;
+    }
+
+    /**
+     * Specify whether this transaction is prepared
+     *
+     * @return true if this transaction is prepared, false otherwise
+     */
+    public boolean isPrepared()
+    {
+        return _prepared;
+    }
+
+    /**
+     * Notify that this tx has been heuristically rolled back
+     */
+    public void heurRollback()
+    {
+        _heurRollBack = true;
+    }
+
+    /**
+     * Specify whether this transaction has been heuristically rolled back
+     *
+     * @return true if this transaction has been heuristically rolled back, false otherwise
+     */
+    public boolean isHeurRollback()
+    {
+        return _heurRollBack;
+    }
+
+    /**
+     * Add a record to this tx.
+     *
+     * @param record The record to be added
+     */
+    public void addRecord(TransactionRecord record)
+    {
+        _records.add(record);
+    }
+
+    /**
+     * Get the list of records associated with this tx.
+     * The returned list is the internal list itself, not a copy.
+     *
+     * @return The list of records associated with this tx.
+     */
+    public List<TransactionRecord> getrecords()
+    {
+        return _records;
+    }
+
+    /**
+     * Set this tx timeout
+     *
+     * @param timeout This tx timeout in seconds
+     */
+    public void setTimeout(long timeout)
+    {
+        _timeout = timeout;
+    }
+
+    /**
+     * Get this tx timeout
+     *
+     * @return This tx timeout in seconds
+     */
+    public long getTimeout()
+    {
+        return _timeout;
+    }
+
+    /**
+     * Specify whether this tx has expired, i.e. whether more than the timeout
+     * has elapsed since the tx was created.
+     *
+     * @return true if this tx has expired, false otherwise
+     */
+    public boolean hasExpired()
+    {
+        long elapsedMillis = System.currentTimeMillis() - _dateCreated;
+        // _timeout is expressed in seconds whereas the clock is in milliseconds
+        boolean expired = elapsedMillis > _timeout * 1000;
+        // only report an expiry when there actually is one
+        if (expired && _log.isDebugEnabled())
+        {
+            _log.debug("transaction has expired");
+        }
+        return expired;
+    }
+}
Propchange: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/MemoryTransaction.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/MemoryTransaction.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/MemoryTransactionManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/MemoryTransactionManager.java?view=diff&rev=536458&r1=536457&r2=536458
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/MemoryTransactionManager.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/MemoryTransactionManager.java Wed May 9 01:48:18 2007
@@ -18,9 +18,13 @@
package org.apache.qpid.server.txn;
import org.apache.qpid.server.exception.*;
+import org.apache.qpid.server.messageStore.MessageStore;
+import org.apache.commons.configuration.Configuration;
+import org.apache.log4j.Logger;
import javax.transaction.xa.Xid;
import java.util.Set;
+import java.util.HashMap;
/**
* Created by Arnaud Simon
@@ -29,13 +33,65 @@
*/
public class MemoryTransactionManager implements TransactionManager
{
+ //========================================================================
+ // Static Constants
+ //========================================================================
+ // The logger for this class
+ private static final Logger _log = Logger.getLogger(MemoryTransactionManager.class);
+
+ private static final String ENVIRONMENT_TX_TIMEOUT = "environment-tx-timeout"; // configuration key, appended to the base element in configure()
+
+ //========================================================================
+ // Instance Fields
+ //========================================================================
+ // The message store given to configure(); handed to the records on commit and rollback
+ private MessageStore _messagStore;
+ // Active transactions keyed by Xid (filled by begin, read by getTransaction)
+ private HashMap<Xid, Transaction> _xidMap;
+ // In-doubt transactions keyed by Xid (filled by HeuristicOutcome, exposed by recover)
+ private HashMap<Xid, MemoryTransaction> _indoubtXidMap;
+
+ // A default tx timeout in sec
+ private int _defaultTimeout; // set by configure(): 10 when there is no config or the key is absent
+
+ //========================================================================
+ // Interface TransactionManager
+ //========================================================================
+ public void configure(MessageStore messageStroe, String base, Configuration config)
+ {
+ // remember the store that transaction records will be applied to
+ _messagStore = messageStroe;
+ // timeout in seconds: read from "<base>.environment-tx-timeout", 10 when unset or when there is no config
+ _defaultTimeout = (config == null) ? 10 : config.getInt(base + "." + ENVIRONMENT_TX_TIMEOUT, 10);
+ _log.info("Using transaction timeout of " + _defaultTimeout + " s");
+ // start with empty bookkeeping for active and in-doubt transactions
+ _xidMap = new HashMap<Xid, Transaction>();
+ _indoubtXidMap = new HashMap<Xid, MemoryTransaction>();
+ }
public XAFlag begin(Xid xid)
throws
InternalErrorException,
InvalidXidException
{
- return null; //To change body of implemented methods use File | Settings | File Templates.
+ // Reject a null xid before it is used as a monitor: synchronized (null) would
+ // throw NullPointerException instead of the declared InvalidXidException.
+ if (xid == null)
+ {
+ throw new InvalidXidException(xid, "null xid");
+ }
+ synchronized (xid)
+ {
+ // a branch can only be begun once
+ if (_xidMap.containsKey(xid))
+ {
+ throw new InvalidXidException(xid, "Xid already exist");
+ }
+ // register a new in-memory transaction with the configured default timeout
+ MemoryTransaction tx = new MemoryTransaction();
+ tx.setTimeout(_defaultTimeout);
+ _xidMap.put(xid, tx);
+ return XAFlag.ok;
+ }
}
public XAFlag prepare(Xid xid)
@@ -44,7 +100,35 @@
CommandInvalidException,
UnknownXidException
{
- return null; //To change body of implemented methods use File | Settings | File Templates.
+ synchronized (xid)
+ {
+ // get the transaction (getTransaction throws UnknownXidException when the xid is not registered)
+ MemoryTransaction tx = (MemoryTransaction) getTransaction(xid);
+ XAFlag result = XAFlag.ok;
+ if (tx.hasExpired())
+ {
+ result = XAFlag.rbtimeout;
+ // an expired branch is rolled back instead of being prepared
+ rollback(xid);
+ } else
+ {
+ if (tx.isPrepared())
+ {
+ throw new CommandInvalidException("TransactionImpl is already prepared");
+ }
+ if (tx.getrecords().size() == 0)
+ {
+ // the tx was read only (no work has been done), so it is dropped right away
+ _xidMap.remove(xid);
+ result = XAFlag.rdonly;
+ } else
+ {
+ // nothing is persisted for an in-memory tx: this only flags it as prepared
+ tx.prepare();
+ }
+ }
+ return result;
+ }
}
public XAFlag rollback(Xid xid)
@@ -53,7 +137,28 @@
CommandInvalidException,
UnknownXidException
{
- return null; //To change body of implemented methods use File | Settings | File Templates.
+ synchronized (xid)
+ {
+ // get the transaction (getTransaction throws UnknownXidException when the xid is not registered)
+ MemoryTransaction tx = (MemoryTransaction) getTransaction(xid);
+ XAFlag flag = XAFlag.ok;
+ if (tx.isHeurRollback())
+ {
+ flag = XAFlag.heurrb; // already rolled back heuristically: only reported, the xid stays in _xidMap
+ } else
+ {
+ for (TransactionRecord record : tx.getrecords())
+ {
+ record.rollback(_messagStore);
+ }
+ _xidMap.remove(xid);
+ }
+ if (tx.hasExpired())
+ {
+ flag = XAFlag.rbtimeout; // an expired tx reports the timeout, overriding the flag chosen above
+ }
+ return flag;
+ }
}
public XAFlag commit(Xid xid)
@@ -63,7 +168,44 @@
UnknownXidException,
NotPreparedException
{
- return null; //To change body of implemented methods use File | Settings | File Templates.
+ // Commit of a previously prepared branch: applies every record of the tx to the store.
+ synchronized (xid)
+ {
+ // get the transaction (getTransaction throws UnknownXidException when the xid is not registered)
+ MemoryTransaction tx = (MemoryTransaction) getTransaction(xid);
+ XAFlag flag = XAFlag.ok;
+ if (tx.isHeurRollback())
+ {
+ // already rolled back heuristically: nothing is applied, only reported
+ flag = XAFlag.heurrb;
+ } else if (tx.hasExpired())
+ {
+ flag = XAFlag.rbtimeout;
+ // rollback this tx branch
+ rollback(xid);
+ } else
+ {
+ if (!tx.isPrepared())
+ {
+ throw new NotPreparedException("TransactionImpl is not prepared");
+ }
+ for (TransactionRecord record : tx.getrecords())
+ {
+ try
+ {
+ record.commit(_messagStore, xid);
+ } catch (InvalidXidException e)
+ {
+ throw new UnknownXidException(xid, e);
+ } catch (Exception e)
+ {
+ // this should not happen as the queue and the message must exist;
+ // keep going with the remaining records and log the cause so it is not lost
+ _log.error("Error when committing distributed transaction heurmix mode returned: " + xid, e);
+ flag = XAFlag.heurmix;
+ }
+ }
+ _xidMap.remove(xid);
+ }
+ return flag;
+ }
}
public XAFlag commit_one_phase(Xid xid)
@@ -72,7 +214,47 @@
CommandInvalidException,
UnknownXidException
{
- return null; //To change body of implemented methods use File | Settings | File Templates.
+ // One-phase commit: marks the tx as prepared and applies its records in a single call.
+ synchronized (xid)
+ {
+ XAFlag flag = XAFlag.ok;
+ // getTransaction throws UnknownXidException when the xid is not registered
+ MemoryTransaction tx = (MemoryTransaction) getTransaction(xid);
+ if (tx.isHeurRollback())
+ {
+ // already rolled back heuristically: nothing is applied, only reported
+ flag = XAFlag.heurrb;
+ } else if (tx.hasExpired())
+ {
+ flag = XAFlag.rbtimeout;
+ // rollback this tx branch
+ rollback(xid);
+ } else
+ {
+ // we need to prepare the tx
+ tx.prepare();
+ try
+ {
+ for (TransactionRecord record : tx.getrecords())
+ {
+ try
+ {
+ record.commit(_messagStore, xid);
+ } catch (InvalidXidException e)
+ {
+ throw new UnknownXidException(xid, e);
+ } catch (Exception e)
+ {
+ // this should not happen as the queue and the message must exist;
+ // keep going with the remaining records and log the cause so it is not lost
+ _log.error("Error when committing transaction heurmix mode returned: " + xid, e);
+ flag = XAFlag.heurmix;
+ }
+ }
+ }
+ finally
+ {
+ // the branch is finished whether or not every record could be applied
+ _xidMap.remove(xid);
+ }
+ }
+ return flag;
+ }
}
public void forget(Xid xid)
@@ -81,7 +263,10 @@
CommandInvalidException,
UnknownXidException
{
- //To change body of implemented methods use File | Settings | File Templates.
+ synchronized (xid)
+ {
+ _xidMap.remove(xid); // no-op for an unregistered xid (nothing is thrown here); _indoubtXidMap is left untouched
+ }
}
public void setTimeout(Xid xid, long timeout)
@@ -89,7 +274,8 @@
InternalErrorException,
UnknownXidException
{
- //To change body of implemented methods use File | Settings | File Templates.
+ Transaction tx = getTransaction(xid); // throws UnknownXidException when the xid is not registered
+ tx.setTimeout(timeout); // replaces the default timeout that begin() assigned to this tx
}
public long getTimeout(Xid xid)
@@ -97,7 +283,8 @@
InternalErrorException,
UnknownXidException
{
- return 0; //To change body of implemented methods use File | Settings | File Templates.
+ Transaction tx = getTransaction(xid); // throws UnknownXidException when the xid is not registered
+ return tx.getTimeout(); // value last set through setTimeout, or the default assigned by begin()
}
public Set<Xid> recover(boolean startscan, boolean endscan)
@@ -105,7 +292,7 @@
InternalErrorException,
CommandInvalidException
{
- return null; //To change body of implemented methods use File | Settings | File Templates.
+ return _indoubtXidMap.keySet(); // startscan/endscan are not used; this is the map's live key view, not a copy
}
public void HeuristicOutcome(Xid xid)
@@ -113,13 +300,32 @@
UnknownXidException,
InternalErrorException
{
- //To change body of implemented methods use File | Settings | File Templates.
+ synchronized (xid)
+ {
+ MemoryTransaction tx = (MemoryTransaction) getTransaction(xid); // throws UnknownXidException when the xid is not registered
+ if (!tx.isPrepared())
+ {
+ // an unprepared tx is rolled back heuristically: undo its records, then flag it
+ for (TransactionRecord record : tx.getrecords())
+ {
+ record.rollback(_messagStore);
+ }
+ tx.heurRollback();
+ }
+ // record the branch as in-doubt (prepared or not) so that recover() reports it; it also stays in _xidMap
+ _indoubtXidMap.put(xid, tx);
+ }
}
public Transaction getTransaction(Xid xid)
throws
UnknownXidException
{
- return null; //To change body of implemented methods use File | Settings | File Templates.
+ Transaction tx = _xidMap.get(xid); // only the active map is consulted, not _indoubtXidMap; no lock is taken here
+ if (tx == null) // not registered: never begun, or already removed
+ {
+ throw new UnknownXidException(xid);
+ }
+ return tx;
}
}
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java?view=diff&rev=536458&r1=536457&r2=536458
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java Wed May 9 01:48:18 2007
@@ -93,6 +93,7 @@
{
try
{
+ message.getMessageHandle().enqueue(_storeContext, message.getMessageId(), queue);
queue.process(_storeContext, message, deliverFirst);
//following check implements the functionality
//required by the 'immediate' flag:
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/Transaction.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/Transaction.java?view=diff&rev=536458&r1=536457&r2=536458
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/Transaction.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/Transaction.java Wed May 9 01:48:18 2007
@@ -26,10 +26,24 @@
public interface Transaction
{
- /**
+ /**
* Add an abstract record to this tx.
*
* @param record The record to be added
*/
public void addRecord(TransactionRecord record);
+
+ /**
+ * Set the timeout of this transaction.
+ *
+ * @param timeout The timeout of this tx, in seconds
+ */
+ public void setTimeout(long timeout);
+
+ /**
+ * Get the timeout of this transaction.
+ *
+ * @return The timeout of this tx, in seconds
+ */
+ public long getTimeout();
}
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionManager.java?view=diff&rev=536458&r1=536457&r2=536458
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionManager.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionManager.java Wed May 9 01:48:18 2007
@@ -19,6 +19,8 @@
package org.apache.qpid.server.txn;
import org.apache.qpid.server.exception.*;
+import org.apache.qpid.server.messageStore.MessageStore;
+import org.apache.commons.configuration.Configuration;
import javax.transaction.xa.Xid;
import java.util.Set;
@@ -30,6 +32,17 @@
*/
public interface TransactionManager
{
+
+ /**
+ * Configure this TM with the message store implementation and its configuration.
+ *
+ * @param base The base element identifier from which all configuration items are relative. For example, if the base
+ * element is "store", then all elements used by concrete classes will be "store.foo" etc.
+ * @param config The apache commons configuration object
+ * @param messageStroe the message store associated with the TM
+ */
+ public void configure(MessageStore messageStroe, String base, Configuration config);
+
/**
* Begin a transaction branch identified by Xid
*
Modified: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java?view=diff&rev=536458&r1=536457&r2=536458
==============================================================================
--- incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java (original)
+++ incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java Wed May 9 01:48:18 2007
@@ -0,0 +1,230 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.ack;
+
+import junit.framework.TestCase;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.server.RequiredDeliveryException;
+import org.apache.qpid.server.messageStore.MemoryMessageStore;
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.txn.NonTransactionalContext;
+import org.apache.qpid.server.txn.TransactionalContext;
+
+import java.util.*;
+
+/**
+ * Exercises TxAck with an individual ack, a multiple ack and a combination of both,
+ * checking message reference counts and the content of the unacknowledged-message map.
+ */
+public class TxAckTest extends TestCase
+{
+    // Scenarios under test; rebuilt by setUp() before every test method
+    private Scenario individual;
+    private Scenario multiple;
+    private Scenario combined;
+
+    /**
+     * Builds three ten-message scenarios and records the acknowledgements to test.
+     */
+    protected void setUp() throws Exception
+    {
+        super.setUp();
+
+        // only the 5th message is acked
+        individual = new Scenario(10, Arrays.asList(5L), Arrays.asList(1L, 2L, 3L, 4L, 6L, 7L, 8L, 9L, 10L));
+        individual.update(5, false);
+
+        // everything up to and including the 5th message is acked
+        multiple = new Scenario(10, Arrays.asList(1L, 2L, 3L, 4L, 5L), Arrays.asList(6L, 7L, 8L, 9L, 10L));
+        multiple.update(5, true);
+
+        // only the 8th and 9th messages stay unacked
+        combined = new Scenario(10, Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 10L), Arrays.asList(8L, 9L));
+        combined.update(3, false);
+        combined.update(5, true);
+        combined.update(7, true);
+        combined.update(2, true);   // should be ignored
+        combined.update(1, false);  // should be ignored
+        combined.update(10, false);
+    }
+
+    public void testPrepare() throws AMQException
+    {
+        individual.prepare();
+        multiple.prepare();
+        combined.prepare();
+    }
+
+    public void testUndoPrepare() throws AMQException
+    {
+        individual.undoPrepare();
+        multiple.undoPrepare();
+        combined.undoPrepare();
+    }
+
+    public void testCommit() throws AMQException
+    {
+        individual.commit();
+        multiple.commit();
+        combined.commit();
+    }
+
+    public static junit.framework.Test suite()
+    {
+        return new junit.framework.TestSuite(TxAckTest.class);
+    }
+
+    /**
+     * A map of unacknowledged test messages, the TxAck operating on it, and the
+     * delivery tags expected to be acked / left unacked once the updates are applied.
+     */
+    private class Scenario
+    {
+        private final UnacknowledgedMessageMap _map = new UnacknowledgedMessageMapImpl(5000);
+        private final TxAck _op = new TxAck(_map);
+        private final List<Long> _acked;
+        private final List<Long> _unacked;
+        private StoreContext _storeContext = new StoreContext();
+
+        /**
+         * Fills the map with messageCount messages carrying delivery tags 1..messageCount
+         * (message ids 0..messageCount-1).
+         */
+        Scenario(int messageCount, List<Long> acked, List<Long> unacked)
+        {
+            _acked = acked;
+            _unacked = unacked;
+
+            TransactionalContext txnContext = new NonTransactionalContext(
+                    new MemoryMessageStore(),
+                    _storeContext,
+                    null,
+                    new LinkedList<RequiredDeliveryException>(),
+                    new HashSet<Long>());
+
+            for (long deliveryTag = 1; deliveryTag <= messageCount; deliveryTag++)
+            {
+                long messageId = deliveryTag - 1;
+                TestMessage message = new TestMessage(deliveryTag, messageId, newPublishInfo(), txnContext);
+                _map.add(deliveryTag, new UnacknowledgedMessage(null, message, null, deliveryTag));
+            }
+        }
+
+        // Publish info with no exchange, no routing key and neither flag set
+        private MessagePublishInfo newPublishInfo()
+        {
+            return new MessagePublishInfo()
+            {
+                public AMQShortString getExchange()
+                {
+                    return null;
+                }
+
+                public boolean isImmediate()
+                {
+                    return false;
+                }
+
+                public boolean isMandatory()
+                {
+                    return false;
+                }
+
+                public AMQShortString getRoutingKey()
+                {
+                    return null;
+                }
+            };
+        }
+
+        void update(long deliverytag, boolean multiple)
+        {
+            _op.update(deliverytag, multiple);
+        }
+
+        // Every listed tag must still be mapped and its message must carry the expected count
+        private void assertCount(List<Long> tags, int expected)
+        {
+            for (long tag : tags)
+            {
+                UnacknowledgedMessage unacked = _map.get(tag);
+                assertNotNull("Message not found for tag " + tag, unacked);
+                TestMessage message = (TestMessage) unacked.message;
+                message.assertCountEquals(expected);
+            }
+        }
+
+        void prepare() throws AMQException
+        {
+            _op.consolidate();
+            _op.prepare(_storeContext);
+
+            // acked messages have been dereferenced once, the others are untouched
+            assertCount(_acked, -1);
+            assertCount(_unacked, 0);
+        }
+
+        void undoPrepare()
+        {
+            _op.consolidate();
+            _op.undoPrepare();
+
+            // no message count has moved away from zero
+            assertCount(_acked, 0);
+            assertCount(_unacked, 0);
+        }
+
+        void commit()
+        {
+            _op.consolidate();
+            _op.commit(_storeContext);
+
+            // acked tags must have left the map
+            Set<Long> stillMapped = new HashSet<Long>(_map.getDeliveryTags());
+            stillMapped.retainAll(_acked);
+            assertTrue("Expected messages with following tags to have been removed from map: " + stillMapped,
+                       stillMapped.isEmpty());
+
+            // unacked tags must all remain in the map
+            Set<Long> missing = new HashSet<Long>(_unacked);
+            missing.removeAll(_map.getDeliveryTags());
+            assertTrue("Expected messages with following tags to still be in map: " + missing,
+                       missing.isEmpty());
+        }
+    }
+
+    /**
+     * Message whose reference counting is replaced by a plain counter the test can inspect.
+     */
+    private class TestMessage extends AMQMessage
+    {
+        private final long _tag;
+        private int _count;
+
+        TestMessage(long tag, long messageId, MessagePublishInfo publishBody, TransactionalContext txnContext)
+        {
+            super(messageId, publishBody, txnContext);
+            try
+            {
+                // content header reporting a fixed size of 1
+                setContentHeaderBody(new ContentHeaderBody()
+                {
+                    public int getSize()
+                    {
+                        return 1;
+                    }
+                });
+            }
+            catch (AMQException ignored)
+            {
+                // won't happen
+            }
+            _tag = tag;
+        }
+
+        public void incrementReference()
+        {
+            _count++;
+        }
+
+        public void decrementReference(StoreContext context)
+        {
+            _count--;
+        }
+
+        void assertCountEquals(int expected)
+        {
+            assertEquals("Wrong count for message with tag " + _tag, expected, _count);
+        }
+    }
+}
Modified: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/channel/MaxChannelsTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/channel/MaxChannelsTest.java?view=diff&rev=536458&r1=536457&r2=536458
==============================================================================
--- incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/channel/MaxChannelsTest.java (original)
+++ incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/channel/MaxChannelsTest.java Wed May 9 01:48:18 2007
@@ -0,0 +1,75 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.protocol;
+
+import junit.framework.TestCase;
+import org.apache.mina.common.IoSession;
+import org.apache.qpid.codec.AMQCodecFactory;
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.registry.IApplicationRegistry;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.SkeletonMessageStore;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.framing.AMQShortString;
+
+import javax.management.JMException;
+
+/** Test class to test MBean operations for AMQMinaProtocolSession. */
+public class MaxChannelsTest extends TestCase
+{
+// private MessageStore _messageStore = new SkeletonMessageStore();
+
+ public void testChannels() throws Exception
+ {
+ IApplicationRegistry appRegistry = ApplicationRegistry.getInstance();
+ AMQMinaProtocolSession _protocolSession = new AMQMinaProtocolSession(new MockIoSession(),
+ appRegistry.getVirtualHostRegistry(),
+ new AMQCodecFactory(true),
+ null);
+ _protocolSession.setVirtualHost(appRegistry.getVirtualHostRegistry().getVirtualHost("test"));
+
+ // check the channel count is correct
+ int channelCount = _protocolSession.getChannels().size();
+ assertEquals("Initial channel count wrong", 0, channelCount);
+
+ long maxChannels = 10L;
+ _protocolSession.setMaximumNumberOfChannels(maxChannels);
+ assertEquals("Number of channels not correctly set.", new Long(maxChannels), _protocolSession.getMaximumNumberOfChannels());
+
+
+ try
+ {
+ for (long currentChannel = 0L; currentChannel < maxChannels; currentChannel++)
+ {
+ _protocolSession.addChannel(new AMQChannel(_protocolSession, (int) currentChannel, null, null, null));
+ }
+ }
+ catch (AMQException e)
+ {
+ assertEquals("Wrong exception recevied.", e.getErrorCode(), AMQConstant.NOT_ALLOWED);
+ }
+ assertEquals("Maximum number of channels not set.", new Long(maxChannels), new Long(_protocolSession.getChannels().size()));
+ }
+
+}
Modified: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java?view=diff&rev=536458&r1=536457&r2=536458
==============================================================================
--- incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java (original)
+++ incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java Wed May 9 01:48:18 2007
@@ -3,7 +3,6 @@
import junit.framework.TestCase;
import org.apache.log4j.Logger;
import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.util.TestApplicationRegistry;
import org.apache.qpid.server.util.NullApplicationRegistry;
import org.apache.qpid.client.*;
import org.apache.qpid.client.transport.TransportConnection;
@@ -269,7 +268,14 @@
public void onException(JMSException jmsException)
{
- Exception linkedException = jmsException.getLinkedException();
+ Exception linkedException = null;
+ try
+ {
+ linkedException = jmsException.getLinkedException();
+ } catch (Exception e)
+ {
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ }
if (linkedException instanceof AMQNoRouteException)
{
AMQNoRouteException noRoute = (AMQNoRouteException) linkedException;
Modified: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/failure/HeapExhaustion.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/failure/HeapExhaustion.java?view=diff&rev=536458&r1=536457&r2=536458
==============================================================================
--- incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/failure/HeapExhaustion.java (original)
+++ incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/failure/HeapExhaustion.java Wed May 9 01:48:18 2007
@@ -8,7 +8,6 @@
import org.apache.log4j.Logger;
import javax.jms.JMSException;
-import javax.jms.DeliveryMode;
import java.io.IOException;
@@ -17,7 +16,7 @@
{
private static final Logger _logger = Logger.getLogger(HeapExhaustion.class);
- protected QpidClientConnection conn;
+ protected QpidClientConnection conn;
protected final String BROKER = "localhost";
protected final String vhost = "/test";
protected final String queue = "direct://amq.direct//queue";
@@ -35,7 +34,13 @@
conn = new QpidClientConnection(BROKER);
conn.setVirtualHost(vhost);
- conn.connect();
+ try
+ {
+ conn.connect();
+ } catch (JMSException e)
+ {
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ }
// clear queue
_logger.debug("setup: clearing test queue");
conn.consume(queue, 2000);
@@ -55,7 +60,7 @@
*
* @throws Exception on error
*/
- public void testUntilFailureTransient() throws Exception
+ public void testUntilFailure() throws Exception
{
int copies = 0;
int total = 0;
@@ -63,7 +68,7 @@
int size = payload.getBytes().length;
while (true)
{
- conn.put(queue, payload, 1, DeliveryMode.NON_PERSISTENT);
+ conn.put(queue, payload, 1);
copies++;
total += size;
System.out.println("put copy " + copies + " OK for total bytes: " + total);
@@ -75,7 +80,7 @@
*
* @throws Exception on error
*/
- public void testUntilFailureWithDelaysTransient() throws Exception
+ public void testUntilFailureWithDelays() throws Exception
{
int copies = 0;
int total = 0;
@@ -83,7 +88,7 @@
int size = payload.getBytes().length;
while (true)
{
- conn.put(queue, payload, 1, DeliveryMode.NON_PERSISTENT);
+ conn.put(queue, payload, 1);
copies++;
total += size;
System.out.println("put copy " + copies + " OK for total bytes: " + total);
@@ -110,7 +115,7 @@
_logger.info("Running testUntilFailure");
try
{
- he.testUntilFailureTransient();
+ he.testUntilFailure();
}
catch (FailoverException fe)
{
@@ -159,7 +164,7 @@
_logger.info("Running testUntilFailure");
try
{
- he.testUntilFailureWithDelaysTransient();
+ he.testUntilFailureWithDelays();
}
catch (FailoverException fe)
{
Modified: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ConcurrencyTestDisabled.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ConcurrencyTestDisabled.java?view=diff&rev=536458&r1=536457&r2=536458
==============================================================================
--- incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ConcurrencyTestDisabled.java (original)
+++ incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ConcurrencyTestDisabled.java Wed May 9 01:48:18 2007
@@ -38,7 +38,7 @@
{
private final Random random = new Random();
- private final int numMessages = 1000;
+ private final int numMessages = 10;
private final List<SubscriptionTestHelper> _subscribers = new ArrayList<SubscriptionTestHelper>();
private final Set<Subscription> _active = new HashSet<Subscription>();