You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2009/10/25 23:59:05 UTC

svn commit: r829675 [4/11] - in /qpid/trunk/qpid/java: ./ broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/ broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/example/ broker/ broker/bin/ broker/src/main/java/org/apac...

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java Sun Oct 25 22:58:57 2009
@@ -81,13 +81,13 @@
                 message = channel.getUnacknowledgedMessageMap().remove(deliveryTag);
                 if(message != null)
                 {
-                    message.discard(channel.getStoreContext());
+                    message.discard();
                 }
                 //sendtoDeadLetterQueue(msg)
                 return;
             }
 
-            if (!message.getMessage().isReferenced())
+            if (message.getMessage() == null)
             {
                 _logger.warn("Message as already been purged, unable to Reject.");
                 return;
@@ -96,7 +96,7 @@
 
             if (_logger.isDebugEnabled())
             {
-                _logger.debug("Rejecting: DT:" + deliveryTag + "-" + message.getMessage().debugIdentity() +
+                _logger.debug("Rejecting: DT:" + deliveryTag + "-" + message.getMessage() +
                               ": Requeue:" + body.getRequeue() +
                               //": Resend:" + evt.getMethod().resend +
                               " on channel:" + channel.debugIdentity());

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java Sun Oct 25 22:58:57 2009
@@ -24,7 +24,6 @@
 import org.apache.qpid.framing.*;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.security.access.Permission;
 import org.apache.qpid.server.state.AMQState;
 import org.apache.qpid.server.state.AMQStateManager;
 import org.apache.qpid.server.state.StateAwareMethodListener;
@@ -96,7 +95,7 @@
 
             session.writeFrame(responseBody.generateFrame(channelId));
 
-            
+
         }
     }
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionTuneOkMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionTuneOkMethodHandler.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionTuneOkMethodHandler.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionTuneOkMethodHandler.java Sun Oct 25 22:58:57 2009
@@ -50,5 +50,6 @@
         }
         stateManager.changeState(AMQState.CONNECTION_NOT_OPENED);
         session.initHeartbeats(body.getHeartbeat());
+        session.setMaxFrameSize(body.getFrameMax());
     }
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java Sun Oct 25 22:58:57 2009
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -92,11 +92,11 @@
                     try
                     {
 
-                    exchange = exchangeFactory.createExchange(body.getExchange() == null ? null : body.getExchange().intern(),
-                                                              body.getType() == null ? null : body.getType().intern(),
-                                                              body.getDurable(),
-                                                              body.getPassive(), body.getTicket());
-                    exchangeRegistry.registerExchange(exchange);
+                        exchange = exchangeFactory.createExchange(body.getExchange() == null ? null : body.getExchange().intern(),
+                                                                  body.getType() == null ? null : body.getType().intern(),
+                                                                  body.getDurable(),
+                                                                  body.getPassive(), body.getTicket());
+                        exchangeRegistry.registerExchange(exchange);
                     }
                     catch(AMQUnknownExchangeType e)
                     {

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java Sun Oct 25 22:58:57 2009
@@ -40,7 +40,7 @@
 import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.state.AMQStateManager;
 import org.apache.qpid.server.state.StateAwareMethodListener;
-import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.DurableConfigurationStore;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
 public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclareBody>
@@ -60,11 +60,11 @@
 
     public void methodReceived(AMQStateManager stateManager, QueueDeclareBody body, int channelId) throws AMQException
     {
-        AMQProtocolSession session = stateManager.getProtocolSession();
+        final AMQProtocolSession session = stateManager.getProtocolSession();
         VirtualHost virtualHost = session.getVirtualHost();
         ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry();
         QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
-        MessageStore store = virtualHost.getMessageStore();
+        DurableConfigurationStore store = virtualHost.getDurableConfigurationStore();
 
 
         if (!body.getPassive())
@@ -109,11 +109,31 @@
                 else
                 {
                     queue = createQueue(queueName, body, virtualHost, session);
+                    queue.setPrincipalHolder(session);
                     if (queue.isDurable() && !queue.isAutoDelete())
                     {
                         store.createQueue(queue, body.getArguments());
                     }
                     queueRegistry.registerQueue(queue);
+                    if(queue.isExclusive()  && !queue.isAutoDelete())
+                    {
+                        final AMQQueue q = queue;
+                        queue.setExclusiveOwner(session);
+                        final AMQProtocolSession.Task sessionCloseTask = new AMQProtocolSession.Task()
+                        {
+                            public void doTask(AMQProtocolSession session) throws AMQException
+                            {
+                                q.setExclusiveOwner(null);
+                            }
+                        };
+                        session.addSessionCloseTask(sessionCloseTask);
+                        queue.addQueueDeleteTask(new AMQQueue.Task() {
+                            public void doTask(AMQQueue queue) throws AMQException
+                            {
+                                session.removeSessionCloseTask(sessionCloseTask);
+                            }
+                        });
+                    }
                     if (autoRegister)
                     {
                         Exchange defaultExchange = exchangeRegistry.getDefaultExchange();
@@ -123,14 +143,18 @@
                     }
                 }
             }
-            else if (queue.getOwner() != null && !session.getContextKey().equals(queue.getOwner()))
+            else if (queue.getPrincipalHolder() != null
+                      && queue.getPrincipalHolder().getPrincipal() != null
+                      && queue.getPrincipalHolder().getPrincipal().getName() != null
+                      && (!queue.getPrincipalHolder().getPrincipal().getName().equals(session.getPrincipal().getName())
+                          || ((!body.getPassive() && queue.getExclusiveOwner() != null && queue.getExclusiveOwner() != session))))
             {
                 throw body.getChannelException(AMQConstant.ALREADY_EXISTS, "Cannot declare queue('" + queueName + "'),"
                                                                            + " as exclusive queue with same name "
                                                                            + "declared on another client ID('"
-                                                                           + queue.getOwner() + "')");
-            }
+                                                                           + queue.getPrincipalHolder().getPrincipal().getName() + "')");
 
+            }
             AMQChannel channel = session.getChannel(channelId);
 
             if (channel == null)
@@ -197,7 +221,7 @@
                 }
             });
         }// if exclusive and not durable
-        
+
         return queue;
     }
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java Sun Oct 25 22:58:57 2009
@@ -31,6 +31,7 @@
 import org.apache.qpid.server.state.AMQStateManager;
 import org.apache.qpid.server.state.StateAwareMethodListener;
 import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.DurableConfigurationStore;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.server.AMQChannel;
 import org.apache.qpid.server.security.access.Permission;
@@ -62,7 +63,7 @@
         AMQProtocolSession session = stateManager.getProtocolSession();
         VirtualHost virtualHost = session.getVirtualHost();
         QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
-        MessageStore store = virtualHost.getMessageStore();
+        DurableConfigurationStore store = virtualHost.getDurableConfigurationStore();
 
         AMQQueue queue;
         if (body.getQueue() == null)

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java Sun Oct 25 22:58:57 2009
@@ -33,7 +33,6 @@
 import org.apache.qpid.server.state.StateAwareMethodListener;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.server.security.access.Permission;
 
 public class QueuePurgeHandler implements StateAwareMethodListener<QueuePurgeBody>
 {
@@ -106,7 +105,7 @@
                     throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, "Permission denied");
                 }
 
-                long purged = queue.clearQueue(channel.getStoreContext());
+                long purged = queue.clearQueue();
 
 
                 if(!body.getNowait())

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java Sun Oct 25 22:58:57 2009
@@ -61,14 +61,12 @@
             {
                 throw body.getChannelNotFoundException(channelId);
             }
-
             channel.commit();
 
             MethodRegistry methodRegistry = session.getMethodRegistry();
             AMQMethodBody responseBody = methodRegistry.createTxCommitOkBody();
             session.writeFrame(responseBody.generateFrame(channelId));
