You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2007/01/16 05:44:50 UTC

svn commit: r496586 [2/2] - in /incubator/qpid/branches/qpid.0-9/java: broker/etc/ broker/src/main/java/org/apache/qpid/server/ broker/src/main/java/org/apache/qpid/server/exchange/ broker/src/main/java/org/apache/qpid/server/filter/ broker/src/main/ja...

Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java?view=diff&rev=496586&r1=496585&r2=496586
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java Mon Jan 15 20:44:48 2007
@@ -24,7 +24,6 @@
 import org.apache.qpid.AMQException;
 import org.apache.qpid.util.ConcurrentLinkedQueueAtomicSize;
 import org.apache.qpid.configuration.Configured;
-import org.apache.qpid.framing.ContentBody;
 import org.apache.qpid.server.configuration.Configurator;
 
 import java.util.ArrayList;
@@ -97,7 +96,8 @@
     private boolean addMessageToQueue(AMQMessage msg)
     {
         // Shrink the ContentBodies to their actual size to save memory.
-        if (compressBufferOnQueue)
+        if (true) throw new Error("XXX");
+        /*if (compressBufferOnQueue)
         {
             Iterator it = msg.getContentBodies().iterator();
             while (it.hasNext())
@@ -105,7 +105,7 @@
                 ContentBody cb = (ContentBody) it.next();
                 cb.reduceBufferToFit();
             }
-        }
+            }*/
 
         _messages.offer(msg);
 

Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/NoConsumersException.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/NoConsumersException.java?view=diff&rev=496586&r1=496585&r2=496586
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/NoConsumersException.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/NoConsumersException.java Mon Jan 15 20:44:48 2007
@@ -21,9 +21,6 @@
 package org.apache.qpid.server.queue;
 
 import org.apache.qpid.server.RequiredDeliveryException;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.ContentBody;
-import org.apache.qpid.framing.BasicPublishBody;
 import org.apache.qpid.protocol.AMQConstant;
 
 import java.util.List;
@@ -35,19 +32,14 @@
  */
 public class NoConsumersException extends RequiredDeliveryException
 {
-    public NoConsumersException(String queue,
-                                BasicPublishBody publishBody,
-                                ContentHeaderBody contentHeaderBody,
-                                List<ContentBody> contentBodies)
+    public NoConsumersException(String queue, AMQMessage message)
     {
-        super("Immediate delivery to " + queue + " is not possible.", publishBody, contentHeaderBody, contentBodies);
+        super("Immediate delivery to " + queue + " is not possible.", message);
     }
 
-    public NoConsumersException(BasicPublishBody publishBody,
-                                ContentHeaderBody contentHeaderBody,
-                                List<ContentBody> contentBodies)
+    public NoConsumersException(AMQMessage message)
     {
-        super("Immediate delivery is not possible.", publishBody, contentHeaderBody, contentBodies);
+        super("Immediate delivery is not possible.", message);
     }
 
     public int getReplyCode()

Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java?view=diff&rev=496586&r1=496585&r2=496586
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java Mon Jan 15 20:44:48 2007
@@ -28,9 +28,8 @@
 import org.apache.qpid.util.ConcurrentLinkedQueueAtomicSize;
 import org.apache.qpid.framing.AMQDataBlock;
 import org.apache.qpid.framing.AMQFrame;
-import org.apache.qpid.framing.BasicDeliverBody;
 import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.framing.BasicCancelOkBody;
+import org.apache.qpid.framing.MessageOkBody;
 import org.apache.qpid.server.AMQChannel;
 import org.apache.qpid.server.filter.FilterManager;
 import org.apache.qpid.server.filter.FilterManagerFactory;
@@ -235,7 +234,9 @@
             {
                 channel.addUnacknowledgedBrowsedMessage(msg, deliveryTag, consumerTag, queue);
             }
-            ByteBuffer deliver = createEncodedDeliverFrame(deliveryTag, msg.getRoutingKey(), msg.getExchangeName());
+            ByteBuffer deliver = null;
+            if (true) throw new Error("XXX");
+            //createEncodedDeliverFrame(deliveryTag, msg.getRoutingKey(), msg.getExchangeName());
             AMQDataBlock frame = msg.getDataBlock(deliver, channel.getChannelId());
 
             protocolSession.writeFrame(frame);
@@ -268,7 +269,9 @@
                     channel.addUnacknowledgedMessage(msg, deliveryTag, consumerTag, queue);
                 }
 
-                ByteBuffer deliver = createEncodedDeliverFrame(deliveryTag, msg.getRoutingKey(), msg.getExchangeName());
+                ByteBuffer deliver = null;
+                if (true) throw new Error("XXX");
+                //createEncodedDeliverFrame(deliveryTag, msg.getRoutingKey(), msg.getExchangeName());
                 AMQDataBlock frame = msg.getDataBlock(deliver, channel.getChannelId());
 
                 protocolSession.writeFrame(frame);
@@ -382,10 +385,11 @@
             // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
             // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
             // Be aware of possible changes to parameter order as versions change.
-            protocolSession.writeFrame(BasicCancelOkBody.createAMQFrame(channel.getChannelId(),
-        		(byte)0, (byte)9,	// AMQP version (major, minor)
+            if (true) throw new Error("XXX");
+            /*protocolSession.writeFrame(BasicCancelOkBody.createAMQFrame(channel.getChannelId(),
+        		(byte)8, (byte)0,	// AMQP version (major, minor)
             	consumerTag	// consumerTag
-                ));
+                ));*/
             _closed = true;
         }
     }
@@ -396,12 +400,12 @@
     }
 
 
-    private ByteBuffer createEncodedDeliverFrame(long deliveryTag, String routingKey, String exchange)
+    /*    private ByteBuffer createEncodedDeliverFrame(long deliveryTag, String routingKey, String exchange)
     {
         // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
         // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
         // Be aware of possible changes to parameter order as versions change.
-        AMQFrame deliverFrame = BasicDeliverBody.createAMQFrame(channel.getChannelId(),
+        AMQFrame deliverFrame = MessageTransferBody.createAMQFrame(channel.getChannelId(),
         	(byte)0, (byte)9,	// AMQP version (major, minor)
             consumerTag,	// consumerTag
         	deliveryTag,	// deliveryTag
@@ -413,5 +417,5 @@
         deliverFrame.writePayload(buf);
         buf.flip();
         return buf;
-    }
+        }*/
 }

Modified: incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/AMQRequestBody.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/AMQRequestBody.java?view=diff&rev=496586&r1=496585&r2=496586
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/AMQRequestBody.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/AMQRequestBody.java Mon Jan 15 20:44:48 2007
@@ -24,6 +24,8 @@
 
 public class AMQRequestBody extends AMQBody
 {
+    public static final byte TYPE = 9;
+
     // Fields declared in specification
     protected long requestId;
     protected long responseMark;
@@ -54,14 +56,14 @@
     
     protected int getSize()
     {
-        return 8 + 8 + 4 + methodPayload.getBodySize();
+        return 8 + 8 + 4 + methodPayload.getSize();
     }
     
     protected void writePayload(ByteBuffer buffer)
     {
         EncodingUtils.writeLong(buffer, requestId);
         EncodingUtils.writeLong(buffer, responseMark);
-        EncodingUtils.writeUnsignedShort(buffer, 0); // reserved, set to 0
+        EncodingUtils.writeInteger(buffer, 0); // reserved, set to 0
         methodPayload.writePayload(buffer);
     }
     
@@ -70,7 +72,7 @@
     {
         requestId = EncodingUtils.readLong(buffer);
         responseMark = EncodingUtils.readLong(buffer);
-        int reserved = EncodingUtils.readShort(buffer); // reserved, throw away
+        int reserved = EncodingUtils.readInteger(buffer); // reserved, throw away
         methodPayload.populateFromBuffer(buffer, size - 8 - 8 - 4);
     }
     

Modified: incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/AMQResponseBody.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/AMQResponseBody.java?view=diff&rev=496586&r1=496585&r2=496586
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/AMQResponseBody.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/AMQResponseBody.java Mon Jan 15 20:44:48 2007
@@ -24,6 +24,8 @@
 
 public class AMQResponseBody extends AMQBody
 {
+    public static final byte TYPE = 10;
+
     // Fields declared in specification
     protected long responseId;
     protected long requestId;
@@ -54,14 +56,15 @@
     
     protected int getSize()
     {
-        return 8 + 8 + 4 + methodPayload.getBodySize();
+        return 8 + 8 + 4 + methodPayload.getSize();
     }
         
     protected void writePayload(ByteBuffer buffer)
     {
         EncodingUtils.writeLong(buffer, responseId);
         EncodingUtils.writeLong(buffer, requestId);
-        EncodingUtils.writeUnsignedShort(buffer, batchOffset);
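+        // XXX: batchOffset is now written with writeInteger (previously writeUnsignedShort); confirm against the 4 counted in getSize()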
+        EncodingUtils.writeInteger(buffer, batchOffset);
         methodPayload.writePayload(buffer);
     }
     
@@ -70,7 +73,8 @@
     {
         responseId = EncodingUtils.readLong(buffer);
         requestId = EncodingUtils.readLong(buffer);
-        batchOffset = EncodingUtils.readShort(buffer);
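+        // XXX: batchOffset is now read with readInteger (previously readShort), mirroring the writeInteger call in writePayload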
+        batchOffset = EncodingUtils.readInteger(buffer);
         methodPayload.populateFromBuffer(buffer, size - 8 - 8 - 4);
     }
     

Modified: incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/protocol/AMQProtocolWriter.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/protocol/AMQProtocolWriter.java?view=diff&rev=496586&r1=496585&r2=496586
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/protocol/AMQProtocolWriter.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/protocol/AMQProtocolWriter.java Mon Jan 15 20:44:48 2007
@@ -20,9 +20,10 @@
  */
 package org.apache.qpid.protocol;
 
+import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQDataBlock;
 import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.framing.RequestResponseMappingException;
+import org.apache.qpid.protocol.AMQMethodEvent;
 import org.apache.qpid.protocol.AMQMethodListener;
 
 public interface AMQProtocolWriter
@@ -34,8 +35,12 @@
 	public void writeFrame(AMQDataBlock frame);
     
     public long writeRequest(int channelNum, AMQMethodBody methodBody,
-        AMQMethodListener methodListener) throws RequestResponseMappingException;
+                             AMQMethodListener methodListener)
+        throws AMQException;
 
     public void writeResponse(int channelNum, long requestId, AMQMethodBody methodBody)
-        throws RequestResponseMappingException;
+        throws AMQException;
+
+    public void writeResponse(AMQMethodEvent evt, AMQMethodBody response)
+        throws AMQException;
 }