You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2007/02/23 12:36:45 UTC

svn commit: r510912 - in /incubator/qpid/branches/qpid.0-9/java: broker/src/main/java/org/apache/qpid/server/ broker/src/main/java/org/apache/qpid/server/handler/ broker/src/main/java/org/apache/qpid/server/queue/ broker/src/main/java/org/apache/qpid/s...

Author: gsim
Date: Fri Feb 23 03:36:44 2007
New Revision: 510912

URL: http://svn.apache.org/viewvc?view=rev&rev=510912
Log:
Implementation of queue.unbind & message.get


Added:
    incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/QueueUnbindHandler.java   (with props)
    incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/protocol/RequestToken.java   (with props)
Modified:
    incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
    incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageGetHandler.java
    incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
    incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
    incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
    incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java
    incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/ExchangeBindings.java
    incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java

Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?view=diff&rev=510912&r1=510911&r2=510912
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java Fri Feb 23 03:36:44 2007
@@ -22,7 +22,6 @@
 
 import org.apache.qpid.protocol.AMQMethodEvent;
 import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.framing.MessageOkBody;
 import org.apache.log4j.Logger;
 import org.apache.mina.common.ByteBuffer;
 import org.apache.qpid.AMQException;
@@ -32,12 +31,15 @@
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.framing.MessageAppendBody;
 import org.apache.qpid.framing.MessageCloseBody;
+import org.apache.qpid.framing.MessageGetBody;
 import org.apache.qpid.framing.MessageOpenBody;
+import org.apache.qpid.framing.MessageOkBody;
 import org.apache.qpid.framing.MessageTransferBody;
 import org.apache.qpid.framing.RequestManager;
 import org.apache.qpid.framing.ResponseManager;
 import org.apache.qpid.protocol.AMQMethodListener;
 import org.apache.qpid.protocol.AMQProtocolWriter;
+import org.apache.qpid.protocol.RequestToken;
 import org.apache.qpid.server.ack.TxAck;
 import org.apache.qpid.server.ack.UnacknowledgedMessage;
 import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
@@ -349,6 +351,12 @@
                                _session.getProtocolMinorVersion()
                            );
         _session.writeResponse(_channelId, msg.getRequestId(), ok);
+    }
+
+    public void deliverGet(RequestToken<MessageGetBody> request, long deliveryTag, AMQMessage msg)
+    {                            
+        request.respond(MessageOkBody.createMethodBody(request.getMajor(), request.getMinor()));
+        deliver(msg, request.getRequest().destination, deliveryTag);
     }
 
     public void deliver(AMQMessage msg, AMQShortString destination, final long deliveryTag)

Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageGetHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageGetHandler.java?view=diff&rev=510912&r1=510911&r2=510912
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageGetHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageGetHandler.java Fri Feb 23 03:36:44 2007
@@ -28,6 +28,7 @@
 import org.apache.qpid.framing.MessageOkBody;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.protocol.RequestToken;
 import org.apache.qpid.server.AMQChannel;
 import org.apache.qpid.server.ConsumerTagNotUniqueException;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
