You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2007/02/20 16:52:05 UTC
svn commit: r509616 - in
/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server:
./ handler/ protocol/ queue/
Author: gsim
Date: Tue Feb 20 07:52:04 2007
New Revision: 509616
URL: http://svn.apache.org/viewvc?view=rev&rev=509616
Log:
Some fixes to get more python tests passing.
Modified:
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageQosHandler.java
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageTransferHandler.java
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?view=diff&rev=509616&r1=509615&r2=509616
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java Tue Feb 20 07:52:04 2007
@@ -141,6 +141,11 @@
private Set<Long> _browsedAcks = new HashSet<Long>();
+ /**
+ * Used in creating unique references.
+ */
+ private byte _refCounter;
+
// XXX: clean up arguments
public AMQChannel(int channelId, AMQProtocolSession session, MessageStore messageStore, MessageRouter exchanges, AMQMethodListener methodListener)
{
@@ -218,7 +223,7 @@
_prefetch_HighWaterMark = prefetchCount;
}
- public void addMessageTransfer(MessageTransferBody transferBody, AMQProtocolSession publisher) throws AMQException
+ public void addMessageTransfer(MessageTransferBody transferBody, long requestId, AMQProtocolSession publisher) throws AMQException
{
Content body = transferBody.getBody();
AMQMessage message;
@@ -226,14 +231,20 @@
case INLINE_T:
message = new AMQMessage(_messageStore, transferBody, Collections.singletonList(body.getContent()), _txnContext);
message.setPublisher(publisher);
+ message.setRequestId(requestId);
routeCurrentMessage(message);
- message.routingComplete(_messageStore, _storeContext, _messageHandleFactory);
break;
case REF_T:
- AMQReference ref = getReference(body.getContentAsByteArray());
- message = new AMQMessage(_messageStore, transferBody, ref.getContentList(), _txnContext);
- message.setPublisher(publisher);
- ref.addRefTransferBody(message);
+ try {
+ AMQReference ref = getReference(body.getContentAsByteArray());
+ message = new AMQMessage(_messageStore, transferBody, ref.getContentList(), _txnContext);
+ message.setPublisher(publisher);
+ message.setRequestId(requestId);
+ ref.addRefTransferBody(message);
+ } catch (IllegalArgumentException e) {
+ throw transferBody.getConnectionException(503, "Reference is not open");
+ }
+
break;
}
}
@@ -277,24 +288,35 @@
return ref;
}
- public void addMessageOpen(MessageOpenBody open)
+ public void addMessageOpen(MessageOpenBody open) throws AMQException
{
- createReference(open.reference);
+ try {
+ createReference(open.reference);
+ } catch (IllegalArgumentException e) {
+ throw open.getConnectionException(503, "Reference is already open");
+ }
}
- public void addMessageAppend(MessageAppendBody append)
+ public void addMessageAppend(MessageAppendBody append) throws AMQException
{
- AMQReference ref = getReference(append.reference);
- ref.appendContent(ByteBuffer.wrap(append.bytes));
+ try {
+ AMQReference ref = getReference(append.reference);
+ ref.appendContent(ByteBuffer.wrap(append.bytes));
+ } catch (IllegalArgumentException e) {
+ throw append.getConnectionException(503, "Reference is not open");
+ }
}
public void addMessageClose(MessageCloseBody close) throws AMQException
{
- AMQReference ref = removeReference(close.reference);
- for (AMQMessage msg : ref.getMessageList())
- {
- routeCurrentMessage(msg);
- msg.routingComplete(_messageStore, _storeContext, _messageHandleFactory);
+ try {
+ AMQReference ref = removeReference(close.reference);
+ for (AMQMessage msg : ref.getMessageList())
+ {
+ routeCurrentMessage(msg);
+ }
+ } catch (IllegalArgumentException e) {
+ throw close.getConnectionException(503, "Reference is not open");
}
}
@@ -308,38 +330,18 @@
{
_returnMessages.add(e);
}
+ msg.routingComplete(_messageStore, _storeContext, _messageHandleFactory);
+
+ MessageOkBody ok = MessageOkBody.createMethodBody(
+ _session.getProtocolMajorVersion(),
+ _session.getProtocolMinorVersion()
+ );
+ _session.writeResponse(_channelId, msg.getRequestId(), ok);
}
public void deliver(AMQMessage msg, AMQShortString destination, final long deliveryTag)
{
- // Do we need to refactor the content for a different frame size?
- long maxFrameSize = _session.getFrameMax();
- Iterable<ByteBuffer> contentItr = msg.getContents();
- if (msg.getSize() > maxFrameSize)
- {
- Iterator<ByteBuffer> cItr = contentItr.iterator();
- if (cItr.next().limit() > maxFrameSize) // First chunk should equal incoming frame size
- {
- // TODO - Refactor the chunks for smaller outbound frame size
- throw new Error("XXX TODO - need to refactor content chunks here");
- // deliverRef(msg, destination, deliveryTag);
- }
- else
- {
- // Use ref content as is - no need to refactor
- deliverRef(msg, destination, deliveryTag);
- }
- }
- else
- {
- // Concatenate - all incoming chunks will fit into single outbound frame
- deliverInline(msg, destination, deliveryTag);
- }
- }
-
- public void deliverInline(AMQMessage msg, AMQShortString destination, final long deliveryTag)
- {
- deliverInline(msg, destination, new AMQMethodListener()
+ AMQMethodListener listener = new AMQMethodListener()
{
public boolean methodReceived(AMQMethodEvent evt) throws AMQException
{
@@ -361,9 +363,20 @@
}
}
public void error(Exception e) {}
- });
+ };
+ long maxFrameSize = _session.getFrameMax();
+ if (msg.getFullSize() > maxFrameSize)
+ {
+ //need to send as reference
+ deliverRef(msg, destination, listener);
+ }
+ else
+ {
+ //message will fit inline
+ deliverInline(msg, destination, listener);
+ }
}
-
+
public void deliverInline(AMQMessage msg, AMQShortString destination, AMQMethodListener listener)
{
MessageTransferBody mtb = msg.getTransferBody().copy();
@@ -378,64 +391,37 @@
mtb.body = new Content(Content.TypeEnum.INLINE_T, buf);
_session.writeRequest(_channelId, mtb, listener);
}
-
- public void deliverRef(final AMQMessage msg, final AMQShortString destination, final long deliveryTag)
- {
- final byte[] refId = String.valueOf(System.currentTimeMillis()).getBytes();
- deliverRef(refId, msg, destination, new AMQMethodListener()
- {
- public boolean methodReceived(AMQMethodEvent evt) throws AMQException
- {
- AMQMethodBody method = evt.getMethod();
- if (_log.isDebugEnabled())
- {
- _log.debug(method + " received on channel " + _channelId);
- }
- // XXX: multiple?
- if (method instanceof MessageOkBody)
- {
- acknowledgeMessage(deliveryTag, false);
- return true;
- }
- else
- {
- // TODO: implement reject
- return false;
- }
- }
- public void error(Exception e) {}
- });
+
+ private synchronized byte[] nextRefId() {
+ return new byte[]{_refCounter++};
}
- public void deliverRef(byte[] refId, AMQMessage msg, AMQShortString destination, AMQMethodListener listener)
+ public void deliverRef(AMQMessage msg, AMQShortString destination, AMQMethodListener listener)
{
- AMQMethodBody openBody = MessageOpenBody.createMethodBody(
- _session.getProtocolMajorVersion(), // AMQP major version
- _session.getProtocolMinorVersion(), // AMQP minor version
- refId);
- _session.writeRequest(_channelId, openBody, listener);
+ AMQMethodListener dummy = new AMQMethodListener()
+ {
+ public boolean methodReceived(AMQMethodEvent evt){ return true; }
+ public void error(Exception e) {}
+ };
+ byte major = _session.getProtocolMajorVersion();
+ byte minor = _session.getProtocolMinorVersion();
+ byte[] refId = nextRefId();
+ _session.writeRequest(_channelId, MessageOpenBody.createMethodBody(major, minor, refId), dummy);
MessageTransferBody mtb = msg.getTransferBody().copy();
mtb.destination = destination;
- mtb.redelivered = msg.isRedelivered();
mtb.body = new Content(Content.TypeEnum.REF_T, refId);
_session.writeRequest(_channelId, mtb, listener);
- for (ByteBuffer bb : msg.getContents())
- {
- ByteBuffer dup = bb.duplicate();
- byte[] ba = new byte[dup.limit()];
- dup.get(ba);
- AMQMethodBody appendBody = MessageAppendBody.createMethodBody(
- _session.getProtocolMajorVersion(), // AMQP major version
- _session.getProtocolMinorVersion(), // AMQP minor version
- ba,
- refId);
- _session.writeRequest(_channelId, appendBody, listener);
- }
- AMQMethodBody closeBody = MessageCloseBody.createMethodBody(
- _session.getProtocolMajorVersion(), // AMQP major version
- _session.getProtocolMinorVersion(), // AMQP minor version
- refId);
- _session.writeRequest(_channelId, closeBody, listener);
+ for (ByteBuffer buffer : msg.getContents())
+ {
+ //TODO: try and avoid all this copying!
+ while (buffer.remaining() > 0)
+ {
+ byte[] data = new byte[Math.min((int) _session.getFrameMax(), buffer.remaining())];
+ buffer.get(data);
+ _session.writeRequest(_channelId, MessageAppendBody.createMethodBody(major, minor, data, refId), dummy);
+ }
+ }
+ _session.writeRequest(_channelId, MessageCloseBody.createMethodBody(major, minor, refId), dummy);
}
public RequestManager getRequestManager()
@@ -554,6 +540,7 @@
for (UnacknowledgedMessage unacked : messagesToBeDelivered)
{
+ unacked.message.setRedelivered(true);
if (unacked.queue != null)
{
_txnContext.deliver(unacked.message, unacked.queue);
Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java?view=diff&rev=509616&r1=509615&r2=509616
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java Tue Feb 20 07:52:04 2007
@@ -66,7 +66,7 @@
}
else
{
- virtualHostName = String.valueOf(body.virtualHost);
+ virtualHostName = body.virtualHost == null ? null : String.valueOf(body.virtualHost);
}
VirtualHost virtualHost = stateManager.getVirtualHostRegistry().getVirtualHost(virtualHostName);
Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageQosHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageQosHandler.java?view=diff&rev=509616&r1=509615&r2=509616
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageQosHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageQosHandler.java Tue Feb 20 07:52:04 2007
@@ -24,6 +24,7 @@
import org.apache.qpid.framing.MessageQosBody;
import org.apache.qpid.framing.MessageOkBody;
import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
@@ -46,7 +47,9 @@
public void methodReceived (AMQStateManager stateManager, AMQMethodEvent<MessageQosBody> evt) throws AMQException
{
AMQProtocolSession session = stateManager.getProtocolSession();
- session.getChannel(evt.getChannelId()).setPrefetchCount(evt.getMethod().prefetchCount);
+ AMQChannel channel = session.getChannel(evt.getChannelId());
+ channel.setPrefetchCount(evt.getMethod().prefetchCount);
+ channel.setPrefetchSize(evt.getMethod().prefetchSize);
// Be aware of possible changes to parameter order as versions change.
session.writeResponse(evt.getChannelId(), evt.getRequestId(), MessageOkBody.createMethodBody(
session.getProtocolMajorVersion(), // AMQP major version
Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageTransferHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageTransferHandler.java?view=diff&rev=509616&r1=509615&r2=509616
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageTransferHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageTransferHandler.java Tue Feb 20 07:52:04 2007
@@ -78,10 +78,10 @@
// is stored in the channel. Once the final body frame has been received
// it is routed to the exchange.
AMQChannel channel = session.getChannel(evt.getChannelId());
- channel.addMessageTransfer(body, session);
- session.writeResponse(evt, MessageOkBody.createMethodBody(
- session.getProtocolMajorVersion(), // AMQP major version
- session.getProtocolMinorVersion())); // AMQP minor version
+ channel.addMessageTransfer(body, evt.getRequestId(), session);
+ //session.writeResponse(evt, MessageOkBody.createMethodBody(
+ // session.getProtocolMajorVersion(), // AMQP major version
+ // session.getProtocolMinorVersion())); // AMQP minor version
}
}
}
Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java?view=diff&rev=509616&r1=509615&r2=509616
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java Tue Feb 20 07:52:04 2007
@@ -610,8 +610,6 @@
task.doTask(this);
}
}
-// gsim-python
-// _minaProtocolSession.close();
}
/**
Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java?view=diff&rev=509616&r1=509615&r2=509616
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java Tue Feb 20 07:52:04 2007
@@ -168,6 +168,7 @@
// gsim-python
//session.closeSessionRequest(200, new AMQShortString(throwable.getMessage()));
session.closeSession();
+ protocolSession.close();
}
}
Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java?view=diff&rev=509616&r1=509615&r2=509616
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java Tue Feb 20 07:52:04 2007
@@ -80,8 +80,6 @@
}
};
- private boolean _redelivered;
-
private final Long _messageId;
private final AtomicInteger _referenceCount = new AtomicInteger(1);
@@ -119,6 +117,7 @@
private boolean _deliveredToConsumer;
private AtomicBoolean _taken = new AtomicBoolean(false);
+ private long _requestId;//the request id of the transfer that this message represents
public AMQMessage(MessageStore messageStore, MessageTransferBody transferBody, TransactionalContext txnContext)
{
@@ -160,6 +159,16 @@
public long getSize()
{
+ //based on existing usage, this should return the size of the
+ //data and inline data will already be included in the count
+ //by getBodySize()
+ return getBodySize();
+ }
+
+ public long getFullSize()
+ {
+ //this is used in determining whether a message can be inlined
+ //or not and therefore must include the header size also
return getHeaderSize() + getBodySize();
}
@@ -300,11 +309,11 @@
_transferBody.priority = priority;
}
- // TODO - how does this relate to the _redelivered flag in this class? See other isRedelivered() method below.
-// public boolean isRedelivered()
-// {
-// return _transferBody.getRedelivered();
-// }
+
+ public boolean isRedelivered()
+ {
+ return _transferBody.getRedelivered();
+ }
public AMQShortString getReplyTo()
{
@@ -406,12 +415,6 @@
//return _bodyLengthReceived == _contentHeaderBody.bodySize;
}
-
- public boolean isRedelivered()
- {
- return _redelivered;
- }
-
NoConsumersException getNoConsumersException(String queue)
{
return new NoConsumersException(queue, this);
@@ -420,7 +423,6 @@
public void setRedelivered(boolean redelivered)
{
_transferBody.redelivered = redelivered;
- _redelivered = redelivered;
}
public long getMessageId()
@@ -636,6 +638,16 @@
public long getArrivalTime()
{
throw new Error("XXX");
+ }
+
+ public void setRequestId(long requestId)
+ {
+ _requestId = requestId;
+ }
+
+ public long getRequestId()
+ {
+ return _requestId;
}
}
Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?view=diff&rev=509616&r1=509615&r2=509616
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java Tue Feb 20 07:52:04 2007
@@ -371,25 +371,6 @@
setExclusive(true);
}
- if(incrementSubscriberCount() > 1)
- {
- if(isExclusive())
- {
- decrementSubscriberCount();
- throw EXISTING_EXCLUSIVE;
- }
- else if(exclusive)
- {
- decrementSubscriberCount();
- throw EXISTING_SUBSCRIPTION;
- }
-
- }
- else if(exclusive)
- {
- setExclusive(true);
- }
-
debug("Registering protocol session {0} with channel {1} and consumer tag {2} with {3}", ps, channel, consumerTag, this);
Subscription subscription = _subscriptionFactory.createSubscription(channel, ps, consumerTag, acks, filters, noLocal);
Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java?view=diff&rev=509616&r1=509615&r2=509616
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java Tue Feb 20 07:52:04 2007
@@ -255,7 +255,6 @@
while (msg != null)
{
msg.dequeue(storeContext, _queue);
- count++;
_totalMessageSize.set(0L);
count++;
msg = poll();