-            
-            channel.processReturns();
+                        
         }
         catch (AMQException e)
         {

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java Sun Oct 25 22:58:57 2009
@@ -44,9 +44,9 @@
     {
     }
 
-    public void methodReceived(AMQStateManager stateManager, TxRollbackBody body, int channelId) throws AMQException
+    public void methodReceived(AMQStateManager stateManager, TxRollbackBody body, final int channelId) throws AMQException
     {
-        AMQProtocolSession session = stateManager.getProtocolSession();
+        final AMQProtocolSession session = stateManager.getProtocolSession();
 
         try
         {
@@ -57,17 +57,22 @@
                 throw body.getChannelNotFoundException(channelId);
             }
 
-            channel.rollback();
 
-            MethodRegistry methodRegistry = session.getMethodRegistry();
-            AMQMethodBody responseBody = methodRegistry.createTxRollbackOkBody();
-            session.writeFrame(responseBody.generateFrame(channelId));
 
+            final MethodRegistry methodRegistry = session.getMethodRegistry();
+            final AMQMethodBody responseBody = methodRegistry.createTxRollbackOkBody();
+
+            Runnable task = new Runnable()
+            {
+
+                public void run()
+                {
+                    session.writeFrame(responseBody.generateFrame(channelId));
+                }
+            };
+
+            channel.rollback(task);
             
-            //Now resend all the unacknowledged messages back to the original subscribers.
-            //(Must be done after the TxnRollback-ok response).
-            // Why, are we not allowed to send messages back to client before the ok method?
-            channel.resend(false);
         }
         catch (AMQException e)
         {

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AMQPChannelActor.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AMQPChannelActor.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AMQPChannelActor.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AMQPChannelActor.java Sun Oct 25 22:58:57 2009
@@ -68,7 +68,7 @@
          */
         _logString = "[" + MessageFormat.format(ChannelLogSubject.CHANNEL_FORMAT,
                                                session.getSessionID(),
-                                               session.getAuthorizedID().getName(),
+                                               session.getPrincipal().getName(),
                                                session.getRemoteAddress(),
                                                session.getVirtualHost().getName(),
                                                channel.getChannelId())

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AMQPConnectionActor.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AMQPConnectionActor.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AMQPConnectionActor.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/AMQPConnectionActor.java Sun Oct 25 22:58:57 2009
@@ -75,7 +75,7 @@
     {
         _logString = "[" + MessageFormat.format(USER_FORMAT,
                                                 session.getSessionID(),
-                                                session.getAuthorizedID().getName(),
+                                                session.getPrincipal().getName(),
                                                 session.getRemoteAddress())
                      + "] ";
 
@@ -105,7 +105,7 @@
          */
         _logString = "[" + MessageFormat.format(ConnectionLogSubject.CONNECTION_FORMAT,
                                                 session.getSessionID(),
-                                                session.getAuthorizedID().getName(),
+                                                session.getPrincipal().getName(),
                                                 session.getRemoteAddress(),
                                                 session.getVirtualHost().getName())
                      + "] ";

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/CurrentActor.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/CurrentActor.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/CurrentActor.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/CurrentActor.java Sun Oct 25 22:58:57 2009
@@ -21,6 +21,9 @@
 package org.apache.qpid.server.logging.actors;
 
 import org.apache.qpid.server.logging.LogActor;
+import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.logging.LogMessage;
+import org.apache.qpid.server.logging.RootMessageLogger;
 
 import java.util.EmptyStackException;
 import java.util.Stack;
@@ -66,6 +69,8 @@
         }
     };
 
+    private static LogActor _defaultActor;
+
     /**
      * Set a new LogActor to be the Current Actor
      * <p/>
@@ -105,7 +110,12 @@
         }
         catch (EmptyStackException ese)
         {
-            return null;
+            return _defaultActor;
         }
     }
+
+    public static void setDefault(LogActor defaultActor)
+    {
+        _defaultActor = defaultActor;
+    }
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages.properties
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages.properties?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages.properties (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages.properties Sun Oct 25 22:58:57 2009
@@ -61,13 +61,32 @@
 # 0 - path
 MST-1002 = Store location : {0}
 MST-1003 = Closed
+MST-1004 = Recovery Start
+MST-1005 = Recovered {0,number} messages
+MST-1006 = Recovery Complete
+
+#ConfigStore
+# 0 - name
+CFG-1001 = Created : {0}
+# 0 - path
+CFG-1002 = Store location : {0}
+CFG-1003 = Closed
+CFG-1004 = Recovery Start
+CFG-1005 = Recovery Complete
+
+#TransactionLog
+# 0 - name
+TXN-1001 = Created : {0}
+# 0 - path
+TXN-1002 = Store location : {0}
+TXN-1003 = Closed
 # 0 - queue name
-MST-1004 = Recovery Start[ : {0}]
+TXN-1004 = Recovery Start[ : {0}]
 # 0 - count
 # 1 - queue count
-MST-1005 = Recovered {0,number} messages for queue {1}
+TXN-1005 = Recovered {0,number} messages for queue {1}
 # 0 - queue name
-MST-1006 = Recovery Complete[ : {0}]
+TXN-1006 = Recovery Complete[ : {0}]
 
 #Connection
 # 0 - Client id
@@ -83,12 +102,18 @@
 # 0 - bytes allowed in prefetch
 # 1 - number of messagse.
 CHN-1004 = Prefetch Size (bytes) {0,number} : Count {1,number}
+# 0 - queue causing flow control
+CHN-1005 = Flow Control Enforced (Queue {0})
+CHN-1006 = Flow Control Removed
 
 #Queue
 # 0 - owner
 # 1 - priority
 QUE-1001 = Create :[ Owner: {0}][ AutoDelete][ Durable][ Transient][ Priority: {1,number,#}]
 QUE-1002 = Deleted
+QUE-1003 = Overfull : Size : {0,number} bytes, Capacity : {1,number}
+QUE-1004 = Underfull : Size : {0,number} bytes, Resume Capacity : {1,number}
+
 
 #Exchange
 # 0 - type
@@ -104,4 +129,4 @@
 SUB-1001 = Create[ : Durable][ : Arguments : {0}]
 SUB-1002 = Close
 # 0 - The current subscription state
-SUB-1003 = State : {0}
\ No newline at end of file
+SUB-1003 = State : {0}

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages_en_US.properties
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages_en_US.properties?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages_en_US.properties (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages_en_US.properties Sun Oct 25 22:58:57 2009
@@ -16,173 +16,7 @@
 #  specific language governing permissions and limitations
 #  under the License.
 #
-# LogMessages used within the Java Broker as originally defined on the wiki:
-#
-# http://cwiki.apache.org/confluence/display/qpid/Status+Update+Design#StatusUpdateDesign-InitialStatusMessages
-#  
-# Technical Notes:
-#  This is a standard Java Properties file so white space is respected at the
-#  end of the lines. This file is processed in a number of ways.
-# 1) ResourceBundle
-#   This file is loaded through a ResourceBundle named LogMessages. the en_US
-#   addition to the file is the localisation. Additional localisations can be
-#   provided and will automatically be selected based on the <locale> value in
-#   the config.xml. The default is en_US.
-#
-# 2) MessasgeFormat
-#  Each entry is prepared with the Java Core MessageFormat methods. Therefore
-#  most functionality you can do via MessageFormat can be done here:
-#
-#  http://java.sun.com/javase/6/docs/api/java/text/MessageFormat.html
-#
-#  The cavet here is that only default String and number FormatTypes can be used.
-#  This is due to the processing described in 3 below. If support for date, time
-#  or choice is requried then the GenerateLogMessages class should be updated to
-#  provide support.
-#
-#  Format Note:
-#   As mentioned earlier white space in this file is very important. One thing
-#  in particular to note is the way MessageFormat peforms its replacements.
-#  The replacement text will totally replace the {xxx} section so there will be
-#  no addtion of white space or removal e.g.
-#     MSG = Text----{0}----
-#  When given parameter 'Hello' result in text:
-#     Text----Hello----
-#
-#  For simple arguments this is expected however when using Style formats then
-#  it can be a little unexepcted. In particular a common pattern is used for
-#  number replacements : {0,number,#}. This is used in the Broker to display an
-#  Integer simply as the Integer with no formating. e.g new Integer(1234567)
-#  becomes the String "1234567" which is can be contrasted with the pattern
-#  without a style format field : {0,number} which becomes string "1,234,567".
-#
-#  What you may not expect is that {0,number, #} would produce the String " 1234567"
-#  note the space after the ','   here      /\   has resulted in a space  /\ in
-#  the output.
-#
-#  More details on the SubformatPattern can be found on the API link above.
-#
-# 3) GenerateLogMessage/Velocity Macro
-#  This is the first and final stage of processing that this file goes through.
-#   1) Class Generation:
-#      The GenerateLogMessage processes this file and uses the velocity Macro
-#      to create classes with static methods to perform the logging and give us
-#      compile time validation.
-#
-#   2) Property Processing:
-#      During the class generation the message properties ({x}) are identified
-#      and used to create the method signature.
-#
-#   3) Option Processing:
-#      The Classes perform final formatting of the messages at runtime based on
-#      optional parameters that are defined within the message. Optional
-#      paramters are enclosed in square brackets e.g. [optional].
-#
-#  To provide fixed log messages as required by the Technical Specification:
-#  http://cwiki.apache.org/confluence/display/qpid/Operational+Logging+-+Status+Update+-+Technical+Specification#OperationalLogging-StatusUpdate-TechnicalSpecification-Howtoprovidefixedlogmessages
-#
-#  This file is processed by Velocity to create a number of classes that contain
-#  static methods that provide LogMessages in the code to provide compile time
-#  validation.
-#
-#  For details of what processing is done see GenerateLogMessages.
-#
-#  What a localiser or developer need know is the following:
-#
-#  The Property structure is important is it defines how the class and methods
-#  will be built.
-#
-#  Class Generation:
-#  =================
-#
-#  Each class of messages will be split in to their own <Class>Messages.java
-#  Currently the following classes are created and are populated with the
-#  messages that bear their 3-digit type identifier:
-#
-#        Class              | Type
-#      ---------------------|--------
-#        Broker             |  BKR
-#        ManagementConsole  |  MNG
-#        VirtualHost        |  VHT
-#        MessageStore       |  MST
-#        Connection         |  CON
-#        Channel            |  CHN
-#        Queue              |  QUE
-#        Exchange           |  EXH
-#        Binding            |  BND
-#        Subscription       |  SUB
-#
-#  Property Processing:
-#  ====================
-#
-#   Each property is then processed by the GenerateLogMessages class to identify
-#   The number and type of parameters, {x} entries. Parameters are defaulted to
-#   String types but the use of FormatType number (e.g.{0,number}) will result
-#   in a Number type being used. These parameters are then used to build the
-#   method parameter list. e.g:
-#   Property:
-#    BRK-1003 = Shuting down : {0} port {1,number,#}
-#   becomes Method:
-#    public static LogMessage BRK_1003(String param1, Number param2)
-#
-#   This improves our compile time validation of log message content and
-#   ensures that change in the message format does not accidentally cause
-#   erroneous messages.
-#
-#  Option Processing:
-#  ====================
-#
-#  Options are identified in the log message as being surrounded by square
-#  brackets ([ ]). These optional values can themselves contain paramters
-#  however nesting of options is not permitted. Identification is performed on
-#  first matchings so give the message:
-#   Msg = Log Message [option1] [option2]
-#  Two options will be identifed and enabled to select text 'option1 and
-#  'option2'.
-#
-#  The nesting of a options is not supported and will provide
-#  unexpected results. e.g. Using Message:
-#   Msg = Log Message [option1 [sub-option2]]
-#
-#  The options will be 'option1 [sub-option2' and 'sub-option2'. The first
-#  option includes the second option as the nesting is not detected.
-#
-#  The detected options are presented in the method signature as boolean options
-#  numerically identified by their position in the message. e.g.
-#   Property:
-#    CON-1001 = Open : Client ID {0} [: Protocol Version : {1}]
-#  becomes Method:
-#    public static LogMessage CON_1001(String param1, String param2, boolean opt1)
-#
-#  The value of 'opt1' will show/hide the option in the message. Note that
-#  'param2' is still required however a null value can be used if the optional
-#  section is not desired.
-#
-#  Again here the importance of white space needs to be highlighted.
-#  Looking at the QUE-1001 message as an example. The first thought on how this
-#  would look would be as follows:
-# QUE-1001 = Create : Owner: {0} [AutoDelete] [Durable] [Transient] [Priority: {1,number,#}]
-#  Each option is correctly defined so the text that is defined will appear when
-#  selected. e.g. 'AutoDelete'. However, what may not be immediately apparent is
-#  the white space. Using the above definition of QUE-1001 if we were to print
-#  the message with only the Priority option displayed it would appear as this:
-#  "Create : Owner: guest    Priority: 1"
-#  Note the spaces here   /\ This is because only the text between the brackets
-#  has been removed.
-#
-#  Each option needs to include white space to correctly format the message. So
-#  the correct definition of QUE-1001 is as follows:
-# QUE-1001 = Create : Owner: {0}[ AutoDelete][ Durable][ Transient][ Priority: {1,number,#}]
-#  Note that white space is included with each option and there is no extra
-#  white space between the options. As a result the output with just Priority
-#  enabled is as follows:
-#  "Create : Owner: guest Priority: 1"
-#
-#  The final processing that is done in the generation is the conversion of the
-#  property name. As a '-' is an illegal character in the method name it is
-#  converted to '_' This processing gives the final method signature as follows:
-#   <Class>Message.<Type>_<Number>(<parmaters>,<options>)
-#
+# Default File used for all non-defined locales.
 #Broker
 # 0 - Version
 # 1 = Build
@@ -227,13 +61,32 @@
 # 0 - path
 MST-1002 = Store location : {0}
 MST-1003 = Closed
+MST-1004 = Recovery Start
+MST-1005 = Recovered {0,number} messages
+MST-1006 = Recovery Complete
+
+#ConfigStore
+# 0 - name
+CFG-1001 = Created : {0}
+# 0 - path
+CFG-1002 = Store location : {0}
+CFG-1003 = Closed
+CFG-1004 = Recovery Start
+CFG-1005 = Recovery Complete
+
+#TransactionLog
+# 0 - name
+TXN-1001 = Created : {0}
+# 0 - path
+TXN-1002 = Store location : {0}
+TXN-1003 = Closed
 # 0 - queue name
-MST-1004 = Recovery Start[ : {0}]
+TXN-1004 = Recovery Start[ : {0}]
 # 0 - count
 # 1 - queue count
-MST-1005 = Recovered {0,number} messages for queue {1}
+TXN-1005 = Recovered {0,number} messages for queue {1}
 # 0 - queue name
-MST-1006 = Recovery Complete[ : {0}]
+TXN-1006 = Recovery Complete[ : {0}]
 
 #Connection
 # 0 - Client id
@@ -247,8 +100,9 @@
 CHN-1002 = Flow {0}
 CHN-1003 = Close
 # 0 - bytes allowed in prefetch
-# 1 - number of messagse. 
+# 1 - number of messages.
 CHN-1004 = Prefetch Size (bytes) {0,number} : Count {1,number}
+# 0 - queue causing flow control
 CHN-1005 = Flow Control Enforced (Queue {0})
 CHN-1006 = Flow Control Removed
 
@@ -260,6 +114,7 @@
 QUE-1003 = Overfull : Size : {0,number} bytes, Capacity : {1,number}
 QUE-1004 = Underfull : Size : {0,number} bytes, Resume Capacity : {1,number}
 
+
 #Exchange
 # 0 - type
 # 1 - name
@@ -273,4 +128,5 @@
 #Subscription
 SUB-1001 = Create[ : Durable][ : Arguments : {0}]
 SUB-1002 = Close
+# 0 - The current subscription state
 SUB-1003 = State : {0}

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ChannelLogSubject.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ChannelLogSubject.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ChannelLogSubject.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ChannelLogSubject.java Sun Oct 25 22:58:57 2009
@@ -46,7 +46,7 @@
         // Provide the value for the 4th replacement.
         setLogStringWithFormat(CHANNEL_FORMAT,
               session.getSessionID(),
-              session.getAuthorizedID().getName(),
+              session.getPrincipal().getName(),
               session.getRemoteAddress(),
               session.getVirtualHost().getName(),
               channel.getChannelId());

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubject.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubject.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubject.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/ConnectionLogSubject.java Sun Oct 25 22:58:57 2009
@@ -41,7 +41,7 @@
     public ConnectionLogSubject(AMQProtocolSession session)
     {
         setLogStringWithFormat(CONNECTION_FORMAT, session.getSessionID(),
-               session.getAuthorizedID().getName(),
+               session.getPrincipal().getName(),
                session.getRemoteAddress(),
                session.getVirtualHost().getName());
     }

Propchange: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sun Oct 25 22:58:57 2009
@@ -1,3 +1,5 @@
 /qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/management:757268
+/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/management:795950-829653
+/qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/management:805429-821809
 /qpid/branches/jmx_mc_gsoc09/qpid/java/broker/src/main/java/org/apache/qpid/server/management:787599
 /qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management:753219-753220,753253,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management/DefaultManagedObject.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management/DefaultManagedObject.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management/DefaultManagedObject.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management/DefaultManagedObject.java Sun Oct 25 22:58:57 2009
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -65,16 +65,9 @@
         return null;
     }
 
-    public void register() throws AMQException
+    public void register() throws JMException
     {
-        try
-        {
-            getManagedObjectRegistry().registerObject(this);
-        }
-        catch (JMException e)
-        {
-            throw new AMQException("Error registering managed object " + this + ": " + e, e);
-        }
+        getManagedObjectRegistry().registerObject(this);
     }
 
     protected ManagedObjectRegistry getManagedObjectRegistry()
@@ -98,7 +91,7 @@
     {
         return getObjectInstanceName() + "[" + getType() + "]";
     }
-    
+
 
     /**
      * Created the ObjectName as per the JMX Specs
@@ -140,7 +133,7 @@
 
         objectName.append(",");
         objectName.append("version=").append(_version);
-        
+
         return new ObjectName(objectName.toString());
     }
 

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management/JMXManagedObjectRegistry.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management/JMXManagedObjectRegistry.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management/JMXManagedObjectRegistry.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management/JMXManagedObjectRegistry.java Sun Oct 25 22:58:57 2009
@@ -54,6 +54,7 @@
 import java.net.InetAddress;
 import java.net.ServerSocket;
 import java.net.Socket;
+import java.net.UnknownHostException;
 import java.rmi.AlreadyBoundException;
 import java.rmi.RemoteException;
 import java.rmi.registry.LocateRegistry;
@@ -236,8 +237,17 @@
          * The registry is exported on the defined management port 'port'. We will export the RMIConnectorServer
          * on 'port +1'. Use of these two well-defined ports will ease any navigation through firewall's. 
          */
-        final RMIServerImpl rmiConnectorServerStub = new RMIJRMPServerImpl(port+PORT_EXPORT_OFFSET, csf, ssf, env); 
-        final String hostname = InetAddress.getLocalHost().getHostName();
+        final RMIServerImpl rmiConnectorServerStub = new RMIJRMPServerImpl(port+PORT_EXPORT_OFFSET, csf, ssf, env);
+        String localHost;
+        try
+        {
+            localHost = InetAddress.getLocalHost().getHostName();
+        }
+        catch(UnknownHostException ex)
+        {
+            localHost="127.0.0.1";
+        }
+        final String hostname = localHost;
         final JMXServiceURL externalUrl = new JMXServiceURL(
                 "service:jmx:rmi://"+hostname+":"+(port+PORT_EXPORT_OFFSET)+"/jndi/rmi://"+hostname+":"+port+"/jmxrmi");
 

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management/ManagedObject.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management/ManagedObject.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management/ManagedObject.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management/ManagedObject.java Sun Oct 25 22:58:57 2009
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -22,6 +22,7 @@
 
 import javax.management.MalformedObjectNameException;
 import javax.management.ObjectName;
+import javax.management.JMException;
 
 import org.apache.qpid.AMQException;
 
@@ -45,7 +46,7 @@
 
     ManagedObject getParentObject();
 
-    void register() throws AMQException;
+    void register() throws AMQException, JMException;
 
     void unregister() throws AMQException;
 

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/output/ProtocolOutputConverter.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/output/ProtocolOutputConverter.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/output/ProtocolOutputConverter.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/output/ProtocolOutputConverter.java Sun Oct 25 22:58:57 2009
@@ -26,10 +26,13 @@
  */
 package org.apache.qpid.server.output;
 
-import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.QueueEntry;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.message.MessageContentSource;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.AMQDataBlock;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
 import org.apache.qpid.AMQException;
 
 public interface ProtocolOutputConverter
@@ -41,16 +44,16 @@
         ProtocolOutputConverter newInstance(AMQProtocolSession session);
     }
 