@@ -89,13 +90,11 @@
             {
                 try
                 {
-                    if(queue.performGet(session, channel, !body.noAck))
-                    {
-                        session.writeResponse(evt, MessageOkBody.createMethodBody(
-                            session.getProtocolMajorVersion(), // AMQP major version
-                            session.getProtocolMinorVersion())); // AMQP minor version
-                    }
-                    else
+                    RequestToken<MessageGetBody> request = 
+                        new RequestToken<MessageGetBody>(session, evt, 
+                                                         session.getProtocolMajorVersion(), 
+                                                         session.getProtocolMinorVersion());
+                    if(!queue.performGet(request, channel))
                     {
                         session.writeResponse(evt, MessageEmptyBody.createMethodBody(
                             session.getProtocolMajorVersion(), // AMQP major version

Added: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/QueueUnbindHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/QueueUnbindHandler.java?view=auto&rev=510912
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/QueueUnbindHandler.java (added)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/QueueUnbindHandler.java Fri Feb 23 03:36:44 2007
@@ -0,0 +1,84 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.handler;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.QueueUnbindBody;
+import org.apache.qpid.framing.QueueUnbindOkBody;
+import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.state.AMQStateManager;
+import org.apache.qpid.server.state.StateAwareMethodListener;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+import org.apache.log4j.Logger;
+
+public class QueueUnbindHandler implements StateAwareMethodListener<QueueUnbindBody>
+{
+    private static final Logger _log = Logger.getLogger(QueueUnbindHandler.class);
+
+    private static final QueueUnbindHandler _instance = new QueueUnbindHandler();
+
+    public static QueueUnbindHandler getInstance()
+    {
+        return _instance;
+    }
+
+    private QueueUnbindHandler() {}
+
+    public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<QueueUnbindBody> evt) throws AMQException
+    {
+        AMQProtocolSession session = stateManager.getProtocolSession();
+        final QueueUnbindBody body = evt.getMethod();
+        VirtualHost virtualHost = session.getVirtualHost();
+        ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry();
+        QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
+        
+
+        final AMQQueue queue = queueRegistry.getQueue(body.queue);
+        if (queue == null)
+        {
+            throw body.getChannelException(AMQConstant.NOT_FOUND.getCode(), "Queue " + body.queue + " does not exist.");
+        }
+        final Exchange exch = exchangeRegistry.getExchange(body.exchange);
+        if (exch == null)
+        {
+            throw body.getChannelException(AMQConstant.NOT_FOUND.getCode(), "Exchange " + body.exchange + " does not exist.");
+        }
+
+        queue.unbind(body.routingKey, exch);//TODO: this should take the args as well 
+        if (_log.isInfoEnabled())
+        {
+            _log.info("Unbinding queue " + queue + " from exchange " + exch + " with routing key " + body.routingKey);
+        }
+        // Be aware of possible changes to parameter order as versions change.
+        final AMQMethodBody response = QueueUnbindOkBody.createMethodBody(
+            session.getProtocolMajorVersion(), // AMQP major version
+            session.getProtocolMinorVersion()); // AMQP minor version
+        session.writeResponse(evt, response);
+    }
+}

Propchange: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/QueueUnbindHandler.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/QueueUnbindHandler.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java?view=diff&rev=510912&r1=510911&r2=510912
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java Fri Feb 23 03:36:44 2007
@@ -626,14 +626,7 @@
     {
         throw new Error("XXX");
     }
-    public void writeGetOk(AMQProtocolSession protocolSession, int channelId, long deliveryTag, int queueSize) throws AMQException
-    {
-        throw new Error("XXX");
-    }
-    private ByteBuffer createEncodedGetOkFrame(int channelId, long deliveryTag, int queueSize)
-    {
-        throw new Error("XXX");
-    }
+
     // Robert Godfrey added these in r503604
     public long getArrivalTime()
     {

Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?view=diff&rev=510912&r1=510911&r2=510912
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java Fri Feb 23 03:36:44 2007
@@ -24,6 +24,8 @@
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.MessageGetBody;
+import org.apache.qpid.protocol.RequestToken;
 import org.apache.qpid.server.exchange.Exchange;
 import org.apache.qpid.server.management.Managable;
 import org.apache.qpid.server.management.ManagedObject;
@@ -349,6 +351,11 @@
         _bindings.addBinding(routingKey, exchange);
     }
 
+    public void unbind(AMQShortString routingKey, Exchange exchange) throws AMQException
+    {
+        _bindings.unbind(routingKey, exchange);
+    }
+
     public void registerProtocolSession(AMQProtocolSession ps, int channel, AMQShortString consumerTag, boolean acks,
                                         FieldTable filters, boolean noLocal, boolean exclusive)
             throws AMQException
@@ -598,9 +605,9 @@
         }
     }
 
-    public boolean performGet(AMQProtocolSession session, AMQChannel channel, boolean acks) throws AMQException
+    public boolean performGet(RequestToken<MessageGetBody> request, AMQChannel channel) throws AMQException
     {
-        return _deliveryMgr.performGet(session, channel, acks);
+        return _deliveryMgr.performGet(request, channel);
     }
 
     public QueueRegistry getQueueRegistry()

Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java?view=diff&rev=510912&r1=510911&r2=510912
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java Fri Feb 23 03:36:44 2007
@@ -25,6 +25,8 @@
 import org.apache.qpid.util.ConcurrentLinkedQueueAtomicSize;
 import org.apache.qpid.configuration.Configured;
 import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.MessageGetBody;
+import org.apache.qpid.protocol.RequestToken;
 import org.apache.qpid.server.configuration.Configurator;
 import org.apache.qpid.server.store.StoreContext;
 import org.apache.qpid.server.AMQChannel;
@@ -185,8 +187,9 @@
         }
     }
 
