You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2008/08/08 20:31:19 UTC
svn commit: r684036 [2/2] - in /incubator/qpid/trunk/qpid/java:
client/src/main/java/org/apache/qpid/client/
client/src/main/java/org/apache/qpid/client/handler/
client/src/main/java/org/apache/qpid/client/message/
client/src/main/java/org/apache/qpid/...
Propchange: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/CloseConsumerMessage.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java?rev=684036&r1=684035&r2=684036&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java Fri Aug 8 11:31:18 2008
@@ -66,17 +66,6 @@
}
- JMSTextMessage(AMQMessageDelegateFactory delegateFactory, ByteBuffer data) throws JMSException
- {
- this(delegateFactory, data, null);
- }
-
- JMSTextMessage(AMQMessageDelegateFactory delegateFactory, String text) throws JMSException
- {
- super(delegateFactory, (ByteBuffer) null);
- setText(text);
- }
-
public void clearBodyImpl() throws JMSException
{
if (_data != null)
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java?rev=684036&r1=684035&r2=684036&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java Fri Aug 8 11:31:18 2008
@@ -40,7 +40,7 @@
AbstractJMSMessage createMessage(long deliveryTag, boolean redelivered,
Struct[] contentHeader,
- List bodies)
+ java.nio.ByteBuffer body)
throws JMSException, AMQException;
AbstractJMSMessage createMessage(AMQMessageDelegateFactory delegateFactory) throws JMSException;
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java?rev=684036&r1=684035&r2=684036&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java Fri Aug 8 11:31:18 2008
@@ -23,6 +23,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.nio.ByteBuffer;
import javax.jms.JMSException;
@@ -32,6 +33,8 @@
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.transport.Struct;
import org.apache.qpid.transport.MessageProperties;
+import org.apache.qpid.transport.MessageTransfer;
+import org.apache.qpid.transport.DeliveryProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -118,24 +121,30 @@
}
}
- public AbstractJMSMessage createMessage(long deliveryTag, boolean redelivered,
- Struct[] contentHeader, List bodies) throws AMQException, JMSException
+ public AbstractJMSMessage createMessage(MessageTransfer transfer) throws AMQException, JMSException
{
- MessageProperties mprop = (MessageProperties) contentHeader[0];
+
+ MessageProperties mprop = transfer.getHeader().get(MessageProperties.class);
String messageType = mprop.getContentType();
if (messageType == null)
{
_logger.debug("no message type specified, building a byte message");
messageType = JMSBytesMessage.MIME_TYPE;
}
- MessageFactory mf = _mimeShortStringToFactoryMap.get(new AMQShortString(messageType));
+ MessageFactory mf = _mimeStringToFactoryMap.get(messageType);
if (mf == null)
{
throw new AMQException(null, "Unsupport MIME type of " + messageType, null);
}
else
{
- return mf.createMessage(deliveryTag, redelivered, contentHeader, bodies);
+ boolean redelivered = false;
+ DeliveryProperties deliverProps;
+ if((deliverProps = transfer.getHeader().get(DeliveryProperties.class)) != null)
+ {
+ redelivered = deliverProps.getRedelivered();
+ }
+ return mf.createMessage(transfer.getId(), redelivered, transfer.getHeader().getStructs(), transfer.getBody());
}
}
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/ReturnMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/ReturnMessage.java?rev=684036&r1=684035&r2=684036&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/ReturnMessage.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/ReturnMessage.java Fri Aug 8 11:31:18 2008
@@ -7,9 +7,9 @@
final private AMQShortString _replyText;
final private int _replyCode;
- public ReturnMessage(int channelId,AMQShortString exchange,AMQShortString routingKey,AMQShortString replyText,int replyCode)
+ public ReturnMessage(AMQShortString exchange, AMQShortString routingKey, AMQShortString replyText, int replyCode)
{
- super(channelId,-1,null,exchange,routingKey,false);
+ super(-1,0,exchange,routingKey,false);
_replyText = replyText;
_replyCode = replyCode;
}
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java?rev=684036&r1=684035&r2=684036&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java Fri Aug 8 11:31:18 2008
@@ -20,23 +20,7 @@
*/
package org.apache.qpid.client.message;
-import java.util.List;
-
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.mina.common.ByteBuffer;
-import org.apache.qpid.AMQChannelException;
-import org.apache.qpid.AMQConnectionException;
-import org.apache.qpid.AMQException;
import org.apache.qpid.client.BasicMessageConsumer;
-import org.apache.qpid.framing.AMQFrame;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.BasicDeliverBody;
-import org.apache.qpid.framing.BasicReturnBody;
-import org.apache.qpid.framing.ContentBody;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.MethodDispatcher;
-import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
/**
@@ -46,217 +30,24 @@
* Note that the actual work of creating a JMS message for the client code's use is done outside of the MINA dispatcher
* thread in order to minimise the amount of work done in the MINA dispatcher thread.
*/
-public abstract class UnprocessedMessage<H,B>
+public abstract class UnprocessedMessage
{
- private final int _channelId;
- private final long _deliveryId;
- private final AMQShortString _consumerTag;
- protected AMQShortString _exchange;
- protected AMQShortString _routingKey;
- protected boolean _redelivered;
+ private final int _consumerTag;
+
- public UnprocessedMessage(int channelId,long deliveryId,AMQShortString consumerTag,AMQShortString exchange,AMQShortString routingKey,boolean redelivered)
+ public UnprocessedMessage(int consumerTag)
{
- _channelId = channelId;
- _deliveryId = deliveryId;
_consumerTag = consumerTag;
- _exchange = exchange;
- _routingKey = routingKey;
- _redelivered = redelivered;
}
- public abstract void receiveBody(B nativeMessageBody);
- public abstract void setContentHeader(H nativeMessageHeader);
+ abstract public long getDeliveryTag();
- public int getChannelId()
- {
- return _channelId;
- }
- public long getDeliveryTag()
- {
- return _deliveryId;
- }
-
- public AMQShortString getConsumerTag()
+ public int getConsumerTag()
{
return _consumerTag;
}
- public AMQShortString getExchange()
- {
- return _exchange;
- }
-
- public AMQShortString getRoutingKey()
- {
- return _routingKey;
- }
- public boolean isRedelivered()
- {
- return _redelivered;
- }
- public abstract List<B> getBodies();
-
- public abstract H getContentHeader();
-
- // specific to 0_10
- public String getReplyToURL()
- {
- return "";
- }
-
- public static final class CloseConsumerMessage extends UnprocessedMessage
- {
- AMQShortString _consumerTag;
-
- public CloseConsumerMessage(int channelId, long deliveryId, AMQShortString consumerTag,
- AMQShortString exchange, AMQShortString routingKey, boolean redelivered)
- {
- super(channelId, deliveryId, consumerTag, exchange, routingKey, redelivered);
- _consumerTag = consumerTag;
- }
-
- public CloseConsumerMessage(BasicMessageConsumer consumer)
- {
- this(0, 0, consumer.getConsumerTag(), null, null, false);
- }
-
- public BasicDeliverBody getDeliverBody()
- {
- return new BasicDeliverBody()
- {
-
- public AMQShortString getConsumerTag()
- {
- return _consumerTag;
- }
-
- public long getDeliveryTag()
- {
- return 0;
- }
-
- public AMQShortString getExchange()
- {
- return null;
- }
-
- public boolean getRedelivered()
- {
- return false;
- }
-
- public AMQShortString getRoutingKey()
- {
- return null;
- }
-
- public boolean execute(MethodDispatcher methodDispatcher, int channelId) throws AMQException
- {
- return false;
- }
-
- public AMQFrame generateFrame(int channelId)
- {
- return null;
- }
-
- public AMQChannelException getChannelException(AMQConstant code, String message)
- {
- return null;
- }
-
- public AMQChannelException getChannelException(AMQConstant code, String message, Throwable cause)
- {
- return null;
- }
-
- public AMQChannelException getChannelNotFoundException(int channelId)
- {
- return null;
- }
-
- public int getClazz()
- {
- return 0;
- }
-
- public AMQConnectionException getConnectionException(AMQConstant code, String message)
- {
- return null;
- }
-
- public AMQConnectionException getConnectionException(AMQConstant code, String message, Throwable cause)
- {
- return null;
- }
-
- public byte getMajor()
- {
- return 0;
- }
-
- public int getMethod()
- {
- return 0;
- }
-
- public byte getMinor()
- {
- return 0;
- }
-
- public int getSize()
- {
- return 0;
- }
-
- public void writeMethodPayload(ByteBuffer buffer)
- {
- }
-
- public void writePayload(ByteBuffer buffer)
- {
- }
-
- public byte getFrameType()
- {
- return 0;
- }
-
- public void handle(int channelId, AMQVersionAwareProtocolSession amqMinaProtocolSession)
- throws AMQException
- {
-
- }
- };
- }
-
- @Override
- public List getBodies()
- {
- return null;
- }
-
- @Override
- public Object getContentHeader()
- {
- return null;
- }
-
- @Override
- public void receiveBody(Object nativeMessageBody)
- {
-
- }
-
- @Override
- public void setContentHeader(Object nativeMessageHeader)
- {
-
- }
- }
}
\ No newline at end of file
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_10.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_10.java?rev=684036&r1=684035&r2=684036&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_10.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_10.java Fri Aug 8 11:31:18 2008
@@ -20,13 +20,7 @@
*/
package org.apache.qpid.client.message;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.transport.DeliveryProperties;
-import org.apache.qpid.transport.Struct;
+import org.apache.qpid.transport.MessageTransfer;
/**
* This class contains everything needed to process a JMS message. It assembles the deliver body, the content header and
@@ -35,58 +29,25 @@
* Note that the actual work of creating a JMS message for the client code's use is done outside of the MINA dispatcher
* thread in order to minimise the amount of work done in the MINA dispatcher thread.
*/
-public class UnprocessedMessage_0_10 extends UnprocessedMessage<Struct[],ByteBuffer>
+public class UnprocessedMessage_0_10 extends UnprocessedMessage
{
- private Struct[] _headers;
- private String _replyToURL;
+ private MessageTransfer _transfer;
- /** List of ContentBody instances. Due to fragmentation you don't know how big this will be in general */
- private List<ByteBuffer> _bodies = new ArrayList<ByteBuffer>();
-
- public UnprocessedMessage_0_10(int channelId,long deliveryId,AMQShortString consumerTag,AMQShortString exchange,AMQShortString routingKey,boolean redelivered)
- {
- super(channelId,deliveryId,consumerTag,exchange,routingKey,redelivered);
- }
-
- public void receiveBody(ByteBuffer body)
- {
-
- _bodies.add(body);
- }
-
- public void setContentHeader(Struct[] headers)
+ public UnprocessedMessage_0_10(int consumerTag, MessageTransfer xfr)
{
- this._headers = headers;
- for(Struct s: headers)
- {
- if (s instanceof DeliveryProperties)
- {
- DeliveryProperties props = (DeliveryProperties)s;
- _exchange = new AMQShortString(props.getExchange());
- _routingKey = new AMQShortString(props.getRoutingKey());
- _redelivered = props.getRedelivered();
- }
- }
- }
-
- public Struct[] getContentHeader()
- {
- return _headers;
- }
-
- public List<ByteBuffer> getBodies()
- {
- return _bodies;
+ super(consumerTag);
+ _transfer = xfr;
}
// additional 0_10 method
- public String getReplyToURL()
+
+ public long getDeliveryTag()
{
- return _replyToURL;
+ return _transfer.getId();
}
- public void setReplyToURL(String url)
+ public MessageTransfer getMessageTransfer()
{
- _replyToURL = url;
+ return _transfer;
}
}
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java?rev=684036&r1=684035&r2=684036&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java Fri Aug 8 11:31:18 2008
@@ -26,7 +26,6 @@
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicDeliverBody;
-import org.apache.qpid.framing.BasicReturnBody;
import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.framing.ContentHeaderBody;
@@ -37,32 +36,54 @@
* Note that the actual work of creating a JMS message for the client code's use is done outside of the MINA dispatcher
* thread in order to minimise the amount of work done in the MINA dispatcher thread.
*/
-public class UnprocessedMessage_0_8 extends UnprocessedMessage<ContentHeaderBody,ContentBody>
+public class UnprocessedMessage_0_8 extends UnprocessedMessage
{
private long _bytesReceived = 0;
+
+ private AMQShortString _exchange;
+ private AMQShortString _routingKey;
+ private final long _deliveryId;
+ protected boolean _redelivered;
+
private BasicDeliverBody _deliverBody;
private ContentHeaderBody _contentHeader;
/** List of ContentBody instances. Due to fragmentation you don't know how big this will be in general */
private List<ContentBody> _bodies;
- public UnprocessedMessage_0_8(int channelId,long deliveryId,AMQShortString consumerTag,AMQShortString exchange,AMQShortString routingKey,boolean redelivered)
+ public UnprocessedMessage_0_8(long deliveryId, int consumerTag, AMQShortString exchange, AMQShortString routingKey, boolean redelivered)
+ {
+ super(consumerTag);
+ _exchange = exchange;
+ _routingKey = routingKey;
+
+ _redelivered = redelivered;
+ _deliveryId = deliveryId;
+ }
+
+
+ public AMQShortString getExchange()
+ {
+ return _exchange;
+ }
+
+ public AMQShortString getRoutingKey()
{
- super(channelId,deliveryId,consumerTag,exchange,routingKey,redelivered);
+ return _routingKey;
}
- public UnprocessedMessage_0_8(int channelId, BasicReturnBody body)
+ public long getDeliveryTag()
{
- //FIXME: TGM, SRSLY 4RL
- super(channelId, 0, null, body.getExchange(), body.getRoutingKey(), false);
+ return _deliveryId;
}
- public UnprocessedMessage_0_8(int channelId, BasicDeliverBody body)
+ public boolean isRedelivered()
{
- super(channelId, body.getDeliveryTag(), body.getConsumerTag(), body.getExchange(), body.getRoutingKey(), false);
+ return _redelivered;
}
+
public void receiveBody(ContentBody body)
{
@@ -124,7 +145,7 @@
public String toString()
{
StringBuilder buf = new StringBuilder();
- buf.append("Channel Id : " + this.getChannelId());
+
if (_contentHeader != null)
{
buf.append("ContentHeader " + _contentHeader);
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java?rev=684036&r1=684035&r2=684036&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java Fri Aug 8 11:31:18 2008
@@ -224,9 +224,8 @@
*
* @throws AMQException if this was not expected
*/
- public void unprocessedMessageReceived(UnprocessedMessage message) throws AMQException
- {
- final int channelId = message.getChannelId();
+ public void unprocessedMessageReceived(final int channelId, UnprocessedMessage message) throws AMQException
+ {
if ((channelId & FAST_CHANNEL_ACCESS_MASK) == 0)
{
_channelId2UnprocessedMsgArray[channelId] = message;
@@ -239,8 +238,8 @@
public void contentHeaderReceived(int channelId, ContentHeaderBody contentHeader) throws AMQException
{
- final UnprocessedMessage msg = (channelId & FAST_CHANNEL_ACCESS_MASK) == 0 ? _channelId2UnprocessedMsgArray[channelId]
- : _channelId2UnprocessedMsgMap.get(channelId);
+ final UnprocessedMessage_0_8 msg = (UnprocessedMessage_0_8) ((channelId & FAST_CHANNEL_ACCESS_MASK) == 0 ? _channelId2UnprocessedMsgArray[channelId]
+ : _channelId2UnprocessedMsgMap.get(channelId));
if (msg == null)
{
@@ -290,15 +289,7 @@
throw new AMQException(null, "Error: received content body without having received a ContentHeader frame first", null);
}
- /*try
- {*/
msg.receiveBody(contentBody);
- /*}
- catch (UnexpectedBodyReceivedException e)
- {
- _channelId2UnprocessedMsgMap.remove(channelId);
- throw e;
- }*/
if (msg.isAllBodyDataReceived())
{
@@ -478,7 +469,7 @@
{
final AMQSession session = getSession(channelId);
- session.confirmConsumerCancelled(consumerTag);
+ session.confirmConsumerCancelled(consumerTag.toIntValue());
}
public void setProtocolVersion(final ProtocolVersion pv)
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java?rev=684036&r1=684035&r2=684036&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java Fri Aug 8 11:31:18 2008
@@ -21,8 +21,10 @@
package org.apache.qpid.client.util;
import java.util.Iterator;
+import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ConcurrentLinkedQueue;
/**
* A blocking queue that emits events above a user specified threshold allowing the caller to take action (e.g. flow
@@ -35,7 +37,7 @@
public class FlowControllingBlockingQueue
{
/** This queue is bounded and is used to store messages before being dispatched to the consumer */
- private final BlockingQueue _queue = new LinkedBlockingQueue();
+ private final Queue _queue = new ConcurrentLinkedQueue();
private final int _flowControlHighThreshold;
private final int _flowControlLowThreshold;
@@ -71,7 +73,17 @@
public Object take() throws InterruptedException
{
- Object o = _queue.take();
+ Object o = _queue.poll();
+ if(o == null)
+ {
+ synchronized(this)
+ {
+ while((o = _queue.poll())==null)
+ {
+ wait();
+ }
+ }
+ }
if (_listener != null)
{
synchronized (_listener)
@@ -88,7 +100,12 @@
public void add(Object o)
{
- _queue.add(o);
+ synchronized(this)
+ {
+ _queue.add(o);
+
+ notifyAll();
+ }
if (_listener != null)
{
synchronized (_listener)
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/MessagePartListener.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/MessagePartListener.java?rev=684036&r1=684035&r2=684036&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/MessagePartListener.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/MessagePartListener.java Fri Aug 8 11:31:18 2008
@@ -20,6 +20,7 @@
import java.nio.ByteBuffer;
import org.apache.qpid.transport.Header;
+import org.apache.qpid.transport.MessageTransfer;
/**
* Assembles message parts.
@@ -33,31 +34,13 @@
* are transferred.
*/
public interface MessagePartListener
-{
- /**
- * Indicates the Message transfer has started.
- *
- * @param transferId The message transfer ID.
- */
- public void messageTransfer(int transferId);
-
- /**
- * Add the following a header to the message being received.
- *
- * @param header Either <code>DeliveryProperties</code> or <code>ApplicationProperties</code>
- */
- public void messageHeader(Header header);
+{
/**
- * Add the following byte array to the content of the message being received
+ * Inform the listener of the message transfer
*
- * @param src Data to be added or streamed.
- */
- public void data(ByteBuffer src);
-
- /**
- * Indicates that the message has been fully received.
+ * @param xfr the message transfer object
*/
- public void messageReceived();
+ public void messageTransfer(MessageTransfer xfr);
}
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSessionDelegate.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSessionDelegate.java?rev=684036&r1=684035&r2=684036&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSessionDelegate.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSessionDelegate.java Fri Aug 8 11:31:18 2008
@@ -26,15 +26,7 @@
{
MessagePartListener listener = ((ClientSession)session).getMessageListeners()
.get(xfr.getDestination());
- listener.messageTransfer(xfr.getId());
- listener.messageHeader(xfr.getHeader());
- ByteBuffer body = xfr.getBody();
- if (body == null)
- {
- body = ByteBuffer.allocate(0);
- }
- listener.data(body);
- listener.messageReceived();
+ listener.messageTransfer(xfr);
}
@Override public void messageReject(Session session, MessageReject struct)
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/util/ByteBufferMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/util/ByteBufferMessage.java?rev=684036&r1=684035&r2=684036&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/util/ByteBufferMessage.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/util/ByteBufferMessage.java Fri Aug 8 11:31:18 2008
@@ -2,8 +2,7 @@
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.LinkedList;
-import java.util.Queue;
+import java.util.*;
import org.apache.qpid.transport.DeliveryProperties;
import org.apache.qpid.transport.MessageProperties;
@@ -22,7 +21,7 @@
*/
public class ByteBufferMessage implements Message
{
- private Queue<ByteBuffer> _data = new LinkedList<ByteBuffer>();
+ private List<ByteBuffer> _data;// = new ArrayList<ByteBuffer>();
private ByteBuffer _readBuffer;
private int _dataSize;
private DeliveryProperties _currentDeliveryProps;
@@ -76,7 +75,18 @@
*/
public void appendData(ByteBuffer src) throws IOException
{
- _data.offer(src);
+ if(_data == null)
+ {
+ _data = Collections.singletonList(src);
+ }
+ else
+ {
+ if(_data.size() == 1)
+ {
+ _data = new ArrayList<ByteBuffer>(_data);
+ }
+ _data.add(src);
+ }
_dataSize += src.remaining();
}
@@ -100,12 +110,12 @@
_currentMessageProps = props;
}
- public void readData(byte[] target) throws IOException
+ public void readData(byte[] target)
{
getReadBuffer().get(target);
}
- public ByteBuffer readData() throws IOException
+ public ByteBuffer readData()
{
return getReadBuffer();
}
@@ -115,7 +125,7 @@
//optimize for the simple cases
if(_data.size() == 1)
{
- _readBuffer = _data.element().duplicate();
+ _readBuffer = _data.get(0).duplicate();
}
else
{
@@ -128,7 +138,7 @@
}
}
- private ByteBuffer getReadBuffer() throws IOException
+ private ByteBuffer getReadBuffer()
{
if (_readBuffer != null )
{
@@ -143,7 +153,7 @@
}
else
{
- throw new IOException("No Data to read");
+ return ByteBuffer.allocate(0);
}
}
}
@@ -151,16 +161,9 @@
//hack for testing
@Override public String toString()
{
- try
- {
- ByteBuffer temp = getReadBuffer();
- byte[] b = new byte[temp.remaining()];
- temp.get(b);
- return new String(b);
- }
- catch(IOException e)
- {
- return "No data";
- }
+ ByteBuffer temp = getReadBuffer();
+ byte[] b = new byte[temp.remaining()];
+ temp.get(b);
+ return new String(b);
}
}
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/util/MessagePartListenerAdapter.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/util/MessagePartListenerAdapter.java?rev=684036&r1=684035&r2=684036&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/util/MessagePartListenerAdapter.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/util/MessagePartListenerAdapter.java Fri Aug 8 11:31:18 2008
@@ -3,9 +3,7 @@
import java.io.IOException;
import java.nio.ByteBuffer;
-import org.apache.qpid.transport.DeliveryProperties;
-import org.apache.qpid.transport.MessageProperties;
-import org.apache.qpid.transport.Header;
+import org.apache.qpid.transport.*;
import org.apache.qpid.nclient.MessagePartListener;
/**
@@ -26,16 +24,35 @@
_adaptee = listener;
}
- public void messageTransfer(int transferId)
+ public void messageTransfer(MessageTransfer xfr)
{
- _currentMsg = new ByteBufferMessage(transferId);
- }
+ _currentMsg = new ByteBufferMessage(xfr.getId());
+
+ for (Struct st : xfr.getHeader().getStructs())
+ {
+ if(st instanceof DeliveryProperties)
+ {
+ _currentMsg.setDeliveryProperties((DeliveryProperties)st);
+
+ }
+ else if(st instanceof MessageProperties)
+ {
+ _currentMsg.setMessageProperties((MessageProperties)st);
+ }
+
+ }
+
+
+ ByteBuffer body = xfr.getBody();
+ if (body == null)
+ {
+ body = ByteBuffer.allocate(0);
+ }
+
- public void data(ByteBuffer src)
- {
try
{
- _currentMsg.appendData(src);
+ _currentMsg.appendData(body);
}
catch(IOException e)
{
@@ -43,16 +60,7 @@
// doesn't occur as we are using
// a ByteBuffer
}
- }
- public void messageHeader(Header header)
- {
- _currentMsg.setDeliveryProperties(header.get(DeliveryProperties.class));
- _currentMsg.setMessageProperties(header.get(MessageProperties.class));
- }
-
- public void messageReceived()
- {
_adaptee.onMessage(_currentMsg);
}
Modified: incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java?rev=684036&r1=684035&r2=684036&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java Fri Aug 8 11:31:18 2008
@@ -31,7 +31,7 @@
import javax.jms.*;
import java.util.Map;
-public class TestAMQSession extends AMQSession
+public class TestAMQSession extends AMQSession<BasicMessageConsumer_0_8, BasicMessageProducer_0_8>
{
public TestAMQSession()
@@ -94,7 +94,7 @@
}
- public BasicMessageConsumer createMessageConsumer(AMQDestination destination, int prefetchHigh, int prefetchLow, boolean noLocal, boolean exclusive, String selector, FieldTable arguments, boolean noConsume, boolean autoClose) throws JMSException
+ public BasicMessageConsumer_0_8 createMessageConsumer(AMQDestination destination, int prefetchHigh, int prefetchLow, boolean noLocal, boolean exclusive, String selector, FieldTable arguments, boolean noConsume, boolean autoClose) throws JMSException
{
return null;
}
@@ -109,12 +109,12 @@
return false;
}
- public void sendConsume(BasicMessageConsumer consumer, AMQShortString queueName, AMQProtocolHandler protocolHandler, boolean nowait, String messageSelector, AMQShortString tag) throws AMQException, FailoverException
+ public void sendConsume(BasicMessageConsumer_0_8 consumer, AMQShortString queueName, AMQProtocolHandler protocolHandler, boolean nowait, String messageSelector, int tag) throws AMQException, FailoverException
{
}
- public BasicMessageProducer createMessageProducer(Destination destination, boolean mandatory, boolean immediate, boolean waitUntilSent, long producerId)
+ public BasicMessageProducer_0_8 createMessageProducer(Destination destination, boolean mandatory, boolean immediate, boolean waitUntilSent, long producerId)
{
return null;
}
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java?rev=684036&r1=684035&r2=684036&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java Fri Aug 8 11:31:18 2008
@@ -111,6 +111,8 @@
private final byte[] _data;
private final int _offset;
private int _hashCode;
+ private String _asString = null;
+
private final int _length;
private static final char[] EMPTY_CHAR_ARRAY = new char[0];
@@ -137,7 +139,7 @@
public AMQShortString(String data)
{
this((data == null) ? EMPTY_CHAR_ARRAY : data.toCharArray());
-
+ _asString = data;
}
public AMQShortString(char[] data)
@@ -418,15 +420,14 @@
return chars;
}
- private String str = null;
public String asString()
{
- if (str == null)
+ if (_asString == null)
{
- str = new String(asChars());
+ _asString = new String(asChars());
}
- return str;
+ return _asString;
}
public boolean equals(Object o)
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Header.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Header.java?rev=684036&r1=684035&r2=684036&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Header.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Header.java Fri Aug 8 11:31:18 2008
@@ -24,6 +24,8 @@
import java.util.Arrays;
import java.util.List;
+import java.util.Map;
+import java.util.LinkedHashMap;
import java.nio.ByteBuffer;
@@ -35,30 +37,31 @@
public class Header {
- private final List<Struct> structs;
+ private final Struct[] structs;
public Header(List<Struct> structs)
{
- this.structs = structs;
+ this(structs.toArray(new Struct[structs.size()]));
}
public Header(Struct ... structs)
{
- this(Arrays.asList(structs));
+ this.structs = structs;
}
- public List<Struct> getStructs()
+ public Struct[] getStructs()
{
return structs;
}
+
public <T> T get(Class<T> klass)
{
for (Struct st : structs)
{
if (klass.isInstance(st))
{
- return klass.cast(st);
+ return (T) st;
}
}
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java?rev=684036&r1=684035&r2=684036&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java Fri Aug 8 11:31:18 2008
@@ -128,8 +128,14 @@
{
int id = nextCommandId();
cmd.setId(id);
- log.debug("ID: [%s] %s", this.channel, id);
- if ((id % 65536) == 0)
+
+ if(log.isDebugEnabled())
+ {
+ log.debug("ID: [%s] %s", this.channel, id);
+ }
+
+ //if ((id % 65536) == 0)
+ if ((id & 0xff) == 0)
{
flushProcessed(TIMELY_REPLY);
}
@@ -232,7 +238,11 @@
boolean complete(int lower, int upper)
{
- log.debug("%s complete(%d, %d)", this, lower, upper);
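+ // Guard the call: passing the int arguments to debug(String, Object...) would autobox them even when debug logging is disabled.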
+ if(log.isDebugEnabled())
+ {
+ log.debug("%s complete(%d, %d)", this, lower, upper);
+ }
synchronized (commands)
{
int old = maxComplete;
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java?rev=684036&r1=684035&r2=684036&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java Fri Aug 8 11:31:18 2008
@@ -200,7 +200,7 @@
break;
case HEADER:
command = incomplete[channel];
- List<Struct> structs = new ArrayList();
+ List<Struct> structs = new ArrayList(2);
while (dec.hasRemaining())
{
structs.add(dec.readStruct32());
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java?rev=684036&r1=684035&r2=684036&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java Fri Aug 8 11:31:18 2008
@@ -34,7 +34,6 @@
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
-import java.util.List;
import static org.apache.qpid.transport.network.Frame.*;
@@ -209,11 +208,11 @@
if (payload)
{
final Header hdr = method.getHeader();
- final List<Struct> structs = hdr.getStructs();
- final int nstructs = structs.size();
- for (int i = 0; i < nstructs; i++)
+ final Struct[] structs = hdr.getStructs();
+
+ for (Struct st : structs)
{
- enc.writeStruct32(structs.get(i));
+ enc.writeStruct32(st);
}
headerSeg = enc.segment();
}
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/util/Logger.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/util/Logger.java?rev=684036&r1=684035&r2=684036&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/util/Logger.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/util/Logger.java Fri Aug 8 11:31:18 2008
@@ -42,6 +42,11 @@
this.log = log;
}
+ public boolean isDebugEnabled()
+ {
+ return log.isDebugEnabled();
+ }
+
public void debug(String message, Object ... args)
{
if (log.isDebugEnabled())