You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2007/01/22 11:26:03 UTC

svn commit: r498574 - in /incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler: BasicGetMethodHandler.java QueuePurgeHandler.java

Author: rgreig
Date: Mon Jan 22 02:26:03 2007
New Revision: 498574

URL: http://svn.apache.org/viewvc?view=rev&rev=498574
Log:
QPID-275 : Patch supplied by Rob Godfrey - Add support for get / purge / qos size / default exchanges and some other small fixes highlighted by the python tests

Added:
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java

Added: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java?view=auto&rev=498574
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java Mon Jan 22 02:26:03 2007
@@ -0,0 +1,77 @@
+package org.apache.qpid.server.handler;
+
+import org.apache.qpid.framing.*;
+import org.apache.qpid.server.state.StateAwareMethodListener;
+import org.apache.qpid.server.state.AMQStateManager;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.ConsumerTagNotUniqueException;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQInvalidSelectorException;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.log4j.Logger;
+import org.apache.mina.common.ByteBuffer;
+
+public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetBody>
+{
+    private static final Logger _log = Logger.getLogger(BasicGetMethodHandler.class);
+
+    private static final BasicGetMethodHandler _instance = new BasicGetMethodHandler();
+
+    public static BasicGetMethodHandler getInstance()
+    {
+        return _instance;
+    }
+
+    private BasicGetMethodHandler()
+    {
+    }
+
+    public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
+                               ExchangeRegistry exchangeRegistry, AMQProtocolSession session,
+                               AMQMethodEvent<BasicGetBody> evt) throws AMQException
+    {
+        BasicGetBody body = evt.getMethod();
+        final int channelId = evt.getChannelId();
+
+        AMQChannel channel = session.getChannel(channelId);
+        if (channel == null)
+        {
+            _log.error("Channel " + channelId + " not found");
+            // TODO: either alert or error that the
+        }
+        else
+        {
+            AMQQueue queue = body.queue == null ? channel.getDefaultQueue() : queueRegistry.getQueue(body.queue);
+
+            if (queue == null)
+            {
+                _log.info("No queue for '" + body.queue + "'");
+                if(body.queue!=null)
+                {
+                    throw body.getConnectionException(AMQConstant.NOT_FOUND.getCode(),
+                                                      "No such queue, '" + body.queue + "'");
+                }
+                else
+                {
+                    throw body.getConnectionException(AMQConstant.NOT_ALLOWED.getCode(),
+                                                      "No queue name provided, no default queue defined.");
+                }
+            }
+            else
+            {
+                if(!queue.performGet(session, channel, !body.noAck))
+                {
+
+
+                    // TODO - set clusterId
+                    session.writeFrame(BasicGetEmptyBody.createAMQFrame(channelId, (byte) 8, (byte) 0, null));
+                }
+            }
+        }
+    }
+}

Added: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java?view=auto&rev=498574
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java Mon Jan 22 02:26:03 2007
@@ -0,0 +1,81 @@
+package org.apache.qpid.server.handler;
+
+import org.apache.qpid.framing.*;
+import org.apache.qpid.server.state.StateAwareMethodListener;
+import org.apache.qpid.server.state.AMQStateManager;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.protocol.AMQConstant;
+
+public class QueuePurgeHandler implements StateAwareMethodListener<QueuePurgeBody>
+{
+    private static final QueuePurgeHandler _instance = new QueuePurgeHandler();
+
+    public static QueuePurgeHandler getInstance()
+    {
+        return _instance;
+    }
+
+    private final boolean _failIfNotFound;
+
+    public QueuePurgeHandler()
+    {
+        this(true);
+    }
+
+    public QueuePurgeHandler(boolean failIfNotFound)
+    {
+        _failIfNotFound = failIfNotFound;
+    }
+
+    public void methodReceived(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges, AMQProtocolSession session, AMQMethodEvent<QueuePurgeBody> evt) throws AMQException
+    {
+        QueuePurgeBody body = evt.getMethod();
+        AMQQueue queue;
+        if(body.queue == null)
+        {
+            queue = session.getChannel(evt.getChannelId()).getDefaultQueue();
+            if(queue == null)
+            {
+                if(_failIfNotFound)
+                {
+                    throw body.getConnectionException(AMQConstant.NOT_ALLOWED.getCode(),"No queue specified.");
+                }
+
+            }
+        }
+        else
+        {
+            queue = queues.getQueue(body.queue);
+        }
+
+        if(queue == null)
+        {
+            if(_failIfNotFound)
+            {
+                throw body.getChannelException(404, "Queue " + body.queue + " does not exist.");
+            }
+        }
+        else
+        {
+                long purged = queue.clearQueue(session.getChannel(evt.getChannelId()).getStoreContext());
+
+
+                if(!body.nowait)
+                {
+                    // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+                    // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+                    // Be aware of possible changes to parameter order as versions change.
+                    session.writeFrame(QueuePurgeOkBody.createAMQFrame(evt.getChannelId(),
+                        (byte)8, (byte)0,	// AMQP version (major, minor)
+                        purged));	// messageCount
+                }
+        }
+    }
+}