You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2010/05/07 17:09:43 UTC

svn commit: r942101 - in /qpid/trunk/qpid/java/broker/src: main/java/org/apache/qpid/server/ main/java/org/apache/qpid/server/handler/ main/java/org/apache/qpid/server/protocol/ main/java/org/apache/qpid/server/queue/ main/java/org/apache/qpid/server/t...

Author: ritchiem
Date: Fri May  7 15:09:42 2010
New Revision: 942101

URL: http://svn.apache.org/viewvc?rev=942101&view=rev
Log:
QPID-2575 : Create Connection and Session models to correctly expose the Owning Session. Addressed issue where getPrincipal was used in error to identify queue owner. Session model now allows access to this in a protocol independent way.

Added:
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
Modified:
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
    qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?rev=942101&r1=942100&r2=942101&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java Fri May  7 15:09:42 2010
@@ -56,6 +56,8 @@ import org.apache.qpid.server.message.Se
 import org.apache.qpid.server.output.ProtocolOutputConverter;
 import org.apache.qpid.server.protocol.AMQProtocolEngine;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.protocol.AMQConnectionModel;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.BaseQueue;
 import org.apache.qpid.server.queue.IncomingMessage;
@@ -86,7 +88,7 @@ import java.util.concurrent.ConcurrentMa
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
-public class AMQChannel implements SessionConfig
+public class AMQChannel implements SessionConfig, AMQSessionModel
 {
     public static final int DEFAULT_PREFETCH = 5000;
 
@@ -1058,6 +1060,16 @@ public class AMQChannel implements Sessi
 
     }
 
+    public Object getID()
+    {
+        return _channelId;
+    }
+
+    public AMQConnectionModel getConnectionModel()
+    {
+        return _session;
+    }
+
     private class MessageDeliveryAction implements ServerTransaction.Action
     {
         private IncomingMessage _incommingMessage;

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java?rev=942101&r1=942100&r2=942101&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java Fri May  7 15:09:42 2010
@@ -26,6 +26,7 @@ import org.apache.qpid.framing.*;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.server.AMQChannel;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.state.AMQStateManager;
 import org.apache.qpid.server.state.StateAwareMethodListener;
@@ -48,14 +49,14 @@ public class BasicConsumeMethodHandler i
 
     public void methodReceived(AMQStateManager stateManager, BasicConsumeBody body, int channelId) throws AMQException
     {
-        AMQProtocolSession session = stateManager.getProtocolSession();
+        AMQProtocolSession protocolConnection = stateManager.getProtocolSession();
 
 
 
 
-        AMQChannel channel = session.getChannel(channelId);
+        AMQChannel channel = protocolConnection.getChannel(channelId);
 
-        VirtualHost vHost = session.getVirtualHost();
+        VirtualHost vHost = protocolConnection.getVirtualHost();
 
         if (channel == null)
         {
@@ -96,16 +97,20 @@ public class BasicConsumeMethodHandler i
                 final AMQShortString consumerTagName;
 
                 // Check authz
-                if (!vHost.getAccessManager().authoriseConsume(session,
+                if (!vHost.getAccessManager().authoriseConsume(protocolConnection,
                         body.getExclusive(), body.getNoAck(),
                         body.getNoLocal(), body.getNowait(), queue))
                 {
                     throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, "Permission denied");
                 }
-                else if (queue.isExclusive() && !queue.isDurable() && queue.getExclusiveOwner() != session)
+                else if (queue.isExclusive() && !queue.isDurable())
                 {
-                    throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
-                                                      "Queue " + queue.getNameShortString() + " is exclusive, but not created on this Connection.");
+                    AMQSessionModel session = queue.getExclusiveOwningSession();
+                    if (session == null || session.getConnectionModel() != protocolConnection)
+                    {
+                        throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
+                                                          "Queue " + queue.getNameShortString() + " is exclusive, but not created on this Connection.");
+                    }
                 }
 
                 if (body.getConsumerTag() != null)
@@ -126,9 +131,9 @@ public class BasicConsumeMethodHandler i
                                                                               body.getArguments(), body.getNoLocal(), body.getExclusive());
                         if (!body.getNowait())
                         {
-                            MethodRegistry methodRegistry = session.getMethodRegistry();
+                            MethodRegistry methodRegistry = protocolConnection.getMethodRegistry();
                             AMQMethodBody responseBody = methodRegistry.createBasicConsumeOkBody(consumerTag);
-                            session.writeFrame(responseBody.generateFrame(channelId));
+                            protocolConnection.writeFrame(responseBody.generateFrame(channelId));
 
                         }
                     }
