You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2007/01/29 16:32:14 UTC
svn commit: r501081 - in /incubator/qpid/branches/qpid.0-9/java:
broker/src/main/java/org/apache/qpid/server/handler/
broker/src/main/java/org/apache/qpid/server/protocol/
broker/src/main/java/org/apache/qpid/server/queue/
systests/src/test/java/org/ap...
Author: gsim
Date: Mon Jan 29 07:32:13 2007
New Revision: 501081
URL: http://svn.apache.org/viewvc?view=rev&rev=501081
Log:
Moved across auto deletion functionality for exclusive, non-durable queues (aka private queues).
Modified:
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
incubator/qpid/branches/qpid.0-9/java/systests/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java
Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java?view=diff&rev=501081&r1=501080&r2=501081
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java Mon Jan 29 07:32:13 2007
@@ -144,11 +144,42 @@
return MessageFormat.format("{0,number,0000000000000}", value);
}
- protected AMQQueue createQueue(QueueDeclareBody body, QueueRegistry registry, AMQProtocolSession session)
+ protected AMQQueue createQueue(QueueDeclareBody body, final QueueRegistry registry, final AMQProtocolSession session)
throws AMQException
{
String owner = body.exclusive ? session.getContextKey() : null;
if (owner != null) _log.info("Queue " + body.queue + " is owned by " + owner);
- return new AMQQueue(body.queue, body.durable, owner, body.autoDelete, registry);
+ final AMQQueue queue = new AMQQueue(body.queue, body.durable, owner, body.autoDelete, registry);
+ final String queueName = queue.getName();
+
+ if(body.exclusive && !body.durable)
+ {
+ final AMQProtocolSession.Task deleteQueueTask =
+ new AMQProtocolSession.Task()
+ {
+
+ public void doTask(AMQProtocolSession session) throws AMQException
+ {
+ if(registry.getQueue(queueName) == queue)
+ {
+ queue.delete();
+ }
+
+ }
+ };
+
+ session.addSessionCloseTask(deleteQueueTask);
+
+ queue.addQueueDeleteTask(new AMQQueue.Task()
+ {
+ public void doTask(AMQQueue queue)
+ {
+ session.removeSessionCloseTask(deleteQueueTask);
+ }
+ });
+
+
+ }
+ return queue;
}
}
Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java?view=diff&rev=501081&r1=501080&r2=501081
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java Mon Jan 29 07:32:13 2007
@@ -75,6 +75,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
public class AMQMinaProtocolSession implements AMQProtocolSession,
@@ -121,6 +122,8 @@
private byte _minor;
private FieldTable _clientProperties;
+ private final List<Task> _taskList = new CopyOnWriteArrayList<Task>();
+
// Keeps a tally of connections for logging and debugging
private static AtomicInteger _ConnectionId;
static { _ConnectionId = new AtomicInteger(0); }
@@ -550,6 +553,10 @@
{
_managedObject.unregister();
}
+ for(Task task : _taskList)
+ {
+ task.doTask(this);
+ }
}
}
@@ -719,5 +726,15 @@
public int getConnectionId()
{
return _ConnectionId.get();
+ }
+
+ public void addSessionCloseTask(Task task)
+ {
+ _taskList.add(task);
+ }
+
+ public void removeSessionCloseTask(Task task)
+ {
+ _taskList.remove(task);
}
}
Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java?view=diff&rev=501081&r1=501080&r2=501081
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java Mon Jan 29 07:32:13 2007
@@ -36,6 +36,14 @@
public interface AMQProtocolSession extends AMQProtocolWriter
{
+
+
+
+ public static interface Task
+ {
+ public void doTask(AMQProtocolSession session) throws AMQException;
+ }
+
/**
* Called when a protocol data block is received
* @param message the data block that has been received
@@ -150,4 +158,8 @@
boolean versionEquals(byte major, byte minor);
void checkMethodBodyVersion(AMQMethodBody methodBody);
int getConnectionId();
+
+ void addSessionCloseTask(Task task);
+
+ void removeSessionCloseTask(Task task);
}
Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?view=diff&rev=501081&r1=501080&r2=501081
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java Mon Jan 29 07:32:13 2007
@@ -34,6 +34,7 @@
import java.text.MessageFormat;
import java.util.List;
import java.util.concurrent.Executor;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -92,6 +93,12 @@
private final AtomicBoolean _isExclusive = new AtomicBoolean();
+ private final AtomicBoolean _deleted = new AtomicBoolean(false);
+
+
+
+ private List<Task> _deleteTaskList = new CopyOnWriteArrayList<Task>();
+
/**
* Manages message delivery.
*/
@@ -509,10 +516,19 @@
public void delete() throws AMQException
{
- _subscribers.queueDeleted(this);
- _bindings.deregister();
- _queueRegistry.unregisterQueue(_name);
- _managedObject.unregister();
+ if(!_deleted.getAndSet(true))
+ {
+ _subscribers.queueDeleted(this);
+ _bindings.deregister();
+ _queueRegistry.unregisterQueue(_name);
+ _managedObject.unregister();
+ for(Task task : _deleteTaskList)
+ {
+ task.doTask(this);
+ }
+ _deleteTaskList.clear();
+ }
+
}
protected void autodelete() throws AMQException
@@ -667,6 +683,16 @@
public void rollback()
{
}
+ }
+
+ public static interface Task
+ {
+ public void doTask(AMQQueue queue) throws AMQException;
+ }
+
+ public void addQueueDeleteTask(Task task)
+ {
+ _deleteTaskList.add(task);
}
}
Modified: incubator/qpid/branches/qpid.0-9/java/systests/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/systests/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java?view=diff&rev=501081&r1=501080&r2=501081
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/systests/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/systests/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java Mon Jan 29 07:32:13 2007
@@ -232,4 +232,14 @@
{
return _ConnectionId.get();
}
+
+ public void addSessionCloseTask(Task task)
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void removeSessionCloseTask(Task task)
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
}