You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2008/08/15 05:41:24 UTC
svn commit: r686136 [14/17] - in /incubator/qpid/branches/qpid.0-10/java: ./
broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/
broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/example/
broker/bin/ broker/etc/ broker...
Modified: incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java Thu Aug 14 20:40:49 2008
@@ -26,7 +26,6 @@
import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.BasicContentHeaderProperties;
/**
@@ -35,7 +34,7 @@
public class JMSStreamMessage extends AbstractBytesTypedMessage implements StreamMessage
{
public static final String MIME_TYPE="jms/stream-message";
- private static final AMQShortString MIME_TYPE_SHORT_STRING = new AMQShortString(MIME_TYPE);
+
/**
@@ -44,38 +43,40 @@
*/
private int _byteArrayRemaining = -1;
- public JMSStreamMessage()
+ public JMSStreamMessage(AMQMessageDelegateFactory delegateFactory)
{
- this(null);
+ this(delegateFactory,null);
+
}
/**
* Construct a stream message with existing data.
*
+ * @param delegateFactory
* @param data the data that comprises this message. If data is null, you get a 1024 byte buffer that is
- * set to auto expand
*/
- JMSStreamMessage(ByteBuffer data)
+ JMSStreamMessage(AMQMessageDelegateFactory delegateFactory, ByteBuffer data)
{
- super(data); // this instanties a content header
- }
+ super(delegateFactory, data); // this instanties a content header
+ }
- JMSStreamMessage(long messageNbr, BasicContentHeaderProperties contentHeader, AMQShortString exchange,
- AMQShortString routingKey, ByteBuffer data) throws AMQException
+ JMSStreamMessage(AMQMessageDelegate delegate, ByteBuffer data) throws AMQException
{
- super(messageNbr, contentHeader, exchange, routingKey, data);
+
+ super(delegate, data);
}
+
public void reset()
{
super.reset();
_readableMessage = true;
}
- public AMQShortString getMimeTypeAsShortString()
+ protected String getMimeType()
{
- return MIME_TYPE_SHORT_STRING;
+ return MIME_TYPE;
}
Modified: incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessageFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessageFactory.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessageFactory.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessageFactory.java Thu Aug 14 20:40:49 2008
@@ -25,19 +25,16 @@
import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.BasicContentHeaderProperties;
public class JMSStreamMessageFactory extends AbstractJMSMessageFactory
{
- protected AbstractJMSMessage createMessage(long deliveryTag, ByteBuffer data,
- AMQShortString exchange, AMQShortString routingKey,
- BasicContentHeaderProperties contentHeader) throws AMQException
+ protected AbstractJMSMessage createMessage(AMQMessageDelegate delegate, ByteBuffer data) throws AMQException
{
- return new JMSStreamMessage(deliveryTag, contentHeader, exchange, routingKey, data);
+ return new JMSStreamMessage(delegate, data);
}
- public AbstractJMSMessage createMessage() throws JMSException
+ public AbstractJMSMessage createMessage(AMQMessageDelegateFactory delegateFactory) throws JMSException
{
- return new JMSStreamMessage();
+ return new JMSStreamMessage(delegateFactory);
}
}
Modified: incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java Thu Aug 14 20:40:49 2008
@@ -31,60 +31,49 @@
import org.apache.qpid.client.CustomJMSXProperty;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.util.Strings;
public class JMSTextMessage extends AbstractJMSMessage implements javax.jms.TextMessage
{
private static final String MIME_TYPE = "text/plain";
- private static final AMQShortString MIME_TYPE_SHORT_STRING = new AMQShortString(MIME_TYPE);
-
private String _decodedValue;
/**
* This constant represents the name of a property that is set when the message payload is null.
*/
- private static final AMQShortString PAYLOAD_NULL_PROPERTY = CustomJMSXProperty.JMS_AMQP_NULL.getShortStringName();
+ private static final String PAYLOAD_NULL_PROPERTY = CustomJMSXProperty.JMS_AMQP_NULL.toString();
private static final Charset DEFAULT_CHARSET = Charset.forName("UTF-8");
- public JMSTextMessage() throws JMSException
+ public JMSTextMessage(AMQMessageDelegateFactory delegateFactory) throws JMSException
{
- this(null, null);
+ this(delegateFactory, null, null);
}
- JMSTextMessage(ByteBuffer data, String encoding) throws JMSException
+ JMSTextMessage(AMQMessageDelegateFactory delegateFactory, ByteBuffer data, String encoding) throws JMSException
{
- super(data); // this instantiates a content header
- getContentHeaderProperties().setContentType(MIME_TYPE_SHORT_STRING);
- getContentHeaderProperties().setEncoding(encoding);
+ super(delegateFactory, data); // this instantiates a content header
+ setContentType(getMimeType());
+ setEncoding(encoding);
}
- JMSTextMessage(long deliveryTag, BasicContentHeaderProperties contentHeader, AMQShortString exchange,
- AMQShortString routingKey, ByteBuffer data)
+ JMSTextMessage(AMQMessageDelegate delegate, ByteBuffer data)
throws AMQException
{
- super(deliveryTag, contentHeader, exchange, routingKey, data);
- contentHeader.setContentType(MIME_TYPE_SHORT_STRING);
+ super(delegate, data);
+ setContentType(getMimeType());
_data = data;
}
- JMSTextMessage(ByteBuffer data) throws JMSException
- {
- this(data, null);
- }
-
- JMSTextMessage(String text) throws JMSException
- {
- super((ByteBuffer) null);
- setText(text);
- }
public void clearBodyImpl() throws JMSException
{
if (_data != null)
{
_data.release();
+ _data = null;
}
- _data = null;
+
_decodedValue = null;
}
@@ -93,14 +82,9 @@
return getText();
}
- public void setData(ByteBuffer data)
+ protected String getMimeType()
{
- _data = data;
- }
-
- public AMQShortString getMimeTypeAsShortString()
- {
- return MIME_TYPE_SHORT_STRING;
+ return MIME_TYPE;
}
public void setText(String text) throws JMSException
@@ -111,20 +95,17 @@
try
{
if (text != null)
- {
- _data = ByteBuffer.allocate(text.length());
- _data.limit(text.length()) ;
- //_data.sweep();
- _data.setAutoExpand(true);
- final String encoding = getContentHeaderProperties().getEncodingAsString();
- if (encoding == null)
+ {
+ final String encoding = getEncoding();
+ if (encoding == null || encoding.equalsIgnoreCase("UTF-8"))
{
- _data.put(text.getBytes(DEFAULT_CHARSET.name()));
+ _data = ByteBuffer.wrap(Strings.toUTF8(text));
}
else
{
- _data.put(text.getBytes(encoding));
+ _data = ByteBuffer.wrap(text.getBytes(encoding));
}
+ _data.position(_data.limit());
_changedData=true;
}
_decodedValue = text;
@@ -156,11 +137,11 @@
{
return null;
}
- if (getContentHeaderProperties().getEncodingAsString() != null)
+ if (getEncoding() != null)
{
try
{
- _decodedValue = _data.getString(Charset.forName(getContentHeaderProperties().getEncodingAsString()).newDecoder());
+ _decodedValue = _data.getString(Charset.forName(getEncoding()).newDecoder());
}
catch (CharacterCodingException e)
{
@@ -199,4 +180,6 @@
removeProperty(PAYLOAD_NULL_PROPERTY);
}
}
+
+
}
Modified: incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessageFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessageFactory.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessageFactory.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessageFactory.java Thu Aug 14 20:40:49 2008
@@ -26,21 +26,17 @@
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpid.framing.ContentHeaderBody;
public class JMSTextMessageFactory extends AbstractJMSMessageFactory
{
- public AbstractJMSMessage createMessage() throws JMSException
+ public AbstractJMSMessage createMessage(AMQMessageDelegateFactory delegateFactory) throws JMSException
{
- return new JMSTextMessage();
+ return new JMSTextMessage(delegateFactory);
}
- protected AbstractJMSMessage createMessage(long deliveryTag, ByteBuffer data,
- AMQShortString exchange, AMQShortString routingKey,
- BasicContentHeaderProperties contentHeader) throws AMQException
+ protected AbstractJMSMessage createMessage(AMQMessageDelegate delegate, ByteBuffer data) throws AMQException
{
- return new JMSTextMessage(deliveryTag, contentHeader,
- exchange, routingKey, data);
+ return new JMSTextMessage(delegate, data);
}
}
Modified: incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/MessageConverter.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/MessageConverter.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/MessageConverter.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/MessageConverter.java Thu Aug 14 20:40:49 2008
@@ -22,15 +22,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.qpid.client.AMQSession;
-import javax.jms.BytesMessage;
-import javax.jms.JMSException;
-import javax.jms.MapMessage;
-import javax.jms.Message;
-import javax.jms.MessageEOFException;
-import javax.jms.ObjectMessage;
-import javax.jms.StreamMessage;
-import javax.jms.TextMessage;
+import javax.jms.*;
import java.util.Enumeration;
@@ -52,12 +46,11 @@
_newMessage = message;
}
- public MessageConverter(BytesMessage message) throws JMSException
+ public MessageConverter(AMQSession session, BytesMessage bytesMessage) throws JMSException
{
- BytesMessage bytesMessage = (BytesMessage) message;
bytesMessage.reset();
- JMSBytesMessage nativeMsg = new JMSBytesMessage();
+ JMSBytesMessage nativeMsg = (JMSBytesMessage) session.createBytesMessage();
byte[] buf = new byte[1024];
@@ -69,12 +62,12 @@
}
_newMessage = nativeMsg;
- setMessageProperties(message);
+ setMessageProperties(bytesMessage);
}
- public MessageConverter(MapMessage message) throws JMSException
+ public MessageConverter(AMQSession session, MapMessage message) throws JMSException
{
- MapMessage nativeMessage = new JMSMapMessage();
+ MapMessage nativeMessage = session.createMapMessage();
Enumeration mapNames = message.getMapNames();
while (mapNames.hasMoreElements())
@@ -87,21 +80,21 @@
setMessageProperties(message);
}
- public MessageConverter(ObjectMessage message) throws JMSException
+ public MessageConverter(AMQSession session, ObjectMessage origMessage) throws JMSException
{
- ObjectMessage origMessage = (ObjectMessage) message;
- ObjectMessage nativeMessage = new JMSObjectMessage();
+
+ ObjectMessage nativeMessage = session.createObjectMessage();
nativeMessage.setObject(origMessage.getObject());
_newMessage = (AbstractJMSMessage) nativeMessage;
- setMessageProperties(message);
+ setMessageProperties(origMessage);
}
- public MessageConverter(TextMessage message) throws JMSException
+ public MessageConverter(AMQSession session, TextMessage message) throws JMSException
{
- TextMessage nativeMessage = new JMSTextMessage();
+ TextMessage nativeMessage = session.createTextMessage();
nativeMessage.setText(message.getText());
@@ -109,9 +102,9 @@
setMessageProperties(message);
}
- public MessageConverter(StreamMessage message) throws JMSException
+ public MessageConverter(AMQSession session, StreamMessage message) throws JMSException
{
- StreamMessage nativeMessage = new JMSStreamMessage();
+ StreamMessage nativeMessage = session.createStreamMessage();
try
{
@@ -130,11 +123,11 @@
setMessageProperties(message);
}
- public MessageConverter(Message message) throws JMSException
+ public MessageConverter(AMQSession session, Message message) throws JMSException
{
// Send a message with just properties.
// Throwing away content
- BytesMessage nativeMessage = new JMSBytesMessage();
+ Message nativeMessage = session.createMessage();
_newMessage = (AbstractJMSMessage) nativeMessage;
setMessageProperties(message);
Modified: incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java Thu Aug 14 20:40:49 2008
@@ -27,7 +27,7 @@
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpidity.transport.Struct;
+import org.apache.qpid.transport.Struct;
public interface MessageFactory
@@ -39,10 +39,9 @@
throws JMSException, AMQException;
AbstractJMSMessage createMessage(long deliveryTag, boolean redelivered,
- Struct[] contentHeader,
- AMQShortString exchange, AMQShortString routingKey,
- List bodies, String replyToURL)
+ Struct[] contentHeader,
+ java.nio.ByteBuffer body)
throws JMSException, AMQException;
- AbstractJMSMessage createMessage() throws JMSException;
+ AbstractJMSMessage createMessage(AMQMessageDelegateFactory delegateFactory) throws JMSException;
}
Modified: incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java Thu Aug 14 20:40:49 2008
@@ -23,6 +23,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.nio.ByteBuffer;
import javax.jms.JMSException;
@@ -30,9 +31,10 @@
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpidity.transport.Struct;
-import org.apache.qpidity.transport.DeliveryProperties;
-import org.apache.qpidity.transport.MessageProperties;
+import org.apache.qpid.transport.Struct;
+import org.apache.qpid.transport.MessageProperties;
+import org.apache.qpid.transport.MessageTransfer;
+import org.apache.qpid.transport.DeliveryProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -92,8 +94,7 @@
* @param deliveryTag the AMQ message id
* @param redelivered true if redelivered
* @param contentHeader the content header that was received
- * @param bodies a list of ContentBody instances
- * @return the message.
+ * @param bodies a list of ContentBody instances @return the message.
* @throws AMQException
* @throws JMSException
*/
@@ -120,30 +121,35 @@
}
}
- public AbstractJMSMessage createMessage(long deliveryTag, boolean redelivered, AMQShortString exchange,
- AMQShortString routingKey, Struct[] contentHeader, List bodies,
- String replyTo) throws AMQException, JMSException
+ public AbstractJMSMessage createMessage(MessageTransfer transfer) throws AMQException, JMSException
{
- MessageProperties mprop = (MessageProperties) contentHeader[0];
+
+ MessageProperties mprop = transfer.getHeader().get(MessageProperties.class);
String messageType = mprop.getContentType();
if (messageType == null)
{
_logger.debug("no message type specified, building a byte message");
messageType = JMSBytesMessage.MIME_TYPE;
}
- MessageFactory mf = _mimeShortStringToFactoryMap.get(new AMQShortString(messageType));
+ MessageFactory mf = _mimeStringToFactoryMap.get(messageType);
if (mf == null)
{
throw new AMQException(null, "Unsupport MIME type of " + messageType, null);
}
else
{
- return mf.createMessage(deliveryTag, redelivered, contentHeader, exchange, routingKey, bodies, replyTo);
+ boolean redelivered = false;
+ DeliveryProperties deliverProps;
+ if((deliverProps = transfer.getHeader().get(DeliveryProperties.class)) != null)
+ {
+ redelivered = deliverProps.getRedelivered();
+ }
+ return mf.createMessage(transfer.getId(), redelivered, transfer.getHeader().getStructs(), transfer.getBody());
}
}
- public AbstractJMSMessage createMessage(String mimeType) throws AMQException, JMSException
+ public AbstractJMSMessage createMessage(AMQMessageDelegateFactory delegateFactory, String mimeType) throws AMQException, JMSException
{
if (mimeType == null)
{
@@ -157,7 +163,7 @@
}
else
{
- return mf.createMessage();
+ return mf.createMessage(delegateFactory);
}
}
}
Modified: incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/ReturnMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/ReturnMessage.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/ReturnMessage.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/ReturnMessage.java Thu Aug 14 20:40:49 2008
@@ -7,9 +7,9 @@
final private AMQShortString _replyText;
final private int _replyCode;
- public ReturnMessage(int channelId,AMQShortString exchange,AMQShortString routingKey,AMQShortString replyText,int replyCode)
+ public ReturnMessage(AMQShortString exchange, AMQShortString routingKey, AMQShortString replyText, int replyCode)
{
- super(channelId,-1,null,exchange,routingKey,false);
+ super(-1,0,exchange,routingKey,false);
_replyText = replyText;
_replyCode = replyCode;
}
Modified: incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java Thu Aug 14 20:40:49 2008
@@ -20,23 +20,7 @@
*/
package org.apache.qpid.client.message;
-import java.util.List;
-
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.mina.common.ByteBuffer;
-import org.apache.qpid.AMQChannelException;
-import org.apache.qpid.AMQConnectionException;
-import org.apache.qpid.AMQException;
import org.apache.qpid.client.BasicMessageConsumer;
-import org.apache.qpid.framing.AMQFrame;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.BasicDeliverBody;
-import org.apache.qpid.framing.BasicReturnBody;
-import org.apache.qpid.framing.ContentBody;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.MethodDispatcher;
-import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
/**
@@ -46,65 +30,24 @@
* Note that the actual work of creating a JMS message for the client code's use is done outside of the MINA dispatcher
* thread in order to minimise the amount of work done in the MINA dispatcher thread.
*/
-public abstract class UnprocessedMessage<H,B>
+public abstract class UnprocessedMessage
{
- private final int _channelId;
- private final long _deliveryId;
- private final AMQShortString _consumerTag;
- protected AMQShortString _exchange;
- protected AMQShortString _routingKey;
- protected boolean _redelivered;
+ private final int _consumerTag;
+
- public UnprocessedMessage(int channelId,long deliveryId,AMQShortString consumerTag,AMQShortString exchange,AMQShortString routingKey,boolean redelivered)
+ public UnprocessedMessage(int consumerTag)
{
- _channelId = channelId;
- _deliveryId = deliveryId;
_consumerTag = consumerTag;
- _exchange = exchange;
- _routingKey = routingKey;
- _redelivered = redelivered;
}
- public abstract void receiveBody(B nativeMessageBody);
- public abstract void setContentHeader(H nativeMessageHeader);
-
- public int getChannelId()
- {
- return _channelId;
- }
+ abstract public long getDeliveryTag();
- public long getDeliveryTag()
- {
- return _deliveryId;
- }
- public AMQShortString getConsumerTag()
+ public int getConsumerTag()
{
return _consumerTag;
}
- public AMQShortString getExchange()
- {
- return _exchange;
- }
-
- public AMQShortString getRoutingKey()
- {
- return _routingKey;
- }
- public boolean isRedelivered()
- {
- return _redelivered;
- }
- public abstract List<B> getBodies();
-
- public abstract H getContentHeader();
-
- // specific to 0_10
- public String getReplyToURL()
- {
- return "";
- }
-}
+}
\ No newline at end of file
Modified: incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_10.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_10.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_10.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_10.java Thu Aug 14 20:40:49 2008
@@ -20,13 +20,7 @@
*/
package org.apache.qpid.client.message;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpidity.transport.DeliveryProperties;
-import org.apache.qpidity.transport.Struct;
+import org.apache.qpid.transport.MessageTransfer;
/**
* This class contains everything needed to process a JMS message. It assembles the deliver body, the content header and
@@ -35,58 +29,25 @@
* Note that the actual work of creating a JMS message for the client code's use is done outside of the MINA dispatcher
* thread in order to minimise the amount of work done in the MINA dispatcher thread.
*/
-public class UnprocessedMessage_0_10 extends UnprocessedMessage<Struct[],ByteBuffer>
+public class UnprocessedMessage_0_10 extends UnprocessedMessage
{
- private Struct[] _headers;
- private String _replyToURL;
+ private MessageTransfer _transfer;
- /** List of ContentBody instances. Due to fragmentation you don't know how big this will be in general */
- private List<ByteBuffer> _bodies = new ArrayList<ByteBuffer>();
-
- public UnprocessedMessage_0_10(int channelId,long deliveryId,AMQShortString consumerTag,AMQShortString exchange,AMQShortString routingKey,boolean redelivered)
- {
- super(channelId,deliveryId,consumerTag,exchange,routingKey,redelivered);
- }
-
- public void receiveBody(ByteBuffer body)
- {
-
- _bodies.add(body);
- }
-
- public void setContentHeader(Struct[] headers)
+ public UnprocessedMessage_0_10(int consumerTag, MessageTransfer xfr)
{
- this._headers = headers;
- for(Struct s: headers)
- {
- if (s instanceof DeliveryProperties)
- {
- DeliveryProperties props = (DeliveryProperties)s;
- _exchange = new AMQShortString(props.getExchange());
- _routingKey = new AMQShortString(props.getRoutingKey());
- _redelivered = props.getRedelivered();
- }
- }
- }
-
- public Struct[] getContentHeader()
- {
- return _headers;
- }
-
- public List<ByteBuffer> getBodies()
- {
- return _bodies;
+ super(consumerTag);
+ _transfer = xfr;
}
// additional 0_10 method
- public String getReplyToURL()
+
+ public long getDeliveryTag()
{
- return _replyToURL;
+ return _transfer.getId();
}
- public void setReplyToURL(String url)
+ public MessageTransfer getMessageTransfer()
{
- _replyToURL = url;
+ return _transfer;
}
}
Modified: incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java Thu Aug 14 20:40:49 2008
@@ -26,7 +26,6 @@
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicDeliverBody;
-import org.apache.qpid.framing.BasicReturnBody;
import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.framing.ContentHeaderBody;
@@ -37,32 +36,54 @@
* Note that the actual work of creating a JMS message for the client code's use is done outside of the MINA dispatcher
* thread in order to minimise the amount of work done in the MINA dispatcher thread.
*/
-public class UnprocessedMessage_0_8 extends UnprocessedMessage<ContentHeaderBody,ContentBody>
+public class UnprocessedMessage_0_8 extends UnprocessedMessage
{
private long _bytesReceived = 0;
+
+ private AMQShortString _exchange;
+ private AMQShortString _routingKey;
+ private final long _deliveryId;
+ protected boolean _redelivered;
+
private BasicDeliverBody _deliverBody;
private ContentHeaderBody _contentHeader;
/** List of ContentBody instances. Due to fragmentation you don't know how big this will be in general */
private List<ContentBody> _bodies;
- public UnprocessedMessage_0_8(int channelId,long deliveryId,AMQShortString consumerTag,AMQShortString exchange,AMQShortString routingKey,boolean redelivered)
+ public UnprocessedMessage_0_8(long deliveryId, int consumerTag, AMQShortString exchange, AMQShortString routingKey, boolean redelivered)
+ {
+ super(consumerTag);
+ _exchange = exchange;
+ _routingKey = routingKey;
+
+ _redelivered = redelivered;
+ _deliveryId = deliveryId;
+ }
+
+
+ public AMQShortString getExchange()
+ {
+ return _exchange;
+ }
+
+ public AMQShortString getRoutingKey()
{
- super(channelId,deliveryId,consumerTag,exchange,routingKey,redelivered);
+ return _routingKey;
}
- public UnprocessedMessage_0_8(int channelId, BasicReturnBody body)
+ public long getDeliveryTag()
{
- //FIXME: TGM, SRSLY 4RL
- super(channelId, 0, null, body.getExchange(), body.getRoutingKey(), false);
+ return _deliveryId;
}
- public UnprocessedMessage_0_8(int channelId, BasicDeliverBody body)
+ public boolean isRedelivered()
{
- super(channelId, body.getDeliveryTag(), body.getConsumerTag(), body.getExchange(), body.getRoutingKey(), false);
+ return _redelivered;
}
+
public void receiveBody(ContentBody body)
{
@@ -124,7 +145,7 @@
public String toString()
{
StringBuilder buf = new StringBuilder();
- buf.append("Channel Id : " + this.getChannelId());
+
if (_contentHeader != null)
{
buf.append("ContentHeader " + _contentHeader);
Modified: incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java Thu Aug 14 20:40:49 2008
@@ -43,14 +43,17 @@
import org.apache.qpid.client.failover.FailoverState;
import org.apache.qpid.client.state.AMQState;
import org.apache.qpid.client.state.AMQStateManager;
+import org.apache.qpid.client.state.StateWaiter;
import org.apache.qpid.client.state.listener.SpecificMethodFrameListener;
import org.apache.qpid.codec.AMQCodecFactory;
import org.apache.qpid.framing.*;
+import org.apache.qpid.jms.BrokerDetails;
import org.apache.qpid.pool.ReadWriteThreadModel;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.protocol.AMQMethodListener;
import org.apache.qpid.ssl.SSLContextFactory;
+import org.apache.qpid.transport.network.io.IoTransport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -100,28 +103,29 @@
* <p/><table id="crc"><caption>CRC Card</caption>
* <tr><th> Responsibilities <th> Collaborations
* <tr><td> Create the filter chain to filter this handlers events.
- * <td> {@link ProtocolCodecFilter}, {@link SSLContextFactory}, {@link SSLFilter}, {@link ReadWriteThreadModel}.
+ * <td> {@link ProtocolCodecFilter}, {@link SSLContextFactory}, {@link SSLFilter}, {@link ReadWriteThreadModel}.
*
* <tr><td> Maintain fail-over state.
* <tr><td>
* </table>
*
* @todo Explain the system property: amqj.shared_read_write_pool. How does putting the protocol codec filter before the
- * async write filter make it a shared pool? The pooling filter uses the same thread pool for reading and writing
- * anyway, see {@link org.apache.qpid.pool.PoolingFilter}, docs for comments. Will putting the protocol codec
- * filter before it mean not doing the read/write asynchronously but in the main filter thread?
- *
+ * async write filter make it a shared pool? The pooling filter uses the same thread pool for reading and writing
+ * anyway, see {@link org.apache.qpid.pool.PoolingFilter}, docs for comments. Will putting the protocol codec
+ * filter before it mean not doing the read/write asynchronously but in the main filter thread?
* @todo Use a single handler instance, by shifting everything to do with the 'protocol session' state, including
- * failover state, into AMQProtocolSession, and tracking that from AMQConnection? The lifecycles of
- * AMQProtocolSesssion and AMQConnection will be the same, so if there is high cohesion between them, they could
- * be merged, although there is sense in keeping the session model seperate. Will clarify things by having data
- * held per protocol handler, per protocol session, per network connection, per channel, in seperate classes, so
- * that lifecycles of the fields match lifecycles of their containing objects.
+ * failover state, into AMQProtocolSession, and tracking that from AMQConnection? The lifecycles of
+ * AMQProtocolSesssion and AMQConnection will be the same, so if there is high cohesion between them, they could
+ * be merged, although there is sense in keeping the session model seperate. Will clarify things by having data
+ * held per protocol handler, per protocol session, per network connection, per channel, in seperate classes, so
+ * that lifecycles of the fields match lifecycles of their containing objects.
*/
public class AMQProtocolHandler extends IoHandlerAdapter
{
/** Used for debugging. */
private static final Logger _logger = LoggerFactory.getLogger(AMQProtocolHandler.class);
+ private static final Logger _protocolLogger = LoggerFactory.getLogger("qpid.protocol");
+ private static final boolean PROTOCOL_DEBUG = (System.getProperty("amqj.protocol.logging.level") != null);
/**
* The connection that this protocol handler is associated with. There is a 1-1 mapping between connection
@@ -136,7 +140,7 @@
private AMQStateManager _stateManager = new AMQStateManager();
/** Holds the method listeners, */
- private final CopyOnWriteArraySet _frameListeners = new CopyOnWriteArraySet();
+ private final CopyOnWriteArraySet<AMQMethodListener> _frameListeners = new CopyOnWriteArraySet<AMQMethodListener>();
/**
* We create the failover handler when the session is created since it needs a reference to the IoSession in order
@@ -154,14 +158,12 @@
/** Used to provide a condition to wait upon for operations that are required to wait for failover to complete. */
private CountDownLatch _failoverLatch;
-
/** The last failover exception that occured */
private FailoverException _lastFailoverException;
/** Defines the default timeout to use for synchronous protocol commands. */
private final long DEFAULT_SYNC_TIMEOUT = 1000 * 30;
-
/**
* Creates a new protocol handler, associated with the specified client connection instance.
*
@@ -245,11 +247,27 @@
_logger.error("Unable to attach IO Read/Write Filter Protection :" + e.getMessage());
}
}
- _protocolSession = new AMQProtocolSession(this, session, _connection, getStateManager());
+ _protocolSession = new AMQProtocolSession(this, session, _connection);
+
+ _stateManager.setProtocolSession(_protocolSession);
+
_protocolSession.init();
}
/**
+ * Called when we want to create a new IoTransport session
+ * @param brokerDetail
+ */
+ public void createIoTransportSession(BrokerDetails brokerDetail)
+ {
+ _protocolSession = new AMQProtocolSession(this, _connection);
+ _stateManager.setProtocolSession(_protocolSession);
+ IoTransport.connect_0_9(getProtocolSession(),
+ brokerDetail.getHost(), brokerDetail.getPort());
+ _protocolSession.init();
+ }
+
+ /**
* Called when the network connection is closed. This can happen, either because the client explicitly requested
* that the connection be closed, in which case nothing is done, or because the connection died. In the case
* where the connection died, an attempt to failover automatically to a new connection may be started. The failover
@@ -263,7 +281,7 @@
* @param session The MINA session.
*
* @todo Clarify: presumably exceptionCaught is called when the client is sending during a connection failure and
- * not otherwise? The above comment doesn't make that clear.
+ * not otherwise? The above comment doesn't make that clear.
*/
public void sessionClosed(IoSession session)
{
@@ -374,7 +392,7 @@
"cause isn't AMQConnectionClosedException: " + cause, cause);
AMQException amqe = new AMQException("Protocol handler error: " + cause, cause);
- propagateExceptionToWaiters(amqe);
+ propagateExceptionToAllWaiters(amqe);
}
_connection.exceptionReceived(cause);
@@ -395,7 +413,7 @@
// we notify the state manager of the error in case we have any clients waiting on a state
// change. Those "waiters" will be interrupted and can handle the exception
AMQException amqe = new AMQException("Protocol handler error: " + cause, cause);
- propagateExceptionToWaiters(amqe);
+ propagateExceptionToAllWaiters(amqe);
_connection.exceptionReceived(cause);
}
}
@@ -405,11 +423,33 @@
* These are for the state manager (waiting for a state change) or a frame listener (waiting for a particular type
* of frame to arrive). When an error occurs we need to notify these waiters so that they can react appropriately.
*
+ * This should be called only when the exception is fatal for the connection.
+ *
* @param e the exception to propagate
+ *
+ * @see #propagateExceptionToFrameListeners
+ * @see #propagateExceptionToStateWaiters
*/
- public void propagateExceptionToWaiters(Exception e)
+ public void propagateExceptionToAllWaiters(Exception e)
+ {
+ propagateExceptionToFrameListeners(e);
+ propagateExceptionToStateWaiters(e);
+ }
+
+ /**
+ * This caters for the case where we only need to propogate an exception to the the frame listeners to interupt any
+ * protocol level waits.
+ *
+ * This will would normally be used to notify all Frame Listeners that Failover is about to occur and they should
+ * stop waiting and relinquish the Failover lock {@see FailoverHandler}.
+ *
+ * Once the {@link FailoverHandler} has re-established the connection then the listeners will be able to re-attempt
+ * their protocol request and so listen again for the correct frame.
+ *
+ * @param e the exception to propagate
+ */
+ public void propagateExceptionToFrameListeners(Exception e)
{
-
if (!_frameListeners.isEmpty())
{
final Iterator it = _frameListeners.iterator();
@@ -421,6 +461,22 @@
}
}
+ /**
+ * This caters for the case where we only need to propogate an exception to the the state manager to interupt any
+ * thing waiting for a state change.
+ *
+ * Currently (2008-07-15) the state manager is only used during 0-8/0-9 Connection establishement.
+ *
+ * Normally the state manager would not need to be notified without notifiying the frame listeners so in normal
+ * cases {@link #propagateExceptionToAllWaiters} would be the correct choice.
+ *
+ * @param e the exception to propagate
+ */
+ public void propagateExceptionToStateWaiters(Exception e)
+ {
+ getStateManager().error(e);
+ }
+
public void notifyFailoverStarting()
{
// Set the last exception in the sync block to ensure the ordering with add.
@@ -431,7 +487,9 @@
_lastFailoverException = new FailoverException("Failing over about to start");
}
- propagateExceptionToWaiters(_lastFailoverException);
+ //Only notify the Frame listeners that failover is going to occur as the State listeners shouldn't be
+ // interupted unless failover cannot restore the state.
+ propagateExceptionToFrameListeners(_lastFailoverException);
}
public void failoverInProgress()
@@ -443,6 +501,11 @@
public void messageReceived(IoSession session, Object message) throws Exception
{
+ if (PROTOCOL_DEBUG)
+ {
+ _protocolLogger.info(String.format("RECV: [%s] %s", this, message));
+ }
+
if(message instanceof AMQFrame)
{
final boolean debug = _logger.isDebugEnabled();
@@ -459,7 +522,7 @@
HeartbeatDiagnostics.received(bodyFrame instanceof HeartbeatBody);
- bodyFrame.handle(frame.getChannel(),_protocolSession);
+ bodyFrame.handle(frame.getChannel(), _protocolSession);
_connection.bytesReceived(_protocolSession.getIoSession().getReadBytes());
}
@@ -508,20 +571,12 @@
if (!wasAnyoneInterested)
{
throw new AMQException(null, "AMQMethodEvent " + evt + " was not processed by any listener. Listeners:"
- + _frameListeners, null);
+ + _frameListeners, null);
}
}
catch (AMQException e)
- {
- if (!_frameListeners.isEmpty())
- {
- Iterator it = _frameListeners.iterator();
- while (it.hasNext())
- {
- final AMQMethodListener listener = (AMQMethodListener) it.next();
- listener.error(e);
- }
- }
+ {
+ propagateExceptionToFrameListeners(e);
exceptionCaught(session, e);
}
@@ -532,6 +587,11 @@
public void messageSent(IoSession session, Object message) throws Exception
{
+ if (PROTOCOL_DEBUG)
+ {
+ _protocolLogger.debug(String.format("SEND: [%s] %s", this, message));
+ }
+
final long sentMessages = _messagesOut++;
final boolean debug = _logger.isDebugEnabled();
@@ -542,34 +602,13 @@
}
_connection.bytesSent(session.getWrittenBytes());
- if (debug)
- {
- _logger.debug("Sent frame " + message);
- }
- }
-
- /*
- public void addFrameListener(AMQMethodListener listener)
- {
- _frameListeners.add(listener);
- }
-
- public void removeFrameListener(AMQMethodListener listener)
- {
- _frameListeners.remove(listener);
- }
- */
- public void attainState(AMQState s) throws AMQException
- {
- getStateManager().attainState(s);
}
- public AMQState attainState(Set<AMQState> states) throws AMQException
+ public StateWaiter createWaiter(Set<AMQState> states) throws AMQException
{
- return getStateManager().attainState(states);
+ return getStateManager().createWaiter(states);
}
-
/**
* Convenience method that writes a frame to the protocol session. Equivalent to calling
* getProtocolSession().write().
@@ -617,14 +656,12 @@
{
throw _lastFailoverException;
}
-
+
_frameListeners.add(listener);
}
_protocolSession.writeFrame(frame);
- AMQMethodEvent e = listener.blockForFrame(timeout);
-
- return e;
+ return listener.blockForFrame(timeout);
// When control resumes before this line, a reply will have been received
// that matches the criteria defined in the blocking listener
}
@@ -669,8 +706,7 @@
getStateManager().changeState(AMQState.CONNECTION_CLOSING);
ConnectionCloseBody body = _protocolSession.getMethodRegistry().createConnectionCloseBody(AMQConstant.REPLY_SUCCESS.getCode(), // replyCode
- new AMQShortString("JMS client is closing the connection."),0,0);
-
+ new AMQShortString("JMS client is closing the connection."), 0, 0);
final AMQFrame frame = body.generateFrame(0);
@@ -745,10 +781,6 @@
public void setStateManager(AMQStateManager stateManager)
{
_stateManager = stateManager;
- if (_protocolSession != null)
- {
- _protocolSession.setStateManager(stateManager);
- }
}
public AMQProtocolSession getProtocolSession()
@@ -778,7 +810,7 @@
public MethodRegistry getMethodRegistry()
{
- return getStateManager().getMethodRegistry();
+ return _protocolSession.getMethodRegistry();
}
public ProtocolVersion getProtocolVersion()
Modified: incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java Thu Aug 14 20:40:49 2008
@@ -30,7 +30,6 @@
import javax.jms.JMSException;
import javax.security.sasl.SaslClient;
-import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -38,13 +37,14 @@
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.ConnectionTuneParameters;
-import org.apache.qpid.client.message.ReturnMessage;
import org.apache.qpid.client.message.UnprocessedMessage;
import org.apache.qpid.client.message.UnprocessedMessage_0_8;
import org.apache.qpid.client.state.AMQStateManager;
+import org.apache.qpid.client.state.AMQState;
import org.apache.qpid.framing.*;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
+import org.apache.qpid.transport.Sender;
import org.apache.qpid.client.handler.ClientMethodDispatcherImpl;
/**
@@ -67,8 +67,6 @@
protected final IoSession _minaProtocolSession;
- private AMQStateManager _stateManager;
-
protected WriteFuture _lastWriteFuture;
/**
@@ -86,7 +84,7 @@
* Maps from a channel id to an unprocessed message. This is used to tie together the JmsDeliverBody (which arrives
* first) with the subsequent content header and content bodies.
*/
- private final ConcurrentMap<Integer,UnprocessedMessage> _channelId2UnprocessedMsgMap = new ConcurrentHashMap<Integer,UnprocessedMessage>();
+ private final ConcurrentMap<Integer, UnprocessedMessage> _channelId2UnprocessedMsgMap = new ConcurrentHashMap<Integer, UnprocessedMessage>();
private final UnprocessedMessage[] _channelId2UnprocessedMsgArray = new UnprocessedMessage[16];
/** Counter to ensure unique queue names */
@@ -97,26 +95,17 @@
// private VersionSpecificRegistry _registry =
// MainRegistry.getVersionSpecificRegistry(ProtocolVersion.getLatestSupportedVersion());
-
private MethodRegistry _methodRegistry =
MethodRegistry.getMethodRegistry(ProtocolVersion.getLatestSupportedVersion());
-
private MethodDispatcher _methodDispatcher;
+ protected final AMQConnection _connection;
- private final AMQConnection _connection;
private static final int FAST_CHANNEL_ACCESS_MASK = 0xFFFFFFF0;
public AMQProtocolSession(AMQProtocolHandler protocolHandler, IoSession protocolSession, AMQConnection connection)
{
- this(protocolHandler, protocolSession, connection, new AMQStateManager());
-
- }
-
- public AMQProtocolSession(AMQProtocolHandler protocolHandler, IoSession protocolSession, AMQConnection connection,
- AMQStateManager stateManager)
- {
_protocolHandler = protocolHandler;
_minaProtocolSession = protocolSession;
_minaProtocolSession.setAttachment(this);
@@ -124,20 +113,27 @@
_minaProtocolSession.setAttribute(AMQ_CONNECTION, connection);
// fixme - real value needed
_minaProtocolSession.setWriteTimeout(LAST_WRITE_FUTURE_JOIN_TIMEOUT);
- _stateManager = stateManager;
- _stateManager.setProtocolSession(this);
_protocolVersion = connection.getProtocolVersion();
_methodDispatcher = ClientMethodDispatcherImpl.newMethodDispatcher(ProtocolVersion.getLatestSupportedVersion(),
- stateManager);
+ this);
_connection = connection;
}
+ public AMQProtocolSession(AMQProtocolHandler protocolHandler, AMQConnection connection)
+ {
+ _protocolHandler = protocolHandler;
+ _minaProtocolSession = null;
+ _protocolVersion = connection.getProtocolVersion();
+ _methodDispatcher = ClientMethodDispatcherImpl.newMethodDispatcher(ProtocolVersion.getLatestSupportedVersion(),
+ this);
+ _connection = connection;
+ }
+
public void init()
{
// start the process of setting up the connection. This is the first place that
// data is written to the server.
-
_minaProtocolSession.write(new ProtocolInitiation(_connection.getProtocolVersion()));
}
@@ -161,14 +157,7 @@
public AMQStateManager getStateManager()
{
- return _stateManager;
- }
-
- public void setStateManager(AMQStateManager stateManager)
- {
- _stateManager = stateManager;
- _methodDispatcher = ClientMethodDispatcherImpl.newMethodDispatcher(_protocolVersion,
- stateManager);
+ return _protocolHandler.getStateManager();
}
public String getVirtualHost()
@@ -193,7 +182,7 @@
public SaslClient getSaslClient()
{
- return (SaslClient) _minaProtocolSession.getAttribute(SASL_CLIENT);
+ return (SaslClient) _minaProtocolSession.getAttribute(SASL_CLIENT);
}
/**
@@ -235,12 +224,11 @@
*
* @throws AMQException if this was not expected
*/
- public void unprocessedMessageReceived(UnprocessedMessage message) throws AMQException
- {
- final int channelId = message.getChannelId();
- if((channelId & FAST_CHANNEL_ACCESS_MASK) == 0)
+ public void unprocessedMessageReceived(final int channelId, UnprocessedMessage message) throws AMQException
+ {
+ if ((channelId & FAST_CHANNEL_ACCESS_MASK) == 0)
{
- _channelId2UnprocessedMsgArray[channelId] = message;
+ _channelId2UnprocessedMsgArray[channelId] = message;
}
else
{
@@ -250,18 +238,17 @@
public void contentHeaderReceived(int channelId, ContentHeaderBody contentHeader) throws AMQException
{
- final UnprocessedMessage msg = (channelId & FAST_CHANNEL_ACCESS_MASK) == 0 ? _channelId2UnprocessedMsgArray[channelId]
- : _channelId2UnprocessedMsgMap.get(channelId);
-
+ final UnprocessedMessage_0_8 msg = (UnprocessedMessage_0_8) ((channelId & FAST_CHANNEL_ACCESS_MASK) == 0 ? _channelId2UnprocessedMsgArray[channelId]
+ : _channelId2UnprocessedMsgMap.get(channelId));
if (msg == null)
{
- throw new AMQException(null, "Error: received content header without having received a BasicDeliver frame first", null);
+ throw new AMQException(null, "Error: received content header without having received a BasicDeliver frame first on session:" + this, null);
}
if (msg.getContentHeader() != null)
{
- throw new AMQException(null, "Error: received duplicate content header or did not receive correct number of content body frames", null);
+ throw new AMQException(null, "Error: received duplicate content header or did not receive correct number of content body frames on session:" + this, null);
}
msg.setContentHeader(contentHeader);
@@ -275,7 +262,7 @@
{
UnprocessedMessage_0_8 msg;
final boolean fastAccess = (channelId & FAST_CHANNEL_ACCESS_MASK) == 0;
- if(fastAccess)
+ if (fastAccess)
{
msg = (UnprocessedMessage_0_8) _channelId2UnprocessedMsgArray[channelId];
}
@@ -291,7 +278,7 @@
if (msg.getContentHeader() == null)
{
- if(fastAccess)
+ if (fastAccess)
{
_channelId2UnprocessedMsgArray[channelId] = null;
}
@@ -302,15 +289,7 @@
throw new AMQException(null, "Error: received content body without having received a ContentHeader frame first", null);
}
- /*try
- {*/
msg.receiveBody(contentBody);
- /*}
- catch (UnexpectedBodyReceivedException e)
- {
- _channelId2UnprocessedMsgMap.remove(channelId);
- throw e;
- }*/
if (msg.isAllBodyDataReceived())
{
@@ -333,7 +312,7 @@
{
AMQSession session = getSession(channelId);
session.messageReceived(msg);
- if((channelId & FAST_CHANNEL_ACCESS_MASK) == 0)
+ if ((channelId & FAST_CHANNEL_ACCESS_MASK) == 0)
{
_channelId2UnprocessedMsgArray[channelId] = null;
}
@@ -431,12 +410,12 @@
return (AMQConnection) _minaProtocolSession.getAttribute(AMQ_CONNECTION);
}
- public void closeProtocolSession()
+ public void closeProtocolSession() throws AMQException
{
closeProtocolSession(true);
}
- public void closeProtocolSession(boolean waitLast)
+ public void closeProtocolSession(boolean waitLast) throws AMQException
{
_logger.debug("Waiting for last write to join.");
if (waitLast && (_lastWriteFuture != null))
@@ -445,7 +424,15 @@
}
_logger.debug("Closing protocol session");
+
final CloseFuture future = _minaProtocolSession.close();
+
+ // There is no recovery we can do if the join on the close failes so simply mark the connection CLOSED
+ // then wait for the connection to close.
+ // ritchiem: Could this release BlockingWaiters to early? The close has been done as much as possible so any
+ // error now shouldn't matter.
+
+ _protocolHandler.getStateManager().changeState(AMQState.CONNECTION_CLOSED);
future.join(LAST_WRITE_FUTURE_JOIN_TIMEOUT);
}
@@ -482,16 +469,16 @@
{
final AMQSession session = getSession(channelId);
- session.confirmConsumerCancelled(consumerTag);
+ session.confirmConsumerCancelled(consumerTag.toIntValue());
}
public void setProtocolVersion(final ProtocolVersion pv)
{
_protocolVersion = pv;
_methodRegistry = MethodRegistry.getMethodRegistry(pv);
- _methodDispatcher = ClientMethodDispatcherImpl.newMethodDispatcher(pv, _stateManager);
+ _methodDispatcher = ClientMethodDispatcherImpl.newMethodDispatcher(pv, this);
- // _registry = MainRegistry.getVersionSpecificRegistry(versionMajor, versionMinor);
+ // _registry = MainRegistry.getVersionSpecificRegistry(versionMajor, versionMinor);
}
public byte getProtocolMinorVersion()
@@ -524,12 +511,12 @@
return _methodDispatcher;
}
-
public void setTicket(int ticket, int channelId)
{
final AMQSession session = getSession(channelId);
session.setTicket(ticket);
}
+
public void setMethodDispatcher(MethodDispatcher methodDispatcher)
{
_methodDispatcher = methodDispatcher;
@@ -545,4 +532,14 @@
{
_protocolHandler.methodBodyReceived(channel, amqMethodBody, _minaProtocolSession);
}
+
+ public void notifyError(Exception error)
+ {
+ _protocolHandler.propagateExceptionToAllWaiters(error);
+ }
+
+ public void setSender(Sender<java.nio.ByteBuffer> sender)
+ {
+ // No-op, interface munging
+ }
}
Modified: incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java Thu Aug 14 20:40:49 2008
@@ -20,9 +20,14 @@
*/
package org.apache.qpid.client.protocol;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQTimeoutException;
import org.apache.qpid.client.failover.FailoverException;
+import org.apache.qpid.client.util.BlockingWaiter;
import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.protocol.AMQMethodListener;
@@ -54,38 +59,17 @@
* </table>
*
* @todo Might be neater if this method listener simply wrapped another that provided the method handling using a
- * methodRecevied method. The processMethod takes an additional channelId, however none of the implementations
- * seem to use it. So wrapping the listeners is possible.
- *
- * @todo What is to stop a blocking method listener, receiving a second method whilst it is registered as a listener,
- * overwriting the first one before the caller of the block method has had a chance to examine it? If one-shot
- * behaviour is to be intended it should be enforced, perhaps by always returning false once the blocked for
- * method has been received.
- *
- * @todo Interuption is caught but not handled. This could be allowed to fall through. This might actually be usefull
- * for fail-over where a thread is blocking when failure happens, it could be interrupted to abandon or retry
- * when this happens. At the very least, restore the interrupted status flag.
- *
+ * methodRecevied method. The processMethod takes an additional channelId, however none of the implementations
+ * seem to use it. So wrapping the listeners is possible.
* @todo If the retrotranslator can handle it, could use a SynchronousQueue to implement this rendezvous. Need to
- * check that SynchronousQueue has a non-blocking put method available.
+ * check that SynchronousQueue has a non-blocking put method available.
*/
-public abstract class BlockingMethodFrameListener implements AMQMethodListener
+public abstract class BlockingMethodFrameListener extends BlockingWaiter<AMQMethodEvent> implements AMQMethodListener
{
- /** This flag is used to indicate that the blocked for method has been received. */
- private volatile boolean _ready = false;
-
- /** Used to protect the shared event and ready flag between the producer and consumer. */
- private final Object _lock = new Object();
-
- /** Used to hold the most recent exception that is passed to the {@link #error(Exception)} method. */
- private volatile Exception _error;
/** Holds the channel id for the channel upon which this listener is waiting for a response. */
protected int _channelId;
- /** Holds the incoming method. */
- protected AMQMethodEvent _doneEvt = null;
-
/**
* Creates a new method listener, that filters incoming method to just those that match the specified channel id.
*
@@ -104,7 +88,14 @@
*
* @return <tt>true</tt> if the method was handled, <tt>false</tt> otherwise.
*/
- public abstract boolean processMethod(int channelId, AMQMethodBody frame); // throws AMQException;
+ public abstract boolean processMethod(int channelId, AMQMethodBody frame);
+
+ public boolean process(AMQMethodEvent evt)
+ {
+ AMQMethodBody method = evt.getMethod();
+
+ return (evt.getChannelId() == _channelId) && processMethod(evt.getChannelId(), method);
+ }
/**
* Informs this listener that an AMQP method has been received.
@@ -113,37 +104,9 @@
*
* @return <tt>true</tt> if this listener has handled the method, <tt>false</tt> otherwise.
*/
- public boolean methodReceived(AMQMethodEvent evt) // throws AMQException
+ public boolean methodReceived(AMQMethodEvent evt)
{
- AMQMethodBody method = evt.getMethod();
-
- /*try
- {*/
- boolean ready = (evt.getChannelId() == _channelId) && processMethod(evt.getChannelId(), method);
-
- if (ready)
- {
- // we only update the flag from inside the synchronized block
- // so that the blockForFrame method cannot "miss" an update - it
- // will only ever read the flag from within the synchronized block
- synchronized (_lock)
- {
- _doneEvt = evt;
- _ready = ready;
- _lock.notify();
- }
- }
-
- return ready;
-
- /*}
- catch (AMQException e)
- {
- error(e);
- // we rethrow the error here, and the code in the frame dispatcher will go round
- // each listener informing them that an exception has been thrown
- throw e;
- }*/
+ return received(evt);
}
/**
@@ -159,75 +122,15 @@
*/
public AMQMethodEvent blockForFrame(long timeout) throws AMQException, FailoverException
{
- synchronized (_lock)
+ try
{
- while (!_ready)
- {
- try
- {
- if (timeout == -1)
- {
- _lock.wait();
- }
- else
- {
-
- _lock.wait(timeout);
- if (!_ready)
- {
- _error = new AMQTimeoutException("Server did not respond in a timely fashion", null);
- _ready = true;
- }
- }
- }
- catch (InterruptedException e)
- {
- // IGNORE -- //fixme this isn't ideal as being interrupted isn't equivellant to sucess
- // if (!_ready && timeout != -1)
- // {
- // _error = new AMQException("Server did not respond timely");
- // _ready = true;
- // }
- }
- }
+ return (AMQMethodEvent) block(timeout);
}
-
- if (_error != null)
+ finally
{
- if (_error instanceof AMQException)
- {
- throw (AMQException) _error;
- }
- else if (_error instanceof FailoverException)
- {
- // This should ensure that FailoverException is not wrapped and can be caught.
- throw (FailoverException) _error; // needed to expose FailoverException.
- }
- else
- {
- throw new AMQException(null, "Woken up due to " + _error.getClass(), _error);
- }
+ //Prevent any more errors being notified to this waiter.
+ close();
}
-
- return _doneEvt;
}
- /**
- * This is a callback, called by the MINA dispatcher thread only. It is also called from within this
- * class to avoid code repetition but again is only called by the MINA dispatcher thread.
- *
- * @param e
- */
- public void error(Exception e)
- {
- // set the error so that the thread that is blocking (against blockForFrame())
- // can pick up the exception and rethrow to the caller
- _error = e;
-
- synchronized (_lock)
- {
- _ready = true;
- _lock.notify();
- }
- }
}
Modified: incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java Thu Aug 14 20:40:49 2008
@@ -28,15 +28,28 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Iterator;
import java.util.Set;
-import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
/**
- * The state manager is responsible for managing the state of the protocol session. <p/> For each AMQProtocolHandler
- * there is a separate state manager.
+ * The state manager is responsible for managing the state of the protocol session. <p/>
+ * For each {@link org.apache.qpid.client.protocol.AMQProtocolHandler} there is a separate state manager.
+ *
+ * The AMQStateManager is now attached to the {@link org.apache.qpid.client.protocol.AMQProtocolHandler} and that is the sole point of reference so that
+ * As the {@link AMQProtocolSession} changes due to failover the AMQStateManager need not be copied around.
+ *
+ * The StateManager works by any component can wait for a state change to occur by using the following sequence.
+ *
+ * <li>StateWaiter waiter = stateManager.createWaiter(Set<AMQState> states);
+ * <li> // Perform action that will cause state change
+ * <li>waiter.await();
+ *
+ * The two step process is required as there is an inherit race condition between starting a process that will cause
+ * the state to change and then attempting to wait for that change. The interest in the change must be first set up so
+ * that any asynchrous errors that occur can be delivered to the correct waiters.
*/
-public class AMQStateManager
+public class AMQStateManager implements AMQMethodListener
{
private static final Logger _logger = LoggerFactory.getLogger(AMQStateManager.class);
@@ -45,16 +58,13 @@
/** The current state */
private AMQState _currentState;
-
- /**
- * Maps from an AMQState instance to a Map from Class to StateTransitionHandler. The class must be a subclass of
- * AMQFrame.
- */
-
-
private final Object _stateLock = new Object();
+
private static final long MAXIMUM_STATE_WAIT_TIME = Long.parseLong(System.getProperty("amqj.MaximumStateWait", "30000"));
+ protected final List<StateWaiter> _waiters = new CopyOnWriteArrayList<StateWaiter>();
+ private Exception _lastException;
+
public AMQStateManager()
{
this(null);
@@ -62,18 +72,15 @@
public AMQStateManager(AMQProtocolSession protocolSession)
{
- this(AMQState.CONNECTION_NOT_STARTED, true, protocolSession);
+ this(AMQState.CONNECTION_NOT_STARTED, protocolSession);
}
- protected AMQStateManager(AMQState state, boolean register, AMQProtocolSession protocolSession)
+ protected AMQStateManager(AMQState state, AMQProtocolSession protocolSession)
{
_protocolSession = protocolSession;
_currentState = state;
-
}
-
-
public AMQState getCurrentState()
{
return _currentState;
@@ -86,107 +93,107 @@
synchronized (_stateLock)
{
_currentState = newState;
- _stateLock.notifyAll();
+
+ _logger.debug("Notififying State change to " + _waiters.size() + " : " + _waiters);
+
+ for (StateWaiter waiter : _waiters)
+ {
+ waiter.received(newState);
+ }
}
}
-
public <B extends AMQMethodBody> boolean methodReceived(AMQMethodEvent<B> evt) throws AMQException
{
-
B method = evt.getMethod();
-
+
// StateAwareMethodListener handler = findStateTransitionHandler(_currentState, evt.getMethod());
method.execute(_protocolSession.getMethodDispatcher(), evt.getChannelId());
return true;
}
-
- public void attainState(final AMQState s) throws AMQException
+ /**
+ * Setting of the ProtocolSession will be required when Failover has been successfuly compeleted.
+ *
+ * The new {@link AMQProtocolSession} that has been re-established needs to be provided as that is now the
+ * connection to the network.
+ *
+ * @param session The new protocol session
+ */
+ public void setProtocolSession(AMQProtocolSession session)
{
- synchronized (_stateLock)
+ if (_logger.isInfoEnabled())
{
- final long waitUntilTime = System.currentTimeMillis() + MAXIMUM_STATE_WAIT_TIME;
- long waitTime = MAXIMUM_STATE_WAIT_TIME;
-
- while ((_currentState != s) && (waitTime > 0))
- {
- try
- {
- _stateLock.wait(MAXIMUM_STATE_WAIT_TIME);
- }
- catch (InterruptedException e)
- {
- _logger.warn("Thread interrupted");
- }
-
- if (_currentState != s)
- {
- waitTime = waitUntilTime - System.currentTimeMillis();
- }
- }
-
- if (_currentState != s)
- {
- _logger.warn("State not achieved within permitted time. Current state " + _currentState
- + ", desired state: " + s);
- throw new AMQException(null, "State not achieved within permitted time. Current state " + _currentState
- + ", desired state: " + s, null);
- }
+ _logger.info("Setting ProtocolSession:" + session);
}
-
- // at this point the state will have changed.
+ _protocolSession = session;
}
- public AMQProtocolSession getProtocolSession()
+ /**
+ * Propogate error to waiters
+ *
+ * @param error The error to propogate.
+ */
+ public void error(Exception error)
{
- return _protocolSession;
+ if (_waiters.size() == 0)
+ {
+ _logger.error("No Waiters for error saving as last error:" + error.getMessage());
+ _lastException = error;
+ }
+ for (StateWaiter waiter : _waiters)
+ {
+ _logger.error("Notifying Waiters(" + _waiters + ") for error:" + error.getMessage());
+ waiter.error(error);
+ }
}
- public void setProtocolSession(AMQProtocolSession session)
+ /**
+ * This provides a single place that the maximum time for state change to occur can be accessed.
+ * It is currently set via System property amqj.MaximumStateWait
+ *
+ * @return long Milliseconds value for a timeout
+ */
+ public long getWaitTimeout()
{
- _protocolSession = session;
+ return MAXIMUM_STATE_WAIT_TIME;
}
- public MethodRegistry getMethodRegistry()
+ /**
+ * Create and add a new waiter to the notifcation list.
+ *
+ * @param states The waiter will attempt to wait for one of these desired set states to be achived.
+ *
+ * @return the created StateWaiter.
+ */
+ public StateWaiter createWaiter(Set<AMQState> states)
{
- return getProtocolSession().getMethodRegistry();
+ final StateWaiter waiter;
+ synchronized (_stateLock)
+ {
+ waiter = new StateWaiter(this, _currentState, states);
+
+ _waiters.add(waiter);
+ }
+
+ return waiter;
}
- public AMQState attainState(Set<AMQState> stateSet) throws AMQException
+ /**
+ * Remove the waiter from the notification list.
+ *
+ * @param waiter The waiter to remove.
+ */
+ public void removeWaiter(StateWaiter waiter)
{
synchronized (_stateLock)
{
- final long waitUntilTime = System.currentTimeMillis() + MAXIMUM_STATE_WAIT_TIME;
- long waitTime = MAXIMUM_STATE_WAIT_TIME;
-
- while (!stateSet.contains(_currentState) && (waitTime > 0))
- {
- try
- {
- _stateLock.wait(MAXIMUM_STATE_WAIT_TIME);
- }
- catch (InterruptedException e)
- {
- _logger.warn("Thread interrupted");
- }
-
- if (!stateSet.contains(_currentState))
- {
- waitTime = waitUntilTime - System.currentTimeMillis();
- }
- }
-
- if (!stateSet.contains(_currentState))
- {
- _logger.warn("State not achieved within permitted time. Current state " + _currentState
- + ", desired state: " + stateSet);
- throw new AMQException(null, "State not achieved within permitted time. Current state " + _currentState
- + ", desired state: " + stateSet, null);
- }
- return _currentState;
+ _waiters.remove(waiter);
}
+ }
-
+ public Exception getLastException()
+ {
+ return _lastException;
}
}
Modified: incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/state/StateAwareMethodListener.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/state/StateAwareMethodListener.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/state/StateAwareMethodListener.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/state/StateAwareMethodListener.java Thu Aug 14 20:40:49 2008
@@ -33,6 +33,6 @@
public interface StateAwareMethodListener<B extends AMQMethodBody>
{
- void methodReceived(AMQStateManager stateManager, B body, int channelId) throws AMQException;
+ void methodReceived(AMQProtocolSession session, B body, int channelId) throws AMQException;
}
Modified: incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java Thu Aug 14 20:40:49 2008
@@ -20,103 +20,110 @@
*/
package org.apache.qpid.client.state;
+import org.apache.qpid.client.util.BlockingWaiter;
+import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.AMQException;
-
-import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.slf4j.Logger;
+
+import java.util.Set;
/**
- * Waits for a particular state to be reached.
+ * This is an implementation of the {@link BlockingWaiter} to provide error handing and a waiting mechanism for state
+ * changes.
+ *
+ * On construction the current state and a set of States to await for is provided.
+ *
+ * When await() is called the state at constuction is compared against the awaitStates. If the state at construction is
+ * a desired state then await() returns immediately.
+ *
+ * Otherwise it will block for the set timeout for a desired state to be achieved.
+ *
+ * The state changes are notified via the {@link #process} method.
+ *
+ * Any notified error is handled by the BlockingWaiter and thrown from the {@link #block} method.
+ *
*/
-public class StateWaiter implements StateListener
+public class StateWaiter extends BlockingWaiter<AMQState>
{
private static final Logger _logger = LoggerFactory.getLogger(StateWaiter.class);
- private final AMQState _state;
-
- private volatile boolean _newStateAchieved;
-
- private volatile Throwable _throwable;
-
- private final Object _monitor = new Object();
- private static final long TIME_OUT = 1000 * 60 * 2;
-
- public StateWaiter(AMQState state)
+ Set<AMQState> _awaitStates;
+ private AMQState _startState;
+ private AMQStateManager _stateManager;
+
+ /**
+ *
+ * @param stateManager The StateManager
+ * @param currentState
+ * @param awaitStates
+ */
+ public StateWaiter(AMQStateManager stateManager, AMQState currentState, Set<AMQState> awaitStates)
{
- _state = state;
+ _logger.info("New StateWaiter :" + currentState + ":" + awaitStates);
+ _stateManager = stateManager;
+ _awaitStates = awaitStates;
+ _startState = currentState;
}
- public void waituntilStateHasChanged() throws AMQException
+ /**
+ * When the state is changed this StateWaiter is notified to process the change.
+ *
+ * @param state The new state that has been achieved.
+ * @return
+ */
+ public boolean process(AMQState state)
{
- synchronized (_monitor)
- {
- //
- // The guard is required in case we are woken up by a spurious
- // notify().
- //
- while (!_newStateAchieved && (_throwable == null))
- {
- try
- {
- _logger.debug("State " + _state + " not achieved so waiting...");
- _monitor.wait(TIME_OUT);
- // fixme this won't cause the timeout to exit the loop. need to set _throwable
- }
- catch (InterruptedException e)
- {
- _logger.debug("Interrupted exception caught while waiting: " + e, e);
- }
- }
- }
+ return _awaitStates.contains(state);
+ }
- if (_throwable != null)
- {
- _logger.debug("Throwable reached state waiter: " + _throwable);
- if (_throwable instanceof AMQException)
- {
- throw (AMQException) _throwable;
- }
- else
- {
- throw new AMQException(null, "Error: " + _throwable, _throwable); // FIXME: this will wrap FailoverException in throwable which will prevent it being caught.
- }
- }
+ /**
+ * Await for the requried State to be achieved within the default timeout.
+ * @return The achieved state that was requested.
+ * @throws AMQException The exception that prevented the required state from being achived.
+ */
+ public AMQState await() throws AMQException
+ {
+ return await(_stateManager.getWaitTimeout());
}
- public void stateChanged(AMQState oldState, AMQState newState)
+ /**
+ * Await for the requried State to be achieved.
+ *
+ * <b>It is the responsibility of this class to remove the waiter from the StateManager
+ *
+ * @param timeout The time in milliseconds to wait for any of the states to be achived.
+ * @return The achieved state that was requested.
+ * @throws AMQException The exception that prevented the required state from being achived.
+ */
+ public AMQState await(long timeout) throws AMQException
{
- synchronized (_monitor)
+ try
{
- if (_logger.isDebugEnabled())
+ if (process(_startState))
{
- _logger.debug("stateChanged called changing from :" + oldState + " to :" + newState);
+ return _startState;
}
- if (_state == newState)
+ try
{
- _newStateAchieved = true;
-
- if (_logger.isDebugEnabled())
- {
- _logger.debug("New state reached so notifying monitor");
- }
+ return (AMQState) block(timeout);
+ }
+ catch (FailoverException e)
+ {
+ _logger.error("Failover occured whilst waiting for states:" + _awaitStates);
- _monitor.notifyAll();
+ e.printStackTrace();
+ return null;
}
}
- }
-
- public void error(Throwable t)
- {
- synchronized (_monitor)
+ finally
{
- if (_logger.isDebugEnabled())
- {
- _logger.debug("exceptionThrown called");
- }
+ //Prevent any more errors being notified to this waiter.
+ close();
- _throwable = t;
- _monitor.notifyAll();
+ //Remove the waiter from the notifcation list in the statee manager
+ _stateManager.removeWaiter(this);
}
}
}
Modified: incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/state/listener/SpecificMethodFrameListener.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/state/listener/SpecificMethodFrameListener.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/state/listener/SpecificMethodFrameListener.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/state/listener/SpecificMethodFrameListener.java Thu Aug 14 20:40:49 2008
@@ -20,7 +20,7 @@
*/
package org.apache.qpid.client.state.listener;
-import org.apache.qpid.AMQException;
+
import org.apache.qpid.client.protocol.BlockingMethodFrameListener;
import org.apache.qpid.framing.AMQMethodBody;
@@ -34,7 +34,7 @@
_expectedClass = expectedClass;
}
- public boolean processMethod(int channelId, AMQMethodBody frame) //throws AMQException
+ public boolean processMethod(int channelId, AMQMethodBody frame)
{
return _expectedClass.isInstance(frame);
}