@@ -136,12 +141,12 @@ public class BasicConsumeMethodHandler i
                     {
                         AMQShortString msg = new AMQShortString("Non-unique consumer tag, '" + body.getConsumerTag() + "'");
 
-                        MethodRegistry methodRegistry = session.getMethodRegistry();
+                        MethodRegistry methodRegistry = protocolConnection.getMethodRegistry();
                         AMQMethodBody responseBody = methodRegistry.createConnectionCloseBody(AMQConstant.NOT_ALLOWED.getCode(),    // replyCode
                                                                  msg,               // replytext
                                                                  body.getClazz(),
                                                                  body.getMethod());
-                        session.writeFrame(responseBody.generateFrame(0));
+                        protocolConnection.writeFrame(responseBody.generateFrame(0));
                     }
 
                 }
@@ -149,12 +154,12 @@ public class BasicConsumeMethodHandler i
                 {
                     _logger.debug("Closing connection due to invalid selector");
 
-                    MethodRegistry methodRegistry = session.getMethodRegistry();
+                    MethodRegistry methodRegistry = protocolConnection.getMethodRegistry();
                     AMQMethodBody responseBody = methodRegistry.createChannelCloseBody(AMQConstant.INVALID_ARGUMENT.getCode(),
                                                                                        new AMQShortString(ise.getMessage()),
                                                                                        body.getClazz(),
                                                                                        body.getMethod());
-                    session.writeFrame(responseBody.generateFrame(channelId));
+                    protocolConnection.writeFrame(responseBody.generateFrame(channelId));
 
 
                 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java?rev=942101&r1=942100&r2=942101&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java Fri May  7 15:09:42 2010
@@ -39,6 +39,7 @@ import org.apache.qpid.server.subscripti
 import org.apache.qpid.server.subscription.Subscription;
 import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.QueueEntry;
 import org.apache.qpid.server.state.AMQStateManager;
@@ -62,12 +63,12 @@ public class BasicGetMethodHandler imple
 
     public void methodReceived(AMQStateManager stateManager, BasicGetBody body, int channelId) throws AMQException
     {
-        AMQProtocolSession session = stateManager.getProtocolSession();
+        AMQProtocolSession protocolConnection = stateManager.getProtocolSession();
 
 
-        VirtualHost vHost = session.getVirtualHost();
+        VirtualHost vHost = protocolConnection.getVirtualHost();
 
-        AMQChannel channel = session.getChannel(channelId);
+        AMQChannel channel = protocolConnection.getChannel(channelId);
         if (channel == null)
         {
             throw body.getChannelNotFoundException(channelId);
@@ -93,24 +94,28 @@ public class BasicGetMethodHandler imple
             {
 
                 //Perform ACLs
-                if (!vHost.getAccessManager().authoriseConsume(session, body.getNoAck(), queue))
+                if (!vHost.getAccessManager().authoriseConsume(protocolConnection, body.getNoAck(), queue))
                 {
                     throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, "Permission denied");
                 }
-                else if (queue.isExclusive() && queue.getExclusiveOwner() != session)
+                else if (queue.isExclusive())
                 {
-                    throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
-                                                      "Queue is exclusive, but not created on this Connection.");
+                    AMQSessionModel session = queue.getExclusiveOwningSession();
+                    if (session == null || session.getConnectionModel() != protocolConnection)
+                    {
+                        throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
+                                                          "Queue is exclusive, but not created on this Connection.");
+                    }
                 }
 
-                if (!performGet(queue,session, channel, !body.getNoAck()))
+                if (!performGet(queue,protocolConnection, channel, !body.getNoAck()))
                 {
-                    MethodRegistry methodRegistry = session.getMethodRegistry();
+                    MethodRegistry methodRegistry = protocolConnection.getMethodRegistry();
                     // TODO - set clusterId
                     BasicGetEmptyBody responseBody = methodRegistry.createBasicGetEmptyBody(null);
 
 
-                    session.writeFrame(responseBody.generateFrame(channelId));
+                    protocolConnection.writeFrame(responseBody.generateFrame(channelId));
                 }
             }
         }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java?rev=942101&r1=942100&r2=942101&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java Fri May  7 15:09:42 2010
