You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2007/02/23 12:36:45 UTC
svn commit: r510912 - in /incubator/qpid/branches/qpid.0-9/java:
broker/src/main/java/org/apache/qpid/server/
broker/src/main/java/org/apache/qpid/server/handler/
broker/src/main/java/org/apache/qpid/server/queue/
broker/src/main/java/org/apache/qpid/s...
Author: gsim
Date: Fri Feb 23 03:36:44 2007
New Revision: 510912
URL: http://svn.apache.org/viewvc?view=rev&rev=510912
Log:
Implementation of queue.unbind & message.get
Added:
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/QueueUnbindHandler.java (with props)
incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/protocol/RequestToken.java (with props)
Modified:
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageGetHandler.java
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/ExchangeBindings.java
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java
Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?view=diff&rev=510912&r1=510911&r2=510912
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java Fri Feb 23 03:36:44 2007
@@ -22,7 +22,6 @@
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.framing.MessageOkBody;
import org.apache.log4j.Logger;
import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQException;
@@ -32,12 +31,15 @@
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.MessageAppendBody;
import org.apache.qpid.framing.MessageCloseBody;
+import org.apache.qpid.framing.MessageGetBody;
import org.apache.qpid.framing.MessageOpenBody;
+import org.apache.qpid.framing.MessageOkBody;
import org.apache.qpid.framing.MessageTransferBody;
import org.apache.qpid.framing.RequestManager;
import org.apache.qpid.framing.ResponseManager;
import org.apache.qpid.protocol.AMQMethodListener;
import org.apache.qpid.protocol.AMQProtocolWriter;
+import org.apache.qpid.protocol.RequestToken;
import org.apache.qpid.server.ack.TxAck;
import org.apache.qpid.server.ack.UnacknowledgedMessage;
import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
@@ -349,6 +351,12 @@
_session.getProtocolMinorVersion()
);
_session.writeResponse(_channelId, msg.getRequestId(), ok);
+ }
+ /**
+ * Completes a message.get request for which a message was found.
+ * Answers the request with a message-ok body created for the protocol
+ * version carried by the token, then passes the message to deliver()
+ * using the destination named in the original get request.
+ *
+ * @param request token wrapping the original MessageGetBody request
+ * @param deliveryTag tag handed on to deliver() for this message
+ * @param msg the message being returned to the requester
+ */
+ public void deliverGet(RequestToken<MessageGetBody> request, long deliveryTag, AMQMessage msg)
+ {
+ request.respond(MessageOkBody.createMethodBody(request.getMajor(), request.getMinor())); // the ok response is written before the message is delivered
+ deliver(msg, request.getRequest().destination, deliveryTag);
+ }
public void deliver(AMQMessage msg, AMQShortString destination, final long deliveryTag)
Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageGetHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageGetHandler.java?view=diff&rev=510912&r1=510911&r2=510912
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageGetHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageGetHandler.java Fri Feb 23 03:36:44 2007
@@ -28,6 +28,7 @@
import org.apache.qpid.framing.MessageOkBody;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.protocol.RequestToken;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.ConsumerTagNotUniqueException;
import org.apache.qpid.server.protocol.AMQProtocolSession;
@@ -89,13 +90,11 @@
{
try
{
- if(queue.performGet(session, channel, !body.noAck))
- {
- session.writeResponse(evt, MessageOkBody.createMethodBody(
- session.getProtocolMajorVersion(), // AMQP major version
- session.getProtocolMinorVersion())); // AMQP minor version
- }
- else
+ RequestToken<MessageGetBody> request =
+ new RequestToken<MessageGetBody>(session, evt,
+ session.getProtocolMajorVersion(),
+ session.getProtocolMinorVersion());
+ if(!queue.performGet(request, channel))
{
session.writeResponse(evt, MessageEmptyBody.createMethodBody(
session.getProtocolMajorVersion(), // AMQP major version
Added: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/QueueUnbindHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/QueueUnbindHandler.java?view=auto&rev=510912
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/QueueUnbindHandler.java (added)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/QueueUnbindHandler.java Fri Feb 23 03:36:44 2007
@@ -0,0 +1,84 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.handler;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.QueueUnbindBody;
+import org.apache.qpid.framing.QueueUnbindOkBody;
+import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.state.AMQStateManager;
+import org.apache.qpid.server.state.StateAwareMethodListener;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+import org.apache.log4j.Logger;
+/**
+ * Method listener for QueueUnbindBody requests. It looks up the queue and
+ * exchange named in the request within the session's virtual host, removes
+ * the binding between them for the request's routing key, and replies with
+ * a QueueUnbindOkBody.
+ *
+ * The class exposes one shared instance through getInstance(); its
+ * constructor is private.
+ */
+public class QueueUnbindHandler implements StateAwareMethodListener<QueueUnbindBody>
+{
+ private static final Logger _log = Logger.getLogger(QueueUnbindHandler.class);
+
+ private static final QueueUnbindHandler _instance = new QueueUnbindHandler();
+ /** Returns the single shared instance of this handler. */
+ public static QueueUnbindHandler getInstance()
+ {
+ return _instance;
+ }
+ /** Private so that the only instance is the one returned by getInstance(). */
+ private QueueUnbindHandler() {}
+ /**
+ * Handles a received queue unbind request.
+ *
+ * @param stateManager supplies the protocol session the request arrived on
+ * @param evt the event carrying the QueueUnbindBody; also used to address the response
+ * @throws AMQException the exception obtained from body.getChannelException with the
+ * NOT_FOUND code when the named queue or the named exchange is not registered
+ * in the session's virtual host; may also propagate from queue.unbind
+ */
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<QueueUnbindBody> evt) throws AMQException
+ {
+ AMQProtocolSession session = stateManager.getProtocolSession();
+ final QueueUnbindBody body = evt.getMethod();
+ VirtualHost virtualHost = session.getVirtualHost();
+ ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry();
+ QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
+
+ // The queue is checked first, then the exchange; a missing one aborts the request before anything is changed.
+ final AMQQueue queue = queueRegistry.getQueue(body.queue);
+ if (queue == null)
+ {
+ throw body.getChannelException(AMQConstant.NOT_FOUND.getCode(), "Queue " + body.queue + " does not exist.");
+ }
+ final Exchange exch = exchangeRegistry.getExchange(body.exchange);
+ if (exch == null)
+ {
+ throw body.getChannelException(AMQConstant.NOT_FOUND.getCode(), "Exchange " + body.exchange + " does not exist.");
+ }
+ // Only the routing key and exchange identify the binding here; the request's arguments are not consulted.
+ queue.unbind(body.routingKey, exch);//TODO: this should take the args as well
+ if (_log.isInfoEnabled())
+ {
+ _log.info("Unbinding queue " + queue + " from exchange " + exch + " with routing key " + body.routingKey); // note: logged after the unbind call has returned
+ }
+ // Be aware of possible changes to parameter order as versions change.
+ final AMQMethodBody response = QueueUnbindOkBody.createMethodBody(
+ session.getProtocolMajorVersion(), // AMQP major version
+ session.getProtocolMinorVersion()); // AMQP minor version
+ session.writeResponse(evt, response);
+ }
+}
Propchange: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/QueueUnbindHandler.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/QueueUnbindHandler.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java?view=diff&rev=510912&r1=510911&r2=510912
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java Fri Feb 23 03:36:44 2007
@@ -626,14 +626,7 @@
{
throw new Error("XXX");
}
- public void writeGetOk(AMQProtocolSession protocolSession, int channelId, long deliveryTag, int queueSize) throws AMQException
- {
- throw new Error("XXX");
- }
- private ByteBuffer createEncodedGetOkFrame(int channelId, long deliveryTag, int queueSize)
- {
- throw new Error("XXX");
- }
+
// Robert Godfrey added these in r503604
public long getArrivalTime()
{
Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?view=diff&rev=510912&r1=510911&r2=510912
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java Fri Feb 23 03:36:44 2007
@@ -24,6 +24,8 @@
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.MessageGetBody;
+import org.apache.qpid.protocol.RequestToken;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.management.Managable;
import org.apache.qpid.server.management.ManagedObject;
@@ -349,6 +351,11 @@
_bindings.addBinding(routingKey, exchange);
}
+ public void unbind(AMQShortString routingKey, Exchange exchange) throws AMQException
+ { // Removes this queue's binding to the given exchange under the given routing key.
+ _bindings.unbind(routingKey, exchange); // pure delegation to the same _bindings object that bind() adds to
+ }
+
public void registerProtocolSession(AMQProtocolSession ps, int channel, AMQShortString consumerTag, boolean acks,
FieldTable filters, boolean noLocal, boolean exclusive)
throws AMQException
@@ -598,9 +605,9 @@
}
}
- public boolean performGet(AMQProtocolSession session, AMQChannel channel, boolean acks) throws AMQException
+ public boolean performGet(RequestToken<MessageGetBody> request, AMQChannel channel) throws AMQException
{
- return _deliveryMgr.performGet(session, channel, acks);
+ return _deliveryMgr.performGet(request, channel);
}
public QueueRegistry getQueueRegistry()
Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java?view=diff&rev=510912&r1=510911&r2=510912
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java Fri Feb 23 03:36:44 2007
@@ -25,6 +25,8 @@
import org.apache.qpid.util.ConcurrentLinkedQueueAtomicSize;
import org.apache.qpid.configuration.Configured;
import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.MessageGetBody;
+import org.apache.qpid.protocol.RequestToken;
import org.apache.qpid.server.configuration.Configurator;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.AMQChannel;
@@ -185,8 +187,9 @@
}
}
- public boolean performGet(AMQProtocolSession protocolSession, AMQChannel channel, boolean acks) throws AMQException
+ public boolean performGet(RequestToken<MessageGetBody> request, AMQChannel channel) throws AMQException
{
+ final boolean acks = !request.getRequest().noAck;
AMQMessage msg = getNextMessage();
if(msg == null)
{
@@ -223,7 +226,7 @@
channel.addUnacknowledgedMessage(msg, deliveryTag, null, _queue);
}
- msg.writeGetOk(protocolSession, channel.getChannelId(), deliveryTag, _queue.getMessageCount());
+ channel.deliverGet(request, deliveryTag, msg);
_totalMessageSize.addAndGet(-msg.getSize());
}
}
Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java?view=diff&rev=510912&r1=510911&r2=510912
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java Fri Feb 23 03:36:44 2007
@@ -22,6 +22,8 @@
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.MessageGetBody;
+import org.apache.qpid.protocol.RequestToken;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.protocol.AMQProtocolSession;
@@ -81,7 +83,7 @@
void populatePreDeliveryQueue(Subscription subscription);
- boolean performGet(AMQProtocolSession session, AMQChannel channel, boolean acks) throws AMQException;
+ boolean performGet(RequestToken<MessageGetBody> request, AMQChannel channel) throws AMQException;
long getTotalMessageSize();
}
Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/ExchangeBindings.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/ExchangeBindings.java?view=diff&rev=510912&r1=510911&r2=510912
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/ExchangeBindings.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/ExchangeBindings.java Fri Feb 23 03:36:44 2007
@@ -44,6 +44,7 @@
{
this.routingKey = routingKey;
this.exchange = exchange;
+ if(exchange == null) throw new NullPointerException("Can't create binding for null exchange");
}
void unbind(AMQQueue queue) throws AMQException
@@ -70,7 +71,8 @@
{
if (!(o instanceof ExchangeBinding)) return false;
ExchangeBinding eb = (ExchangeBinding) o;
- return exchange.equals(eb.exchange) && routingKey.equals(eb.routingKey);
+ return exchange.equals(eb.exchange)
+ && (routingKey == null ? eb.routingKey == null : routingKey.equals(eb.routingKey));
}
}
@@ -91,6 +93,15 @@
void addBinding(AMQShortString routingKey, Exchange exchange)
{
_bindings.add(new ExchangeBinding(routingKey, exchange));
+ }
+ /**
+ * Removes the binding for the given routing key and exchange, if one is held.
+ * A temporary ExchangeBinding is built for the pair and used as the lookup
+ * key for removal from _bindings; matching therefore depends on
+ * ExchangeBinding.equals, which accepts a null routing key. The binding's
+ * own unbind(_queue) is invoked only when remove() reports that an entry
+ * was actually removed, so unbinding something that was never bound does
+ * nothing further.
+ *
+ * @param routingKey routing key of the binding to remove (null is tolerated by equals)
+ * @param exchange exchange of the binding to remove; the ExchangeBinding constructor
+ * throws NullPointerException if this is null
+ * @throws AMQException if ExchangeBinding.unbind fails
+ */
+ void unbind(AMQShortString routingKey, Exchange exchange) throws AMQException
+ {
+ ExchangeBinding b = new ExchangeBinding(routingKey, exchange);
+ if (_bindings.remove(b))
+ {
+ b.unbind(_queue); // NOTE(review): presumably deregisters _queue from the exchange - confirm against ExchangeBinding.unbind
+ }
+ }
/**
Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java?view=diff&rev=510912&r1=510911&r2=510912
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java Fri Feb 23 03:36:44 2007
@@ -130,6 +130,7 @@
frame2handlerMap.put(MessageResumeBody.class, MessageResumeHandler.getInstance());
frame2handlerMap.put(MessageTransferBody.class, MessageTransferHandler.getInstance());
frame2handlerMap.put(QueueBindBody.class, QueueBindHandler.getInstance());
+ frame2handlerMap.put(QueueUnbindBody.class, QueueUnbindHandler.getInstance());
frame2handlerMap.put(QueueDeclareBody.class, QueueDeclareHandler.getInstance());
frame2handlerMap.put(QueueDeleteBody.class, QueueDeleteHandler.getInstance());
frame2handlerMap.put(QueuePurgeBody.class, QueuePurgeHandler.getInstance());
Added: incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/protocol/RequestToken.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/protocol/RequestToken.java?view=auto&rev=510912
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/protocol/RequestToken.java (added)
+++ incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/protocol/RequestToken.java Fri Feb 23 03:36:44 2007
@@ -0,0 +1,69 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.protocol;
+
+import org.apache.qpid.framing.AMQMethodBody;
+
+/**
+ * Allows the context for a request to be passed around, simplifying the
+ * task of responding to it.
+ *
+ * A token holds the writer used to send the response, the event that
+ * carried the request, and the protocol major and minor version bytes
+ * supplied at construction. All four fields are final.
+ *
+ * @param <M> the type of method body carried by the request
+ */
+public class RequestToken<M extends AMQMethodBody>
+{
+ private final AMQProtocolWriter _session;
+ private final AMQMethodEvent<M> _request;
+ private final byte _major;
+ private final byte _minor;
+ /**
+ * Creates a token for the given request. The arguments are stored as
+ * given; no validation is performed.
+ *
+ * @param session writer through which respond() sends the response
+ * @param request the event carrying the request method body
+ * @param major protocol major version, returned by getMajor()
+ * @param minor protocol minor version, returned by getMinor()
+ */
+ public RequestToken(AMQProtocolWriter session, AMQMethodEvent<M> request, byte major, byte minor)
+ {
+ _session = session;
+ _request = request;
+ _major = major;
+ _minor = minor;
+ }
+
+ /**
+ * Sends a response to the request this token represents. The response
+ * is written through the session's writeResponse, addressed with the
+ * channel id and request id taken from the original request event.
+ *
+ * @param response the method body to send back
+ */
+ public void respond(AMQMethodBody response)
+ {
+ _session.writeResponse(_request.getChannelId(), _request.getRequestId(), response);
+ }
+
+ /**
+ * Provides access to the original request.
+ *
+ * @return the method body obtained from the request event's getMethod()
+ */
+ public M getRequest()
+ {
+ return _request.getMethod();
+ }
+ /** Returns the protocol major version byte given at construction. */
+ public byte getMajor()
+ {
+ return _major;
+ }
+ /** Returns the protocol minor version byte given at construction. */
+ public byte getMinor()
+ {
+ return _minor;
+ }
+}
Propchange: incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/protocol/RequestToken.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/protocol/RequestToken.java
------------------------------------------------------------------------------
svn:keywords = Rev Date