You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2007/01/08 00:11:57 UTC

svn commit: r493872 [3/4] - in /incubator/qpid/trunk/qpid/java: broker/bin/ broker/src/main/java/org/apache/qpid/server/ broker/src/main/java/org/apache/qpid/server/ack/ broker/src/main/java/org/apache/qpid/server/exchange/ broker/src/main/java/org/apa...

Added: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageMetaData.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageMetaData.java?view=auto&rev=493872
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageMetaData.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageMetaData.java Sun Jan  7 15:11:53 2007
@@ -0,0 +1,86 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.framing.BasicPublishBody;
+import org.apache.qpid.framing.ContentHeaderBody;
+
+/**
+ * Bundles a publish body, a content header and a content chunk count. The message store treats the publish
+ * body and content header as a single unit.
+ */
+public class MessageMetaData
+{
+    /** Publish body half of the unit. */
+    private BasicPublishBody _publishBody;
+
+    /** Content header half of the unit. */
+    private ContentHeaderBody _contentHeaderBody;
+
+    /** Content chunk count supplied alongside the two bodies. */
+    private int _contentChunkCount;
+
+    /**
+     * Builds the metadata from its three parts; each part is stored as given.
+     *
+     * @param publishBody       the publish body
+     * @param contentHeaderBody the content header
+     * @param contentChunkCount the content chunk count
+     */
+    public MessageMetaData(BasicPublishBody publishBody, ContentHeaderBody contentHeaderBody, int contentChunkCount)
+    {
+        _publishBody = publishBody;
+        _contentHeaderBody = contentHeaderBody;
+        _contentChunkCount = contentChunkCount;
+    }
+
+    /** @return the publish body currently held */
+    public BasicPublishBody getPublishBody()
+    {
+        return _publishBody;
+    }
+
+    /** @param publishBody the publish body to hold from now on */
+    public void setPublishBody(BasicPublishBody publishBody)
+    {
+        _publishBody = publishBody;
+    }
+
+    /** @return the content header currently held */
+    public ContentHeaderBody getContentHeaderBody()
+    {
+        return _contentHeaderBody;
+    }
+
+    /** @param contentHeaderBody the content header to hold from now on */
+    public void setContentHeaderBody(ContentHeaderBody contentHeaderBody)
+    {
+        _contentHeaderBody = contentHeaderBody;
+    }
+
+    /** @return the content chunk count currently held */
+    public int getContentChunkCount()
+    {
+        return _contentChunkCount;
+    }
+
+    /** @param contentChunkCount the content chunk count to hold from now on */
+    public void setContentChunkCount(int contentChunkCount)
+    {
+        _contentChunkCount = contentChunkCount;
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageMetaData.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NoConsumersException.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NoConsumersException.java?view=diff&rev=493872&r1=493871&r2=493872
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NoConsumersException.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NoConsumersException.java Sun Jan  7 15:11:53 2007
@@ -20,13 +20,8 @@
  */
 package org.apache.qpid.server.queue;
 
-import org.apache.qpid.server.RequiredDeliveryException;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.ContentBody;
-import org.apache.qpid.framing.BasicPublishBody;
 import org.apache.qpid.protocol.AMQConstant;
-
-import java.util.List;
+import org.apache.qpid.server.RequiredDeliveryException;
 
 /**
  * Signals that no consumers exist for a message at a given point in time.
@@ -35,19 +30,9 @@
  */
 public class NoConsumersException extends RequiredDeliveryException
 {
-    public NoConsumersException(String queue,
-                                BasicPublishBody publishBody,
-                                ContentHeaderBody contentHeaderBody,
-                                List<ContentBody> contentBodies)
-    {
-        super("Immediate delivery to " + queue + " is not possible.", publishBody, contentHeaderBody, contentBodies);
-    }
-
-    public NoConsumersException(BasicPublishBody publishBody,
-                                ContentHeaderBody contentHeaderBody,
-                                List<ContentBody> contentBodies)
+    public NoConsumersException(AMQMessage message)
     {
-        super("Immediate delivery is not possible.", publishBody, contentHeaderBody, contentBodies);
+        super("Immediate delivery is not possible.", message);
     }
 
     public int getReplyCode()

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java?view=diff&rev=493872&r1=493871&r2=493872
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java Sun Jan  7 15:11:53 2007
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -26,11 +26,11 @@
 
 public interface Subscription
 {
-    void send(AMQMessage msg, AMQQueue queue) throws FailedDequeueException;
+    void send(AMQMessage msg, AMQQueue queue) throws AMQException;
 
     boolean isSuspended();
 
-    void queueDeleted(AMQQueue queue);
+    void queueDeleted(AMQQueue queue) throws AMQException;
 
     boolean hasFilters();
 
@@ -44,5 +44,5 @@
 
     void close();
 
-    boolean isBrowser();   
+    boolean isBrowser();
 }

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java?view=diff&rev=493872&r1=493871&r2=493872
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java Sun Jan  7 15:11:53 2007
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -23,18 +23,18 @@
 import org.apache.log4j.Logger;
 import org.apache.mina.common.ByteBuffer;
 import org.apache.qpid.AMQException;
-import org.apache.qpid.common.ClientProperties;
 import org.apache.qpid.common.AMQPFilterTypes;
-import org.apache.qpid.util.ConcurrentLinkedQueueAtomicSize;
-import org.apache.qpid.framing.AMQDataBlock;
+import org.apache.qpid.common.ClientProperties;
 import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.BasicCancelOkBody;
 import org.apache.qpid.framing.BasicDeliverBody;
 import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.framing.BasicCancelOkBody;
 import org.apache.qpid.server.AMQChannel;
 import org.apache.qpid.server.filter.FilterManager;
 import org.apache.qpid.server.filter.FilterManagerFactory;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.util.ConcurrentLinkedQueueAtomicSize;
 
 import java.util.Queue;
 
@@ -201,7 +201,7 @@
      * @param queue
      * @throws AMQException
      */
-    public void send(AMQMessage msg, AMQQueue queue) throws FailedDequeueException
+    public void send(AMQMessage msg, AMQQueue queue) throws AMQException
     {
         if (msg != null)
         {
@@ -211,7 +211,7 @@
             }
             else
             {
-                sendToConsumer(msg, queue);
+                sendToConsumer(channel.getStoreContext(), msg, queue);
             }
         }
         else
@@ -220,7 +220,7 @@
         }
     }
 
-    private void sendToBrowser(AMQMessage msg, AMQQueue queue) throws FailedDequeueException
+    private void sendToBrowser(AMQMessage msg, AMQQueue queue) throws AMQException
     {
         // We don't decrement the reference here as we don't want to consume the message
         // but we do want to send it to the client.
@@ -235,14 +235,12 @@
             {
                 channel.addUnacknowledgedBrowsedMessage(msg, deliveryTag, consumerTag, queue);
             }
-            ByteBuffer deliver = createEncodedDeliverFrame(deliveryTag, msg.getRoutingKey(), msg.getExchangeName());
-            AMQDataBlock frame = msg.getDataBlock(deliver, channel.getChannelId());
-
-            protocolSession.writeFrame(frame);
+            msg.writeDeliver(protocolSession, channel.getChannelId(), deliveryTag, consumerTag);
         }
     }
 
-    private void sendToConsumer(AMQMessage msg, AMQQueue queue) throws FailedDequeueException
+    private void sendToConsumer(StoreContext storeContext, AMQMessage msg, AMQQueue queue)
+            throws AMQException
     {
         try
         {
@@ -257,7 +255,11 @@
             // the message is unacked, it will be lost.
             if (!_acks)
             {
-                queue.dequeue(msg);
+                if (_logger.isDebugEnabled())
+                {
+                    _logger.debug("No ack mode so dequeuing message immediately: " + msg.getMessageId());
+                }
+                queue.dequeue(storeContext, msg);
             }
             synchronized(channel)
             {
@@ -268,10 +270,7 @@
                     channel.addUnacknowledgedMessage(msg, deliveryTag, consumerTag, queue);
                 }
 
-                ByteBuffer deliver = createEncodedDeliverFrame(deliveryTag, msg.getRoutingKey(), msg.getExchangeName());
-                AMQDataBlock frame = msg.getDataBlock(deliver, channel.getChannelId());
-
-                protocolSession.writeFrame(frame);
+                msg.writeDeliver(protocolSession, channel.getChannelId(), deliveryTag, consumerTag);
             }
         }
         finally