@@ -34,6 +34,7 @@ import org.apache.qpid.server.binding.Bi
 import org.apache.qpid.server.exchange.Exchange;
 import org.apache.qpid.server.exchange.ExchangeRegistry;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.QueueRegistry;
 import org.apache.qpid.server.state.AMQStateManager;
@@ -59,8 +60,8 @@ public class QueueBindHandler implements
 
     public void methodReceived(AMQStateManager stateManager, QueueBindBody body, int channelId) throws AMQException
     {
-        AMQProtocolSession session = stateManager.getProtocolSession();
-        VirtualHost virtualHost = session.getVirtualHost();
+        AMQProtocolSession protocolConnection = stateManager.getProtocolSession();
+        VirtualHost virtualHost = protocolConnection.getVirtualHost();
         ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry();
         QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
 
@@ -70,7 +71,7 @@ public class QueueBindHandler implements
 
         if (body.getQueue() == null)
         {
-            AMQChannel channel = session.getChannel(channelId);
+            AMQChannel channel = protocolConnection.getChannel(channelId);
 
             if (channel == null)
             {
@@ -114,15 +115,19 @@ public class QueueBindHandler implements
         {
 
             //Perform ACLs
-            if (!virtualHost.getAccessManager().authoriseBind(session, exch,
+            if (!virtualHost.getAccessManager().authoriseBind(protocolConnection, exch,
                     queue, routingKey))
             {
                 throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, "Permission denied");
             }
-            else if (queue.isExclusive() && !queue.isDurable() && queue.getExclusiveOwner() != session)
+            else if (queue.isExclusive() && !queue.isDurable())
             {
-                throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
-                                                  "Queue " + queue.getNameShortString() + " is exclusive, but not created on this Connection.");
+                AMQSessionModel session = queue.getExclusiveOwningSession();
+                if (session == null || session.getConnectionModel() != protocolConnection)
+                {
+                    throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
+                                                      "Queue " + queue.getNameShortString() + " is exclusive, but not created on this Connection.");
+                }
             }
 
             if (!exch.isBound(routingKey, body.getArguments(), queue))
@@ -153,9 +158,9 @@ public class QueueBindHandler implements
         }
         if (!body.getNowait())
         {
-            MethodRegistry methodRegistry = session.getMethodRegistry();
+            MethodRegistry methodRegistry = protocolConnection.getMethodRegistry();
             AMQMethodBody responseBody = methodRegistry.createQueueBindOkBody();
-            session.writeFrame(responseBody.generateFrame(channelId));
+            protocolConnection.writeFrame(responseBody.generateFrame(channelId));
 
         }
     }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java?rev=942101&r1=942100&r2=942101&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java Fri May  7 15:09:42 2010
@@ -35,6 +35,7 @@ import org.apache.qpid.server.AMQChannel
 import org.apache.qpid.server.exchange.Exchange;
 import org.apache.qpid.server.exchange.ExchangeRegistry;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.AMQQueueFactory;
 import org.apache.qpid.server.queue.QueueRegistry;
@@ -61,8 +62,8 @@ public class QueueDeclareHandler impleme
 
     public void methodReceived(AMQStateManager stateManager, QueueDeclareBody body, int channelId) throws AMQException
     {
-        final AMQProtocolSession session = stateManager.getProtocolSession();
-        VirtualHost virtualHost = session.getVirtualHost();
+        final AMQProtocolSession protocolConnection = stateManager.getProtocolSession();
+        VirtualHost virtualHost = protocolConnection.getVirtualHost();
         ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry();
         QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
         DurableConfigurationStore store = virtualHost.getDurableConfigurationStore();
@@ -71,7 +72,7 @@ public class QueueDeclareHandler impleme
         if (!body.getPassive())
         {
             // Perform ACL if request is not passive
-            if (!virtualHost.getAccessManager().authoriseCreateQueue(session, body.getAutoDelete(), body.getDurable(),
+            if (!virtualHost.getAccessManager().authoriseCreateQueue(protocolConnection, body.getAutoDelete(), body.getDurable(),
                     body.getExclusive(), body.getNowait(), body.getPassive(), body.getQueue()))
             {
                 throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, "Permission denied");
@@ -97,9 +98,16 @@ public class QueueDeclareHandler impleme
         synchronized (queueRegistry)
         {
 
+            queue = queueRegistry.getQueue(queueName);
 
+            AMQSessionModel session = null;
 
-            if (((queue = queueRegistry.getQueue(queueName)) == null))
+            if (queue != null)
+            {
+                session = queue.getExclusiveOwningSession();
+            }
+
+            if (queue == null)
             {
 
                 if (body.getPassive())
@@ -109,8 +117,8 @@ public class QueueDeclareHandler impleme
                 }
                 else
                 {
-                    queue = createQueue(queueName, body, virtualHost, session);
-                    queue.setPrincipalHolder(session);
+                    queue = createQueue(queueName, body, virtualHost, protocolConnection);
+                    queue.setPrincipalHolder(protocolConnection);
                     if (queue.isDurable() && !queue.isAutoDelete())
                     {
                         store.createQueue(queue, body.getArguments());
@@ -122,26 +130,25 @@ public class QueueDeclareHandler impleme
                     queueRegistry.registerQueue(queue);
                     if(body.getExclusive())
                     {
-                        if(body.getDurable())
-                        {
-                            queue.setExclusiveOwner(session.getPrincipal().getName());
-                        }
-                        else
+
+                        queue.setExclusiveOwningSession(protocolConnection.getChannel(channelId));
+                        queue.setPrincipalHolder(protocolConnection);
+
+                        if(!body.getDurable())
                         {
                             final AMQQueue q = queue;
-                            queue.setExclusiveOwner(session);
                             final AMQProtocolSession.Task sessionCloseTask = new AMQProtocolSession.Task()
                             {
                                 public void doTask(AMQProtocolSession session) throws AMQException
                                 {
-                                    q.setExclusiveOwner(null);
+                                    q.setExclusiveOwningSession(null);
                                 }
                             };
-                            session.addSessionCloseTask(sessionCloseTask);
+                            protocolConnection.addSessionCloseTask(sessionCloseTask);
                             queue.addQueueDeleteTask(new AMQQueue.Task() {
                                 public void doTask(AMQQueue queue) throws AMQException
                                 {
-                                    session.removeSessionCloseTask(sessionCloseTask);
+                                    protocolConnection.removeSessionCloseTask(sessionCloseTask);
                                 }
                             });
                         }
@@ -156,7 +163,7 @@ public class QueueDeclareHandler impleme
                     }
                 }
             }
-            else if (queue.isExclusive() && !queue.isDurable() && queue.getExclusiveOwner() != session)
+            else if (queue.isExclusive() && !queue.isDurable() && (session == null || session.getConnectionModel() != protocolConnection))
             {
                 throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
                                                   "Queue " + queue.getNameShortString() + " is exclusive, but not created on this Connection.");
@@ -168,12 +175,13 @@ public class QueueDeclareHandler impleme
                                                   "Cannot re-declare queue '" + queue.getNameShortString() + "' with different exclusivity (was: "
                                                     + queue.isExclusive() + " requested " + body.getExclusive() + ")");
             }
-            else if (!body.getPassive() && body.getExclusive() && !queue.getExclusiveOwner().equals(queue.isDurable() ? session.getPrincipal().getName() : session))
+
+            else if (!body.getPassive() && body.getExclusive() && !(queue.isDurable() ? queue.getOwner().equals(protocolConnection.getPrincipal().getName()) : (session == null || session.getConnectionModel() != protocolConnection)))
             {
-                throw body.getChannelException(AMQConstant.ALREADY_EXISTS, "Cannot declare queue('" + queueName + "'),"
-                                                                           + " as exclusive queue with same name "
+                throw body.getChannelException(AMQConstant.ALREADY_EXISTS, "Cannot declare queue('" + queueName + "'), "
+                                                                           + "as exclusive queue with same name "
                                                                            + "declared on another client ID('"
-                                                                           + queue.getPrincipalHolder().getPrincipal().getName() + "')");
+                                                                           + queue.getOwner() + "')");
 
             }
             else if(!body.getPassive() && queue.isAutoDelete() != body.getAutoDelete())
@@ -190,7 +198,7 @@ public class QueueDeclareHandler impleme
             }
 
 
-            AMQChannel channel = session.getChannel(channelId);
+            AMQChannel channel = protocolConnection.getChannel(channelId);
 
             if (channel == null)
             {
@@ -203,12 +211,12 @@ public class QueueDeclareHandler impleme
 
         if (!body.getNowait())
         {
-            MethodRegistry methodRegistry = session.getMethodRegistry();
+            MethodRegistry methodRegistry = protocolConnection.getMethodRegistry();
             QueueDeclareOkBody responseBody =
                     methodRegistry.createQueueDeclareOkBody(queueName,
                                                             queue.getMessageCount(),
                                                             queue.getConsumerCount());
-            session.writeFrame(responseBody.generateFrame(channelId));
+            protocolConnection.writeFrame(responseBody.generateFrame(channelId));
 
             _logger.info("Queue " + queueName + " declared successfully");
         }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java?rev=942101&r1=942100&r2=942101&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java Fri May  7 15:09:42 2010
@@ -26,6 +26,7 @@ import org.apache.qpid.framing.QueueDele
 import org.apache.qpid.framing.MethodRegistry;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.queue.QueueRegistry;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.state.AMQStateManager;
@@ -58,15 +59,15 @@ public class QueueDeleteHandler implemen
 
     public void methodReceived(AMQStateManager stateManager, QueueDeleteBody body, int channelId) throws AMQException
     {
-        AMQProtocolSession session = stateManager.getProtocolSession();
-        VirtualHost virtualHost = session.getVirtualHost();
+        AMQProtocolSession protocolConnection = stateManager.getProtocolSession();
+        VirtualHost virtualHost = protocolConnection.getVirtualHost();
         QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
         DurableConfigurationStore store = virtualHost.getDurableConfigurationStore();
 
         AMQQueue queue;
         if (body.getQueue() == null)
         {
-            AMQChannel channel = session.getChannel(channelId);
+            AMQChannel channel = protocolConnection.getChannel(channelId);
 
             if (channel == null)
             {
@@ -103,12 +104,13 @@ public class QueueDeleteHandler implemen
             else
             {
 
+                AMQSessionModel session = queue.getExclusiveOwningSession();
                 //Perform ACLs
-                if (!virtualHost.getAccessManager().authoriseDelete(session, queue))
+                if (!virtualHost.getAccessManager().authoriseDelete(protocolConnection, queue))
                 {
                     throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, "Permission denied");
                 }
-                else if (queue.isExclusive() && !queue.isDurable() && queue.getExclusiveOwner() != session)
+                else if (queue.isExclusive() && !queue.isDurable() && (session == null || session.getConnectionModel() != protocolConnection))
                 {
                     throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
                                                       "Queue " + queue.getNameShortString() + " is exclusive, but not created on this Connection.");
@@ -121,9 +123,9 @@ public class QueueDeleteHandler implemen
                     store.removeQueue(queue);
                 }
 
-                MethodRegistry methodRegistry = session.getMethodRegistry();
+                MethodRegistry methodRegistry = protocolConnection.getMethodRegistry();
                 QueueDeleteOkBody responseBody = methodRegistry.createQueueDeleteOkBody(purged);
-                session.writeFrame(responseBody.generateFrame(channelId));
+                protocolConnection.writeFrame(responseBody.generateFrame(channelId));
             }
         }
     }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java?rev=942101&r1=942100&r2=942101&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java Fri May  7 15:09:42 2010
@@ -27,6 +27,7 @@ import org.apache.qpid.framing.MethodReg
 import org.apache.qpid.framing.AMQMethodBody;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.queue.QueueRegistry;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.state.AMQStateManager;
@@ -57,11 +58,11 @@ public class QueuePurgeHandler implement
 
     public void methodReceived(AMQStateManager stateManager, QueuePurgeBody body, int channelId) throws AMQException
     {
-        AMQProtocolSession session = stateManager.getProtocolSession();
-        VirtualHost virtualHost = session.getVirtualHost();
+        AMQProtocolSession protocolConnection = stateManager.getProtocolSession();
+        VirtualHost virtualHost = protocolConnection.getVirtualHost();
         QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
 
-        AMQChannel channel = session.getChannel(channelId);
+        AMQChannel channel = protocolConnection.getChannel(channelId);
 
 
         AMQQueue queue;
@@ -98,13 +99,14 @@ public class QueuePurgeHandler implement
         }
         else
         {
+                AMQSessionModel session = queue.getExclusiveOwningSession();
 
                 //Perform ACLs
-                if (!virtualHost.getAccessManager().authorisePurge(session, queue))
+                if (!virtualHost.getAccessManager().authorisePurge(protocolConnection, queue))
                 {
                     throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, "Permission denied");
-                }            
-                else if (queue.isExclusive() && queue.getExclusiveOwner() != session)
+                }
+                else if (queue.isExclusive() && (session == null || session.getConnectionModel() != protocolConnection))
                 {
                     throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
                                                       "Queue is exclusive, but not created on this Connection.");
@@ -116,10 +118,10 @@ public class QueuePurgeHandler implement
                 if(!body.getNowait())
                 {
 
-                    MethodRegistry methodRegistry = session.getMethodRegistry();
+                    MethodRegistry methodRegistry = protocolConnection.getMethodRegistry();
                     AMQMethodBody responseBody = methodRegistry.createQueuePurgeOkBody(purged);
-                    session.writeFrame(responseBody.generateFrame(channelId));
-                    
+                    protocolConnection.writeFrame(responseBody.generateFrame(channelId));
+
                 }
         }
     }

