You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2007/01/23 21:31:11 UTC

svn commit: r499121 - in /incubator/qpid/branches/qpid.0-9/java: broker/src/main/java/org/apache/qpid/server/ broker/src/main/java/org/apache/qpid/server/handler/ broker/src/main/java/org/apache/qpid/server/protocol/ broker/src/main/java/org/apache/qpi...

Author: rhs
Date: Tue Jan 23 12:31:10 2007
New Revision: 499121

URL: http://svn.apache.org/viewvc?view=rev&rev=499121
Log:
removed XXX from resend, centralized message deliver, cleaned up exception handling, added per channel max frame size

Modified:
    incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
    incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionTuneOkMethodHandler.java
    incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
    incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
    incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
    incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
    incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
    incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolSession.java
    incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/Content.java
    incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/protocol/AMQProtocolWriter.java

Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?view=diff&rev=499121&r1=499120&r2=499121
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java Tue Jan 23 12:31:10 2007
@@ -20,11 +20,15 @@
  */
 package org.apache.qpid.server;
 
-import org.apache.qpid.framing.Content;
+import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.MessageOkBody;
 import org.apache.log4j.Logger;
+import org.apache.mina.common.ByteBuffer;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQDataBlock;
 import org.apache.qpid.framing.Content;
+import org.apache.qpid.framing.Content;
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.framing.MessageAppendBody;
 import org.apache.qpid.framing.MessageCloseBody;
@@ -47,6 +51,7 @@
 import org.apache.qpid.server.txn.TxnBuffer;
 import org.apache.qpid.server.txn.TxnOp;
 
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.LinkedList;
@@ -74,6 +79,7 @@
 
     private RequestManager _requestManager;
     private ResponseManager _responseManager;
+    private AMQProtocolSession _session;
 
     /**
      * The delivery tag is unique per channel. This is pre-incremented before putting into the deliver frame so that
@@ -94,16 +100,9 @@
     private int _consumerTag;
 
     /**
-     * The set of current messages - which may be partial in the sense that not all frames have been received yet -
-     * which has been received by this channel. As the frames are received the references get updated and once all
-     * frames have been received the message can then be routed.
-     */
-    private Map<String, List<AMQMessage>> _messages = new LinkedHashMap();
-
-    /**
      * The set of open references on this channel.
      */