@@ -290,7 +289,7 @@
      *
      * @param queue
      */
-    public void queueDeleted(AMQQueue queue)
+    public void queueDeleted(AMQQueue queue) throws AMQException
     {
         channel.queueDeleted(queue);
     }

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java?view=diff&rev=493872&r1=493871&r2=493872
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java Sun Jan  7 15:11:53 2007
@@ -204,7 +204,7 @@
      *
      * @param queue
      */
-    public void queueDeleted(AMQQueue queue)
+    public void queueDeleted(AMQQueue queue) throws AMQException
     {
         for (Subscription s : _subscriptions)
         {

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java?view=diff&rev=493872&r1=493871&r2=493872
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java Sun Jan  7 15:11:53 2007
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -21,6 +21,7 @@
 package org.apache.qpid.server.queue;
 
 import org.apache.qpid.AMQException;
+import org.apache.qpid.server.store.StoreContext;
 import org.apache.log4j.Logger;
 
 import java.util.LinkedList;
@@ -41,11 +42,13 @@
      * Holds any queued messages
      */
     private final Queue<AMQMessage> _messages = new LinkedList<AMQMessage>();
+
     /**
      * Ensures that only one asynchronous task is running for this manager at
      * any time.
      */
     private final AtomicBoolean _processing = new AtomicBoolean();
+
     /**
      * The subscriptions on the queue to whom messages are delivered
      */
@@ -70,9 +73,9 @@
         _queue = queue;
     }
 
-    private synchronized boolean enqueue(AMQMessage msg)
+    private synchronized boolean enqueue(AMQMessage msg) throws AMQException
     {
-        if (msg.isImmediate())
+        if (msg.getPublishBody().immediate)
         {
             return false;
         }
@@ -90,7 +93,7 @@
         }
     }
 
-    private synchronized void startQueueing(AMQMessage msg)
+    private synchronized void startQueueing(AMQMessage msg) throws AMQException
     {
         _queueing = true;
         enqueue(msg);
@@ -127,21 +130,21 @@
         //no-op . This DM has no PreDeliveryQueues
     }
 
-    public synchronized void removeAMessageFromTop() throws AMQException
+    public synchronized void removeAMessageFromTop(StoreContext storeContext) throws AMQException
     {
         AMQMessage msg = poll();
         if (msg != null)
         {
-            msg.dequeue(_queue);
+            msg.dequeue(storeContext, _queue);
         }
     }
 
-    public synchronized void clearAllMessages() throws AMQException
+    public synchronized void clearAllMessages(StoreContext storeContext) throws AMQException
     {
         AMQMessage msg = poll();
         while (msg != null)
         {
-            msg.dequeue(_queue);
+            msg.dequeue(storeContext, _queue);
             msg = poll();
         }
     }
@@ -231,7 +234,7 @@
      * @throws NoConsumersException if there are no active subscribers to deliver
      *                              the message to
      */