Added: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java?rev=942101&view=auto
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java (added)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java Fri May  7 15:09:42 2010
@@ -0,0 +1,38 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol;
+
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.AMQException;
+
+public interface AMQConnectionModel
+{
+
+    /**
+     * Close the given requested Session
+     * @param session
+     * @param cause
+     * @param message
+     * @throws org.apache.qpid.AMQException
+     */
+    public void closeSession(AMQSessionModel session, AMQConstant cause, String message) throws AMQException;
+
+}

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java?rev=942101&r1=942100&r2=942101&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java Fri May  7 15:09:42 2010
@@ -62,7 +62,6 @@ import org.apache.qpid.transport.Network
 import org.apache.qpid.transport.Sender;
 
 import javax.management.JMException;
-import javax.management.MBeanException;
 import javax.security.sasl.SaslServer;
 import java.io.IOException;
 import java.net.InetSocketAddress;
@@ -502,7 +501,7 @@ public class AMQProtocolEngine implement
         return channel;
     }
 
-    public AMQChannel getChannel(int channelId) throws AMQException
+    public AMQChannel getChannel(int channelId)
     {
         final AMQChannel channel =
                 ((channelId & CHANNEL_CACHE_SIZE) == channelId) ? _cachedChannels[channelId] : _channelMap.get(channelId);
@@ -1231,4 +1230,20 @@ public class AMQProtocolEngine implement
             }
         }
     }