-    private Map<String, List<MessageAppendBody>> _references = new LinkedHashMap();
+    private Map<String, Reference> _references = new LinkedHashMap();
 
     /**
      * Maps from consumer tag to queue instance. Allows us to unsubscribe from a queue.
@@ -129,16 +128,17 @@
     private final List<AMQDataBlock> _returns = new LinkedList<AMQDataBlock>();
     private Set<Long> _browsedAcks = new HashSet<Long>();
 
-    public AMQChannel(int channelId, MessageStore messageStore, MessageRouter exchanges, AMQProtocolWriter protocolWriter, AMQMethodListener methodListener)
-            throws AMQException
+    // XXX: clean up arguments
+    public AMQChannel(int channelId, AMQProtocolSession session, MessageStore messageStore, MessageRouter exchanges, AMQMethodListener methodListener)
     {
         _channelId = channelId;
+        _session = session;
         _prefetch_HighWaterMark = DEFAULT_PREFETCH;
         _prefetch_LowWaterMark = _prefetch_HighWaterMark / 2;
         _messageStore = messageStore;
         _exchanges = exchanges;
-        _requestManager = new RequestManager(channelId, protocolWriter, true);
-    	_responseManager = new ResponseManager(channelId, methodListener, protocolWriter, true);
+        _requestManager = new RequestManager(channelId, _session, true);
+        _responseManager = new ResponseManager(channelId, methodListener, _session, true);
         _txnBuffer = new TxnBuffer(_messageStore);
     }
 
@@ -189,51 +189,54 @@
 
     public void addMessageTransfer(MessageTransferBody transferBody, AMQProtocolSession publisher) throws AMQException
     {
-        AMQMessage message = new AMQMessage(_messageStore, transferBody);
-        message.setPublisher(publisher);
         Content body = transferBody.getBody();
+        AMQMessage message;
         switch (body.getContentType()) {
         case INLINE_T:
+            message = new AMQMessage(_messageStore, transferBody,
+                                                Collections.singletonList(body.getContent()));
+            message.setPublisher(publisher);
             route(message);
             break;
         case REF_T:
-            getMessages(body.getContentAsByteArray()).add(message);
+            Reference ref = getReference(body.getContentAsByteArray());
+            message = new AMQMessage(_messageStore, transferBody, ref.contents);
+            message.setPublisher(publisher);
+            ref.messages.add(message);
             break;
         }
     }
 
-    private List<AMQMessage> getMessages(byte[] reference) {
-        String key = new String(reference);
-        List<AMQMessage> result = _messages.get(key);
-        if (result == null) {
-            throw new IllegalArgumentException(key);
-        }
-        return result;
+    private static String key(byte[] id) {
+        return new String(id);
     }
 
-    private List<MessageAppendBody> getReference(byte[] reference) {
-        String key = new String(reference);
-        List<MessageAppendBody> result = _references.get(key);
-        if (result == null) {
+    private Reference getReference(byte[] id) {
+        String key = key(id);
+        Reference ref = _references.get(key);
+        if (ref == null) {
             throw new IllegalArgumentException(key);
         }
-        return result;
+        return ref;
     }
 
-    private void createReference(byte[] reference) {
-        String key = new String(reference);
+    private Reference createReference(byte[] id) {
+        String key = key(id);
         if (_references.containsKey(key)) {
             throw new IllegalArgumentException(key);
-        } else {
-            _references.put(key, new LinkedList());
-            _messages.put(key, new LinkedList());
         }
+        Reference ref = new Reference();
+        _references.put(key, ref);
+        return ref;
     }
 
-    private void clearReference(byte[] reference) {
-        String key = new String(reference);
-        _references.remove(key);
-        _messages.remove(key);
+    private Reference removeReference(byte[] id) {
+        String key = key(id);
+        Reference ref = _references.remove(key);
+        if (ref == null) {
+            throw new IllegalArgumentException(key);
+        }
+        return ref;
     }
 
     public void addMessageOpen(MessageOpenBody open) {
@@ -241,20 +244,50 @@
     }
 
     public void addMessageAppend(MessageAppendBody append) {
-        getReference(append.reference).add(append);
+        Reference ref = getReference(append.reference);
+        ref.contents.add(ByteBuffer.wrap(append.bytes));
     }
 
     public void addMessageClose(MessageCloseBody close) throws AMQException {
-        List<AMQMessage> messages = getMessages(close.reference);
-        try {
-            for (AMQMessage msg : messages) {
-                route(msg);
-            }
-        } finally {
-            clearReference(close.reference);
+        Reference ref = removeReference(close.reference);
+        for (AMQMessage msg : ref.messages) {
+            route(msg);
         }
     }
 
+    public void deliver(AMQMessage msg, String destination, final long deliveryTag) {
+        deliver(msg, destination, new AMQMethodListener() {
+            public boolean methodReceived(AMQMethodEvent evt) throws AMQException {
+                AMQMethodBody method = evt.getMethod();
+                if (_log.isDebugEnabled()) {
+                    _log.debug(method + " received on channel " + _channelId);
+                }
+                // XXX: multiple?
+                if (method instanceof MessageOkBody) {
+                    acknowledgeMessage(deliveryTag, false);
+                    return true;
+                } else {
+                    // TODO: implement reject
+                    return false;
+                }
+            }
+            public void error(Exception e) {}
+        });
+    }
+
+    public void deliver(AMQMessage msg, String destination, AMQMethodListener listener) {
+        // XXX: should reframe if necessary to conform to max frame size
+        MessageTransferBody mtb = msg.getTransferBody().copy();
+        mtb.destination = destination;
+        ByteBuffer buf = ByteBuffer.allocate((int)msg.getBodySize());
+        for (ByteBuffer bb : msg.getContents()) {
+            buf.put(bb);
+        }
+        buf.flip();
+        mtb.body = new Content(Content.TypeEnum.INLINE_T, buf);
+        _session.writeRequest(_channelId, mtb, listener);
+    }
+
     protected void route(AMQMessage msg) throws AMQException
     {
         if (_transactional)
@@ -452,7 +485,7 @@
                 String consumerTag = entry.getValue().consumerTag;
                 AMQMessage msg = entry.getValue().message;
                 msg.setRedelivered(true);
-                session.writeFrame(msg.getDataBlock(_channelId, consumerTag, deliveryTag));
+                deliver(msg, consumerTag, deliveryTag);
             }
         }
     }
@@ -865,6 +898,13 @@
         public void rollback()
         {
         }
+    }
+
+    private static class Reference {
+
+        public List<AMQMessage> messages = new LinkedList();
+        public List<ByteBuffer> contents = new LinkedList();
+
     }
 
 }

Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionTuneOkMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionTuneOkMethodHandler.java?view=diff&rev=499121&r1=499120&r2=499121
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionTuneOkMethodHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionTuneOkMethodHandler.java Tue Jan 23 12:31:10 2007
@@ -52,5 +52,6 @@
         }
         protocolSession.getStateManager().changeState(AMQState.CONNECTION_NOT_OPENED);
         protocolSession.initHeartbeats(body.heartbeat);
+        protocolSession.setFrameMax(body.getFrameMax());
     }
 }

Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java?view=diff&rev=499121&r1=499120&r2=499121
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java Tue Jan 23 12:31:10 2007
@@ -108,6 +108,8 @@
     private boolean _closed;
     // maximum number of channels this session should have
     private long _maxNoOfChannels = 1000;
+    // XXX: is this spec or should this be set to the configurable default?
+    private long _maxFrameSize = 65536;
 
     /* AMQP Version for this session */
     private byte _major;