-    void writeDeliver(AMQMessage message, int channelId, long deliveryTag, AMQShortString consumerTag)
+    void writeDeliver(QueueEntry entry, int channelId, long deliveryTag, AMQShortString consumerTag)
             throws AMQException;
 
-    void writeGetOk(AMQMessage message, int channelId, long deliveryTag, int queueSize) throws AMQException;
+    void writeGetOk(QueueEntry message, int channelId, long deliveryTag, int queueSize) throws AMQException;
 
     byte getProtocolMinorVersion();
 
     byte getProtocolMajorVersion();
 
-    void writeReturn(AMQMessage message, int channelId, int replyCode, AMQShortString replyText)
+    void writeReturn(MessagePublishInfo messagePublishInfo, ContentHeaderBody header, MessageContentSource msgContent,  int channelId, int replyCode, AMQShortString replyText)
                     throws AMQException;
 
     void writeFrame(AMQDataBlock block);

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java Sun Oct 25 22:58:57 2009
@@ -27,28 +27,34 @@
 package org.apache.qpid.server.output.amqp0_8;
 
 import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.queue.AMQMessage;
-import org.apache.qpid.server.queue.AMQMessageHandle;
-import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.message.AMQMessage;
+import org.apache.qpid.server.queue.QueueEntry;
 import org.apache.qpid.server.output.ProtocolOutputConverter;