-    public boolean performGet(AMQProtocolSession protocolSession, AMQChannel channel, boolean acks) throws AMQException
+    public boolean performGet(RequestToken<MessageGetBody> request, AMQChannel channel) throws AMQException
     {
+        final boolean acks = !request.getRequest().noAck;
         AMQMessage msg = getNextMessage();
         if(msg == null)
         {
@@ -223,7 +226,7 @@
                         channel.addUnacknowledgedMessage(msg, deliveryTag, null, _queue);
                     }
 
-                    msg.writeGetOk(protocolSession, channel.getChannelId(), deliveryTag, _queue.getMessageCount());
+                    channel.deliverGet(request, deliveryTag, msg);
                     _totalMessageSize.addAndGet(-msg.getSize());
                 }
             }

Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java?view=diff&rev=510912&r1=510911&r2=510912
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java Fri Feb 23 03:36:44 2007
@@ -22,6 +22,8 @@
 
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.MessageGetBody;
+import org.apache.qpid.protocol.RequestToken;
 import org.apache.qpid.server.store.StoreContext;
 import org.apache.qpid.server.AMQChannel;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
@@ -81,7 +83,7 @@
 
     void populatePreDeliveryQueue(Subscription subscription);
 
-    boolean performGet(AMQProtocolSession session, AMQChannel channel, boolean acks) throws AMQException;
+    boolean performGet(RequestToken<MessageGetBody> request, AMQChannel channel) throws AMQException;
 
     long getTotalMessageSize();
 }

Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/ExchangeBindings.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/ExchangeBindings.java?view=diff&rev=510912&r1=510911&r2=510912
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/ExchangeBindings.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/ExchangeBindings.java Fri Feb 23 03:36:44 2007
@@ -44,6 +44,7 @@
         {
             this.routingKey = routingKey;
             this.exchange = exchange;
+            if(exchange == null) throw new NullPointerException("Can't create binding for null exchange");
         }
 
         void unbind(AMQQueue queue) throws AMQException
@@ -70,7 +71,8 @@
         {
             if (!(o instanceof ExchangeBinding)) return false;
             ExchangeBinding eb = (ExchangeBinding) o;
-            return exchange.equals(eb.exchange) && routingKey.equals(eb.routingKey);
+            return exchange.equals(eb.exchange) 
+                && (routingKey == null ? eb.routingKey == null : routingKey.equals(eb.routingKey));            
         }
     }
 
@@ -91,6 +93,15 @@
     void addBinding(AMQShortString routingKey, Exchange exchange)
     {
         _bindings.add(new ExchangeBinding(routingKey, exchange));
+    }
+
+    void unbind(AMQShortString routingKey, Exchange exchange) throws AMQException
+    {
+        ExchangeBinding b = new ExchangeBinding(routingKey, exchange);
+        if (_bindings.remove(b)) 
+        {
+            b.unbind(_queue);
+        }
     }
 
     /**

Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java?view=diff&rev=510912&r1=510911&r2=510912
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java Fri Feb 23 03:36:44 2007
@@ -130,6 +130,7 @@
         frame2handlerMap.put(MessageResumeBody.class, MessageResumeHandler.getInstance());
         frame2handlerMap.put(MessageTransferBody.class, MessageTransferHandler.getInstance());
         frame2handlerMap.put(QueueBindBody.class, QueueBindHandler.getInstance());
+        frame2handlerMap.put(QueueUnbindBody.class, QueueUnbindHandler.getInstance());
         frame2handlerMap.put(QueueDeclareBody.class, QueueDeclareHandler.getInstance());
         frame2handlerMap.put(QueueDeleteBody.class, QueueDeleteHandler.getInstance());
         frame2handlerMap.put(QueuePurgeBody.class, QueuePurgeHandler.getInstance());

Added: incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/protocol/RequestToken.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/protocol/RequestToken.java?view=auto&rev=510912
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/protocol/RequestToken.java (added)
+++ incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/protocol/RequestToken.java Fri Feb 23 03:36:44 2007
@@ -0,0 +1,69 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.protocol;
+
+import org.apache.qpid.framing.AMQMethodBody;
+
+/**
+ * Allows the context for a request to be passed around, simplifying the
+ * task of responding to it.
+ */
+public class RequestToken<M extends AMQMethodBody>
+{
+    private final AMQProtocolWriter _session;
+    private final AMQMethodEvent<M> _request;
+    private final byte _major;
+    private final byte _minor;
+
+    public RequestToken(AMQProtocolWriter session, AMQMethodEvent<M> request, byte major, byte minor)
+    {
+        _session = session;
+        _request = request;
+        _major = major;
+        _minor = minor;
+    }
+
+    /**
+     * Sends a response to the request this token represents.
+     */
+    public void respond(AMQMethodBody response)
+    {
+        _session.writeResponse(_request.getChannelId(), _request.getRequestId(), response);
+    }
+
+    /**
+     * Provides access to the original request
+     */
+    public M getRequest()
+    {
+        return _request.getMethod();
+    }
+
+    public byte getMajor()
+    {
+        return _major;
+    }
+
+    public byte getMinor()
+    {
+        return _minor;
+    }
+}

Propchange: incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/protocol/RequestToken.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/protocol/RequestToken.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date