@@ -180,8 +182,8 @@
     private AMQChannel createChannel(int id) throws AMQException
     {
         IApplicationRegistry registry = ApplicationRegistry.getInstance();
-        AMQChannel channel = new AMQChannel(id, registry.getMessageStore(),
-                                            _exchangeRegistry, this, _stateManager);
+        AMQChannel channel = new AMQChannel(id, this, registry.getMessageStore(),
+                                            _exchangeRegistry, _stateManager);
         addChannel(channel);
         return channel;
     }
@@ -302,10 +304,8 @@
     }
 
     public long writeRequest(int channelNum, AMQMethodBody methodBody, AMQMethodListener methodListener)
-        throws AMQException
     {
-        if (!checkMethodBodyVersion(methodBody))
-            throw new AMQProtocolVersionException("MethodBody version did not match version of current session.");
+        checkMethodBodyVersion(methodBody);
         AMQChannel channel = getChannel(channelNum);
         RequestManager requestManager = channel.getRequestManager();
         return requestManager.sendRequest(methodBody, methodListener);
@@ -313,23 +313,23 @@
 
     // This version uses this session's instance of AMQStateManager as the listener
     public long writeRequest(int channelNum, AMQMethodBody methodBody)
-        throws AMQException
     {
         return writeRequest(channelNum, methodBody, _stateManager);
     }
 
     public void writeResponse(int channelNum, long requestId, AMQMethodBody methodBody)
-        throws AMQException
     {
-        if (!checkMethodBodyVersion(methodBody))
-            throw new AMQProtocolVersionException("MethodBody version did not match version of current session.");
+        checkMethodBodyVersion(methodBody);
         AMQChannel channel = getChannel(channelNum);
         ResponseManager responseManager = channel.getResponseManager();
-        responseManager.sendResponse(requestId, methodBody);
+        try {
+            responseManager.sendResponse(requestId, methodBody);
+        } catch (RequestResponseMappingException e) {
+            throw new RuntimeException(e);
+        }
     }
 
     public void writeResponse(AMQMethodEvent evt, AMQMethodBody response)
-        throws AMQException
     {
         writeResponse(evt.getChannelId(), evt.getRequestId(), response);
     }
@@ -361,16 +361,16 @@
         return new ArrayList<AMQChannel>(_channelMap.values());
     }
 
-    public AMQChannel getChannel(int channelId) throws AMQException
+    public AMQChannel getChannel(int channelId)
     {
         return _channelMap.get(channelId);
     }
 
