You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2007/05/09 10:48:21 UTC

svn commit: r536458 [1/2] - in /incubator/qpid/trunk/qpid/java: broker/src/main/java/org/apache/qpid/server/ broker/src/main/java/org/apache/qpid/server/messageStore/ broker/src/main/java/org/apache/qpid/server/queue/ broker/src/main/java/org/apache/qp...

Author: gsim
Date: Wed May  9 01:48:18 2007
New Revision: 536458

URL: http://svn.apache.org/viewvc?view=rev&rev=536458
Log:
Patch from Arnaud Simon (asimon@redhat.com) to fix tests broken by earlier changes.


Added:
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/MemoryDequeueRecord.java   (with props)
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/MemoryEnqueueRecord.java   (with props)
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/MemoryTransaction.java   (with props)
Modified:
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/messageStore/MemoryMessageStore.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/StorableMessageHandle.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DequeueRecord.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DistributedTransactionalContext.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/MemoryTransactionManager.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/Transaction.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionManager.java
    incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java
    incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/channel/MaxChannelsTest.java
    incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java
    incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/failure/HeapExhaustion.java
    incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ConcurrencyTestDisabled.java
    incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/testutil/QpidClientConnection.java

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java?view=diff&rev=536458&r1=536457&r2=536458
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java Wed May  9 01:48:18 2007
@@ -228,10 +228,14 @@
         try
         {
             queue.delete();
-            _messageStore.destroyQueue(queue);
+            if( queue.isDurable() )
+            {
+                _messageStore.destroyQueue(queue);
+            }
         }
         catch (Exception ex)
         {
+           ex.printStackTrace();
             JMException jme = new JMException(ex.getMessage());
             jme.initCause(ex);
             throw new MBeanException(jme, "Error in deleting queue " + queueName);

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?view=diff&rev=536458&r1=536457&r2=536458
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java Wed May  9 01:48:18 2007
@@ -73,10 +73,14 @@
      */
     private AtomicLong _deliveryTag = new AtomicLong(0);
 
-    /** A channel has a default queue (the last declared) that is used when no queue name is explictily set */
+    /**
+     * A channel has a default queue (the last declared) that is used when no queue name is explictily set
+     */
     private AMQQueue _defaultQueue;
 
-    /** This tag is unique per subscription to a queue. The server returns this in response to a basic.consume request. */
+    /**
+     * This tag is unique per subscription to a queue. The server returns this in response to a basic.consume request.
+     */
     private int _consumerTag;
 
     /**
@@ -86,7 +90,9 @@
      */
     private AMQMessage _currentMessage;
 
-    /** Maps from consumer tag to queue instance. Allows us to unsubscribe from a queue. */
+    /**
+     * Maps from consumer tag to queue instance. Allows us to unsubscribe from a queue.
+     */
     private final Map<AMQShortString, AMQQueue> _consumerTag2QueueMap = new HashMap<AMQShortString, AMQQueue>();
 
     private final MessageStore _messageStore;
@@ -118,7 +124,8 @@
 
 
     public AMQChannel(AMQProtocolSession session, int channelId, TransactionManager transactionManager, MessageStore messageStore, MessageRouter exchanges)
-            throws AMQException
+            throws
+            AMQException
     {
         _session = session;
         _channelId = channelId;
@@ -132,7 +139,9 @@
         _txnContext = new NonTransactionalContext(_messageStore, _storeContext, this, _returnMessages, _browsedAcks);
     }
 
-    /** Sets this channel to be part of a local transaction */
+    /**
+     * Sets this channel to be part of a local transaction
+     */
     public void setLocalTransactional()
     {
         _txnContext = new DistributedTransactionalContext(_transactionManager, _messageStore, _storeContext, _returnMessages);
@@ -193,23 +202,25 @@
     }
 
 
-    public void setPublishFrame(MessagePublishInfo info, AMQProtocolSession publisher) throws AMQException
+    public void setPublishFrame(MessagePublishInfo info, AMQProtocolSession publisher)
+            throws
+            AMQException
     {
 
 
         _currentMessage = new AMQMessage(_messageStore.getNewMessageId(), info,
-                                         _txnContext);
+                _txnContext);
         _currentMessage.setPublisher(publisher);
     }
 
     public void publishContentHeader(ContentHeaderBody contentHeaderBody)
-            throws AMQException
+            throws
+            AMQException
     {
         if (_currentMessage == null)
         {
             throw new AMQException("Received content header without previously receiving a BasicPublish frame");
-        }
-        else
+        } else
         {
             if (_log.isTraceEnabled())
             {
@@ -228,7 +239,8 @@
     }
 
     public void publishContentBody(ContentBody contentBody, AMQProtocolSession protocolSession)
-            throws AMQException
+            throws
+            AMQException
     {
         if (_currentMessage == null)
         {
@@ -261,7 +273,9 @@
         }
     }
 
-    protected void routeCurrentMessage() throws AMQException
+    protected void routeCurrentMessage()
+            throws
+            AMQException
     {
         try
         {
@@ -294,14 +308,15 @@
      * @param exclusive Flag requesting exclusive access to the queue
      * @param acks      Are acks enabled for this subscriber
      * @param filters   Filters to apply to this subscriber
-     *
      * @return the consumer tag. This is returned to the subscriber and used in subsequent unsubscribe requests
-     *
      * @throws ConsumerTagNotUniqueException if the tag is not unique
      * @throws AMQException                  if something goes wrong
      */
     public AMQShortString subscribeToQueue(AMQShortString tag, AMQQueue queue, AMQProtocolSession session, boolean acks,
-                                           FieldTable filters, boolean noLocal, boolean exclusive) throws AMQException, ConsumerTagNotUniqueException
+                                           FieldTable filters, boolean noLocal, boolean exclusive)
+            throws
+            AMQException,
+            ConsumerTagNotUniqueException
     {
         if (tag == null)
         {
@@ -318,28 +333,11 @@
     }
 
 
-    public void unsubscribeConsumer(AMQProtocolSession session, AMQShortString consumerTag) throws AMQException
+    public void unsubscribeConsumer(AMQProtocolSession session, final AMQShortString consumerTag)
+            throws
+            AMQException
     {
-        if (_log.isDebugEnabled())
-        {
-            _log.debug("Unacked Map Dump size:" + _unacknowledgedMessageMap.size());
-            _unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor()
-            {
-
-                public boolean callback(UnacknowledgedMessage message) throws AMQException
-                {
-                    _log.debug(message);
-
-                    return true;
-                }
-
-                public void visitComplete()
-                {
-                }
-            });
-        }
-
-        AMQQueue q = _consumerTag2QueueMap.remove(consumerTag);
+        final AMQQueue q = _consumerTag2QueueMap.remove(consumerTag);
         if (q != null)
         {
             q.unregisterProtocolSession(session, _channelId, consumerTag);
@@ -350,25 +348,27 @@
      * Called from the protocol session to close this channel and clean up. T
      *
      * @param session The session to close
-     *
      * @throws AMQException if there is an error during closure
      */
-    public void close(AMQProtocolSession session) throws AMQException
+    public void close(AMQProtocolSession session)
+            throws
+            AMQException
     {
         _txnContext.rollback();
         unsubscribeAllConsumers(session);
         requeue();
     }
 
-    private void unsubscribeAllConsumers(AMQProtocolSession session) throws AMQException
+    private void unsubscribeAllConsumers(AMQProtocolSession session)
+            throws
+            AMQException
     {
         if (_log.isInfoEnabled())
         {
             if (!_consumerTag2QueueMap.isEmpty())
             {
                 _log.info("Unsubscribing all consumers on channel " + toString());
-            }
-            else
+            } else
             {
                 _log.info("No consumers to unsubscribe on channel " + toString());
             }
@@ -400,13 +400,12 @@
             if (queue == null)
             {
                 _log.debug("Adding unacked message with a null queue:" + message.debugIdentity());
-            }
-            else
+            } else
             {
                 if (_log.isDebugEnabled())
                 {
                     _log.debug(debugIdentity() + " Adding unacked message(" + message.toString() + " DT:" + deliveryTag +
-                               ") with a queue(" + queue + ") for " + consumerTag);
+                            ") with a queue(" + queue + ") for " + consumerTag);
                 }
             }
         }
@@ -431,7 +430,9 @@
      *
      * @throws org.apache.qpid.AMQException if the requeue fails
      */
-    public void requeue() throws AMQException
+    public void requeue()
+            throws
+            AMQException
     {
         // we must create a new map since all the messages will get a new delivery tag when they are redelivered
         Collection<UnacknowledgedMessage> messagesToBeDelivered = _unacknowledgedMessageMap.cancelAllMessages();
@@ -452,12 +453,11 @@
 //                if (_nonTransactedContext == null)
                 {
                     _nonTransactedContext = new NonTransactionalContext(_messageStore, _storeContext, this,
-                                                                        _returnMessages, _browsedAcks);
+                            _returnMessages, _browsedAcks);
                 }
 
                 deliveryContext = _nonTransactedContext;
-            }
-            else
+            } else
             {
                 deliveryContext = _txnContext;
             }
