You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ra...@apache.org on 2007/03/30 17:53:19 UTC
svn commit: r524144 [1/2] - in
/incubator/qpid/branches/client_restructure/java: ./
common/src/main/java/org/apache/qpid/framing/ newclient/
newclient/src/main/java/org/apache/qpid/nclient/amqp/
newclient/src/main/java/org/apache/qpid/nclient/amqp/samp...
Author: rajith
Date: Fri Mar 30 08:53:18 2007
New Revision: 524144
URL: http://svn.apache.org/viewvc?view=rev&rev=524144
Log:
added state support to the new client and modified the example to illustrate it
Added:
incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/StateHelper.java
incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateChangedEvent.java
Modified:
incubator/qpid/branches/client_restructure/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java
incubator/qpid/branches/client_restructure/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java
incubator/qpid/branches/client_restructure/java/common/src/main/java/org/apache/qpid/framing/AMQRequestBody.java
incubator/qpid/branches/client_restructure/java/common/src/main/java/org/apache/qpid/framing/AMQRequestBodyFactory.java
incubator/qpid/branches/client_restructure/java/common/src/main/java/org/apache/qpid/framing/AMQResponseBody.java
incubator/qpid/branches/client_restructure/java/common/src/main/java/org/apache/qpid/framing/AMQResponseBodyFactory.java
incubator/qpid/branches/client_restructure/java/newclient/.project
incubator/qpid/branches/client_restructure/java/newclient/pom.xml
incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPChannel.java
incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPClassFactory.java
incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPConnection.java
incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/QpidStateManager.java
incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/TestClient.java
incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPState.java
incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateListener.java
incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateManager.java
incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateType.java
incubator/qpid/branches/client_restructure/java/pom.xml
Modified: incubator/qpid/branches/client_restructure/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/client_restructure/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java?view=diff&rev=524144&r1=524143&r2=524144
==============================================================================
--- incubator/qpid/branches/client_restructure/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java (original)
+++ incubator/qpid/branches/client_restructure/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java Fri Mar 30 08:53:18 2007
@@ -24,10 +24,6 @@
import org.apache.mina.common.ByteBuffer;
import org.apache.mina.common.IoSession;
import org.apache.mina.filter.codec.ProtocolDecoderOutput;
-import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
-
-import java.util.HashMap;
-import java.util.Map;
public class AMQDataBlockDecoder
{
@@ -68,13 +64,11 @@
BodyFactory bodyFactory;
if (type == AMQRequestBody.TYPE)
{
- AMQVersionAwareProtocolSession protocolSession = (AMQVersionAwareProtocolSession) session.getAttachment();
- bodyFactory = new AMQRequestBodyFactory(protocolSession);
+ bodyFactory = new AMQRequestBodyFactory();
}
else if (type == AMQResponseBody.TYPE)
{
- AMQVersionAwareProtocolSession protocolSession = (AMQVersionAwareProtocolSession) session.getAttachment();
- bodyFactory = new AMQResponseBodyFactory(protocolSession);
+ bodyFactory = new AMQResponseBodyFactory();
}
else
{
Modified: incubator/qpid/branches/client_restructure/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/client_restructure/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java?view=diff&rev=524144&r1=524143&r2=524144
==============================================================================
--- incubator/qpid/branches/client_restructure/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java (original)
+++ incubator/qpid/branches/client_restructure/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java Fri Mar 30 08:53:18 2007
@@ -22,21 +22,19 @@
import org.apache.log4j.Logger;
import org.apache.mina.common.ByteBuffer;
-import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
-public class AMQMethodBodyFactory implements BodyFactory
+public class AMQMethodBodyFactory implements BodyFactory, ProtocolVersionList
{
private static final Logger _log = Logger.getLogger(AMQMethodBodyFactory.class);
- private final AMQVersionAwareProtocolSession _protocolSession;
+ VersionSpecificRegistry _registry = MainRegistry.getVersionSpecificRegistry(pv[pv.length-1][PROTOCOL_MAJOR],pv[pv.length-1][PROTOCOL_MINOR]);
- public AMQMethodBodyFactory(AMQVersionAwareProtocolSession protocolSession)
+ public AMQMethodBodyFactory()
{
- _protocolSession = protocolSession;
}
public AMQMethodBody createBody(ByteBuffer in, long bodySize) throws AMQFrameDecodingException
{
- return _protocolSession.getRegistry().get((short)in.getUnsignedShort(), (short)in.getUnsignedShort(), in, bodySize);
+ return _registry.get((short)in.getUnsignedShort(), (short)in.getUnsignedShort(), in, bodySize);
}
}
Modified: incubator/qpid/branches/client_restructure/java/common/src/main/java/org/apache/qpid/framing/AMQRequestBody.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/client_restructure/java/common/src/main/java/org/apache/qpid/framing/AMQRequestBody.java?view=diff&rev=524144&r1=524143&r2=524144
==============================================================================
--- incubator/qpid/branches/client_restructure/java/common/src/main/java/org/apache/qpid/framing/AMQRequestBody.java (original)
+++ incubator/qpid/branches/client_restructure/java/common/src/main/java/org/apache/qpid/framing/AMQRequestBody.java Fri Mar 30 08:53:18 2007
@@ -21,7 +21,6 @@
package org.apache.qpid.framing;
import org.apache.mina.common.ByteBuffer;
-import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
public class AMQRequestBody extends AMQBody
{
@@ -31,12 +30,10 @@
protected long requestId;
protected long responseMark;
protected AMQMethodBody methodPayload;
- protected AMQVersionAwareProtocolSession protocolSession;
-
+
// Constructor
- public AMQRequestBody(AMQVersionAwareProtocolSession protocolSession)
+ public AMQRequestBody()
{
- this.protocolSession = protocolSession;
}
public AMQRequestBody(long requestId, long responseMark,
@@ -45,7 +42,6 @@
this.requestId = requestId;
this.responseMark = responseMark;
this.methodPayload = methodPayload;
- protocolSession = null;
}
@@ -75,17 +71,12 @@
protected void populateFromBuffer(ByteBuffer buffer, long size)
throws AMQFrameDecodingException
- {
- if (protocolSession == null)
- {
- throw new AMQFrameDecodingException("Cannot call populateFromBuffer() without using correct constructor.");
- }
-
+ {
requestId = EncodingUtils.readLong(buffer);
responseMark = EncodingUtils.readLong(buffer);
int reserved = EncodingUtils.readInteger(buffer); // reserved, throw away
- AMQMethodBodyFactory methodBodyFactory = new AMQMethodBodyFactory(protocolSession);
+ AMQMethodBodyFactory methodBodyFactory = new AMQMethodBodyFactory();
methodPayload = methodBodyFactory.createBody(buffer, size);
}
Modified: incubator/qpid/branches/client_restructure/java/common/src/main/java/org/apache/qpid/framing/AMQRequestBodyFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/client_restructure/java/common/src/main/java/org/apache/qpid/framing/AMQRequestBodyFactory.java?view=diff&rev=524144&r1=524143&r2=524144
==============================================================================
--- incubator/qpid/branches/client_restructure/java/common/src/main/java/org/apache/qpid/framing/AMQRequestBodyFactory.java (original)
+++ incubator/qpid/branches/client_restructure/java/common/src/main/java/org/apache/qpid/framing/AMQRequestBodyFactory.java Fri Mar 30 08:53:18 2007
@@ -21,20 +21,16 @@
package org.apache.qpid.framing;
import org.apache.mina.common.ByteBuffer;
-import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
public class AMQRequestBodyFactory implements BodyFactory
-{
- private final AMQVersionAwareProtocolSession protocolSession;
-
- public AMQRequestBodyFactory(AMQVersionAwareProtocolSession protocolSession)
+{
+ public AMQRequestBodyFactory()
{
- this.protocolSession = protocolSession;
}
public AMQBody createBody(ByteBuffer in, long bodySize) throws AMQFrameDecodingException
{
- AMQRequestBody rb = new AMQRequestBody(protocolSession);
+ AMQRequestBody rb = new AMQRequestBody();
rb.populateFromBuffer(in, bodySize);
return rb;
}
Modified: incubator/qpid/branches/client_restructure/java/common/src/main/java/org/apache/qpid/framing/AMQResponseBody.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/client_restructure/java/common/src/main/java/org/apache/qpid/framing/AMQResponseBody.java?view=diff&rev=524144&r1=524143&r2=524144
==============================================================================
--- incubator/qpid/branches/client_restructure/java/common/src/main/java/org/apache/qpid/framing/AMQResponseBody.java (original)
+++ incubator/qpid/branches/client_restructure/java/common/src/main/java/org/apache/qpid/framing/AMQResponseBody.java Fri Mar 30 08:53:18 2007
@@ -21,7 +21,6 @@
package org.apache.qpid.framing;
import org.apache.mina.common.ByteBuffer;
-import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
public class AMQResponseBody extends AMQBody
{
@@ -32,12 +31,10 @@
protected long requestId;
protected int batchOffset;
protected AMQMethodBody methodPayload;
- protected AMQVersionAwareProtocolSession protocolSession;
// Constructor
- public AMQResponseBody(AMQVersionAwareProtocolSession protocolSession)
+ public AMQResponseBody()
{
- this.protocolSession = protocolSession;
}
public AMQResponseBody(long responseId, long requestId,
@@ -47,7 +44,6 @@
this.requestId = requestId;
this.batchOffset = batchOffset;
this.methodPayload = methodPayload;
- protocolSession = null;
}
// Field methods
@@ -77,16 +73,13 @@
protected void populateFromBuffer(ByteBuffer buffer, long size)
throws AMQFrameDecodingException
- {
- if (protocolSession == null)
- throw new AMQFrameDecodingException("Cannot call populateFromBuffer() without using correct constructor.");
-
+ {
responseId = EncodingUtils.readLong(buffer);
requestId = EncodingUtils.readLong(buffer);
// XXX
batchOffset = EncodingUtils.readInteger(buffer);
- AMQMethodBodyFactory methodBodyFactory = new AMQMethodBodyFactory(protocolSession);
+ AMQMethodBodyFactory methodBodyFactory = new AMQMethodBodyFactory();
methodPayload = methodBodyFactory.createBody(buffer, size);
}
Modified: incubator/qpid/branches/client_restructure/java/common/src/main/java/org/apache/qpid/framing/AMQResponseBodyFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/client_restructure/java/common/src/main/java/org/apache/qpid/framing/AMQResponseBodyFactory.java?view=diff&rev=524144&r1=524143&r2=524144
==============================================================================
--- incubator/qpid/branches/client_restructure/java/common/src/main/java/org/apache/qpid/framing/AMQResponseBodyFactory.java (original)
+++ incubator/qpid/branches/client_restructure/java/common/src/main/java/org/apache/qpid/framing/AMQResponseBodyFactory.java Fri Mar 30 08:53:18 2007
@@ -25,16 +25,13 @@
public class AMQResponseBodyFactory implements BodyFactory
{
- private final AMQVersionAwareProtocolSession protocolSession;
-
- public AMQResponseBodyFactory(AMQVersionAwareProtocolSession protocolSession)
+ public AMQResponseBodyFactory()
{
- this.protocolSession = protocolSession;
}
public AMQBody createBody(ByteBuffer in, long bodySize) throws AMQFrameDecodingException
{
- AMQResponseBody rb = new AMQResponseBody(protocolSession);
+ AMQResponseBody rb = new AMQResponseBody();
rb.populateFromBuffer(in, bodySize);
return rb;
}
Modified: incubator/qpid/branches/client_restructure/java/newclient/.project
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/client_restructure/java/newclient/.project?view=diff&rev=524144&r1=524143&r2=524144
==============================================================================
--- incubator/qpid/branches/client_restructure/java/newclient/.project (original)
+++ incubator/qpid/branches/client_restructure/java/newclient/.project Fri Mar 30 08:53:18 2007
@@ -1,7 +1,10 @@
<projectDescription>
<name>qpid-newclient</name>
<comment/>
- <projects/>
+ <projects>
+ <project>qpid-broker</project>
+ <project>qpid-common</project>
+ </projects>
<buildSpec>
<buildCommand>
<name>org.eclipse.jdt.core.javabuilder</name>
Modified: incubator/qpid/branches/client_restructure/java/newclient/pom.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/client_restructure/java/newclient/pom.xml?view=diff&rev=524144&r1=524143&r2=524144
==============================================================================
--- incubator/qpid/branches/client_restructure/java/newclient/pom.xml (original)
+++ incubator/qpid/branches/client_restructure/java/newclient/pom.xml Fri Mar 30 08:53:18 2007
@@ -58,6 +58,12 @@
</dependency>
<dependency>
+ <groupId>commons-configuration</groupId>
+ <artifactId>commons-configuration</artifactId>
+ <version>1.3</version>
+ </dependency>
+
+ <dependency>
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>
</dependency>
Modified: incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPChannel.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPChannel.java?view=diff&rev=524144&r1=524143&r2=524144
==============================================================================
--- incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPChannel.java (original)
+++ incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPChannel.java Fri Mar 30 08:53:18 2007
@@ -40,7 +40,10 @@
import org.apache.qpid.nclient.amqp.event.AMQPMethodEvent;
import org.apache.qpid.nclient.amqp.event.AMQPMethodListener;
import org.apache.qpid.nclient.amqp.state.AMQPState;
+import org.apache.qpid.nclient.amqp.state.AMQPStateChangedEvent;
import org.apache.qpid.nclient.amqp.state.AMQPStateMachine;
+import org.apache.qpid.nclient.amqp.state.AMQPStateManager;
+import org.apache.qpid.nclient.amqp.state.AMQPStateType;
import org.apache.qpid.nclient.config.ClientConfiguration;
import org.apache.qpid.nclient.core.AMQPException;
import org.apache.qpid.nclient.core.Phase;
@@ -58,295 +61,309 @@
public class AMQPChannel extends AMQPStateMachine implements AMQPMethodListener
{
- private static final Logger _logger = Logger.getLogger(AMQPChannel.class);
+ private static final Logger _logger = Logger.getLogger(AMQPChannel.class);
- // the channelId assigned for this channel
- private int _channelId;
+ // the channelId assigned for this channel
+ private int _channelId;
- private Phase _phase;
+ private Phase _phase;
- private AMQPState _currentState;
+ private AMQPState _currentState;
- private final AMQPState[] _validCloseStates = new AMQPState[] { AMQPState.CHANNEL_OPENED, AMQPState.CHANNEL_SUSPEND };
-
- private final AMQPState[] _validResumeStates = new AMQPState[] { AMQPState.CHANNEL_CLOSED, AMQPState.CHANNEL_NOT_OPENED };
-
- // The wait period until a server sends a respond
- private long _serverTimeOut = 1000;
-
- private final Lock _lock = new ReentrantLock();
-
- private final Condition _channelNotOpend = _lock.newCondition();
-
- private final Condition _channelNotClosed = _lock.newCondition();
-
- private final Condition _channelFlowNotResponded = _lock.newCondition();
-
- private final Condition _channelNotResumed = _lock.newCondition();
-
- private ChannelOpenOkBody _channelOpenOkBody;
-
- private ChannelCloseOkBody _channelCloseOkBody;
-
- private ChannelFlowOkBody _channelFlowOkBody;
-
- private ChannelOkBody _channelOkBody;
-
- private ChannelCloseBody _channelCloseBody;
-
- protected AMQPChannel(int channelId, Phase phase)
- {
- _channelId = channelId;
- _phase = phase;
- _currentState = AMQPState.CHANNEL_NOT_OPENED;
- _serverTimeOut = ClientConfiguration.get().getLong(QpidConstants.SERVER_TIMEOUT_IN_MILLISECONDS);
- }
-
- /**
- * -------------------------------------------
- * API Methods
- * --------------------------------------------
- */
-
- /**
- * Opens the channel
- */
- public ChannelOpenOkBody open(ChannelOpenBody channelOpenBody) throws AMQPException
- {
- _lock.lock();
- try
- {
- _channelOpenOkBody = null;
- checkIfValidStateTransition(AMQPState.CHANNEL_NOT_OPENED, _currentState, AMQPState.CHANNEL_OPENED);
- AMQPMethodEvent msg = new AMQPMethodEvent(_channelId, channelOpenBody, QpidConstants.EMPTY_CORRELATION_ID);
- _phase.messageSent(msg);
-
- //_channelNotOpend.await(_serverTimeOut, TimeUnit.MILLISECONDS);
- _channelNotOpend.await();
- checkIfConnectionClosed();
- AMQPValidator.throwExceptionOnNull(_channelOpenOkBody, "The broker didn't send the ChannelOpenOkBody in time");
- _currentState = AMQPState.CHANNEL_OPENED;
- return _channelOpenOkBody;
- }
- catch (Exception e)
- {
- throw new AMQPException("Error in channel.open", e);
- }
- finally
- {
- _lock.unlock();
- }
- }
-
- /**
- * Close the channel
- */
- public ChannelCloseOkBody close(ChannelCloseBody channelCloseBody) throws AMQPException
- {
- _lock.lock();
- try
- {
- _channelCloseOkBody = null;
- checkIfValidStateTransition(_validCloseStates, _currentState, AMQPState.CHANNEL_CLOSED);
- AMQPMethodEvent msg = new AMQPMethodEvent(_channelId, channelCloseBody, QpidConstants.EMPTY_CORRELATION_ID);
- _phase.messageSent(msg);
-
- //_channelNotClosed.await(_serverTimeOut, TimeUnit.MILLISECONDS);
- _channelNotClosed.await();
- AMQPValidator.throwExceptionOnNull(_channelCloseOkBody, "The broker didn't send the ChannelCloseOkBody in time");
- _currentState = AMQPState.CHANNEL_CLOSED;
- return _channelCloseOkBody;
- }
- catch (Exception e)
- {
- throw new AMQPException("Error in channel.close", e);
- }
- finally
- {
- _lock.unlock();
- }
- }
-
- /**
- * Channel Flow
- */
- public ChannelFlowOkBody flow(ChannelFlowBody channelFlowBody) throws AMQPException
- {
- _lock.lock();
- try
- {
- _channelFlowOkBody = null;
- if (channelFlowBody.active)
- {
- checkIfValidStateTransition(AMQPState.CHANNEL_SUSPEND, _currentState, AMQPState.CHANNEL_OPENED);
- }
- else
- {
- checkIfValidStateTransition(AMQPState.CHANNEL_OPENED, _currentState, AMQPState.CHANNEL_SUSPEND);
- }
- AMQPMethodEvent msg = new AMQPMethodEvent(_channelId, channelFlowBody, QpidConstants.EMPTY_CORRELATION_ID);
- _phase.messageSent(msg);
-
- //_channelFlowNotResponded.await(_serverTimeOut, TimeUnit.MILLISECONDS);
- _channelFlowNotResponded.await();
- checkIfConnectionClosed();
- AMQPValidator.throwExceptionOnNull(_channelFlowOkBody, "The broker didn't send the ChannelFlowOkBody in time");
- handleChannelFlowState(_channelFlowOkBody.active);
- return _channelFlowOkBody;
- }
- catch (Exception e)
- {
- throw new AMQPException("Error in channel.flow", e);
- }
- finally
- {
- _lock.unlock();
- }
- }
-
- /**
- * Close the channel
- */
- public ChannelOkBody resume(ChannelResumeBody channelResumeBody) throws AMQPException
- {
- _lock.lock();
- try
- {
- _channelOkBody = null;
- checkIfValidStateTransition(_validResumeStates, _currentState, AMQPState.CHANNEL_OPENED);
- AMQPMethodEvent msg = new AMQPMethodEvent(_channelId, channelResumeBody, QpidConstants.EMPTY_CORRELATION_ID);
- _phase.messageSent(msg);
-
- //_channelNotResumed.await(_serverTimeOut, TimeUnit.MILLISECONDS);
- _channelNotResumed.await();
- checkIfConnectionClosed();
- AMQPValidator.throwExceptionOnNull(_channelOkBody,
- "The broker didn't send the ChannelOkBody in response to the ChannelResumeBody in time");
- _currentState = AMQPState.CHANNEL_OPENED;
- return _channelOkBody;
- }
- catch (Exception e)
- {
- throw new AMQPException("Error in channel.resume", e);
- }
- finally
- {
- _lock.unlock();
- }
- }
-
- /**
- * -------------------------------------------
- * AMQPMethodListener methods
- * --------------------------------------------
- */
- public <B extends AMQMethodBody> boolean methodReceived(AMQPMethodEvent<B> evt) throws AMQPException
- {
- _lock.lock();
- try
- {
- if (evt.getMethod() instanceof ChannelOpenOkBody)
- {
- _channelOpenOkBody = (ChannelOpenOkBody) evt.getMethod();
- _channelNotOpend.signal();
- return true;
- }
- else if (evt.getMethod() instanceof ChannelCloseOkBody)
- {
- _channelCloseOkBody = (ChannelCloseOkBody) evt.getMethod();
- _channelNotClosed.signal();
- return true;
- }
- else if (evt.getMethod() instanceof ChannelCloseBody)
- {
- _channelCloseBody = (ChannelCloseBody)evt.getMethod();
- // release the correct lock as u may have some conditions waiting.
- // while an error occured and the broker has sent a close.
- releaseLocks();
- handleChannelClose(_channelCloseBody);
- return true;
- }
- else if (evt.getMethod() instanceof ChannelFlowOkBody)
- {
- _channelFlowOkBody = (ChannelFlowOkBody) evt.getMethod();
- _channelFlowNotResponded.signal();
- return true;
- }
- else if (evt.getMethod() instanceof ChannelFlowBody)
- {
- handleChannelFlow((ChannelFlowBody) evt.getMethod());
- return true;
- }
- else if (evt.getMethod() instanceof ChannelOkBody)
- {
- _channelOkBody = (ChannelOkBody) evt.getMethod();
- // In this case the only method expecting channel-ok is channel-resume
- // haven't implemented ping and pong.
- _channelNotResumed.signal();
- return true;
- }
- else
- {
- return false;
- }
- }
- finally
- {
- _lock.unlock();
- }
- }
-
- private void handleChannelClose(ChannelCloseBody channelCloseBody)
- {
- _currentState = AMQPState.CHANNEL_CLOSED;
- // handle channel related cleanup
- }
-
- private void releaseLocks()
- {
- if(_currentState == AMQPState.CHANNEL_NOT_OPENED)
- {
- _channelNotOpend.signal();
- _channelNotResumed.signal(); // It could be a channel.resume call
- }
- else if(_currentState == AMQPState.CHANNEL_OPENED || _currentState == AMQPState.CHANNEL_SUSPEND)
- {
- _channelFlowNotResponded.signal();
- }
- else if(_currentState == AMQPState.CHANNEL_CLOSED)
- {
- _channelNotResumed.signal();
- }
- }
-
- private void checkIfConnectionClosed()throws AMQPException
- {
- if (_channelCloseBody != null)
- {
- String error = "Broker has closed channel due to : " + _channelCloseBody.getReplyText() +
- " with reply code (" + _channelCloseBody.getReplyCode() + ") " +
- "caused by class " + _channelCloseBody.getClassId() +
- " and method " + _channelCloseBody.getMethod();
-
- throw new AMQPException(error);
- }
- }
-
- private void handleChannelFlow(ChannelFlowBody channelFlowBody)
- {
- _lock.lock();
- try
- {
- handleChannelFlowState(channelFlowBody.active);
- }
- finally
- {
- _lock.unlock();
- }
- }
-
- private void handleChannelFlowState(boolean flow)
- {
- _currentState = (flow) ? AMQPState.CHANNEL_OPENED : AMQPState.CHANNEL_SUSPEND;
- }
+ private AMQPStateManager _stateManager;
+
+ private final AMQPState[] _validCloseStates = new AMQPState[]
+ { AMQPState.CHANNEL_OPENED, AMQPState.CHANNEL_SUSPEND };
+
+ private final AMQPState[] _validResumeStates = new AMQPState[]
+ { AMQPState.CHANNEL_CLOSED, AMQPState.CHANNEL_NOT_OPENED };
+
+ // The wait period until the server sends a response
+ private long _serverTimeOut = 1000;
+
+ private final Lock _lock = new ReentrantLock();
+
+ private final Condition _channelNotOpend = _lock.newCondition();
+
+ private final Condition _channelNotClosed = _lock.newCondition();
+
+ private final Condition _channelFlowNotResponded = _lock.newCondition();
+
+ private final Condition _channelNotResumed = _lock.newCondition();
+
+ private ChannelOpenOkBody _channelOpenOkBody;
+
+ private ChannelCloseOkBody _channelCloseOkBody;
+
+ private ChannelFlowOkBody _channelFlowOkBody;
+
+ private ChannelOkBody _channelOkBody;
+
+ private ChannelCloseBody _channelCloseBody;
+
+ protected AMQPChannel(int channelId, Phase phase, AMQPStateManager stateManager)
+ {
+ _channelId = channelId;
+ _phase = phase;
+ _stateManager = stateManager;
+ _currentState = AMQPState.CHANNEL_NOT_OPENED;
+ _serverTimeOut = ClientConfiguration.get().getLong(QpidConstants.SERVER_TIMEOUT_IN_MILLISECONDS);
+ }
+
+ /**
+ * -------------------------------------------
+ * API Methods
+ * --------------------------------------------
+ */
+
+ /**
+ * Opens the channel
+ */
+ public ChannelOpenOkBody open(ChannelOpenBody channelOpenBody) throws AMQPException
+ {
+ _lock.lock();
+ try
+ {
+ _channelOpenOkBody = null;
+ checkIfValidStateTransition(AMQPState.CHANNEL_NOT_OPENED, _currentState, AMQPState.CHANNEL_OPENED);
+ AMQPMethodEvent msg = new AMQPMethodEvent(_channelId, channelOpenBody, QpidConstants.EMPTY_CORRELATION_ID);
+ _phase.messageSent(msg);
+
+ //_channelNotOpend.await(_serverTimeOut, TimeUnit.MILLISECONDS);
+ _channelNotOpend.await();
+ checkIfConnectionClosed();
+ AMQPValidator.throwExceptionOnNull(_channelOpenOkBody, "The broker didn't send the ChannelOpenOkBody in time");
+ notifyState(AMQPState.CHANNEL_OPENED);
+ _currentState = AMQPState.CHANNEL_OPENED;
+ return _channelOpenOkBody;
+ }
+ catch (Exception e)
+ {
+ throw new AMQPException("Error in channel.open", e);
+ }
+ finally
+ {
+ _lock.unlock();
+ }
+ }
+
+ /**
+ * Close the channel
+ */
+ public ChannelCloseOkBody close(ChannelCloseBody channelCloseBody) throws AMQPException
+ {
+ _lock.lock();
+ try
+ {
+ _channelCloseOkBody = null;
+ checkIfValidStateTransition(_validCloseStates, _currentState, AMQPState.CHANNEL_CLOSED);
+ AMQPMethodEvent msg = new AMQPMethodEvent(_channelId, channelCloseBody, QpidConstants.EMPTY_CORRELATION_ID);
+ _phase.messageSent(msg);
+
+ //_channelNotClosed.await(_serverTimeOut, TimeUnit.MILLISECONDS);
+ _channelNotClosed.await();
+ AMQPValidator.throwExceptionOnNull(_channelCloseOkBody, "The broker didn't send the ChannelCloseOkBody in time");
+ notifyState(AMQPState.CHANNEL_CLOSED);
+ _currentState = AMQPState.CHANNEL_CLOSED;
+ return _channelCloseOkBody;
+ }
+ catch (Exception e)
+ {
+ throw new AMQPException("Error in channel.close", e);
+ }
+ finally
+ {
+ _lock.unlock();
+ }
+ }
+
+ /**
+ * Channel Flow
+ */
+ public ChannelFlowOkBody flow(ChannelFlowBody channelFlowBody) throws AMQPException
+ {
+ _lock.lock();
+ try
+ {
+ _channelFlowOkBody = null;
+ if (channelFlowBody.active)
+ {
+ checkIfValidStateTransition(AMQPState.CHANNEL_SUSPEND, _currentState, AMQPState.CHANNEL_OPENED);
+ }
+ else
+ {
+ checkIfValidStateTransition(AMQPState.CHANNEL_OPENED, _currentState, AMQPState.CHANNEL_SUSPEND);
+ }
+ AMQPMethodEvent msg = new AMQPMethodEvent(_channelId, channelFlowBody, QpidConstants.EMPTY_CORRELATION_ID);
+ _phase.messageSent(msg);
+
+ //_channelFlowNotResponded.await(_serverTimeOut, TimeUnit.MILLISECONDS);
+ _channelFlowNotResponded.await();
+ checkIfConnectionClosed();
+ AMQPValidator.throwExceptionOnNull(_channelFlowOkBody, "The broker didn't send the ChannelFlowOkBody in time");
+ handleChannelFlowState(_channelFlowOkBody.active);
+ return _channelFlowOkBody;
+ }
+ catch (Exception e)
+ {
+ throw new AMQPException("Error in channel.flow", e);
+ }
+ finally
+ {
+ _lock.unlock();
+ }
+ }
+
+ /**
+ * Resume the channel
+ */
+ public ChannelOkBody resume(ChannelResumeBody channelResumeBody) throws AMQPException
+ {
+ _lock.lock();
+ try
+ {
+ _channelOkBody = null;
+ checkIfValidStateTransition(_validResumeStates, _currentState, AMQPState.CHANNEL_OPENED);
+ AMQPMethodEvent msg = new AMQPMethodEvent(_channelId, channelResumeBody, QpidConstants.EMPTY_CORRELATION_ID);
+ _phase.messageSent(msg);
+
+ //_channelNotResumed.await(_serverTimeOut, TimeUnit.MILLISECONDS);
+ _channelNotResumed.await();
+ checkIfConnectionClosed();
+ AMQPValidator.throwExceptionOnNull(_channelOkBody,
+ "The broker didn't send the ChannelOkBody in response to the ChannelResumeBody in time");
+ notifyState(AMQPState.CHANNEL_OPENED);
+ _currentState = AMQPState.CHANNEL_OPENED;
+ return _channelOkBody;
+ }
+ catch (Exception e)
+ {
+ throw new AMQPException("Error in channel.resume", e);
+ }
+ finally
+ {
+ _lock.unlock();
+ }
+ }
+
+ /**
+ * -------------------------------------------
+ * AMQPMethodListener methods
+ * --------------------------------------------
+ */
+ public <B extends AMQMethodBody> boolean methodReceived(AMQPMethodEvent<B> evt) throws AMQPException
+ {
+ _lock.lock();
+ try
+ {
+ if (evt.getMethod() instanceof ChannelOpenOkBody)
+ {
+ _channelOpenOkBody = (ChannelOpenOkBody) evt.getMethod();
+ _channelNotOpend.signal();
+ return true;
+ }
+ else if (evt.getMethod() instanceof ChannelCloseOkBody)
+ {
+ _channelCloseOkBody = (ChannelCloseOkBody) evt.getMethod();
+ _channelNotClosed.signal();
+ return true;
+ }
+ else if (evt.getMethod() instanceof ChannelCloseBody)
+ {
+ _channelCloseBody = (ChannelCloseBody) evt.getMethod();
+ // Signal the relevant conditions, as a caller may be waiting on one of them
+ // when an error has occurred and the broker has sent a close.
+ releaseLocks();
+ handleChannelClose(_channelCloseBody);
+ return true;
+ }
+ else if (evt.getMethod() instanceof ChannelFlowOkBody)
+ {
+ _channelFlowOkBody = (ChannelFlowOkBody) evt.getMethod();
+ _channelFlowNotResponded.signal();
+ return true;
+ }
+ else if (evt.getMethod() instanceof ChannelFlowBody)
+ {
+ handleChannelFlow((ChannelFlowBody) evt.getMethod());
+ return true;
+ }
+ else if (evt.getMethod() instanceof ChannelOkBody)
+ {
+ _channelOkBody = (ChannelOkBody) evt.getMethod();
+ // In this case the only method expecting channel-ok is channel-resume,
+ // as ping and pong haven't been implemented yet.
+ _channelNotResumed.signal();
+ return true;
+ }
+ else
+ {
+ return false;
+ }
+ }
+ finally
+ {
+ _lock.unlock();
+ }
+ }
+
+ private void handleChannelClose(ChannelCloseBody channelCloseBody) throws AMQPException
+ {
+ notifyState(AMQPState.CHANNEL_CLOSED);
+ _currentState = AMQPState.CHANNEL_CLOSED;
+ // handle channel related cleanup
+ }
+
+ private void releaseLocks()
+ {
+ if (_currentState == AMQPState.CHANNEL_NOT_OPENED)
+ {
+ _channelNotOpend.signal();
+ _channelNotResumed.signal(); // It could be a channel.resume call
+ }
+ else if (_currentState == AMQPState.CHANNEL_OPENED || _currentState == AMQPState.CHANNEL_SUSPEND)
+ {
+ _channelFlowNotResponded.signal();
+ }
+ else if (_currentState == AMQPState.CHANNEL_CLOSED)
+ {
+ _channelNotResumed.signal();
+ }
+ }
+
+ private void checkIfConnectionClosed() throws AMQPException
+ {
+ if (_channelCloseBody != null)
+ {
+ String error = "Broker has closed channel due to : " + _channelCloseBody.getReplyText() + " with reply code ("
+ + _channelCloseBody.getReplyCode() + ") " + "caused by class " + _channelCloseBody.getClassId() + " and method "
+ + _channelCloseBody.getMethod();
+
+ throw new AMQPException(error);
+ }
+ }
+
+ private void handleChannelFlow(ChannelFlowBody channelFlowBody)throws AMQPException
+ {
+ _lock.lock();
+ try
+ {
+ handleChannelFlowState(channelFlowBody.active);
+ }
+ finally
+ {
+ _lock.unlock();
+ }
+ }
+
+ private void handleChannelFlowState(boolean flow)throws AMQPException
+ {
+ notifyState((flow) ? AMQPState.CHANNEL_OPENED : AMQPState.CHANNEL_SUSPEND);
+ _currentState = (flow) ? AMQPState.CHANNEL_OPENED : AMQPState.CHANNEL_SUSPEND;
+ }
+
+ private void notifyState(AMQPState newState) throws AMQPException
+ {
+ _stateManager.notifyStateChanged(new AMQPStateChangedEvent(_currentState, newState,AMQPStateType.CHANNEL_STATE));
+ }
}
Modified: incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPClassFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPClassFactory.java?view=diff&rev=524144&r1=524143&r2=524144
==============================================================================
--- incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPClassFactory.java (original)
+++ incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPClassFactory.java Fri Mar 30 08:53:18 2007
@@ -78,178 +78,178 @@
*/
public class AMQPClassFactory
{
- //Need an event manager per connection
- private AMQPEventManager _eventManager = new QpidEventManager();
-
- // Need a state manager per connection
- private AMQPStateManager _stateManager = new QpidStateManager();
-
- //Need a phase pipe per connection
- private Phase _phase;
-
- //One instance per connection
- private AMQPConnection _amqpConnection;
-
- public AMQPClassFactory()
- {
-
- }
-
- public AMQPConnection createConnection(String urlStr,ConnectionType type)throws AMQPException, URLSyntaxException
- {
- AMQPConnectionURL url = new AMQPConnectionURL(urlStr);
- return createConnectionClass(url,type);
- }
-
- public AMQPConnection createConnectionClass(ConnectionURL url,ConnectionType type)throws AMQPException
- {
- if (_amqpConnection == null)
- {
- PhaseContext ctx = new DefaultPhaseContext();
- ctx.setProperty(QpidConstants.EVENT_MANAGER, _eventManager);
-
- TransportConnection conn = TransportConnectionFactory.createTransportConnection(url, type,ctx);
- _amqpConnection = new AMQPConnection(conn);
- _eventManager.addMethodEventListener(QpidConstants.CHANNEL_ZERO, ConnectionStartBody.class,_amqpConnection);
- _eventManager.addMethodEventListener(QpidConstants.CHANNEL_ZERO, ConnectionSecureBody.class,_amqpConnection);
- _eventManager.addMethodEventListener(QpidConstants.CHANNEL_ZERO, ConnectionTuneBody.class,_amqpConnection);
- _eventManager.addMethodEventListener(QpidConstants.CHANNEL_ZERO, ConnectionOpenOkBody.class,_amqpConnection);
- _eventManager.addMethodEventListener(QpidConstants.CHANNEL_ZERO, ConnectionCloseBody.class,_amqpConnection);
- _eventManager.addMethodEventListener(QpidConstants.CHANNEL_ZERO, ConnectionCloseOkBody.class,_amqpConnection);
- }
- return _amqpConnection;
- }
-
- public AMQPChannel createChannelClass(int channel)throws AMQPException
- {
- checkIfConnectionStarted();
- AMQPChannel amqpChannel = new AMQPChannel(channel,_phase);
- _eventManager.addMethodEventListener(channel, ChannelOpenOkBody.class,amqpChannel);
- _eventManager.addMethodEventListener(channel, ChannelCloseBody.class,amqpChannel);
- _eventManager.addMethodEventListener(channel, ChannelCloseOkBody.class,amqpChannel);
- _eventManager.addMethodEventListener(channel, ChannelFlowBody.class,amqpChannel);
- _eventManager.addMethodEventListener(channel, ChannelFlowOkBody.class,amqpChannel);
- _eventManager.addMethodEventListener(channel, ChannelOkBody.class,amqpChannel);
- return amqpChannel;
- }
-
- public void destroyChannelClass(int channel,AMQPChannel amqpChannel)throws AMQPException
- {
- _eventManager.removeMethodEventListener(channel, ChannelOpenOkBody.class,amqpChannel);
- _eventManager.removeMethodEventListener(channel, ChannelCloseBody.class,amqpChannel);
- _eventManager.removeMethodEventListener(channel, ChannelCloseOkBody.class,amqpChannel);
- _eventManager.removeMethodEventListener(channel, ChannelFlowBody.class,amqpChannel);
- _eventManager.removeMethodEventListener(channel, ChannelFlowOkBody.class,amqpChannel);
- _eventManager.removeMethodEventListener(channel, ChannelOkBody.class,amqpChannel);
- }
-
- public AMQPExchange createExchangeClass(int channel)throws AMQPException
- {
- checkIfConnectionStarted();
- AMQPExchange amqpExchange = new AMQPExchange(channel,_phase);
- _eventManager.addMethodEventListener(channel, ExchangeDeclareOkBody.class,amqpExchange);
- _eventManager.addMethodEventListener(channel, ExchangeDeleteOkBody.class,amqpExchange);
- return amqpExchange;
- }
-
- public void destoryExchangeClass(int channel, AMQPExchange amqpExchange)throws AMQPException
- {
- _eventManager.removeMethodEventListener(channel, ExchangeDeclareOkBody.class,amqpExchange);
- _eventManager.removeMethodEventListener(channel, ExchangeDeleteOkBody.class,amqpExchange);
- }
-
- public AMQPQueue createQueueClass(int channel)throws AMQPException
- {
- checkIfConnectionStarted();
- AMQPQueue amqpQueue = new AMQPQueue(channel,_phase);
- _eventManager.addMethodEventListener(channel, QueueDeclareOkBody.class,amqpQueue);
- _eventManager.addMethodEventListener(channel, QueueBindOkBody.class,amqpQueue);
- _eventManager.addMethodEventListener(channel, QueueUnbindOkBody.class,amqpQueue);
- _eventManager.addMethodEventListener(channel, QueuePurgeOkBody.class,amqpQueue);
- _eventManager.addMethodEventListener(channel, QueueDeleteOkBody.class,amqpQueue);
- return amqpQueue;
- }
-
- public void destroyQueueClass(int channel,AMQPQueue amqpQueue)throws AMQPException
- {
- _eventManager.removeMethodEventListener(channel, QueueDeclareOkBody.class,amqpQueue);
- _eventManager.removeMethodEventListener(channel, QueueBindOkBody.class,amqpQueue);
- _eventManager.removeMethodEventListener(channel, QueueUnbindOkBody.class,amqpQueue);
- _eventManager.removeMethodEventListener(channel, QueuePurgeOkBody.class,amqpQueue);
- _eventManager.removeMethodEventListener(channel, QueueDeleteOkBody.class,amqpQueue);
- }
-
- public AMQPMessage createMessageClass(int channel,AMQPMessageCallBack messageCb)throws AMQPException
- {
- checkIfConnectionStarted();
- AMQPMessage amqpMessage = new AMQPMessage(channel,_phase,messageCb);
- _eventManager.addMethodEventListener(channel, MessageAppendBody.class,amqpMessage);
- _eventManager.addMethodEventListener(channel, MessageCancelBody.class,amqpMessage);
- _eventManager.addMethodEventListener(channel, MessageCheckpointBody.class,amqpMessage);
- _eventManager.addMethodEventListener(channel, MessageCloseBody.class,amqpMessage);
- _eventManager.addMethodEventListener(channel, MessageGetBody.class,amqpMessage);
- _eventManager.addMethodEventListener(channel, MessageOffsetBody.class,amqpMessage);
- _eventManager.addMethodEventListener(channel, MessageOkBody.class,amqpMessage);
- _eventManager.addMethodEventListener(channel, MessageOpenBody.class,amqpMessage);
- _eventManager.addMethodEventListener(channel, MessageRecoverBody.class,amqpMessage);
- _eventManager.addMethodEventListener(channel, MessageRejectBody.class,amqpMessage);
- _eventManager.addMethodEventListener(channel, MessageResumeBody.class,amqpMessage);
- _eventManager.addMethodEventListener(channel, MessageQosBody.class,amqpMessage);
- _eventManager.addMethodEventListener(channel, MessageTransferBody.class,amqpMessage);
-
- return amqpMessage;
- }
-
- public void destoryMessageClass(int channel,AMQPMessage amqpMessage)throws AMQPException
- {
- _eventManager.removeMethodEventListener(channel, MessageAppendBody.class,amqpMessage);
- _eventManager.removeMethodEventListener(channel, MessageCancelBody.class,amqpMessage);
- _eventManager.removeMethodEventListener(channel, MessageCheckpointBody.class,amqpMessage);
- _eventManager.removeMethodEventListener(channel, MessageCloseBody.class,amqpMessage);
- _eventManager.removeMethodEventListener(channel, MessageGetBody.class,amqpMessage);
- _eventManager.removeMethodEventListener(channel, MessageOffsetBody.class,amqpMessage);
- _eventManager.removeMethodEventListener(channel, MessageOkBody.class,amqpMessage);
- _eventManager.removeMethodEventListener(channel, MessageOpenBody.class,amqpMessage);
- _eventManager.removeMethodEventListener(channel, MessageRecoverBody.class,amqpMessage);
- _eventManager.removeMethodEventListener(channel, MessageRejectBody.class,amqpMessage);
- _eventManager.removeMethodEventListener(channel, MessageResumeBody.class,amqpMessage);
- _eventManager.removeMethodEventListener(channel, MessageQosBody.class,amqpMessage);
- _eventManager.removeMethodEventListener(channel, MessageTransferBody.class,amqpMessage);
- }
-
- //This class should register as a state listener for AMQPConnection
- private void checkIfConnectionStarted() throws AMQPException
- {
- if (_phase == null)
- {
- _phase = _amqpConnection.getPhasePipe();
-
- if (_phase == null)
- {
- throw new AMQPException("Cannot create a channel until connection is ready");
- }
- }
- }
-
- /**
- * Extention point
- * Other interested parties can obtain a reference to the event manager
- * and add listeners to get notified of events
- *
- */
- public AMQPEventManager getEventManager()
- {
- return _eventManager;
- }
-
- /**
- * Extention point
- * Other interested parties can obtain a reference to the state manager
- * and add listeners to get notified of state changes
- *
- */
- public AMQPStateManager getStateManager()
- {
- return null;
- }
+ //Need an event manager per connection
+ private AMQPEventManager _eventManager = new QpidEventManager();
+
+ // Need a state manager per connection
+ private AMQPStateManager _stateManager = new QpidStateManager();
+
+ //Need a phase pipe per connection
+ private Phase _phase;
+
+ //One instance per connection
+ private AMQPConnection _amqpConnection;
+
+ /**
+  * Creates a factory. The constructor body is intentionally empty; no connection
+  * exists until createConnectionClass() is called.
+  */
+ public AMQPClassFactory()
+ {
+
+ }
+
+ /**
+  * Convenience overload: parses the URL string and delegates to createConnectionClass().
+  *
+  * @param urlStr connection URL in string form
+  * @param type   transport connection type to create
+  * @return the connection class held by this factory
+  * @throws AMQPException      propagated from createConnectionClass()
+  * @throws URLSyntaxException declared for the URL parsing step
+  */
+ public AMQPConnection createConnection(String urlStr, ConnectionType type) throws AMQPException, URLSyntaxException
+ {
+     return createConnectionClass(new AMQPConnectionURL(urlStr), type);
+ }
+
+ /**
+  * Returns the single AMQPConnection of this factory, creating it on first use.
+  * On creation the connection is registered on channel zero as listener for the
+  * Start, Secure, Tune, OpenOk, Close and CloseOk connection method bodies.
+  * Later calls return the existing instance and ignore their arguments.
+  *
+  * @param url  broker URL used to create the transport connection
+  * @param type transport connection type
+  * @return the connection class held by this factory
+  * @throws AMQPException propagated from transport creation or listener registration
+  */
+ public AMQPConnection createConnectionClass(ConnectionURL url, ConnectionType type) throws AMQPException
+ {
+     if (_amqpConnection != null)
+     {
+         return _amqpConnection;
+     }
+
+     // The transport receives the event manager through the phase context.
+     PhaseContext phaseContext = new DefaultPhaseContext();
+     phaseContext.setProperty(QpidConstants.EVENT_MANAGER, _eventManager);
+     TransportConnection transport = TransportConnectionFactory.createTransportConnection(url, type, phaseContext);
+
+     _amqpConnection = new AMQPConnection(transport, _stateManager);
+
+     final int zero = QpidConstants.CHANNEL_ZERO;
+     _eventManager.addMethodEventListener(zero, ConnectionStartBody.class, _amqpConnection);
+     _eventManager.addMethodEventListener(zero, ConnectionSecureBody.class, _amqpConnection);
+     _eventManager.addMethodEventListener(zero, ConnectionTuneBody.class, _amqpConnection);
+     _eventManager.addMethodEventListener(zero, ConnectionOpenOkBody.class, _amqpConnection);
+     _eventManager.addMethodEventListener(zero, ConnectionCloseBody.class, _amqpConnection);
+     _eventManager.addMethodEventListener(zero, ConnectionCloseOkBody.class, _amqpConnection);
+
+     return _amqpConnection;
+ }
+
+ /**
+  * Creates the channel class for the given channel id and registers it as listener
+  * for the OpenOk, Close, CloseOk, Flow, FlowOk and Ok channel method bodies.
+  *
+  * @param channel channel id the new class is bound to
+  * @return the newly created channel class
+  * @throws AMQPException if the connection's phase pipe is not available yet, or
+  *                       propagated from listener registration
+  */
+ public AMQPChannel createChannelClass(int channel) throws AMQPException
+ {
+     checkIfConnectionStarted();
+
+     AMQPChannel created = new AMQPChannel(channel, _phase, _stateManager);
+     _eventManager.addMethodEventListener(channel, ChannelOpenOkBody.class, created);
+     _eventManager.addMethodEventListener(channel, ChannelCloseBody.class, created);
+     _eventManager.addMethodEventListener(channel, ChannelCloseOkBody.class, created);
+     _eventManager.addMethodEventListener(channel, ChannelFlowBody.class, created);
+     _eventManager.addMethodEventListener(channel, ChannelFlowOkBody.class, created);
+     _eventManager.addMethodEventListener(channel, ChannelOkBody.class, created);
+     return created;
+ }
+
+ /**
+  * Unregisters the given channel class from the event manager for the OpenOk, Close,
+  * CloseOk, Flow, FlowOk and Ok channel method bodies. Only listener removal happens here.
+  *
+  * @param channel     channel id the listeners were registered under
+  * @param amqpChannel the listener instance to remove
+  * @throws AMQPException propagated from the event manager
+  */
+ public void destroyChannelClass(int channel, AMQPChannel amqpChannel) throws AMQPException
+ {
+ _eventManager.removeMethodEventListener(channel, ChannelOpenOkBody.class, amqpChannel);
+ _eventManager.removeMethodEventListener(channel, ChannelCloseBody.class, amqpChannel);
+ _eventManager.removeMethodEventListener(channel, ChannelCloseOkBody.class, amqpChannel);
+ _eventManager.removeMethodEventListener(channel, ChannelFlowBody.class, amqpChannel);
+ _eventManager.removeMethodEventListener(channel, ChannelFlowOkBody.class, amqpChannel);
+ _eventManager.removeMethodEventListener(channel, ChannelOkBody.class, amqpChannel);
+ }
+
+ /**
+  * Creates the exchange class for the given channel id and registers it as listener
+  * for the ExchangeDeclareOk and ExchangeDeleteOk method bodies.
+  *
+  * @param channel channel id the new class is bound to
+  * @return the newly created exchange class
+  * @throws AMQPException if the connection's phase pipe is not available yet, or
+  *                       propagated from listener registration
+  */
+ public AMQPExchange createExchangeClass(int channel) throws AMQPException
+ {
+     checkIfConnectionStarted();
+
+     AMQPExchange created = new AMQPExchange(channel, _phase);
+     _eventManager.addMethodEventListener(channel, ExchangeDeclareOkBody.class, created);
+     _eventManager.addMethodEventListener(channel, ExchangeDeleteOkBody.class, created);
+     return created;
+ }
+
+ /**
+  * Unregisters the given exchange class from the event manager for the
+  * ExchangeDeclareOk and ExchangeDeleteOk method bodies.
+  * NOTE(review): the method name is misspelled ("destory"); it is part of the public
+  * interface, so renaming needs a coordinated change with callers.
+  *
+  * @param channel      channel id the listeners were registered under
+  * @param amqpExchange the listener instance to remove
+  * @throws AMQPException propagated from the event manager
+  */
+ public void destoryExchangeClass(int channel, AMQPExchange amqpExchange) throws AMQPException
+ {
+ _eventManager.removeMethodEventListener(channel, ExchangeDeclareOkBody.class, amqpExchange);
+ _eventManager.removeMethodEventListener(channel, ExchangeDeleteOkBody.class, amqpExchange);
+ }
+
+ /**
+  * Creates the queue class for the given channel id and registers it as listener for
+  * the QueueDeclareOk, QueueBindOk, QueueUnbindOk, QueuePurgeOk and QueueDeleteOk
+  * method bodies.
+  *
+  * @param channel channel id the new class is bound to
+  * @return the newly created queue class
+  * @throws AMQPException if the connection's phase pipe is not available yet, or
+  *                       propagated from listener registration
+  */
+ public AMQPQueue createQueueClass(int channel) throws AMQPException
+ {
+     checkIfConnectionStarted();
+
+     AMQPQueue created = new AMQPQueue(channel, _phase);
+     _eventManager.addMethodEventListener(channel, QueueDeclareOkBody.class, created);
+     _eventManager.addMethodEventListener(channel, QueueBindOkBody.class, created);
+     _eventManager.addMethodEventListener(channel, QueueUnbindOkBody.class, created);
+     _eventManager.addMethodEventListener(channel, QueuePurgeOkBody.class, created);
+     _eventManager.addMethodEventListener(channel, QueueDeleteOkBody.class, created);
+     return created;
+ }
+
+ /**
+  * Unregisters the given queue class from the event manager for the QueueDeclareOk,
+  * QueueBindOk, QueueUnbindOk, QueuePurgeOk and QueueDeleteOk method bodies.
+  *
+  * @param channel   channel id the listeners were registered under
+  * @param amqpQueue the listener instance to remove
+  * @throws AMQPException propagated from the event manager
+  */
+ public void destroyQueueClass(int channel, AMQPQueue amqpQueue) throws AMQPException
+ {
+ _eventManager.removeMethodEventListener(channel, QueueDeclareOkBody.class, amqpQueue);
+ _eventManager.removeMethodEventListener(channel, QueueBindOkBody.class, amqpQueue);
+ _eventManager.removeMethodEventListener(channel, QueueUnbindOkBody.class, amqpQueue);
+ _eventManager.removeMethodEventListener(channel, QueuePurgeOkBody.class, amqpQueue);
+ _eventManager.removeMethodEventListener(channel, QueueDeleteOkBody.class, amqpQueue);
+ }
+
+ /**
+  * Creates the message class for the given channel id and registers it as listener
+  * for the thirteen Message* method bodies listed below.
+  *
+  * @param channel   channel id the new class is bound to
+  * @param messageCb callback handed to the new message class
+  * @return the newly created message class
+  * @throws AMQPException if the connection's phase pipe is not available yet, or
+  *                       propagated from listener registration
+  */
+ public AMQPMessage createMessageClass(int channel, AMQPMessageCallBack messageCb) throws AMQPException
+ {
+     checkIfConnectionStarted();
+
+     AMQPMessage created = new AMQPMessage(channel, _phase, messageCb);
+     _eventManager.addMethodEventListener(channel, MessageAppendBody.class, created);
+     _eventManager.addMethodEventListener(channel, MessageCancelBody.class, created);
+     _eventManager.addMethodEventListener(channel, MessageCheckpointBody.class, created);
+     _eventManager.addMethodEventListener(channel, MessageCloseBody.class, created);
+     _eventManager.addMethodEventListener(channel, MessageGetBody.class, created);
+     _eventManager.addMethodEventListener(channel, MessageOffsetBody.class, created);
+     _eventManager.addMethodEventListener(channel, MessageOkBody.class, created);
+     _eventManager.addMethodEventListener(channel, MessageOpenBody.class, created);
+     _eventManager.addMethodEventListener(channel, MessageRecoverBody.class, created);
+     _eventManager.addMethodEventListener(channel, MessageRejectBody.class, created);
+     _eventManager.addMethodEventListener(channel, MessageResumeBody.class, created);
+     _eventManager.addMethodEventListener(channel, MessageQosBody.class, created);
+     _eventManager.addMethodEventListener(channel, MessageTransferBody.class, created);
+     return created;
+ }
+
+ /**
+  * Unregisters the given message class from the event manager for every Message*
+  * method body listed below.
+  * NOTE(review): the method name is misspelled ("destory"); it is part of the public
+  * interface, so renaming needs a coordinated change with callers.
+  *
+  * @param channel     channel id the listeners were registered under
+  * @param amqpMessage the listener instance to remove
+  * @throws AMQPException propagated from the event manager
+  */
+ public void destoryMessageClass(int channel, AMQPMessage amqpMessage) throws AMQPException
+ {
+ _eventManager.removeMethodEventListener(channel, MessageAppendBody.class, amqpMessage);
+ _eventManager.removeMethodEventListener(channel, MessageCancelBody.class, amqpMessage);
+ _eventManager.removeMethodEventListener(channel, MessageCheckpointBody.class, amqpMessage);
+ _eventManager.removeMethodEventListener(channel, MessageCloseBody.class, amqpMessage);
+ _eventManager.removeMethodEventListener(channel, MessageGetBody.class, amqpMessage);
+ _eventManager.removeMethodEventListener(channel, MessageOffsetBody.class, amqpMessage);
+ _eventManager.removeMethodEventListener(channel, MessageOkBody.class, amqpMessage);
+ _eventManager.removeMethodEventListener(channel, MessageOpenBody.class, amqpMessage);
+ _eventManager.removeMethodEventListener(channel, MessageRecoverBody.class, amqpMessage);
+ _eventManager.removeMethodEventListener(channel, MessageRejectBody.class, amqpMessage);
+ _eventManager.removeMethodEventListener(channel, MessageResumeBody.class, amqpMessage);
+ _eventManager.removeMethodEventListener(channel, MessageQosBody.class, amqpMessage);
+ _eventManager.removeMethodEventListener(channel, MessageTransferBody.class, amqpMessage);
+ }
+
+ /**
+  * Makes sure the phase pipe is available before a protocol class is created.
+  * The pipe is fetched from the connection on first use and cached in {@code _phase}.
+  *
+  * TODO: this class should register as a state listener for AMQPConnection instead
+  * of polling the connection here.
+  *
+  * @throws AMQPException if no connection has been created yet, or the connection
+  *                       has no phase pipe yet
+  */
+ private void checkIfConnectionStarted() throws AMQPException
+ {
+     if (_phase != null)
+     {
+         return;
+     }
+
+     // _amqpConnection is only assigned in createConnectionClass(); guard against a
+     // create*Class() call that arrives first, which would otherwise surface as a
+     // NullPointerException rather than the AMQPException callers are told to expect.
+     if (_amqpConnection != null)
+     {
+         _phase = _amqpConnection.getPhasePipe();
+     }
+
+     if (_phase == null)
+     {
+         throw new AMQPException("Cannot create a channel until connection is ready");
+     }
+ }
+
+ /**
+ * Extension point.
+ * Other interested parties can obtain a reference to the event manager
+ * and add listeners to get notified of events.
+ *
+ * @return the event manager owned by this factory
+ */
+ public AMQPEventManager getEventManager()
+ {
+ return _eventManager;
+ }
+
+ /**
+ * Extension point.
+ * Other interested parties can obtain a reference to the state manager
+ * and add listeners to get notified of state changes.
+ *
+ * @return the state manager owned by this factory
+ */
+ public AMQPStateManager getStateManager()
+ {
+ return _stateManager;
+ }
}
Modified: incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPConnection.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPConnection.java?view=diff&rev=524144&r1=524143&r2=524144
==============================================================================
--- incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPConnection.java (original)
+++ incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPConnection.java Fri Mar 30 08:53:18 2007
@@ -40,7 +40,10 @@
import org.apache.qpid.nclient.amqp.event.AMQPMethodEvent;
import org.apache.qpid.nclient.amqp.event.AMQPMethodListener;
import org.apache.qpid.nclient.amqp.state.AMQPState;
+import org.apache.qpid.nclient.amqp.state.AMQPStateChangedEvent;
import org.apache.qpid.nclient.amqp.state.AMQPStateMachine;
+import org.apache.qpid.nclient.amqp.state.AMQPStateManager;
+import org.apache.qpid.nclient.amqp.state.AMQPStateType;
import org.apache.qpid.nclient.config.ClientConfiguration;
import org.apache.qpid.nclient.core.AMQPException;
import org.apache.qpid.nclient.core.Phase;
@@ -55,366 +58,383 @@
*/
public class AMQPConnection extends AMQPStateMachine implements AMQPMethodListener
{
- private static final Logger _logger = Logger.getLogger(AMQPConnection.class);
+ private static final Logger _logger = Logger.getLogger(AMQPConnection.class);
- private Phase _phase;
+ private Phase _phase;
- private TransportConnection _connection;
+ private TransportConnection _connection;
- private long _correlationId;
+ private long _correlationId;
- private AMQPState _currentState;
-
- private final AMQPState[] _validCloseStates = new AMQPState[] { AMQPState.CONNECTION_NOT_STARTED, AMQPState.CONNECTION_NOT_SECURE,
- AMQPState.CONNECTION_NOT_TUNED, AMQPState.CONNECTION_NOT_OPENED, AMQPState.CONNECTION_OPEN, };
-
- // The wait period until a server sends a respond
- private long _serverTimeOut = 1000;
-
- private final Lock _lock = new ReentrantLock();
-
- private final Condition _connectionNotStarted = _lock.newCondition();
-
- private final Condition _connectionNotSecure = _lock.newCondition();
-
- private final Condition _connectionNotTuned = _lock.newCondition();
-
- private final Condition _connectionNotOpened = _lock.newCondition();
-
- private final Condition _connectionNotClosed = _lock.newCondition();
-
- private ConnectionStartBody _connectionStartBody;
-
- private ConnectionSecureBody _connectionSecureBody;
-
- private ConnectionTuneBody _connectionTuneBody;
-
- private ConnectionOpenOkBody _connectionOpenOkBody;
-
- private ConnectionCloseOkBody _connectionCloseOkBody;
-
- private ConnectionCloseBody _connectionCloseBody;
-
- protected AMQPConnection(TransportConnection connection)
- {
- _connection = connection;
- _currentState = AMQPState.CONNECTION_UNDEFINED;
- _serverTimeOut = ClientConfiguration.get().getLong(QpidConstants.SERVER_TIMEOUT_IN_MILLISECONDS);
- }
-
- /**
- * -------------------------------------------
- * API Methods
- * --------------------------------------------
- */
-
- /**
- * Opens the TCP connection and let the formalities begin.
- */
- public ConnectionStartBody openTCPConnection() throws AMQPException
- {
- _lock.lock();
- // open the TCP connection
- try
- {
- _connectionStartBody = null;
- checkIfValidStateTransition(AMQPState.CONNECTION_UNDEFINED, _currentState, AMQPState.CONNECTION_NOT_STARTED);
- _phase = _connection.connect();
-
- // waiting for ConnectionStartBody or error in connection
- //_connectionNotStarted.await(_serverTimeOut, TimeUnit.MILLISECONDS);
- _connectionNotStarted.await();
-
- checkIfConnectionClosed();
- AMQPValidator.throwExceptionOnNull(_connectionStartBody, "The broker didn't send the ConnectionStartBody in time");
- _currentState = AMQPState.CONNECTION_NOT_STARTED;
- return _connectionStartBody;
- }
- catch (InterruptedException e)
- {
- throw new AMQPException("Error opening connection to broker", e);
- }
- finally
- {
- _lock.unlock();
- }
- }
-
- /**
- * The current java broker implementation can send a connection tune body
- * as a response to the startOk. Not sure if that is the correct behaviour.
- */
- public AMQMethodBody startOk(ConnectionStartOkBody connectionStartOkBody) throws AMQPException
- {
- _lock.lock();
- try
- {
- _connectionSecureBody = null;
- checkIfValidStateTransition(AMQPState.CONNECTION_NOT_STARTED, _currentState, AMQPState.CONNECTION_NOT_SECURE);
- AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectionStartOkBody, _correlationId);
- _phase.messageSent(msg);
- //_connectionNotSecure.await(_serverTimeOut, TimeUnit.MILLISECONDS);
- _connectionNotSecure.await();
- //AMQPValidator.throwExceptionOnNull(_connectionSecureBody, "The broker didn't send the ConnectionSecureBody in time");
- //_currentState = AMQPState.CONNECTION_NOT_SECURE;
-
- checkIfConnectionClosed();
- if (_connectionTuneBody != null)
- {
- _currentState = AMQPState.CONNECTION_NOT_TUNED;
- return _connectionTuneBody;
- }
- else if (_connectionSecureBody != null)
- { // oops the server sent another challenge
- _currentState = AMQPState.CONNECTION_NOT_SECURE;
- return _connectionSecureBody;
- }
- else
- {
- throw new AMQPException("The broker didn't send the ConnectionTuneBody or ConnectionSecureBody in time");
- }
- }
- catch (InterruptedException e)
- {
- throw new AMQPException("Error in connection.startOk", e);
- }
- finally
- {
- _lock.unlock();
- }
- }
-
- /**
- * The server will verify the response contained in the secureOK body and send a ConnectionTuneBody or it could
- * issue a new challenge
- */
- public AMQMethodBody secureOk(ConnectionSecureOkBody connectionSecureOkBody) throws AMQPException
- {
- _lock.lock();
- try
- {
- _connectionTuneBody = null;
- _connectionSecureBody = null;
- checkIfValidStateTransition(AMQPState.CONNECTION_NOT_SECURE, _currentState, AMQPState.CONNECTION_NOT_TUNED);
-
- _connectionSecureBody = null; // The server could send a fresh challenge
- AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectionSecureOkBody, _correlationId);
- _phase.messageSent(msg);
-
- //_connectionNotTuned.await(_serverTimeOut, TimeUnit.MILLISECONDS);
- _connectionNotTuned.await();
- checkIfConnectionClosed();
-
- if (_connectionTuneBody != null)
- {
- _currentState = AMQPState.CONNECTION_NOT_TUNED;
- return _connectionTuneBody;
- }
- else if (_connectionSecureBody != null)
- { // oops the server sent another challenge
- _currentState = AMQPState.CONNECTION_NOT_SECURE;
- return _connectionSecureBody;
- }
- else
- {
- throw new AMQPException("The broker didn't send the ConnectionTuneBody or ConnectionSecureBody in time");
- }
- }
- catch (InterruptedException e)
- {
- throw new AMQPException("Error in connection.secureOk", e);
- }
- finally
- {
- _lock.unlock();
- }
- }
-
- public void tuneOk(ConnectionTuneOkBody connectionTuneOkBody) throws AMQPException
- {
- _lock.lock();
- try
- {
- checkIfValidStateTransition(AMQPState.CONNECTION_NOT_TUNED, _currentState, AMQPState.CONNECTION_NOT_OPENED);
- _connectionSecureBody = null;
- AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectionTuneOkBody, _correlationId);
- _phase.messageSent(msg);
- _currentState = AMQPState.CONNECTION_NOT_OPENED;
- }
- finally
- {
- _lock.unlock();
- }
- }
-
- public ConnectionOpenOkBody open(ConnectionOpenBody connectionOpenBody) throws AMQPException
- {
- _lock.lock();
- try
- {
- // If the broker sends a connection close due to an error with the
- // Connection tune ok, then this call will verify that
- checkIfConnectionClosed();
-
- _connectionOpenOkBody = null;
- checkIfValidStateTransition(AMQPState.CONNECTION_NOT_OPENED, _currentState, AMQPState.CONNECTION_OPEN);
- AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectionOpenBody, QpidConstants.EMPTY_CORRELATION_ID);
- _phase.messageSent(msg);
-
- //_connectionNotOpened.await(_serverTimeOut, TimeUnit.MILLISECONDS);
- _connectionNotOpened.await();
-
- checkIfConnectionClosed();
- AMQPValidator.throwExceptionOnNull(_connectionOpenOkBody, "The broker didn't send the ConnectionOpenOkBody in time");
- _currentState = AMQPState.CONNECTION_OPEN;
- return _connectionOpenOkBody;
- }
- catch (InterruptedException e)
- {
- throw new AMQPException("Error in connection.open", e);
- }
- finally
- {
- _lock.unlock();
- }
- }
-
- public ConnectionCloseOkBody close(ConnectionCloseBody connectioncloseBody) throws AMQPException
- {
- _lock.lock();
- try
- {
- _connectionCloseOkBody = null;
- checkIfValidStateTransition(_validCloseStates, _currentState, AMQPState.CONNECTION_CLOSED);
- AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectioncloseBody, QpidConstants.EMPTY_CORRELATION_ID);
- _phase.messageSent(msg);
- _connectionNotClosed.await(_serverTimeOut, TimeUnit.MILLISECONDS);
- AMQPValidator.throwExceptionOnNull(_connectionCloseOkBody, "The broker didn't send the ConnectionCloseOkBody in time");
- _currentState = AMQPState.CONNECTION_CLOSED;
- return _connectionCloseOkBody;
- }
- catch (InterruptedException e)
- {
- throw new AMQPException("Error in connection.close", e);
- }
- finally
- {
- _lock.unlock();
- }
- }
-
- /**
- * ------------------------------------------- AMQMethodListener methods
- * --------------------------------------------
- */
- public <B extends AMQMethodBody> boolean methodReceived(AMQPMethodEvent<B> evt) throws AMQPException
- {
- _lock.lock();
- try
- {
- _correlationId = evt.getCorrelationId();
-
- if (evt.getMethod() instanceof ConnectionStartBody)
- {
- _connectionStartBody = (ConnectionStartBody) evt.getMethod();
- _connectionNotStarted.signalAll();
- return true;
- }
- else if (evt.getMethod() instanceof ConnectionSecureBody)
- {
- _connectionSecureBody = (ConnectionSecureBody) evt.getMethod();
- _connectionNotSecure.signal();
- _connectionNotTuned.signal(); // in case the server has sent another chanllenge
- return true;
- }
- else if (evt.getMethod() instanceof ConnectionTuneBody)
- {
- _connectionTuneBody = (ConnectionTuneBody) evt.getMethod();
- _connectionNotSecure.signal(); //if the server does the auth with ConntectionStartOk
- _connectionNotTuned.signal();
- return true;
- }
- else if (evt.getMethod() instanceof ConnectionOpenOkBody)
- {
- _connectionOpenOkBody = (ConnectionOpenOkBody) evt.getMethod();
- _connectionNotOpened.signal();
- return true;
- }
- else if (evt.getMethod() instanceof ConnectionCloseOkBody)
- {
- _connectionCloseOkBody = (ConnectionCloseOkBody) evt.getMethod();
- _connectionNotClosed.signal();
- return true;
- }
- else if (evt.getMethod() instanceof ConnectionCloseBody)
- {
- _connectionCloseBody = (ConnectionCloseBody)evt.getMethod();
- // release the correct lock as u may have some conditions waiting.
- // while an error occured and the broker has sent a close.
- releaseLocks();
- handleClose();
- return true;
- }
- else
- {
- return false;
- }
- }
- finally
- {
- _lock.unlock();
- }
- }
-
- private void handleClose() throws AMQPException
- {
- try
- {
- _currentState = AMQPState.CONNECTION_CLOSING;
- // do the required cleanup and send a ConnectionCloseOkBody
- }
- catch (Exception e)
- {
- throw new AMQPException("Error handling connection.close from broker", e);
- }
- }
-
- private void checkIfConnectionClosed()throws AMQPException
- {
- if (_connectionCloseBody != null)
- {
- String error = "Broker has closed connection due to : " + _connectionCloseBody.getReplyText() +
- " with reply code (" + _connectionCloseBody.getReplyCode() + ") " +
- "caused by class " + _connectionCloseBody.getClassId() +
- " and method " + _connectionCloseBody.getMethod();
-
- throw new AMQPException(error);
- }
- }
-
- private void releaseLocks()
- {
- if(_currentState == AMQPState.CONNECTION_NOT_OPENED)
- {
- _connectionNotOpened.signal();
- }
- else if(_currentState == AMQPState.CONNECTION_UNDEFINED)
- {
- _connectionNotStarted.signal();
- }
- else if(_currentState == AMQPState.CONNECTION_NOT_STARTED)
- {
- _connectionNotSecure.signal();
- }
- else if(_currentState == AMQPState.CONNECTION_NOT_SECURE)
- {
- _connectionNotTuned.signal();
- }
- }
-
- public Phase getPhasePipe()
- {
- return _phase;
- }
+ private AMQPState _currentState;
+
+ private AMQPStateManager _stateManager;
+
+ private final AMQPState[] _validCloseStates = new AMQPState[]
+ { AMQPState.CONNECTION_NOT_STARTED, AMQPState.CONNECTION_NOT_SECURE, AMQPState.CONNECTION_NOT_TUNED, AMQPState.CONNECTION_NOT_OPENED,
+ AMQPState.CONNECTION_OPEN, };
+
+ // The wait period until a server sends a respond
+ private long _serverTimeOut = 1000;
+
+ private final Lock _lock = new ReentrantLock();
+
+ private final Condition _connectionNotStarted = _lock.newCondition();
+
+ private final Condition _connectionNotSecure = _lock.newCondition();
+
+ private final Condition _connectionNotTuned = _lock.newCondition();
+
+ private final Condition _connectionNotOpened = _lock.newCondition();
+
+ private final Condition _connectionNotClosed = _lock.newCondition();
+
+ private ConnectionStartBody _connectionStartBody;
+
+ private ConnectionSecureBody _connectionSecureBody;
+
+ private ConnectionTuneBody _connectionTuneBody;
+
+ private ConnectionOpenOkBody _connectionOpenOkBody;
+
+ private ConnectionCloseOkBody _connectionCloseOkBody;
+
+ private ConnectionCloseBody _connectionCloseBody;
+
+ /**
+  * Creates the connection class in state CONNECTION_UNDEFINED.
+  *
+  * @param connection   transport connection stored for later use
+  * @param stateManager state manager stored for later use
+  */
+ protected AMQPConnection(TransportConnection connection, AMQPStateManager stateManager)
+ {
+ _connection = connection;
+ _stateManager = stateManager;
+ _currentState = AMQPState.CONNECTION_UNDEFINED;
+ // The server timeout is read from the client configuration.
+ _serverTimeOut = ClientConfiguration.get().getLong(QpidConstants.SERVER_TIMEOUT_IN_MILLISECONDS);
+ }
+
+ /**
+ * -------------------------------------------
+ * API Methods
+ * --------------------------------------------
+ */
+
+ /**
+  * Opens the TCP connection and waits for the broker's ConnectionStartBody.
+  * On success the state manager is notified and the state becomes
+  * CONNECTION_NOT_STARTED.
+  *
+  * @return the ConnectionStartBody received from the broker
+  * @throws AMQPException if the state transition is invalid, the broker closed the
+  *                       connection, or the calling thread was interrupted while waiting
+  */
+ public ConnectionStartBody openTCPConnection() throws AMQPException
+ {
+     _lock.lock();
+     try
+     {
+         _connectionStartBody = null;
+         checkIfValidStateTransition(AMQPState.CONNECTION_UNDEFINED, _currentState, AMQPState.CONNECTION_NOT_STARTED);
+
+         // open the TCP connection
+         _phase = _connection.connect();
+
+         // Wait for ConnectionStartBody or a broker-initiated close. Condition.await()
+         // may wake spuriously, so re-check what we are waiting for in a loop.
+         // NOTE(review): relies on methodReceived() storing the body before it
+         // signals this condition, as the previous revision did -- confirm.
+         while (_connectionStartBody == null && _connectionCloseBody == null)
+         {
+             _connectionNotStarted.await();
+         }
+
+         checkIfConnectionClosed();
+         AMQPValidator.throwExceptionOnNull(_connectionStartBody, "The broker didn't send the ConnectionStartBody in time");
+
+         // Notify before updating: the event is built from the old _currentState.
+         notifyState(AMQPState.CONNECTION_NOT_STARTED);
+         _currentState = AMQPState.CONNECTION_NOT_STARTED;
+         return _connectionStartBody;
+     }
+     catch (InterruptedException e)
+     {
+         // Restore the interrupt flag so callers further up can still observe it.
+         Thread.currentThread().interrupt();
+         throw new AMQPException("Error opening connection to broker", e);
+     }
+     finally
+     {
+         _lock.unlock();
+     }
+ }
+
+ /**
+  * Sends ConnectionStartOk and waits for the broker's answer.
+  * The current java broker implementation can send a connection tune body
+  * as a response to the startOk. Not sure if that is the correct behaviour.
+  *
+  * @param connectionStartOkBody the start-ok body to send on channel zero
+  * @return the ConnectionTuneBody if one has been received (state becomes
+  *         CONNECTION_NOT_TUNED), otherwise the ConnectionSecureBody challenge
+  *         (state becomes CONNECTION_NOT_SECURE)
+  * @throws AMQPException if the state transition is invalid, the broker closed the
+  *                       connection, or the calling thread was interrupted while waiting
+  */
+ public AMQMethodBody startOk(ConnectionStartOkBody connectionStartOkBody) throws AMQPException
+ {
+     _lock.lock();
+     try
+     {
+         _connectionSecureBody = null;
+         checkIfValidStateTransition(AMQPState.CONNECTION_NOT_STARTED, _currentState, AMQPState.CONNECTION_NOT_SECURE);
+         AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectionStartOkBody, _correlationId);
+         _phase.messageSent(msg);
+
+         // Condition.await() may wake spuriously, so wait in a loop until one of the
+         // possible answers (tune, secure challenge, or close) has been recorded.
+         // NOTE(review): relies on methodReceived() storing the body before it
+         // signals this condition, as the previous revision did -- confirm.
+         while (_connectionTuneBody == null && _connectionSecureBody == null && _connectionCloseBody == null)
+         {
+             _connectionNotSecure.await();
+         }
+
+         checkIfConnectionClosed();
+         if (_connectionTuneBody != null)
+         {
+             notifyState(AMQPState.CONNECTION_NOT_TUNED);
+             _currentState = AMQPState.CONNECTION_NOT_TUNED;
+             return _connectionTuneBody;
+         }
+         else if (_connectionSecureBody != null)
+         { // oops the server sent another challenge
+             notifyState(AMQPState.CONNECTION_NOT_SECURE);
+             _currentState = AMQPState.CONNECTION_NOT_SECURE;
+             return _connectionSecureBody;
+         }
+         else
+         {
+             throw new AMQPException("The broker didn't send the ConnectionTuneBody or ConnectionSecureBody in time");
+         }
+     }
+     catch (InterruptedException e)
+     {
+         // Restore the interrupt flag so callers further up can still observe it.
+         Thread.currentThread().interrupt();
+         throw new AMQPException("Error in connection.startOk", e);
+     }
+     finally
+     {
+         _lock.unlock();
+     }
+ }
+
+ /**
+  * Sends connection.secure-ok on channel zero and blocks until the broker replies.
+  * <p>
+  * The server will verify the response contained in the secureOK body and send a
+  * ConnectionTuneBody or it could issue a new challenge.
+  *
+  * @param connectionSecureOkBody the secure-ok method to send
+  * @return the ConnectionTuneBody (state becomes CONNECTION_NOT_TUNED) or a fresh
+  *         ConnectionSecureBody (state becomes CONNECTION_NOT_SECURE)
+  * @throws AMQPException if the transition is rejected, the broker closed the connection,
+  *         or the calling thread was interrupted while waiting
+  */
+ public AMQMethodBody secureOk(ConnectionSecureOkBody connectionSecureOkBody) throws AMQPException
+ {
+     _lock.lock();
+     try
+     {
+         // forget earlier replies; the server could send a fresh challenge
+         _connectionTuneBody = null;
+         _connectionSecureBody = null;
+         checkIfValidStateTransition(AMQPState.CONNECTION_NOT_SECURE, _currentState, AMQPState.CONNECTION_NOT_TUNED);
+
+         AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectionSecureOkBody, _correlationId);
+         _phase.messageSent(msg);
+
+         // Condition.await() may wake up spuriously: keep waiting until a tune body,
+         // a secure body or a connection.close has actually been recorded.
+         // NOTE(review): the wait is unbounded; consider limiting it with _serverTimeOut.
+         while (_connectionTuneBody == null && _connectionSecureBody == null && _connectionCloseBody == null)
+         {
+             _connectionNotTuned.await();
+         }
+         checkIfConnectionClosed();
+
+         if (_connectionTuneBody != null)
+         {
+             notifyState(AMQPState.CONNECTION_NOT_TUNED);
+             _currentState = AMQPState.CONNECTION_NOT_TUNED;
+             return _connectionTuneBody;
+         }
+         else if (_connectionSecureBody != null)
+         { // oops the server sent another challenge
+             notifyState(AMQPState.CONNECTION_NOT_SECURE);
+             _currentState = AMQPState.CONNECTION_NOT_SECURE;
+             return _connectionSecureBody;
+         }
+         else
+         {
+             throw new AMQPException("The broker didn't send the ConnectionTuneBody or ConnectionSecureBody in time");
+         }
+     }
+     catch (InterruptedException e)
+     {
+         // keep the interrupt visible to callers further up the stack
+         Thread.currentThread().interrupt();
+         throw new AMQPException("Error in connection.secureOk", e);
+     }
+     finally
+     {
+         _lock.unlock();
+     }
+ }
+
+ /**
+  * Sends connection.tune-ok on channel zero. No reply is awaited; the state moves
+  * straight to CONNECTION_NOT_OPENED after the listeners have been notified.
+  *
+  * @param connectionTuneOkBody the tune-ok method to send
+  * @throws AMQPException propagated from the transition check, the send or the notification
+  */
+ public void tuneOk(ConnectionTuneOkBody connectionTuneOkBody) throws AMQPException
+ {
+     final AMQPState nextState = AMQPState.CONNECTION_NOT_OPENED;
+
+     _lock.lock();
+     try
+     {
+         checkIfValidStateTransition(AMQPState.CONNECTION_NOT_TUNED, _currentState, nextState);
+         _connectionSecureBody = null;
+         _phase.messageSent(new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectionTuneOkBody, _correlationId));
+         // notifyState() reports _currentState as the old state, so it runs before the update
+         notifyState(nextState);
+         _currentState = nextState;
+     }
+     finally
+     {
+         _lock.unlock();
+     }
+ }
+
+ /**
+  * Sends connection.open on channel zero and blocks until the broker replies.
+  * On success the listeners are notified and the state becomes CONNECTION_OPEN.
+  *
+  * @param connectionOpenBody the open method to send
+  * @return the ConnectionOpenOkBody received from the broker
+  * @throws AMQPException if the broker already closed the connection, the transition is
+  *         rejected, or the calling thread was interrupted while waiting
+  */
+ public ConnectionOpenOkBody open(ConnectionOpenBody connectionOpenBody) throws AMQPException
+ {
+     _lock.lock();
+     try
+     {
+         // If the broker sends a connection close due to an error with the
+         // Connection tune ok, then this call will verify that
+         checkIfConnectionClosed();
+
+         _connectionOpenOkBody = null;
+         checkIfValidStateTransition(AMQPState.CONNECTION_NOT_OPENED, _currentState, AMQPState.CONNECTION_OPEN);
+         AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectionOpenBody, QpidConstants.EMPTY_CORRELATION_ID);
+         _phase.messageSent(msg);
+
+         // Condition.await() may wake up spuriously: keep waiting until the open-ok
+         // or a connection.close has actually been recorded.
+         // NOTE(review): the wait is unbounded; consider limiting it with _serverTimeOut.
+         while (_connectionOpenOkBody == null && _connectionCloseBody == null)
+         {
+             _connectionNotOpened.await();
+         }
+
+         checkIfConnectionClosed();
+         AMQPValidator.throwExceptionOnNull(_connectionOpenOkBody, "The broker didn't send the ConnectionOpenOkBody in time");
+         notifyState(AMQPState.CONNECTION_OPEN);
+         _currentState = AMQPState.CONNECTION_OPEN;
+         return _connectionOpenOkBody;
+     }
+     catch (InterruptedException e)
+     {
+         // keep the interrupt visible to callers further up the stack
+         Thread.currentThread().interrupt();
+         throw new AMQPException("Error in connection.open", e);
+     }
+     finally
+     {
+         _lock.unlock();
+     }
+ }
+
+ /**
+  * Sends connection.close on channel zero and waits up to _serverTimeOut milliseconds
+  * for the broker's close-ok. On success the listeners are notified and the state
+  * becomes CONNECTION_CLOSED.
+  *
+  * @param connectioncloseBody the close method to send
+  * @return the ConnectionCloseOkBody received from the broker
+  * @throws AMQPException if the transition is rejected, no close-ok was recorded within
+  *         the timeout, or the calling thread was interrupted while waiting
+  */
+ public ConnectionCloseOkBody close(ConnectionCloseBody connectioncloseBody) throws AMQPException
+ {
+     _lock.lock();
+     try
+     {
+         _connectionCloseOkBody = null;
+         checkIfValidStateTransition(_validCloseStates, _currentState, AMQPState.CONNECTION_CLOSED);
+         AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectioncloseBody, QpidConstants.EMPTY_CORRELATION_ID);
+         _phase.messageSent(msg);
+
+         // A single timed await can return early on a spurious wakeup; keep waiting for
+         // the close-ok until the whole timeout has been used up.
+         long remainingNanos = TimeUnit.MILLISECONDS.toNanos(_serverTimeOut);
+         while (_connectionCloseOkBody == null && remainingNanos > 0)
+         {
+             remainingNanos = _connectionNotClosed.awaitNanos(remainingNanos);
+         }
+
+         AMQPValidator.throwExceptionOnNull(_connectionCloseOkBody, "The broker didn't send the ConnectionCloseOkBody in time");
+         notifyState(AMQPState.CONNECTION_CLOSED);
+         _currentState = AMQPState.CONNECTION_CLOSED;
+         return _connectionCloseOkBody;
+     }
+     catch (InterruptedException e)
+     {
+         // keep the interrupt visible to callers further up the stack
+         Thread.currentThread().interrupt();
+         throw new AMQPException("Error in connection.close", e);
+     }
+     finally
+     {
+         _lock.unlock();
+     }
+ }
+
+ /**
+ * -------------------------------------------
+ * AMQMethodListener methods
+ * --------------------------------------------
+ */
+ public <B extends AMQMethodBody> boolean methodReceived(AMQPMethodEvent<B> evt) throws AMQPException
+ {
+ _lock.lock();
+ try
+ {
+ _correlationId = evt.getCorrelationId();
+
+ if (evt.getMethod() instanceof ConnectionStartBody)
+ {
+ _connectionStartBody = (ConnectionStartBody) evt.getMethod();
+ _connectionNotStarted.signalAll();
+ return true;
+ }
+ else if (evt.getMethod() instanceof ConnectionSecureBody)
+ {
+ _connectionSecureBody = (ConnectionSecureBody) evt.getMethod();
+ _connectionNotSecure.signal();
+ _connectionNotTuned.signal(); // in case the server has sent another chanllenge
+ return true;
+ }
+ else if (evt.getMethod() instanceof ConnectionTuneBody)
+ {
+ _connectionTuneBody = (ConnectionTuneBody) evt.getMethod();
+ _connectionNotSecure.signal(); //if the server does the auth with ConntectionStartOk
+ _connectionNotTuned.signal();
+ return true;
+ }
+ else if (evt.getMethod() instanceof ConnectionOpenOkBody)
+ {
+ _connectionOpenOkBody = (ConnectionOpenOkBody) evt.getMethod();
+ _connectionNotOpened.signal();
+ return true;
+ }
+ else if (evt.getMethod() instanceof ConnectionCloseOkBody)
+ {
+ _connectionCloseOkBody = (ConnectionCloseOkBody) evt.getMethod();
+ _connectionNotClosed.signal();
+ return true;
+ }
+ else if (evt.getMethod() instanceof ConnectionCloseBody)
+ {
+ _connectionCloseBody = (ConnectionCloseBody) evt.getMethod();
+ // release the correct lock as u may have some conditions waiting.
+ // while an error occured and the broker has sent a close.
+ releaseLocks();
+ handleClose();
+ return true;
+ }
+ else
+ {
+ return false;
+ }
+ }
+ finally
+ {
+ _lock.unlock();
+ }
+ }
+
+
+ /**
+  * @return the phase assigned by openTCPConnection() when the transport connected;
+  *         outgoing methods of this class are sent through it
+  */
+ public Phase getPhasePipe()
+ {
+     return this._phase;
+ }
+
+ private void handleClose() throws AMQPException
+ {
+ try
+ {
+ notifyState(AMQPState.CONNECTION_CLOSING);
+ _currentState = AMQPState.CONNECTION_CLOSING;
+ // do the required cleanup and send a ConnectionCloseOkBody
+ }
+ catch (Exception e)
+ {
+ throw new AMQPException("Error handling connection.close from broker", e);
+ }
+ }
+
+ /**
+  * Fails if the broker has sent a connection.close; does nothing otherwise.
+  *
+  * @throws AMQPException describing the reply text, reply code, class and method
+  *         reported by the recorded ConnectionCloseBody
+  */
+ private void checkIfConnectionClosed() throws AMQPException
+ {
+     if (_connectionCloseBody == null)
+     {
+         // no close received from the broker, nothing to report
+         return;
+     }
+
+     // NOTE(review): the class is read via getClassId() but the method via getMethod();
+     // confirm that getMethod() really yields the id of the offending method.
+     throw new AMQPException("Broker has closed connection due to : " + _connectionCloseBody.getReplyText()
+             + " with reply code (" + _connectionCloseBody.getReplyCode() + ") "
+             + "caused by class " + _connectionCloseBody.getClassId()
+             + " and method " + _connectionCloseBody.getMethod());
+ }
+
+ private void releaseLocks()
+ {
+ if (_currentState == AMQPState.CONNECTION_NOT_OPENED)
+ {
+ _connectionNotOpened.signal();
+ }
+ else if (_currentState == AMQPState.CONNECTION_UNDEFINED)
+ {
+ _connectionNotStarted.signal();
+ }
+ else if (_currentState == AMQPState.CONNECTION_NOT_STARTED)
+ {
+ _connectionNotSecure.signal();
+ }
+ else if (_currentState == AMQPState.CONNECTION_NOT_SECURE)
+ {
+ _connectionNotTuned.signal();
+ }
+ }
+
+ /**
+  * Tells the state manager that the connection is moving from the current state to
+  * newState. Must be called before _currentState is updated, because the current
+  * value is reported as the old state.
+  *
+  * @param newState the state being entered
+  * @throws AMQPException propagated from the state manager
+  */
+ private void notifyState(AMQPState newState) throws AMQPException
+ {
+     AMQPStateChangedEvent change =
+             new AMQPStateChangedEvent(_currentState, newState, AMQPStateType.CONNECTION_STATE);
+     _stateManager.notifyStateChanged(change);
+ }
}
Modified: incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/QpidStateManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/QpidStateManager.java?view=diff&rev=524144&r1=524143&r2=524144
==============================================================================
--- incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/QpidStateManager.java (original)
+++ incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/QpidStateManager.java Fri Mar 30 08:53:18 2007
@@ -20,23 +20,65 @@
*/
package org.apache.qpid.nclient.amqp;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
+import org.apache.qpid.nclient.amqp.state.AMQPStateChangedEvent;
import org.apache.qpid.nclient.amqp.state.AMQPStateListener;
import org.apache.qpid.nclient.amqp.state.AMQPStateManager;
+import org.apache.qpid.nclient.amqp.state.AMQPStateType;
+import org.apache.qpid.nclient.core.AMQPException;
public class QpidStateManager implements AMQPStateManager
{
- public void addListener(AMQPStateListener l) throws AMQException
- {
- // TODO Auto-generated method stub
+ private static final Logger _logger = Logger.getLogger(QpidStateManager.class);
- }
+ private Map<AMQPStateType, List<AMQPStateListener>> _listernerMap = new ConcurrentHashMap<AMQPStateType, List<AMQPStateListener>>();
- public void removeListener(AMQPStateListener l) throws AMQException
+ public void addListener(AMQPStateType stateType, AMQPStateListener l) throws AMQException
{
- // TODO Auto-generated method stub
+ List<AMQPStateListener> list;
+ if(_listernerMap.containsKey(stateType))
+ {
+ list = _listernerMap.get(stateType);
+ }
+ else
+ {
+ list = new ArrayList<AMQPStateListener>();
+ _listernerMap.put(stateType, list);
+ }
+ list.add(l);
+ }
+ /**
+  * Unregisters a listener from the given state type. Does nothing when no listener
+  * was ever registered for that type.
+  *
+  * @param stateType the kind of state the listener was registered for
+  * @param l         the listener to remove
+  */
+ public void removeListener(AMQPStateType stateType, AMQPStateListener l) throws AMQException
+ {
+     List<AMQPStateListener> registered = _listernerMap.get(stateType);
+     if (registered != null)
+     {
+         registered.remove(l);
+     }
+ }
+
+ public void notifyStateChanged(AMQPStateChangedEvent event) throws AMQPException
+ {
+
+ if(_listernerMap.containsKey(event.getStateType()))
+ {
+ List<AMQPStateListener> list = _listernerMap.get(event.getStateType());
+ for(AMQPStateListener l: list)
+ {
+ l.stateChanged(event);
+ }
+ }
+ else
+ {
+ _logger.warn("There are no registered listerners for state type" + event.getStateType());
+ }
}
}
Added: incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/StateHelper.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/StateHelper.java?view=auto&rev=524144
==============================================================================
--- incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/StateHelper.java (added)
+++ incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/StateHelper.java Fri Mar 30 08:53:18 2007
@@ -0,0 +1,19 @@
+package org.apache.qpid.nclient.amqp.sample;
+
+import org.apache.qpid.nclient.amqp.state.AMQPStateChangedEvent;
+import org.apache.qpid.nclient.amqp.state.AMQPStateListener;
+import org.apache.qpid.nclient.core.AMQPException;
+
+/**
+ * Sample state listener that prints every state change it is told about.
+ */
+public class StateHelper implements AMQPStateListener
+{
+
+    /**
+     * Writes one line of the form
+     * "&lt;state type&gt; changed state from &lt;old&gt; to &lt;new&gt;" to standard output.
+     *
+     * @param event the state change that occurred
+     */
+    public void stateChanged(AMQPStateChangedEvent event) throws AMQPException
+    {
+        StringBuilder line = new StringBuilder();
+        line.append(event.getStateType());
+        line.append(" changed state from ");
+        line.append(event.getOldState());
+        line.append(" to ");
+        line.append(event.getNewState());
+
+        System.out.println(line.toString());
+    }
+
+}