You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text copy.
Posted to commits@qpid.apache.org by rh...@apache.org on 2007/01/16 05:44:50 UTC
svn commit: r496586 [2/2] - in /incubator/qpid/branches/qpid.0-9/java:
broker/etc/ broker/src/main/java/org/apache/qpid/server/
broker/src/main/java/org/apache/qpid/server/exchange/
broker/src/main/java/org/apache/qpid/server/filter/ broker/src/main/ja...
Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java?view=diff&rev=496586&r1=496585&r2=496586
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java Mon Jan 15 20:44:48 2007
@@ -24,7 +24,6 @@
import org.apache.qpid.AMQException;
import org.apache.qpid.util.ConcurrentLinkedQueueAtomicSize;
import org.apache.qpid.configuration.Configured;
-import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.server.configuration.Configurator;
import java.util.ArrayList;
@@ -97,7 +96,8 @@
private boolean addMessageToQueue(AMQMessage msg)
{
// Shrink the ContentBodies to their actual size to save memory.
- if (compressBufferOnQueue)
+ if (true) throw new Error("XXX");
+ /*if (compressBufferOnQueue)
{
Iterator it = msg.getContentBodies().iterator();
while (it.hasNext())
@@ -105,7 +105,7 @@
ContentBody cb = (ContentBody) it.next();
cb.reduceBufferToFit();
}
- }
+ }*/
_messages.offer(msg);
Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/NoConsumersException.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/NoConsumersException.java?view=diff&rev=496586&r1=496585&r2=496586
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/NoConsumersException.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/NoConsumersException.java Mon Jan 15 20:44:48 2007
@@ -21,9 +21,6 @@
package org.apache.qpid.server.queue;
import org.apache.qpid.server.RequiredDeliveryException;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.ContentBody;
-import org.apache.qpid.framing.BasicPublishBody;
import org.apache.qpid.protocol.AMQConstant;
import java.util.List;
@@ -35,19 +32,14 @@
*/
public class NoConsumersException extends RequiredDeliveryException
{
- public NoConsumersException(String queue,
- BasicPublishBody publishBody,
- ContentHeaderBody contentHeaderBody,
- List<ContentBody> contentBodies)
+ public NoConsumersException(String queue, AMQMessage message)
{
- super("Immediate delivery to " + queue + " is not possible.", publishBody, contentHeaderBody, contentBodies);
+ super("Immediate delivery to " + queue + " is not possible.", message);
}
- public NoConsumersException(BasicPublishBody publishBody,
- ContentHeaderBody contentHeaderBody,
- List<ContentBody> contentBodies)
+ public NoConsumersException(AMQMessage message)
{
- super("Immediate delivery is not possible.", publishBody, contentHeaderBody, contentBodies);
+ super("Immediate delivery is not possible.", message);
}
public int getReplyCode()
Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java?view=diff&rev=496586&r1=496585&r2=496586
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java Mon Jan 15 20:44:48 2007
@@ -28,9 +28,8 @@
import org.apache.qpid.util.ConcurrentLinkedQueueAtomicSize;
import org.apache.qpid.framing.AMQDataBlock;
import org.apache.qpid.framing.AMQFrame;
-import org.apache.qpid.framing.BasicDeliverBody;
import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.framing.BasicCancelOkBody;
+import org.apache.qpid.framing.MessageOkBody;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.filter.FilterManagerFactory;
@@ -235,7 +234,9 @@
{
channel.addUnacknowledgedBrowsedMessage(msg, deliveryTag, consumerTag, queue);
}
- ByteBuffer deliver = createEncodedDeliverFrame(deliveryTag, msg.getRoutingKey(), msg.getExchangeName());
+ ByteBuffer deliver = null;
+ if (true) throw new Error("XXX");
+ //createEncodedDeliverFrame(deliveryTag, msg.getRoutingKey(), msg.getExchangeName());
AMQDataBlock frame = msg.getDataBlock(deliver, channel.getChannelId());
protocolSession.writeFrame(frame);
@@ -268,7 +269,9 @@
channel.addUnacknowledgedMessage(msg, deliveryTag, consumerTag, queue);
}
- ByteBuffer deliver = createEncodedDeliverFrame(deliveryTag, msg.getRoutingKey(), msg.getExchangeName());
+ ByteBuffer deliver = null;
+ if (true) throw new Error("XXX");
+ //createEncodedDeliverFrame(deliveryTag, msg.getRoutingKey(), msg.getExchangeName());
AMQDataBlock frame = msg.getDataBlock(deliver, channel.getChannelId());
protocolSession.writeFrame(frame);
@@ -382,10 +385,11 @@
// AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
- protocolSession.writeFrame(BasicCancelOkBody.createAMQFrame(channel.getChannelId(),
- (byte)0, (byte)9, // AMQP version (major, minor)
+ if (true) throw new Error("XXX");
+ /*protocolSession.writeFrame(BasicCancelOkBody.createAMQFrame(channel.getChannelId(),
+ (byte)8, (byte)0, // AMQP version (major, minor)
consumerTag // consumerTag
- ));
+ ));*/
_closed = true;
}
}
@@ -396,12 +400,12 @@
}
- private ByteBuffer createEncodedDeliverFrame(long deliveryTag, String routingKey, String exchange)
+ /* private ByteBuffer createEncodedDeliverFrame(long deliveryTag, String routingKey, String exchange)
{
// AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
// TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
// Be aware of possible changes to parameter order as versions change.
- AMQFrame deliverFrame = BasicDeliverBody.createAMQFrame(channel.getChannelId(),
+ AMQFrame deliverFrame = MessageTransferBody.createAMQFrame(channel.getChannelId(),
(byte)0, (byte)9, // AMQP version (major, minor)
consumerTag, // consumerTag
deliveryTag, // deliveryTag
@@ -413,5 +417,5 @@
deliverFrame.writePayload(buf);
buf.flip();
return buf;
- }
+ }*/
}
Modified: incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/AMQRequestBody.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/AMQRequestBody.java?view=diff&rev=496586&r1=496585&r2=496586
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/AMQRequestBody.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/AMQRequestBody.java Mon Jan 15 20:44:48 2007
@@ -24,6 +24,8 @@
public class AMQRequestBody extends AMQBody
{
+ public static final byte TYPE = 9;
+
// Fields declared in specification
protected long requestId;
protected long responseMark;
@@ -54,14 +56,14 @@
protected int getSize()
{
- return 8 + 8 + 4 + methodPayload.getBodySize();
+ return 8 + 8 + 4 + methodPayload.getSize();
}
protected void writePayload(ByteBuffer buffer)
{
EncodingUtils.writeLong(buffer, requestId);
EncodingUtils.writeLong(buffer, responseMark);
- EncodingUtils.writeUnsignedShort(buffer, 0); // reserved, set to 0
+ EncodingUtils.writeInteger(buffer, 0); // reserved, set to 0
methodPayload.writePayload(buffer);
}
@@ -70,7 +72,7 @@
{
requestId = EncodingUtils.readLong(buffer);
responseMark = EncodingUtils.readLong(buffer);
- int reserved = EncodingUtils.readShort(buffer); // reserved, throw away
+ int reserved = EncodingUtils.readInteger(buffer); // reserved, throw away
methodPayload.populateFromBuffer(buffer, size - 8 - 8 - 4);
}
Modified: incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/AMQResponseBody.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/AMQResponseBody.java?view=diff&rev=496586&r1=496585&r2=496586
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/AMQResponseBody.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/framing/AMQResponseBody.java Mon Jan 15 20:44:48 2007
@@ -24,6 +24,8 @@
public class AMQResponseBody extends AMQBody
{
+ public static final byte TYPE = 10;
+
// Fields declared in specification
protected long responseId;
protected long requestId;
@@ -54,14 +56,15 @@
protected int getSize()
{
- return 8 + 8 + 4 + methodPayload.getBodySize();
+ return 8 + 8 + 4 + methodPayload.getSize();
}
protected void writePayload(ByteBuffer buffer)
{
EncodingUtils.writeLong(buffer, responseId);
EncodingUtils.writeLong(buffer, requestId);
- EncodingUtils.writeUnsignedShort(buffer, batchOffset);
+ // XXX
+ EncodingUtils.writeInteger(buffer, batchOffset);
methodPayload.writePayload(buffer);
}
@@ -70,7 +73,8 @@
{
responseId = EncodingUtils.readLong(buffer);
requestId = EncodingUtils.readLong(buffer);
- batchOffset = EncodingUtils.readShort(buffer);
+ // XXX
+ batchOffset = EncodingUtils.readInteger(buffer);
methodPayload.populateFromBuffer(buffer, size - 8 - 8 - 4);
}
Modified: incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/protocol/AMQProtocolWriter.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/protocol/AMQProtocolWriter.java?view=diff&rev=496586&r1=496585&r2=496586
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/protocol/AMQProtocolWriter.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/common/src/main/java/org/apache/qpid/protocol/AMQProtocolWriter.java Mon Jan 15 20:44:48 2007
@@ -20,9 +20,10 @@
*/
package org.apache.qpid.protocol;
+import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQDataBlock;
import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.framing.RequestResponseMappingException;
+import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.protocol.AMQMethodListener;
public interface AMQProtocolWriter
@@ -34,8 +35,12 @@
public void writeFrame(AMQDataBlock frame);
public long writeRequest(int channelNum, AMQMethodBody methodBody,
- AMQMethodListener methodListener) throws RequestResponseMappingException;
+ AMQMethodListener methodListener)
+ throws AMQException;
public void writeResponse(int channelNum, long requestId, AMQMethodBody methodBody)
- throws RequestResponseMappingException;
+ throws AMQException;
+
+ public void writeResponse(AMQMethodEvent evt, AMQMethodBody response)
+ throws AMQException;
}