+
+
+    public void closeSession(AMQSessionModel session, AMQConstant cause, String message) throws AMQException
+    {
+
+        closeChannel((Integer)session.getID());
+
+        MethodRegistry methodRegistry = getMethodRegistry();
+        ChannelCloseBody responseBody =
+                methodRegistry.createChannelCloseBody(
+                        cause.getCode(),
+                        new AMQShortString(message),
+                        0,0);
+
+        writeFrame(responseBody.generateFrame((Integer)session.getID()));       
+    }       
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java?rev=942101&r1=942100&r2=942101&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java Fri May  7 15:09:42 2010
@@ -37,7 +37,7 @@ import java.security.Principal;
 import java.util.List;
 
 
-public interface AMQProtocolSession extends AMQVersionAwareProtocolSession, PrincipalHolder
+public interface AMQProtocolSession extends AMQVersionAwareProtocolSession, PrincipalHolder, AMQConnectionModel
 {
     long getSessionID();
 
@@ -109,7 +109,7 @@ public interface AMQProtocolSession exte
      *
      * @return null if no channel exists, the channel otherwise
      */
-    AMQChannel getChannel(int channelId) throws AMQException;
+    AMQChannel getChannel(int channelId);
 
     /**
      * Associate a channel with this session.

Added: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java?rev=942101&view=auto
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java (added)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java Fri May  7 15:09:42 2010
@@ -0,0 +1,28 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol;
+
+public interface AMQSessionModel
+{
+    Object getID();
+
+    AMQConnectionModel getConnectionModel();
+}

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?rev=942101&r1=942100&r2=942101&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java Fri May  7 15:09:42 2010
@@ -23,6 +23,8 @@ package org.apache.qpid.server.queue;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.protocol.AMQConnectionModel;
+import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.binding.Binding;
 import org.apache.qpid.server.configuration.QueueConfig;
 import org.apache.qpid.server.configuration.QueueConfiguration;
@@ -68,8 +70,8 @@ public interface AMQQueue extends Managa
     PrincipalHolder getPrincipalHolder();
     void setPrincipalHolder(PrincipalHolder principalHolder);
 
-    void setExclusiveOwner(Object owner);
-    Object getExclusiveOwner();
+    void setExclusiveOwningSession(AMQSessionModel owner);
+    AMQSessionModel getExclusiveOwningSession();
 
     VirtualHost getVirtualHost();
 

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java?rev=942101&r1=942100&r2=942101&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java Fri May  7 15:09:42 2010
@@ -7,6 +7,8 @@ import org.apache.qpid.framing.AMQShortS
 import org.apache.qpid.pool.ReadWriteRunnable;
 import org.apache.qpid.pool.ReferenceCountingExecutorService;
 import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.protocol.AMQConnectionModel;
+import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.binding.Binding;
 import org.apache.qpid.server.configuration.ConfigStore;
 import org.apache.qpid.server.configuration.ConfiguredObject;
@@ -83,7 +85,7 @@ public class SimpleAMQQueue implements A
 
     private PrincipalHolder _prinicpalHolder;
 
-    private Object _exclusiveOwner;
+    private AMQSessionModel _exclusiveOwner;
 
 
     private final boolean _durable;
@@ -2045,12 +2047,12 @@ public class SimpleAMQQueue implements A
         return ids;
     }
 
-    public Object getExclusiveOwner()
+    public AMQSessionModel getExclusiveOwningSession()
     {
         return _exclusiveOwner;
     }
 
-    public void setExclusiveOwner(Object exclusiveOwner)
+    public void setExclusiveOwningSession(AMQSessionModel exclusiveOwner)
     {
         _exclusiveOwner = exclusiveOwner;
     }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java?rev=942101&r1=942100&r2=942101&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java Fri May  7 15:09:42 2010
@@ -22,10 +22,23 @@ package org.apache.qpid.server.transport
 
 import org.apache.qpid.server.configuration.ConnectionConfig;
 import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.protocol.AMQConnectionModel;
+import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.transport.Connection;
 import org.apache.qpid.transport.Method;
+import org.apache.qpid.transport.ConnectionCloseCode;
+import org.apache.qpid.transport.Session;
+import org.apache.qpid.transport.SessionDetachCode;
+import org.apache.qpid.transport.SessionDetach;
+import org.apache.qpid.transport.Binary;
+import org.apache.qpid.transport.SessionDetached;
+import org.apache.qpid.transport.SessionException;
+import org.apache.qpid.transport.ExecutionException;
+import org.apache.qpid.transport.ExecutionErrorCode;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.AMQException;
 
-public class ServerConnection extends Connection
+public class ServerConnection extends Connection implements AMQConnectionModel
 {
     private ConnectionConfig _config;
     private Runnable _onOpenTask;
@@ -88,4 +101,15 @@ public class ServerConnection extends Co
     {
         _onOpenTask = task;
     }
+
+    public void closeSession(AMQSessionModel session, AMQConstant cause, String message) throws AMQException
+    {
+        ExecutionException ex = new ExecutionException();
+        ex.setErrorCode(ExecutionErrorCode.RESOURCE_LIMIT_EXCEEDED);
+        ex.setDescription(message);
+        ((ServerSession)session).invoke(ex);
+
+        ((ServerSession)session).close();
+    }
+   
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java?rev=942101&r1=942100&r2=942101&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java Fri May  7 15:09:42 2010
@@ -39,6 +39,8 @@ import org.apache.qpid.server.txn.AutoCo
 import org.apache.qpid.server.txn.LocalTransaction;
 import org.apache.qpid.server.txn.ServerTransaction;
 import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.protocol.AMQConnectionModel;
 import org.apache.qpid.transport.Binary;
 import org.apache.qpid.transport.Connection;
 import org.apache.qpid.transport.MessageTransfer;
@@ -63,7 +65,7 @@ import java.util.concurrent.ConcurrentSk
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicLong;
 
-public class ServerSession extends Session implements PrincipalHolder, SessionConfig
+public class ServerSession extends Session implements PrincipalHolder, SessionConfig, AMQSessionModel
 {
     private static final String NULL_DESTINTATION = UUID.randomUUID().toString();
 
@@ -310,7 +312,7 @@ public class ServerSession extends Sessi
         }
     }
 
-    public void removeDispositionListener(Method method)
+    public void removeDispositionListener(Method method)                               
     {
         _messageDispositionListenerMap.remove(method.getId());
     }
@@ -552,4 +554,15 @@ public class ServerSession extends Sessi
     {
         close();
     }
+
+    public Object getID()
+    {
+       return getName();
+    }
+
+    public AMQConnectionModel getConnectionModel()
+    {
+        return (ServerConnection) getConnection();
+    }
+
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java?rev=942101&r1=942100&r2=942101&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java Fri May  7 15:09:42 2010
@@ -36,6 +36,7 @@ import org.apache.qpid.server.flow.*;
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.StoredMessage;
 import org.apache.qpid.server.store.DurableConfigurationStore;
+import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.AMQUnknownExchangeType;
 import org.apache.qpid.framing.*;
@@ -813,7 +814,7 @@ public class ServerSessionDelegate exten
                         if(method.getExclusive())
                         {
                             queue.setPrincipalHolder((ServerSession)session);
-                            queue.setExclusiveOwner(session);
+                            queue.setExclusiveOwningSession((AMQSessionModel) session);
                         }
                         else if(method.getAutoDelete())
                         {

Modified: qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java?rev=942101&r1=942100&r2=942101&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java (original)
+++ qpid/trunk/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java Fri May  7 15:09:42 2010
@@ -30,6 +30,7 @@ import org.apache.qpid.server.management
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.security.PrincipalHolder;
 import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.binding.Binding;
 import org.apache.qpid.server.txn.ServerTransaction;
 import org.apache.qpid.AMQException;
@@ -47,7 +48,7 @@ public class MockAMQQueue implements AMQ
 
     private PrincipalHolder _principalHolder;
 
-    private Object _exclusiveOwner;
+    private AMQSessionModel _exclusiveOwner;
 
     public MockAMQQueue(String name)
     {
@@ -527,12 +528,12 @@ public class MockAMQQueue implements AMQ
         _principalHolder = principalHolder;
     }
 
-    public Object getExclusiveOwner()
+    public AMQSessionModel getExclusiveOwningSession()
     {
         return _exclusiveOwner;
     }
 
-    public void setExclusiveOwner(Object exclusiveOwner)
+    public void setExclusiveOwningSession(AMQSessionModel exclusiveOwner)
     {
         _exclusiveOwner = exclusiveOwner;
     }



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org