You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ai...@apache.org on 2008/02/08 11:10:11 UTC
svn commit: r619823 [12/19] - in
/incubator/qpid/branches/thegreatmerge/qpid: ./ cpp/ dotnet/
dotnet/Qpid.Buffer.Tests/Properties/ dotnet/Qpid.Buffer/Properties/
dotnet/Qpid.Client.Tests/ dotnet/Qpid.Client.Tests/Channel/
dotnet/Qpid.Client.Tests/Commo...
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ExchangeBoundOkMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ExchangeBoundOkMethodHandler.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ExchangeBoundOkMethodHandler.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ExchangeBoundOkMethodHandler.java Fri Feb 8 02:09:37 2008
@@ -33,7 +33,7 @@
/**
* @author Apache Software Foundation
*/
-public class ExchangeBoundOkMethodHandler implements StateAwareMethodListener
+public class ExchangeBoundOkMethodHandler implements StateAwareMethodListener<ExchangeBoundOkBody>
{
private static final Logger _logger = LoggerFactory.getLogger(ExchangeBoundOkMethodHandler.class);
private static final ExchangeBoundOkMethodHandler _instance = new ExchangeBoundOkMethodHandler();
@@ -46,14 +46,14 @@
private ExchangeBoundOkMethodHandler()
{ }
- public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt)
- throws AMQException
+ public void methodReceived(AMQStateManager stateManager, ExchangeBoundOkBody body, int channelId)
+ throws AMQException
{
if (_logger.isDebugEnabled())
{
- ExchangeBoundOkBody body = (ExchangeBoundOkBody) evt.getMethod();
- _logger.debug("Received Exchange.Bound-Ok message, response code: " + body.replyCode + " text: "
- + body.replyText);
+ _logger.debug("Received Exchange.Bound-Ok message, response code: " + body.getReplyCode() + " text: "
+ + body.getReplyText());
}
}
+
}
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/QueueDeleteOkMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/QueueDeleteOkMethodHandler.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/QueueDeleteOkMethodHandler.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/QueueDeleteOkMethodHandler.java Fri Feb 8 02:09:37 2008
@@ -33,7 +33,7 @@
/**
* @author Apache Software Foundation
*/
-public class QueueDeleteOkMethodHandler implements StateAwareMethodListener
+public class QueueDeleteOkMethodHandler implements StateAwareMethodListener<QueueDeleteOkBody>
{
private static final Logger _logger = LoggerFactory.getLogger(QueueDeleteOkMethodHandler.class);
private static final QueueDeleteOkMethodHandler _instance = new QueueDeleteOkMethodHandler();
@@ -46,13 +46,14 @@
private QueueDeleteOkMethodHandler()
{ }
- public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt)
- throws AMQException
- {
+ public void methodReceived(AMQStateManager stateManager, QueueDeleteOkBody body, int channelId)
+ throws AMQException
+ {
if (_logger.isDebugEnabled())
{
- QueueDeleteOkBody body = (QueueDeleteOkBody) evt.getMethod();
- _logger.debug("Received Queue.Delete-Ok message, message count: " + body.messageCount);
+ _logger.debug("Received Queue.Delete-Ok message, message count: " + body.getMessageCount());
}
}
+
+
}
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java Fri Feb 8 02:09:37 2008
@@ -26,6 +26,7 @@
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicDeliverBody;
+import org.apache.qpid.framing.BasicReturnBody;
import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.framing.ContentHeaderBody;
@@ -51,6 +52,18 @@
super(channelId,deliveryId,consumerTag,exchange,routingKey,redelivered);
}
+ public UnprocessedMessage_0_8(int channelId, BasicReturnBody body)
+ {
+ //FIXME: TGM, SRSLY 4RL
+ super(channelId, 0, null, body.getExchange(), body.getRoutingKey(), false);
+ }
+
+ public UnprocessedMessage_0_8(int channelId, BasicDeliverBody body)
+ {
+ //FIXME: TGM, SRSLY 4RL
+ super(channelId, 0, null, body.getExchange(), body.getRoutingKey(), false);
+ }
+
public void receiveBody(ContentBody body)
{
@@ -119,8 +132,8 @@
}
if(_deliverBody != null)
{
- buf.append("Delivery tag " + _deliverBody.deliveryTag);
- buf.append("Consumer tag " + _deliverBody.consumerTag);
+ buf.append("Delivery tag " + _deliverBody.getDeliveryTag());
+ buf.append("Consumer tag " + _deliverBody.getConsumerTag());
buf.append("Deliver Body " + _deliverBody);
}
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java Fri Feb 8 02:09:37 2008
@@ -21,10 +21,16 @@
package org.apache.qpid.client.protocol;
import org.apache.mina.common.IdleStatus;
+import org.apache.mina.common.IoFilterChain;
import org.apache.mina.common.IoHandlerAdapter;
import org.apache.mina.common.IoSession;
+import org.apache.mina.filter.ReadThrottleFilterBuilder;
import org.apache.mina.filter.SSLFilter;
+import org.apache.mina.filter.WriteBufferLimitFilterBuilder;
+import org.apache.mina.filter.codec.ProtocolCodecException;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
+import org.apache.mina.filter.executor.ExecutorFilter;
+import org.apache.mina.transport.socket.nio.SocketSessionConfig;
import org.apache.qpid.AMQConnectionClosedException;
import org.apache.qpid.AMQDisconnectedException;
import org.apache.qpid.AMQException;
@@ -39,16 +45,7 @@
import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.listener.SpecificMethodFrameListener;
import org.apache.qpid.codec.AMQCodecFactory;
-import org.apache.qpid.framing.AMQBody;
-import org.apache.qpid.framing.AMQDataBlock;
-import org.apache.qpid.framing.AMQFrame;
-import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.ConnectionCloseBody;
-import org.apache.qpid.framing.ConnectionCloseOkBody;
-import org.apache.qpid.framing.ContentBody;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.HeartbeatBody;
+import org.apache.qpid.framing.*;
import org.apache.qpid.pool.ReadWriteThreadModel;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodEvent;
@@ -58,6 +55,7 @@
import org.slf4j.LoggerFactory;
import java.util.Iterator;
+import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.CountDownLatch;
@@ -214,6 +212,36 @@
e.printStackTrace();
}
+ if (!System.getProperties().containsKey("protectio") || Boolean.getBoolean("protectio"))
+ {
+ try
+ {
+ //Add IO Protection Filters
+ IoFilterChain chain = session.getFilterChain();
+
+ int buf_size = 32768;
+ if (session.getConfig() instanceof SocketSessionConfig)
+ {
+ buf_size = ((SocketSessionConfig) session.getConfig()).getReceiveBufferSize();
+ }
+ session.getFilterChain().addLast("tempExecutorFilterForFilterBuilder", new ExecutorFilter());
+
+ ReadThrottleFilterBuilder readfilter = new ReadThrottleFilterBuilder();
+ readfilter.setMaximumConnectionBufferSize(buf_size);
+ readfilter.attach(chain);
+
+ WriteBufferLimitFilterBuilder writefilter = new WriteBufferLimitFilterBuilder();
+ writefilter.setMaximumConnectionBufferSize(buf_size * 2);
+ writefilter.attach(chain);
+ session.getFilterChain().remove("tempExecutorFilterForFilterBuilder");
+
+ _logger.info("Using IO Read/Write Filter Protection");
+ }
+ catch (Exception e)
+ {
+ _logger.error("Unable to attach IO Read/Write Filter Protection :" + e.getMessage());
+ }
+ }
_protocolSession = new AMQProtocolSession(this, session, _connection, getStateManager());
_protocolSession.init();
}
@@ -380,94 +408,109 @@
public void messageReceived(IoSession session, Object message) throws Exception
{
- final boolean debug = _logger.isDebugEnabled();
- final long msgNumber = ++_messageReceivedCount;
-
- if (debug && ((msgNumber % 1000) == 0))
+ if(message instanceof AMQFrame)
{
- _logger.debug("Received " + _messageReceivedCount + " protocol messages");
- }
+ final boolean debug = _logger.isDebugEnabled();
+ final long msgNumber = ++_messageReceivedCount;
- AMQFrame frame = (AMQFrame) message;
+ if (debug && ((msgNumber % 1000) == 0))
+ {
+ _logger.debug("Received " + _messageReceivedCount + " protocol messages");
+ }
- final AMQBody bodyFrame = frame.getBodyFrame();
+ AMQFrame frame = (AMQFrame) message;
- HeartbeatDiagnostics.received(bodyFrame instanceof HeartbeatBody);
+ final AMQBody bodyFrame = frame.getBodyFrame();
- switch (bodyFrame.getFrameType())
- {
- case AMQMethodBody.TYPE:
+ HeartbeatDiagnostics.received(bodyFrame instanceof HeartbeatBody);
- if (debug)
- {
- _logger.debug("(" + System.identityHashCode(this) + ")Method frame received: " + frame);
- }
+ switch (bodyFrame.getFrameType())
+ {
+ case AMQMethodBody.TYPE:
- final AMQMethodEvent<AMQMethodBody> evt =
- new AMQMethodEvent<AMQMethodBody>(frame.getChannel(), (AMQMethodBody) bodyFrame);
+ if (debug)
+ {
+ _logger.debug("(" + System.identityHashCode(this) + ")Method frame received: " + frame);
+ }
- try
- {
+ final AMQMethodEvent<AMQMethodBody> evt =
+ new AMQMethodEvent<AMQMethodBody>(frame.getChannel(), (AMQMethodBody) bodyFrame);
- boolean wasAnyoneInterested = getStateManager().methodReceived(evt);
- if (!_frameListeners.isEmpty())
+ try
{
- Iterator it = _frameListeners.iterator();
- while (it.hasNext())
+
+ boolean wasAnyoneInterested = getStateManager().methodReceived(evt);
+ if (!_frameListeners.isEmpty())
{
- final AMQMethodListener listener = (AMQMethodListener) it.next();
- wasAnyoneInterested = listener.methodReceived(evt) || wasAnyoneInterested;
+ Iterator it = _frameListeners.iterator();
+ while (it.hasNext())
+ {
+ final AMQMethodListener listener = (AMQMethodListener) it.next();
+ wasAnyoneInterested = listener.methodReceived(evt) || wasAnyoneInterested;
+ }
}
- }
- if (!wasAnyoneInterested)
- {
- throw new AMQException(null, "AMQMethodEvent " + evt + " was not processed by any listener. Listeners:"
- + _frameListeners, null);
+ if (!wasAnyoneInterested)
+ {
+ throw new AMQException(null, "AMQMethodEvent " + evt + " was not processed by any listener. Listeners:"
+ + _frameListeners, null);
+ }
}
- }
- catch (AMQException e)
- {
- getStateManager().error(e);
- if (!_frameListeners.isEmpty())
+ catch (AMQException e)
{
- Iterator it = _frameListeners.iterator();
- while (it.hasNext())
+ getStateManager().error(e);
+ if (!_frameListeners.isEmpty())
{
- final AMQMethodListener listener = (AMQMethodListener) it.next();
- listener.error(e);
+ Iterator it = _frameListeners.iterator();
+ while (it.hasNext())
+ {
+ final AMQMethodListener listener = (AMQMethodListener) it.next();
+ listener.error(e);
+ }
}
+
+ exceptionCaught(session, e);
}
- exceptionCaught(session, e);
- }
+ break;
- break;
+ case ContentHeaderBody.TYPE:
- case ContentHeaderBody.TYPE:
+ _protocolSession.messageContentHeaderReceived(frame.getChannel(), (ContentHeaderBody) bodyFrame);
+ break;
- _protocolSession.messageContentHeaderReceived(frame.getChannel(), (ContentHeaderBody) bodyFrame);
- break;
+ case ContentBody.TYPE:
- case ContentBody.TYPE:
+ _protocolSession.messageContentBodyReceived(frame.getChannel(), (ContentBody) bodyFrame);
+ break;
- _protocolSession.messageContentBodyReceived(frame.getChannel(), (ContentBody) bodyFrame);
- break;
+ case HeartbeatBody.TYPE:
- case HeartbeatBody.TYPE:
+ if (debug)
+ {
+ _logger.debug("Received heartbeat");
+ }
- if (debug)
- {
- _logger.debug("Received heartbeat");
- }
+ break;
- break;
+ default:
- default:
+ }
+ _connection.bytesReceived(_protocolSession.getIoSession().getReadBytes());
}
+ else if (message instanceof ProtocolInitiation)
+ {
+ // We get here if the server sends a response to our initial protocol header
+ // suggesting an alternate ProtocolVersion; the server will then close the
+ // connection.
+ ProtocolInitiation protocolInit = (ProtocolInitiation) message;
+ ProtocolVersion pv = protocolInit.checkVersion();
+ getConnection().setProtocolVersion(pv);
- _connection.bytesReceived(_protocolSession.getIoSession().getReadBytes());
+ // get round a bug in old versions of qpid whereby the connection is not closed
+ _stateManager.changeState(AMQState.CONNECTION_CLOSED);
+ }
}
private static int _messagesOut;
@@ -506,6 +549,12 @@
getStateManager().attainState(s);
}
+ public AMQState attainState(Set<AMQState> states) throws AMQException
+ {
+ return getStateManager().attainState(states);
+ }
+
+
/**
* Convenience method that writes a frame to the protocol session. Equivalent to calling
* getProtocolSession().write().
@@ -600,16 +649,11 @@
{
getStateManager().changeState(AMQState.CONNECTION_CLOSING);
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- // Be aware of possible changes to parameter order as versions change.
- final AMQFrame frame =
- ConnectionCloseBody.createAMQFrame(0, _protocolSession.getProtocolMajorVersion(),
- _protocolSession.getProtocolMinorVersion(), // AMQP version (major, minor)
- 0, // classId
- 0, // methodId
- AMQConstant.REPLY_SUCCESS.getCode(), // replyCode
- new AMQShortString("JMS client is closing the connection.")); // replyText
+ ConnectionCloseBody body = _protocolSession.getMethodRegistry().createConnectionCloseBody(AMQConstant.REPLY_SUCCESS.getCode(), // replyCode
+ new AMQShortString("JMS client is closing the connection."),0,0);
+
+
+ final AMQFrame frame = body.generateFrame(0);
try
{
@@ -708,5 +752,15 @@
public byte getProtocolMinorVersion()
{
return _protocolSession.getProtocolMinorVersion();
+ }
+
+ public MethodRegistry getMethodRegistry()
+ {
+ return getStateManager().getMethodRegistry();
+ }
+
+ public ProtocolVersion getProtocolVersion()
+ {
+ return _protocolSession.getProtocolVersion();
}
}
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java Fri Feb 8 02:09:37 2008
@@ -21,40 +21,31 @@
package org.apache.qpid.client.protocol;
import org.apache.commons.lang.StringUtils;
-
import org.apache.mina.common.CloseFuture;
import org.apache.mina.common.IdleStatus;
import org.apache.mina.common.IoSession;
import org.apache.mina.common.WriteFuture;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.JMSException;
+import javax.security.sasl.SaslClient;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import org.apache.qpid.AMQException;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.ConnectionTuneParameters;
-// import org.apache.qpid.client.message.UnexpectedBodyReceivedException;
import org.apache.qpid.client.message.ReturnMessage;
import org.apache.qpid.client.message.UnprocessedMessage;
import org.apache.qpid.client.message.UnprocessedMessage_0_8;
import org.apache.qpid.client.state.AMQStateManager;
-import org.apache.qpid.framing.AMQDataBlock;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.ContentBody;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.MainRegistry;
-import org.apache.qpid.framing.ProtocolInitiation;
-import org.apache.qpid.framing.ProtocolVersion;
-import org.apache.qpid.framing.VersionSpecificRegistry;
+import org.apache.qpid.framing.*;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.jms.JMSException;
-import javax.security.sasl.SaslClient;
-
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
+import org.apache.qpid.client.handler.ClientMethodDispatcherImpl;
/**
* Wrapper for protocol session that provides type-safe access to session attributes. <p/> The underlying protocol
@@ -101,12 +92,19 @@
protected int _queueId = 1;
protected final Object _queueIdLock = new Object();
- private byte _protocolMinorVersion;
- private byte _protocolMajorVersion;
- private VersionSpecificRegistry _registry =
- MainRegistry.getVersionSpecificRegistry(ProtocolVersion.getLatestSupportedVersion());
+ private ProtocolVersion _protocolVersion;
+// private VersionSpecificRegistry _registry =
+// MainRegistry.getVersionSpecificRegistry(ProtocolVersion.getLatestSupportedVersion());
+
+
+ private MethodRegistry _methodRegistry =
+ MethodRegistry.getMethodRegistry(ProtocolVersion.getLatestSupportedVersion());
- private final AMQConnection _connection;
+
+ private MethodDispatcher _methodDispatcher;
+
+
+ private final AMQConnection _connection;
public AMQProtocolSession(AMQProtocolHandler protocolHandler, IoSession protocolSession, AMQConnection connection)
{
@@ -126,6 +124,9 @@
_minaProtocolSession.setWriteTimeout(LAST_WRITE_FUTURE_JOIN_TIMEOUT);
_stateManager = stateManager;
_stateManager.setProtocolSession(this);
+ _protocolVersion = connection.getProtocolVersion();
+ _methodDispatcher = ClientMethodDispatcherImpl.newMethodDispatcher(ProtocolVersion.getLatestSupportedVersion(),
+ stateManager);
_connection = connection;
}
@@ -135,7 +136,7 @@
// start the process of setting up the connection. This is the first place that
// data is written to the server.
- _minaProtocolSession.write(new ProtocolInitiation(ProtocolVersion.getLatestSupportedVersion()));
+ _minaProtocolSession.write(new ProtocolInitiation(_connection.getProtocolVersion()));
}
public String getClientID()
@@ -164,6 +165,8 @@
public void setStateManager(AMQStateManager stateManager)
{
_stateManager = stateManager;
+ _methodDispatcher = ClientMethodDispatcherImpl.newMethodDispatcher(_protocolVersion,
+ stateManager);
}
public String getVirtualHost()
@@ -440,26 +443,55 @@
session.confirmConsumerCancelled(consumerTag);
}
- public void setProtocolVersion(final byte versionMajor, final byte versionMinor)
+ public void setProtocolVersion(final ProtocolVersion pv)
{
- _protocolMajorVersion = versionMajor;
- _protocolMinorVersion = versionMinor;
- _registry = MainRegistry.getVersionSpecificRegistry(versionMajor, versionMinor);
+ _protocolVersion = pv;
+ _methodRegistry = MethodRegistry.getMethodRegistry(pv);
+ _methodDispatcher = ClientMethodDispatcherImpl.newMethodDispatcher(pv, _stateManager);
+
+ // _registry = MainRegistry.getVersionSpecificRegistry(versionMajor, versionMinor);
}
public byte getProtocolMinorVersion()
{
- return _protocolMinorVersion;
+ return _protocolVersion.getMinorVersion();
}
public byte getProtocolMajorVersion()
{
- return _protocolMajorVersion;
+ return _protocolVersion.getMajorVersion();
+ }
+
+ public ProtocolVersion getProtocolVersion()
+ {
+ return _protocolVersion;
}
- public VersionSpecificRegistry getRegistry()
+// public VersionSpecificRegistry getRegistry()
+// {
+// return _registry;
+// }
+
+ public MethodRegistry getMethodRegistry()
{
- return _registry;
+ return _methodRegistry;
}
+ public MethodDispatcher getMethodDispatcher()
+ {
+ return _methodDispatcher;
+ }
+
+
+ public void setTicket(int ticket, int channelId)
+ {
+ final AMQSession session = getSession(channelId);
+ session.setTicket(ticket);
+ }
+
+
+ public void setMethodDispatcher(MethodDispatcher methodDispatcher)
+ {
+ _methodDispatcher = methodDispatcher;
+ }
}
Copied: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQMethodNotImplementedException.java (from r616809, incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/state/AMQMethodNotImplementedException.java)
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQMethodNotImplementedException.java?p2=incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQMethodNotImplementedException.java&p1=incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/state/AMQMethodNotImplementedException.java&r1=616809&r2=619823&rev=619823&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/state/AMQMethodNotImplementedException.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQMethodNotImplementedException.java Fri Feb 8 02:09:37 2008
@@ -27,6 +27,6 @@
{
public AMQMethodNotImplementedException(AMQMethodBody body)
{
- super("Unexpected Method Received: " + body.getClass().getName());
+ super(null, "Unexpected Method Received: " + body.getClass().getName(), null);
}
}
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQState.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQState.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQState.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQState.java Fri Feb 8 02:09:37 2008
@@ -24,8 +24,22 @@
* States used in the AMQ protocol. Used by the finite state machine to determine
* valid responses.
*/
-public class AMQState
+public enum AMQState
{
+
+ CONNECTION_NOT_STARTED(1, "CONNECTION_NOT_STARTED"),
+
+ CONNECTION_NOT_TUNED(2, "CONNECTION_NOT_TUNED"),
+
+ CONNECTION_NOT_OPENED(3, "CONNECTION_NOT_OPENED"),
+
+ CONNECTION_OPEN(4, "CONNECTION_OPEN"),
+
+ CONNECTION_CLOSING(5, "CONNECTION_CLOSING"),
+
+ CONNECTION_CLOSED(6, "CONNECTION_CLOSED");
+
+
private final int _id;
private final String _name;
@@ -41,16 +55,6 @@
return "AMQState: id = " + _id + " name: " + _name;
}
- public static final AMQState CONNECTION_NOT_STARTED = new AMQState(1, "CONNECTION_NOT_STARTED");
-
- public static final AMQState CONNECTION_NOT_TUNED = new AMQState(2, "CONNECTION_NOT_TUNED");
-
- public static final AMQState CONNECTION_NOT_OPENED = new AMQState(3, "CONNECTION_NOT_OPENED");
-
- public static final AMQState CONNECTION_OPEN = new AMQState(4, "CONNECTION_OPEN");
-
- public static final AMQState CONNECTION_CLOSING = new AMQState(5, "CONNECTION_CLOSING");
-
- public static final AMQState CONNECTION_CLOSED = new AMQState(6, "CONNECTION_CLOSED");
-
+
+
}
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java Fri Feb 8 02:09:37 2008
@@ -21,42 +21,15 @@
package org.apache.qpid.client.state;
import org.apache.qpid.AMQException;
-import org.apache.qpid.client.handler.BasicCancelOkMethodHandler;
-import org.apache.qpid.client.handler.BasicDeliverMethodHandler;
-import org.apache.qpid.client.handler.BasicReturnMethodHandler;
-import org.apache.qpid.client.handler.ChannelCloseMethodHandler;
-import org.apache.qpid.client.handler.ChannelCloseOkMethodHandler;
-import org.apache.qpid.client.handler.ChannelFlowOkMethodHandler;
-import org.apache.qpid.client.handler.ConnectionCloseMethodHandler;
-import org.apache.qpid.client.handler.ConnectionOpenOkMethodHandler;
-import org.apache.qpid.client.handler.ConnectionSecureMethodHandler;
-import org.apache.qpid.client.handler.ConnectionStartMethodHandler;
-import org.apache.qpid.client.handler.ConnectionTuneMethodHandler;
-import org.apache.qpid.client.handler.ExchangeBoundOkMethodHandler;
-import org.apache.qpid.client.handler.QueueDeleteOkMethodHandler;
import org.apache.qpid.client.protocol.AMQProtocolSession;
-import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.framing.BasicCancelOkBody;
-import org.apache.qpid.framing.BasicDeliverBody;
-import org.apache.qpid.framing.BasicReturnBody;
-import org.apache.qpid.framing.ChannelCloseBody;
-import org.apache.qpid.framing.ChannelCloseOkBody;
-import org.apache.qpid.framing.ChannelFlowOkBody;
-import org.apache.qpid.framing.ConnectionCloseBody;
-import org.apache.qpid.framing.ConnectionOpenOkBody;
-import org.apache.qpid.framing.ConnectionSecureBody;
-import org.apache.qpid.framing.ConnectionStartBody;
-import org.apache.qpid.framing.ConnectionTuneBody;
-import org.apache.qpid.framing.ExchangeBoundOkBody;
-import org.apache.qpid.framing.QueueDeleteOkBody;
+import org.apache.qpid.framing.*;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.protocol.AMQMethodListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.HashMap;
import java.util.Iterator;
-import java.util.Map;
+import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
/**
@@ -72,11 +45,11 @@
/** The current state */
private AMQState _currentState;
+
/**
* Maps from an AMQState instance to a Map from Class to StateTransitionHandler. The class must be a subclass of
* AMQFrame.
*/
- protected final Map _state2HandlersMap = new HashMap();
private final CopyOnWriteArraySet _stateListeners = new CopyOnWriteArraySet();
private final Object _stateLock = new Object();
@@ -96,53 +69,10 @@
{
_protocolSession = protocolSession;
_currentState = state;
- if (register)
- {
- registerListeners();
- }
+
}
- protected void registerListeners()
- {
- Map frame2handlerMap = new HashMap();
- // we need to register a map for the null (i.e. all state) handlers otherwise you get
- // a stack overflow in the handler searching code when you present it with a frame for which
- // no handlers are registered
- //
- _state2HandlersMap.put(null, frame2handlerMap);
-
- frame2handlerMap = new HashMap();
- frame2handlerMap.put(ConnectionStartBody.class, ConnectionStartMethodHandler.getInstance());
- frame2handlerMap.put(ConnectionCloseBody.class, ConnectionCloseMethodHandler.getInstance());
- _state2HandlersMap.put(AMQState.CONNECTION_NOT_STARTED, frame2handlerMap);
-
- frame2handlerMap = new HashMap();
- frame2handlerMap.put(ConnectionTuneBody.class, ConnectionTuneMethodHandler.getInstance());
- frame2handlerMap.put(ConnectionSecureBody.class, ConnectionSecureMethodHandler.getInstance());
- frame2handlerMap.put(ConnectionCloseBody.class, ConnectionCloseMethodHandler.getInstance());
- _state2HandlersMap.put(AMQState.CONNECTION_NOT_TUNED, frame2handlerMap);
-
- frame2handlerMap = new HashMap();
- frame2handlerMap.put(ConnectionOpenOkBody.class, ConnectionOpenOkMethodHandler.getInstance());
- frame2handlerMap.put(ConnectionCloseBody.class, ConnectionCloseMethodHandler.getInstance());
- _state2HandlersMap.put(AMQState.CONNECTION_NOT_OPENED, frame2handlerMap);
-
- //
- // ConnectionOpen handlers
- //
- frame2handlerMap = new HashMap();
- frame2handlerMap.put(ChannelCloseBody.class, ChannelCloseMethodHandler.getInstance());
- frame2handlerMap.put(ChannelCloseOkBody.class, ChannelCloseOkMethodHandler.getInstance());
- frame2handlerMap.put(ConnectionCloseBody.class, ConnectionCloseMethodHandler.getInstance());
- frame2handlerMap.put(BasicDeliverBody.class, BasicDeliverMethodHandler.getInstance());
- frame2handlerMap.put(BasicReturnBody.class, BasicReturnMethodHandler.getInstance());
- frame2handlerMap.put(BasicCancelOkBody.class, BasicCancelOkMethodHandler.getInstance());
- frame2handlerMap.put(ChannelFlowOkBody.class, ChannelFlowOkMethodHandler.getInstance());
- frame2handlerMap.put(QueueDeleteOkBody.class, QueueDeleteOkMethodHandler.getInstance());
- frame2handlerMap.put(ExchangeBoundOkBody.class, ExchangeBoundOkMethodHandler.getInstance());
- _state2HandlersMap.put(AMQState.CONNECTION_OPEN, frame2handlerMap);
- }
public AMQState getCurrentState()
{
@@ -176,56 +106,14 @@
public <B extends AMQMethodBody> boolean methodReceived(AMQMethodEvent<B> evt) throws AMQException
{
- StateAwareMethodListener handler = findStateTransitionHandler(_currentState, evt.getMethod());
- if (handler != null)
- {
- handler.methodReceived(this, _protocolSession, evt);
- return true;
- }
-
- return false;
+ B method = evt.getMethod();
+
+ // StateAwareMethodListener handler = findStateTransitionHandler(_currentState, evt.getMethod());
+ method.execute(_protocolSession.getMethodDispatcher(), evt.getChannelId());
+ return true;
}
- protected StateAwareMethodListener findStateTransitionHandler(AMQState currentState, AMQMethodBody frame)
- // throws IllegalStateTransitionException
- {
- final Class clazz = frame.getClass();
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Looking for state[" + currentState + "] transition handler for frame " + clazz);
- }
-
- final Map classToHandlerMap = (Map) _state2HandlersMap.get(currentState);
-
- if (classToHandlerMap == null)
- {
- // if no specialised per state handler is registered look for a
- // handler registered for "all" states
- return findStateTransitionHandler(null, frame);
- }
-
- final StateAwareMethodListener handler = (StateAwareMethodListener) classToHandlerMap.get(clazz);
- if (handler == null)
- {
- if (currentState == null)
- {
- _logger.debug("No state transition handler defined for receiving frame " + frame);
-
- return null;
- }
- else
- {
- // if no specialised per state handler is registered look for a
- // handler registered for "all" states
- return findStateTransitionHandler(null, frame);
- }
- }
- else
- {
- return handler;
- }
- }
public void attainState(final AMQState s) throws AMQException
{
@@ -271,5 +159,47 @@
public void setProtocolSession(AMQProtocolSession session)
{
_protocolSession = session;
+ }
+
+ public MethodRegistry getMethodRegistry()
+ {
+ return getProtocolSession().getMethodRegistry();
+ }
+
+ public AMQState attainState(Set<AMQState> stateSet) throws AMQException
+ {
+ synchronized (_stateLock)
+ {
+ final long waitUntilTime = System.currentTimeMillis() + MAXIMUM_STATE_WAIT_TIME;
+ long waitTime = MAXIMUM_STATE_WAIT_TIME;
+
+ while (!stateSet.contains(_currentState) && (waitTime > 0))
+ {
+ try
+ {
+ _stateLock.wait(MAXIMUM_STATE_WAIT_TIME);
+ }
+ catch (InterruptedException e)
+ {
+ _logger.warn("Thread interrupted");
+ }
+
+ if (!stateSet.contains(_currentState))
+ {
+ waitTime = waitUntilTime - System.currentTimeMillis();
+ }
+ }
+
+ if (!stateSet.contains(_currentState))
+ {
+ _logger.warn("State not achieved within permitted time. Current state " + _currentState
+ + ", desired state: " + stateSet);
+ throw new AMQException(null, "State not achieved within permitted time. Current state " + _currentState
+ + ", desired state: " + stateSet, null);
+ }
+ return _currentState;
+ }
+
+
}
}
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/state/StateAwareMethodListener.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/state/StateAwareMethodListener.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/state/StateAwareMethodListener.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/state/StateAwareMethodListener.java Fri Feb 8 02:09:37 2008
@@ -21,6 +21,7 @@
package org.apache.qpid.client.state;
import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.client.protocol.AMQProtocolSession;
import org.apache.qpid.protocol.AMQMethodEvent;
@@ -29,8 +30,9 @@
* the opportunity to update state.
*
*/
-public interface StateAwareMethodListener
+public interface StateAwareMethodListener<B extends AMQMethodBody>
{
- void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession,
- AMQMethodEvent evt) throws AMQException;
+
+ void methodReceived(AMQStateManager stateManager, B body, int channelId) throws AMQException;
+
}
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java Fri Feb 8 02:09:37 2008
@@ -23,6 +23,7 @@
import org.apache.mina.common.IoConnector;
import org.apache.mina.common.IoHandlerAdapter;
import org.apache.mina.common.IoServiceConfig;
+import org.apache.mina.transport.socket.nio.MultiThreadSocketConnector;
import org.apache.mina.transport.socket.nio.SocketConnector;
import org.apache.mina.transport.vmpipe.VmPipeAcceptor;
import org.apache.mina.transport.vmpipe.VmPipeAddress;
@@ -34,7 +35,6 @@
import java.io.IOException;
import java.util.HashMap;
-import java.util.Iterator;
import java.util.Map;
/**
@@ -95,28 +95,26 @@
{
SocketConnector result;
// FIXME - this needs to be sorted to use the new Mina MultiThread SA.
- if (Boolean.getBoolean("qpidnio"))
+ if (!System.getProperties().containsKey("qpidnio") || Boolean.getBoolean("qpidnio"))
{
- _logger.error("Using Qpid NIO - sysproperty 'qpidnio' is set.");
- // result = new org.apache.qpid.nio.SocketConnector(); // non-blocking connector
+ _logger.warn("Using Qpid MultiThreaded NIO - " + (System.getProperties().containsKey("qpidnio")
+ ? "Qpid NIO is new default"
+ : "Sysproperty 'qpidnio' is set"));
+ result = new MultiThreadSocketConnector();
}
- // else
-
+ else
{
_logger.info("Using Mina NIO");
result = new SocketConnector(); // non-blocking connector
}
-
// Don't have the connector's worker thread wait around for other connections (we only use
// one SocketConnector per connection at the moment anyway). This allows short-running
// clients (like unit tests) to complete quickly.
result.setWorkerTimeout(0);
-
return result;
}
});
break;
-
case VM:
{
_instance = getVMTransport(details, Boolean.getBoolean("amqj.AutoCreateVMBroker"));
@@ -151,7 +149,15 @@
{
if (AutoCreate)
{
- createVMBroker(port);
+ if (AutoCreate)
+ {
+ createVMBroker(port);
+ }
+ else
+ {
+ throw new AMQVMBrokerCreationException(null, port, "VM Broker on port " + port
+ + " does not exist. Auto create disabled.", null);
+ }
}
else
{
@@ -271,8 +277,7 @@
}
AMQVMBrokerCreationException amqbce =
- new AMQVMBrokerCreationException(null, port, because + " Stopped InVM Qpid.AMQP creation", null);
- amqbce.initCause(e);
+ new AMQVMBrokerCreationException(null, port, because + " Stopped InVM Qpid.AMQP creation", e);
throw amqbce;
}
@@ -282,16 +287,17 @@
public static void killAllVMBrokers()
{
_logger.info("Killing all VM Brokers");
- _acceptor.unbindAll();
-
- Iterator keys = _inVmPipeAddress.keySet().iterator();
-
- while (keys.hasNext())
+ if (_acceptor != null)
{
- int id = (Integer) keys.next();
- _inVmPipeAddress.remove(id);
+ _acceptor.unbindAll();
}
-
+ synchronized (_inVmPipeAddress)
+ {
+ _inVmPipeAddress.clear();
+ }
+ _acceptor = null;
+ _currentInstance = -1;
+ _currentVMPort = -1;
}
public static void killVMBroker(int port)
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java Fri Feb 8 02:09:37 2008
@@ -22,15 +22,12 @@
import org.apache.mina.common.ConnectFuture;
import org.apache.mina.common.IoServiceConfig;
+import org.apache.mina.transport.vmpipe.QpidVmPipeConnector;
import org.apache.mina.transport.vmpipe.VmPipeAddress;
import org.apache.mina.transport.vmpipe.VmPipeConnector;
-
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.jms.BrokerDetails;
-import org.apache.qpid.pool.PoolingFilter;
-import org.apache.qpid.pool.ReferenceCountingExecutorService;
import org.apache.qpid.pool.ReadWriteThreadModel;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -49,10 +46,10 @@
public void connect(AMQProtocolHandler protocolHandler, BrokerDetails brokerDetail) throws IOException
{
- final VmPipeConnector ioConnector = new VmPipeConnector();
+ final VmPipeConnector ioConnector = new QpidVmPipeConnector();
final IoServiceConfig cfg = ioConnector.getDefaultConfig();
- cfg.setThreadModel(ReadWriteThreadModel.getInstance());
+ cfg.setThreadModel(ReadWriteThreadModel.getInstance());
final VmPipeAddress address = new VmPipeAddress(_port);
_logger.info("Attempting connection to " + address);
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java Fri Feb 8 02:09:37 2008
@@ -33,6 +33,7 @@
*/
public static final String OPTIONS_RETRY = "retries";
public static final String OPTIONS_CONNECT_TIMEOUT = "connecttimeout";
+ public static final String OPTIONS_CONNECT_DELAY = "connectdelay";
public static final int DEFAULT_PORT = 5672;
public static final String TCP = "tcp";
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java Fri Feb 8 02:09:37 2008
@@ -21,6 +21,7 @@
package org.apache.qpid.jms;
import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.ProtocolVersion;
import java.util.List;
@@ -43,6 +44,7 @@
public static final String OPTIONS_TEMPORARY_QUEUE_EXCHANGE = "temporaryQueueExchange";
public static final byte URL_0_8 = 1;
public static final byte URL_0_10 = 2;
+ public static final String OPTIONS_PROTOCOL_VERSION = "protocolVersion";
byte getURLVersion();
@@ -91,4 +93,6 @@
AMQShortString getTemporaryQueueExchangeName();
AMQShortString getTemporaryTopicExchangeName();
+
+ ProtocolVersion getProtocolVersion();
}
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverRoundRobinServers.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverRoundRobinServers.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverRoundRobinServers.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverRoundRobinServers.java Fri Feb 8 02:09:37 2008
@@ -22,7 +22,6 @@
import org.apache.qpid.jms.BrokerDetails;
import org.apache.qpid.jms.ConnectionURL;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -35,34 +34,22 @@
/** The default number of times to retry each server */
public static final int DEFAULT_SERVER_RETRIES = 0;
- /**
- * The index into the hostDetails array of the broker to which we are connected
- */
+ /** The index into the hostDetails array of the broker to which we are connected */
private int _currentBrokerIndex = -1;
- /**
- * The number of times to retry connecting for each server
- */
+ /** The number of times to retry connecting for each server */
private int _serverRetries;
- /**
- * The current number of retry attempts made
- */
+ /** The current number of retry attempts made */
private int _currentServerRetry;
- /**
- * The number of times to cycle through the servers
- */
+ /** The number of times to cycle through the servers */
private int _cycleRetries;
- /**
- * The current number of cycles performed.
- */
+ /** The current number of cycles performed. */
private int _currentCycleRetries;
- /**
- * Array of BrokerDetail used to make connections.
- */
+ /** Array of BrokerDetail used to make connections. */
private ConnectionURL _connectionDetails;
public FailoverRoundRobinServers(ConnectionURL connectionDetails)
@@ -128,6 +115,8 @@
public BrokerDetails getNextBrokerDetails()
{
+ boolean doDelay = false;
+
if (_currentBrokerIndex == (_connectionDetails.getBrokerCount() - 1))
{
if (_currentServerRetry < _serverRetries)
@@ -143,6 +132,7 @@
else
{
_logger.info("Retrying " + _connectionDetails.getBrokerDetails(_currentBrokerIndex));
+ doDelay=true;
}
_currentServerRetry++;
@@ -175,6 +165,7 @@
else
{
_logger.info("Retrying " + _connectionDetails.getBrokerDetails(_currentBrokerIndex));
+ doDelay=true;
}
_currentServerRetry++;
@@ -189,7 +180,28 @@
}
}
- return _connectionDetails.getBrokerDetails(_currentBrokerIndex);
+ BrokerDetails broker = _connectionDetails.getBrokerDetails(_currentBrokerIndex);
+
+ String delayStr = broker.getProperty(BrokerDetails.OPTIONS_CONNECT_DELAY);
+ if (delayStr != null && doDelay)
+ {
+ Long delay = Long.parseLong(delayStr);
+ _logger.info("Delay between connect retries:" + delay);
+ try
+ {
+ Thread.sleep(delay);
+ }
+ catch (InterruptedException ie)
+ {
+ return null;
+ }
+ }
+ else
+ {
+ _logger.info("No delay between connect retries, use tcp://host:port?connectdelay='value' to enable.");
+ }
+
+ return broker;
}
public void setBroker(BrokerDetails broker)
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverSingleServer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverSingleServer.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverSingleServer.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverSingleServer.java Fri Feb 8 02:09:37 2008
@@ -22,25 +22,23 @@
import org.apache.qpid.jms.BrokerDetails;
import org.apache.qpid.jms.ConnectionURL;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class FailoverSingleServer implements FailoverMethod
{
+ private static final Logger _logger = LoggerFactory.getLogger(FailoverSingleServer.class);
+
/** The default number of times to rety a conection to this server */
public static final int DEFAULT_SERVER_RETRIES = 1;
- /**
- * The details of the Single Server
- */
+ /** The details of the Single Server */
private BrokerDetails _brokerDetail;
- /**
- * The number of times to retry connecting to the sever
- */
+ /** The number of times to retry connecting to the sever */
private int _retries;
- /**
- * The current number of attempts made to the server
- */
+ /** The current number of attempts made to the server */
private int _currentRetries;
@@ -78,7 +76,7 @@
public BrokerDetails getCurrentBrokerDetails()
{
- return _brokerDetail;
+ return _brokerDetail;
}
public BrokerDetails getNextBrokerDetails()
@@ -91,11 +89,29 @@
{
if (_currentRetries < _retries)
{
- _currentRetries ++;
+ _currentRetries++;
}
+ }
+
- return _brokerDetail;
+ String delayStr = _brokerDetail.getProperty(BrokerDetails.OPTIONS_CONNECT_DELAY);
+ if (delayStr != null && _currentRetries != 1)
+ {
+ Long delay = Long.parseLong(delayStr);
+ _logger.info("Delay between connect retries:" + delay);
+ try
+ {
+
+ Thread.sleep(delay);
+ }
+ catch (InterruptedException ie)
+ {
+ _logger.info("No delay between connect retries, use tcp://host:port?connectdelay='value' to enable.");
+ return null;
+ }
}
+
+ return _brokerDetail;
}
public void setBroker(BrokerDetails broker)
@@ -138,10 +154,10 @@
public String toString()
{
- return "SingleServer:\n"+
- "Max Retries:"+_retries+
- "\nCurrent Retry:"+_currentRetries+
- "\n"+_brokerDetail+"\n";
+ return "SingleServer:\n" +
+ "Max Retries:" + _retries +
+ "\nCurrent Retry:" + _currentRetries +
+ "\n" + _brokerDetail + "\n";
}
}
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java Fri Feb 8 02:09:37 2008
@@ -99,8 +99,6 @@
_logger.warn("Unable to load property file specified in Provider_URL:" + environment.get(Context.PROVIDER_URL));
}
-
-
createConnectionFactories(data, environment);
createDestinations(data, environment);
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/client/AMQQueueDeferredOrderingTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/client/AMQQueueDeferredOrderingTest.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/client/AMQQueueDeferredOrderingTest.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/client/AMQQueueDeferredOrderingTest.java Fri Feb 8 02:09:37 2008
@@ -126,7 +126,7 @@
_logger.info("Consuming messages");
for (int i = 0; i < NUM_MESSAGES; i++)
{
- Message msg = consumer.receive(1500);
+ Message msg = consumer.receive(3000);
assertNotNull("Message should not be null", msg);
assertTrue("Message should be a text message", msg instanceof TextMessage);
assertEquals("Message content does not match expected", Integer.toString(i), ((TextMessage) msg).getText());
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java Fri Feb 8 02:09:37 2008
@@ -20,13 +20,17 @@
*/
package org.apache.qpid.client;
+import junit.framework.TestCase;
+import org.apache.qpid.client.transport.TransportConnection;
+import org.apache.qpid.jndi.PropertiesFileInitialContextFactory;
import org.apache.qpid.testutil.QpidTestCase;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
@@ -34,7 +38,9 @@
import javax.jms.Queue;
import javax.jms.Session;
import javax.naming.Context;
+import javax.naming.spi.InitialContextFactory;
+import java.util.Hashtable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/client/MessageListenerTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/client/MessageListenerTest.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/client/MessageListenerTest.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/client/MessageListenerTest.java Fri Feb 8 02:09:37 2008
@@ -20,13 +20,17 @@
*/
package org.apache.qpid.client;
+import junit.framework.TestCase;
+import org.apache.qpid.client.transport.TransportConnection;
+import org.apache.qpid.jndi.PropertiesFileInitialContextFactory;
import org.apache.qpid.testutil.QpidTestCase;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
@@ -34,6 +38,9 @@
import javax.jms.Queue;
import javax.jms.Session;
import javax.naming.Context;
+import javax.naming.spi.InitialContextFactory;
+
+import java.util.Hashtable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/client/ResetMessageListenerTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/client/ResetMessageListenerTest.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/client/ResetMessageListenerTest.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/client/ResetMessageListenerTest.java Fri Feb 8 02:09:37 2008
@@ -29,7 +29,16 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.jms.*;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
import javax.naming.Context;
import javax.naming.spi.InitialContextFactory;
@@ -65,12 +74,15 @@
private final CountDownLatch _allFirstMessagesSent = new CountDownLatch(2); // all messages Sent Lock
private final CountDownLatch _allSecondMessagesSent = new CountDownLatch(2); // all messages Sent Lock
private final CountDownLatch _allFirstMessagesSent010 = new CountDownLatch(MSG_COUNT); // all messages Sent Lock
- private final CountDownLatch _allSecondMessagesSent010 = new CountDownLatch(MSG_COUNT); // all messages Sent Lock
+ private final CountDownLatch _allSecondMessagesSent010 = new CountDownLatch(MSG_COUNT); // all messages Sent Lock
+
+ private String oldImmediatePrefetch;
protected void setUp() throws Exception
{
super.setUp();
+ oldImmediatePrefetch = System.getProperty(AMQSession.IMMEDIATE_PREFETCH);
System.setProperty(AMQSession.IMMEDIATE_PREFETCH, "true");
_clientConnection = getConnection("guest", "guest");
@@ -109,8 +121,12 @@
{
_clientConnection.close();
- _producerConnection.close();
super.tearDown();
+ if (oldImmediatePrefetch == null)
+ {
+ oldImmediatePrefetch = AMQSession.IMMEDIATE_PREFETCH_DEFAULT;
+ }
+ System.setProperty(AMQSession.IMMEDIATE_PREFETCH, oldImmediatePrefetch);
}
public void testAsynchronousRecieve()
@@ -238,7 +254,7 @@
try
{
- _allSecondMessagesSent.await(1000, TimeUnit.MILLISECONDS);
+ _allSecondMessagesSent.await(5000, TimeUnit.MILLISECONDS);
}
catch (InterruptedException e)
{
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseMethodHandlerNoCloseOk.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseMethodHandlerNoCloseOk.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseMethodHandlerNoCloseOk.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseMethodHandlerNoCloseOk.java Fri Feb 8 02:09:37 2008
@@ -37,7 +37,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class ChannelCloseMethodHandlerNoCloseOk implements StateAwareMethodListener
+public class ChannelCloseMethodHandlerNoCloseOk implements StateAwareMethodListener<ChannelCloseBody>
{
private static final Logger _logger = LoggerFactory.getLogger(ChannelCloseMethodHandlerNoCloseOk.class);
@@ -48,14 +48,15 @@
return _handler;
}
- public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt)
+ public void methodReceived(AMQStateManager stateManager, ChannelCloseBody method, int channelId)
throws AMQException
{
_logger.debug("ChannelClose method received");
- ChannelCloseBody method = (ChannelCloseBody) evt.getMethod();
+ final AMQProtocolSession session = stateManager.getProtocolSession();
- AMQConstant errorCode = AMQConstant.getConstant(method.replyCode);
- AMQShortString reason = method.replyText;
+
+ AMQConstant errorCode = AMQConstant.getConstant(method.getReplyCode());
+ AMQShortString reason = method.getReplyText();
if (_logger.isDebugEnabled())
{
_logger.debug("Channel close reply code: " + errorCode + ", reason: " + reason);
@@ -95,6 +96,6 @@
}
- protocolSession.channelClosed(evt.getChannelId(), errorCode, String.valueOf(reason));
+ session.channelClosed(channelId, errorCode, String.valueOf(reason));
}
}
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java Fri Feb 8 02:09:37 2008
@@ -25,17 +25,13 @@
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQTimeoutException;
import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.handler.ClientMethodDispatcherImpl;
import org.apache.qpid.client.failover.FailoverException;
+import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.client.protocol.AMQProtocolSession;
import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.transport.TransportConnection;
-import org.apache.qpid.framing.AMQFrame;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.ChannelCloseOkBody;
-import org.apache.qpid.framing.ChannelOpenBody;
-import org.apache.qpid.framing.ChannelOpenOkBody;
-import org.apache.qpid.framing.ExchangeDeclareBody;
-import org.apache.qpid.framing.ExchangeDeclareOkBody;
+import org.apache.qpid.framing.*;
import org.apache.qpid.jms.ConnectionListener;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.url.URLSyntaxException;
@@ -52,6 +48,9 @@
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
public class ChannelCloseTest extends TestCase implements ExceptionListener, ConnectionListener
{
@@ -135,8 +134,11 @@
/*
close channel and send guff then send ok no errors
+ REMOVE TEST - The behaviour after server has sent close is undefined.
+ the server should be free to fail as it may wish to reclaim its resources
+ immediately after close.
*/
- public void testSendingMethodsAfterClose() throws Exception
+ /*public void testSendingMethodsAfterClose() throws Exception
{
try
{
@@ -158,6 +160,17 @@
// Set StateManager to manager that ignores Close-oks
AMQProtocolSession protocolSession = ((AMQConnection) _connection).getProtocolHandler().getProtocolSession();
+
+ MethodDispatcher d = protocolSession.getMethodDispatcher();
+
+ MethodDispatcher wrappedDispatcher = (MethodDispatcher)
+ Proxy.newProxyInstance(d.getClass().getClassLoader(),
+ d.getClass().getInterfaces(),
+ new MethodDispatcherProxyHandler(
+ (ClientMethodDispatcherImpl) d));
+
+ protocolSession.setMethodDispatcher(wrappedDispatcher);
+
AMQStateManager newStateManager = new NoCloseOKStateManager(protocolSession);
newStateManager.changeState(oldStateManager.getCurrentState());
@@ -247,7 +260,7 @@
}
}
}
-
+*/
private void createChannelAndTest(int channel) throws FailoverException
{
// Create A channel
@@ -274,10 +287,9 @@
private void sendClose(int channel)
{
- AMQFrame frame =
- ChannelCloseOkBody.createAMQFrame(channel,
- ((AMQConnection) _connection).getProtocolHandler().getProtocolMajorVersion(),
- ((AMQConnection) _connection).getProtocolHandler().getProtocolMinorVersion());
+ ChannelCloseOkBody body =
+ ((AMQConnection) _connection).getProtocolHandler().getMethodRegistry().createChannelCloseOkBody();
+ AMQFrame frame = body.generateFrame(channel);
((AMQConnection) _connection).getProtocolHandler().writeFrame(frame);
}
@@ -335,35 +347,43 @@
private void declareExchange(int channelId, String _type, String _name, boolean nowait)
throws AMQException, FailoverException
{
- AMQFrame exchangeDeclare =
- ExchangeDeclareBody.createAMQFrame(channelId,
- ((AMQConnection) _connection).getProtocolHandler().getProtocolMajorVersion(),
- ((AMQConnection) _connection).getProtocolHandler().getProtocolMinorVersion(), null, // arguments
- false, // autoDelete
- false, // durable
- new AMQShortString(_name), // exchange
- false, // internal
- nowait, // nowait
- true, // passive
- 0, // ticket
- new AMQShortString(_type)); // type
+ ExchangeDeclareBody body =
+ ((AMQConnection) _connection).getProtocolHandler()
+ .getMethodRegistry()
+ .createExchangeDeclareBody(0,
+ new AMQShortString(_name),
+ new AMQShortString(_type),
+ true,
+ false,
+ false,
+ false,
+ nowait,
+ null);
+ AMQFrame exchangeDeclare = body.generateFrame(channelId);
+ AMQProtocolHandler protocolHandler = ((AMQConnection) _connection).getProtocolHandler();
+
+
+ if (nowait)
+ {
+ protocolHandler.writeFrame(exchangeDeclare);
+ }
+ else
+ {
+ protocolHandler.syncWrite(exchangeDeclare, ExchangeDeclareOkBody.class, SYNC_TIMEOUT);
+ }
+
+// return null;
+// }
+// }, (AMQConnection)_connection).execute();
- if (nowait)
- {
- ((AMQConnection) _connection).getProtocolHandler().writeFrame(exchangeDeclare);
- }
- else
- {
- ((AMQConnection) _connection).getProtocolHandler().syncWrite(exchangeDeclare, ExchangeDeclareOkBody.class,
- SYNC_TIMEOUT);
- }
}
private void createChannel(int channelId) throws AMQException, FailoverException
{
- ((AMQConnection) _connection).getProtocolHandler().syncWrite(ChannelOpenBody.createAMQFrame(channelId,
- ((AMQConnection) _connection).getProtocolHandler().getProtocolMajorVersion(),
- ((AMQConnection) _connection).getProtocolHandler().getProtocolMinorVersion(), null), // outOfBand
+ ChannelOpenBody body =
+ ((AMQConnection) _connection).getProtocolHandler().getMethodRegistry().createChannelOpenBody(null);
+
+ ((AMQConnection) _connection).getProtocolHandler().syncWrite(body.generateFrame(channelId), // outOfBand
ChannelOpenOkBody.class);
}
@@ -392,4 +412,28 @@
public void failoverComplete()
{ }
+
+ private static final class MethodDispatcherProxyHandler implements InvocationHandler
+ {
+ private final ClientMethodDispatcherImpl _underlyingDispatcher;
+ private final ChannelCloseMethodHandlerNoCloseOk _handler = ChannelCloseMethodHandlerNoCloseOk.getInstance();
+
+
+ public MethodDispatcherProxyHandler(ClientMethodDispatcherImpl dispatcher)
+ {
+ _underlyingDispatcher = dispatcher;
+ }
+
+ public Object invoke(Object proxy, Method method, Object[] args) throws Throwable
+ {
+ if(method.getName().equals("dispatchChannelClose"))
+ {
+ _handler.methodReceived(_underlyingDispatcher.getStateManager(),
+ (ChannelCloseBody) args[0], (Integer)args[1]);
+ }
+ Method dispatcherMethod = _underlyingDispatcher.getClass().getMethod(method.getName(), method.getParameterTypes());
+ return dispatcherMethod.invoke(_underlyingDispatcher, args);
+
+ }
+ }
}
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/NoCloseOKStateManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/NoCloseOKStateManager.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/NoCloseOKStateManager.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/NoCloseOKStateManager.java Fri Feb 8 02:09:37 2008
@@ -59,49 +59,7 @@
super(protocolSession);
}
- protected void registerListeners()
- {
- Map frame2handlerMap = new HashMap();
-
- // we need to register a map for the null (i.e. all state) handlers otherwise you get
- // a stack overflow in the handler searching code when you present it with a frame for which
- // no handlers are registered
- //
- _state2HandlersMap.put(null, frame2handlerMap);
-
- frame2handlerMap = new HashMap();
- frame2handlerMap.put(ConnectionStartBody.class, ConnectionStartMethodHandler.getInstance());
- frame2handlerMap.put(ConnectionCloseBody.class, ConnectionCloseMethodHandler.getInstance());
- _state2HandlersMap.put(AMQState.CONNECTION_NOT_STARTED, frame2handlerMap);
-
- frame2handlerMap = new HashMap();
- frame2handlerMap.put(ConnectionTuneBody.class, ConnectionTuneMethodHandler.getInstance());
- frame2handlerMap.put(ConnectionSecureBody.class, ConnectionSecureMethodHandler.getInstance());
- frame2handlerMap.put(ConnectionCloseBody.class, ConnectionCloseMethodHandler.getInstance());
- _state2HandlersMap.put(AMQState.CONNECTION_NOT_TUNED, frame2handlerMap);
-
- frame2handlerMap = new HashMap();
- frame2handlerMap.put(ConnectionOpenOkBody.class, ConnectionOpenOkMethodHandler.getInstance());
- frame2handlerMap.put(ConnectionCloseBody.class, ConnectionCloseMethodHandler.getInstance());
- _state2HandlersMap.put(AMQState.CONNECTION_NOT_OPENED, frame2handlerMap);
-
- //
- // ConnectionOpen handlers
- //
- frame2handlerMap = new HashMap();
- // Use Test Handler for Close methods to not send Close-OKs
- frame2handlerMap.put(ChannelCloseBody.class, ChannelCloseMethodHandlerNoCloseOk.getInstance());
-
- frame2handlerMap.put(ChannelCloseOkBody.class, ChannelCloseOkMethodHandler.getInstance());
- frame2handlerMap.put(ConnectionCloseBody.class, ConnectionCloseMethodHandler.getInstance());
- frame2handlerMap.put(BasicDeliverBody.class, BasicDeliverMethodHandler.getInstance());
- frame2handlerMap.put(BasicReturnBody.class, BasicReturnMethodHandler.getInstance());
- frame2handlerMap.put(BasicCancelOkBody.class, BasicCancelOkMethodHandler.getInstance());
- frame2handlerMap.put(ChannelFlowOkBody.class, ChannelFlowOkMethodHandler.getInstance());
- frame2handlerMap.put(QueueDeleteOkBody.class, QueueDeleteOkMethodHandler.getInstance());
- frame2handlerMap.put(ExchangeBoundOkBody.class, ExchangeBoundOkMethodHandler.getInstance());
- _state2HandlersMap.put(AMQState.CONNECTION_OPEN, frame2handlerMap);
- }
+
}
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java Fri Feb 8 02:09:37 2008
@@ -41,6 +41,14 @@
import javax.jms.TextMessage;
import javax.jms.TopicSubscriber;
+/**
+ * @todo Code to check that a consumer gets only one particular method could be factored into a re-usable method (as
+ * a static on a base test helper class, e.g. TestUtils.
+ *
+ * @todo Code to create test end-points using session per connection, or all sessions on one connection, to be factored
+ * out to make creating this test variation simpler. Want to make this variation available through LocalCircuit,
+ * driven by the test model.
+ */
public class DurableSubscriptionTest extends QpidTestCase
{
private static final Logger _logger = LoggerFactory.getLogger(DurableSubscriptionTest.class);
@@ -113,12 +121,26 @@
con.close();
}
- public void testDurability() throws Exception
+ public void testDurabilityAUTOACK() throws AMQException, JMSException, URLSyntaxException
{
+ durabilityImpl(Session.AUTO_ACKNOWLEDGE);
+ }
- AMQConnection con = (AMQConnection) getConnection("guest", "guest");
- AMQTopic topic = new AMQTopic(con, "MyDurableSubscriptionTestTopic");
- Session session1 = con.createSession(false, AMQSession.NO_ACKNOWLEDGE);
+ public void testDurabilityNOACKSessionPerConnection() throws AMQException, JMSException, URLSyntaxException
+ {
+ durabilityImplSessionPerConnection(AMQSession.NO_ACKNOWLEDGE);
+ }
+
+ public void testDurabilityAUTOACKSessionPerConnection() throws AMQException, JMSException, URLSyntaxException
+ {
+ durabilityImplSessionPerConnection(Session.AUTO_ACKNOWLEDGE);
+ }
+
+ private void durabilityImpl(int ackMode) throws AMQException, JMSException, URLSyntaxException
+ {
+ AMQConnection con = new AMQConnection("vm://:1", "guest", "guest", "test", "test");
+ AMQTopic topic = new AMQTopic(con, "MyTopic");
+ Session session1 = con.createSession(false, ackMode);
MessageConsumer consumer1 = session1.createConsumer(topic);
Session sessionProd = con.createSession(false, AMQSession.NO_ACKNOWLEDGE);
@@ -144,10 +166,83 @@
consumer2.close();
- Session session3 = con.createSession(false, AMQSession.NO_ACKNOWLEDGE);
+ producer.send(session1.createTextMessage("B"));
+
+ _logger.info("Receive message on consumer 1 :expecting B");
+ msg = consumer1.receive(500);
+ assertNotNull("Consumer 1 should get message 'B'.", msg);
+ assertEquals("Incorrect Message recevied on consumer1.", "B", ((TextMessage) msg).getText());
+ _logger.info("Receive message on consumer 1 :expecting null");
+ msg = consumer1.receive(500);
+ assertNull("There should be no more messages for consumption on consumer1.", msg);
+
+ Session session3 = con.createSession(false, ackMode);
MessageConsumer consumer3 = session3.createDurableSubscriber(topic, "MySubscription");
- producer.send(session1.createTextMessage("B"));
+ _logger.info("Receive message on consumer 3 :expecting B");
+ msg = consumer3.receive(500);
+ assertNotNull("Consumer 3 should get message 'B'.", msg);
+ assertEquals("Incorrect Message recevied on consumer4.", "B", ((TextMessage) msg).getText());
+ _logger.info("Receive message on consumer 3 :expecting null");
+ msg = consumer3.receive(500);
+ assertNull("There should be no more messages for consumption on consumer3.", msg);
+
+ consumer1.close();
+ consumer3.close();
+
+ con.close();
+ }
+
+ private void durabilityImplSessionPerConnection(int ackMode) throws AMQException, JMSException, URLSyntaxException
+ {
+ Message msg;
+
+ // Create producer.
+ AMQConnection con0 = new AMQConnection("vm://:1", "guest", "guest", "test", "test");
+ con0.start();
+ Session session0 = con0.createSession(false, ackMode);
+
+ AMQTopic topic = new AMQTopic(con0, "MyTopic");
+
+ Session sessionProd = con0.createSession(false, ackMode);
+ MessageProducer producer = sessionProd.createProducer(topic);
+
+ // Create consumer 1.
+ AMQConnection con1 = new AMQConnection("vm://:1", "guest", "guest", "test", "test");
+ con1.start();
+ Session session1 = con1.createSession(false, ackMode);
+
+ MessageConsumer consumer1 = session0.createConsumer(topic);
+
+ // Create consumer 2.
+ AMQConnection con2 = new AMQConnection("vm://:1", "guest", "guest", "test", "test");
+ con2.start();
+ Session session2 = con2.createSession(false, ackMode);
+
+ TopicSubscriber consumer2 = session2.createDurableSubscriber(topic, "MySubscription");
+
+ // Send message and check that both consumers get it and only it.
+ producer.send(session0.createTextMessage("A"));
+
+ msg = consumer1.receive(500);
+ assertNotNull("Message should be available", msg);
+ assertEquals("Message Text doesn't match", "A", ((TextMessage) msg).getText());
+ msg = consumer1.receive(500);
+ assertNull("There should be no more messages for consumption on consumer1.", msg);
+
+ msg = consumer2.receive();
+ assertNotNull(msg);
+ assertEquals("Consumer 2 should also received the first msg.", "A", ((TextMessage) msg).getText());
+ msg = consumer2.receive(500);
+ assertNull("There should be no more messages for consumption on consumer2.", msg);
+
+ // Detach the durable subscriber.
+ consumer2.close();
+ session2.close();
+ con2.close();
+
+ // Send message and receive on open consumer.
+ producer.send(session0.createTextMessage("B"));
_logger.info("Receive message on consumer 1 :expecting B");
msg = consumer1.receive(100);
@@ -156,14 +251,26 @@
msg = consumer1.receive(100);
assertEquals(null, msg);
+ // Re-attach a new consumer to the durable subscription, and check that it gets the message that it missed.
+ AMQConnection con3 = new AMQConnection("vm://:1", "guest", "guest", "test", "test");
+ con3.start();
+ Session session3 = con3.createSession(false, ackMode);
+
+ TopicSubscriber consumer3 = session3.createDurableSubscriber(topic, "MySubscription");
+
_logger.info("Receive message on consumer 3 :expecting B");
msg = consumer3.receive(100);
assertEquals("B", ((TextMessage) msg).getText());
_logger.info("Receive message on consumer 3 :expecting null");
- msg = consumer3.receive(100);
- assertEquals(null, msg);
+ msg = consumer3.receive(500);
+ assertNull("There should be no more messages for consumption on consumer3.", msg);
- con.close();
+ consumer1.close();
+ consumer3.close();
+
+ con0.close();
+ con1.close();
+ con3.close();
}
public static junit.framework.Test suite()