You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2007/01/29 16:32:14 UTC

svn commit: r501081 - in /incubator/qpid/branches/qpid.0-9/java: broker/src/main/java/org/apache/qpid/server/handler/ broker/src/main/java/org/apache/qpid/server/protocol/ broker/src/main/java/org/apache/qpid/server/queue/ systests/src/test/java/org/apache/qpid/server/queue/

Author: gsim
Date: Mon Jan 29 07:32:13 2007
New Revision: 501081

URL: http://svn.apache.org/viewvc?view=rev&rev=501081
Log:
Moved across auto-deletion functionality for exclusive, non-durable queues (aka private queues).


Modified:
    incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
    incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
    incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
    incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
    incubator/qpid/branches/qpid.0-9/java/systests/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java

Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java?view=diff&rev=501081&r1=501080&r2=501081
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java Mon Jan 29 07:32:13 2007
@@ -144,11 +144,42 @@
         return MessageFormat.format("{0,number,0000000000000}", value);
     }
 
-    protected AMQQueue createQueue(QueueDeclareBody body, QueueRegistry registry, AMQProtocolSession session)
+    protected AMQQueue createQueue(QueueDeclareBody body, final QueueRegistry registry, final AMQProtocolSession session)
             throws AMQException
     {
         String owner = body.exclusive ? session.getContextKey() : null;
         if (owner != null) _log.info("Queue " + body.queue + " is owned by " + owner);
-        return new AMQQueue(body.queue, body.durable, owner, body.autoDelete, registry);
+        final AMQQueue queue =  new AMQQueue(body.queue, body.durable, owner, body.autoDelete, registry);
+        final String queueName = queue.getName();
+
+        if(body.exclusive && !body.durable)
+        {
+            final AMQProtocolSession.Task deleteQueueTask =
+                new AMQProtocolSession.Task()
+                {
+
+                    public void doTask(AMQProtocolSession session) throws AMQException
+                    {
+                        if(registry.getQueue(queueName) == queue)
+                        {
+                            queue.delete();
+                        }
+
+                    }
+                };
+
+            session.addSessionCloseTask(deleteQueueTask);
+
+            queue.addQueueDeleteTask(new AMQQueue.Task()
+            {
+                public void doTask(AMQQueue queue)
+                {
+                    session.removeSessionCloseTask(deleteQueueTask);
+                }
+            });
+
+
+        }
+        return queue;
     }
 }

Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java?view=diff&rev=501081&r1=501080&r2=501081
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java Mon Jan 29 07:32:13 2007
@@ -75,6 +75,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicInteger;
 
 public class AMQMinaProtocolSession implements AMQProtocolSession,
@@ -121,6 +122,8 @@
     private byte _minor;
     private FieldTable _clientProperties;
 
+    private final List<Task> _taskList = new CopyOnWriteArrayList<Task>();
+
     // Keeps a tally of connections for logging and debugging
     private static AtomicInteger _ConnectionId;    
     static { _ConnectionId = new AtomicInteger(0); }
@@ -550,6 +553,10 @@
             {
                 _managedObject.unregister();
             }        
+            for(Task task : _taskList)
+            {
+                task.doTask(this);
+            }
         }
     }
 
@@ -719,5 +726,15 @@
     public int getConnectionId()
     {
         return _ConnectionId.get();
+    }
+
+    public void addSessionCloseTask(Task task)
+    {
+        _taskList.add(task);
+    }
+
+    public void removeSessionCloseTask(Task task)
+    {
+        _taskList.remove(task);
     }
 }

Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java?view=diff&rev=501081&r1=501080&r2=501081
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java Mon Jan 29 07:32:13 2007
@@ -36,6 +36,14 @@
 
 public interface AMQProtocolSession extends AMQProtocolWriter
 {
+
+
+
+    public static interface Task
+    {
+        public void doTask(AMQProtocolSession session) throws AMQException;
+    }
+
     /**
      * Called when a protocol data block is received
      * @param message the data block that has been received
@@ -150,4 +158,8 @@
     boolean versionEquals(byte major, byte minor);
     void checkMethodBodyVersion(AMQMethodBody methodBody);
     int getConnectionId();
+
+    void addSessionCloseTask(Task task);
+
+    void removeSessionCloseTask(Task task);
 }

Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?view=diff&rev=501081&r1=501080&r2=501081
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java Mon Jan 29 07:32:13 2007
@@ -34,6 +34,7 @@
 import java.text.MessageFormat;
 import java.util.List;
 import java.util.concurrent.Executor;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -92,6 +93,12 @@
 
     private final AtomicBoolean _isExclusive = new AtomicBoolean();
 
+    private final AtomicBoolean _deleted = new AtomicBoolean(false);
+
+
+
+    private List<Task> _deleteTaskList = new CopyOnWriteArrayList<Task>();
+
     /**
      * Manages message delivery.
      */
@@ -509,10 +516,19 @@
 
     public void delete() throws AMQException
     {
-        _subscribers.queueDeleted(this);
-        _bindings.deregister();
-        _queueRegistry.unregisterQueue(_name);
-        _managedObject.unregister();
+        if(!_deleted.getAndSet(true))
+        {
+            _subscribers.queueDeleted(this);
+            _bindings.deregister();
+            _queueRegistry.unregisterQueue(_name);
+            _managedObject.unregister();
+            for(Task task : _deleteTaskList)
+            {
+                task.doTask(this);
+            }
+            _deleteTaskList.clear();
+        }
+
     }
 
     protected void autodelete() throws AMQException
@@ -667,6 +683,16 @@
         public void rollback()
         {
         }
+    }
+
+    public static interface Task
+    {
+        public void doTask(AMQQueue queue) throws AMQException;        
+    }
+
+    public void addQueueDeleteTask(Task task)
+    {
+        _deleteTaskList.add(task);
     }
 
 }

Modified: incubator/qpid/branches/qpid.0-9/java/systests/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/systests/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java?view=diff&rev=501081&r1=501080&r2=501081
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/systests/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/systests/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java Mon Jan 29 07:32:13 2007
@@ -232,4 +232,14 @@
     {
         return _ConnectionId.get();
     }
+
+    public void addSessionCloseTask(Task task)
+    {
+        //To change body of implemented methods use File | Settings | File Templates.
+    }
+
+    public void removeSessionCloseTask(Task task)
+    {
+        //To change body of implemented methods use File | Settings | File Templates.
+    }
 }