You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2007/02/20 16:52:05 UTC

svn commit: r509616 - in /incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server: ./ handler/ protocol/ queue/

Author: gsim
Date: Tue Feb 20 07:52:04 2007
New Revision: 509616

URL: http://svn.apache.org/viewvc?view=rev&rev=509616
Log:
Some fixes to get more Python tests passing.


Modified:
    incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
    incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java
    incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageQosHandler.java
    incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageTransferHandler.java
    incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
    incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
    incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
    incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
    incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java

Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?view=diff&rev=509616&r1=509615&r2=509616
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java Tue Feb 20 07:52:04 2007
@@ -141,6 +141,11 @@
 
     private Set<Long> _browsedAcks = new HashSet<Long>();
 
+    /**
+     * Used in creating unique references. 
+     */
+    private byte _refCounter;
+
     // XXX: clean up arguments
     public AMQChannel(int channelId, AMQProtocolSession session, MessageStore messageStore, MessageRouter exchanges, AMQMethodListener methodListener)
     {
@@ -218,7 +223,7 @@
         _prefetch_HighWaterMark = prefetchCount;
     }
 
-    public void addMessageTransfer(MessageTransferBody transferBody, AMQProtocolSession publisher) throws AMQException
+    public void addMessageTransfer(MessageTransferBody transferBody, long requestId, AMQProtocolSession publisher) throws AMQException
     {
         Content body = transferBody.getBody();
         AMQMessage message;
@@ -226,14 +231,20 @@
         case INLINE_T:
             message = new AMQMessage(_messageStore, transferBody, Collections.singletonList(body.getContent()), _txnContext);
             message.setPublisher(publisher);
+            message.setRequestId(requestId);
             routeCurrentMessage(message);
-            message.routingComplete(_messageStore, _storeContext, _messageHandleFactory);
             break;
         case REF_T:
-            AMQReference ref = getReference(body.getContentAsByteArray());
-            message = new AMQMessage(_messageStore, transferBody, ref.getContentList(), _txnContext);
-            message.setPublisher(publisher);
-            ref.addRefTransferBody(message);
+            try {
+                AMQReference ref = getReference(body.getContentAsByteArray());
+                message = new AMQMessage(_messageStore, transferBody, ref.getContentList(), _txnContext);
+                message.setPublisher(publisher);
+                message.setRequestId(requestId);
+                ref.addRefTransferBody(message);
+            } catch (IllegalArgumentException e) {
+                throw transferBody.getConnectionException(503, "Reference is not open");
+            }
+
             break;
         }
     }
@@ -277,24 +288,35 @@
         return ref;
     }
 
-    public void addMessageOpen(MessageOpenBody open)
+    public void addMessageOpen(MessageOpenBody open) throws AMQException
     {
-        createReference(open.reference);
+        try {
+            createReference(open.reference);
+        } catch (IllegalArgumentException e) {
+            throw open.getConnectionException(503, "Reference is already open");
+        }
     }
 