-    public void addChannel(AMQChannel channel) throws AMQException
+    public void addChannel(AMQChannel channel)
     {
         if (_closed)
         {
-            throw new AMQException("Session is closed");    
+            throw new IllegalStateException("Session is closed");
         }
 
         _channelMap.put(channel.getChannelId(), channel);
@@ -538,6 +538,22 @@
     }
 
     /**
+     * Set the negotiated maximum frame size for this connection.
+     * @param size the size in bytes
+     */
+    public void setFrameMax(long size) {
+        _maxFrameSize = size;
+    }
+
+    /**
+     * Gets the negotiaed maximum frame size for this connection.
+     * @return the size in bytes
+     */
+    public long getFrameMax() {
+        return _maxFrameSize;
+    }
+
+    /**
      * Closes all channels that were opened by this protocol session. This frees up all resources
      * used by the channel.
      *
@@ -649,9 +665,10 @@
     {
         return _major == major && _minor == minor;
     }
-    
-    public boolean checkMethodBodyVersion(AMQMethodBody methodBody)
-    {
-        return versionEquals(methodBody.getMajor(), methodBody.getMinor());
+
+    public void checkMethodBodyVersion(AMQMethodBody methodBody) {
+        if (!versionEquals(methodBody.getMajor(), methodBody.getMinor())) {
+            throw new RuntimeException("MethodBody version did not match version of current session.");
+        }
     }
 }

Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java?view=diff&rev=499121&r1=499120&r2=499121
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java Tue Jan 23 12:31:10 2007
@@ -82,8 +82,7 @@
     
     void closeSessionResponse(long requestId) throws AMQException;
     
-    void closeSession() throws AMQException;
-    
+
     /**
      * Remove a channel from the session but do not close it.
      * @param channelId
@@ -97,6 +96,24 @@
     void initHeartbeats(int delay);
 
     /**
+     * Set the maximum frame size for this client.
+     * @param size the size in bytes
+     */
+    void setFrameMax(long size);
+
+    /**
+     * Get the maximum frame size for this client.
+     * @return the size in bytes
+     */
+    long getFrameMax();
+
+    /**
+     * This must be called when the session is _closed in order to free up any resources
+     * managed by the session.
+     */
+    void closeSession() throws AMQException;
+
+    /**
      * @return a key that uniquely identifies this session
      */
     Object getKey();
@@ -131,5 +148,5 @@
     byte getMajor();
     byte getMinor();
     boolean versionEquals(byte major, byte minor);
-    boolean checkMethodBodyVersion(AMQMethodBody methodBody);
+    void checkMethodBodyVersion(AMQMethodBody methodBody);
 }

Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java?view=diff&rev=499121&r1=499120&r2=499121
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java Tue Jan 23 12:31:10 2007
@@ -51,7 +51,7 @@
 
     private final MessageTransferBody _transferBody;
 
-    private List<MessageAppendBody> _contentBodies;
+    private List<ByteBuffer> _contents;
 
     private boolean _redelivered;
 
@@ -101,34 +101,34 @@
         _messageId = messageStore.getNewMessageId();
         _transferBody = transferBody;
         _store = messageStore;
-        _contentBodies = new LinkedList<MessageAppendBody>();
+        _contents = new LinkedList();
         _decodedMessages = new ConcurrentHashMap<String, MessageDecorator>();
         _storeWhenComplete = storeWhenComplete;
         _taken = new AtomicBoolean(false);
     }
 
     public AMQMessage(MessageStore store, long messageId, MessageTransferBody transferBody,
-                      List<MessageAppendBody> contentBodies)
+                      List<ByteBuffer> contents)
             throws AMQException
 
     {
         _transferBody = transferBody;
-        _contentBodies = contentBodies;
+        _contents = contents;
         _decodedMessages = new ConcurrentHashMap<String, MessageDecorator>();
         _messageId = messageId;
         _store = store;
         storeMessage();
     }
 