@@ -489,10 +489,11 @@
      * Requeue a single message
      *
      * @param deliveryTag The message to requeue
-     *
      * @throws AMQException If something goes wrong.
      */
-    public void requeue(long deliveryTag) throws AMQException
+    public void requeue(long deliveryTag)
+            throws
+            AMQException
     {
         UnacknowledgedMessage unacked = _unacknowledgedMessageMap.remove(deliveryTag);
 
@@ -516,12 +517,11 @@
 //                if (_nonTransactedContext == null)
                 {
                     _nonTransactedContext = new NonTransactionalContext(_messageStore, _storeContext, this,
-                                                                        _returnMessages, _browsedAcks);
+                            _returnMessages, _browsedAcks);
                 }
 
                 deliveryContext = _nonTransactedContext;
-            }
-            else
+            } else
             {
                 deliveryContext = _txnContext;
             }
@@ -532,19 +532,17 @@
                 deliveryContext.deliver(unacked.message, unacked.queue, true);
                 //Deliver increments the message count but we have already deliverted this once so don't increment it again
                 // this was because deliver did an increment changed this.
-            }
-            else
+            } else
             {
                 _log.warn(System.identityHashCode(this) + " Requested requeue of message(" + unacked.message.debugIdentity() + "):" + deliveryTag +
-                          " but no queue defined and no DeadLetter queue so DROPPING message.");
+                        " but no queue defined and no DeadLetter queue so DROPPING message.");
 //                _log.error("Requested requeue of message:" + deliveryTag +
 //                           " but no queue defined using DeadLetter queue:" + getDeadLetterQueue());
 //
 //                deliveryContext.deliver(unacked.message, getDeadLetterQueue(), false);
 //
             }
-        }
-        else
+        } else
         {
             _log.warn("Requested requeue of message:" + deliveryTag + " but no such delivery tag exists." + _unacknowledgedMessageMap.size());
 
@@ -554,10 +552,12 @@
                 {
                     int count = 0;
 
-                    public boolean callback(UnacknowledgedMessage message) throws AMQException
+                    public boolean callback(UnacknowledgedMessage message)
+                            throws
+                            AMQException
                     {
                         _log.debug((count++) + ": (" + message.message.debugIdentity() + ")" +
-                                   "[" + message.deliveryTag + "]");
+                                "[" + message.deliveryTag + "]");
                         return false;  // Continue
                     }
 
@@ -577,10 +577,11 @@
      * Called to resend all outstanding unacknowledged messages to this same channel.
      *
      * @param requeue Are the messages to be requeued or dropped.
-     *
      * @throws AMQException When something goes wrong.
      */
-    public void resend(final boolean requeue) throws AMQException
+    public void resend(final boolean requeue)
+            throws
+            AMQException
     {
         final List<UnacknowledgedMessage> msgToRequeue = new LinkedList<UnacknowledgedMessage>();
         final List<UnacknowledgedMessage> msgToResend = new LinkedList<UnacknowledgedMessage>();
@@ -595,7 +596,9 @@
         // and those that don't to be requeued.
         _unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor()
         {
-            public boolean callback(UnacknowledgedMessage message) throws AMQException
+            public boolean callback(UnacknowledgedMessage message)
+                    throws
+                    AMQException
             {
                 AMQShortString consumerTag = message.consumerTag;
                 AMQMessage msg = message.message;
@@ -606,13 +609,11 @@
                     if (_consumerTag2QueueMap.containsKey(consumerTag))
                     {
                         msgToResend.add(message);
-                    }
-                    else // consumer has gone
+                    } else // consumer has gone
                     {
                         msgToRequeue.add(message);
                     }
-                }
-                else
+                } else
                 {
                     // Message has no consumer tag, so was "delivered" to a GET
                     // or consumer no longer registered
@@ -622,13 +623,11 @@
                         if (requeue)
                         {
                             msgToRequeue.add(message);
-                        }
-                        else
+                        } else
                         {
                             _log.info("No DeadLetter Queue and requeue not requested so dropping message:" + message);
                         }
-                    }
-                    else
+                    } else
                     {
                         _log.info("Message.queue is null and no DeadLetter Queue so dropping message:" + message);
                     }
@@ -649,8 +648,7 @@
             if (!msgToResend.isEmpty())
             {
                 _log.info("Preparing (" + msgToResend.size() + ") message to resend.");
-            }
-            else
+            } else
             {
                 _log.info("No message to resend.");
             }
@@ -699,8 +697,7 @@
                         }
                         //move this message to requeue
                         msgToRequeue.add(message);
-                    }
-                    else
+                    } else
                     {
                         if (_log.isDebugEnabled())
                         {
@@ -710,8 +707,7 @@
                         _unacknowledgedMessageMap.remove(message.deliveryTag);
                     }
                 } // sync(sub.getSendLock)
-            }
-            else
+            } else
             {
 
                 if (_log.isInfoEnabled())
@@ -740,12 +736,11 @@
             if (_nonTransactedContext == null)
             {
                 _nonTransactedContext = new NonTransactionalContext(_messageStore, _storeContext, this,
-                                                                    _returnMessages, _browsedAcks);
+                        _returnMessages, _browsedAcks);
             }
 
             deliveryContext = _nonTransactedContext;
-        }
-        else
+        } else
         {
             deliveryContext = _txnContext;
         }
@@ -768,14 +763,17 @@
      * since we may get an ack for a delivery tag that was generated from the deleted queue.
      *
      * @param queue the queue that has been deleted
-     *
      * @throws org.apache.qpid.AMQException if there is an error processing the unacked messages
      */
-    public void queueDeleted(final AMQQueue queue) throws AMQException
+    public void queueDeleted(final AMQQueue queue)
+            throws
+            AMQException
     {
         _unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor()
         {
-            public boolean callback(UnacknowledgedMessage message) throws AMQException
+            public boolean callback(UnacknowledgedMessage message)
+                    throws
+                    AMQException
             {
                 if (message.queue == queue)
                 {
@@ -787,7 +785,7 @@
                     catch (AMQException e)
                     {
                         _log.error("Error decrementing ref count on message " + message.message.getMessageId() + ": " +
-                                   e, e);
+                                e, e);
                     }
                 }
                 return false;
@@ -805,10 +803,11 @@
      * @param deliveryTag the last delivery tag
      * @param multiple    if true will acknowledge all messages up to an including the delivery tag. if false only
      *                    acknowledges the single message specified by the delivery tag
-     *
      * @throws AMQException if the delivery tag is unknown (e.g. not outstanding) on this channel
      */
-    public void acknowledgeMessage(long deliveryTag, boolean multiple) throws AMQException
+    public void acknowledgeMessage(long deliveryTag, boolean multiple)
+            throws
+            AMQException
     {
         synchronized (_unacknowledgedMessageMap.getLock())
         {
@@ -842,7 +841,7 @@
         boolean suspend;
 
         suspend = ((_prefetch_HighWaterMark != 0) && _unacknowledgedMessageMap.size() >= _prefetch_HighWaterMark)
-                  || ((_prefetchSize != 0) && _prefetchSize < _unacknowledgedMessageMap.getUnacknowledgeBytes());
+                || ((_prefetchSize != 0) && _prefetchSize < _unacknowledgedMessageMap.getUnacknowledgeBytes());
 
         setSuspended(suspend);
     }
@@ -868,8 +867,7 @@
                 {
                     q.deliverAsync();
                 }
-            }
-            else
+            } else
             {
                 _log.debug("Suspending channel " + this);
             }
@@ -881,16 +879,20 @@
         return _suspended.get();
     }
 