-    public void addMessageAppend(MessageAppendBody append)
+    public void addMessageAppend(MessageAppendBody append) throws AMQException
     {
-        AMQReference ref = getReference(append.reference);
-        ref.appendContent(ByteBuffer.wrap(append.bytes));
+        try {
+            AMQReference ref = getReference(append.reference);
+            ref.appendContent(ByteBuffer.wrap(append.bytes));
+        } catch (IllegalArgumentException e) {
+            throw append.getConnectionException(503, "Reference is not open");
+        }
     }
 
     public void addMessageClose(MessageCloseBody close) throws AMQException
     {
-        AMQReference ref = removeReference(close.reference);
-        for (AMQMessage msg : ref.getMessageList())
-        {
-            routeCurrentMessage(msg);
-            msg.routingComplete(_messageStore, _storeContext, _messageHandleFactory);
+        try {
+            AMQReference ref = removeReference(close.reference);
+            for (AMQMessage msg : ref.getMessageList())
+            {
+                routeCurrentMessage(msg);
+            }
+        } catch (IllegalArgumentException e) {
+            throw close.getConnectionException(503, "Reference is not open");
         }
     }
 
@@ -308,38 +330,18 @@
         {
             _returnMessages.add(e);
         }
+        msg.routingComplete(_messageStore, _storeContext, _messageHandleFactory);
+
+        MessageOkBody ok = MessageOkBody.createMethodBody(
+                               _session.getProtocolMajorVersion(),
+                               _session.getProtocolMinorVersion()
+                           );
+        _session.writeResponse(_channelId, msg.getRequestId(), ok);
     }
 
     public void deliver(AMQMessage msg, AMQShortString destination, final long deliveryTag)
     {
-        // Do we need to refactor the content for a different frame size?
-        long maxFrameSize = _session.getFrameMax();
-        Iterable<ByteBuffer> contentItr = msg.getContents();
-        if (msg.getSize() > maxFrameSize)
-        {
-            Iterator<ByteBuffer> cItr = contentItr.iterator();
-            if (cItr.next().limit() > maxFrameSize) // First chunk should equal incoming frame size
-            {
-                // TODO - Refactor the chunks for smaller outbound frame size
-                throw new Error("XXX TODO - need to refactor content chunks here");
-                // deliverRef(msg, destination, deliveryTag);
-            }
-            else
-            {
-                // Use ref content as is - no need to refactor
-                deliverRef(msg, destination, deliveryTag);
-            }
-        }
-        else
-        {
-            // Concatenate - all incoming chunks will fit into single outbound frame
-            deliverInline(msg, destination, deliveryTag);
-        }
-    }
-    
-    public void deliverInline(AMQMessage msg, AMQShortString destination, final long deliveryTag)
-    {
-        deliverInline(msg, destination, new AMQMethodListener()
+        AMQMethodListener listener = new AMQMethodListener()
         {
             public boolean methodReceived(AMQMethodEvent evt) throws AMQException
             {
@@ -361,9 +363,20 @@
                 }
             }
             public void error(Exception e) {}
-        });
+        };
+        long maxFrameSize = _session.getFrameMax();
+        if (msg.getFullSize() > maxFrameSize)
+        {
+            //need to send as reference
+            deliverRef(msg, destination, listener);
+        }
+        else
+        {
+            //message will fit inline 
+            deliverInline(msg, destination, listener);
+        }
     }
-
+    
     public void deliverInline(AMQMessage msg, AMQShortString destination, AMQMethodListener listener)
     {
         MessageTransferBody mtb = msg.getTransferBody().copy();
@@ -378,64 +391,37 @@
         mtb.body = new Content(Content.TypeEnum.INLINE_T, buf);
         _session.writeRequest(_channelId, mtb, listener);
     }