+import org.apache.qpid.server.output.HeaderPropertiesConverter;
+import org.apache.qpid.server.message.MessageContentSource;
+import org.apache.qpid.server.message.MessageTransferMessage;
 import org.apache.qpid.framing.*;
-import org.apache.qpid.framing.abstraction.ContentChunk;
+import org.apache.qpid.framing.amqp_8_0.BasicGetBodyImpl;
 import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter;
 import org.apache.qpid.AMQException;
+import org.apache.qpid.transport.DeliveryProperties;
 
-import org.apache.mina.common.ByteBuffer;
-
-import java.util.Iterator;
+import java.nio.ByteBuffer;
 
 public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
 {
 
+    private static final MethodRegistry METHOD_REGISTRY = MethodRegistry.getMethodRegistry(ProtocolVersion.v8_0);
+
+    private static final ProtocolVersionMethodConverter PROTOCOL_CONVERTER =
+            METHOD_REGISTRY.getProtocolVersionMethodConverter();
 
     public static Factory getInstanceFactory()
     {
         return new Factory()
         {
-    
+
             public ProtocolOutputConverter newInstance(AMQProtocolSession session)
             {
                 return new ProtocolOutputConverterImpl(session);
@@ -69,71 +75,47 @@
         return _protocolSession;
     }
 
-    public void writeDeliver(AMQMessage message, int channelId, long deliveryTag, AMQShortString consumerTag)
+    public void writeDeliver(QueueEntry entry, int channelId, long deliveryTag, AMQShortString consumerTag)
             throws AMQException
     {
-        AMQDataBlock deliver = createEncodedDeliverFrame(message, channelId, deliveryTag, consumerTag);
-        AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
-                                                                      message.getContentHeaderBody());
-
-        final AMQMessageHandle messageHandle = message.getMessageHandle();
-        final StoreContext storeContext = message.getStoreContext();
-
-
-        final int bodyCount = messageHandle.getBodyCount(storeContext);
+        AMQDataBlock deliver = createEncodedDeliverFrame(entry, channelId, deliveryTag, consumerTag);
+        writeMessageDelivery(entry.getMessage(), getContentHeaderBody(entry), channelId, deliver);
+    }
 
-        if(bodyCount == 0)
+    private ContentHeaderBody getContentHeaderBody(QueueEntry entry)
+            throws AMQException
+    {
+        if(entry.getMessage() instanceof AMQMessage)
         {
-            SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver,
-                                                                             contentHeader);
-
-            writeFrame(compositeBlock);
+            return ((AMQMessage)entry.getMessage()).getContentHeaderBody();
         }
         else
         {
-
-
-            //
-            // Optimise the case where we have a single content body. In that case we create a composite block
-            // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
-            //
-            ContentChunk cb = messageHandle.getContentChunk(storeContext, 0);
-
-            AMQDataBlock firstContentBody = new AMQFrame(channelId, getProtocolSession().getMethodRegistry().getProtocolVersionMethodConverter().convertToBody(cb));
-            AMQDataBlock[] blocks = new AMQDataBlock[]{deliver, contentHeader, firstContentBody};
-            CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(blocks);
-            writeFrame(compositeBlock);
-
-            //
-            // Now start writing out the other content bodies
-            //
-            for(int i = 1; i < bodyCount; i++)
-            {
-                cb = messageHandle.getContentChunk(storeContext, i);
-                writeFrame(new AMQFrame(channelId, getProtocolSession().getMethodRegistry().getProtocolVersionMethodConverter().convertToBody(cb)));
-            }
-
-
+            final MessageTransferMessage message = (MessageTransferMessage) entry.getMessage();
+            BasicContentHeaderProperties props = HeaderPropertiesConverter.convert(message);
+            ContentHeaderBody chb = new ContentHeaderBody(props, BasicGetBodyImpl.CLASS_ID);
+            chb.bodySize = message.getSize(); 
+            return chb;
         }
-
-
     }
 
 
-    public void writeGetOk(AMQMessage message, int channelId, long deliveryTag, int queueSize) throws AMQException
+    public void writeGetOk(QueueEntry entry, int channelId, long deliveryTag, int queueSize) throws AMQException
     {
+        AMQDataBlock deliver = createEncodedGetOkFrame(entry, channelId, deliveryTag, queueSize);
+        writeMessageDelivery(entry.getMessage(), getContentHeaderBody(entry), channelId, deliver);
+    }
 
-        final AMQMessageHandle messageHandle = message.getMessageHandle();
-        final StoreContext storeContext = message.getStoreContext();
+    private void writeMessageDelivery(MessageContentSource message, ContentHeaderBody chb, int channelId, AMQDataBlock deliver)
+            throws AMQException
+    {
 
-        AMQDataBlock deliver = createEncodedGetOkFrame(message, channelId, deliveryTag, queueSize);
 
+        AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId, chb);
 
-        AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
-                                                                      message.getContentHeaderBody());
 