-    public AMQMessage(MessageStore store, MessageTransferBody transferBody, List<MessageAppendBody> contentBodies)
+    public AMQMessage(MessageStore store, MessageTransferBody transferBody, List<ByteBuffer> contents)
             throws AMQException
     {
-        this(store, store.getNewMessageId(), transferBody, contentBodies);
+        this(store, store.getNewMessageId(), transferBody, contents);
     }
 
     protected AMQMessage(AMQMessage msg) throws AMQException
     {
-        this(msg._store, msg._messageId, msg._transferBody, msg._contentBodies);
+        this(msg._store, msg._messageId, msg._transferBody, msg._contents);
     }
 
     public long getSize() {
@@ -147,21 +147,9 @@
     }
 
     public long getBodySize() {
-        Content body = _transferBody.getBody();
-        switch (body.getContentType()) {
-        case INLINE_T:
-            return _transferBody.getBody().getContent().limit();
-        case REF_T:
-            return getReferenceSize();
-        default:
-            throw new IllegalStateException("unrecognized type: " + body.getContentType());
-        }
-    }
-
-    public long getReferenceSize() {
         long size = 0;
-        for (MessageAppendBody mab : _contentBodies) {
-            size += mab.getBytes().length;
+        for (ByteBuffer buffer : _contents) {
+            size += buffer.limit();
         }
         return size;
     }
@@ -258,56 +246,17 @@
         }
     }
 
-    public CompositeAMQDataBlock getDataBlock(ByteBuffer encodedDeliverBody, int channel)
-    {
-        AMQFrame[] allFrames = new AMQFrame[1 + _contentBodies.size()];
-
-        if (true) throw new Error("XXX");
-        /*allFrames[0] = ContentHeaderBody.createAMQFrame(channel, _contentHeaderBody);
-        for (int i = 1; i < allFrames.length; i++)
-        {
-            allFrames[i] = ContentBody.createAMQFrame(channel, _contentBodies.get(i - 1));
-            }*/
-        return new CompositeAMQDataBlock(encodedDeliverBody, allFrames);
-    }
-
-    public CompositeAMQDataBlock getDataBlock(int channel, String consumerTag, long deliveryTag)
+    public MessageTransferBody getTransferBody()
     {
-        
-        AMQFrame[] allFrames = new AMQFrame[2 + _contentBodies.size()];
-
-        // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
-        // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
-        // Be aware of possible changes to parameter order as versions change.
-        if (true) throw new Error("XXX");
-        /*
-        allFrames[0] = MessageTransferBody.createAMQFrame(channel,
-        	(byte)0, (byte)9,	// AMQP version (major, minor)
-            consumerTag,	// consumerTag
-        	deliveryTag,	// deliveryTag
-            getExchangeName(),	// exchange
-            _redelivered,	// redelivered
-            getRoutingKey()	// routingKey
-            );
-            allFrames[1] = ContentHeaderBody.createAMQFrame(channel, _contentHeaderBody);
-        for (int i = 2; i < allFrames.length; i++)
-        {
-            allFrames[i] = ContentBody.createAMQFrame(channel, _contentBodies.get(i - 2));
-            }*/
-        return new CompositeAMQDataBlock(allFrames);
+        return _transferBody;
     }
 