-    
-    public void deliverRef(final AMQMessage msg, final AMQShortString destination, final long deliveryTag)
-    {
-        final byte[] refId = String.valueOf(System.currentTimeMillis()).getBytes();
-        deliverRef(refId, msg, destination, new AMQMethodListener()
-        {
-            public boolean methodReceived(AMQMethodEvent evt) throws AMQException
-            {
-                AMQMethodBody method = evt.getMethod();
-                if (_log.isDebugEnabled())
-                {
-                    _log.debug(method + " received on channel " + _channelId);
-                }
-                // XXX: multiple?
-                if (method instanceof MessageOkBody)
-                {
-                    acknowledgeMessage(deliveryTag, false);
-                    return true;
-                }
-                else
-                {
-                    // TODO: implement reject
-                    return false;
-                }
-            }
-            public void error(Exception e) {}
-        });
+
+    private synchronized byte[] nextRefId() {
+        return new byte[]{_refCounter++};
     }
     
-    public void deliverRef(byte[] refId, AMQMessage msg, AMQShortString destination, AMQMethodListener listener)
+    public void deliverRef(AMQMessage msg, AMQShortString destination, AMQMethodListener listener)
     {
-        AMQMethodBody openBody = MessageOpenBody.createMethodBody(
-            _session.getProtocolMajorVersion(), // AMQP major version
-            _session.getProtocolMinorVersion(), // AMQP minor version
-            refId);
-        _session.writeRequest(_channelId, openBody, listener);
+        AMQMethodListener dummy = new AMQMethodListener()
+        { 
+            public boolean methodReceived(AMQMethodEvent evt){ return true; } 
+            public void error(Exception e) {}
+        };
+        byte major = _session.getProtocolMajorVersion();
+        byte minor = _session.getProtocolMinorVersion();
+        byte[] refId = nextRefId();
+        _session.writeRequest(_channelId, MessageOpenBody.createMethodBody(major, minor, refId), dummy);
         MessageTransferBody mtb = msg.getTransferBody().copy();
         mtb.destination = destination;
-        mtb.redelivered = msg.isRedelivered();
         mtb.body = new Content(Content.TypeEnum.REF_T, refId);
         _session.writeRequest(_channelId, mtb, listener);
-        for (ByteBuffer bb : msg.getContents())
-        {
-            ByteBuffer dup = bb.duplicate();
-            byte[] ba = new byte[dup.limit()];
-            dup.get(ba);
-        	AMQMethodBody appendBody = MessageAppendBody.createMethodBody(
-                _session.getProtocolMajorVersion(), // AMQP major version
-                _session.getProtocolMinorVersion(), // AMQP minor version
-                ba,
-                refId);
-            _session.writeRequest(_channelId, appendBody, listener);
-        }
-        AMQMethodBody closeBody = MessageCloseBody.createMethodBody(
-            _session.getProtocolMajorVersion(), // AMQP major version
-            _session.getProtocolMinorVersion(), // AMQP minor version
-            refId);
-        _session.writeRequest(_channelId, closeBody, listener);
+        for (ByteBuffer buffer : msg.getContents()) 
+        {            
+            //TODO: try and avoid all this copying!
+            while (buffer.remaining() > 0) 
+            {
+                byte[] data = new byte[Math.min((int) _session.getFrameMax(), buffer.remaining())];
+                buffer.get(data);
+                _session.writeRequest(_channelId, MessageAppendBody.createMethodBody(major, minor, data, refId), dummy);
+            }
+        }
+        _session.writeRequest(_channelId, MessageCloseBody.createMethodBody(major, minor, refId), dummy);
     }
 
     public RequestManager getRequestManager()
@@ -554,6 +540,7 @@
 
         for (UnacknowledgedMessage unacked : messagesToBeDelivered)
         {
+            unacked.message.setRedelivered(true);
             if (unacked.queue != null)
             {
                 _txnContext.deliver(unacked.message, unacked.queue);

Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java?view=diff&rev=509616&r1=509615&r2=509616
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java Tue Feb 20 07:52:04 2007
@@ -66,7 +66,7 @@
         }
         else
         {
-            virtualHostName = String.valueOf(body.virtualHost);
+            virtualHostName = body.virtualHost == null ? null : String.valueOf(body.virtualHost);
         }
 
         VirtualHost virtualHost = stateManager.getVirtualHostRegistry().getVirtualHost(virtualHostName);

Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageQosHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageQosHandler.java?view=diff&rev=509616&r1=509615&r2=509616
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageQosHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageQosHandler.java Tue Feb 20 07:52:04 2007
@@ -24,6 +24,7 @@
 import org.apache.qpid.framing.MessageQosBody;
 import org.apache.qpid.framing.MessageOkBody;
 import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.server.AMQChannel;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
 import org.apache.qpid.server.state.AMQStateManager;
 import org.apache.qpid.server.state.StateAwareMethodListener;
@@ -46,7 +47,9 @@
     public void methodReceived (AMQStateManager stateManager, AMQMethodEvent<MessageQosBody> evt) throws AMQException
     {
         AMQProtocolSession session = stateManager.getProtocolSession();
-        session.getChannel(evt.getChannelId()).setPrefetchCount(evt.getMethod().prefetchCount);
+        AMQChannel channel = session.getChannel(evt.getChannelId());
+        channel.setPrefetchCount(evt.getMethod().prefetchCount);
+        channel.setPrefetchSize(evt.getMethod().prefetchSize);
         // Be aware of possible changes to parameter order as versions change.
         session.writeResponse(evt.getChannelId(), evt.getRequestId(), MessageOkBody.createMethodBody(
             session.getProtocolMajorVersion(), // AMQP major version

Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageTransferHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageTransferHandler.java?view=diff&rev=509616&r1=509615&r2=509616
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageTransferHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageTransferHandler.java Tue Feb 20 07:52:04 2007
@@ -78,10 +78,10 @@
             // is stored in the channel. Once the final body frame has been received
             // it is routed to the exchange.
             AMQChannel channel = session.getChannel(evt.getChannelId());
-            channel.addMessageTransfer(body, session);
-            session.writeResponse(evt, MessageOkBody.createMethodBody(
-                session.getProtocolMajorVersion(), // AMQP major version
-                session.getProtocolMinorVersion())); // AMQP minor version
+            channel.addMessageTransfer(body, evt.getRequestId(), session);
+            //session.writeResponse(evt, MessageOkBody.createMethodBody(
+            //    session.getProtocolMajorVersion(), // AMQP major version
+            //    session.getProtocolMinorVersion())); // AMQP minor version
         }
     }
 }

Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java?view=diff&rev=509616&r1=509615&r2=509616
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java Tue Feb 20 07:52:04 2007
@@ -610,8 +610,6 @@
                 task.doTask(this);
             }
         }