-        final int bodyCount = messageHandle.getBodyCount(storeContext);
-        if(bodyCount == 0)
+        final int bodySize = (int) message.getSize();
+        if(bodySize == 0)
         {
             SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver,
                                                                              contentHeader);
@@ -141,66 +123,97 @@
         }
         else
         {
+            int maxBodySize = (int) getProtocolSession().getMaxFrameSize() - AMQFrame.getFrameOverhead();
 
+            final int capacity = bodySize > maxBodySize ? maxBodySize : bodySize;
+            ByteBuffer buf = ByteBuffer.allocate(capacity);
 
-            //
-            // Optimise the case where we have a single content body. In that case we create a composite block
-            // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
-            //
-            ContentChunk cb = messageHandle.getContentChunk(storeContext, 0);
+            int writtenSize = 0;
 
-            AMQDataBlock firstContentBody = new AMQFrame(channelId, getProtocolSession().getMethodRegistry().getProtocolVersionMethodConverter().convertToBody(cb));
+            writtenSize += message.getContent(buf, writtenSize);
+            buf.flip();
+            AMQDataBlock firstContentBody = new AMQFrame(channelId, PROTOCOL_CONVERTER.convertToBody(buf));
             AMQDataBlock[] blocks = new AMQDataBlock[]{deliver, contentHeader, firstContentBody};
             CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(blocks);
             writeFrame(compositeBlock);
 
-            //
-            // Now start writing out the other content bodies
-            //
-            for(int i = 1; i < bodyCount; i++)
+            while(writtenSize < bodySize)
             {
-                cb = messageHandle.getContentChunk(storeContext, i);
-                writeFrame(new AMQFrame(channelId, getProtocolSession().getMethodRegistry().getProtocolVersionMethodConverter().convertToBody(cb)));
+                buf = java.nio.ByteBuffer.allocate(capacity);
+                writtenSize += message.getContent(buf, writtenSize);
+                buf.flip();
+                writeFrame(new AMQFrame(channelId, PROTOCOL_CONVERTER.convertToBody(buf)));
             }
 
-
         }
-
-
     }
 
 
-    private AMQDataBlock createEncodedDeliverFrame(AMQMessage message, int channelId, long deliveryTag, AMQShortString consumerTag)
+    private AMQDataBlock createEncodedDeliverFrame(QueueEntry entry, int channelId, long deliveryTag, AMQShortString consumerTag)
             throws AMQException
     {
-        final MessagePublishInfo pb = message.getMessagePublishInfo();
-        final AMQMessageHandle messageHandle = message.getMessageHandle();
+        final AMQShortString exchangeName;
+        final AMQShortString routingKey;
+
+        if(entry.getMessage() instanceof AMQMessage)
+        {
+            final AMQMessage message = (AMQMessage) entry.getMessage();
+            final MessagePublishInfo pb = message.getMessagePublishInfo();
+            exchangeName = pb.getExchange();
+            routingKey = pb.getRoutingKey();
+        }
+        else
+        {
+            MessageTransferMessage message = (MessageTransferMessage) entry.getMessage();
+            DeliveryProperties delvProps = message.getHeader().get(DeliveryProperties.class);
+            exchangeName = (delvProps == null || delvProps.getExchange() == null) ? null : new AMQShortString(delvProps.getExchange());
+            routingKey = (delvProps == null || delvProps.getRoutingKey() == null) ? null : new AMQShortString(delvProps.getRoutingKey());
+        }
+
+        final boolean isRedelivered = entry.isRedelivered();
+
 
-        MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(ProtocolVersion.v8_0);
         BasicDeliverBody deliverBody =
-                methodRegistry.createBasicDeliverBody(consumerTag,
+                METHOD_REGISTRY.createBasicDeliverBody(consumerTag,
                                                       deliveryTag,
-                                                      messageHandle.isRedelivered(),
-                                                      pb.getExchange(),
-                                                      pb.getRoutingKey());
+                                                      isRedelivered,
+                                                      exchangeName,
+                                                      routingKey);
+
         AMQFrame deliverFrame = deliverBody.generateFrame(channelId);
 
 
         return deliverFrame;
     }
 
-    private AMQDataBlock createEncodedGetOkFrame(AMQMessage message, int channelId, long deliveryTag, int queueSize)
+    private AMQDataBlock createEncodedGetOkFrame(QueueEntry entry, int channelId, long deliveryTag, int queueSize)
             throws AMQException
     {
-        final MessagePublishInfo pb = message.getMessagePublishInfo();
-        final AMQMessageHandle messageHandle = message.getMessageHandle();
+        final AMQShortString exchangeName;
+        final AMQShortString routingKey;
+
+        if(entry.getMessage() instanceof AMQMessage)
+        {
+            final AMQMessage message = (AMQMessage) entry.getMessage();
+            final MessagePublishInfo pb = message.getMessagePublishInfo();
+            exchangeName = pb.getExchange();
+            routingKey = pb.getRoutingKey();
+        }
+        else
+        {
+            MessageTransferMessage message = (MessageTransferMessage) entry.getMessage();
+            DeliveryProperties delvProps = message.getHeader().get(DeliveryProperties.class);
+            exchangeName = (delvProps == null || delvProps.getExchange() == null) ? null : new AMQShortString(delvProps.getExchange());
+            routingKey = (delvProps == null || delvProps.getRoutingKey() == null) ? null : new AMQShortString(delvProps.getRoutingKey());
+        }
+
+        final boolean isRedelivered = entry.isRedelivered();
 
-        MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(ProtocolVersion.v8_0);
         BasicGetOkBody getOkBody =
-                methodRegistry.createBasicGetOkBody(deliveryTag,
-                                                    messageHandle.isRedelivered(),
-                                                    pb.getExchange(),
-                                                    pb.getRoutingKey(),
+                METHOD_REGISTRY.createBasicGetOkBody(deliveryTag,
+                                                    isRedelivered,
+                                                    exchangeName,
+                                                    routingKey,
                                                     queueSize);
         AMQFrame getOkFrame = getOkBody.generateFrame(channelId);
 
@@ -217,54 +230,31 @@
         return getProtocolSession().getProtocolMajorVersion();
     }
 
-    private AMQDataBlock createEncodedReturnFrame(AMQMessage message, int channelId, int replyCode, AMQShortString replyText) throws AMQException
+    private AMQDataBlock createEncodedReturnFrame(MessagePublishInfo messagePublishInfo, int channelId, int replyCode, AMQShortString replyText) throws AMQException
     {
-        MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(ProtocolVersion.v8_0);
         BasicReturnBody basicReturnBody =
-                methodRegistry.createBasicReturnBody(replyCode,
+                METHOD_REGISTRY.createBasicReturnBody(replyCode,
                                                      replyText,
-                                                     message.getMessagePublishInfo().getExchange(),
-                                                     message.getMessagePublishInfo().getRoutingKey());
+                                                     messagePublishInfo.getExchange(),
+                                                     messagePublishInfo.getRoutingKey());
         AMQFrame returnFrame = basicReturnBody.generateFrame(channelId);
 
         return returnFrame;
     }
 
-    public void writeReturn(AMQMessage message, int channelId, int replyCode, AMQShortString replyText)
+    public void writeReturn(MessagePublishInfo messagePublishInfo,
+                            ContentHeaderBody header,
+                            MessageContentSource content,
+                            int channelId,
+                            int replyCode,
+                            AMQShortString replyText)
             throws AMQException
     {
-        AMQDataBlock returnFrame = createEncodedReturnFrame(message, channelId, replyCode, replyText);
-
-        AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
-                                                                      message.getContentHeaderBody());
 
-        Iterator<AMQDataBlock> bodyFrameIterator = message.getBodyFrameIterator(getProtocolSession(), channelId);
-        //
-        // Optimise the case where we have a single content body. In that case we create a composite block
-        // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
-        //
-        if (bodyFrameIterator.hasNext())
-        {
-            AMQDataBlock firstContentBody = bodyFrameIterator.next();
-            AMQDataBlock[] blocks = new AMQDataBlock[]{returnFrame, contentHeader, firstContentBody};
-            CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(blocks);
-            writeFrame(compositeBlock);
-        }
-        else
-        {
-            CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(new AMQDataBlock[]{returnFrame, contentHeader});
+        AMQDataBlock returnFrame = createEncodedReturnFrame(messagePublishInfo, channelId, replyCode, replyText);
 
-            writeFrame(compositeBlock);
-        }
+        writeMessageDelivery(content, header, channelId, returnFrame);
 
-        //
-        // Now start writing out the other content bodies
-        // TODO: MINA needs to be fixed so the the pending writes buffer is not unbounded
-        //
-        while (bodyFrameIterator.hasNext())
-        {
-            writeFrame(bodyFrameIterator.next());
-        }
     }
 
 
