You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2011/12/28 14:02:48 UTC

svn commit: r1225178 [2/8] - in /qpid/trunk/qpid/java: ./ bdbstore/src/main/ bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/ bdbstore/src/test/ bdbstore/src/test/jav...

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java Wed Dec 28 13:02:41 2011
@@ -34,6 +34,8 @@ import org.apache.qpid.server.virtualhos
 
 import javax.management.JMException;
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArraySet;
 
@@ -41,8 +43,52 @@ public class DirectExchange extends Abst
 {
     private static final Logger _logger = Logger.getLogger(DirectExchange.class);
 
-    private final ConcurrentHashMap<String, CopyOnWriteArraySet<Binding>> _bindingsByKey =
-            new ConcurrentHashMap<String, CopyOnWriteArraySet<Binding>>();
+    private static final class BindingSet
+    {
+        private CopyOnWriteArraySet<Binding> _bindings = new CopyOnWriteArraySet<Binding>();
+        private List<BaseQueue> _queues = new ArrayList<BaseQueue>();
+
+        public synchronized void addBinding(Binding binding)
+        {
+            _bindings.add(binding);
+            recalculateQueues();
+        }
+
+
+        public synchronized void removeBinding(Binding binding)
+        {
+            _bindings.remove(binding);
+            recalculateQueues();
+        }
+
+        private void recalculateQueues()
+        {
+            List<BaseQueue> queues = new ArrayList<BaseQueue>(_bindings.size());
+
+            for(Binding b : _bindings)
+            {
+                if(!queues.contains(b.getQueue()))
+                {
+                    queues.add(b.getQueue());
+                }
+            }
+            _queues = queues;
+        }
+
+
+        public List<BaseQueue> getQueues()
+        {
+            return _queues;
+        }
+
+        public CopyOnWriteArraySet<Binding> getBindings()
+        {
+            return _bindings;
+        }
+    }
+
+    private final ConcurrentHashMap<String, BindingSet> _bindingsByKey =
+            new ConcurrentHashMap<String, BindingSet>();
 
     public static final ExchangeType<DirectExchange> TYPE = new ExchangeType<DirectExchange>()
     {
@@ -91,33 +137,20 @@ public class DirectExchange extends Abst
     }
 
 
-    public ArrayList<? extends BaseQueue> doRoute(InboundMessage payload)
+    public List<? extends BaseQueue> doRoute(InboundMessage payload)
     {
 
         final String routingKey = payload.getRoutingKey();
 
-        CopyOnWriteArraySet<Binding> bindings = _bindingsByKey.get(routingKey == null ? "" : routingKey);
+        BindingSet bindings = _bindingsByKey.get(routingKey == null ? "" : routingKey);
 
         if(bindings != null)
         {
-            final ArrayList<BaseQueue> queues = new ArrayList<BaseQueue>(bindings.size());
-
-            for(Binding binding : bindings)
-            {
-                queues.add(binding.getQueue());
-                binding.incrementMatches();
-            }
-
-            if (_logger.isDebugEnabled())
-            {
-                _logger.debug("Publishing message to queue " + queues);
-            }
-
-            return queues;
+            return bindings.getQueues();
         }
         else
         {
-            return new ArrayList<BaseQueue>(0); 
+            return Collections.emptyList();
         }
 
 
@@ -132,16 +165,10 @@ public class DirectExchange extends Abst
     public boolean isBound(AMQShortString routingKey, AMQQueue queue)
     {
         String bindingKey = (routingKey == null) ? "" : routingKey.toString();
-        CopyOnWriteArraySet<Binding> bindings = _bindingsByKey.get(bindingKey);
+        BindingSet bindings = _bindingsByKey.get(bindingKey);
         if(bindings != null)
         {
-            for(Binding binding : bindings)
-            {
-                if(binding.getQueue().equals(queue))
-                {
-                    return true;
-                }
-            }
+            return bindings.getQueues().contains(queue);
         }
         return false;
 
@@ -150,22 +177,20 @@ public class DirectExchange extends Abst
     public boolean isBound(AMQShortString routingKey)
     {
         String bindingKey = (routingKey == null) ? "" : routingKey.toString();
-        CopyOnWriteArraySet<Binding> bindings = _bindingsByKey.get(bindingKey);
-        return bindings != null && !bindings.isEmpty();
+        BindingSet bindings = _bindingsByKey.get(bindingKey);
+        return bindings != null && !bindings.getQueues().isEmpty();
     }
 
     public boolean isBound(AMQQueue queue)
     {
 
-        for (CopyOnWriteArraySet<Binding> bindings : _bindingsByKey.values())
+        for (BindingSet bindings : _bindingsByKey.values())
         {
-            for(Binding binding : bindings)
+            if(bindings.getQueues().contains(queue))
             {
-                if(binding.getQueue().equals(queue))
-                {
-                    return true;
-                }
+                return true;
             }
+
         }
         return false;
     }
@@ -184,19 +209,19 @@ public class DirectExchange extends Abst
         assert queue != null;
         assert routingKey != null;
 
-        CopyOnWriteArraySet<Binding> bindings = _bindingsByKey.get(bindingKey);
+        BindingSet bindings = _bindingsByKey.get(bindingKey);
 
         if(bindings == null)
         {
-            bindings = new CopyOnWriteArraySet<Binding>();
-            CopyOnWriteArraySet<Binding> newBindings;
+            bindings = new BindingSet();
+            BindingSet newBindings;
             if((newBindings = _bindingsByKey.putIfAbsent(bindingKey, bindings)) != null)
             {
                 bindings = newBindings;
             }
         }
 
-        bindings.add(binding);
+        bindings.addBinding(binding);
 
     }
 
@@ -204,10 +229,10 @@ public class DirectExchange extends Abst
     {
         assert binding != null;
 
-        CopyOnWriteArraySet<Binding> bindings = _bindingsByKey.get(binding.getBindingKey());
+        BindingSet bindings = _bindingsByKey.get(binding.getBindingKey());
         if(bindings != null)
         {
-            bindings.remove(binding);
+            bindings.removeBinding(binding);
         }
 
     }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java Wed Dec 28 13:02:41 2011
@@ -36,6 +36,7 @@ import org.apache.qpid.server.configurat
 import javax.management.JMException;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.List;
 
 public interface Exchange extends ExchangeReferrer, ExchangeConfig
 {
@@ -70,7 +71,7 @@ public interface Exchange extends Exchan
      *
      * @return list of queues to which to route the message.
      */
-    ArrayList<? extends BaseQueue> route(InboundMessage message);
+    List<? extends BaseQueue> route(InboundMessage message);
 
 
     /**

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java Wed Dec 28 13:02:41 2011
@@ -37,8 +37,11 @@ import org.apache.qpid.server.filter.JMS
 import org.apache.qpid.server.message.InboundMessage;
 
 import javax.management.JMException;
+import java.sql.Array;
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
 import java.lang.ref.WeakReference;
 
 public class TopicExchange extends AbstractExchange
@@ -77,8 +80,6 @@ public class TopicExchange extends Abstr
 
     private static final Logger _logger = Logger.getLogger(TopicExchange.class);
 
-
-
     private final TopicParser _parser = new TopicParser();
 
     private final Map<AMQShortString, TopicExchangeResult> _topicExchangeResults =
@@ -175,7 +176,6 @@ public class TopicExchange extends Abstr
             _bindings.put(binding, args);
         }
 
-
     }
 
     private JMSSelectorFilter createSelectorFilter(final FieldTable args) throws AMQInvalidArgumentException
@@ -201,14 +201,23 @@ public class TopicExchange extends Abstr
     public ArrayList<BaseQueue> doRoute(InboundMessage payload)
     {
 
-        final AMQShortString routingKey = payload.getRoutingKey() == null
+        final AMQShortString routingKey = payload.getRoutingKeyShortString() == null
                                           ? AMQShortString.EMPTY_STRING
-                                          : new AMQShortString(payload.getRoutingKey());
+                                          : payload.getRoutingKeyShortString();
+
+        final Collection<AMQQueue> matchedQueues = getMatchedQueues(payload, routingKey);
 
-        // The copy here is unfortunate, but not too bad relevant to the amount of
-        // things created and copied in getMatchedQueues
-        ArrayList<BaseQueue> queues = new ArrayList<BaseQueue>();
-        queues.addAll(getMatchedQueues(payload, routingKey));
+        ArrayList<BaseQueue> queues;
+
+        if(matchedQueues.getClass() == ArrayList.class)
+        {
+            queues = (ArrayList) matchedQueues;
+        }
+        else
+        {
+            queues = new ArrayList<BaseQueue>();
+            queues.addAll(matchedQueues);
+        }
 
         if(queues == null || queues.isEmpty())
         {
@@ -325,25 +334,28 @@ public class TopicExchange extends Abstr
     {
 
         Collection<TopicMatcherResult> results = _parser.parse(routingKey);
-        if(results.isEmpty())
-        {
-            return Collections.EMPTY_SET;
-        }
-        else
+        switch(results.size())
         {
-            Collection<AMQQueue> queues = results.size() == 1 ? null : new HashSet<AMQQueue>();
-            for(TopicMatcherResult result : results)
-            {
-                TopicExchangeResult res = (TopicExchangeResult)result;
+            case 0:
+                return Collections.EMPTY_SET;
+            case 1:
+                TopicMatcherResult[] resultQueues = new TopicMatcherResult[1];
+                results.toArray(resultQueues);
+                return ((TopicExchangeResult)resultQueues[0]).processMessage(message, null);
+            default:
+                Collection<AMQQueue> queues = new HashSet<AMQQueue>();
+                for(TopicMatcherResult result : results)
+                {
+                    TopicExchangeResult res = (TopicExchangeResult)result;
+
+                    for(Binding b : res.getBindings())
+                    {
+                        b.incrementMatches();
+                    }
 
-                for(Binding b : res.getBindings())
-                {
-                    b.incrementMatches();
+                    queues = res.processMessage(message, queues);
                 }
-                
-                queues = res.processMessage(message, queues);
-            }
-            return queues;
+                return queues;
         }
 
 

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicExchangeResult.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicExchangeResult.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicExchangeResult.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicExchangeResult.java Wed Dec 28 13:02:41 2011
@@ -39,6 +39,7 @@ public final class TopicExchangeResult i
     private final List<Binding> _bindings = new CopyOnWriteArrayList<Binding>();
     private final Map<AMQQueue, Integer> _unfilteredQueues = new ConcurrentHashMap<AMQQueue, Integer>();
     private final ConcurrentHashMap<AMQQueue, Map<MessageFilter,Integer>> _filteredQueues = new ConcurrentHashMap<AMQQueue, Map<MessageFilter, Integer>>();
+    private volatile ArrayList<AMQQueue> _unfilteredQueueList = new ArrayList<AMQQueue>(0);
 
     public void addUnfilteredQueue(AMQQueue queue)
     {
@@ -46,6 +47,9 @@ public final class TopicExchangeResult i
         if(instances == null)
         {
             _unfilteredQueues.put(queue, 1);
+            ArrayList<AMQQueue> newList = new ArrayList<AMQQueue>(_unfilteredQueueList);
+            newList.add(queue);
+            _unfilteredQueueList = newList;
         }
         else
         {
@@ -59,6 +63,10 @@ public final class TopicExchangeResult i
         if(instances == 1)
         {
             _unfilteredQueues.remove(queue);
+            ArrayList<AMQQueue> newList = new ArrayList<AMQQueue>(_unfilteredQueueList);
+            newList.remove(queue);
+            _unfilteredQueueList = newList;
+
         }
         else
         {
@@ -166,7 +174,7 @@ public final class TopicExchangeResult i
         {
             if(_filteredQueues.isEmpty())
             {
-                return new ArrayList<AMQQueue>(_unfilteredQueues.keySet());
+                return _unfilteredQueueList;
             }
             else
             {

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicMatcherDFAState.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicMatcherDFAState.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicMatcherDFAState.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/topic/TopicMatcherDFAState.java Wed Dec 28 13:02:41 2011
@@ -77,7 +77,7 @@ public class TopicMatcherDFAState
         }
         if(nextState == null)
         {
-            return Collections.EMPTY_SET;
+            return Collections.EMPTY_LIST;
         }
         // Shortcut if we are at a looping terminal state
         if((nextState == this) && (_nextStateMap.size() == 1) && _nextStateMap.containsKey(TopicWord.ANY_WORD))

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/federation/Bridge.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/federation/Bridge.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/federation/Bridge.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/federation/Bridge.java Wed Dec 28 13:02:41 2011
@@ -44,27 +44,9 @@ import org.apache.qpid.server.transport.
 import org.apache.qpid.server.txn.AutoCommitTransaction;
 import org.apache.qpid.server.txn.ServerTransaction;
 import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.transport.DeliveryProperties;
-import org.apache.qpid.transport.MessageAcceptMode;
-import org.apache.qpid.transport.MessageAcquireMode;
-import org.apache.qpid.transport.MessageCreditUnit;
-import org.apache.qpid.transport.MessageFlowMode;
-import org.apache.qpid.transport.MessageReject;
-import org.apache.qpid.transport.MessageRejectCode;
-import org.apache.qpid.transport.MessageTransfer;
-import org.apache.qpid.transport.Option;
-import org.apache.qpid.transport.RangeSet;
-import org.apache.qpid.transport.Session;
-import org.apache.qpid.transport.SessionException;
-import org.apache.qpid.transport.SessionListener;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.UUID;
+import org.apache.qpid.transport.*;
+
+import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
@@ -365,7 +347,8 @@ public class Bridge implements BridgeCon
             // TODO - deal with exchange not existing
 
             DeliveryProperties delvProps = null;
-            if(xfr.getHeader() != null && (delvProps = xfr.getHeader().get(DeliveryProperties.class)) != null && delvProps.hasTtl() && !delvProps.hasExpiration())
+            if(xfr.getHeader() != null && (delvProps = xfr.getHeader().getDeliveryProperties()) != null && delvProps.hasTtl() &&
+               !delvProps.hasExpiration())
             {
                 delvProps.setExpiration(System.currentTimeMillis() + delvProps.getTtl());
             }
@@ -377,7 +360,7 @@ public class Bridge implements BridgeCon
             storeMessage.flushToStore();
             MessageTransferMessage message = new MessageTransferMessage(storeMessage, ((ServerSession)_session).getReference());
 
-            ArrayList<? extends BaseQueue> queues = exchange.route(message);
+            List<? extends BaseQueue> queues = exchange.route(message);
 
 
 
@@ -391,7 +374,7 @@ public class Bridge implements BridgeCon
                 {
                     if(xfr.getAcceptMode() == MessageAcceptMode.EXPLICIT)
                     {
-                        RangeSet rejects = new RangeSet();
+                        RangeSet rejects = RangeSetFactory.createRangeSet();
                         rejects.add(xfr.getId());
                         MessageReject reject = new MessageReject(rejects, MessageRejectCode.UNROUTABLE, "Unroutable");
                         ssn.invoke(reject);
@@ -428,7 +411,7 @@ public class Bridge implements BridgeCon
         }
 
 
-        private void enqueue(final ServerMessage message, final ArrayList<? extends BaseQueue> queues)
+        private void enqueue(final ServerMessage message, final List<? extends BaseQueue> queues)
         {
             _transaction.enqueue(queues,message, new ServerTransaction.Action()
                         {
@@ -456,8 +439,7 @@ public class Bridge implements BridgeCon
                             {
                                 // NO-OP
                             }
-                        });
-
+                        }, 0L);
         }
 
         public void exception(final Session session, final SessionException exception)

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/AbstractFlowCreditManager.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/AbstractFlowCreditManager.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/AbstractFlowCreditManager.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/AbstractFlowCreditManager.java Wed Dec 28 13:02:41 2011
@@ -20,6 +20,7 @@
 */
 package org.apache.qpid.server.flow;
 
+import java.util.ArrayList;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.Set;
 import java.util.HashSet;
@@ -27,13 +28,16 @@ import java.util.HashSet;
 public abstract class AbstractFlowCreditManager implements FlowCreditManager
 {
     protected final AtomicBoolean _suspended = new AtomicBoolean(false);
-    private final Set<FlowCreditManagerListener> _listeners = new HashSet<FlowCreditManagerListener>();
+    private final ArrayList<FlowCreditManagerListener> _listeners = new ArrayList<FlowCreditManagerListener>();
 
     public final void addStateListener(FlowCreditManagerListener listener)
     {
         synchronized(_listeners)
         {
-            _listeners.add(listener);
+            if(!_listeners.contains(listener))
+            {
+                _listeners.add(listener);
+            }
         }
     }
 
@@ -49,9 +53,10 @@ public abstract class AbstractFlowCredit
     {
         synchronized(_listeners)
         {
-            for(FlowCreditManagerListener listener : _listeners)
+            final int size = _listeners.size();
+            for(int i = 0; i<size; i++)
             {
-                listener.creditStateChanged(!suspended);
+                _listeners.get(i).creditStateChanged(!suspended);
             }
         }
     }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java Wed Dec 28 13:02:41 2011
@@ -52,7 +52,6 @@ public class BasicConsumeMethodHandler i
         AMQProtocolSession protocolConnection = stateManager.getProtocolSession();
 
         AMQChannel channel = protocolConnection.getChannel(channelId);
-
         VirtualHost vHost = protocolConnection.getVirtualHost();
 
         if (channel == null)

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java Wed Dec 28 13:02:41 2011
@@ -65,7 +65,6 @@ public class ChannelCloseHandler impleme
         {
             throw body.getConnectionException(AMQConstant.CHANNEL_ERROR, "Trying to close unknown channel");
         }
-
         session.closeChannel(channelId);
         // Client requested closure so we don't wait for ok we send it
         stateManager.getProtocolSession().closeChannelOk(channelId);

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java Wed Dec 28 13:02:41 2011
@@ -55,7 +55,6 @@ public class ChannelFlowHandler implemen
         {
             throw body.getChannelNotFoundException(channelId);
         }
-
         channel.setSuspended(!body.getActive());
         _logger.debug("Channel.Flow for channel " + channelId + ", active=" + body.getActive());
 

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java Wed Dec 28 13:02:41 2011
@@ -49,7 +49,6 @@ public class ConnectionCloseMethodHandle
     public void methodReceived(AMQStateManager stateManager, ConnectionCloseBody body, int channelId) throws AMQException
     {
         AMQProtocolSession session = stateManager.getProtocolSession();
-
         if (_logger.isInfoEnabled())
         {
             _logger.info("ConnectionClose received with reply code/reply text " + body.getReplyCode() + "/" +

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java Wed Dec 28 13:02:41 2011
@@ -65,7 +65,6 @@ public class ExchangeBoundHandler implem
     public void methodReceived(AMQStateManager stateManager, ExchangeBoundBody body, int channelId) throws AMQException
     {
         AMQProtocolSession session = stateManager.getProtocolSession();
-        
         VirtualHost virtualHost = session.getVirtualHost();
         QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
         MethodRegistry methodRegistry = session.getMethodRegistry();

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java Wed Dec 28 13:02:41 2011
@@ -85,6 +85,13 @@ public class QueueDeclareHandler impleme
         
         //TODO: do we need to check that the queue already exists with exactly the same "configuration"?
 
+        AMQChannel channel = protocolConnection.getChannel(channelId);
+
+        if (channel == null)
+        {
+            throw body.getChannelNotFoundException(channelId);
+        }
+
         synchronized (queueRegistry)
         {
             queue = queueRegistry.getQueue(queueName);
@@ -183,12 +190,6 @@ public class QueueDeclareHandler impleme
             }
 
 
-            AMQChannel channel = protocolConnection.getChannel(channelId);
-
-            if (channel == null)
-            {
-                throw body.getChannelNotFoundException(channelId);
-            }
 
             //set this as the default queue on the channel:
             channel.setDefaultQueue(queue);

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java Wed Dec 28 13:02:41 2011
@@ -64,15 +64,17 @@ public class QueueDeleteHandler implemen
         QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
         DurableConfigurationStore store = virtualHost.getDurableConfigurationStore();
 
+
+        AMQChannel channel = protocolConnection.getChannel(channelId);
+
+        if (channel == null)
+        {
+            throw body.getChannelNotFoundException(channelId);
+        }
+
         AMQQueue queue;
         if (body.getQueue() == null)
         {
-            AMQChannel channel = protocolConnection.getChannel(channelId);
-
-            if (channel == null)
-            {
-                throw body.getChannelNotFoundException(channelId);
-            }
 
             //get the default queue on the channel:
             queue = channel.getDefaultQueue();

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java Wed Dec 28 13:02:41 2011
@@ -63,17 +63,14 @@ public class QueuePurgeHandler implement
         QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
 
         AMQChannel channel = protocolConnection.getChannel(channelId);
-
-
+        if (channel == null)
+        {
+            throw body.getChannelNotFoundException(channelId);
+        }
         AMQQueue queue;
         if(body.getQueue() == null)
         {
 
-           if (channel == null)
-           {
-               throw body.getChannelNotFoundException(channelId);
-           }
-
            //get the default queue on the channel:
            queue = channel.getDefaultQueue();
             

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueUnbindHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueUnbindHandler.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueUnbindHandler.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueUnbindHandler.java Wed Dec 28 13:02:41 2011
@@ -66,14 +66,15 @@ public class QueueUnbindHandler implemen
         final AMQQueue queue;
         final AMQShortString routingKey;
 
-        if (body.getQueue() == null)
+
+        AMQChannel channel = session.getChannel(channelId);
+        if (channel == null)
         {
-            AMQChannel channel = session.getChannel(channelId);
+            throw body.getChannelNotFoundException(channelId);
+        }
 
-            if (channel == null)
-            {
-                throw body.getChannelNotFoundException(channelId);
-            }
+        if (body.getQueue() == null)
+        {
 
             queue = channel.getDefaultQueue();
 

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java Wed Dec 28 13:02:41 2011
@@ -37,7 +37,7 @@ import java.nio.ByteBuffer;
 /**
  * A deliverable message.
  */
-public class AMQMessage extends AbstractServerMessageImpl
+public class AMQMessage extends AbstractServerMessageImpl<MessageMetaData>
 {
     /** Used for debugging purposes. */
     private static final Logger _log = Logger.getLogger(AMQMessage.class);
@@ -62,10 +62,6 @@ public class AMQMessage extends Abstract
     private Object _sessionIdentifier;
     private static final byte IMMEDIATE_AND_DELIVERED = (byte) (IMMEDIATE | DELIVERED_TO_CONSUMER);
 
-    private final StoredMessage<MessageMetaData> _handle;
-
-    WeakReference<AMQChannel> _channelRef;
-
     public AMQMessage(StoredMessage<MessageMetaData> handle)
     {
         this(handle, null);
@@ -75,7 +71,7 @@ public class AMQMessage extends Abstract
     {
         super(handle);
 
-        _handle = handle;
+
         final MessageMetaData metaData = handle.getMetaData();
         _size = metaData.getContentSize();
         final MessagePublishInfo messagePublishInfo = metaData.getMessagePublishInfo();
@@ -84,8 +80,6 @@ public class AMQMessage extends Abstract
         {
             _flags |= IMMEDIATE;
         }
-        
-        _channelRef = channelRef;
     }
 
     public void setExpiration(final long expiration)
@@ -97,7 +91,7 @@ public class AMQMessage extends Abstract
 
     public MessageMetaData getMessageMetaData()
     {
-        return _handle.getMetaData();
+        return getStoredMessage().getMetaData();
     }
 
     public ContentHeaderBody getContentHeaderBody() throws AMQException
@@ -107,7 +101,7 @@ public class AMQMessage extends Abstract
 
     public Long getMessageId()
     {
-        return _handle.getMessageNumber();
+        return getStoredMessage().getMessageNumber();
     }
 
     /**
@@ -219,9 +213,9 @@ public class AMQMessage extends Abstract
         return new AMQMessageReference(this);
     }
 
-    public Long getMessageNumber()
+    public long getMessageNumber()
     {
-        return getMessageId();
+        return getStoredMessage().getMessageNumber();
     }
 
 
@@ -248,16 +242,13 @@ public class AMQMessage extends Abstract
 
     public int getContent(ByteBuffer buf, int offset)
     {
-        return _handle.getContent(offset, buf);
+        return getStoredMessage().getContent(offset, buf);
     }
 
-    public StoredMessage<MessageMetaData> getStoredMessage()
+
+    public ByteBuffer getContent(int offset, int size)
     {
-        return _handle;
+        return getStoredMessage().getContent(offset, size);
     }
 
-    public SessionConfig getSessionConfig()
-    {
-        return _channelRef == null ? null : ((SessionConfig) _channelRef.get());
-   }
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java Wed Dec 28 13:02:41 2011
@@ -21,19 +21,30 @@
 package org.apache.qpid.server.message;
 
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 
+import org.apache.qpid.server.store.StorableMessageMetaData;
 import org.apache.qpid.server.store.StoredMessage;
 
-public abstract class AbstractServerMessageImpl implements ServerMessage
+public abstract class AbstractServerMessageImpl<T extends StorableMessageMetaData> implements ServerMessage<T>
 {
-    private final AtomicInteger _referenceCount = new AtomicInteger(0);
-    private final StoredMessage<?> _handle;
 
-    public AbstractServerMessageImpl(StoredMessage<?> handle)
+    private static final AtomicIntegerFieldUpdater<AbstractServerMessageImpl> _refCountUpdater =
+            AtomicIntegerFieldUpdater.newUpdater(AbstractServerMessageImpl.class, "_referenceCount");
+
+    private volatile int _referenceCount = 0;
+    private final StoredMessage<T> _handle;
+
+    public AbstractServerMessageImpl(StoredMessage<T> handle)
     {
         _handle = handle;
     }
 
+    public StoredMessage<T> getStoredMessage()
+    {
+        return _handle;
+    }
+
     public boolean incrementReference()
     {
         return incrementReference(1);
@@ -41,9 +52,9 @@ public abstract class AbstractServerMess
 
     public boolean incrementReference(int count)
     {
-        if(_referenceCount.addAndGet(count) <= 0)
+        if(_refCountUpdater.addAndGet(this, count) <= 0)
         {
-            _referenceCount.addAndGet(-count);
+            _refCountUpdater.addAndGet(this, -count);
             return false;
         }
         else
@@ -62,7 +73,7 @@ public abstract class AbstractServerMess
      */
     public void decrementReference()
     {
-        int count = _referenceCount.decrementAndGet();
+        int count = _refCountUpdater.decrementAndGet(this);
 
         // note that the operation of decrementing the reference count and then removing the message does not
         // have to be atomic since the ref count starts at 1 and the exchange itself decrements that after
@@ -73,7 +84,7 @@ public abstract class AbstractServerMess
             // set the reference count way below 0 so that we can detect that the message has been deleted
             // this is to guard against the message being spontaneously recreated (from the mgmt console)
             // by copying from other queues at the same time as it is being removed.
-            _referenceCount.set(Integer.MIN_VALUE/2);
+            _refCountUpdater.set(this,Integer.MIN_VALUE/2);
 
             // must check if the handle is null since there may be cases where we decide to throw away a message
             // and the handle has not yet been constructed
@@ -99,6 +110,6 @@ public abstract class AbstractServerMess
 
     protected int getReferenceCount()
     {
-        return _referenceCount.get();
+        return _referenceCount;
     }
 }
\ No newline at end of file

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/EnqueableMessage.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/EnqueableMessage.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/EnqueableMessage.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/EnqueableMessage.java Wed Dec 28 13:02:41 2011
@@ -20,8 +20,11 @@
 */
 package org.apache.qpid.server.message;
 
+import org.apache.qpid.server.store.StoredMessage;
+
 public interface EnqueableMessage
 {
-    Long getMessageNumber();
+    long getMessageNumber();
     boolean isPersistent();
+    StoredMessage getStoredMessage();
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/InboundMessage.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/InboundMessage.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/InboundMessage.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/InboundMessage.java Wed Dec 28 13:02:41 2011
@@ -22,10 +22,12 @@ package org.apache.qpid.server.message;
 
 
 import org.apache.qpid.server.queue.Filterable;
+import org.apache.qpid.framing.AMQShortString;
 
 public interface InboundMessage extends Filterable
 {
     String getRoutingKey();
+    AMQShortString getRoutingKeyShortString();
 
     AMQMessageHeader getMessageHeader();
 

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageContentSource.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageContentSource.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageContentSource.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageContentSource.java Wed Dec 28 13:02:41 2011
@@ -26,6 +26,7 @@ import java.nio.ByteBuffer;
 public interface MessageContentSource
 {
     public int getContent(ByteBuffer buf, int offset);
+    public ByteBuffer getContent(int offset, int size);
 
     long getSize();
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_0_10.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_0_10.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_0_10.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_0_10.java Wed Dec 28 13:02:41 2011
@@ -30,9 +30,12 @@ import org.apache.qpid.transport.Message
 import org.apache.qpid.transport.Struct;
 import org.apache.qpid.transport.codec.BBEncoder;
 import org.apache.qpid.transport.codec.BBDecoder;
+import org.apache.qpid.framing.AMQShortString;
 
 import java.nio.ByteBuffer;
 import java.lang.ref.SoftReference;
+import java.util.ArrayList;
+import java.util.List;
 
 public class MessageMetaData_0_10 implements StorableMessageMetaData, InboundMessage
 {
@@ -42,7 +45,6 @@ public class MessageMetaData_0_10 implem
     private MessageTransferHeader _messageHeader;
     private long _arrivalTime;
     private int _bodySize;
-    private volatile SoftReference<ByteBuffer> _body;
 
     private static final int ENCODER_SIZE = 1 << 10;
 
@@ -53,21 +55,16 @@ public class MessageMetaData_0_10 implem
 
     public MessageMetaData_0_10(MessageTransfer xfr)
     {
-        this(xfr.getHeader(), xfr.getBodySize(), xfr.getBody(), System.currentTimeMillis());
+        this(xfr.getHeader(), xfr.getBodySize(), System.currentTimeMillis());
     }
 
     private MessageMetaData_0_10(Header header, int bodySize, long arrivalTime)
     {
-        this(header, bodySize, null, arrivalTime);
-    }
-
-    private MessageMetaData_0_10(Header header, int bodySize, ByteBuffer xfrBody, long arrivalTime)
-    {
         _header = header;
         if(_header != null)
         {
-            _deliveryProps = _header.get(DeliveryProperties.class);
-            _messageProps = _header.get(MessageProperties.class);
+            _deliveryProps = _header.getDeliveryProperties();
+            _messageProps = _header.getMessageProperties();
         }
         else
         {
@@ -78,21 +75,6 @@ public class MessageMetaData_0_10 implem
         _arrivalTime = arrivalTime;
         _bodySize = bodySize;
 
-
-
-        if(xfrBody == null)
-        {
-            _body = null;
-        }
-        else
-        {
-            ByteBuffer body = ByteBuffer.allocate(_bodySize);
-            body.put(xfrBody);
-            body.flip();
-            _body = new SoftReference<ByteBuffer>(body);
-        }
-
-
     }
 
 
@@ -122,16 +104,39 @@ public class MessageMetaData_0_10 implem
 
         encoder.writeInt64(_arrivalTime);
         encoder.writeInt32(_bodySize);
-        Struct[] headers = _header == null ? new Struct[0] : _header.getStructs();
-        encoder.writeInt32(headers.length);
+        int headersLength = 0;
+        if(_header.getDeliveryProperties() != null)
+        {
+            headersLength++;
+        }
+        if(_header.getMessageProperties() != null)
+        {
+            headersLength++;
+        }
+        if(_header.getNonStandardProperties() != null)
+        {
+            headersLength += _header.getNonStandardProperties().size();
+        }
 
+        encoder.writeInt32(headersLength);
 
-        for(Struct header : headers)
+        if(_header.getDeliveryProperties() != null)
         {
-            encoder.writeStruct32(header);
-
+            encoder.writeStruct32(_header.getDeliveryProperties());
+        }
+        if(_header.getMessageProperties() != null)
+        {
+            encoder.writeStruct32(_header.getMessageProperties());
         }
+        if(_header.getNonStandardProperties() != null)
+        {
 
+            for(Struct header : _header.getNonStandardProperties())
+            {
+                encoder.writeStruct32(header);
+            }
+
+        }
         ByteBuffer buf = encoder.buffer();
         return buf;
     }
@@ -173,6 +178,11 @@ public class MessageMetaData_0_10 implem
         return _deliveryProps == null ? null : _deliveryProps.getRoutingKey();
     }
 
+    public AMQShortString getRoutingKeyShortString()
+    {
+        return AMQShortString.valueOf(getRoutingKey());
+    }
+
     public AMQMessageHeader getMessageHeader()
     {
         return _messageHeader;
@@ -210,17 +220,6 @@ public class MessageMetaData_0_10 implem
         return _header;
     }
 
-    public ByteBuffer getBody()
-    {
-        ByteBuffer body = _body == null ? null : _body.get();
-        return body;
-    }
-
-    public void setBody(ByteBuffer body)
-    {
-        _body = new SoftReference<ByteBuffer>(body);
-    }
-
     private static class MetaDataFactory implements MessageMetaDataType.Factory<MessageMetaData_0_10>
     {
         public MessageMetaData_0_10 createMetaData(ByteBuffer buf)
@@ -232,14 +231,32 @@ public class MessageMetaData_0_10 implem
             int bodySize = decoder.readInt32();
             int headerCount = decoder.readInt32();
 
-            Struct[] headers = new Struct[headerCount];
+            DeliveryProperties deliveryProperties = null;
+            MessageProperties messageProperties = null;
+            List<Struct> otherProps = null;
 
             for(int i = 0 ; i < headerCount; i++)
             {
-                headers[i] = decoder.readStruct32();
+                Struct struct = decoder.readStruct32();
+                if(struct instanceof DeliveryProperties && deliveryProperties == null)
+                {
+                    deliveryProperties = (DeliveryProperties) struct;
+                }
+                else if(struct instanceof MessageProperties && messageProperties == null)
+                {
+                    messageProperties = (MessageProperties) struct;
+                }
+                else
+                {
+                    if(otherProps == null)
+                    {
+                        otherProps = new ArrayList<Struct>();
+
+                    }
+                    otherProps.add(struct);
+                }
             }
-
-            Header header = new Header(headers);
+            Header header = new Header(deliveryProperties,messageProperties,otherProps);
 
             return new MessageMetaData_0_10(header, bodySize, arrivalTime);
 

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferMessage.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferMessage.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferMessage.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferMessage.java Wed Dec 28 13:02:41 2011
@@ -24,32 +24,35 @@ import org.apache.qpid.transport.*;
 import org.apache.qpid.server.configuration.SessionConfig;
 import org.apache.qpid.server.store.StoredMessage;
 import org.apache.qpid.server.transport.ServerSession;
+import org.apache.qpid.framing.AMQShortString;
 
 import java.nio.ByteBuffer;
-import java.lang.ref.WeakReference;
 
 
-public class MessageTransferMessage extends AbstractServerMessageImpl implements InboundMessage
+public class MessageTransferMessage extends AbstractServerMessageImpl<MessageMetaData_0_10> implements InboundMessage
 {
-    private StoredMessage<MessageMetaData_0_10> _storeMessage;
-    private WeakReference<Session> _sessionRef;
 
-    public MessageTransferMessage(StoredMessage<MessageMetaData_0_10> storeMessage, WeakReference<Session> sessionRef)
+    private Object _connectionRef;
+
+    public MessageTransferMessage(StoredMessage<MessageMetaData_0_10> storeMessage, Object connectionRef)
     {
         super(storeMessage);
-        _storeMessage = storeMessage;
-        _sessionRef = sessionRef;
+        _connectionRef = connectionRef;
     }
 
     private MessageMetaData_0_10 getMetaData()
     {
-        return _storeMessage.getMetaData();
+        return getStoredMessage().getMetaData();
     }
 
     public String getRoutingKey()
     {
         return getMetaData().getRoutingKey();
+    }
 
+    public AMQShortString getRoutingKeyShortString()
+    {
+        return AMQShortString.valueOf(getRoutingKey());
     }
 
     public AMQMessageHeader getMessageHeader()
@@ -91,9 +94,9 @@ public class MessageTransferMessage exte
         return new TransferMessageReference(this);
     }
 
-    public Long getMessageNumber()
+    public long getMessageNumber()
     {
-        return _storeMessage.getMessageNumber();
+        return getStoredMessage().getMessageNumber();
     }
 
     public long getArrivalTime()
@@ -103,7 +106,13 @@ public class MessageTransferMessage exte
 
     public int getContent(ByteBuffer buf, int offset)
     {
-        return _storeMessage.getContent(offset, buf);
+        return getStoredMessage().getContent(offset, buf);
+    }
+
+
+    public ByteBuffer getContent(int offset, int size)
+    {
+        return getStoredMessage().getContent(offset,size);
     }
 
     public Header getHeader()
@@ -113,32 +122,13 @@ public class MessageTransferMessage exte
 
     public ByteBuffer getBody()
     {
-        ByteBuffer body = getMetaData().getBody();
-        if(body == null && getSize() != 0l)
-        {
-            final int size = (int) getSize();
-            int pos = 0;
-            body = ByteBuffer.allocate(size);
-
-            while(pos < size)
-            {
-                pos += getContent(body, pos);
-            }
-
-            body.flip();
 
-            getMetaData().setBody(body.duplicate());
-        }
-        return body;
+        return  getContent(0, (int)getSize());
     }
 
-    public Session getSession()
+    public Object getConnectionReference()
     {
-        return _sessionRef == null ? null : _sessionRef.get();
+        return _connectionRef;
     }
 
-    public SessionConfig getSessionConfig()
-    {
-        return _sessionRef == null ? null : (ServerSession) _sessionRef.get();
-    }
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/ServerMessage.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/ServerMessage.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/ServerMessage.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/message/ServerMessage.java Wed Dec 28 13:02:41 2011
@@ -23,13 +23,17 @@ package org.apache.qpid.server.message;
 import java.nio.ByteBuffer;
 
 import org.apache.qpid.server.configuration.SessionConfig;
+import org.apache.qpid.server.store.StorableMessageMetaData;
+import org.apache.qpid.server.store.StoredMessage;
 
-public interface ServerMessage extends EnqueableMessage, MessageContentSource
+public interface ServerMessage<T extends StorableMessageMetaData> extends EnqueableMessage, MessageContentSource
 {
     String getRoutingKey();
 
     AMQMessageHeader getMessageHeader();
 
+    public StoredMessage<T> getStoredMessage();
+
     boolean isPersistent();
 
     long getSize();
@@ -40,11 +44,12 @@ public interface ServerMessage extends E
 
     MessageReference newReference();
 
-    Long getMessageNumber();
+    long getMessageNumber();
 
     long getArrivalTime();
 
     public int getContent(ByteBuffer buf, int offset);
 
-    SessionConfig getSessionConfig();
+    public ByteBuffer getContent(int offset, int size);
+
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/output/HeaderPropertiesConverter.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/output/HeaderPropertiesConverter.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/output/HeaderPropertiesConverter.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/output/HeaderPropertiesConverter.java Wed Dec 28 13:02:41 2011
@@ -40,8 +40,8 @@ public class HeaderPropertiesConverter
         BasicContentHeaderProperties props = new BasicContentHeaderProperties();
 
         Header header = messageTransferMessage.getHeader();
-        DeliveryProperties deliveryProps = header.get(DeliveryProperties.class);
-        MessageProperties messageProps = header.get(MessageProperties.class);
+        DeliveryProperties deliveryProps = header.getDeliveryProperties();
+        MessageProperties messageProps = header.getMessageProperties();
 
         if(deliveryProps != null)
         {

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java Wed Dec 28 13:02:41 2011
@@ -1,420 +1,420 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-/*
- * This file is auto-generated by Qpid Gentools v.0.1 - do not modify.
- * Supported AMQP versions:
- *   8-0
- */
-package org.apache.qpid.server.output.amqp0_8;
-
-import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.message.AMQMessage;
-import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.output.ProtocolOutputConverter;
-import org.apache.qpid.server.output.HeaderPropertiesConverter;
-import org.apache.qpid.server.message.MessageContentSource;
-import org.apache.qpid.server.message.MessageTransferMessage;
-import org.apache.qpid.framing.*;
-import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.transport.DeliveryProperties;
-
-import java.io.DataOutputStream;
-import java.io.IOException;
-
-public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
-{
-
-    private static final MethodRegistry METHOD_REGISTRY = MethodRegistry.getMethodRegistry(ProtocolVersion.v8_0);
-
-    public static Factory getInstanceFactory()
-    {
-        return new Factory()
-        {
-
-            public ProtocolOutputConverter newInstance(AMQProtocolSession session)
-            {
-                return new ProtocolOutputConverterImpl(session);
-            }
-        };
-    }
-
-
-    private final AMQProtocolSession _protocolSession;
-
-    private ProtocolOutputConverterImpl(AMQProtocolSession session)
-    {
-        _protocolSession = session;
-    }
-
-
-    public AMQProtocolSession getProtocolSession()
-    {
-        return _protocolSession;
-    }
-
-    public void writeDeliver(QueueEntry entry, int channelId, long deliveryTag, AMQShortString consumerTag)
-            throws AMQException
-    {
-        AMQBody deliverBody = createEncodedDeliverBody(entry, deliveryTag, consumerTag);
-        writeMessageDelivery(entry, channelId, deliverBody);
-    }
-
-
-    private ContentHeaderBody getContentHeaderBody(QueueEntry entry)
-            throws AMQException
-    {
-        if(entry.getMessage() instanceof AMQMessage)
-        {
-            return ((AMQMessage)entry.getMessage()).getContentHeaderBody();
-        }
-        else
-        {
-            final MessageTransferMessage message = (MessageTransferMessage) entry.getMessage();
-            BasicContentHeaderProperties props = HeaderPropertiesConverter.convert(message);
-            ContentHeaderBody chb = new ContentHeaderBody(props, org.apache.qpid.framing.amqp_8_0.BasicGetBodyImpl.CLASS_ID);
-            chb.bodySize = message.getSize();
-            return chb;
-        }
-    }
-
-
-    private void writeMessageDelivery(QueueEntry entry, int channelId, AMQBody deliverBody)
-            throws AMQException
-    {
-        writeMessageDelivery(entry.getMessage(), getContentHeaderBody(entry), channelId, deliverBody);
-    }
-
-    private void writeMessageDelivery(MessageContentSource message, ContentHeaderBody contentHeaderBody, int channelId, AMQBody deliverBody)
-            throws AMQException
-    {
-
-
-        int bodySize = (int) message.getSize();
-
-        if(bodySize == 0)
-        {
-            SmallCompositeAMQBodyBlock compositeBlock = new SmallCompositeAMQBodyBlock(channelId, deliverBody,
-                                                                             contentHeaderBody);
-
-            writeFrame(compositeBlock);
-        }
-        else
-        {
-            int maxBodySize = (int) getProtocolSession().getMaxFrameSize() - AMQFrame.getFrameOverhead();
-
-
-            int capacity = bodySize > maxBodySize ? maxBodySize : bodySize;
-
-            int writtenSize = capacity;
-
-            AMQBody firstContentBody = new MessageContentSourceBody(message,0,capacity);
-
-            CompositeAMQBodyBlock
-                    compositeBlock = new CompositeAMQBodyBlock(channelId, deliverBody, contentHeaderBody, firstContentBody);
-            writeFrame(compositeBlock);
-
-            while(writtenSize < bodySize)
-            {
-                capacity = bodySize - writtenSize > maxBodySize ? maxBodySize : bodySize - writtenSize;
-                MessageContentSourceBody body = new MessageContentSourceBody(message, writtenSize, capacity);
-                writtenSize += capacity;
-
-                writeFrame(new AMQFrame(channelId, body));
-            }
-        }
-    }
-
-    private class MessageContentSourceBody implements AMQBody
-    {
-        public static final byte TYPE = 3;
-        private int _length;
-        private MessageContentSource _message;
-        private int _offset;
-
-        public MessageContentSourceBody(MessageContentSource message, int offset, int length)
-        {
-            _message = message;
-            _offset = offset;
-            _length = length;
-        }
-
-        public byte getFrameType()
-        {
-            return TYPE;
-        }
-
-        public int getSize()
-        {
-            return _length;
-        }
-
-        public void writePayload(DataOutputStream buffer) throws IOException
-        {
-            byte[] data = new byte[_length];
-
-            _message.getContent(java.nio.ByteBuffer.wrap(data), _offset);
-
-            buffer.write(data);
-        }
-
-        public void handle(int channelId, AMQVersionAwareProtocolSession amqProtocolSession) throws AMQException
-        {
-            throw new UnsupportedOperationException();
-        }
-    }
-
-    private AMQDataBlock createContentHeaderBlock(final int channelId, final ContentHeaderBody contentHeaderBody)
-    {
-
-        AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
-                                                                      contentHeaderBody);
-        return contentHeader;
-    }
-
-
-    public void writeGetOk(QueueEntry entry, int channelId, long deliveryTag, int queueSize) throws AMQException
-    {
-        AMQBody deliver = createEncodedGetOkBody(entry, deliveryTag, queueSize);
-        writeMessageDelivery(entry, channelId, deliver);
-    }
-
-
-    private AMQBody createEncodedDeliverBody(QueueEntry entry,
-                                              final long deliveryTag,
-                                              final AMQShortString consumerTag)
-            throws AMQException
-    {
-
-        final AMQShortString exchangeName;
-        final AMQShortString routingKey;
-
-        if(entry.getMessage() instanceof AMQMessage)
-        {
-            final AMQMessage message = (AMQMessage) entry.getMessage();
-            final MessagePublishInfo pb = message.getMessagePublishInfo();
-            exchangeName = pb.getExchange();
-            routingKey = pb.getRoutingKey();
-        }
-        else
-        {
-            MessageTransferMessage message = (MessageTransferMessage) entry.getMessage();
-            DeliveryProperties delvProps = message.getHeader().get(DeliveryProperties.class);
-            exchangeName = (delvProps == null || delvProps.getExchange() == null) ? null : new AMQShortString(delvProps.getExchange());
-            routingKey = (delvProps == null || delvProps.getRoutingKey() == null) ? null : new AMQShortString(delvProps.getRoutingKey());
-        }
-
-        final boolean isRedelivered = entry.isRedelivered();
-
-        final AMQBody returnBlock = new AMQBody()
-        {
-
-            public AMQBody _underlyingBody;
-
-            public AMQBody createAMQBody()
-            {
-                return METHOD_REGISTRY.createBasicDeliverBody(consumerTag,
-                                                              deliveryTag,
-                                                              isRedelivered,
-                                                              exchangeName,
-                                                              routingKey);
-
-
-
-
-
-            }
-
-            public byte getFrameType()
-            {
-                return AMQMethodBody.TYPE;
-            }
-
-            public int getSize()
-            {
-                if(_underlyingBody == null)
-                {
-                    _underlyingBody = createAMQBody();
-                }
-                return _underlyingBody.getSize();
-            }
-
-            public void writePayload(DataOutputStream buffer) throws IOException
-            {
-                if(_underlyingBody == null)
-                {
-                    _underlyingBody = createAMQBody();
-                }
-                _underlyingBody.writePayload(buffer);
-            }
-
-            public void handle(final int channelId, final AMQVersionAwareProtocolSession amqMinaProtocolSession)
-                throws AMQException
-            {
-                throw new AMQException("This block should never be dispatched!");
-            }
-        };
-        return returnBlock;
-    }
-
-    private AMQBody createEncodedGetOkBody(QueueEntry entry, long deliveryTag, int queueSize)
-            throws AMQException
-    {
-        final AMQShortString exchangeName;
-        final AMQShortString routingKey;
-
-        if(entry.getMessage() instanceof AMQMessage)
-        {
-            final AMQMessage message = (AMQMessage) entry.getMessage();
-            final MessagePublishInfo pb = message.getMessagePublishInfo();
-            exchangeName = pb.getExchange();
-            routingKey = pb.getRoutingKey();
-        }
-        else
-        {
-            MessageTransferMessage message = (MessageTransferMessage) entry.getMessage();
-            DeliveryProperties delvProps = message.getHeader().get(DeliveryProperties.class);
-            exchangeName = (delvProps == null || delvProps.getExchange() == null) ? null : new AMQShortString(delvProps.getExchange());
-            routingKey = (delvProps == null || delvProps.getRoutingKey() == null) ? null : new AMQShortString(delvProps.getRoutingKey());
-        }
-
-        final boolean isRedelivered = entry.isRedelivered();
-
-        BasicGetOkBody getOkBody =
-                METHOD_REGISTRY.createBasicGetOkBody(deliveryTag,
-                                                    isRedelivered,
-                                                    exchangeName,
-                                                    routingKey,
-                                                    queueSize);
-
-        return getOkBody;
-    }
-
-    public byte getProtocolMinorVersion()
-    {
-        return getProtocolSession().getProtocolMinorVersion();
-    }
-
-    public byte getProtocolMajorVersion()
-    {
-        return getProtocolSession().getProtocolMajorVersion();
-    }
-
-    private AMQBody createEncodedReturnFrame(MessagePublishInfo messagePublishInfo,
-                                             int replyCode,
-                                             AMQShortString replyText) throws AMQException
-    {
-
-        BasicReturnBody basicReturnBody =
-                METHOD_REGISTRY.createBasicReturnBody(replyCode,
-                        replyText,
-                        messagePublishInfo.getExchange(),
-                        messagePublishInfo.getRoutingKey());
-
-
-        return basicReturnBody;
-    }
-
-    public void writeReturn(MessagePublishInfo messagePublishInfo, ContentHeaderBody header, MessageContentSource message, int channelId, int replyCode, AMQShortString replyText)
-            throws AMQException
-    {
-
-        AMQBody returnFrame = createEncodedReturnFrame(messagePublishInfo, replyCode, replyText);
-
-        writeMessageDelivery(message, header, channelId, returnFrame);
-    }
-
-
-    public void writeFrame(AMQDataBlock block)
-    {
-        getProtocolSession().writeFrame(block);
-    }
-
-
-    public void confirmConsumerAutoClose(int channelId, AMQShortString consumerTag)
-    {
-
-        BasicCancelOkBody basicCancelOkBody = METHOD_REGISTRY.createBasicCancelOkBody(consumerTag);
-        writeFrame(basicCancelOkBody.generateFrame(channelId));
-
-    }
-
-
-    public static final class CompositeAMQBodyBlock extends AMQDataBlock
-    {
-        public static final int OVERHEAD = 3 * AMQFrame.getFrameOverhead();
-
-        private final AMQBody _methodBody;
-        private final AMQBody _headerBody;
-        private final AMQBody _contentBody;
-        private final int _channel;
-
-
-        public CompositeAMQBodyBlock(int channel, AMQBody methodBody, AMQBody headerBody, AMQBody contentBody)
-        {
-            _channel = channel;
-            _methodBody = methodBody;
-            _headerBody = headerBody;
-            _contentBody = contentBody;
-
-        }
-
-        public long getSize()
-        {
-            return OVERHEAD + _methodBody.getSize() + _headerBody.getSize() + _contentBody.getSize();
-        }
-
-        public void writePayload(DataOutputStream buffer) throws IOException
-        {
-            AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody, _contentBody);
-        }
-    }
-
-    public static final class SmallCompositeAMQBodyBlock extends AMQDataBlock
-    {
-        public static final int OVERHEAD = 2 * AMQFrame.getFrameOverhead();
-
-        private final AMQBody _methodBody;
-        private final AMQBody _headerBody;
-        private final int _channel;
-
-
-        public SmallCompositeAMQBodyBlock(int channel, AMQBody methodBody, AMQBody headerBody)
-        {
-            _channel = channel;
-            _methodBody = methodBody;
-            _headerBody = headerBody;
-
-        }
-
-        public long getSize()
-        {
-            return OVERHEAD + _methodBody.getSize() + _headerBody.getSize() ;
-        }
-
-        public void writePayload(DataOutputStream buffer) throws IOException
-        {
-            AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody);
-        }
-    }
-}
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+/*
+ * This file is auto-generated by Qpid Gentools v.0.1 - do not modify.
+ * Supported AMQP versions:
+ *   8-0
+ */
+package org.apache.qpid.server.output.amqp0_8;
+
+import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.message.AMQMessage;
+import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.output.ProtocolOutputConverter;
+import org.apache.qpid.server.output.HeaderPropertiesConverter;
+import org.apache.qpid.server.message.MessageContentSource;
+import org.apache.qpid.server.message.MessageTransferMessage;
+import org.apache.qpid.framing.*;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.transport.DeliveryProperties;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
+{
+
+    private static final MethodRegistry METHOD_REGISTRY = MethodRegistry.getMethodRegistry(ProtocolVersion.v8_0);
+
+    /**
+     * Creates a factory for this converter.
+     *
+     * @return a new anonymous {@code Factory} whose {@code newInstance} wraps
+     *         the supplied session in a {@code ProtocolOutputConverterImpl};
+     *         a fresh factory object is allocated on every call
+     */
+    public static Factory getInstanceFactory()
+    {
+        return new Factory()
+        {
+
+            public ProtocolOutputConverter newInstance(AMQProtocolSession session)
+            {
+                return new ProtocolOutputConverterImpl(session);
+            }
+        };
+    }
+
+
+    private final AMQProtocolSession _protocolSession;
+
+    /**
+     * Private constructor; within this class the only caller is the factory
+     * returned by {@code getInstanceFactory()}.
+     *
+     * @param session the protocol session all frames are written to
+     */
+    private ProtocolOutputConverterImpl(AMQProtocolSession session)
+    {
+        _protocolSession = session;
+    }
+
+
+    /**
+     * @return the protocol session that was supplied at construction
+     */
+    public AMQProtocolSession getProtocolSession()
+    {
+        return _protocolSession;
+    }
+
+    /**
+     * Sends a queue entry to a consumer: builds the deliver method body and
+     * hands it, together with the entry, to {@code writeMessageDelivery}.
+     *
+     * @param entry       the queue entry to deliver
+     * @param channelId   the channel the frames are written on
+     * @param deliveryTag the delivery tag placed in the deliver body
+     * @param consumerTag the consumer tag placed in the deliver body
+     * @throws AMQException propagated from the helpers it calls
+     */
+    public void writeDeliver(QueueEntry entry, int channelId, long deliveryTag, AMQShortString consumerTag)
+            throws AMQException
+    {
+        AMQBody deliverBody = createEncodedDeliverBody(entry, deliveryTag, consumerTag);
+        writeMessageDelivery(entry, channelId, deliverBody);
+    }
+
+
+    /**
+     * Obtains the content header to send for a queue entry.
+     *
+     * An {@code AMQMessage} supplies its own header. Any other message is cast
+     * to {@code MessageTransferMessage} (so a third message type would fail
+     * with a {@code ClassCastException}) and a header is built from its
+     * converted properties.
+     *
+     * @param entry the queue entry whose message is being sent
+     * @return the content header for the entry's message
+     * @throws AMQException declared by the signature; not thrown directly here
+     */
+    private ContentHeaderBody getContentHeaderBody(QueueEntry entry)
+            throws AMQException
+    {
+        if(entry.getMessage() instanceof AMQMessage)
+        {
+            return ((AMQMessage)entry.getMessage()).getContentHeaderBody();
+        }
+        else
+        {
+            final MessageTransferMessage message = (MessageTransferMessage) entry.getMessage();
+            BasicContentHeaderProperties props = HeaderPropertiesConverter.convert(message);
+            // The class id is taken from BasicGetBodyImpl for every header built here,
+            // whether the caller is writeDeliver or writeGetOk.
+            ContentHeaderBody chb = new ContentHeaderBody(props, org.apache.qpid.framing.amqp_8_0.BasicGetBodyImpl.CLASS_ID);
+            chb.bodySize = message.getSize();
+            return chb;
+        }
+    }
+
+
+    /**
+     * Convenience overload: resolves the entry's message and content header,
+     * then delegates to the {@code MessageContentSource} overload.
+     *
+     * @param entry       the queue entry being sent
+     * @param channelId   the channel the frames are written on
+     * @param deliverBody the method body that precedes the header and content
+     * @throws AMQException propagated from the helpers it calls
+     */
+    private void writeMessageDelivery(QueueEntry entry, int channelId, AMQBody deliverBody)
+            throws AMQException
+    {
+        writeMessageDelivery(entry.getMessage(), getContentHeaderBody(entry), channelId, deliverBody);
+    }
+
+    /**
+     * Writes a method body, a content header and the message content to the
+     * protocol session.
+     *
+     * An empty message is written as one block holding only the method and
+     * header bodies. Otherwise the first block holds method, header and the
+     * first content chunk, and any remaining content follows as separate
+     * frames, each carrying at most {@code maxBodySize} bytes.
+     *
+     * @param message           source of the message content and its size
+     * @param contentHeaderBody the content header to send
+     * @param channelId         the channel the frames are written on
+     * @param deliverBody       the method body sent ahead of the header
+     * @throws AMQException declared by the signature; not thrown directly here
+     */
+    private void writeMessageDelivery(MessageContentSource message, ContentHeaderBody contentHeaderBody, int channelId, AMQBody deliverBody)
+            throws AMQException
+    {
+
+
+        // NOTE(review): the (int) cast suggests getSize() returns a wider type;
+        // a size beyond the int range would be truncated here - confirm.
+        int bodySize = (int) message.getSize();
+
+        if(bodySize == 0)
+        {
+            SmallCompositeAMQBodyBlock compositeBlock = new SmallCompositeAMQBodyBlock(channelId, deliverBody,
+                                                                             contentHeaderBody);
+
+            writeFrame(compositeBlock);
+        }
+        else
+        {
+            // Largest content payload per frame: the session's maximum frame size
+            // less the per-frame overhead. The cast binds to getMaxFrameSize() only.
+            // NOTE(review): nothing here checks that the result is positive; with a
+            // non-positive value the loop below would not advance.
+            int maxBodySize = (int) getProtocolSession().getMaxFrameSize() - AMQFrame.getFrameOverhead();
+
+
+            // Size of the first chunk: min(bodySize, maxBodySize).
+            int capacity = bodySize > maxBodySize ? maxBodySize : bodySize;
+
+            int writtenSize = capacity;
+
+            AMQBody firstContentBody = new MessageContentSourceBody(message,0,capacity);
+
+            CompositeAMQBodyBlock
+                    compositeBlock = new CompositeAMQBodyBlock(channelId, deliverBody, contentHeaderBody, firstContentBody);
+            writeFrame(compositeBlock);
+
+            // Remaining content: one frame per chunk, starting where the previous
+            // chunk ended.
+            while(writtenSize < bodySize)
+            {
+                capacity = bodySize - writtenSize > maxBodySize ? maxBodySize : bodySize - writtenSize;
+                MessageContentSourceBody body = new MessageContentSourceBody(message, writtenSize, capacity);
+                writtenSize += capacity;
+
+                writeFrame(new AMQFrame(channelId, body));
+            }
+        }
+    }
+
+    /**
+     * An {@code AMQBody} representing {@code length} bytes of a message's
+     * content starting at {@code offset}. The bytes are only fetched from the
+     * message when the body is written.
+     *
+     * NOTE(review): declared as a non-static inner class, although nothing in
+     * it refers to the enclosing converter instance.
+     */
+    private class MessageContentSourceBody implements AMQBody
+    {
+        // Frame type reported by getFrameType(); presumably the AMQP
+        // content-body frame type - confirm against the framing classes.
+        public static final byte TYPE = 3;
+        private int _length;
+        private MessageContentSource _message;
+        private int _offset;
+
+        /**
+         * @param message source the content is read from
+         * @param offset  position in the content at which this chunk starts
+         * @param length  number of bytes in this chunk
+         */
+        public MessageContentSourceBody(MessageContentSource message, int offset, int length)
+        {
+            _message = message;
+            _offset = offset;
+            _length = length;
+        }
+
+        public byte getFrameType()
+        {
+            return TYPE;
+        }
+
+        /** @return the chunk length given at construction */
+        public int getSize()
+        {
+            return _length;
+        }
+
+        /**
+         * Copies the chunk out of the message into a temporary array and
+         * writes the whole array to {@code buffer}.
+         */
+        public void writePayload(DataOutput buffer) throws IOException
+        {
+            byte[] data = new byte[_length];
+
+            // Any result reported by getContent is not inspected; all _length
+            // bytes of the array are written regardless.
+            _message.getContent(java.nio.ByteBuffer.wrap(data), _offset);
+
+            buffer.write(data);
+        }
+
+        /** Not supported: always throws {@code UnsupportedOperationException}. */
+        public void handle(int channelId, AMQVersionAwareProtocolSession amqProtocolSession) throws AMQException
+        {
+            throw new UnsupportedOperationException();
+        }
+    }
+
+    /**
+     * Wraps a content header in a frame for the given channel via
+     * {@code ContentHeaderBody.createAMQFrame}.
+     *
+     * NOTE(review): private and not called from anywhere in this class.
+     *
+     * @param channelId         the channel for the frame
+     * @param contentHeaderBody the header to wrap
+     * @return the data block produced by {@code createAMQFrame}
+     */
+    private AMQDataBlock createContentHeaderBlock(final int channelId, final ContentHeaderBody contentHeaderBody)
+    {
+
+        AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
+                                                                      contentHeaderBody);
+        return contentHeader;
+    }
+
+
+    /**
+     * Sends a queue entry in response to a get: builds the get-ok method body
+     * and hands it, together with the entry, to {@code writeMessageDelivery}.
+     *
+     * @param entry       the queue entry to send
+     * @param channelId   the channel the frames are written on
+     * @param deliveryTag the delivery tag placed in the get-ok body
+     * @param queueSize   the queue size value placed in the get-ok body
+     * @throws AMQException propagated from the helpers it calls
+     */
+    public void writeGetOk(QueueEntry entry, int channelId, long deliveryTag, int queueSize) throws AMQException
+    {
+        AMQBody deliver = createEncodedGetOkBody(entry, deliveryTag, queueSize);
+        writeMessageDelivery(entry, channelId, deliver);
+    }
+
+
+    /**
+     * Builds the method body that announces a delivery to a consumer.
+     *
+     * The exchange name, routing key and redelivered flag are read from the
+     * entry immediately. The returned object is a wrapper that creates the
+     * actual {@code BasicDeliverBody} the first time its size or payload is
+     * requested, and reuses it afterwards.
+     *
+     * @param entry       the queue entry being delivered
+     * @param deliveryTag the delivery tag for the deliver body
+     * @param consumerTag the consumer tag for the deliver body
+     * @return a lazily-populated wrapper around the deliver body
+     * @throws AMQException declared by the signature; not thrown directly here
+     */
+    private AMQBody createEncodedDeliverBody(QueueEntry entry,
+                                              final long deliveryTag,
+                                              final AMQShortString consumerTag)
+            throws AMQException
+    {
+
+        final AMQShortString exchangeName;
+        final AMQShortString routingKey;
+
+        if(entry.getMessage() instanceof AMQMessage)
+        {
+            final AMQMessage message = (AMQMessage) entry.getMessage();
+            final MessagePublishInfo pb = message.getMessagePublishInfo();
+            exchangeName = pb.getExchange();
+            routingKey = pb.getRoutingKey();
+        }
+        else
+        {
+            // Any non-AMQMessage is assumed to be a MessageTransferMessage; a
+            // missing DeliveryProperties, exchange or routing key yields null.
+            MessageTransferMessage message = (MessageTransferMessage) entry.getMessage();
+            DeliveryProperties delvProps = message.getHeader().getDeliveryProperties();
+            exchangeName = (delvProps == null || delvProps.getExchange() == null) ? null : new AMQShortString(delvProps.getExchange());
+            routingKey = (delvProps == null || delvProps.getRoutingKey() == null) ? null : new AMQShortString(delvProps.getRoutingKey());
+        }
+
+        // Captured now, so the flag reflects the entry's state at this point
+        // rather than at the time the body is eventually created.
+        final boolean isRedelivered = entry.isRedelivered();
+
+        final AMQBody returnBlock = new AMQBody()
+        {
+
+            // Cache for the real deliver body; filled in by the first call to
+            // getSize() or writePayload().
+            public AMQBody _underlyingBody;
+
+            public AMQBody createAMQBody()
+            {
+                return METHOD_REGISTRY.createBasicDeliverBody(consumerTag,
+                                                              deliveryTag,
+                                                              isRedelivered,
+                                                              exchangeName,
+                                                              routingKey);
+
+
+
+
+
+            }
+
+            public byte getFrameType()
+            {
+                return AMQMethodBody.TYPE;
+            }
+
+            public int getSize()
+            {
+                if(_underlyingBody == null)
+                {
+                    _underlyingBody = createAMQBody();
+                }
+                return _underlyingBody.getSize();
+            }
+
+            public void writePayload(DataOutput buffer) throws IOException
+            {
+                if(_underlyingBody == null)
+                {
+                    _underlyingBody = createAMQBody();
+                }
+                _underlyingBody.writePayload(buffer);
+            }
+
+            // This wrapper is output-only; handling it as an incoming body is an error.
+            public void handle(final int channelId, final AMQVersionAwareProtocolSession amqMinaProtocolSession)
+                throws AMQException
+            {
+                throw new AMQException("This block should never be dispatched!");
+            }
+        };
+        return returnBlock;
+    }
+
+    /**
+     * Builds the get-ok method body for a queue entry.
+     *
+     * The exchange name and routing key are resolved the same way as in
+     * {@code createEncodedDeliverBody}, but here the body is created
+     * immediately rather than on first use.
+     *
+     * @param entry       the queue entry being returned
+     * @param deliveryTag the delivery tag for the get-ok body
+     * @param queueSize   the queue size value for the get-ok body
+     * @return the {@code BasicGetOkBody} created by the method registry
+     * @throws AMQException declared by the signature; not thrown directly here
+     */
+    private AMQBody createEncodedGetOkBody(QueueEntry entry, long deliveryTag, int queueSize)
+            throws AMQException
+    {
+        final AMQShortString exchangeName;
+        final AMQShortString routingKey;
+
+        if(entry.getMessage() instanceof AMQMessage)
+        {
+            final AMQMessage message = (AMQMessage) entry.getMessage();
+            final MessagePublishInfo pb = message.getMessagePublishInfo();
+            exchangeName = pb.getExchange();
+            routingKey = pb.getRoutingKey();
+        }
+        else
+        {
+            // Any non-AMQMessage is assumed to be a MessageTransferMessage; a
+            // missing DeliveryProperties, exchange or routing key yields null.
+            MessageTransferMessage message = (MessageTransferMessage) entry.getMessage();
+            DeliveryProperties delvProps = message.getHeader().getDeliveryProperties();
+            exchangeName = (delvProps == null || delvProps.getExchange() == null) ? null : new AMQShortString(delvProps.getExchange());
+            routingKey = (delvProps == null || delvProps.getRoutingKey() == null) ? null : new AMQShortString(delvProps.getRoutingKey());
+        }
+
+        final boolean isRedelivered = entry.isRedelivered();
+
+        BasicGetOkBody getOkBody =
+                METHOD_REGISTRY.createBasicGetOkBody(deliveryTag,
+                                                    isRedelivered,
+                                                    exchangeName,
+                                                    routingKey,
+                                                    queueSize);
+
+        return getOkBody;
+    }
+
+    /**
+     * @return the minor protocol version reported by the protocol session
+     */
+    public byte getProtocolMinorVersion()
+    {
+        return getProtocolSession().getProtocolMinorVersion();
+    }
+
+    /**
+     * @return the major protocol version reported by the protocol session
+     */
+    public byte getProtocolMajorVersion()
+    {
+        return getProtocolSession().getProtocolMajorVersion();
+    }
+
+    /**
+     * Builds the return method body for a message that is being handed back
+     * to its publisher. Despite the name, the result is a body, not a frame.
+     *
+     * @param messagePublishInfo supplies the exchange and routing key
+     * @param replyCode          the reply code for the return body
+     * @param replyText          the reply text for the return body
+     * @return the {@code BasicReturnBody} created by the method registry
+     * @throws AMQException declared by the signature; not thrown directly here
+     */
+    private AMQBody createEncodedReturnFrame(MessagePublishInfo messagePublishInfo,
+                                             int replyCode,
+                                             AMQShortString replyText) throws AMQException
+    {
+
+        BasicReturnBody basicReturnBody =
+                METHOD_REGISTRY.createBasicReturnBody(replyCode,
+                        replyText,
+                        messagePublishInfo.getExchange(),
+                        messagePublishInfo.getRoutingKey());
+
+
+        return basicReturnBody;
+    }
+
+    /**
+     * Returns a message to its publisher: builds the return method body and
+     * writes it followed by the supplied header and the message content.
+     *
+     * @param messagePublishInfo supplies the exchange and routing key
+     * @param header             the content header to send
+     * @param message            source of the message content
+     * @param channelId          the channel the frames are written on
+     * @param replyCode          the reply code for the return body
+     * @param replyText          the reply text for the return body
+     * @throws AMQException propagated from the helpers it calls
+     */
+    public void writeReturn(MessagePublishInfo messagePublishInfo, ContentHeaderBody header, MessageContentSource message, int channelId, int replyCode, AMQShortString replyText)
+            throws AMQException
+    {
+
+        AMQBody returnFrame = createEncodedReturnFrame(messagePublishInfo, replyCode, replyText);
+
+        writeMessageDelivery(message, header, channelId, returnFrame);
+    }
+
+
+    /**
+     * Passes a data block to the protocol session's {@code writeFrame}.
+     *
+     * @param block the data block to write
+     */
+    public void writeFrame(AMQDataBlock block)
+    {
+        getProtocolSession().writeFrame(block);
+    }
+
+
+    /**
+     * Writes a {@code BasicCancelOkBody} frame for the given consumer tag on
+     * the given channel.
+     *
+     * @param channelId   the channel the frame is generated for
+     * @param consumerTag the consumer tag placed in the cancel-ok body
+     */
+    public void confirmConsumerAutoClose(int channelId, AMQShortString consumerTag)
+    {
+
+        BasicCancelOkBody basicCancelOkBody = METHOD_REGISTRY.createBasicCancelOkBody(consumerTag);
+        writeFrame(basicCancelOkBody.generateFrame(channelId));
+
+    }
+
+
+    /**
+     * A data block that bundles a method body, a header body and one content
+     * body for a single channel, so that all three are written with one call
+     * to {@code AMQFrame.writeFrames}.
+     */
+    public static final class CompositeAMQBodyBlock extends AMQDataBlock
+    {
+        // Three times the per-frame overhead: one share for each bundled body.
+        public static final int OVERHEAD = 3 * AMQFrame.getFrameOverhead();
+
+        private final AMQBody _methodBody;
+        private final AMQBody _headerBody;
+        private final AMQBody _contentBody;
+        private final int _channel;
+
+
+        /**
+         * @param channel     the channel the bodies are written on
+         * @param methodBody  the method body, written first
+         * @param headerBody  the header body, written second
+         * @param contentBody the content body, written last
+         */
+        public CompositeAMQBodyBlock(int channel, AMQBody methodBody, AMQBody headerBody, AMQBody contentBody)
+        {
+            _channel = channel;
+            _methodBody = methodBody;
+            _headerBody = headerBody;
+            _contentBody = contentBody;
+
+        }
+
+        /** @return {@code OVERHEAD} plus the sizes of the three bodies */
+        public long getSize()
+        {
+            return OVERHEAD + _methodBody.getSize() + _headerBody.getSize() + _contentBody.getSize();
+        }
+
+        /** Writes the three bodies, in order, via {@code AMQFrame.writeFrames}. */
+        public void writePayload(DataOutput buffer) throws IOException
+        {
+            AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody, _contentBody);
+        }
+    }
+
+    /**
+     * A data block that bundles only a method body and a header body for a
+     * single channel; in this class it is used for messages whose content
+     * size is zero.
+     */
+    public static final class SmallCompositeAMQBodyBlock extends AMQDataBlock
+    {
+        // Twice the per-frame overhead: one share for each bundled body.
+        public static final int OVERHEAD = 2 * AMQFrame.getFrameOverhead();
+
+        private final AMQBody _methodBody;
+        private final AMQBody _headerBody;
+        private final int _channel;
+
+
+        /**
+         * @param channel    the channel the bodies are written on
+         * @param methodBody the method body, written first
+         * @param headerBody the header body, written second
+         */
+        public SmallCompositeAMQBodyBlock(int channel, AMQBody methodBody, AMQBody headerBody)
+        {
+            _channel = channel;
+            _methodBody = methodBody;
+            _headerBody = headerBody;
+
+        }
+
+        /** @return {@code OVERHEAD} plus the sizes of the two bodies */
+        public long getSize()
+        {
+            return OVERHEAD + _methodBody.getSize() + _headerBody.getSize() ;
+        }
+
+        /** Writes the two bodies, in order, via {@code AMQFrame.writeFrames}. */
+        public void writePayload(DataOutput buffer) throws IOException
+        {
+            AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody);
+        }
+    }
+}

Propchange: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java
------------------------------------------------------------------------------
    svn:eol-style = native



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org