-    public void deliver(String name, AMQMessage msg) throws FailedDequeueException
+    public void deliver(StoreContext storeContext, String name, AMQMessage msg) throws FailedDequeueException, AMQException
     {
         // first check whether we are queueing, and enqueue if we are
         if (!enqueue(msg))

Added: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/TransientMessageData.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/TransientMessageData.java?view=auto&rev=493872
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/TransientMessageData.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/TransientMessageData.java Sun Jan  7 15:11:53 2007
@@ -0,0 +1,149 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.framing.BasicPublishBody;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.AMQException;
+
+import java.util.List;
+import java.util.LinkedList;
+
+/**
+ * Holds the state an AMQMessage needs only transiently, e.g. while its content body
+ * fragments are still arriving.
+ *
+ * Keeping this state in its own class spares AMQMessage the small overhead of carrying
+ * numerous references that are guaranteed to be null.
+ *
+ * @author Apache Software Foundation
+ */
+public class TransientMessageData
+{
+    /**
+     * Delivery mode value that {@link #isPersistent()} treats as persistent.
+     * todo move this to a constant file such as AMQConstants in common
+     */
+    private static final int PERSISTENT_DELIVERY_MODE = 2;
+
+    /**
+     * Held temporarily until the header has been received, at which point it is used when
+     * constructing the handle
+     */
+    private BasicPublishBody _publishBody;
+
+    /**
+     * Also held only temporarily.
+     */
+    private ContentHeaderBody _contentHeaderBody;
+
+    /**
+     * Running total of the bytes received so far in body frames
+     */
+    private long _bodyLengthReceived = 0;
+
+    /**
+     * Populated during routing with the queues to which this message should immediately be
+     * delivered. It is <b>cleared after delivery has been attempted</b>. Any persistent record of destinations is
+     * kept by the message handle.
+     */
+    private List<AMQQueue> _destinationQueues = new LinkedList<AMQQueue>();
+
+    /** @return the publish body, as last set */
+    public BasicPublishBody getPublishBody()
+    {
+        return _publishBody;
+    }
+
+    /** @param publishBody the publish body to hold */
+    public void setPublishBody(BasicPublishBody publishBody)
+    {
+        _publishBody = publishBody;
+    }
+
+    /** @return the list of destination queues itself, not a copy */
+    public List<AMQQueue> getDestinationQueues()
+    {
+        return _destinationQueues;
+    }
+
+    /** @param destinationQueues the list that replaces the current destination queues */
+    public void setDestinationQueues(List<AMQQueue> destinationQueues)
+    {
+        _destinationQueues = destinationQueues;
+    }
+
+    /** @return the content header, as last set */
+    public ContentHeaderBody getContentHeaderBody()
+    {
+        return _contentHeaderBody;
+    }
+
+    /** @param contentHeaderBody the content header to hold */
+    public void setContentHeaderBody(ContentHeaderBody contentHeaderBody)
+    {
+        _contentHeaderBody = contentHeaderBody;
+    }
+
+    /** @return the number of body bytes added so far through {@link #addBodyLength(int)} */
+    public long getBodyLengthReceived()
+    {
+        return _bodyLengthReceived;
+    }
+
+    /** @param value the number of bytes to add to the received total */
+    public void addBodyLength(int value)
+    {
+        _bodyLengthReceived = _bodyLengthReceived + value;
+    }
+
+    /**
+     * Compares the received total with the body size announced by the content header, so the header
+     * must already have been set.
+     *
+     * @return true when the two are equal
+     */
+    public boolean isAllContentReceived() throws AMQException
+    {
+        long expectedBodySize = _contentHeaderBody.bodySize;
+        return expectedBodySize == _bodyLengthReceived;
+    }
+
+    /** @param queue the queue to append to the destination queues */
+    public void addDestinationQueue(AMQQueue queue)
+    {
+        _destinationQueues.add(queue);
+    }
+
+    /**
+     * Reads the delivery mode from the content header, so the header must already have been set.
+     *
+     * @return true only when the header carries basic content properties whose delivery mode is
+     *         {@link #PERSISTENT_DELIVERY_MODE}
+     */
+    public boolean isPersistent()
+    {
+        if (!(_contentHeaderBody.properties instanceof BasicContentHeaderProperties))
+        {
+            return false;
+        }
+        BasicContentHeaderProperties basicProperties = (BasicContentHeaderProperties) _contentHeaderBody.properties;
+        return basicProperties.getDeliveryMode() == PERSISTENT_DELIVERY_MODE;
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/TransientMessageData.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java?view=auto&rev=493872
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java Sun Jan  7 15:11:53 2007
@@ -0,0 +1,226 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.BasicPublishBody;
+import org.apache.qpid.framing.ContentBody;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.StoreContext;
+
+import java.lang.ref.WeakReference;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.LinkedList;
+
+/**
+ * Message handle that keeps the publish body, content header and content bodies behind weak references and
+ * fetches them again from the message store whenever a reference no longer yields a value.
+ *
+ * @author Robert Greig (robert.j.greig@jpmorgan.com)
+ */
+public class WeakReferenceMessageHandle implements AMQMessageHandle
+{
+    private WeakReference<ContentHeaderBody> _contentHeaderBody;
+
+    private WeakReference<BasicPublishBody> _publishBody;
+
+    /** Created lazily, so this is still null until one of the methods below has populated it. */
+    private List<WeakReference<ContentBody>> _contentBodies;
+
+    private boolean _redelivered;
+
+    private final MessageStore _messageStore;
+
+    public WeakReferenceMessageHandle(MessageStore messageStore)
+    {
+        _messageStore = messageStore;
+    }
+
+    /**
+     * Returns the content header, reloading the message metadata from the store when the weak reference is
+     * unset or has been cleared.
+     */
+    public ContentHeaderBody getContentHeaderBody(long messageId) throws AMQException
+    {
+        ContentHeaderBody chb = (_contentHeaderBody != null ? _contentHeaderBody.get() : null);
+        if (chb == null)
+        {
+            chb = loadMetaData(messageId).getContentHeaderBody();
+        }
+        return chb;
+    }
+
+    /**
+     * Returns the number of content bodies. If the list has not been created yet it is sized from the chunk
+     * count in the stored metadata and filled with empty references that are resolved on demand.
+     */
+    public int getBodyCount(long messageId) throws AMQException
+    {
+        if (_contentBodies == null)
+        {
+            MessageMetaData mmd = _messageStore.getMessageMetaData(messageId);
+            int chunkCount = mmd.getContentChunkCount();
+            _contentBodies = new ArrayList<WeakReference<ContentBody>>(chunkCount);
+            for (int i = 0; i < chunkCount; i++)
+            {
+                _contentBodies.add(new WeakReference<ContentBody>(null));
+            }
+        }
+        return _contentBodies.size();
+    }
+
+    public long getBodySize(long messageId) throws AMQException
+    {
+        return getContentHeaderBody(messageId).bodySize;
+    }
+
+    /**
+     * Returns the content body at the given position, fetching the chunk from the store when the weak
+     * reference holds no value.
+     *
+     * @param messageId the id used to look the message up in the store
+     * @param index     zero-based position of the content body
+     * @throws IllegalArgumentException if index is negative or not less than the body count
+     */
+    public ContentBody getContentBody(long messageId, int index) throws AMQException, IllegalArgumentException
+    {
+        // Going through getBodyCount creates the list if nothing has populated it yet, which
+        // avoids a NullPointerException when this is the first accessor called on the handle
+        int bodyCount = getBodyCount(messageId);
+        if (index < 0 || index > bodyCount - 1)
+        {
+            throw new IllegalArgumentException("Index " + index + " out of valid range 0 to " +
+                                               (bodyCount - 1));
+        }
+        WeakReference<ContentBody> wr = _contentBodies.get(index);
+        ContentBody cb = wr.get();
+        if (cb == null)
+        {
+            cb = _messageStore.getContentBodyChunk(messageId, index);
+            _contentBodies.set(index, new WeakReference<ContentBody>(cb));
+        }
+        return cb;
+    }
+
+    /**
+     * Content bodies are set <i>before</i> the publish and header frames
+     * @param storeContext
+     * @param messageId
+     * @param contentBody
+     * @throws AMQException
+     */
+    public void addContentBodyFrame(StoreContext storeContext, long messageId, ContentBody contentBody) throws AMQException
+    {
+        if (_contentBodies == null)
+        {
+            // ArrayList because getContentBody reads and replaces entries by index
+            _contentBodies = new ArrayList<WeakReference<ContentBody>>();
+        }
+        _contentBodies.add(new WeakReference<ContentBody>(contentBody));
+        _messageStore.storeContentBodyChunk(storeContext, messageId, _contentBodies.size() - 1, contentBody);
+    }
+
+    /**
+     * Returns the publish body, reloading the message metadata from the store when the weak reference is
+     * unset or has been cleared.
+     */
+    public BasicPublishBody getPublishBody(long messageId) throws AMQException
+    {
+        BasicPublishBody bpb = (_publishBody != null ? _publishBody.get() : null);
+        if (bpb == null)
+        {
+            bpb = loadMetaData(messageId).getPublishBody();
+        }
+        return bpb;
+    }
+
+    public boolean isRedelivered()
+    {
+        return _redelivered;
+    }
+
+    public void setRedelivered(boolean redelivered)
+    {
+        _redelivered = redelivered;
+    }
+
+    public boolean isPersistent(long messageId) throws AMQException
+    {
+        //todo remove literal values to a constant file such as AMQConstants in common
+        ContentHeaderBody chb = getContentHeaderBody(messageId);
+        return chb.properties instanceof BasicContentHeaderProperties &&
+               ((BasicContentHeaderProperties) chb.properties).getDeliveryMode() == 2;
+    }
+
+    /**
+     * This is called when all the content has been received.
+     * @param storeContext
+     * @param messageId
+     * @param publishBody
+     * @param contentHeaderBody
+     * @throws AMQException
+     */
+    public void setPublishAndContentHeaderBody(StoreContext storeContext, long messageId, BasicPublishBody publishBody,
+                                               ContentHeaderBody contentHeaderBody)
+            throws AMQException
+    {
+        // if there are no content bodies the list will be null so we must
+        // create an empty list here
+        if (contentHeaderBody.bodySize == 0)
+        {
+            _contentBodies = new ArrayList<WeakReference<ContentBody>>();
+        }
+        // NOTE(review): with a non-zero bodySize this relies on addContentBodyFrame having created the list
+        _messageStore.storeMessageMetaData(storeContext, messageId, new MessageMetaData(publishBody, contentHeaderBody,
+                                                                                        _contentBodies.size()));
+        _publishBody = new WeakReference<BasicPublishBody>(publishBody);
+        _contentHeaderBody = new WeakReference<ContentHeaderBody>(contentHeaderBody);
+    }
+
+    public void removeMessage(StoreContext storeContext, long messageId) throws AMQException
+    {
+        _messageStore.removeMessage(storeContext, messageId);
+    }
+
+    public void enqueue(StoreContext storeContext, long messageId, AMQQueue queue) throws AMQException
+    {
+        _messageStore.enqueueMessage(storeContext, queue.getName(), messageId);
+    }
+
+    public void dequeue(StoreContext storeContext, long messageId, AMQQueue queue) throws AMQException
+    {
+        _messageStore.dequeueMessage(storeContext, queue.getName(), messageId);
+    }
+
+    /**
+     * Fetches the metadata from the store and refreshes both weak references from it.
+     */
+    private MessageMetaData loadMetaData(long messageId) throws AMQException
+    {
+        MessageMetaData mmd = _messageStore.getMessageMetaData(messageId);
+        _contentHeaderBody = new WeakReference<ContentHeaderBody>(mmd.getContentHeaderBody());
+        _publishBody = new WeakReference<BasicPublishBody>(mmd.getPublishBody());
+        return mmd;
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java?view=diff&rev=493872&r1=493871&r2=493872
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java Sun Jan  7 15:11:53 2007
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -23,10 +23,12 @@
 import org.apache.commons.configuration.Configuration;
 import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
-import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.framing.ContentBody;
 import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.MessageMetaData;
 import org.apache.qpid.server.queue.QueueRegistry;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -43,21 +45,25 @@
 
     private static final String HASHTABLE_CAPACITY_CONFIG = "hashtable-capacity";
 
-    protected ConcurrentMap<Long, AMQMessage> _messageMap;
+    protected ConcurrentMap<Long, MessageMetaData> _metaDataMap;
+
+    protected ConcurrentMap<Long, List<ContentBody>> _contentBodyMap;
 
     private final AtomicLong _messageId = new AtomicLong(1);
 
     public void configure()
     {
-        _log.info("Using capacity " + DEFAULT_HASHTABLE_CAPACITY + " for hash table");
-        _messageMap = new ConcurrentHashMap<Long, AMQMessage>(DEFAULT_HASHTABLE_CAPACITY);
+        _log.info("Using capacity " + DEFAULT_HASHTABLE_CAPACITY + " for hash tables");
+        _metaDataMap = new ConcurrentHashMap<Long, MessageMetaData>(DEFAULT_HASHTABLE_CAPACITY);
+        _contentBodyMap = new ConcurrentHashMap<Long, List<ContentBody>>(DEFAULT_HASHTABLE_CAPACITY);
     }
 
     public void configure(String base, Configuration config)
     {
         int hashtableCapacity = config.getInt(base + "." + HASHTABLE_CAPACITY_CONFIG, DEFAULT_HASHTABLE_CAPACITY);
-        _log.info("Using capacity " + hashtableCapacity + " for hash table");
-        _messageMap = new ConcurrentHashMap<Long, AMQMessage>(hashtableCapacity);
+        _log.info("Using capacity " + hashtableCapacity + " for hash tables");
+        _metaDataMap = new ConcurrentHashMap<Long, MessageMetaData>(hashtableCapacity);
+        _contentBodyMap = new ConcurrentHashMap<Long, List<ContentBody>>(hashtableCapacity);
     }
 
     public void configure(QueueRegistry queueRegistry, String base, Configuration config) throws Exception
@@ -67,70 +73,71 @@
 
     public void close() throws Exception
     {
-        if (_messageMap != null)
+        if (_metaDataMap != null)
         {
-            _messageMap.clear();
-            _messageMap = null;
+            _metaDataMap.clear();
+            _metaDataMap = null;
+        }
+        if (_contentBodyMap != null)
+        {
+            _contentBodyMap.clear();
+            _contentBodyMap = null;
         }
     }
 
-    public void put(AMQMessage msg)
-    {
-        _messageMap.put(msg.getMessageId(), msg);
-    }
-
-    public void removeMessage(long messageId)
+    public void removeMessage(StoreContext context, long messageId)
     {
         if (_log.isDebugEnabled())
         {
             _log.debug("Removing message with id " + messageId);
         }
-        _messageMap.remove(messageId);
+        _metaDataMap.remove(messageId);
+        _contentBodyMap.remove(messageId);
     }
 
     public void createQueue(AMQQueue queue) throws AMQException
     {
-        //To change body of implemented methods use File | Settings | File Templates.
+        // Not required to do anything
     }
 
     public void removeQueue(String name) throws AMQException
     {
-        //To change body of implemented methods use File | Settings | File Templates.
+        // Not required to do anything
     }
 
-    public void enqueueMessage(String name, long messageId) throws AMQException
+    public void enqueueMessage(StoreContext context, String name, long messageId) throws AMQException
     {
-        //To change body of implemented methods use File | Settings | File Templates.
+        // Not required to do anything
     }
 
-    public void dequeueMessage(String name, long messageId) throws AMQException
+    public void dequeueMessage(StoreContext context, String name, long messageId) throws AMQException
     {
-        //To change body of implemented methods use File | Settings | File Templates.
+        // Not required to do anything
     }
 
-    public void beginTran() throws AMQException
+    public void beginTran(StoreContext context) throws AMQException
     {
-        //To change body of implemented methods use File | Settings | File Templates.
+        // Not required to do anything
     }
 
-    public void commitTran() throws AMQException
+    public void commitTran(StoreContext context) throws AMQException
     {
-        //To change body of implemented methods use File | Settings | File Templates.
+        // Not required to do anything
     }
 
-    public void abortTran() throws AMQException
+    public void abortTran(StoreContext context) throws AMQException
     {
-        //To change body of implemented methods use File | Settings | File Templates.
+        // Not required to do anything
     }
 
-    public boolean inTran()
+    public boolean inTran(StoreContext context)
     {
         return false;
     }
 
     public List<AMQQueue> createQueues() throws AMQException
     {
-        return null;  //To change body of implemented methods use File | Settings | File Templates.
+        return null;
     }
 
     public long getNewMessageId()
@@ -138,8 +145,33 @@
         return _messageId.getAndIncrement();
     }
 
-    public AMQMessage getMessage(long messageId)
+    public void storeContentBodyChunk(StoreContext context, long messageId, int index, ContentBody contentBody)
+            throws AMQException
+    {
+        List<ContentBody> bodyList = _contentBodyMap.get(messageId);
+        if (bodyList == null)
+        {
+            bodyList = new ArrayList<ContentBody>();
+            _contentBodyMap.put(messageId, bodyList);
+        }
+
+        bodyList.add(index, contentBody);
+    }
+
+    public void storeMessageMetaData(StoreContext context, long messageId, MessageMetaData messageMetaData)
+            throws AMQException
+    {
+        _metaDataMap.put(messageId, messageMetaData);
+    }
+
+    public MessageMetaData getMessageMetaData(long messageId) throws AMQException
+    {
+        return _metaDataMap.get(messageId);
+    }
+
+    public ContentBody getContentBodyChunk(long messageId, int index) throws AMQException
     {
-        return _messageMap.get(messageId);
+        List<ContentBody> bodyList = _contentBodyMap.get(messageId);
+        return bodyList.get(index);
     }
 }

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java?view=diff&rev=493872&r1=493871&r2=493872
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java Sun Jan  7 15:11:53 2007
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -22,8 +22,9 @@
 
 import org.apache.commons.configuration.Configuration;
 import org.apache.qpid.AMQException;
-import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.framing.ContentBody;
 import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.MessageMetaData;
 import org.apache.qpid.server.queue.QueueRegistry;
 
 import java.util.List;
@@ -37,34 +38,33 @@
      * @param base the base element identifier from which all configuration items are relative. For example, if the base
      * element is "store", the all elements used by concrete classes will be "store.foo" etc.
      * @param config the apache commons configuration object
+     * @throws Exception if an error occurs that means the store is unable to configure itself
      */
     void configure(QueueRegistry queueRegistry, String base, Configuration config) throws Exception;
 
     /**
      * Called to close and cleanup any resources used by the message store.
-     * @throws Exception
+     * @throws Exception if close fails
      */
     void close() throws Exception;
 
-    void put(AMQMessage msg) throws AMQException;
-
-    void removeMessage(long messageId) throws AMQException;
+    void removeMessage(StoreContext storeContext, long messageId) throws AMQException;
 
     void createQueue(AMQQueue queue) throws AMQException;
 
     void removeQueue(String name) throws AMQException;
 
-    void enqueueMessage(String name, long messageId) throws AMQException;
+    void enqueueMessage(StoreContext context, String name, long messageId) throws AMQException;
 
-    void dequeueMessage(String name, long messageId) throws AMQException;
+    void dequeueMessage(StoreContext context, String name, long messageId) throws AMQException;
 
-    void beginTran() throws AMQException;
+    void beginTran(StoreContext context) throws AMQException;
 
-    void commitTran() throws AMQException;
+    void commitTran(StoreContext context) throws AMQException;
 
-    void abortTran() throws AMQException;
+    void abortTran(StoreContext context) throws AMQException;
 
-    boolean inTran();
+    boolean inTran(StoreContext context);
 
     /**
      * Recreate all queues that were persisted, including re-enqueuing of existing messages
@@ -78,6 +78,13 @@
      * @return a message id
      */
     long getNewMessageId();
-}
 
+    void storeContentBodyChunk(StoreContext context, long messageId, int index, ContentBody contentBody) throws AMQException;
 
+    void storeMessageMetaData(StoreContext context, long messageId, MessageMetaData messageMetaData) throws AMQException;
+
+    MessageMetaData getMessageMetaData(long messageId) throws AMQException;
+
+    ContentBody getContentBodyChunk(long messageId, int index) throws AMQException;
+
+}

Added: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java?view=auto&rev=493872
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java Sun Jan  7 15:11:53 2007
@@ -0,0 +1,42 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store;
+
+/**
+ * A context object that the store can use to associate its operations with a transactional context. For
+ * example, the payload could hold some kind of transaction id.
+ * 
+ * @author Apache Software Foundation
+ */
+public class StoreContext
+{
+    private Object _payload;
+
+    public Object getPayload()
+    {
+        return _payload;
+    }
+
+    public void setPayload(Object payload)
+    {
+        _payload = payload;
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/CleanupMessageOperation.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/CleanupMessageOperation.java?view=auto&rev=493872
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/CleanupMessageOperation.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/CleanupMessageOperation.java Sun Jan  7 15:11:53 2007
@@ -0,0 +1,91 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.txn;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.NoConsumersException;
+import org.apache.qpid.server.RequiredDeliveryException;
+import org.apache.qpid.server.store.StoreContext;
+
+import java.util.List;
+
+/**
+ * @author Apache Software Foundation
+ */
+public class CleanupMessageOperation implements TxnOp
+{
+    private static final Logger _log = Logger.getLogger(CleanupMessageOperation.class);
+
+    private final AMQMessage _msg;
+
+    private final List<RequiredDeliveryException> _returns;
+
+    public CleanupMessageOperation(AMQMessage msg, List<RequiredDeliveryException> returns)
+    {
+        _msg = msg;
+        _returns = returns;
+    }
+
+    public void prepare(StoreContext context) throws AMQException
+    {
+    }
+
+    public void undoPrepare()
+    {
+        //don't need to do anything here: if the store's txn failed
+        //when processing prepare, then the message was not stored
+        //or enqueued on any queues and can be discarded
+    }
+
+    public void commit(StoreContext context)
+    {
+        //The router's reference can now be released.  This is done
+        //here to ensure that it happens after the queues that
+        //enqueue it have incremented their counts (which, as a
+        //memory-only operation, is done in the commit phase).
+        try
+        {
+            _msg.decrementReference(context);
+        }
+        catch (AMQException e)
+        {
+            _log.error("On commiting transaction, failed to cleanup unused message: " + e, e);
+        }
+        try
+        {
+            _msg.checkDeliveredToConsumer();
+        }
+        catch (NoConsumersException e)
+        {
+            //TODO: store this for delivery after the commit-ok
+            _returns.add(e);
+        }
+        catch (AMQException e)
+        {
+            _log.error("On commiting transaction, unable to determine whether delivered to a consumer immediately: " +
+                       e, e);
+        }
+    }
+
+    public void rollback(StoreContext context)
+    {
+        // NO OP
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/CleanupMessageOperation.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DeliverMessageOperation.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DeliverMessageOperation.java?view=auto&rev=493872
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DeliverMessageOperation.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DeliverMessageOperation.java Sun Jan  7 15:11:53 2007
@@ -0,0 +1,74 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.txn;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.store.StoreContext;
+
+/**
+ * @author Robert Greig (robert.j.greig@jpmorgan.com)
+ */
+public class DeliverMessageOperation implements TxnOp
+{
+    private static final Logger _logger = Logger.getLogger(DeliverMessageOperation.class);
+
+    private final AMQMessage _msg;
+
+    private final AMQQueue _queue;
+
+    public DeliverMessageOperation(AMQMessage msg, AMQQueue queue)
+    {
+        _msg = msg;
+        _queue = queue;
+        _msg.incrementReference();
+    }
+
+    public void prepare(StoreContext context) throws AMQException
+    {
+    }
+
+    public void undoPrepare()
+    {
+    }
+
+    public void commit(StoreContext context)
+    {
+        //do the memory part of the record()
+        _msg.incrementReference();
+        //then process the message
+        try
+        {
+            _queue.process(context, _msg);
+        }
+        catch (AMQException e)
+        {
+            //TODO: is there anything else we can do here? I think not...
+            _logger.error("Error during commit of a queue delivery: " + e, e);
+        }
+    }
+
+    public void rollback(StoreContext storeContext)
+    {
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DeliverMessageOperation.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java?view=auto&rev=493872
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java Sun Jan  7 15:11:53 2007
@@ -0,0 +1,148 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.txn;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.ack.TxAck;
+import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.RequiredDeliveryException;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.StoreContext;
+
+import java.util.List;
+
+/**
+ * A transactional context that only supports local transactions.
+ */
+public class LocalTransactionalContext implements TransactionalContext
+{
+    private final TxnBuffer _txnBuffer;
+
+    /**
+     * We keep hold of the ack operation so that we can consolidate acks, i.e. multiple acks within a txn are
+     * consolidated into a single operation
+     */
+    private TxAck _ackOp;
+
+    private List<RequiredDeliveryException> _returnMessages;
+
+    private final MessageStore _messageStore;
+
+    private final StoreContext _storeContext;
+
+    private boolean _inTran = false;
+
+    public LocalTransactionalContext(MessageStore messageStore, StoreContext storeContext,
+                                     TxnBuffer txnBuffer, List<RequiredDeliveryException> returnMessages)
+    {
+        _messageStore = messageStore;
+        _storeContext = storeContext;
+        _txnBuffer = txnBuffer;
+        _returnMessages = returnMessages;
+        _txnBuffer.enlist(new StoreMessageOperation(messageStore));
+    }
+
+    public void rollback() throws AMQException
+    {
+        _txnBuffer.rollback(_storeContext);
+    }
+
+    public void deliver(AMQMessage message, AMQQueue queue) throws AMQException
+    {
+        // A publication will result in the enlisting of several
+        // TxnOps. The first is an op that will store the message.
+        // Following that (and ordering is important), an op will
+        // be added for every queue onto which the message is
+        // enqueued. Finally a cleanup op will be added to decrement
+        // the reference associated with the routing.
+
+        _txnBuffer.enlist(new DeliverMessageOperation(message, queue));
+        _txnBuffer.enlist(new CleanupMessageOperation(message, _returnMessages));
+    }
+
+    private void checkAck(long deliveryTag, UnacknowledgedMessageMap unacknowledgedMessageMap) throws AMQException
+    {
+        if (!unacknowledgedMessageMap.contains(deliveryTag))
+        {
+            throw new AMQException("Ack with delivery tag " + deliveryTag + " not known for channel");
+        }
+    }
+
+    public void acknowledgeMessage(long deliveryTag, long lastDeliveryTag, boolean multiple,
+                                   UnacknowledgedMessageMap unacknowledgedMessageMap) throws AMQException
+    {
+        //check that the tag exists to give early failure
+        if (!multiple || deliveryTag > 0)
+        {
+            checkAck(deliveryTag, unacknowledgedMessageMap);
+        }
+        //we use a single txn op for all acks and update this op
+        //as new acks come in. If this is the first ack in the txn
+        //we will need to create and enlist the op.
+        if (_ackOp == null)
+        {
+            _ackOp = new TxAck(unacknowledgedMessageMap);
+            _txnBuffer.enlist(_ackOp);
+        }
+        // update the op to include this ack request
+        if (multiple && deliveryTag == 0)
+        {
+            // if an ack of all messages has been signalled, that refers
+            // only to those outstanding at this time
+            _ackOp.update(lastDeliveryTag, multiple);
+        }
+        else
+        {
+            _ackOp.update(deliveryTag, multiple);
+        }
+    }
+
+    public void messageFullyReceived(boolean persistent) throws AMQException
+    {
+        // Not required in this transactional context
+    }
+
+    public void messageProcessed(AMQProtocolSession protocolSession) throws AMQException
+    {
+        // Not required in this transactional context
+    }
+
+    public void beginTranIfNecessary() throws AMQException
+    {
+        if (!_inTran)
+        {
+            _messageStore.beginTran(_storeContext);
+            _inTran = true;
+        }
+    }
+
+    public void commit() throws AMQException
+    {
+        if (_ackOp != null)
+        {
+            _ackOp.consolidate();
+            //already enlisted, after commit will reset regardless of outcome
+            _ackOp = null;
+        }
+
+        _txnBuffer.commit(_storeContext);
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java?view=auto&rev=493872
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java Sun Jan  7 15:11:53 2007
@@ -0,0 +1,208 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.txn;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.RequiredDeliveryException;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.ack.UnacknowledgedMessage;
+import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.NoConsumersException;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.StoreContext;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * @author Apache Software Foundation
+ */
+public class NonTransactionalContext implements TransactionalContext
+{
+    private static final Logger _log = Logger.getLogger(NonTransactionalContext.class);
+
+    /**
+     * Channel is useful for logging
+     */
+    private final AMQChannel _channel;
+
+    /**
+     * Where to put undeliverable messages
+     */
+    private final List<RequiredDeliveryException> _returnMessages;
+
+    private Set<Long> _browsedAcks;
+
+    private final MessageStore _messageStore;
+
+    private StoreContext _storeContext;
+
+    /**
+     * Whether we are in a transaction
+     */
+    private boolean _inTran;
+
+    public NonTransactionalContext(MessageStore messageStore, StoreContext storeContext, AMQChannel channel,
+                                   List<RequiredDeliveryException> returnMessages, Set<Long> browsedAcks)
+    {
+        _channel = channel;
+        _storeContext = storeContext;
+        _returnMessages = returnMessages;
+        _messageStore = messageStore;
+        _browsedAcks = browsedAcks;
+    }
+
+    public void beginTranIfNecessary() throws AMQException
+    {
+        if (!_inTran)
+        {
+            _messageStore.beginTran(_storeContext);
+            _inTran = true;
+        }
+    }
+
+    public void commit() throws AMQException
+    {
+        // Does not apply to this context
+    }
+
+    public void rollback() throws AMQException
+    {
+        // Does not apply to this context
+    }
+
+    public void deliver(AMQMessage message, AMQQueue queue) throws AMQException
+    {
+        try
+        {
+            message.incrementReference();
+            queue.process(_storeContext, message);
+            //following check implements the functionality
+            //required by the 'immediate' flag:
+            message.checkDeliveredToConsumer();
+        }
+        catch (NoConsumersException e)
+        {
+            _returnMessages.add(e);
+        }
+    }
+
+    public void acknowledgeMessage(final long deliveryTag, long lastDeliveryTag,
+                                   boolean multiple, final UnacknowledgedMessageMap unacknowledgedMessageMap)
+            throws AMQException
+    {
+        if (multiple)
+        {
+            if (deliveryTag == 0)
+            {
+
+                //Spec 2.1.6.11 ... If the multiple field is 1, and the delivery tag is zero,
+                // tells the server to acknowledge all outstanding messages.
+                _log.info("Multiple ack on delivery tag 0. ACKing all messages. Current count:" +
+                          unacknowledgedMessageMap.size());
+                unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor()
+                {
+                    public boolean callback(UnacknowledgedMessage message) throws AMQException
+                    {
+                        if (!_browsedAcks.contains(deliveryTag))
+                        {
+                            if (_log.isDebugEnabled())
+                            {
+                                _log.debug("Discarding message: " + message.message.getMessageId());
+                            }
+                            message.discard(_storeContext);
+                        }
+                        else
+                        {
+                            _browsedAcks.remove(deliveryTag);
+                        }
+                        return false;
+                    }
+
+                    public void visitComplete()
+                    {
+                        unacknowledgedMessageMap.clear();
+                    }
+                });
+            }
+            else
+            {
+                if (!unacknowledgedMessageMap.contains(deliveryTag))
+                {
+                    throw new AMQException("Multiple ack on delivery tag " + deliveryTag + " not known for channel");
+                }
+
+                LinkedList<UnacknowledgedMessage> acked = new LinkedList<UnacknowledgedMessage>();
+                unacknowledgedMessageMap.drainTo(acked, deliveryTag);
+                for (UnacknowledgedMessage msg : acked)
+                {
+                    if (!_browsedAcks.contains(deliveryTag))
+                    {
+                        if (_log.isDebugEnabled())
+                        {
+                            _log.debug("Discarding message: " + msg.message.getMessageId());
+                        }
+                        msg.discard(_storeContext);
+                    }
+                    else
+                    {
+                        _browsedAcks.remove(deliveryTag);
+                    }
+                }
+            }
+        }
+        else
+        {
+            UnacknowledgedMessage msg;
+            msg = unacknowledgedMessageMap.remove(deliveryTag);
+
+            if (msg == null)
+            {
+                _log.info("Single ack on delivery tag " + deliveryTag + " not known for channel:" +
+                          _channel.getChannelId());
+                throw new AMQException("Single ack on delivery tag " + deliveryTag + " not known for channel:" +
+                                       _channel.getChannelId());
+            }
+            msg.discard(_storeContext);
+            if (_log.isDebugEnabled())
+            {
+                _log.debug("Received non-multiple ack for messaging with delivery tag " + deliveryTag + " msg id " +
+                           msg.message.getMessageId());
+            }
+        }
+    }
+
+    public void messageFullyReceived(boolean persistent) throws AMQException
+    {
+        if (persistent)
+        {
+            _messageStore.commitTran(_storeContext);
+            _inTran = false;
+        }
+    }
+
+    public void messageProcessed(AMQProtocolSession protocolSession) throws AMQException
+    {
+        _channel.processReturns(protocolSession);
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/StoreMessageOperation.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/StoreMessageOperation.java?view=auto&rev=493872
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/StoreMessageOperation.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/StoreMessageOperation.java Sun Jan  7 15:11:53 2007
@@ -0,0 +1,79 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.txn;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.StoreContext;
+
+/**
+ * A transactional operation that completes the message store's part of a transaction. It does nothing at
+ * prepare time; commit() calls commitTran() on the store and rollback() calls abortTran(), each with the
+ * StoreContext supplied by the caller.
+ */
+public class StoreMessageOperation implements TxnOp
+{
+    private final MessageStore _messageStore;
+
+    /**
+     * @param messageStore the store on which this operation calls commitTran() or abortTran()
+     */
+    public StoreMessageOperation(MessageStore messageStore)
+    {
+        _messageStore = messageStore;
+    }
+
+    /**
+     * Does nothing: this operation has no prepare-time work.
+     */
+    public void prepare(StoreContext context) throws AMQException
+    {
+    }
+
+    /**
+     * Does nothing, since prepare() made no changes that could be undone.
+     */
+    public void undoPrepare()
+    {
+    }
+
+    /**
+     * Asks the message store to commit.
+     *
+     * @param context passed straight through to the store's commitTran()
+     * @throws AMQException propagated from the store's commitTran() call
+     */
+    public void commit(StoreContext context) throws AMQException
+    {
+        _messageStore.commitTran(context);
+    }
+
+    /**
+     * Asks the message store to abort.
+     *
+     * @param context passed straight through to the store's abortTran()
+     * @throws AMQException propagated from the store's abortTran() call
+     */
+    public void rollback(StoreContext context) throws AMQException
+    {
+        _messageStore.abortTran(context);
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/StoreMessageOperation.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionalContext.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionalContext.java?view=auto&rev=493872
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionalContext.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionalContext.java Sun Jan  7 15:11:53 2007
@@ -0,0 +1,68 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.txn;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.AMQQueue;
+
+/**
+ * The operations performed against a transaction context: beginning, committing and rolling back work,
+ * delivering and acknowledging messages, and two message-lifecycle notifications.
+ * <p>
+ * NOTE(review): the method descriptions below are inferred from the method names and from the
+ * NonTransactionalContext class added in the same commit; confirm them against each implementation.
+ *
+ * @author Robert Greig (robert.j.greig@jpmorgan.com)
+ */
+public interface TransactionalContext
+{
+    /** Presumably begins a transaction unless one is already open - confirm against implementations. */
+    void beginTranIfNecessary() throws AMQException;
+
+    /** Commits the work done under this context. */
+    void commit() throws AMQException;
+
+    /** Rolls back the work done under this context. */
+    void rollback() throws AMQException;
+
+    /** Delivers {@code message} to {@code queue} under this context. */
+    void deliver(AMQMessage message, AMQQueue queue) throws AMQException;
+
+    /**
+     * Acknowledges one or more delivered messages held in {@code unacknowledgedMessageMap}.
+     *
+     * @param deliveryTag the tag being acknowledged
+     * @param lastDeliveryTag presumably the most recent tag issued on the channel - TODO confirm
+     * @param multiple presumably true to acknowledge every message up to {@code deliveryTag} - TODO confirm
+     * @param unacknowledgedMessageMap the outstanding messages, looked up by delivery tag
+     */
+    void acknowledgeMessage(long deliveryTag, long lastDeliveryTag, boolean multiple,
+                            UnacknowledgedMessageMap unacknowledgedMessageMap) throws AMQException;
+
+    /** In NonTransactionalContext, commits the store transaction when {@code persistent} is true. */
+    void messageFullyReceived(boolean persistent) throws AMQException;
+
+    /** In NonTransactionalContext, calls processReturns on the channel with {@code protocolSession}. */
+    void messageProcessed(AMQProtocolSession protocolSession) throws AMQException;
+}

Propchange: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionalContext.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/TxnBuffer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/TxnBuffer.java?view=diff&rev=493872&r1=493871&r2=493872
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/TxnBuffer.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/TxnBuffer.java Sun Jan  7 15:11:53 2007
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -22,91 +22,83 @@
 
 import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
-import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.StoreContext;
 
 import java.util.ArrayList;
 import java.util.List;
 
 /**
  * Holds a list of TxnOp instance representing transactional
- * operations. 
+ * operations.
  */
 public class TxnBuffer
 {
-    private boolean _containsPersistentChanges = false;
-    private final MessageStore _store;
     private final List<TxnOp> _ops = new ArrayList<TxnOp>();
     private static final Logger _log = Logger.getLogger(TxnBuffer.class);
 
-    public TxnBuffer(MessageStore store)
-    {
-        _store = store;
-    }
-
-    public void containsPersistentChanges()
+    public TxnBuffer()
     {
-        _containsPersistentChanges = true;
     }
 
-    public void commit() throws AMQException
+    /**
+     * Prepares every buffered operation and, if all of them prepare successfully, commits each one in
+     * order. A failed prepare phase is not reported to the caller. The buffer is emptied on normal return.
+     *
+     * @param context handed to each operation's prepare and commit
+     * @throws AMQException if an operation's commit fails; the buffer is not emptied in that case
+     */
+    public void commit(StoreContext context) throws AMQException
     {
-        if (_containsPersistentChanges)
+        if (prepare(context))
         {
-            _log.debug("Begin Transaction.");
-            _store.beginTran();
-            if(prepare())
+            for (TxnOp op : _ops)
             {
-                _log.debug("Transaction Succeeded");
-                _store.commitTran();
-                for (TxnOp op : _ops)
-                {
-                    op.commit();
-                }
+                op.commit(context);
             }
-            else
-            {
-                _log.debug("Transaction Failed");
-                _store.abortTran();
-            }
-        }else{
-            if(prepare())
-            {
-                for (TxnOp op : _ops)
-                {
-                    op.commit();
-                }
-            }            
         }
         _ops.clear();
     }
 
-    private boolean prepare() 
-    {        
+    /**
+     * Prepares the buffered operations in order. If one fails, the failure is logged and the operations
+     * prepared before it are compensated with undoPrepare().
+     *
+     * @return true if every operation prepared, false if one failed
+     */
+    private boolean prepare(StoreContext context)
+    {
         for (int i = 0; i < _ops.size(); i++)
         {
             TxnOp op = _ops.get(i);
             try
             {
-                op.prepare();
+                op.prepare(context);
             }
-            catch(Exception e)
+            catch (Exception e)
             {
+                _log.error("Failed to prepare transactional operation, undoing prepared operations: " + e, e);
                 //compensate previously prepared ops
                 for(int j = 0; j < i; j++)
                 {
                     _ops.get(j).undoPrepare();
-                }    
+                }
                 return false;
             }
         }
         return true;
-    }   
+    }
 
-    public void rollback() throws AMQException
+    /**
+     * Rolls back every buffered operation in order and then empties the buffer.
+     *
+     * @param context handed to each operation's rollback
+     * @throws AMQException if an operation's rollback fails
+     */
+    public void rollback(StoreContext context) throws AMQException
     {
         for (TxnOp op : _ops)
         {
-            op.rollback();
+            op.rollback(context);
         }
         _ops.clear();
     }

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/TxnOp.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/TxnOp.java?view=diff&rev=493872&r1=493871&r2=493872
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/TxnOp.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/TxnOp.java Sun Jan  7 15:11:53 2007
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -21,6 +21,7 @@
 package org.apache.qpid.server.txn;
 
 import org.apache.qpid.AMQException;
+import org.apache.qpid.server.store.StoreContext;
 
 /**
  * This provides the abstraction of an individual operation within a
@@ -29,14 +30,14 @@
 public interface TxnOp
 {
     /**
-     * Do the part of the operation that updates persistent state 
+     * Do the part of the operation that updates persistent state
      */
-    public void prepare() throws AMQException;
+    public void prepare(StoreContext context) throws AMQException;
     /**
      * Complete the operation started by prepare. Can now update in
      * memory state or make netork transfers.
      */
-    public void commit();
+    public void commit(StoreContext context) throws AMQException;
     /**
      * This is not the same as rollback. Unfortunately the use of an
      * in memory reference count as a locking mechanism and a test for
@@ -50,5 +51,5 @@
     /**
      * Rolls back the operation.
      */
-    public void rollback();
+    public void rollback(StoreContext context) throws AMQException;
 }

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverSupport.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverSupport.java?view=diff&rev=493872&r1=493871&r2=493872
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverSupport.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverSupport.java Sun Jan  7 15:11:53 2007
@@ -56,7 +56,7 @@
                 }
                 catch (FailoverException e)
                 {
-                    _log.info("Failover exception caught during operation");
+                    _log.info("Failover exception caught during operation: " + e, e);
                 }
             }
         }

Modified: incubator/qpid/trunk/qpid/java/client/src/old_test/java/org/apache/qpid/codec/BasicDeliverTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/old_test/java/org/apache/qpid/codec/BasicDeliverTest.java?view=diff&rev=493872&r1=493871&r2=493872
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/old_test/java/org/apache/qpid/codec/BasicDeliverTest.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/old_test/java/org/apache/qpid/codec/BasicDeliverTest.java Sun Jan  7 15:11:53 2007
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -254,7 +254,7 @@
 
     static BasicDeliverBody createBasicDeliverBody()
     {
-        BasicDeliverBody body = new BasicDeliverBody();
+        BasicDeliverBody body = new BasicDeliverBody((byte)8, (byte)0);
         body.consumerTag = "myConsumerTag";
         body.deliveryTag = 1;
         body.exchange = "myExchange";

Modified: incubator/qpid/trunk/qpid/java/client/src/old_test/java/org/apache/qpid/framing/FieldTableTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/old_test/java/org/apache/qpid/framing/FieldTableTest.java?view=diff&rev=493872&r1=493871&r2=493872
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/old_test/java/org/apache/qpid/framing/FieldTableTest.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/old_test/java/org/apache/qpid/framing/FieldTableTest.java Sun Jan  7 15:11:53 2007
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -20,6 +20,7 @@
  */
 package org.apache.qpid.framing;
 
+import junit.framework.TestCase;
 import org.apache.mina.common.ByteBuffer;
 
 import java.io.BufferedReader;
@@ -29,8 +30,6 @@
 import java.util.Enumeration;
 import java.util.Properties;
 
-import junit.framework.TestCase;
-
 public class FieldTableTest extends TestCase
 {
 
@@ -47,7 +46,7 @@
                    EncodingUtils.encodedLongStringLength(value);
 
         assertEquals(table.getEncodedSize(), size);
-        
+
         key = "Integer";
         Integer number = new Integer(60);
         table.put(key, number);
@@ -87,7 +86,7 @@
         doTestEncoding(load("FieldTableTest2.properties"));
     }
     */
-    void doTestEncoding(FieldTable table) throws AMQFrameDecodingException
+    void doTestEncoding(FieldTable table) throws AMQFrameDecodingException, AMQProtocolVersionException
     {
         assertEquivalent(table, encodeThenDecode(table));
     }
@@ -102,7 +101,7 @@
         }
     }
 
-    FieldTable encodeThenDecode(FieldTable table) throws AMQFrameDecodingException
+    FieldTable encodeThenDecode(FieldTable table) throws AMQFrameDecodingException, AMQProtocolVersionException
     {
         ContentHeaderBody header = new ContentHeaderBody();
         header.classId = 6;

Modified: incubator/qpid/trunk/qpid/java/client/src/old_test/java/org/apache/qpid/requestreply1/VmRequestReply.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/old_test/java/org/apache/qpid/requestreply1/VmRequestReply.java?view=diff&rev=493872&r1=493871&r2=493872
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/old_test/java/org/apache/qpid/requestreply1/VmRequestReply.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/old_test/java/org/apache/qpid/requestreply1/VmRequestReply.java Sun Jan  7 15:11:53 2007
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -20,12 +20,9 @@
  */
 package org.apache.qpid.requestreply1;
 
-import org.apache.qpid.client.vmbroker.AMQVMBrokerCreationException;
-import org.apache.qpid.client.transport.TransportConnection;
-import org.apache.qpid.test.VMBrokerSetup;
-import org.apache.log4j.Logger;
-
 import junit.framework.TestCase;
+import org.apache.log4j.Logger;
+import org.apache.qpid.testutil.VMBrokerSetup;
 
 public class VmRequestReply extends TestCase
 {

Modified: incubator/qpid/trunk/qpid/java/client/src/old_test/java/org/apache/qpid/test/unit/jndi/referenceabletest/JNDIReferenceableTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/old_test/java/org/apache/qpid/test/unit/jndi/referenceabletest/JNDIReferenceableTest.java?view=diff&rev=493872&r1=493871&r2=493872
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/old_test/java/org/apache/qpid/test/unit/jndi/referenceabletest/JNDIReferenceableTest.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/old_test/java/org/apache/qpid/test/unit/jndi/referenceabletest/JNDIReferenceableTest.java Sun Jan  7 15:11:53 2007
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -20,14 +20,11 @@
  */
 package org.apache.qpid.test.unit.jndi.referenceabletest;
 
-import org.apache.qpid.client.transport.TransportConnection;
-import org.apache.qpid.client.vmbroker.AMQVMBrokerCreationException;
-import org.apache.qpid.test.VMBrokerSetup;
+import junit.framework.TestCase;
+import org.apache.qpid.testutil.VMBrokerSetup;
 
 import javax.naming.NameAlreadyBoundException;
 import javax.naming.NoInitialContextException;
-
-import junit.framework.TestCase;
 
 /**
  * Usage: To run these you need to have the sun JNDI SPI for the FileSystem.

Modified: incubator/qpid/trunk/qpid/java/client/src/old_test/java/org/apache/qpid/weblogic/ServiceRequestingClient.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/old_test/java/org/apache/qpid/weblogic/ServiceRequestingClient.java?view=diff&rev=493872&r1=493871&r2=493872
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/old_test/java/org/apache/qpid/weblogic/ServiceRequestingClient.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/old_test/java/org/apache/qpid/weblogic/ServiceRequestingClient.java Sun Jan  7 15:11:53 2007
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -20,20 +20,13 @@
  */
 package org.apache.qpid.weblogic;
 
-import org.apache.qpid.jms.*;
 import org.apache.log4j.Logger;
 
-import javax.naming.NamingException;
-import javax.naming.InitialContext;
-import javax.naming.Context;
 import javax.jms.*;
-import javax.jms.MessageConsumer;
-import javax.jms.Session;
+import javax.naming.Context;
+import javax.naming.InitialContext;
+import javax.naming.NamingException;
 import java.util.Hashtable;
-import java.io.File;
-import java.io.FilenameFilter;
-import java.io.Reader;
-import java.io.FileReader;
 
 /**
  * Created by IntelliJ IDEA.

Modified: incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java?view=diff&rev=493872&r1=493871&r2=493872
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java Sun Jan  7 15:11:53 2007
@@ -24,7 +24,7 @@
 import org.apache.qpid.client.AMQConnection;
 import org.apache.qpid.client.AMQQueue;
 import org.apache.qpid.client.AMQSession;
-import org.apache.qpid.testutil.VMBrokerSetup;
+import org.apache.qpid.client.transport.TransportConnection;
 
 import javax.jms.*;
 
@@ -41,6 +41,7 @@
     protected void setUp() throws Exception
     {
         super.setUp();
+        TransportConnection.createVMBroker(1);
         _connection = new AMQConnection("vm://:1", "guest", "guest", "fred", "/test");
         _topic = new AMQTopic("mytopic");
         _queue = new AMQQueue("myqueue");
@@ -48,6 +49,7 @@
 
     protected void tearDown() throws Exception
     {
+        super.tearDown();
         try
         {
             _connection.close();
@@ -55,8 +57,8 @@
         catch (JMSException e)
         {
             //ignore 
-        }
-        super.tearDown();
+        }        
+        TransportConnection.killAllVMBrokers();
     }
 
     /**
@@ -195,6 +197,6 @@
 
     public static junit.framework.Test suite()
     {
-        return new VMBrokerSetup(new junit.framework.TestSuite(AMQConnectionTest.class));
+        return new junit.framework.TestSuite(AMQConnectionTest.class);
     }
 }

Modified: incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java?view=diff&rev=493872&r1=493871&r2=493872
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java Sun Jan  7 15:11:53 2007
@@ -20,17 +20,17 @@
  */
 package org.apache.qpid.test.unit.client.channelclose;
 
+import junit.framework.TestCase;
+import junit.textui.TestRunner;
+import org.apache.log4j.Logger;
 import org.apache.qpid.client.AMQConnection;
 import org.apache.qpid.client.AMQQueue;
-import org.apache.qpid.testutil.VMBrokerSetup;
-import org.apache.log4j.Logger;
+import org.apache.qpid.client.transport.TransportConnection;
 
 import javax.jms.*;
 import java.util.ArrayList;
 import java.util.List;
 
-import junit.framework.TestCase;
-import junit.textui.TestRunner;
 
 /**
  * Due to bizarre exception handling all sessions are closed if you get
@@ -64,6 +64,7 @@
     {
         super.setUp();
 
+        TransportConnection.createVMBroker(1);
         _connection = new AMQConnection(_connectionString, "guest", "guest", randomize("Client"), "/test_path");
 
         _destination1 = new AMQQueue("q1", true);
@@ -192,7 +193,15 @@
         {
             while (received.size() < count)
             {
-                received.wait();
+                try
+                {
+                    received.wait();
+                }
+                catch (InterruptedException e)
+                {
+                    _log.info("Interrupted: " + e);
+                    throw e;
+                }
             }
         }
     }
@@ -209,6 +218,6 @@
 
     public static junit.framework.Test suite()
     {
-        return new VMBrokerSetup(new junit.framework.TestSuite(ChannelCloseOkTest.class));
+        return new junit.framework.TestSuite(ChannelCloseOkTest.class);
     }
 }

Modified: incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/StreamMessageTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/StreamMessageTest.java?view=diff&rev=493872&r1=493871&r2=493872
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/StreamMessageTest.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/StreamMessageTest.java Sun Jan  7 15:11:53 2007
@@ -1,11 +1,23 @@
-/**
- * User: Robert Greig
- * Date: 12-Dec-2006
- ******************************************************************************
- * (c) Copyright JP Morgan Chase Ltd 2006. All rights reserved. No part of
- * this program may be photocopied reproduced or translated to another
- * program language without prior written consent of JP Morgan Chase Ltd
- ******************************************************************************/
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
 package org.apache.qpid.test.unit.message;
 
 import junit.framework.TestCase;