@@ -276,8 +266,7 @@
 
     public void confirmConsumerAutoClose(int channelId, AMQShortString consumerTag)
     {
-        MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(ProtocolVersion.v8_0);
-        BasicCancelOkBody basicCancelOkBody = methodRegistry.createBasicCancelOkBody(consumerTag);
+        BasicCancelOkBody basicCancelOkBody = METHOD_REGISTRY.createBasicCancelOkBody(consumerTag);
         writeFrame(basicCancelOkBody.generateFrame(channelId));
 
     }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java Sun Oct 25 22:58:57 2009
@@ -1,46 +1,48 @@
 package org.apache.qpid.server.output.amqp0_9;
-/*
- * 
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- * 
- */
-
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
 
-import org.apache.mina.common.ByteBuffer;
 
-import java.util.Iterator;
+import org.apache.mina.common.ByteBuffer;
 
 import org.apache.qpid.server.output.ProtocolOutputConverter;
+import org.apache.qpid.server.output.HeaderPropertiesConverter;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.queue.AMQMessage;
-import org.apache.qpid.server.queue.AMQMessageHandle;
-import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.message.AMQMessage;
+import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.message.MessageContentSource;
+import org.apache.qpid.server.message.MessageTransferMessage;
 import org.apache.qpid.framing.*;
-import org.apache.qpid.framing.abstraction.ContentChunk;
+import org.apache.qpid.framing.amqp_0_9.BasicGetBodyImpl;
 import org.apache.qpid.framing.abstraction.MessagePublishInfo;
 import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter;
 import org.apache.qpid.AMQException;
+import org.apache.qpid.transport.DeliveryProperties;
 import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
 
 public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
 {
     private static final MethodRegistry METHOD_REGISTRY = MethodRegistry.getMethodRegistry(ProtocolVersion.v0_9);
-    private static final ProtocolVersionMethodConverter PROTOCOL_METHOD_CONVERTER = METHOD_REGISTRY.getProtocolVersionMethodConverter();
+    private static final ProtocolVersionMethodConverter
+            PROTOCOL_CONVERTER = METHOD_REGISTRY.getProtocolVersionMethodConverter();
 
 
     public static Factory getInstanceFactory()
@@ -68,20 +70,46 @@
         return _protocolSession;
     }
 
-    public void writeDeliver(AMQMessage message, int channelId, long deliveryTag, AMQShortString consumerTag)
+    public void writeDeliver(QueueEntry entry, int channelId, long deliveryTag, AMQShortString consumerTag)
+            throws AMQException
+    {
+        AMQBody deliverBody = createEncodedDeliverBody(entry, deliveryTag, consumerTag);
+        writeMessageDelivery(entry, channelId, deliverBody);
+    }
+
+
+    private ContentHeaderBody getContentHeaderBody(QueueEntry entry)
             throws AMQException
     {
-        AMQBody deliverBody = createEncodedDeliverFrame(message, channelId, deliveryTag, consumerTag);
-        final ContentHeaderBody contentHeaderBody = message.getContentHeaderBody();
+        if(entry.getMessage() instanceof AMQMessage)
+        {
+            return ((AMQMessage)entry.getMessage()).getContentHeaderBody();
+        }
+        else
+        {
+            final MessageTransferMessage message = (MessageTransferMessage) entry.getMessage();
+            BasicContentHeaderProperties props = HeaderPropertiesConverter.convert(message);
+            ContentHeaderBody chb = new ContentHeaderBody(props, BasicGetBodyImpl.CLASS_ID);
+            chb.bodySize = message.getSize();
+            return chb;
+        }
+    }
+
 
+    private void writeMessageDelivery(QueueEntry entry, int channelId, AMQBody deliverBody)
+            throws AMQException
+    {
+        writeMessageDelivery(entry.getMessage(), getContentHeaderBody(entry), channelId, deliverBody);
+    }
 
-        final AMQMessageHandle messageHandle = message.getMessageHandle();
-        final StoreContext storeContext = message.getStoreContext();
+    private void writeMessageDelivery(MessageContentSource message, ContentHeaderBody contentHeaderBody, int channelId, AMQBody deliverBody)
+            throws AMQException
+    {
 
 
-        final int bodyCount = messageHandle.getBodyCount(storeContext);
+        int bodySize = (int) message.getSize();
 
-        if(bodyCount == 0)
+        if(bodySize == 0)
         {
             SmallCompositeAMQBodyBlock compositeBlock = new SmallCompositeAMQBodyBlock(channelId, deliverBody,
                                                                              contentHeaderBody);
@@ -90,101 +118,75 @@
         }
         else
         {
+            int maxBodySize = (int) getProtocolSession().getMaxFrameSize() - AMQFrame.getFrameOverhead();
+
 
+            final int capacity = bodySize > maxBodySize ? maxBodySize : bodySize;
+            java.nio.ByteBuffer buf = java.nio.ByteBuffer.allocate(capacity);
 
-            //
-            // Optimise the case where we have a single content body. In that case we create a composite block
-            // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
-            //
-            ContentChunk cb = messageHandle.getContentChunk(storeContext, 0);
+            int writtenSize = 0;
 
-            AMQBody firstContentBody = PROTOCOL_METHOD_CONVERTER.convertToBody(cb);
 
-            CompositeAMQBodyBlock compositeBlock = new CompositeAMQBodyBlock(channelId, deliverBody, contentHeaderBody, firstContentBody);
+            writtenSize += message.getContent(buf, writtenSize);
+            buf.flip();
+            AMQBody firstContentBody = PROTOCOL_CONVERTER.convertToBody(buf);
+
+            CompositeAMQBodyBlock
+                    compositeBlock = new CompositeAMQBodyBlock(channelId, deliverBody, contentHeaderBody, firstContentBody);
             writeFrame(compositeBlock);
 
-            //
-            // Now start writing out the other content bodies
-            //
-            for(int i = 1; i < bodyCount; i++)
+            while(writtenSize < bodySize)
             {
-                cb = messageHandle.getContentChunk(storeContext, i);
-                writeFrame(new AMQFrame(channelId, PROTOCOL_METHOD_CONVERTER.convertToBody(cb)));
-            }
-
+                buf = java.nio.ByteBuffer.allocate(capacity);
 
+                writtenSize += message.getContent(buf, writtenSize);
+                buf.flip();
+                writeFrame(new AMQFrame(channelId, PROTOCOL_CONVERTER.convertToBody(buf)));
+            }
         }