-    public void commit() throws AMQException
+    public void commit()
+            throws
+            AMQException
     {
         if (!isTransactional())
         {
             throw new AMQException("Fatal error: commit called on non-transactional channel");
         }
-        _txnContext.commit();
+        _txnContext.commit();       
     }
 
-    public void rollback() throws AMQException
+    public void rollback()
+            throws
+            AMQException
     {
         _txnContext.rollback();
     }
@@ -919,14 +921,16 @@
         return _storeContext;
     }
 
-    public void processReturns(AMQProtocolSession session) throws AMQException
+    public void processReturns(AMQProtocolSession session)
+            throws
+            AMQException
     {
         for (RequiredDeliveryException bouncedMessage : _returnMessages)
         {
             AMQMessage message = bouncedMessage.getAMQMessage();
             session.getProtocolOutputConverter().writeReturn(message, _channelId,
-                                                             bouncedMessage.getReplyCode().getCode(),
-                                                             new AMQShortString(bouncedMessage.getMessage()));
+                    bouncedMessage.getReplyCode().getCode(),
+                    new AMQShortString(bouncedMessage.getMessage()));
         }
         _returnMessages.clear();
     }
@@ -937,8 +941,7 @@
         if (isSuspended())
         {
             return true;
-        }
-        else
+        } else
         {
             boolean willSuspend = ((_prefetch_HighWaterMark != 0) && _unacknowledgedMessageMap.size() + 1 > _prefetch_HighWaterMark);
             if (!willSuspend)

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/messageStore/MemoryMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/messageStore/MemoryMessageStore.java?view=diff&rev=536458&r1=536457&r2=536458
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/messageStore/MemoryMessageStore.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/messageStore/MemoryMessageStore.java Wed May  9 01:48:18 2007
@@ -19,49 +19,75 @@
 
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.server.txn.TransactionManager;
+import org.apache.qpid.server.txn.TransactionRecord;
+import org.apache.qpid.server.txn.MemoryEnqueueRecord;
+import org.apache.qpid.server.txn.MemoryDequeueRecord;
 import org.apache.qpid.server.exception.*;
 import org.apache.qpid.server.exchange.Exchange;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.FieldTable;
 import org.apache.commons.configuration.Configuration;
+import org.apache.log4j.Logger;
 
 import javax.transaction.xa.Xid;
-import java.util.Collection;
+import java.util.*;
+import java.io.ByteArrayOutputStream;
+import java.nio.ByteBuffer;
 
 /**
+ * This a simple in-memory implementation of a message store i.e. nothing is persisted
+ * <p/>
  * Created by Arnaud Simon
  * Date: 26-Apr-2007
  * Time: 08:23:45
  */
-public class MemoryMessageStore  implements MessageStore
+public class MemoryMessageStore implements MessageStore
 {
+    //========================================================================
+    // Static Constants
+    //========================================================================
+    // The logger for this class
+    private static final Logger _log = Logger.getLogger(MemoryMessageStore.class);
+
+    // The table of message with its corresponding stream containing the message body
+    private Map<StorableMessage, ByteArrayOutputStream> _stagedMessages;
+    // The queue/messages association
+    private Map<StorableQueue, List<StorableMessage>> _queueMap;
+    // the message ID
+    private long _messageID = 0;
+    // The transaction manager
+    private TransactionManager _txm;
+
+    //========================================================================
+    // Interface MessageStore
+    //========================================================================
 
     public void removeExchange(Exchange exchange)
             throws
             InternalErrorException
     {
-        //To change body of implemented methods use File | Settings | File Templates.
+        // do nothing this is inmemory 
     }
 
     public void unbindQueue(Exchange exchange, AMQShortString routingKey, StorableQueue queue, FieldTable args)
-          throws
-          InternalErrorException
+            throws
+            InternalErrorException
     {
-        //To change body of implemented methods use File | Settings | File Templates.
+        // do nothing this is inmemory
     }
 
     public void createExchange(Exchange exchange)
             throws
             InternalErrorException
     {
-        //To change body of implemented methods use File | Settings | File Templates.
+        // do nothing this is inmemory
     }
 
     public void bindQueue(Exchange exchange, AMQShortString routingKey, StorableQueue queue, FieldTable args)
             throws
             InternalErrorException
     {
-        //To change body of implemented methods use File | Settings | File Templates.
+        // do nothing this is inmemory
     }
 
     public void configure(VirtualHost virtualHost, TransactionManager tm, String base, Configuration config)
@@ -69,14 +95,21 @@
             InternalErrorException,
             IllegalArgumentException
     {
-        //To change body of implemented methods use File | Settings | File Templates.
+        _log.info("Configuring memory message store");
+        // Initialise the maps
+        _stagedMessages = new HashMap<StorableMessage, ByteArrayOutputStream>();
+        _queueMap = new HashMap<StorableQueue, List<StorableMessage>>();
+        _txm = tm;
+        _txm.configure(this, "txn", config);
     }
 
     public void close()
             throws
             InternalErrorException
     {
-        //To change body of implemented methods use File | Settings | File Templates.
+        _log.info("Closing memory message store");
+        _stagedMessages.clear();
+        _queueMap.clear();
     }
 
     public void createQueue(StorableQueue queue)
@@ -84,7 +117,12 @@
             InternalErrorException,
             QueueAlreadyExistsException
     {
-        //To change body of implemented methods use File | Settings | File Templates.
+        if (_queueMap.containsKey(queue))
+        {
+            throw new QueueAlreadyExistsException("queue " + queue + " already exists");
+        }
+        // add this queue into the map
+        _queueMap.put(queue, new LinkedList<StorableMessage>());
     }
 
     public void destroyQueue(StorableQueue queue)
@@ -92,7 +130,12 @@
             InternalErrorException,
             QueueDoesntExistException
     {
-        //To change body of implemented methods use File | Settings | File Templates.
+        if (!_queueMap.containsKey(queue))
+        {
+            throw new QueueDoesntExistException("queue " + queue + " does not exist");
+        }
+        // remove this queue from the map
+        _queueMap.remove(queue);
     }
 
     public void stage(StorableMessage m)
@@ -100,7 +143,12 @@
             InternalErrorException,
             MessageAlreadyStagedException
     {
-        //To change body of implemented methods use File | Settings | File Templates.
+        if (_stagedMessages.containsKey(m))
+        {
+            throw new MessageAlreadyStagedException("message " + m + " already staged");
+        }
+        _stagedMessages.put(m, new ByteArrayOutputStream());
+        m.staged();
     }
 
     public void appendContent(StorableMessage m, byte[] data, int offset, int size)
@@ -108,7 +156,11 @@
             InternalErrorException,
             MessageDoesntExistException
     {
-        //To change body of implemented methods use File | Settings | File Templates.
+        if (!_stagedMessages.containsKey(m))
+        {
+            throw new MessageDoesntExistException("message " + m + " has not been staged");
+        }
+        _stagedMessages.get(m).write(data, offset, size);
     }
 
     public byte[] loadContent(StorableMessage m, int offset, int size)
@@ -116,7 +168,15 @@
             InternalErrorException,
             MessageDoesntExistException
     {
-        return new byte[0];  //To change body of implemented methods use File | Settings | File Templates.
+        if (!_stagedMessages.containsKey(m))
+        {
+            throw new MessageDoesntExistException("message " + m + " has not been staged");
+        }
+        byte[] result = new byte[size];
+        ByteBuffer buf = ByteBuffer.allocate(size);
+        buf.put(_stagedMessages.get(m).toByteArray(), offset, size);
+        buf.get(result);
+        return result;
     }
 
     public void destroy(StorableMessage m)
@@ -124,7 +184,11 @@
             InternalErrorException,
             MessageDoesntExistException
     {
-        //To change body of implemented methods use File | Settings | File Templates.
+        if (!_stagedMessages.containsKey(m))
+        {
+            throw new MessageDoesntExistException("message " + m + " has not been staged");
+        }
+        _stagedMessages.remove(m);
     }
 
     public void enqueue(Xid xid, StorableMessage m, StorableQueue queue)
@@ -135,7 +199,31 @@
             UnknownXidException,
             MessageDoesntExistException
     {
-        //To change body of implemented methods use File | Settings | File Templates.
+        if (xid != null)
+        {
+            // this is a tx operation
+            TransactionRecord enqueueRecord = new MemoryEnqueueRecord(m, queue);
+            _txm.getTransaction(xid).addRecord(enqueueRecord);
+        } else
+        {
+            if (!_stagedMessages.containsKey(m))
+            {
+                try
+                {
+                    stage(m);
+                } catch (MessageAlreadyStagedException e)
+                {
+                    throw new InternalErrorException(e);
+                }
+                appendContent(m, m.getData(), 0, m.getPayloadSize());
+            }
+            if (!_queueMap.containsKey(queue))
+            {
+                throw new QueueDoesntExistException("queue " + queue + " dos not exist");
+            }
+            _queueMap.get(queue).add(m);
+            m.enqueue(queue);
+        }
     }
 
     public void dequeue(Xid xid, StorableMessage m, StorableQueue queue)
@@ -145,25 +233,43 @@
             InvalidXidException,
             UnknownXidException
     {
-        //To change body of implemented methods use File | Settings | File Templates.
+        if (xid != null)
+        {
+            // this is a tx operation
+            TransactionRecord dequeueRecord = new MemoryDequeueRecord(m, queue);
+            _txm.getTransaction(xid).addRecord(dequeueRecord);
+        } else
+        {
+            if (!_queueMap.containsKey(queue))
+            {
+                throw new QueueDoesntExistException("queue " + queue + " dos not exist");
+            }
+            m.dequeue(queue);
+            _queueMap.get(queue).remove(m);
+            if (!m.isEnqueued())
+            {
+                // we can delete this message
+                _stagedMessages.remove(m);
+            }
+        }
     }
 
     public Collection<StorableQueue> getAllQueues()
             throws
             InternalErrorException
     {
-        return null;  //To change body of implemented methods use File | Settings | File Templates.
+        return _queueMap.keySet();
     }
 
     public Collection<StorableMessage> getAllMessages(StorableQueue queue)
             throws
             InternalErrorException
     {
-        return null;  //To change body of implemented methods use File | Settings | File Templates.
+        return _queueMap.get(queue);
     }
 
     public long getNewMessageId()
     {
-        return 0;  //To change body of implemented methods use File | Settings | File Templates.
+        return _messageID++;
     }
 }

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java?view=diff&rev=536458&r1=536457&r2=536458
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java Wed May  9 01:48:18 2007
@@ -432,10 +432,10 @@
         // enqueuing the messages ensure that if required the destinations are recorded to a
         // persistent store
 
-        for (AMQQueue q : _transientMessageData.getDestinationQueues())
-        {
-            _messageHandle.enqueue(storeContext, _messageId, q);
-        }
+       // for (AMQQueue q : _transientMessageData.getDestinationQueues())
+       // {
+       //     _messageHandle.enqueue(storeContext, _messageId, q);
+       // }
 
         if (_transientMessageData.getContentHeaderBody().bodySize == 0)
         {

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/StorableMessageHandle.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/StorableMessageHandle.java?view=diff&rev=536458&r1=536457&r2=536458
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/StorableMessageHandle.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/StorableMessageHandle.java Wed May  9 01:48:18 2007
@@ -169,7 +169,10 @@
     {
         try
         {
-            _messageStore.enqueue((Xid) storeContext.getPayload(), _message, queue);
+            if (queue.isDurable())
+            {
+                _messageStore.enqueue((Xid) storeContext.getPayload(), _message, queue);
+            }
         } catch (Exception e)
         {
             throw new AMQException("PRoblem during message enqueue", e);
@@ -182,7 +185,10 @@
     {
         try
         {
-            _messageStore.dequeue((Xid) storeContext.getPayload(), _message, queue);
+            if (queue.isDurable())
+            {
+                _messageStore.dequeue((Xid) storeContext.getPayload(), _message, queue);
+            }
         } catch (Exception e)
         {
             throw new AMQException("PRoblem during message dequeue", e);
@@ -199,8 +205,8 @@
         if (_payload == null)
         {
             int bodySize = (int) _contentHeaderBody.bodySize;
-            _buffer = ByteBuffer.allocate(bodySize);
             _payload = new byte[bodySize];
+            _buffer = ByteBuffer.wrap(_payload);
             for (ContentChunk contentBody : _chunks)
             {
                 int chunkSize = contentBody.getSize();
@@ -208,7 +214,6 @@
                 contentBody.getData().get(chunk);
                 _buffer.put(chunk);
             }
-            _buffer.get(_payload);
         }
         return _payload;
     }

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DequeueRecord.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DequeueRecord.java?view=diff&rev=536458&r1=536457&r2=536458
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DequeueRecord.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DequeueRecord.java Wed May  9 01:48:18 2007
@@ -39,7 +39,7 @@
             UnknownXidException,
             MessageDoesntExistException
     {
-        // do nothing
+        // nothing    
     }
 
     public void rollback(MessageStore store)

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DistributedTransactionalContext.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DistributedTransactionalContext.java?view=diff&rev=536458&r1=536457&r2=536458
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DistributedTransactionalContext.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DistributedTransactionalContext.java Wed May  9 01:48:18 2007
@@ -46,7 +46,8 @@
     //========================================================================
     // The logger for this class
     private static final Logger _log = Logger.getLogger(DistributedTransactionalContext.class);
-
+    private static final Object _lockXID = new Object();
+    private static int _count = 0;
     //========================================================================
     // Instance Fields
     //========================================================================
@@ -60,7 +61,6 @@
     final private List<RequiredDeliveryException> _returnMessages;
     // for generating xids
     private byte[] _txId = ("txid").getBytes();
-    private int _count = 0;
 
     public DistributedTransactionalContext(TransactionManager transactionManager, MessageStore messageStore, StoreContext storeContext,
                                            List<RequiredDeliveryException> returnMessages)
@@ -75,15 +75,21 @@
             throws
             AMQException
     {
-        // begin the transaction and pass the XID through the context
-        Xid xid = new XidImpl(("branch" + _count++).getBytes(), 1, _txId);
-        try
-        {
-            _transactionManager.begin(xid);
-            _storeContext.setPayload(xid);
-        } catch (Exception e)
+        if (_storeContext.getPayload() == null)
         {
-            throw new AMQException("Problem during transaction begin", e);
+            synchronized (_lockXID)
+            {
+                // begin the transaction and pass the XID through the context
+                Xid xid = new XidImpl(("branch" + _count++).getBytes(), 1, _txId);
+                try
+                {
+                    _transactionManager.begin(xid);
+                    _storeContext.setPayload(xid);
+                } catch (Exception e)
+                {
+                    throw new AMQException("Problem during transaction begin", e);
+                }
+            }
         }
     }
 
@@ -93,11 +99,18 @@
     {
         try
         {
-            _transactionManager.commit_one_phase((Xid) _storeContext.getPayload());
+            if (_storeContext.getPayload() != null)
+            {
+                _transactionManager.commit_one_phase((Xid) _storeContext.getPayload());
+            }
         } catch (Exception e)
         {
             throw new AMQException("Problem during transaction commit", e);
         }
+        finally
+        {
+            _storeContext.setPayload(null);
+        }
     }
 
     public void rollback()
@@ -106,11 +119,18 @@
     {
         try
         {
-            _transactionManager.rollback((Xid) _storeContext.getPayload());
+            if (_storeContext.getPayload() != null)
+            {
+                _transactionManager.rollback((Xid) _storeContext.getPayload());
+            }
         } catch (Exception e)
         {
             throw new AMQException("Problem during transaction rollback", e);
         }
+        finally
+        {
+            _storeContext.setPayload(null);
+        }
     }
 
     public void messageFullyReceived(boolean persistent)
@@ -142,6 +162,7 @@
             throws
             AMQException
     {
+        beginTranIfNecessary();
         if (multiple)
         {
             if (deliveryTag == 0)

Added: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/MemoryDequeueRecord.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/MemoryDequeueRecord.java?view=auto&rev=536458
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/MemoryDequeueRecord.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/MemoryDequeueRecord.java Wed May  9 01:48:18 2007
@@ -0,0 +1,82 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.txn;
+
+import org.apache.qpid.server.messageStore.MessageStore;
+import org.apache.qpid.server.messageStore.StorableMessage;
+import org.apache.qpid.server.messageStore.StorableQueue;
+import org.apache.qpid.server.exception.*;
+import org.apache.log4j.Logger;
+
+import javax.transaction.xa.Xid;
+
+/**
+ * Created by Arnaud Simon
+ * Date: 03-May-2007
+ * Time: 13:59:47
+ */
+public class MemoryDequeueRecord implements TransactionRecord
+{
+    //========================================================================
+    // Static Constants
+    //========================================================================
+    // The logger for this class
+    private static final Logger _log = Logger.getLogger(MemoryDequeueRecord.class);
+    // the queue
+    StorableQueue _queue;
+    // the message
+    StorableMessage _message;
+
+    //========================================================================
+    // Constructor
+    //========================================================================
+    public MemoryDequeueRecord( StorableMessage m, StorableQueue queue)
+    {
+        _queue = queue;
+        _message = m;
+    }
+
+    //========================================================================
+    // Interface TransactionRecord
+    //========================================================================
+
+    public void commit(MessageStore store, Xid xid)
+            throws
+            InternalErrorException,
+            QueueDoesntExistException,
+            InvalidXidException,
+            UnknownXidException,
+            MessageDoesntExistException
+    {
+        store.dequeue(null, _message, _queue);
+    }
+
+    public void rollback(MessageStore store)
+            throws
+            InternalErrorException
+    {
+        // do nothing
+    }
+
+    public void prepare(MessageStore store)
+            throws
+            InternalErrorException
+    {
+        // do nothing
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/MemoryDequeueRecord.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/MemoryDequeueRecord.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/MemoryEnqueueRecord.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/MemoryEnqueueRecord.java?view=auto&rev=536458
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/MemoryEnqueueRecord.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/MemoryEnqueueRecord.java Wed May  9 01:48:18 2007
@@ -0,0 +1,82 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.txn;
+
+import org.apache.qpid.server.messageStore.MessageStore;
+import org.apache.qpid.server.messageStore.StorableMessage;
+import org.apache.qpid.server.messageStore.StorableQueue;
+import org.apache.qpid.server.exception.*;
+import org.apache.log4j.Logger;
+
+import javax.transaction.xa.Xid;
+
+/**
+ * Created by Arnaud Simon
+ * Date: 03-May-2007
+ * Time: 14:00:04
+ */
+public class MemoryEnqueueRecord implements TransactionRecord
+{
+    //========================================================================
+    // Static Constants
+    //========================================================================
+    // The logger for this class
+    private static final Logger _log = Logger.getLogger(MemoryDequeueRecord.class);
+
+    // the queue
+    StorableQueue _queue;
+    // the message
+    StorableMessage _message;
+
+    //========================================================================
+    // Constructor
+    //========================================================================
+    public MemoryEnqueueRecord(StorableMessage m, StorableQueue queue)
+    {
+        _queue = queue;
+        _message = m;
+    }
+    //========================================================================
+    // Interface TransactionRecord
+    //========================================================================
+
+    public void commit(MessageStore store, Xid xid)
+            throws
+            InternalErrorException,
+            QueueDoesntExistException,
+            InvalidXidException,
+            UnknownXidException,
+            MessageDoesntExistException
+    {
+        store.enqueue(null, _message, _queue);
+    }
+
+    public void rollback(MessageStore store)
+            throws
+            InternalErrorException
+    {
+        // do nothing
+    }
+
+    public void prepare(MessageStore store)
+            throws
+            InternalErrorException
+    {
+        // do nothing 
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/MemoryEnqueueRecord.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/MemoryEnqueueRecord.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/MemoryTransaction.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/MemoryTransaction.java?view=auto&rev=536458
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/MemoryTransaction.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/MemoryTransaction.java Wed May  9 01:48:18 2007
@@ -0,0 +1,158 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.txn;
+
+import org.apache.log4j.Logger;
+
+import java.util.List;
+import java.util.LinkedList;
+
+/**
+ * Created by Arnaud Simon
+ * Date: 03-May-2007
+ * Time: 14:30:41
+ */
+public class MemoryTransaction implements Transaction
+{
+    //========================================================================
+      // Static Constants
+      //========================================================================
+      // The logger for this class
+      private static final Logger _log = Logger.getLogger(MemoryTransaction.class);
+
+      //========================================================================
+      // Instance Fields
+      //========================================================================
+      // Indicates whether this transaction is prepared
+      private boolean _prepared = false;
+      // Indicates that this transaction has heuristically rolled back
+      private boolean _heurRollBack = false;
+      // The list of Abstract records associated with this tx
+      private List<TransactionRecord> _records = new LinkedList<TransactionRecord>();
+      // The date when this tx has been created.
+      private long _dateCreated;
+      // The timeout in seconds
+      private long _timeout;
+
+      //=========================================================
+      // Constructors
+      //=========================================================
+      /**
+       * Create a transaction that wraps a BDB tx and set the creation date.
+       *
+       */
+      public MemoryTransaction()
+      {
+          _dateCreated = System.currentTimeMillis();
+      }
+
+      //=========================================================
+      // Getter and Setter methods
+      //=========================================================
+      /**
+       * Notify that this tx has been prepared
+       */
+      public void prepare()
+      {
+          _prepared = true;
+      }
+
+      /**
+       * Specify whether this transaction is prepared
+       *
+       * @return true if this transaction is prepared, false otherwise
+       */
+      public boolean isPrepared()
+      {
+          return _prepared;
+      }
+
+       /**
+       * Notify that this tx has been heuristically rolled back
+       */
+      public void heurRollback()
+      {
+          _heurRollBack = true;
+      }
+
+      /**
+       * Specify whether this transaction has been heuristically rolled back
+       *
+       * @return true if this transaction has been heuristically rolled back , false otherwise
+       */
+      public boolean isHeurRollback()
+      {
+          return _heurRollBack;
+      }
+
+      /**
+       * Add an abstract record to this tx.
+       *
+       * @param record The record to be added
+       */
+      public void addRecord(TransactionRecord record)
+      {
+          _records.add(record);
+      }
+
+      /**
+       * Get the list of records associated with this tx.
+       *
+       * @return The list of records associated with this tx.
+       */
+      public List<TransactionRecord> getrecords()
+      {
+          return _records;
+      }
+
+      /**
+       * Set this tx timeout
+       *
+       * @param timeout This tx timeout in seconds
+       */
+      public void setTimeout(long timeout)
+      {
+          _timeout = timeout;
+      }
+
+      /**
+       * Get this tx timeout
+       *
+       * @return This tx timeout in seconds
+       */
+      public long getTimeout()
+      {
+          return _timeout;
+      }
+
+      /**
+       * Specify whether this tx has expired
+       *
+       * @return true if this tx has expired, false otherwise
+       */
+      public boolean hasExpired()
+      {
+          long currentDate = System.currentTimeMillis();
+          boolean result = currentDate - _dateCreated > _timeout * 1000;
+          if (_log.isDebugEnabled())
+          {
+              _log.debug("transaction has expired");
+          }
+          return result;
+      }
+  }

Propchange: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/MemoryTransaction.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/MemoryTransaction.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/MemoryTransactionManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/MemoryTransactionManager.java?view=diff&rev=536458&r1=536457&r2=536458
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/MemoryTransactionManager.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/MemoryTransactionManager.java Wed May  9 01:48:18 2007
@@ -18,9 +18,13 @@
 package org.apache.qpid.server.txn;
 
 import org.apache.qpid.server.exception.*;
+import org.apache.qpid.server.messageStore.MessageStore;
+import org.apache.commons.configuration.Configuration;
+import org.apache.log4j.Logger;
 
 import javax.transaction.xa.Xid;
 import java.util.Set;
+import java.util.HashMap;
 
 /**
  * Created by Arnaud Simon
@@ -29,13 +33,65 @@
  */
 public class MemoryTransactionManager implements TransactionManager
 {
+    //========================================================================
+    // Static Constants
+    //========================================================================
+    // The logger for this class
+    private static final Logger _log = Logger.getLogger(MemoryTransactionManager.class);
+
+    private static final String ENVIRONMENT_TX_TIMEOUT = "environment-tx-timeout";
+
+    //========================================================================
+    // Instance Fields
+    //========================================================================
+    // The underlying BDB message store
+    private MessageStore _messagStore;
+    // A map of XID/BDBtx
+    private HashMap<Xid, Transaction> _xidMap;
+    // A map of in-doubt txs
+    private HashMap<Xid, MemoryTransaction> _indoubtXidMap;
+
+    // A default tx timeout in sec
+    private int _defaultTimeout;   // set to 10s if not specified in the config
+
+    //========================================================================
+    // Interface TransactionManager
+    //========================================================================
+    public void configure(MessageStore messageStroe, String base, Configuration config)
+    {
+        _messagStore = messageStroe;
+        if (config != null)
+        {
+            _defaultTimeout = config.getInt(base + "." + ENVIRONMENT_TX_TIMEOUT, 10);
+        } else
+        {
+            _defaultTimeout = 10;
+        }
+        _log.info("Using transaction timeout of " + _defaultTimeout + " s");
+        _xidMap = new HashMap<Xid, Transaction>();
+        _indoubtXidMap = new HashMap<Xid, MemoryTransaction>();
+    }
 
     public XAFlag begin(Xid xid)
             throws
             InternalErrorException,
             InvalidXidException
     {
-        return null;  //To change body of implemented methods use File | Settings | File Templates.
+        synchronized (xid)
+        {
+            if (xid == null)
+            {
+                throw new InvalidXidException(xid, "null xid");
+            }
+            if (_xidMap.containsKey(xid))
+            {
+                throw new InvalidXidException(xid, "Xid already exist");
+            }
+            MemoryTransaction tx = new MemoryTransaction();
+            tx.setTimeout(_defaultTimeout);
+            _xidMap.put(xid, tx);
+            return XAFlag.ok;
+        }
     }
 
     public XAFlag prepare(Xid xid)
@@ -44,7 +100,35 @@
             CommandInvalidException,
             UnknownXidException
     {
-        return null;  //To change body of implemented methods use File | Settings | File Templates.
+        synchronized (xid)
+        {
+            // get the transaction
+            MemoryTransaction tx = (MemoryTransaction) getTransaction(xid);
+            XAFlag result = XAFlag.ok;
+            if (tx.hasExpired())
+            {
+                result = XAFlag.rbtimeout;
+                // rollback this tx branch
+                rollback(xid);
+            } else
+            {
+                if (tx.isPrepared())
+                {
+                    throw new CommandInvalidException("TransactionImpl is already prepared");
+                }
+                if (tx.getrecords().size() == 0)
+                {
+                    // the tx was read only (no work has been done)
+                    _xidMap.remove(xid);
+                    result = XAFlag.rdonly;
+                } else
+                {
+                    // we need to persist the tx records
+                    tx.prepare();
+                }
+            }
+            return result;
+        }
     }
 
     public XAFlag rollback(Xid xid)
@@ -53,7 +137,28 @@
             CommandInvalidException,
             UnknownXidException
     {
-        return null;  //To change body of implemented methods use File | Settings | File Templates.
+        synchronized (xid)
+        {
+            // get the transaction
+            MemoryTransaction tx = (MemoryTransaction) getTransaction(xid);
+            XAFlag flag = XAFlag.ok;
+            if (tx.isHeurRollback())
+            {
+                flag = XAFlag.heurrb;
+            } else
+            {
+                for (TransactionRecord record : tx.getrecords())
+                {
+                    record.rollback(_messagStore);
+                }
+                _xidMap.remove(xid);
+            }
+            if (tx.hasExpired())
+            {
+                flag = XAFlag.rbtimeout;
+            }
+            return flag;
+        }
     }
 
     public XAFlag commit(Xid xid)
@@ -63,7 +168,44 @@
             UnknownXidException,
             NotPreparedException
     {
-        return null;  //To change body of implemented methods use File | Settings | File Templates.
+        synchronized (xid)
+        {
+            // get the transaction
+            MemoryTransaction tx = (MemoryTransaction) getTransaction(xid);
+            XAFlag flag = XAFlag.ok;
+            if (tx.isHeurRollback())
+            {
+                flag = XAFlag.heurrb;
+            } else if (tx.hasExpired())
+            {
+                flag = XAFlag.rbtimeout;
+                // rollback this tx branch
+                rollback(xid);
+            } else
+            {
+                if (!tx.isPrepared())
+                {
+                    throw new NotPreparedException("TransactionImpl is not prepared");
+                }
+                for (TransactionRecord record : tx.getrecords())
+                {
+                    try
+                    {
+                        record.commit(_messagStore, xid);
+                    } catch (InvalidXidException e)
+                    {
+                        throw new UnknownXidException(xid, e);
+                    } catch (Exception e)
+                    {
+                        // this should not happen as the queue and the message must exist
+                        _log.error("Error when committing distributed transaction heurmix mode returned: " + xid);
+                        flag = XAFlag.heurmix;
+                    }
+                }
+                _xidMap.remove(xid);
+            }
+            return flag;
+        }
     }
 
     public XAFlag commit_one_phase(Xid xid)
@@ -72,7 +214,47 @@
             CommandInvalidException,
             UnknownXidException
     {
-        return null;  //To change body of implemented methods use File | Settings | File Templates.
+        synchronized (xid)
+        {
+            XAFlag flag = XAFlag.ok;
+            MemoryTransaction tx = (MemoryTransaction) getTransaction(xid);
+            if (tx.isHeurRollback())
+            {
+                flag = XAFlag.heurrb;
+            } else if (tx.hasExpired())
+            {
+                flag = XAFlag.rbtimeout;
+                // rollback this tx branch
+                rollback(xid);
+            } else
+            {
+                // we need to prepare the tx
+                tx.prepare();
+                try
+                {
+                    for (TransactionRecord record : tx.getrecords())
+                    {
+                        try
+                        {
+                            record.commit(_messagStore, xid);
+                        } catch (InvalidXidException e)
+                        {
+                            throw new UnknownXidException(xid, e);
+                        } catch (Exception e)
+                        {
+                            // this should not happen as the queue and the message must exist
+                            _log.error("Error when committing transaction heurmix mode returned: " + xid);
+                            flag = XAFlag.heurmix;
+                        }
+                    }
+                }               
+                finally
+                {
+                    _xidMap.remove(xid);
+                }
+            }
+            return flag;
+        }
     }
 
     public void forget(Xid xid)
@@ -81,7 +263,10 @@
             CommandInvalidException,
             UnknownXidException
     {
-        //To change body of implemented methods use File | Settings | File Templates.
+        synchronized (xid)
+        {
+            _xidMap.remove(xid);
+        }
     }
 
     public void setTimeout(Xid xid, long timeout)
@@ -89,7 +274,8 @@
             InternalErrorException,
             UnknownXidException
     {
-        //To change body of implemented methods use File | Settings | File Templates.
+        Transaction tx = getTransaction(xid);
+        tx.setTimeout(timeout);
     }
 
     public long getTimeout(Xid xid)
@@ -97,7 +283,8 @@
             InternalErrorException,
             UnknownXidException
     {
-        return 0;  //To change body of implemented methods use File | Settings | File Templates.
+        Transaction tx = getTransaction(xid);
+        return tx.getTimeout();
     }
 
     public Set<Xid> recover(boolean startscan, boolean endscan)
@@ -105,7 +292,7 @@
             InternalErrorException,
             CommandInvalidException
     {
-        return null;  //To change body of implemented methods use File | Settings | File Templates.
+        return _indoubtXidMap.keySet();
     }
 
     public void HeuristicOutcome(Xid xid)
@@ -113,13 +300,32 @@
             UnknownXidException,
             InternalErrorException
     {
-        //To change body of implemented methods use File | Settings | File Templates.
+        synchronized (xid)
+        {
+            MemoryTransaction tx = (MemoryTransaction) getTransaction(xid);
+            if (!tx.isPrepared())
+            {
+                // heuristically rollback this tx
+                for (TransactionRecord record : tx.getrecords())
+                {
+                    record.rollback(_messagStore);
+                }
+                tx.heurRollback();
+            }
+            // add this branch in the list of indoubt tx
+            _indoubtXidMap.put(xid, tx);
+        }
     }
 
     public Transaction getTransaction(Xid xid)
             throws
             UnknownXidException
     {
-        return null;  //To change body of implemented methods use File | Settings | File Templates.
+        Transaction tx = _xidMap.get(xid);
+        if (tx == null)
+        {
+            throw new UnknownXidException(xid);
+        }
+        return tx;
     }
 }

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java?view=diff&rev=536458&r1=536457&r2=536458
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java Wed May  9 01:48:18 2007
@@ -93,6 +93,7 @@
     {
         try
         {
+            message.getMessageHandle().enqueue(_storeContext, message.getMessageId(), queue);
             queue.process(_storeContext, message, deliverFirst);
             //following check implements the functionality
             //required by the 'immediate' flag:

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/Transaction.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/Transaction.java?view=diff&rev=536458&r1=536457&r2=536458
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/Transaction.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/Transaction.java Wed May  9 01:48:18 2007
@@ -26,10 +26,24 @@
 public interface Transaction
 {
 
-     /**
+    /**
      * Add an abstract record to this tx.
      *
      * @param record The record to be added
      */
     public void addRecord(TransactionRecord record);
+
+    /**
+     * Set this tx timeout
+     *
+     * @param timeout This tx timeout in seconds
+     */
+    public void setTimeout(long timeout);
+
+    /**
+     * Get this tx timeout
+     *
+     * @return This tx timeout in seconds
+     */
+    public long getTimeout();
 }

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionManager.java?view=diff&rev=536458&r1=536457&r2=536458
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionManager.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionManager.java Wed May  9 01:48:18 2007
@@ -19,6 +19,8 @@
 package org.apache.qpid.server.txn;
 
 import org.apache.qpid.server.exception.*;
+import org.apache.qpid.server.messageStore.MessageStore;
+import org.apache.commons.configuration.Configuration;
 
 import javax.transaction.xa.Xid;
 import java.util.Set;
@@ -30,6 +32,17 @@
  */
 public interface TransactionManager
 {
+
+    /**
+      * Configure this TM with the Message store implementation
+      *
+      * @param base         The base element identifier from which all configuration items are relative. For example, if the base
+      *                     element is "store", the all elements used by concrete classes will be "store.foo" etc.
+      * @param config       The apache commons configuration object
+      * @param messageStroe the message store associated with the TM
+      */
+     public void configure(MessageStore messageStroe, String base, Configuration config);
+
     /**
      * Begin a transaction branch identified by Xid
      *

Modified: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java?view=diff&rev=536458&r1=536457&r2=536458
==============================================================================
--- incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java (original)
+++ incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java Wed May  9 01:48:18 2007
@@ -0,0 +1,230 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.ack;
+
+import junit.framework.TestCase;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.server.RequiredDeliveryException;
+import org.apache.qpid.server.messageStore.MemoryMessageStore;
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.txn.NonTransactionalContext;
+import org.apache.qpid.server.txn.TransactionalContext;
+
+import java.util.*;
+
+public class TxAckTest extends TestCase
+{
+    private Scenario individual;
+    private Scenario multiple;
+    private Scenario combined;
+
+    protected void setUp() throws Exception
+    {
+        super.setUp();
+
+        //ack only 5th msg
+        individual = new Scenario(10, Arrays.asList(5l), Arrays.asList(1l, 2l, 3l, 4l, 6l, 7l, 8l, 9l, 10l));
+        individual.update(5, false);
+
+        //ack all up to and including 5th msg
+        multiple = new Scenario(10, Arrays.asList(1l, 2l, 3l, 4l, 5l), Arrays.asList(6l, 7l, 8l, 9l, 10l));
+        multiple.update(5, true);
+
+        //leave only 8th and 9th unacked
+        combined = new Scenario(10, Arrays.asList(1l, 2l, 3l, 4l, 5l, 6l, 7l, 10l), Arrays.asList(8l, 9l));
+        combined.update(3, false);
+        combined.update(5, true);
+        combined.update(7, true);
+        combined.update(2, true);//should be ignored
+        combined.update(1, false);//should be ignored
+        combined.update(10, false);
+    }
+
+    public void testPrepare() throws AMQException
+    {
+        individual.prepare();
+        multiple.prepare();
+        combined.prepare();
+    }
+
+    public void testUndoPrepare() throws AMQException
+    {
+        individual.undoPrepare();
+        multiple.undoPrepare();
+        combined.undoPrepare();
+    }
+
+    public void testCommit() throws AMQException
+    {
+        individual.commit();
+        multiple.commit();
+        combined.commit();
+    }
+
+    public static junit.framework.Test suite()
+    {
+        return new junit.framework.TestSuite(TxAckTest.class);
+    }
+
+    private class Scenario
+    {
+        private final UnacknowledgedMessageMap _map = new UnacknowledgedMessageMapImpl(5000);
+        private final TxAck _op = new TxAck(_map);
+        private final List<Long> _acked;
+        private final List<Long> _unacked;
+        private StoreContext _storeContext = new StoreContext();
+
+        Scenario(int messageCount, List<Long> acked, List<Long> unacked)
+        {
+            TransactionalContext txnContext = new NonTransactionalContext(new MemoryMessageStore(),
+                                                                          _storeContext, null,
+                                                                          new LinkedList<RequiredDeliveryException>(),
+                                                                          new HashSet<Long>());
+            for (int i = 0; i < messageCount; i++)
+            {
+                long deliveryTag = i + 1;
+
+                MessagePublishInfo info = new MessagePublishInfo()
+                {
+
+                    public AMQShortString getExchange()
+                    {
+                        return null;
+                    }
+
+                    public boolean isImmediate()
+                    {
+                        return false;
+                    }
+
+                    public boolean isMandatory()
+                    {
+                        return false;
+                    }
+
+                    public AMQShortString getRoutingKey()
+                    {
+                        return null;
+                    }
+                };
+
+                TestMessage message = new TestMessage(deliveryTag, i, info, txnContext);
+                _map.add(deliveryTag, new UnacknowledgedMessage(null, message, null, deliveryTag));
+            }
+            _acked = acked;
+            _unacked = unacked;
+        }
+
+        void update(long deliverytag, boolean multiple)
+        {
+            _op.update(deliverytag, multiple);
+        }
+
+        private void assertCount(List<Long> tags, int expected)
+        {
+            for (long tag : tags)
+            {
+                UnacknowledgedMessage u = _map.get(tag);
+                assertTrue("Message not found for tag " + tag, u != null);
+                ((TestMessage) u.message).assertCountEquals(expected);
+            }
+        }
+
+        void prepare() throws AMQException
+        {
+            _op.consolidate();
+            _op.prepare(_storeContext);
+
+            assertCount(_acked, -1);
+            assertCount(_unacked, 0);
+
+        }
+
+        void undoPrepare()
+        {
+            _op.consolidate();
+            _op.undoPrepare();
+
+            assertCount(_acked, 0);
+            assertCount(_unacked, 0);
+        }
+
+        void commit()
+        {
+            _op.consolidate();
+            _op.commit(_storeContext);
+
+            //check acked messages are removed from map
+            Set<Long> keys = new HashSet<Long>(_map.getDeliveryTags());
+            keys.retainAll(_acked);
+            assertTrue("Expected messages with following tags to have been removed from map: " + keys, keys.isEmpty());
+            //check unacked messages are still in map
+            keys = new HashSet<Long>(_unacked);
+            keys.removeAll(_map.getDeliveryTags());
+            assertTrue("Expected messages with following tags to still be in map: " + keys, keys.isEmpty());
+        }
+    }
+
+    private class TestMessage extends AMQMessage
+    {
+        private final long _tag;
+        private int _count;
+
+        TestMessage(long tag, long messageId, MessagePublishInfo publishBody, TransactionalContext txnContext)
+        {
+            super(messageId, publishBody, txnContext);
+            try
+            {
+                setContentHeaderBody(new ContentHeaderBody()
+                {
+                    public int getSize()
+                    {
+                        return 1;
+                    }
+                });
+            }
+            catch (AMQException e)
+            {
+                // won't happen
+            }
+            _tag = tag;
+        }
+
+        public void incrementReference()
+        {
+            _count++;
+        }
+
+        public void decrementReference(StoreContext context)
+        {
+            _count--;
+        }
+
+        void assertCountEquals(int expected)
+        {
+            assertEquals("Wrong count for message with tag " + _tag, expected, _count);
+        }
+    }
+}

Modified: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/channel/MaxChannelsTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/channel/MaxChannelsTest.java?view=diff&rev=536458&r1=536457&r2=536458
==============================================================================
--- incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/channel/MaxChannelsTest.java (original)
+++ incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/channel/MaxChannelsTest.java Wed May  9 01:48:18 2007
@@ -0,0 +1,75 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.protocol;
+
+import junit.framework.TestCase;
+import org.apache.mina.common.IoSession;
+import org.apache.qpid.codec.AMQCodecFactory;
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.registry.IApplicationRegistry;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.SkeletonMessageStore;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.framing.AMQShortString;
+
+import javax.management.JMException;
+
+/** Test class to test MBean operations for AMQMinaProtocolSession. */
+public class MaxChannelsTest extends TestCase
+{
+//    private MessageStore _messageStore = new SkeletonMessageStore();
+
+    public void testChannels() throws Exception
+    {
+        IApplicationRegistry appRegistry = ApplicationRegistry.getInstance();
+        AMQMinaProtocolSession _protocolSession = new AMQMinaProtocolSession(new MockIoSession(),
+                                                                             appRegistry.getVirtualHostRegistry(),
+                                                                             new AMQCodecFactory(true),
+                                                                             null);
+        _protocolSession.setVirtualHost(appRegistry.getVirtualHostRegistry().getVirtualHost("test"));
+
+        // check the channel count is correct
+        int channelCount = _protocolSession.getChannels().size();
+        assertEquals("Initial channel count wrong", 0, channelCount);
+
+        long maxChannels = 10L;
+        _protocolSession.setMaximumNumberOfChannels(maxChannels);
+        assertEquals("Number of channels not correctly set.", new Long(maxChannels), _protocolSession.getMaximumNumberOfChannels());
+
+
+       try
+        {
+            for (long currentChannel = 0L; currentChannel < maxChannels; currentChannel++)
+            {
+                _protocolSession.addChannel(new AMQChannel(_protocolSession, (int) currentChannel, null, null, null));
+            }
+        }
+        catch (AMQException e)
+        {
+            assertEquals("Wrong exception recevied.", e.getErrorCode(), AMQConstant.NOT_ALLOWED);
+        }
+        assertEquals("Maximum number of channels not set.", new Long(maxChannels), new Long(_protocolSession.getChannels().size()));
+    }
+
+}

Modified: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java?view=diff&rev=536458&r1=536457&r2=536458
==============================================================================
--- incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java (original)
+++ incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java Wed May  9 01:48:18 2007
@@ -3,7 +3,6 @@
 import junit.framework.TestCase;
 import org.apache.log4j.Logger;
 import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.util.TestApplicationRegistry;
 import org.apache.qpid.server.util.NullApplicationRegistry;
 import org.apache.qpid.client.*;
 import org.apache.qpid.client.transport.TransportConnection;
@@ -269,7 +268,14 @@
     public void onException(JMSException jmsException)
     {
 
-        Exception linkedException = jmsException.getLinkedException();
+        Exception linkedException = null;
+        try
+        {
+            linkedException = jmsException.getLinkedException();
+        } catch (Exception e)
+        {
+            e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
+        }
         if (linkedException instanceof AMQNoRouteException)
         {
             AMQNoRouteException noRoute = (AMQNoRouteException) linkedException;

Modified: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/failure/HeapExhaustion.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/failure/HeapExhaustion.java?view=diff&rev=536458&r1=536457&r2=536458
==============================================================================
--- incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/failure/HeapExhaustion.java (original)
+++ incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/failure/HeapExhaustion.java Wed May  9 01:48:18 2007
@@ -8,7 +8,6 @@
 import org.apache.log4j.Logger;
 
 import javax.jms.JMSException;
-import javax.jms.DeliveryMode;
 import java.io.IOException;
 
 
@@ -17,7 +16,7 @@
 {
     private static final Logger _logger = Logger.getLogger(HeapExhaustion.class);
 
-    protected QpidClientConnection conn;
+    protected QpidClientConnection conn;                         
     protected final String BROKER = "localhost";
     protected final String vhost = "/test";
     protected final String queue = "direct://amq.direct//queue";
@@ -35,7 +34,13 @@
         conn = new QpidClientConnection(BROKER);
         conn.setVirtualHost(vhost);
 
-        conn.connect();
+        try
+        {
+            conn.connect();
+        } catch (JMSException e)
+        {
+            e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
+        }
         // clear queue
         _logger.debug("setup: clearing test queue");
         conn.consume(queue, 2000);
@@ -55,7 +60,7 @@
      *
      * @throws Exception on error
      */
-    public void testUntilFailureTransient() throws Exception
+    public void testUntilFailure() throws Exception
     {
         int copies = 0;
         int total = 0;
@@ -63,7 +68,7 @@
         int size = payload.getBytes().length;
         while (true)
         {
-            conn.put(queue, payload, 1, DeliveryMode.NON_PERSISTENT);
+            conn.put(queue, payload, 1);
             copies++;
             total += size;
             System.out.println("put copy " + copies + " OK for total bytes: " + total);
@@ -75,7 +80,7 @@
      *
      * @throws Exception on error
      */
-    public void testUntilFailureWithDelaysTransient() throws Exception
+    public void testUntilFailureWithDelays() throws Exception
     {
         int copies = 0;
         int total = 0;
@@ -83,7 +88,7 @@
         int size = payload.getBytes().length;
         while (true)
         {
-            conn.put(queue, payload, 1, DeliveryMode.NON_PERSISTENT);
+            conn.put(queue, payload, 1);
             copies++;
             total += size;
             System.out.println("put copy " + copies + " OK for total bytes: " + total);
@@ -110,7 +115,7 @@
             _logger.info("Running testUntilFailure");
             try
             {
-                he.testUntilFailureTransient();
+                he.testUntilFailure();
             }
             catch (FailoverException fe)
             {
@@ -159,7 +164,7 @@
             _logger.info("Running testUntilFailure");
             try
             {
-                he.testUntilFailureWithDelaysTransient();
+                he.testUntilFailureWithDelays();
             }
             catch (FailoverException fe)
             {

Modified: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ConcurrencyTestDisabled.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ConcurrencyTestDisabled.java?view=diff&rev=536458&r1=536457&r2=536458
==============================================================================
--- incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ConcurrencyTestDisabled.java (original)
+++ incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/ConcurrencyTestDisabled.java Wed May  9 01:48:18 2007
@@ -38,7 +38,7 @@
 {
     private final Random random = new Random();
 
-    private final int numMessages = 1000;
+    private final int numMessages = 10;
 
     private final List<SubscriptionTestHelper> _subscribers = new ArrayList<SubscriptionTestHelper>();
     private final Set<Subscription> _active = new HashSet<Subscription>();