-    public List<AMQBody> getPayload()
-    {
-        List<AMQBody> payload = new ArrayList<AMQBody>(2 + _contentBodies.size());
-        payload.add(_transferBody);
-        payload.addAll(_contentBodies);
-        return payload;
+    public List<ByteBuffer> getContents() {
+        return _contents;
     }
 
-    public MessageTransferBody getTransferBody()
-    {
-        return _transferBody;
+    public List<AMQBody> getPayload() {
+        throw new Error("XXX");
     }
 
     public boolean isAllContentReceived()

Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java?view=diff&rev=499121&r1=499120&r2=499121
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java Tue Jan 23 12:31:10 2007
@@ -208,13 +208,28 @@
     {
         if (msg != null)
         {
-            if (_isBrowser)
-            {
-                sendToBrowser(msg, queue);
-            }
-            else
-            {
-                sendToConsumer(msg, queue);
+            try {
+                if (!_isBrowser && !_acks) {
+                    queue.dequeue(msg);
+                }
+
+                synchronized(channel) {
+                    long deliveryTag = channel.getNextDeliveryTag();
+
+                    if (_acks) {
+                        if (_isBrowser) {
+                            channel.addUnacknowledgedBrowsedMessage(msg, deliveryTag, consumerTag, queue);
+                        } else {
+                            channel.addUnacknowledgedMessage(msg, deliveryTag, consumerTag, queue);
+                        }
+                    }
+
+                    channel.deliver(msg, consumerTag, deliveryTag);
+                }
+            } finally {
+                if (!_isBrowser) {
+                    msg.setDeliveredToConsumer();
+                }
             }
         }
         else
@@ -223,7 +238,8 @@
         }
     }
 
-    private void sendToBrowser(AMQMessage msg, AMQQueue queue) throws FailedDequeueException
+    // XXX
+    /*    private void sendToBrowser(AMQMessage msg, AMQQueue queue) throws FailedDequeueException
     {
         // We don't decrement the reference here as we don't want to consume the message
         // but we do want to send it to the client.
@@ -241,9 +257,7 @@
             ByteBuffer deliver = null;
             if (true) throw new Error("XXX");
             //createEncodedDeliverFrame(deliveryTag, msg.getRoutingKey(), msg.getExchangeName());
-            AMQDataBlock frame = msg.getDataBlock(deliver, channel.getChannelId());
-
-            protocolSession.writeFrame(frame);
+            channel.deliver(msg, consumerTag, null);
         }
     }
 
@@ -273,33 +287,25 @@
                     channel.addUnacknowledgedMessage(msg, deliveryTag, consumerTag, queue);
                 }
 
-                // XXX: references
-                MessageTransferBody mtb = msg.getTransferBody().copy();
-                mtb.destination = consumerTag;
-                try {
-                    protocolSession.writeRequest
-                        (channel.getChannelId(),
-                         mtb, new AMQMethodListener() {
-                             public boolean methodReceived(AMQMethodEvent evt) throws AMQException {
-                                 if (_logger.isDebugEnabled()) {
-                                     _logger.debug("Ack received on channel " + evt.getChannelId());
-                                 }
-                                 // XXX: multiple
-                                 channel.acknowledgeMessage(deliveryTag, false);
-                                 return true;
-                             }
-                             public void error(Exception e) {}
-                         });
-                } catch (AMQException e) {
-                    throw new RuntimeException(e);
-                }
+                channel.deliver(msg, consumerTag, new AMQMethodListener() {
+                    public boolean methodReceived(AMQMethodEvent evt) throws AMQException {
+                        if (_logger.isDebugEnabled()) {
+                            _logger.debug("Ack received on channel " + evt.getChannelId());
+                        }
+                        // XXX: reject?
+                        // XXX: multiple
+                        channel.acknowledgeMessage(deliveryTag, false);
+                        return true;
+                    }
+                    public void error(Exception e) {}
+                });
             }
         }
         finally
         {
             msg.setDeliveredToConsumer();
         }
-    }
+        }*/
 
     public boolean isSuspended()
     {

Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java?view=diff&rev=499121&r1=499120&r2=499121
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java Tue Jan 23 12:31:10 2007
@@ -46,6 +46,7 @@
 import org.apache.qpid.framing.ProtocolInitiation;
 import org.apache.qpid.framing.ProtocolVersionList;
 import org.apache.qpid.framing.RequestManager;
+import org.apache.qpid.framing.RequestResponseMappingException;
 import org.apache.qpid.framing.ResponseManager;
 import org.apache.qpid.protocol.AMQMethodEvent;
 import org.apache.qpid.protocol.AMQMethodListener;
@@ -314,25 +315,26 @@
     
     public long writeRequest(int channelNum, AMQMethodBody methodBody,
                              AMQMethodListener methodListener)
-        throws AMQException
     {
         RequestManager requestManager = (RequestManager)_channelId2RequestMgrMap.get(channelNum);
         if (requestManager == null)
-            throw new AMQException("Unable to find RequestManager for channel " + channelNum);
+            throw new IllegalArgumentException("Unable to find RequestManager for channel " + channelNum);
         return requestManager.sendRequest(methodBody, methodListener);
     }
 
     public void writeResponse(int channelNum, long requestId, AMQMethodBody methodBody)
-        throws AMQException
     {
         ResponseManager responseManager = (ResponseManager)_channelId2ResponseMgrMap.get(channelNum);
         if (responseManager == null)
-            throw new AMQException("Unable to find ResponseManager for channel " + channelNum);
-        responseManager.sendResponse(requestId, methodBody);
+            throw new IllegalArgumentException("Unable to find ResponseManager for channel " + channelNum);
+        try {
+            responseManager.sendResponse(requestId, methodBody);
+        } catch (RequestResponseMappingException e) {
+            throw new RuntimeException(e);
+        }
     }
 
     public void writeResponse(AMQMethodEvent evt, AMQMethodBody response)
-        throws AMQException
     {
         writeResponse(evt.getChannelId(), evt.getRequestId(), response);
     }

Modified: incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolSession.java?view=diff&rev=499121&r1=499120&r2=499121
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolSession.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/cluster/src/main/java/org/apache/qpid/server/cluster/ClusteredProtocolSession.java Tue Jan 23 12:31:10 2007
@@ -61,7 +61,6 @@
     }
 
     public AMQChannel getChannel(int channelId)
-        throws AMQException
     {
         AMQChannel channel = super.getChannel(channelId);
         if (isPeerSession() && channel == null)
@@ -102,21 +101,19 @@
      */
     private class OneUseChannel extends AMQChannel
     {
-        public OneUseChannel(int channelId, AMQProtocolWriter protocolWriter,
+        public OneUseChannel(int channelId, AMQProtocolSession session,
             AMQMethodListener methodListener)
-            throws AMQException
         {
-            this(channelId, ApplicationRegistry.getInstance(), protocolWriter, methodListener);
+            this(channelId, session, ApplicationRegistry.getInstance(), methodListener);
         }
 
-        public OneUseChannel(int channelId, IApplicationRegistry registry,
-            AMQProtocolWriter protocolWriter, AMQMethodListener methodListener)
-            throws AMQException
+        public OneUseChannel(int channelId, AMQProtocolSession session, IApplicationRegistry registry,
+                             AMQMethodListener methodListener)
         {
             super(channelId,
+                  session,
                   registry.getMessageStore(),
                   registry.getExchangeRegistry(),
-                  protocolWriter,
                   methodListener);
         }
 

Modified: incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/Content.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/Content.java?view=diff&rev=499121&r1=499120&r2=499121
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/Content.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/Content.java Tue Jan 23 12:31:10 2007
@@ -65,6 +65,7 @@
     	this.contentType = contentType;
         this.content = ByteBuffer.allocate(content.length);
         this.content.put(content);
+        this.content.flip();
     }
     
     public Content(TypeEnum contentType, String contentStr)
@@ -99,7 +100,7 @@
     
     public byte[] getContentAsByteArray()
     {
-        ByteBuffer dup = content.duplicate().rewind();
+        ByteBuffer dup = content.duplicate();
         byte[] ba = new byte[dup.remaining()];
         dup.get(ba);
         return ba;
@@ -123,9 +124,11 @@
     
     public void writePayload(ByteBuffer buffer)
     {
+        System.out.println("Before: " + content);
     	EncodingUtils.writeUnsignedByte(buffer, contentType.toByte());
     	EncodingUtils.writeUnsignedInteger(buffer, content.remaining());
         buffer.put(content);
+        System.out.println("After: " + content);
     }
     
     public void populateFromBuffer(ByteBuffer buffer) throws AMQFrameDecodingException
@@ -139,6 +142,6 @@
     
     public synchronized String toString()
     {
-        return getContent().rewind().toString();
+        return getContent().toString();
     }
 }

Modified: incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/protocol/AMQProtocolWriter.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/protocol/AMQProtocolWriter.java?view=diff&rev=499121&r1=499120&r2=499121
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/protocol/AMQProtocolWriter.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/protocol/AMQProtocolWriter.java Tue Jan 23 12:31:10 2007
@@ -35,12 +35,9 @@
     public void writeFrame(AMQDataBlock frame);
 
     public long writeRequest(int channelNum, AMQMethodBody methodBody,
-                             AMQMethodListener methodListener)
-        throws AMQException;
+                             AMQMethodListener methodListener);
 
-    public void writeResponse(int channelNum, long requestId, AMQMethodBody methodBody)
-        throws AMQException;
+    public void writeResponse(int channelNum, long requestId, AMQMethodBody methodBody);
 
-    public void writeResponse(AMQMethodEvent evt, AMQMethodBody response)
-        throws AMQException;
+    public void writeResponse(AMQMethodEvent evt, AMQMethodBody response);
 }