-        
     }
 
     private AMQDataBlock createContentHeaderBlock(final int channelId, final ContentHeaderBody contentHeaderBody)
     {
-        
+
         AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
                                                                       contentHeaderBody);
         return contentHeader;
     }
 
 
-    public void writeGetOk(AMQMessage message, int channelId, long deliveryTag, int queueSize) throws AMQException
+    public void writeGetOk(QueueEntry entry, int channelId, long deliveryTag, int queueSize) throws AMQException
     {
+        AMQBody deliver = createEncodedGetOkBody(entry, deliveryTag, queueSize);
+        writeMessageDelivery(entry, channelId, deliver);
+    }
 
-        final AMQMessageHandle messageHandle = message.getMessageHandle();
-        final StoreContext storeContext = message.getStoreContext();
-
-        AMQFrame deliver = createEncodedGetOkFrame(message, channelId, deliveryTag, queueSize);
 
+    private AMQBody createEncodedDeliverBody(QueueEntry entry,
+                                              final long deliveryTag,
+                                              final AMQShortString consumerTag)
+            throws AMQException
+    {
 
-        AMQDataBlock contentHeader = createContentHeaderBlock(channelId, message.getContentHeaderBody());
+        final AMQShortString exchangeName;
+        final AMQShortString routingKey;
 
-        final int bodyCount = messageHandle.getBodyCount(storeContext);
-        if(bodyCount == 0)
+        if(entry.getMessage() instanceof AMQMessage)
         {
-            SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver,
-                                                                             contentHeader);
-            writeFrame(compositeBlock);
+            final AMQMessage message = (AMQMessage) entry.getMessage();
+            final MessagePublishInfo pb = message.getMessagePublishInfo();
+            exchangeName = pb.getExchange();
+            routingKey = pb.getRoutingKey();
         }
         else
         {
-
-
-            //
-            // Optimise the case where we have a single content body. In that case we create a composite block
-            // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
-            //
-            ContentChunk cb = messageHandle.getContentChunk(storeContext, 0);
-
-            AMQDataBlock firstContentBody = new AMQFrame(channelId, PROTOCOL_METHOD_CONVERTER.convertToBody(cb));
-            AMQDataBlock[] blocks = new AMQDataBlock[]{deliver, contentHeader, firstContentBody};
-            CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(blocks);
-            writeFrame(compositeBlock);
-
-            //
-            // Now start writing out the other content bodies
-            //
-            for(int i = 1; i < bodyCount; i++)
-            {
-                cb = messageHandle.getContentChunk(storeContext, i);
-                writeFrame(new AMQFrame(channelId, PROTOCOL_METHOD_CONVERTER.convertToBody(cb)));
-            }
-
-
+            MessageTransferMessage message = (MessageTransferMessage) entry.getMessage();
+            DeliveryProperties delvProps = message.getHeader().get(DeliveryProperties.class);
+            exchangeName = (delvProps == null || delvProps.getExchange() == null) ? null : new AMQShortString(delvProps.getExchange());
+            routingKey = (delvProps == null || delvProps.getRoutingKey() == null) ? null : new AMQShortString(delvProps.getRoutingKey());
         }
 
-
-    }
-
-
-    private AMQBody createEncodedDeliverFrame(AMQMessage message, final int channelId, final long deliveryTag, final AMQShortString consumerTag)
-            throws AMQException
-    {
-        final MessagePublishInfo pb = message.getMessagePublishInfo();
-        final AMQMessageHandle messageHandle = message.getMessageHandle();
-
-
-        final boolean isRedelivered = messageHandle.isRedelivered();
-        final AMQShortString exchangeName = pb.getExchange();
-        final AMQShortString routingKey = pb.getRoutingKey();
+        final boolean isRedelivered = entry.isRedelivered();
 
         final AMQBody returnBlock = new AMQBody()
         {
@@ -237,22 +239,37 @@
         return returnBlock;
     }
 
-    private AMQFrame createEncodedGetOkFrame(AMQMessage message, int channelId, long deliveryTag, int queueSize)
+    private AMQBody createEncodedGetOkBody(QueueEntry entry, long deliveryTag, int queueSize)
             throws AMQException
     {
-        final MessagePublishInfo pb = message.getMessagePublishInfo();
-        final AMQMessageHandle messageHandle = message.getMessageHandle();
+        final AMQShortString exchangeName;
+        final AMQShortString routingKey;
 
+        if(entry.getMessage() instanceof AMQMessage)
+        {
+            final AMQMessage message = (AMQMessage) entry.getMessage();
+            final MessagePublishInfo pb = message.getMessagePublishInfo();
+            exchangeName = pb.getExchange();
+            routingKey = pb.getRoutingKey();
+        }
+        else
+        {
+            MessageTransferMessage message = (MessageTransferMessage) entry.getMessage();
+            DeliveryProperties delvProps = message.getHeader().get(DeliveryProperties.class);
+            exchangeName = (delvProps == null || delvProps.getExchange() == null) ? null : new AMQShortString(delvProps.getExchange());
+            routingKey = (delvProps == null || delvProps.getRoutingKey() == null) ? null : new AMQShortString(delvProps.getRoutingKey());
+        }
+
+        final boolean isRedelivered = entry.isRedelivered();
 
         BasicGetOkBody getOkBody =
                 METHOD_REGISTRY.createBasicGetOkBody(deliveryTag,
-                                                    messageHandle.isRedelivered(),
-                                                    pb.getExchange(),
-                                                    pb.getRoutingKey(),
+                                                    isRedelivered,
+                                                    exchangeName,
+                                                    routingKey,
                                                     queueSize);
-        AMQFrame getOkFrame = getOkBody.generateFrame(channelId);
 
-        return getOkFrame;
+        return getOkBody;
     }
 
     public byte getProtocolMinorVersion()
@@ -265,53 +282,28 @@
         return getProtocolSession().getProtocolMajorVersion();
     }
 
-    private AMQDataBlock createEncodedReturnFrame(AMQMessage message, int channelId, int replyCode, AMQShortString replyText) throws AMQException
+    private AMQBody createEncodedReturnFrame(MessagePublishInfo messagePublishInfo,
+                                             int replyCode,
+                                             AMQShortString replyText) throws AMQException
     {
 
         BasicReturnBody basicReturnBody =
                 METHOD_REGISTRY.createBasicReturnBody(replyCode,
                                                      replyText,
-                                                     message.getMessagePublishInfo().getExchange(),
-                                                     message.getMessagePublishInfo().getRoutingKey());
-        AMQFrame returnFrame = basicReturnBody.generateFrame(channelId);
+                                                     messagePublishInfo.getExchange(),
+                                                     messagePublishInfo.getRoutingKey());
 
-        return returnFrame;
+
+        return basicReturnBody;
     }
 
-    public void writeReturn(AMQMessage message, int channelId, int replyCode, AMQShortString replyText)
+    public void writeReturn(MessagePublishInfo messagePublishInfo, ContentHeaderBody header, MessageContentSource message, int channelId, int replyCode, AMQShortString replyText)
             throws AMQException
     {
-        AMQDataBlock returnFrame = createEncodedReturnFrame(message, channelId, replyCode, replyText);
 
-        AMQDataBlock contentHeader = createContentHeaderBlock(channelId, message.getContentHeaderBody());
+        AMQBody returnFrame = createEncodedReturnFrame(messagePublishInfo, replyCode, replyText);
 
-        Iterator<AMQDataBlock> bodyFrameIterator = message.getBodyFrameIterator(getProtocolSession(), channelId);
-        //
-        // Optimise the case where we have a single content body. In that case we create a composite block
-        // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
-        //
-        if (bodyFrameIterator.hasNext())
-        {
-            AMQDataBlock firstContentBody = bodyFrameIterator.next();
-            AMQDataBlock[] blocks = new AMQDataBlock[]{returnFrame, contentHeader, firstContentBody};
-            CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(blocks);
-            writeFrame(compositeBlock);
-        }
-        else
-        {
-            CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(new AMQDataBlock[]{returnFrame, contentHeader});
-
-            writeFrame(compositeBlock);
-        }
-
-        //
-        // Now start writing out the other content bodies
-        // TODO: MINA needs to be fixed so the the pending writes buffer is not unbounded
-        //
-        while (bodyFrameIterator.hasNext())
-        {
-            writeFrame(bodyFrameIterator.next());
-        }
+        writeMessageDelivery(message, header, channelId, returnFrame);
     }
 
 

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java Sun Oct 25 22:58:57 2009
@@ -34,6 +34,8 @@
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicMarkableReference;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import javax.management.JMException;
 import javax.security.sasl.SaslServer;