-// gsim-python
-//        _minaProtocolSession.close();
     }
 
     /**

Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java?view=diff&rev=509616&r1=509615&r2=509616
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java Tue Feb 20 07:52:04 2007
@@ -168,6 +168,7 @@
             // gsim-python
             //session.closeSessionRequest(200, new AMQShortString(throwable.getMessage()));
             session.closeSession();
+            protocolSession.close();
         }
     }
 

Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java?view=diff&rev=509616&r1=509615&r2=509616
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java Tue Feb 20 07:52:04 2007
@@ -80,8 +80,6 @@
         }
     };
 
-    private boolean _redelivered;
-
     private final Long _messageId;
 
     private final AtomicInteger _referenceCount = new AtomicInteger(1);
@@ -119,6 +117,7 @@
     private boolean _deliveredToConsumer;
     private AtomicBoolean _taken = new AtomicBoolean(false);
 
+    private long _requestId;//the request id of the transfer that this message represents
 
     public AMQMessage(MessageStore messageStore, MessageTransferBody transferBody, TransactionalContext txnContext)
     {
@@ -160,6 +159,16 @@
 
     public long getSize()
     {
+        //based on existing usage, this should return the size of the
+        //data and inline data will already be included in the count
+        //by getBodySize()
+        return getBodySize();
+    }
+
+    public long getFullSize()
+    {
+        //this is used in determining whether a message can be inlined
+        //or not and therefore must include the header size also
         return getHeaderSize() + getBodySize();
     }
 
@@ -300,11 +309,11 @@
         _transferBody.priority = priority;
     }
 
-    // TODO - how does this relate to the _redelivered flag in this class? See other isRedelivered() method below.    
-//     public boolean isRedelivered()
-//     {
-//         return _transferBody.getRedelivered();
-//     }
+
+    public boolean isRedelivered()
+    {
+        return _transferBody.getRedelivered();
+    }
     
     public AMQShortString getReplyTo()
     {
@@ -406,12 +415,6 @@
         //return _bodyLengthReceived == _contentHeaderBody.bodySize;
     }
 
-
-    public boolean isRedelivered()
-    {
-        return _redelivered;
-    }
-
     NoConsumersException getNoConsumersException(String queue)
     {
         return new NoConsumersException(queue, this);
@@ -420,7 +423,6 @@
     public void setRedelivered(boolean redelivered)
     {
         _transferBody.redelivered = redelivered;
-        _redelivered = redelivered;
     }
 
     public long getMessageId()
@@ -636,6 +638,16 @@
     public long getArrivalTime()
     {
         throw new Error("XXX");
+    }
+
+    public void setRequestId(long requestId) 
+    {
+        _requestId = requestId;
+    }
+
+    public long getRequestId() 
+    {
+        return _requestId;
     }
 
 }

Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?view=diff&rev=509616&r1=509615&r2=509616
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java Tue Feb 20 07:52:04 2007
@@ -371,25 +371,6 @@
             setExclusive(true);
         }
 
-        if(incrementSubscriberCount() > 1)
-        {
-            if(isExclusive())
-            {
-                decrementSubscriberCount();
-                throw EXISTING_EXCLUSIVE;
-            }
-            else if(exclusive)
-            {
-                decrementSubscriberCount();
-                throw EXISTING_SUBSCRIPTION;
-            }
-
-        }
-        else if(exclusive)
-        {
-            setExclusive(true);
-        }
-
         debug("Registering protocol session {0} with channel {1} and consumer tag {2} with {3}", ps, channel, consumerTag, this);
 
         Subscription subscription = _subscriptionFactory.createSubscription(channel, ps, consumerTag, acks, filters, noLocal);

Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java?view=diff&rev=509616&r1=509615&r2=509616
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java Tue Feb 20 07:52:04 2007
@@ -255,7 +255,6 @@
         while (msg != null)
         {
             msg.dequeue(storeContext, _queue);
-            count++;
             _totalMessageSize.set(0L);
             count++;
             msg = poll();