@@ -83,8 +85,8 @@
 import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.state.AMQState;
 import org.apache.qpid.server.state.AMQStateManager;
-import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
+import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.transport.NetworkDriver;
 import org.apache.qpid.transport.Sender;
 
@@ -124,7 +126,7 @@
 
     private Object _lastSent;
 
-    protected boolean _closed;
+    protected volatile boolean _closed;
     // maximum number of channels this session should have
     private long _maxNoOfChannels = 1000;
 
@@ -139,7 +141,7 @@
     private Principal _authorizedID;
     private MethodDispatcher _dispatcher;
     private ProtocolSessionIdentifier _sessionIdentifier;
-    
+
     // Create a simple ID that increments for every new Session
     private final long _sessionID = idGenerator.getAndIncrement();
 
@@ -152,11 +154,13 @@
 
     private long _writtenBytes;
     private long _readBytes;
-    
+
     private Job _readJob;
     private Job _writeJob;
 
     private ReferenceCountingExecutorService _poolReference = ReferenceCountingExecutorService.getInstance();
+    private long _maxFrameSize;
+    private final AtomicBoolean _closing = new AtomicBoolean(false);
 
     public ManagedObject getManagedObject()
     {
@@ -167,7 +171,7 @@
     {
         _stateManager = new AMQStateManager(virtualHostRegistry, this);
         _networkDriver = driver;
-        
+
         _codecFactory = new AMQCodecFactory(true, this);
         _poolReference.acquireExecutorService();
         _readJob = new Job(_poolReference, Job.MAX_JOB_EVENTS, true);
@@ -178,17 +182,9 @@
 
     }
 
-    private AMQProtocolSessionMBean createMBean() throws AMQException
+    private AMQProtocolSessionMBean createMBean() throws JMException
     {
-        try
-        {
-            return new AMQProtocolSessionMBean(this);
-        }
-        catch (JMException ex)
-        {
-            _logger.error("AMQProtocolSession MBean creation has failed ", ex);
-            throw new AMQException("AMQProtocolSession MBean creation has failed ", ex);
-        }
+        return new AMQProtocolSessionMBean(this);
     }
 
     public long getSessionID()
@@ -201,6 +197,21 @@
         return _actor;
     }
 
+    public void setMaxFrameSize(long frameMax)
+    {
+        _maxFrameSize = frameMax;
+    }
+
+    public long getMaxFrameSize()
+    {
+        return _maxFrameSize;
+    }
+
+    public boolean isClosing()
+    {
+        return _closing.get();
+    }
+
     public void received(final ByteBuffer msg)
     {
         _lastIoTime = System.currentTimeMillis();
@@ -290,7 +301,7 @@
                 else
                 {
                     // The channel has been told to close, we don't process any more frames until
-                    // it's closed. 
+                    // it's closed.
                     return;
                 }
             }
@@ -545,7 +556,7 @@
     private void checkForNotification()
     {
         int channelsCount = _channelMap.size();
-        if (channelsCount >= _maxNoOfChannels)
+        if (_managedObject != null && channelsCount >= _maxNoOfChannels)
         {
             _managedObject.notifyClients("Channel count (" + channelsCount + ") has reached the threshold value");
         }
@@ -560,7 +571,7 @@
     {
         _maxNoOfChannels = value;
     }
-    
+
     public void commitTransactions(AMQChannel channel) throws AMQException
     {
         if ((channel != null) && channel.isTransactional())
@@ -576,7 +587,7 @@
             channel.rollback();
         }
     }
-    
+
     /**
      * Close a specific channel. This will remove any resources used by the channel, including: <ul><li>any queue
      * subscriptions (this may in turn remove queues if they are auto delete)</li> </ul>
@@ -676,34 +687,58 @@
     /** This must be called when the session is _closed in order to free up any resources managed by the session. */
     public void closeSession() throws AMQException
     {
-        // REMOVE THIS SHOULD NOT BE HERE. 
-        if (CurrentActor.get() == null)
-        {
-            CurrentActor.set(_actor);
-        }
-        if (!_closed)
+        if(_closing.compareAndSet(false,true))
         {
-            if (_virtualHost != null)
+            // REMOVE THIS SHOULD NOT BE HERE.
+            if (CurrentActor.get() == null)
             {
-                _virtualHost.getConnectionRegistry().deregisterConnection(this);
+                CurrentActor.set(_actor);
             }
-
-            closeAllChannels();
-            if (_managedObject != null)
+            if (!_closed)
             {
-                _managedObject.unregister();
-                // Ensure we only do this once.
-                _managedObject = null;
-            }
+                if (_virtualHost != null)
+                {
+                    _virtualHost.getConnectionRegistry().deregisterConnection(this);
+                }
+
+                closeAllChannels();
+                if (_managedObject != null)
+                {
+                    _managedObject.unregister();
+                    // Ensure we only do this once.
+                    _managedObject = null;
+                }
 
-            for (Task task : _taskList)
+                for (Task task : _taskList)
+                {
+                    task.doTask(this);
+                }
+
+                synchronized(this)
+                {
+                    _closed = true;
+                    notifyAll();
+                }
+                _poolReference.releaseExecutorService();
+                CurrentActor.get().message(_logSubject, ConnectionMessages.CON_1002());
+            }
+        }
+        else
+        {
+            synchronized(this)
             {
-                task.doTask(this);
+                while(!_closed)
+                {
+                    try
+                    {
+                        wait(1000);
+                    }
+                    catch (InterruptedException e)
+                    {
+
+                    }
+                }
             }
-            
-            _closed = true;
-            _poolReference.releaseExecutorService();
-            CurrentActor.get().message(_logSubject, ConnectionMessages.CON_1002());
         }
     }
 
@@ -778,7 +813,7 @@
             throw new IllegalArgumentException("Unsupported socket address class: " + address);
         }
     }
-    
+
     public SaslServer getSaslServer()
     {
         return _saslServer;
@@ -868,8 +903,15 @@
 
         _virtualHost.getConnectionRegistry().registerConnection(this);
 
-        _managedObject = createMBean();
-        _managedObject.register();
+        try
+        {
+            _managedObject = createMBean();
+            _managedObject.register();
+        }
+        catch (JMException e)
+        {
+            _logger.error(e);
+        }
     }
 
     public void addSessionCloseTask(Task task)
@@ -881,7 +923,7 @@
     {
         _taskList.remove(task);
     }
-    
+
     public ProtocolOutputConverter getProtocolOutputConverter()
     {
         return _protocolOutputConverter;
@@ -900,10 +942,15 @@
         return _authorizedID;
     }
 
+    public Principal getPrincipal()
+    {
+        return _authorizedID;
+    }
+
     public SocketAddress getRemoteAddress()
     {
         return _networkDriver.getRemoteAddress();
-    }    
+    }
 
     public SocketAddress getLocalAddress()
     {
@@ -939,7 +986,7 @@
 
     public void setNetworkDriver(NetworkDriver driver)
     {
-        _networkDriver = driver;        
+        _networkDriver = driver;
     }
 
     public void writerIdle()
@@ -968,7 +1015,7 @@
 
             MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(getProtocolVersion());
             ConnectionCloseBody closeBody = methodRegistry.createConnectionCloseBody(200,new AMQShortString(throwable.getMessage()),0,0);
-                        
+
             writeFrame(closeBody.generateFrame(0));
 
             _networkDriver.close();
@@ -984,7 +1031,7 @@
     {
         // Do nothing
     }
-    
+
     public long getReadBytes()
     {
         return _readBytes;
@@ -1004,7 +1051,7 @@
     {
         return _sessionIdentifier;
     }
-    
+
     public String getClientVersion()
     {
         return (_clientVersion == null) ? null : _clientVersion.toString();
@@ -1022,5 +1069,5 @@
             }
         }
     }
-    
+
 }



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org