You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ra...@apache.org on 2007/03/30 17:53:19 UTC

svn commit: r524144 [1/2] - in /incubator/qpid/branches/client_restructure/java: ./ common/src/main/java/org/apache/qpid/framing/ newclient/ newclient/src/main/java/org/apache/qpid/nclient/amqp/ newclient/src/main/java/org/apache/qpid/nclient/amqp/samp...

Author: rajith
Date: Fri Mar 30 08:53:18 2007
New Revision: 524144

URL: http://svn.apache.org/viewvc?view=rev&rev=524144
Log:
added state support to the new client and modified the example to illustrate it

Added:
    incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/StateHelper.java
    incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateChangedEvent.java
Modified:
    incubator/qpid/branches/client_restructure/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java
    incubator/qpid/branches/client_restructure/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java
    incubator/qpid/branches/client_restructure/java/common/src/main/java/org/apache/qpid/framing/AMQRequestBody.java
    incubator/qpid/branches/client_restructure/java/common/src/main/java/org/apache/qpid/framing/AMQRequestBodyFactory.java
    incubator/qpid/branches/client_restructure/java/common/src/main/java/org/apache/qpid/framing/AMQResponseBody.java
    incubator/qpid/branches/client_restructure/java/common/src/main/java/org/apache/qpid/framing/AMQResponseBodyFactory.java
    incubator/qpid/branches/client_restructure/java/newclient/.project
    incubator/qpid/branches/client_restructure/java/newclient/pom.xml
    incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPChannel.java
    incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPClassFactory.java
    incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPConnection.java
    incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/QpidStateManager.java
    incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/TestClient.java
    incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPState.java
    incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateListener.java
    incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateManager.java
    incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateType.java
    incubator/qpid/branches/client_restructure/java/pom.xml

Modified: incubator/qpid/branches/client_restructure/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/client_restructure/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java?view=diff&rev=524144&r1=524143&r2=524144
==============================================================================
--- incubator/qpid/branches/client_restructure/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java (original)
+++ incubator/qpid/branches/client_restructure/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java Fri Mar 30 08:53:18 2007
@@ -24,10 +24,6 @@
 import org.apache.mina.common.ByteBuffer;
 import org.apache.mina.common.IoSession;
 import org.apache.mina.filter.codec.ProtocolDecoderOutput;
-import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
-
-import java.util.HashMap;
-import java.util.Map;
 
 public class AMQDataBlockDecoder
 {
@@ -68,13 +64,11 @@
         BodyFactory bodyFactory;
         if (type == AMQRequestBody.TYPE)
         {
-            AMQVersionAwareProtocolSession protocolSession = (AMQVersionAwareProtocolSession) session.getAttachment();
-            bodyFactory = new AMQRequestBodyFactory(protocolSession);
+            bodyFactory = new AMQRequestBodyFactory();
         }
         else if (type == AMQResponseBody.TYPE)
         {
-            AMQVersionAwareProtocolSession protocolSession = (AMQVersionAwareProtocolSession) session.getAttachment();
-            bodyFactory = new AMQResponseBodyFactory(protocolSession);
+            bodyFactory = new AMQResponseBodyFactory();
         }
         else
         {

Modified: incubator/qpid/branches/client_restructure/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/client_restructure/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java?view=diff&rev=524144&r1=524143&r2=524144
==============================================================================
--- incubator/qpid/branches/client_restructure/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java (original)
+++ incubator/qpid/branches/client_restructure/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java Fri Mar 30 08:53:18 2007
@@ -22,21 +22,19 @@
 
 import org.apache.log4j.Logger;
 import org.apache.mina.common.ByteBuffer;
-import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
 
-public class AMQMethodBodyFactory implements BodyFactory
+public class AMQMethodBodyFactory implements BodyFactory, ProtocolVersionList
 {
     private static final Logger _log = Logger.getLogger(AMQMethodBodyFactory.class);
 
-    private final AMQVersionAwareProtocolSession _protocolSession;
+    VersionSpecificRegistry _registry = MainRegistry.getVersionSpecificRegistry(pv[pv.length-1][PROTOCOL_MAJOR],pv[pv.length-1][PROTOCOL_MINOR]);
     
-    public AMQMethodBodyFactory(AMQVersionAwareProtocolSession protocolSession)
+    public AMQMethodBodyFactory()
     {
-        _protocolSession = protocolSession;
     }
 
     public AMQMethodBody createBody(ByteBuffer in, long bodySize) throws AMQFrameDecodingException
     {
-        return _protocolSession.getRegistry().get((short)in.getUnsignedShort(), (short)in.getUnsignedShort(), in, bodySize);    
+        return _registry.get((short)in.getUnsignedShort(), (short)in.getUnsignedShort(), in, bodySize);    
     }
 }

Modified: incubator/qpid/branches/client_restructure/java/common/src/main/java/org/apache/qpid/framing/AMQRequestBody.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/client_restructure/java/common/src/main/java/org/apache/qpid/framing/AMQRequestBody.java?view=diff&rev=524144&r1=524143&r2=524144
==============================================================================
--- incubator/qpid/branches/client_restructure/java/common/src/main/java/org/apache/qpid/framing/AMQRequestBody.java (original)
+++ incubator/qpid/branches/client_restructure/java/common/src/main/java/org/apache/qpid/framing/AMQRequestBody.java Fri Mar 30 08:53:18 2007
@@ -21,7 +21,6 @@
 package org.apache.qpid.framing;
 
 import org.apache.mina.common.ByteBuffer;
-import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
 
 public class AMQRequestBody extends AMQBody
 {
@@ -31,12 +30,10 @@
     protected long requestId;
     protected long responseMark;
     protected AMQMethodBody methodPayload;
-    protected AMQVersionAwareProtocolSession protocolSession;
-
+    
     // Constructor
-    public AMQRequestBody(AMQVersionAwareProtocolSession protocolSession)
+    public AMQRequestBody()
     {
-        this.protocolSession = protocolSession;
     }
     
     public AMQRequestBody(long requestId, long responseMark,
@@ -45,7 +42,6 @@
         this.requestId = requestId;
         this.responseMark = responseMark;
         this.methodPayload = methodPayload;
-        protocolSession = null;
     }
 
 
@@ -75,17 +71,12 @@
     
     protected void populateFromBuffer(ByteBuffer buffer, long size)
         throws AMQFrameDecodingException
-    {
-        if (protocolSession == null)
-        {
-            throw new AMQFrameDecodingException("Cannot call populateFromBuffer() without using correct constructor.");
-        }
-        
+    {   
         requestId = EncodingUtils.readLong(buffer);
         responseMark = EncodingUtils.readLong(buffer);
         int reserved = EncodingUtils.readInteger(buffer); // reserved, throw away
         
-        AMQMethodBodyFactory methodBodyFactory = new AMQMethodBodyFactory(protocolSession);
+        AMQMethodBodyFactory methodBodyFactory = new AMQMethodBodyFactory();
         methodPayload = methodBodyFactory.createBody(buffer, size);
     }
     

Modified: incubator/qpid/branches/client_restructure/java/common/src/main/java/org/apache/qpid/framing/AMQRequestBodyFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/client_restructure/java/common/src/main/java/org/apache/qpid/framing/AMQRequestBodyFactory.java?view=diff&rev=524144&r1=524143&r2=524144
==============================================================================
--- incubator/qpid/branches/client_restructure/java/common/src/main/java/org/apache/qpid/framing/AMQRequestBodyFactory.java (original)
+++ incubator/qpid/branches/client_restructure/java/common/src/main/java/org/apache/qpid/framing/AMQRequestBodyFactory.java Fri Mar 30 08:53:18 2007
@@ -21,20 +21,16 @@
 package org.apache.qpid.framing;
 
 import org.apache.mina.common.ByteBuffer;
-import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
 
 public class AMQRequestBodyFactory implements BodyFactory
-{
-    private final AMQVersionAwareProtocolSession protocolSession;
-    
-    public AMQRequestBodyFactory(AMQVersionAwareProtocolSession protocolSession)
+{    
+    public AMQRequestBodyFactory()
     {
-        this.protocolSession = protocolSession;
     }
     
     public AMQBody createBody(ByteBuffer in, long bodySize) throws AMQFrameDecodingException
     {
-        AMQRequestBody rb = new AMQRequestBody(protocolSession);
+        AMQRequestBody rb = new AMQRequestBody();
         rb.populateFromBuffer(in, bodySize);
         return rb;
     }

Modified: incubator/qpid/branches/client_restructure/java/common/src/main/java/org/apache/qpid/framing/AMQResponseBody.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/client_restructure/java/common/src/main/java/org/apache/qpid/framing/AMQResponseBody.java?view=diff&rev=524144&r1=524143&r2=524144
==============================================================================
--- incubator/qpid/branches/client_restructure/java/common/src/main/java/org/apache/qpid/framing/AMQResponseBody.java (original)
+++ incubator/qpid/branches/client_restructure/java/common/src/main/java/org/apache/qpid/framing/AMQResponseBody.java Fri Mar 30 08:53:18 2007
@@ -21,7 +21,6 @@
 package org.apache.qpid.framing;
 
 import org.apache.mina.common.ByteBuffer;
-import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
 
 public class AMQResponseBody extends AMQBody
 {
@@ -32,12 +31,10 @@
     protected long requestId;
     protected int batchOffset;
     protected AMQMethodBody methodPayload;
-    protected AMQVersionAwareProtocolSession protocolSession;
 
     // Constructor
-    public AMQResponseBody(AMQVersionAwareProtocolSession protocolSession)
+    public AMQResponseBody()
     {
-        this.protocolSession = protocolSession;
     }
     
     public AMQResponseBody(long responseId, long requestId,
@@ -47,7 +44,6 @@
         this.requestId = requestId;
         this.batchOffset = batchOffset;
         this.methodPayload = methodPayload;
-        protocolSession = null;
     }
 
     // Field methods
@@ -77,16 +73,13 @@
     
     protected void populateFromBuffer(ByteBuffer buffer, long size)
         throws AMQFrameDecodingException
-    {
-        if (protocolSession == null)
-            throw new AMQFrameDecodingException("Cannot call populateFromBuffer() without using correct constructor.");
-            
+    {            
         responseId = EncodingUtils.readLong(buffer);
         requestId = EncodingUtils.readLong(buffer);
         // XXX
         batchOffset = EncodingUtils.readInteger(buffer);
 
-        AMQMethodBodyFactory methodBodyFactory = new AMQMethodBodyFactory(protocolSession);
+        AMQMethodBodyFactory methodBodyFactory = new AMQMethodBodyFactory();
         methodPayload = methodBodyFactory.createBody(buffer, size);
     }
     

Modified: incubator/qpid/branches/client_restructure/java/common/src/main/java/org/apache/qpid/framing/AMQResponseBodyFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/client_restructure/java/common/src/main/java/org/apache/qpid/framing/AMQResponseBodyFactory.java?view=diff&rev=524144&r1=524143&r2=524144
==============================================================================
--- incubator/qpid/branches/client_restructure/java/common/src/main/java/org/apache/qpid/framing/AMQResponseBodyFactory.java (original)
+++ incubator/qpid/branches/client_restructure/java/common/src/main/java/org/apache/qpid/framing/AMQResponseBodyFactory.java Fri Mar 30 08:53:18 2007
@@ -25,16 +25,13 @@
 
 public class AMQResponseBodyFactory implements BodyFactory
 {
-    private final AMQVersionAwareProtocolSession protocolSession;
-    
-    public AMQResponseBodyFactory(AMQVersionAwareProtocolSession protocolSession)
+    public AMQResponseBodyFactory()
     {
-        this.protocolSession = protocolSession;
     }
     
     public AMQBody createBody(ByteBuffer in, long bodySize) throws AMQFrameDecodingException
     {
-        AMQResponseBody rb = new AMQResponseBody(protocolSession);
+        AMQResponseBody rb = new AMQResponseBody();
         rb.populateFromBuffer(in, bodySize);
         return rb;
     }

Modified: incubator/qpid/branches/client_restructure/java/newclient/.project
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/client_restructure/java/newclient/.project?view=diff&rev=524144&r1=524143&r2=524144
==============================================================================
--- incubator/qpid/branches/client_restructure/java/newclient/.project (original)
+++ incubator/qpid/branches/client_restructure/java/newclient/.project Fri Mar 30 08:53:18 2007
@@ -1,7 +1,10 @@
 <projectDescription>
   <name>qpid-newclient</name>
   <comment/>
-  <projects/>
+  <projects>
+    <project>qpid-broker</project>
+    <project>qpid-common</project>
+  </projects>
   <buildSpec>
     <buildCommand>
       <name>org.eclipse.jdt.core.javabuilder</name>

Modified: incubator/qpid/branches/client_restructure/java/newclient/pom.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/client_restructure/java/newclient/pom.xml?view=diff&rev=524144&r1=524143&r2=524144
==============================================================================
--- incubator/qpid/branches/client_restructure/java/newclient/pom.xml (original)
+++ incubator/qpid/branches/client_restructure/java/newclient/pom.xml Fri Mar 30 08:53:18 2007
@@ -58,6 +58,12 @@
         </dependency>
 
         <dependency>
+           <groupId>commons-configuration</groupId>
+           <artifactId>commons-configuration</artifactId>
+           <version>1.3</version>
+        </dependency>
+
+        <dependency>
             <groupId>commons-lang</groupId>
             <artifactId>commons-lang</artifactId>
         </dependency>

Modified: incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPChannel.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPChannel.java?view=diff&rev=524144&r1=524143&r2=524144
==============================================================================
--- incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPChannel.java (original)
+++ incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPChannel.java Fri Mar 30 08:53:18 2007
@@ -40,7 +40,10 @@
 import org.apache.qpid.nclient.amqp.event.AMQPMethodEvent;
 import org.apache.qpid.nclient.amqp.event.AMQPMethodListener;
 import org.apache.qpid.nclient.amqp.state.AMQPState;
+import org.apache.qpid.nclient.amqp.state.AMQPStateChangedEvent;
 import org.apache.qpid.nclient.amqp.state.AMQPStateMachine;
+import org.apache.qpid.nclient.amqp.state.AMQPStateManager;
+import org.apache.qpid.nclient.amqp.state.AMQPStateType;
 import org.apache.qpid.nclient.config.ClientConfiguration;
 import org.apache.qpid.nclient.core.AMQPException;
 import org.apache.qpid.nclient.core.Phase;
@@ -58,295 +61,309 @@
 
 public class AMQPChannel extends AMQPStateMachine implements AMQPMethodListener
 {
-    private static final Logger _logger = Logger.getLogger(AMQPChannel.class);
+	private static final Logger _logger = Logger.getLogger(AMQPChannel.class);
 
-    // the channelId assigned for this channel
-    private int _channelId;
+	// the channelId assigned for this channel
+	private int _channelId;
 
-    private Phase _phase;
+	private Phase _phase;
 
-    private AMQPState _currentState;
+	private AMQPState _currentState;
 
-    private final AMQPState[] _validCloseStates = new AMQPState[] { AMQPState.CHANNEL_OPENED, AMQPState.CHANNEL_SUSPEND };
-
-    private final AMQPState[] _validResumeStates = new AMQPState[] { AMQPState.CHANNEL_CLOSED, AMQPState.CHANNEL_NOT_OPENED };
-
-    // The wait period until a server sends a respond
-    private long _serverTimeOut = 1000;
-
-    private final Lock _lock = new ReentrantLock();
-
-    private final Condition _channelNotOpend = _lock.newCondition();
-
-    private final Condition _channelNotClosed = _lock.newCondition();
-
-    private final Condition _channelFlowNotResponded = _lock.newCondition();
-
-    private final Condition _channelNotResumed = _lock.newCondition();
-
-    private ChannelOpenOkBody _channelOpenOkBody;
-
-    private ChannelCloseOkBody _channelCloseOkBody;
-
-    private ChannelFlowOkBody _channelFlowOkBody;
-
-    private ChannelOkBody _channelOkBody;
-    
-    private ChannelCloseBody _channelCloseBody;
-
-    protected AMQPChannel(int channelId, Phase phase)
-    {
-	_channelId = channelId;
-	_phase = phase;
-	_currentState = AMQPState.CHANNEL_NOT_OPENED;
-	_serverTimeOut = ClientConfiguration.get().getLong(QpidConstants.SERVER_TIMEOUT_IN_MILLISECONDS);
-    }
-
-    /**
-     * ------------------------------------------- 
-     * API Methods
-     *  --------------------------------------------
-     */
-
-    /**
-     * Opens the channel
-     */
-    public ChannelOpenOkBody open(ChannelOpenBody channelOpenBody) throws AMQPException
-    {
-	_lock.lock();
-	try
-	{
-	    _channelOpenOkBody = null;
-	    checkIfValidStateTransition(AMQPState.CHANNEL_NOT_OPENED, _currentState, AMQPState.CHANNEL_OPENED);
-	    AMQPMethodEvent msg = new AMQPMethodEvent(_channelId, channelOpenBody, QpidConstants.EMPTY_CORRELATION_ID);
-	    _phase.messageSent(msg);
-	    
-	    //_channelNotOpend.await(_serverTimeOut, TimeUnit.MILLISECONDS);
-	    _channelNotOpend.await();
-	    checkIfConnectionClosed();
-	    AMQPValidator.throwExceptionOnNull(_channelOpenOkBody, "The broker didn't send the ChannelOpenOkBody in time");
-	    _currentState = AMQPState.CHANNEL_OPENED;
-	    return _channelOpenOkBody;
-	}
-	catch (Exception e)
-	{
-	    throw new AMQPException("Error in channel.open", e);
-	}
-	finally
-	{
-	    _lock.unlock();
-	}
-    }
-
-    /**
-     * Close the channel
-     */
-    public ChannelCloseOkBody close(ChannelCloseBody channelCloseBody) throws AMQPException
-    {
-	_lock.lock();
-	try
-	{
-	    _channelCloseOkBody = null;
-	    checkIfValidStateTransition(_validCloseStates, _currentState, AMQPState.CHANNEL_CLOSED);
-	    AMQPMethodEvent msg = new AMQPMethodEvent(_channelId, channelCloseBody, QpidConstants.EMPTY_CORRELATION_ID);
-	    _phase.messageSent(msg);
-	    
-	    //_channelNotClosed.await(_serverTimeOut, TimeUnit.MILLISECONDS);
-	    _channelNotClosed.await();
-	    AMQPValidator.throwExceptionOnNull(_channelCloseOkBody, "The broker didn't send the ChannelCloseOkBody in time");
-	    _currentState = AMQPState.CHANNEL_CLOSED;
-	    return _channelCloseOkBody;
-	}
-	catch (Exception e)
-	{
-	    throw new AMQPException("Error in channel.close", e);
-	}
-	finally
-	{
-	    _lock.unlock();
-	}
-    }
-
-    /**
-     * Channel Flow
-     */
-    public ChannelFlowOkBody flow(ChannelFlowBody channelFlowBody) throws AMQPException
-    {
-	_lock.lock();
-	try
-	{
-	    _channelFlowOkBody = null;
-	    if (channelFlowBody.active)
-	    {
-		checkIfValidStateTransition(AMQPState.CHANNEL_SUSPEND, _currentState, AMQPState.CHANNEL_OPENED);
-	    }
-	    else
-	    {
-		checkIfValidStateTransition(AMQPState.CHANNEL_OPENED, _currentState, AMQPState.CHANNEL_SUSPEND);
-	    }
-	    AMQPMethodEvent msg = new AMQPMethodEvent(_channelId, channelFlowBody, QpidConstants.EMPTY_CORRELATION_ID);
-	    _phase.messageSent(msg);
-	    
-	    //_channelFlowNotResponded.await(_serverTimeOut, TimeUnit.MILLISECONDS);
-	    _channelFlowNotResponded.await();
-	    checkIfConnectionClosed();
-	    AMQPValidator.throwExceptionOnNull(_channelFlowOkBody, "The broker didn't send the ChannelFlowOkBody in time");
-	    handleChannelFlowState(_channelFlowOkBody.active);
-	    return _channelFlowOkBody;
-	}
-	catch (Exception e)
-	{
-	    throw new AMQPException("Error in channel.flow", e);
-	}
-	finally
-	{
-	    _lock.unlock();
-	}
-    }
-
-    /**
-     * Close the channel
-     */
-    public ChannelOkBody resume(ChannelResumeBody channelResumeBody) throws AMQPException
-    {
-	_lock.lock();
-	try
-	{
-	    _channelOkBody = null;
-	    checkIfValidStateTransition(_validResumeStates, _currentState, AMQPState.CHANNEL_OPENED);
-	    AMQPMethodEvent msg = new AMQPMethodEvent(_channelId, channelResumeBody, QpidConstants.EMPTY_CORRELATION_ID);
-	    _phase.messageSent(msg);
-	    
-	    //_channelNotResumed.await(_serverTimeOut, TimeUnit.MILLISECONDS);
-	    _channelNotResumed.await();
-	    checkIfConnectionClosed();
-	    AMQPValidator.throwExceptionOnNull(_channelOkBody,
-		    "The broker didn't send the ChannelOkBody in response to the ChannelResumeBody in time");
-	    _currentState = AMQPState.CHANNEL_OPENED;
-	    return _channelOkBody;
-	}
-	catch (Exception e)
-	{
-	    throw new AMQPException("Error in channel.resume", e);
-	}
-	finally
-	{
-	    _lock.unlock();
-	}
-    }
-
-    /**
-     * ------------------------------------------- 
-     * AMQPMethodListener methods
-     * --------------------------------------------
-     */
-    public <B extends AMQMethodBody> boolean methodReceived(AMQPMethodEvent<B> evt) throws AMQPException
-    {
-	_lock.lock();
-	try
-	{
-        	if (evt.getMethod() instanceof ChannelOpenOkBody)
-        	{
-        	    _channelOpenOkBody = (ChannelOpenOkBody) evt.getMethod();
-        	    _channelNotOpend.signal();
-        	    return true;
-        	}
-        	else if (evt.getMethod() instanceof ChannelCloseOkBody)
-        	{
-        	    _channelCloseOkBody = (ChannelCloseOkBody) evt.getMethod();
-        	    _channelNotClosed.signal();
-        	    return true;
-        	}
-        	else if (evt.getMethod() instanceof ChannelCloseBody)
-        	{
-        	    _channelCloseBody = (ChannelCloseBody)evt.getMethod();
-    		    // release the correct lock as u may have some conditions waiting.
-    		   // while an error occured and the broker has sent a close.
-    		    releaseLocks();
-        	    handleChannelClose(_channelCloseBody);
-        	    return true;
-        	}
-        	else if (evt.getMethod() instanceof ChannelFlowOkBody)
-        	{
-        	    _channelFlowOkBody = (ChannelFlowOkBody) evt.getMethod();
-        	    _channelFlowNotResponded.signal();
-        	    return true;
-        	}
-        	else if (evt.getMethod() instanceof ChannelFlowBody)
-        	{
-        	    handleChannelFlow((ChannelFlowBody) evt.getMethod());
-        	    return true;
-        	}
-        	else if (evt.getMethod() instanceof ChannelOkBody)
-        	{
-        	    _channelOkBody = (ChannelOkBody) evt.getMethod();
-        	    // In this case the only method expecting channel-ok is channel-resume
-        	    // haven't implemented ping and pong.
-        	    _channelNotResumed.signal();
-        	    return true;
-        	}
-        	else
-        	{
-        	    return false;
-        	}
-	}
-	finally
-	{
-	    _lock.unlock();
-	}
-    }
-
-    private void handleChannelClose(ChannelCloseBody channelCloseBody)
-    {
-	_currentState = AMQPState.CHANNEL_CLOSED;
-	// handle channel related cleanup
-    }
-    
-    private void releaseLocks()
-    {
-	if(_currentState == AMQPState.CHANNEL_NOT_OPENED)
-	{
-	    _channelNotOpend.signal();
-	    _channelNotResumed.signal(); // It could be a channel.resume call
-	}
-	else if(_currentState == AMQPState.CHANNEL_OPENED || _currentState == AMQPState.CHANNEL_SUSPEND)
-	{
-	    _channelFlowNotResponded.signal();
-	}
-	else if(_currentState == AMQPState.CHANNEL_CLOSED)
-	{
-	    _channelNotResumed.signal();
-	}
-    }
-    
-    private void checkIfConnectionClosed()throws AMQPException
-    {
-	if (_channelCloseBody != null)
-	{
-	    String error = "Broker has closed channel due to : " + _channelCloseBody.getReplyText() +
-	                    " with reply code (" + _channelCloseBody.getReplyCode() + ") " +
-	                    "caused by class " + _channelCloseBody.getClassId() +
-	                    " and method " + _channelCloseBody.getMethod();
-	    
-	    throw new AMQPException(error);	    
-	}
-    }    
-
-    private void handleChannelFlow(ChannelFlowBody channelFlowBody)
-    {
-	_lock.lock();
-	try
-	{
-	    handleChannelFlowState(channelFlowBody.active);
-	}
-	finally
-	{
-	    _lock.unlock();
-	}
-    }
-
-    private void handleChannelFlowState(boolean flow)
-    {
-	_currentState = (flow) ? AMQPState.CHANNEL_OPENED : AMQPState.CHANNEL_SUSPEND;
-    }
+	private AMQPStateManager _stateManager;
+
+	private final AMQPState[] _validCloseStates = new AMQPState[]
+	{ AMQPState.CHANNEL_OPENED, AMQPState.CHANNEL_SUSPEND };
+
+	private final AMQPState[] _validResumeStates = new AMQPState[]
+	{ AMQPState.CHANNEL_CLOSED, AMQPState.CHANNEL_NOT_OPENED };
+
+	// The wait period until a server sends a response
+	private long _serverTimeOut = 1000;
+
+	private final Lock _lock = new ReentrantLock();
+
+	private final Condition _channelNotOpend = _lock.newCondition();
+
+	private final Condition _channelNotClosed = _lock.newCondition();
+
+	private final Condition _channelFlowNotResponded = _lock.newCondition();
+
+	private final Condition _channelNotResumed = _lock.newCondition();
+
+	private ChannelOpenOkBody _channelOpenOkBody;
+
+	private ChannelCloseOkBody _channelCloseOkBody;
+
+	private ChannelFlowOkBody _channelFlowOkBody;
+
+	private ChannelOkBody _channelOkBody;
+
+	private ChannelCloseBody _channelCloseBody;
+
+	protected AMQPChannel(int channelId, Phase phase, AMQPStateManager stateManager)
+	{
+		_channelId = channelId;
+		_phase = phase;
+		_stateManager = stateManager;
+		_currentState = AMQPState.CHANNEL_NOT_OPENED;
+		_serverTimeOut = ClientConfiguration.get().getLong(QpidConstants.SERVER_TIMEOUT_IN_MILLISECONDS);
+	}
+
+	/**
+	 * ------------------------------------------- 
+	 * API Methods
+	 *  --------------------------------------------
+	 */
+
+	/**
+	 * Opens the channel
+	 */
+	public ChannelOpenOkBody open(ChannelOpenBody channelOpenBody) throws AMQPException
+	{
+		_lock.lock();
+		try
+		{
+			_channelOpenOkBody = null;
+			checkIfValidStateTransition(AMQPState.CHANNEL_NOT_OPENED, _currentState, AMQPState.CHANNEL_OPENED);
+			AMQPMethodEvent msg = new AMQPMethodEvent(_channelId, channelOpenBody, QpidConstants.EMPTY_CORRELATION_ID);
+			_phase.messageSent(msg);
+
+			//_channelNotOpend.await(_serverTimeOut, TimeUnit.MILLISECONDS);
+			_channelNotOpend.await();
+			checkIfConnectionClosed();
+			AMQPValidator.throwExceptionOnNull(_channelOpenOkBody, "The broker didn't send the ChannelOpenOkBody in time");
+			notifyState(AMQPState.CHANNEL_OPENED);
+			_currentState = AMQPState.CHANNEL_OPENED;
+			return _channelOpenOkBody;
+		}
+		catch (Exception e)
+		{
+			throw new AMQPException("Error in channel.open", e);
+		}
+		finally
+		{
+			_lock.unlock();
+		}
+	}
+
+	/**
+	 * Close the channel
+	 */
+	public ChannelCloseOkBody close(ChannelCloseBody channelCloseBody) throws AMQPException
+	{
+		_lock.lock();
+		try
+		{
+			_channelCloseOkBody = null;
+			checkIfValidStateTransition(_validCloseStates, _currentState, AMQPState.CHANNEL_CLOSED);
+			AMQPMethodEvent msg = new AMQPMethodEvent(_channelId, channelCloseBody, QpidConstants.EMPTY_CORRELATION_ID);
+			_phase.messageSent(msg);
+
+			//_channelNotClosed.await(_serverTimeOut, TimeUnit.MILLISECONDS);
+			_channelNotClosed.await();
+			AMQPValidator.throwExceptionOnNull(_channelCloseOkBody, "The broker didn't send the ChannelCloseOkBody in time");
+			notifyState(AMQPState.CHANNEL_CLOSED);
+			_currentState = AMQPState.CHANNEL_CLOSED;
+			return _channelCloseOkBody;
+		}
+		catch (Exception e)
+		{
+			throw new AMQPException("Error in channel.close", e);
+		}
+		finally
+		{
+			_lock.unlock();
+		}
+	}
+
+	/**
+	 * Channel Flow
+	 */
+	public ChannelFlowOkBody flow(ChannelFlowBody channelFlowBody) throws AMQPException
+	{
+		_lock.lock();
+		try
+		{
+			_channelFlowOkBody = null;
+			if (channelFlowBody.active)
+			{
+				checkIfValidStateTransition(AMQPState.CHANNEL_SUSPEND, _currentState, AMQPState.CHANNEL_OPENED);
+			}
+			else
+			{
+				checkIfValidStateTransition(AMQPState.CHANNEL_OPENED, _currentState, AMQPState.CHANNEL_SUSPEND);
+			}
+			AMQPMethodEvent msg = new AMQPMethodEvent(_channelId, channelFlowBody, QpidConstants.EMPTY_CORRELATION_ID);
+			_phase.messageSent(msg);
+
+			//_channelFlowNotResponded.await(_serverTimeOut, TimeUnit.MILLISECONDS);
+			_channelFlowNotResponded.await();
+			checkIfConnectionClosed();
+			AMQPValidator.throwExceptionOnNull(_channelFlowOkBody, "The broker didn't send the ChannelFlowOkBody in time");
+			handleChannelFlowState(_channelFlowOkBody.active);
+			return _channelFlowOkBody;
+		}
+		catch (Exception e)
+		{
+			throw new AMQPException("Error in channel.flow", e);
+		}
+		finally
+		{
+			_lock.unlock();
+		}
+	}
+
+	/**
+	 * Resume the channel
+	 */
+	public ChannelOkBody resume(ChannelResumeBody channelResumeBody) throws AMQPException
+	{
+		_lock.lock();
+		try
+		{
+			_channelOkBody = null;
+			checkIfValidStateTransition(_validResumeStates, _currentState, AMQPState.CHANNEL_OPENED);
+			AMQPMethodEvent msg = new AMQPMethodEvent(_channelId, channelResumeBody, QpidConstants.EMPTY_CORRELATION_ID);
+			_phase.messageSent(msg);
+
+			//_channelNotResumed.await(_serverTimeOut, TimeUnit.MILLISECONDS);
+			_channelNotResumed.await();
+			checkIfConnectionClosed();
+			AMQPValidator.throwExceptionOnNull(_channelOkBody,
+					"The broker didn't send the ChannelOkBody in response to the ChannelResumeBody in time");
+			notifyState(AMQPState.CHANNEL_OPENED);
+			_currentState = AMQPState.CHANNEL_OPENED;
+			return _channelOkBody;
+		}
+		catch (Exception e)
+		{
+			throw new AMQPException("Error in channel.resume", e);
+		}
+		finally
+		{
+			_lock.unlock();
+		}
+	}
+
+	/**
+	 * ------------------------------------------- 
+	 * AMQPMethodListener methods
+	 * --------------------------------------------
+	 */
+	public <B extends AMQMethodBody> boolean methodReceived(AMQPMethodEvent<B> evt) throws AMQPException
+	{
+		_lock.lock();
+		try
+		{
+			if (evt.getMethod() instanceof ChannelOpenOkBody)
+			{
+				_channelOpenOkBody = (ChannelOpenOkBody) evt.getMethod();
+				_channelNotOpend.signal();
+				return true;
+			}
+			else if (evt.getMethod() instanceof ChannelCloseOkBody)
+			{
+				_channelCloseOkBody = (ChannelCloseOkBody) evt.getMethod();
+				_channelNotClosed.signal();
+				return true;
+			}
+			else if (evt.getMethod() instanceof ChannelCloseBody)
+			{
+				_channelCloseBody = (ChannelCloseBody) evt.getMethod();
+				// Signal the relevant conditions, since callers may be waiting on them
+				// when an error has occurred and the broker has sent a close.
+				releaseLocks();
+				handleChannelClose(_channelCloseBody);
+				return true;
+			}
+			else if (evt.getMethod() instanceof ChannelFlowOkBody)
+			{
+				_channelFlowOkBody = (ChannelFlowOkBody) evt.getMethod();
+				_channelFlowNotResponded.signal();
+				return true;
+			}
+			else if (evt.getMethod() instanceof ChannelFlowBody)
+			{
+				handleChannelFlow((ChannelFlowBody) evt.getMethod());
+				return true;
+			}
+			else if (evt.getMethod() instanceof ChannelOkBody)
+			{
+				_channelOkBody = (ChannelOkBody) evt.getMethod();
+				// In this case the only method expecting channel-ok is channel-resume
+				// haven't implemented ping and pong.
+				_channelNotResumed.signal();
+				return true;
+			}
+			else
+			{
+				return false;
+			}
+		}
+		finally
+		{
+			_lock.unlock();
+		}
+	}
+
+	private void handleChannelClose(ChannelCloseBody channelCloseBody) throws AMQPException
+	{
+		notifyState(AMQPState.CHANNEL_CLOSED);
+		_currentState = AMQPState.CHANNEL_CLOSED;
+		// handle channel related cleanup
+	}
+
+	private void releaseLocks()
+	{
+		if (_currentState == AMQPState.CHANNEL_NOT_OPENED)
+		{
+			_channelNotOpend.signal();
+			_channelNotResumed.signal(); // It could be a channel.resume call
+		}
+		else if (_currentState == AMQPState.CHANNEL_OPENED || _currentState == AMQPState.CHANNEL_SUSPEND)
+		{
+			_channelFlowNotResponded.signal();
+		}
+		else if (_currentState == AMQPState.CHANNEL_CLOSED)
+		{
+			_channelNotResumed.signal();
+		}
+	}
+
+	private void checkIfConnectionClosed() throws AMQPException
+	{
+		if (_channelCloseBody != null)
+		{
+			String error = "Broker has closed channel due to : " + _channelCloseBody.getReplyText() + " with reply code ("
+					+ _channelCloseBody.getReplyCode() + ") " + "caused by class " + _channelCloseBody.getClassId() + " and method "
+					+ _channelCloseBody.getMethod();
+
+			throw new AMQPException(error);
+		}
+	}
+
+	private void handleChannelFlow(ChannelFlowBody channelFlowBody)throws AMQPException
+	{
+		_lock.lock();
+		try
+		{
+			handleChannelFlowState(channelFlowBody.active);
+		}
+		finally
+		{
+			_lock.unlock();
+		}
+	}
+
+	private void handleChannelFlowState(boolean flow)throws AMQPException
+	{
+		notifyState((flow) ? AMQPState.CHANNEL_OPENED : AMQPState.CHANNEL_SUSPEND);
+		_currentState = (flow) ? AMQPState.CHANNEL_OPENED : AMQPState.CHANNEL_SUSPEND;
+	}
+	
+	private void notifyState(AMQPState newState) throws AMQPException
+	{
+		_stateManager.notifyStateChanged(new AMQPStateChangedEvent(_currentState, newState,AMQPStateType.CHANNEL_STATE));
+	}
 }

Modified: incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPClassFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPClassFactory.java?view=diff&rev=524144&r1=524143&r2=524144
==============================================================================
--- incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPClassFactory.java (original)
+++ incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPClassFactory.java Fri Mar 30 08:53:18 2007
@@ -78,178 +78,178 @@
  */
 public class AMQPClassFactory
 {
-    //Need an event manager per connection
-    private AMQPEventManager _eventManager = new QpidEventManager();
-    
-    // Need a state manager per connection
-    private AMQPStateManager _stateManager = new QpidStateManager();
-    
-    //Need a phase pipe per connection
-    private Phase _phase;
-    
-    //One instance per connection
-    private AMQPConnection _amqpConnection;
-    
-    public AMQPClassFactory()
-    {
-	
-    }
-    
-    public AMQPConnection createConnection(String urlStr,ConnectionType type)throws AMQPException, URLSyntaxException
-    {	
-	AMQPConnectionURL url = new AMQPConnectionURL(urlStr);
-	return createConnectionClass(url,type);
-    }
-    
-    public AMQPConnection createConnectionClass(ConnectionURL url,ConnectionType type)throws AMQPException
-    {
-	if (_amqpConnection == null)
-	{
-	    PhaseContext ctx = new DefaultPhaseContext();
-	    ctx.setProperty(QpidConstants.EVENT_MANAGER, _eventManager);
-	    
-	    TransportConnection conn = TransportConnectionFactory.createTransportConnection(url, type,ctx);	    
-	    _amqpConnection = new AMQPConnection(conn);
-	    _eventManager.addMethodEventListener(QpidConstants.CHANNEL_ZERO, ConnectionStartBody.class,_amqpConnection);
-	    _eventManager.addMethodEventListener(QpidConstants.CHANNEL_ZERO, ConnectionSecureBody.class,_amqpConnection);
-	    _eventManager.addMethodEventListener(QpidConstants.CHANNEL_ZERO, ConnectionTuneBody.class,_amqpConnection);
-	    _eventManager.addMethodEventListener(QpidConstants.CHANNEL_ZERO, ConnectionOpenOkBody.class,_amqpConnection);
-	    _eventManager.addMethodEventListener(QpidConstants.CHANNEL_ZERO, ConnectionCloseBody.class,_amqpConnection);
-	    _eventManager.addMethodEventListener(QpidConstants.CHANNEL_ZERO, ConnectionCloseOkBody.class,_amqpConnection);
-	}
-	return _amqpConnection;
-    }   
-    
-    public AMQPChannel createChannelClass(int channel)throws AMQPException
-    {
-	checkIfConnectionStarted();
-	AMQPChannel amqpChannel = new AMQPChannel(channel,_phase);
-	_eventManager.addMethodEventListener(channel, ChannelOpenOkBody.class,amqpChannel);
-        _eventManager.addMethodEventListener(channel, ChannelCloseBody.class,amqpChannel);
-        _eventManager.addMethodEventListener(channel, ChannelCloseOkBody.class,amqpChannel);
-        _eventManager.addMethodEventListener(channel, ChannelFlowBody.class,amqpChannel);
-        _eventManager.addMethodEventListener(channel, ChannelFlowOkBody.class,amqpChannel);
-        _eventManager.addMethodEventListener(channel, ChannelOkBody.class,amqpChannel);
-	return amqpChannel;
-    }
-    
-    public void destroyChannelClass(int channel,AMQPChannel amqpChannel)throws AMQPException
-    {
-	_eventManager.removeMethodEventListener(channel, ChannelOpenOkBody.class,amqpChannel);
-        _eventManager.removeMethodEventListener(channel, ChannelCloseBody.class,amqpChannel);
-        _eventManager.removeMethodEventListener(channel, ChannelCloseOkBody.class,amqpChannel);
-        _eventManager.removeMethodEventListener(channel, ChannelFlowBody.class,amqpChannel);
-        _eventManager.removeMethodEventListener(channel, ChannelFlowOkBody.class,amqpChannel);
-        _eventManager.removeMethodEventListener(channel, ChannelOkBody.class,amqpChannel);
-    }
-    
-    public AMQPExchange createExchangeClass(int channel)throws AMQPException
-    {
-	checkIfConnectionStarted();
-	AMQPExchange amqpExchange = new AMQPExchange(channel,_phase);
-	_eventManager.addMethodEventListener(channel, ExchangeDeclareOkBody.class,amqpExchange);
-        _eventManager.addMethodEventListener(channel, ExchangeDeleteOkBody.class,amqpExchange);
-	return amqpExchange;    
-    }
-    
-    public void destoryExchangeClass(int channel, AMQPExchange amqpExchange)throws AMQPException
-    {
-	_eventManager.removeMethodEventListener(channel, ExchangeDeclareOkBody.class,amqpExchange);
-        _eventManager.removeMethodEventListener(channel, ExchangeDeleteOkBody.class,amqpExchange);
-    }
-    
-    public AMQPQueue createQueueClass(int channel)throws AMQPException
-    {
-	checkIfConnectionStarted();
-	AMQPQueue amqpQueue = new AMQPQueue(channel,_phase);
-	_eventManager.addMethodEventListener(channel, QueueDeclareOkBody.class,amqpQueue);
-        _eventManager.addMethodEventListener(channel, QueueBindOkBody.class,amqpQueue);
-        _eventManager.addMethodEventListener(channel, QueueUnbindOkBody.class,amqpQueue);
-        _eventManager.addMethodEventListener(channel, QueuePurgeOkBody.class,amqpQueue);
-        _eventManager.addMethodEventListener(channel, QueueDeleteOkBody.class,amqpQueue);
-	return amqpQueue;
-    }
-    
-    public void destroyQueueClass(int channel,AMQPQueue amqpQueue)throws AMQPException
-    {
-	_eventManager.removeMethodEventListener(channel, QueueDeclareOkBody.class,amqpQueue);
-        _eventManager.removeMethodEventListener(channel, QueueBindOkBody.class,amqpQueue);
-        _eventManager.removeMethodEventListener(channel, QueueUnbindOkBody.class,amqpQueue);
-        _eventManager.removeMethodEventListener(channel, QueuePurgeOkBody.class,amqpQueue);
-        _eventManager.removeMethodEventListener(channel, QueueDeleteOkBody.class,amqpQueue);
-    }
-    
-    public AMQPMessage createMessageClass(int channel,AMQPMessageCallBack messageCb)throws AMQPException
-    {
-	checkIfConnectionStarted();
-	AMQPMessage amqpMessage = new AMQPMessage(channel,_phase,messageCb);
-	_eventManager.addMethodEventListener(channel, MessageAppendBody.class,amqpMessage);
-        _eventManager.addMethodEventListener(channel, MessageCancelBody.class,amqpMessage);
-        _eventManager.addMethodEventListener(channel, MessageCheckpointBody.class,amqpMessage);
-        _eventManager.addMethodEventListener(channel, MessageCloseBody.class,amqpMessage);
-        _eventManager.addMethodEventListener(channel, MessageGetBody.class,amqpMessage);
-        _eventManager.addMethodEventListener(channel, MessageOffsetBody.class,amqpMessage);
-        _eventManager.addMethodEventListener(channel, MessageOkBody.class,amqpMessage);
-        _eventManager.addMethodEventListener(channel, MessageOpenBody.class,amqpMessage);
-        _eventManager.addMethodEventListener(channel, MessageRecoverBody.class,amqpMessage);
-        _eventManager.addMethodEventListener(channel, MessageRejectBody.class,amqpMessage);        
-        _eventManager.addMethodEventListener(channel, MessageResumeBody.class,amqpMessage);
-        _eventManager.addMethodEventListener(channel, MessageQosBody.class,amqpMessage);
-        _eventManager.addMethodEventListener(channel, MessageTransferBody.class,amqpMessage);
-        
-	return amqpMessage;
-    }
-    
-    public void destoryMessageClass(int channel,AMQPMessage amqpMessage)throws AMQPException
-    {
-	_eventManager.removeMethodEventListener(channel, MessageAppendBody.class,amqpMessage);
-        _eventManager.removeMethodEventListener(channel, MessageCancelBody.class,amqpMessage);
-        _eventManager.removeMethodEventListener(channel, MessageCheckpointBody.class,amqpMessage);
-        _eventManager.removeMethodEventListener(channel, MessageCloseBody.class,amqpMessage);
-        _eventManager.removeMethodEventListener(channel, MessageGetBody.class,amqpMessage);
-        _eventManager.removeMethodEventListener(channel, MessageOffsetBody.class,amqpMessage);
-        _eventManager.removeMethodEventListener(channel, MessageOkBody.class,amqpMessage);
-        _eventManager.removeMethodEventListener(channel, MessageOpenBody.class,amqpMessage);
-        _eventManager.removeMethodEventListener(channel, MessageRecoverBody.class,amqpMessage);
-        _eventManager.removeMethodEventListener(channel, MessageRejectBody.class,amqpMessage);        
-        _eventManager.removeMethodEventListener(channel, MessageResumeBody.class,amqpMessage);
-        _eventManager.removeMethodEventListener(channel, MessageQosBody.class,amqpMessage);
-        _eventManager.removeMethodEventListener(channel, MessageTransferBody.class,amqpMessage);        
-    }
-    
-    //This class should register as a state listener for AMQPConnection
-    private void checkIfConnectionStarted() throws AMQPException
-    {
-	if (_phase == null)
-	{
-	    _phase = _amqpConnection.getPhasePipe();
-	    
-	    if (_phase == null)
-	    {
-		throw new AMQPException("Cannot create a channel until connection is ready");
-	    }
-	}
-    }
-    
-    /**
-     * Extention point
-     * Other interested parties can obtain a reference to the event manager
-     * and add listeners to get notified of events
-     * 
-     */
-    public AMQPEventManager getEventManager()
-    {
-	return _eventManager;
-    }
-    
-    /**
-     * Extention point
-     * Other interested parties can obtain a reference to the state manager
-     * and add listeners to get notified of state changes
-     * 
-     */
-    public AMQPStateManager getStateManager()
-    {
-	return null;
-    }
+	//Need an event manager per connection
+	private AMQPEventManager _eventManager = new QpidEventManager();
+
+	// Need a state manager per connection
+	private AMQPStateManager _stateManager = new QpidStateManager();
+
+	//Need a phase pipe per connection
+	private Phase _phase;
+
+	//One instance per connection
+	private AMQPConnection _amqpConnection;
+
+	public AMQPClassFactory()
+	{
+
+	}
+
+	public AMQPConnection createConnection(String urlStr, ConnectionType type) throws AMQPException, URLSyntaxException
+	{
+		AMQPConnectionURL url = new AMQPConnectionURL(urlStr);
+		return createConnectionClass(url, type);
+	}
+
+	public AMQPConnection createConnectionClass(ConnectionURL url, ConnectionType type) throws AMQPException
+	{
+		if (_amqpConnection == null)
+		{
+			PhaseContext ctx = new DefaultPhaseContext();
+			ctx.setProperty(QpidConstants.EVENT_MANAGER, _eventManager);
+
+			TransportConnection conn = TransportConnectionFactory.createTransportConnection(url, type, ctx);
+			_amqpConnection = new AMQPConnection(conn, _stateManager);
+			_eventManager.addMethodEventListener(QpidConstants.CHANNEL_ZERO, ConnectionStartBody.class, _amqpConnection);
+			_eventManager.addMethodEventListener(QpidConstants.CHANNEL_ZERO, ConnectionSecureBody.class, _amqpConnection);
+			_eventManager.addMethodEventListener(QpidConstants.CHANNEL_ZERO, ConnectionTuneBody.class, _amqpConnection);
+			_eventManager.addMethodEventListener(QpidConstants.CHANNEL_ZERO, ConnectionOpenOkBody.class, _amqpConnection);
+			_eventManager.addMethodEventListener(QpidConstants.CHANNEL_ZERO, ConnectionCloseBody.class, _amqpConnection);
+			_eventManager.addMethodEventListener(QpidConstants.CHANNEL_ZERO, ConnectionCloseOkBody.class, _amqpConnection);
+		}
+		return _amqpConnection;
+	}
+
+	public AMQPChannel createChannelClass(int channel) throws AMQPException
+	{
+		checkIfConnectionStarted();
+		AMQPChannel amqpChannel = new AMQPChannel(channel, _phase,_stateManager);
+		_eventManager.addMethodEventListener(channel, ChannelOpenOkBody.class, amqpChannel);
+		_eventManager.addMethodEventListener(channel, ChannelCloseBody.class, amqpChannel);
+		_eventManager.addMethodEventListener(channel, ChannelCloseOkBody.class, amqpChannel);
+		_eventManager.addMethodEventListener(channel, ChannelFlowBody.class, amqpChannel);
+		_eventManager.addMethodEventListener(channel, ChannelFlowOkBody.class, amqpChannel);
+		_eventManager.addMethodEventListener(channel, ChannelOkBody.class, amqpChannel);
+		return amqpChannel;
+	}
+
+	public void destroyChannelClass(int channel, AMQPChannel amqpChannel) throws AMQPException
+	{
+		_eventManager.removeMethodEventListener(channel, ChannelOpenOkBody.class, amqpChannel);
+		_eventManager.removeMethodEventListener(channel, ChannelCloseBody.class, amqpChannel);
+		_eventManager.removeMethodEventListener(channel, ChannelCloseOkBody.class, amqpChannel);
+		_eventManager.removeMethodEventListener(channel, ChannelFlowBody.class, amqpChannel);
+		_eventManager.removeMethodEventListener(channel, ChannelFlowOkBody.class, amqpChannel);
+		_eventManager.removeMethodEventListener(channel, ChannelOkBody.class, amqpChannel);
+	}
+
+	public AMQPExchange createExchangeClass(int channel) throws AMQPException
+	{
+		checkIfConnectionStarted();
+		AMQPExchange amqpExchange = new AMQPExchange(channel, _phase);
+		_eventManager.addMethodEventListener(channel, ExchangeDeclareOkBody.class, amqpExchange);
+		_eventManager.addMethodEventListener(channel, ExchangeDeleteOkBody.class, amqpExchange);
+		return amqpExchange;
+	}
+
+	public void destoryExchangeClass(int channel, AMQPExchange amqpExchange) throws AMQPException
+	{
+		_eventManager.removeMethodEventListener(channel, ExchangeDeclareOkBody.class, amqpExchange);
+		_eventManager.removeMethodEventListener(channel, ExchangeDeleteOkBody.class, amqpExchange);
+	}
+
+	public AMQPQueue createQueueClass(int channel) throws AMQPException
+	{
+		checkIfConnectionStarted();
+		AMQPQueue amqpQueue = new AMQPQueue(channel, _phase);
+		_eventManager.addMethodEventListener(channel, QueueDeclareOkBody.class, amqpQueue);
+		_eventManager.addMethodEventListener(channel, QueueBindOkBody.class, amqpQueue);
+		_eventManager.addMethodEventListener(channel, QueueUnbindOkBody.class, amqpQueue);
+		_eventManager.addMethodEventListener(channel, QueuePurgeOkBody.class, amqpQueue);
+		_eventManager.addMethodEventListener(channel, QueueDeleteOkBody.class, amqpQueue);
+		return amqpQueue;
+	}
+
+	public void destroyQueueClass(int channel, AMQPQueue amqpQueue) throws AMQPException
+	{
+		_eventManager.removeMethodEventListener(channel, QueueDeclareOkBody.class, amqpQueue);
+		_eventManager.removeMethodEventListener(channel, QueueBindOkBody.class, amqpQueue);
+		_eventManager.removeMethodEventListener(channel, QueueUnbindOkBody.class, amqpQueue);
+		_eventManager.removeMethodEventListener(channel, QueuePurgeOkBody.class, amqpQueue);
+		_eventManager.removeMethodEventListener(channel, QueueDeleteOkBody.class, amqpQueue);
+	}
+
+	public AMQPMessage createMessageClass(int channel, AMQPMessageCallBack messageCb) throws AMQPException
+	{
+		checkIfConnectionStarted();
+		AMQPMessage amqpMessage = new AMQPMessage(channel, _phase, messageCb);
+		_eventManager.addMethodEventListener(channel, MessageAppendBody.class, amqpMessage);
+		_eventManager.addMethodEventListener(channel, MessageCancelBody.class, amqpMessage);
+		_eventManager.addMethodEventListener(channel, MessageCheckpointBody.class, amqpMessage);
+		_eventManager.addMethodEventListener(channel, MessageCloseBody.class, amqpMessage);
+		_eventManager.addMethodEventListener(channel, MessageGetBody.class, amqpMessage);
+		_eventManager.addMethodEventListener(channel, MessageOffsetBody.class, amqpMessage);
+		_eventManager.addMethodEventListener(channel, MessageOkBody.class, amqpMessage);
+		_eventManager.addMethodEventListener(channel, MessageOpenBody.class, amqpMessage);
+		_eventManager.addMethodEventListener(channel, MessageRecoverBody.class, amqpMessage);
+		_eventManager.addMethodEventListener(channel, MessageRejectBody.class, amqpMessage);
+		_eventManager.addMethodEventListener(channel, MessageResumeBody.class, amqpMessage);
+		_eventManager.addMethodEventListener(channel, MessageQosBody.class, amqpMessage);
+		_eventManager.addMethodEventListener(channel, MessageTransferBody.class, amqpMessage);
+
+		return amqpMessage;
+	}
+
+	public void destoryMessageClass(int channel, AMQPMessage amqpMessage) throws AMQPException
+	{
+		_eventManager.removeMethodEventListener(channel, MessageAppendBody.class, amqpMessage);
+		_eventManager.removeMethodEventListener(channel, MessageCancelBody.class, amqpMessage);
+		_eventManager.removeMethodEventListener(channel, MessageCheckpointBody.class, amqpMessage);
+		_eventManager.removeMethodEventListener(channel, MessageCloseBody.class, amqpMessage);
+		_eventManager.removeMethodEventListener(channel, MessageGetBody.class, amqpMessage);
+		_eventManager.removeMethodEventListener(channel, MessageOffsetBody.class, amqpMessage);
+		_eventManager.removeMethodEventListener(channel, MessageOkBody.class, amqpMessage);
+		_eventManager.removeMethodEventListener(channel, MessageOpenBody.class, amqpMessage);
+		_eventManager.removeMethodEventListener(channel, MessageRecoverBody.class, amqpMessage);
+		_eventManager.removeMethodEventListener(channel, MessageRejectBody.class, amqpMessage);
+		_eventManager.removeMethodEventListener(channel, MessageResumeBody.class, amqpMessage);
+		_eventManager.removeMethodEventListener(channel, MessageQosBody.class, amqpMessage);
+		_eventManager.removeMethodEventListener(channel, MessageTransferBody.class, amqpMessage);
+	}
+
+	//This class should register as a state listener for AMQPConnection
+	private void checkIfConnectionStarted() throws AMQPException
+	{
+		if (_phase == null)
+		{
+			// No connection object yet means no phase pipe either; report it as AMQPException, not an NPE
+			_phase = (_amqpConnection == null) ? null : _amqpConnection.getPhasePipe();
+			if (_phase == null)
+			{
+				throw new AMQPException("Cannot create a channel until connection is ready");
+			}
+		}
+	}
+
+	/**
+	 * Extension point.
+	 * Other interested parties can obtain a reference to the event manager
+	 * and add listeners to get notified of events
+	 * @return the event manager instance held by this factory
+	 */
+	public AMQPEventManager getEventManager()
+	{
+		return _eventManager;
+	}
+
+	/**
+	 * Extension point.
+	 * Other interested parties can obtain a reference to the state manager
+	 * and add listeners to get notified of state changes
+	 * @return the state manager instance held by this factory
+	 */
+	public AMQPStateManager getStateManager()
+	{
+		return _stateManager;
+	}
 }

Modified: incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPConnection.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPConnection.java?view=diff&rev=524144&r1=524143&r2=524144
==============================================================================
--- incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPConnection.java (original)
+++ incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPConnection.java Fri Mar 30 08:53:18 2007
@@ -40,7 +40,10 @@
 import org.apache.qpid.nclient.amqp.event.AMQPMethodEvent;
 import org.apache.qpid.nclient.amqp.event.AMQPMethodListener;
 import org.apache.qpid.nclient.amqp.state.AMQPState;
+import org.apache.qpid.nclient.amqp.state.AMQPStateChangedEvent;
 import org.apache.qpid.nclient.amqp.state.AMQPStateMachine;
+import org.apache.qpid.nclient.amqp.state.AMQPStateManager;
+import org.apache.qpid.nclient.amqp.state.AMQPStateType;
 import org.apache.qpid.nclient.config.ClientConfiguration;
 import org.apache.qpid.nclient.core.AMQPException;
 import org.apache.qpid.nclient.core.Phase;
@@ -55,366 +58,383 @@
  */
 public class AMQPConnection extends AMQPStateMachine implements AMQPMethodListener
 {
-    private static final Logger _logger = Logger.getLogger(AMQPConnection.class);
+	private static final Logger _logger = Logger.getLogger(AMQPConnection.class);
 
-    private Phase _phase;
+	private Phase _phase;
 
-    private TransportConnection _connection;
+	private TransportConnection _connection;
 
-    private long _correlationId;
+	private long _correlationId;
 
-    private AMQPState _currentState;
-
-    private final AMQPState[] _validCloseStates = new AMQPState[] { AMQPState.CONNECTION_NOT_STARTED, AMQPState.CONNECTION_NOT_SECURE,
-	    AMQPState.CONNECTION_NOT_TUNED, AMQPState.CONNECTION_NOT_OPENED, AMQPState.CONNECTION_OPEN, };
-
-    // The wait period until a server sends a respond
-    private long _serverTimeOut = 1000;
-
-    private final Lock _lock = new ReentrantLock();
-
-    private final Condition _connectionNotStarted = _lock.newCondition();
-
-    private final Condition _connectionNotSecure = _lock.newCondition();
-
-    private final Condition _connectionNotTuned = _lock.newCondition();
-
-    private final Condition _connectionNotOpened = _lock.newCondition();
-
-    private final Condition _connectionNotClosed = _lock.newCondition();
-
-    private ConnectionStartBody _connectionStartBody;
-
-    private ConnectionSecureBody _connectionSecureBody;
-
-    private ConnectionTuneBody _connectionTuneBody;
-
-    private ConnectionOpenOkBody _connectionOpenOkBody;
-
-    private ConnectionCloseOkBody _connectionCloseOkBody;
-    
-    private ConnectionCloseBody _connectionCloseBody;
-
-    protected AMQPConnection(TransportConnection connection)
-    {
-	_connection = connection;
-	_currentState = AMQPState.CONNECTION_UNDEFINED;
-	_serverTimeOut = ClientConfiguration.get().getLong(QpidConstants.SERVER_TIMEOUT_IN_MILLISECONDS);
-    }
-
-    /**
-     * ------------------------------------------- 
-     * API Methods 
-     * --------------------------------------------
-     */
-
-    /**
-     * Opens the TCP connection and let the formalities begin.
-     */
-    public ConnectionStartBody openTCPConnection() throws AMQPException
-    {
-	_lock.lock();
-	// open the TCP connection
-	try
-	{
-	    _connectionStartBody = null;
-	    checkIfValidStateTransition(AMQPState.CONNECTION_UNDEFINED, _currentState, AMQPState.CONNECTION_NOT_STARTED);
-	    _phase = _connection.connect();
-
-	    // waiting for ConnectionStartBody or error in connection
-	    //_connectionNotStarted.await(_serverTimeOut, TimeUnit.MILLISECONDS);
-	    _connectionNotStarted.await();
-	    
-	    checkIfConnectionClosed();
-	    AMQPValidator.throwExceptionOnNull(_connectionStartBody, "The broker didn't send the ConnectionStartBody in time");
-	    _currentState = AMQPState.CONNECTION_NOT_STARTED;
-	    return _connectionStartBody;
-	}
-	catch (InterruptedException e)
-	{
-	    throw new AMQPException("Error opening connection to broker", e);
-	}
-	finally
-	{
-	    _lock.unlock();
-	}
-    }
-
-    /**
-     * The current java broker implementation can send a connection tune body
-     * as a response to the startOk. Not sure if that is the correct behaviour.
-     */
-    public AMQMethodBody startOk(ConnectionStartOkBody connectionStartOkBody) throws AMQPException
-    {
-	_lock.lock();
-	try
-	{
-	    _connectionSecureBody = null;
-	    checkIfValidStateTransition(AMQPState.CONNECTION_NOT_STARTED, _currentState, AMQPState.CONNECTION_NOT_SECURE);
-	    AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectionStartOkBody, _correlationId);
-	    _phase.messageSent(msg);
-	    //_connectionNotSecure.await(_serverTimeOut, TimeUnit.MILLISECONDS);
-	    _connectionNotSecure.await();
-	    //AMQPValidator.throwExceptionOnNull(_connectionSecureBody, "The broker didn't send the ConnectionSecureBody in time");
-	    //_currentState = AMQPState.CONNECTION_NOT_SECURE;
-	    
-	    checkIfConnectionClosed();
-	    if (_connectionTuneBody != null)
-	    {
-		_currentState = AMQPState.CONNECTION_NOT_TUNED;
-		return _connectionTuneBody;
-	    }
-	    else if (_connectionSecureBody != null)
-	    { // oops the server sent another challenge
-		_currentState = AMQPState.CONNECTION_NOT_SECURE;
-		return _connectionSecureBody;
-	    }
-	    else
-	    {
-		throw new AMQPException("The broker didn't send the ConnectionTuneBody or ConnectionSecureBody in time");
-	    }	    
-	}
-	catch (InterruptedException e)
-	{
-	    throw new AMQPException("Error in connection.startOk", e);
-	}
-	finally
-	{
-	    _lock.unlock();
-	}
-    }
-
-    /**
-     * The server will verify the response contained in the secureOK body and send a ConnectionTuneBody or it could
-     * issue a new challenge
-     */
-    public AMQMethodBody secureOk(ConnectionSecureOkBody connectionSecureOkBody) throws AMQPException
-    {
-	_lock.lock();
-	try
-	{
-	    _connectionTuneBody = null;
-	    _connectionSecureBody = null;
-	    checkIfValidStateTransition(AMQPState.CONNECTION_NOT_SECURE, _currentState, AMQPState.CONNECTION_NOT_TUNED);
-
-	    _connectionSecureBody = null; // The server could send a fresh challenge
-	    AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectionSecureOkBody, _correlationId);
-	    _phase.messageSent(msg);
-	    
-	    //_connectionNotTuned.await(_serverTimeOut, TimeUnit.MILLISECONDS);
-	    _connectionNotTuned.await();
-	    checkIfConnectionClosed();
-
-	    if (_connectionTuneBody != null)
-	    {
-		_currentState = AMQPState.CONNECTION_NOT_TUNED;
-		return _connectionTuneBody;
-	    }
-	    else if (_connectionSecureBody != null)
-	    { // oops the server sent another challenge
-		_currentState = AMQPState.CONNECTION_NOT_SECURE;
-		return _connectionSecureBody;
-	    }
-	    else
-	    {
-		throw new AMQPException("The broker didn't send the ConnectionTuneBody or ConnectionSecureBody in time");
-	    }
-	}
-	catch (InterruptedException e)
-	{
-	    throw new AMQPException("Error in connection.secureOk", e);
-	}
-	finally
-	{
-	    _lock.unlock();
-	}
-    }
-
-    public void tuneOk(ConnectionTuneOkBody connectionTuneOkBody) throws AMQPException
-    {
-	_lock.lock();
-	try
-	{
-	    checkIfValidStateTransition(AMQPState.CONNECTION_NOT_TUNED, _currentState, AMQPState.CONNECTION_NOT_OPENED);
-	    _connectionSecureBody = null;
-	    AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectionTuneOkBody, _correlationId);
-	    _phase.messageSent(msg);
-	    _currentState = AMQPState.CONNECTION_NOT_OPENED;
-	}
-	finally
-	{
-	    _lock.unlock();
-	}
-    }
-
-    public ConnectionOpenOkBody open(ConnectionOpenBody connectionOpenBody) throws AMQPException
-    {
-	_lock.lock();
-	try
-	{
-	    // If the broker sends a connection close due to an error with the
-	    // Connection tune ok, then this call will verify that
-	    checkIfConnectionClosed();
-	    
-	    _connectionOpenOkBody = null;
-	    checkIfValidStateTransition(AMQPState.CONNECTION_NOT_OPENED, _currentState, AMQPState.CONNECTION_OPEN);
-	    AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectionOpenBody, QpidConstants.EMPTY_CORRELATION_ID);
-	    _phase.messageSent(msg);
-	    
-	    //_connectionNotOpened.await(_serverTimeOut, TimeUnit.MILLISECONDS);
-	    _connectionNotOpened.await();
-	    
-	    checkIfConnectionClosed();
-	    AMQPValidator.throwExceptionOnNull(_connectionOpenOkBody, "The broker didn't send the ConnectionOpenOkBody in time");
-	    _currentState = AMQPState.CONNECTION_OPEN;
-	    return _connectionOpenOkBody;
-	}
-	catch (InterruptedException e)
-	{
-	    throw new AMQPException("Error in connection.open", e);
-	}
-	finally
-	{
-	    _lock.unlock();
-	}
-    }
-
-    public ConnectionCloseOkBody close(ConnectionCloseBody connectioncloseBody) throws AMQPException
-    {
-	_lock.lock();
-	try
-	{
-	    _connectionCloseOkBody = null;
-	    checkIfValidStateTransition(_validCloseStates, _currentState, AMQPState.CONNECTION_CLOSED);
-	    AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectioncloseBody, QpidConstants.EMPTY_CORRELATION_ID);
-	    _phase.messageSent(msg);
-	    _connectionNotClosed.await(_serverTimeOut, TimeUnit.MILLISECONDS);
-	    AMQPValidator.throwExceptionOnNull(_connectionCloseOkBody, "The broker didn't send the ConnectionCloseOkBody in time");
-	    _currentState = AMQPState.CONNECTION_CLOSED;
-	    return _connectionCloseOkBody;
-	}
-	catch (InterruptedException e)
-	{
-	    throw new AMQPException("Error in connection.close", e);
-	}
-	finally
-	{
-	    _lock.unlock();
-	}
-    }
-
-    /**
-     * ------------------------------------------- AMQMethodListener methods
-     * --------------------------------------------
-     */
-    public <B extends AMQMethodBody> boolean methodReceived(AMQPMethodEvent<B> evt) throws AMQPException
-    {
-	_lock.lock();
-	try
-	{
-	    _correlationId = evt.getCorrelationId();
-
-	    if (evt.getMethod() instanceof ConnectionStartBody)
-	    {
-		_connectionStartBody = (ConnectionStartBody) evt.getMethod();
-		_connectionNotStarted.signalAll();
-		return true;
-	    }
-	    else if (evt.getMethod() instanceof ConnectionSecureBody)
-	    {
-		_connectionSecureBody = (ConnectionSecureBody) evt.getMethod();
-		_connectionNotSecure.signal();
-		_connectionNotTuned.signal(); // in case the server has sent another chanllenge
-		return true;
-	    }
-	    else if (evt.getMethod() instanceof ConnectionTuneBody)
-	    {
-		_connectionTuneBody = (ConnectionTuneBody) evt.getMethod();
-		_connectionNotSecure.signal(); //if the server does the auth with ConntectionStartOk 
-		_connectionNotTuned.signal();
-		return true;
-	    }
-	    else if (evt.getMethod() instanceof ConnectionOpenOkBody)
-	    {
-		_connectionOpenOkBody = (ConnectionOpenOkBody) evt.getMethod();
-		_connectionNotOpened.signal();
-		return true;
-	    }
-	    else if (evt.getMethod() instanceof ConnectionCloseOkBody)
-	    {
-		_connectionCloseOkBody = (ConnectionCloseOkBody) evt.getMethod();
-		_connectionNotClosed.signal();
-		return true;
-	    }
-	    else if (evt.getMethod() instanceof ConnectionCloseBody)
-	    {
-		_connectionCloseBody = (ConnectionCloseBody)evt.getMethod();
-		// release the correct lock as u may have some conditions waiting.
-		// while an error occured and the broker has sent a close.
-		releaseLocks();
-		handleClose();
-		return true;
-	    }
-	    else
-	    {
-		return false;
-	    }
-	}
-	finally
-	{
-	    _lock.unlock();
-	}
-    }
-
-    private void handleClose() throws AMQPException
-    {
-	try
-	{	    
-	    _currentState = AMQPState.CONNECTION_CLOSING;
-	    // do the required cleanup and send a ConnectionCloseOkBody
-	}
-	catch (Exception e)
-	{
-	    throw new AMQPException("Error handling connection.close from broker", e);
-	}
-    }
-    
-    private void checkIfConnectionClosed()throws AMQPException
-    {
-	if (_connectionCloseBody != null)
-	{
-	    String error = "Broker has closed connection due to : " + _connectionCloseBody.getReplyText() +
-	                    " with reply code (" + _connectionCloseBody.getReplyCode() + ") " +
-	                    "caused by class " + _connectionCloseBody.getClassId() +
-	                    " and method " + _connectionCloseBody.getMethod();
-	    
-	    throw new AMQPException(error);	    
-	}
-    }    
-    
-    private void releaseLocks()
-    {
-	if(_currentState == AMQPState.CONNECTION_NOT_OPENED)
-	{
-	    _connectionNotOpened.signal();
-	}
-	else if(_currentState == AMQPState.CONNECTION_UNDEFINED)
-	{
-	    _connectionNotStarted.signal();
-	}
-	else if(_currentState == AMQPState.CONNECTION_NOT_STARTED)
-	{
-	    _connectionNotSecure.signal();
-	}
-	else if(_currentState == AMQPState.CONNECTION_NOT_SECURE)
-	{
-	    _connectionNotTuned.signal();
-	}
-    }
-
-    public Phase getPhasePipe()
-    {
-	return _phase;
-    }
+	private AMQPState _currentState;
+
+	private AMQPStateManager _stateManager;
+
+	private final AMQPState[] _validCloseStates = new AMQPState[]
+	{ AMQPState.CONNECTION_NOT_STARTED, AMQPState.CONNECTION_NOT_SECURE, AMQPState.CONNECTION_NOT_TUNED, AMQPState.CONNECTION_NOT_OPENED,
+			AMQPState.CONNECTION_OPEN, };
+
+	// The wait period until a server sends a respond
+	private long _serverTimeOut = 1000;
+
+	private final Lock _lock = new ReentrantLock();
+
+	private final Condition _connectionNotStarted = _lock.newCondition();
+
+	private final Condition _connectionNotSecure = _lock.newCondition();
+
+	private final Condition _connectionNotTuned = _lock.newCondition();
+
+	private final Condition _connectionNotOpened = _lock.newCondition();
+
+	private final Condition _connectionNotClosed = _lock.newCondition();
+
+	private ConnectionStartBody _connectionStartBody;
+
+	private ConnectionSecureBody _connectionSecureBody;
+
+	private ConnectionTuneBody _connectionTuneBody;
+
+	private ConnectionOpenOkBody _connectionOpenOkBody;
+
+	private ConnectionCloseOkBody _connectionCloseOkBody;
+
+	private ConnectionCloseBody _connectionCloseBody;
+
+	/**
+	 * Creates a connection handler that starts in the CONNECTION_UNDEFINED state.
+	 *
+	 * @param connection   transport that openTCPConnection() later connects
+	 * @param stateManager recipient of the state-change notifications
+	 */
+	protected AMQPConnection(TransportConnection connection, AMQPStateManager stateManager)
+	{
+		// Read the configured timeout first; it replaces the field's built-in default.
+		final long configuredTimeOut = ClientConfiguration.get().getLong(QpidConstants.SERVER_TIMEOUT_IN_MILLISECONDS);
+
+		this._connection = connection;
+		this._stateManager = stateManager;
+		this._currentState = AMQPState.CONNECTION_UNDEFINED;
+		this._serverTimeOut = configuredTimeOut;
+	}
+
+	/**
+	 * ------------------------------------------- 
+	 * API Methods 
+	 * --------------------------------------------
+	 */
+
+	/**
+	 * Opens the TCP connection and waits for the broker's ConnectionStartBody.
+	 * <p>
+	 * On success the listeners are notified and the state becomes CONNECTION_NOT_STARTED.
+	 * The wait has no timeout: it ends when methodReceived() records a ConnectionStartBody
+	 * or a ConnectionCloseBody, or when the calling thread is interrupted.
+	 *
+	 * @return the ConnectionStartBody received from the broker
+	 * @throws AMQPException if checkIfValidStateTransition rejects the transition from the
+	 *         current state, the broker closed the connection, or the wait was interrupted
+	 */
+	public ConnectionStartBody openTCPConnection() throws AMQPException
+	{
+		_lock.lock();
+		try
+		{
+			_connectionStartBody = null;
+			checkIfValidStateTransition(AMQPState.CONNECTION_UNDEFINED, _currentState, AMQPState.CONNECTION_NOT_STARTED);
+			// open the TCP connection
+			_phase = _connection.connect();
+
+			// Wait on the predicate, not on a bare await(): Condition.await() may return
+			// spuriously, and the reply may already have been recorded before we wait.
+			while (_connectionStartBody == null && _connectionCloseBody == null)
+			{
+				_connectionNotStarted.await();
+			}
+
+			checkIfConnectionClosed();
+			AMQPValidator.throwExceptionOnNull(_connectionStartBody, "The broker didn't send the ConnectionStartBody in time");
+			// Notify before updating, because notifyState() reports _currentState as the old state.
+			notifyState(AMQPState.CONNECTION_NOT_STARTED);
+			_currentState = AMQPState.CONNECTION_NOT_STARTED;
+			return _connectionStartBody;
+		}
+		catch (InterruptedException e)
+		{
+			// Restore the interrupt flag so callers further up can still observe it.
+			Thread.currentThread().interrupt();
+			throw new AMQPException("Error opening connection to broker", e);
+		}
+		finally
+		{
+			_lock.unlock();
+		}
+	}
+
+	/**
+	 * Sends the given ConnectionStartOkBody on channel zero and waits for the broker's reply.
+	 * <p>
+	 * The current java broker implementation can send a connection tune body
+	 * as a response to the startOk. Not sure if that is the correct behaviour.
+	 * <p>
+	 * The wait has no timeout: it ends when methodReceived() records a ConnectionTuneBody,
+	 * a ConnectionSecureBody or a ConnectionCloseBody, or when the thread is interrupted.
+	 *
+	 * @param connectionStartOkBody the method body to send
+	 * @return the ConnectionTuneBody (state becomes CONNECTION_NOT_TUNED) or, if the broker
+	 *         sent a challenge instead, the ConnectionSecureBody (state becomes CONNECTION_NOT_SECURE)
+	 * @throws AMQPException if checkIfValidStateTransition rejects the transition from the
+	 *         current state, the broker closed the connection, or the wait was interrupted
+	 */
+	public AMQMethodBody startOk(ConnectionStartOkBody connectionStartOkBody) throws AMQPException
+	{
+		_lock.lock();
+		try
+		{
+			// Clear both possible replies (as secureOk does) so a stale body is never returned.
+			_connectionSecureBody = null;
+			_connectionTuneBody = null;
+			checkIfValidStateTransition(AMQPState.CONNECTION_NOT_STARTED, _currentState, AMQPState.CONNECTION_NOT_SECURE);
+			AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectionStartOkBody, _correlationId);
+			_phase.messageSent(msg);
+
+			// Wait on the predicate, not on a bare await(): Condition.await() may return spuriously.
+			while (_connectionTuneBody == null && _connectionSecureBody == null && _connectionCloseBody == null)
+			{
+				_connectionNotSecure.await();
+			}
+
+			checkIfConnectionClosed();
+			if (_connectionTuneBody != null)
+			{
+				// Notify before updating, because notifyState() reports _currentState as the old state.
+				notifyState(AMQPState.CONNECTION_NOT_TUNED);
+				_currentState = AMQPState.CONNECTION_NOT_TUNED;
+				return _connectionTuneBody;
+			}
+			else if (_connectionSecureBody != null)
+			{ // the server sent a challenge
+				notifyState(AMQPState.CONNECTION_NOT_SECURE);
+				_currentState = AMQPState.CONNECTION_NOT_SECURE;
+				return _connectionSecureBody;
+			}
+			else
+			{
+				throw new AMQPException("The broker didn't send the ConnectionTuneBody or ConnectionSecureBody in time");
+			}
+		}
+		catch (InterruptedException e)
+		{
+			// Restore the interrupt flag so callers further up can still observe it.
+			Thread.currentThread().interrupt();
+			throw new AMQPException("Error in connection.startOk", e);
+		}
+		finally
+		{
+			_lock.unlock();
+		}
+	}
+
+	/**
+	 * Sends the given ConnectionSecureOkBody on channel zero and waits for the broker's reply.
+	 * <p>
+	 * The server will verify the response contained in the secureOK body and send a
+	 * ConnectionTuneBody, or it could issue a new challenge.
+	 * <p>
+	 * The wait has no timeout: it ends when methodReceived() records a ConnectionTuneBody,
+	 * a ConnectionSecureBody or a ConnectionCloseBody, or when the thread is interrupted.
+	 *
+	 * @param connectionSecureOkBody the method body to send
+	 * @return the ConnectionTuneBody (state becomes CONNECTION_NOT_TUNED) or, on a fresh
+	 *         challenge, the ConnectionSecureBody (state becomes CONNECTION_NOT_SECURE)
+	 * @throws AMQPException if checkIfValidStateTransition rejects the transition from the
+	 *         current state, the broker closed the connection, or the wait was interrupted
+	 */
+	public AMQMethodBody secureOk(ConnectionSecureOkBody connectionSecureOkBody) throws AMQPException
+	{
+		_lock.lock();
+		try
+		{
+			// Clear both possible replies; the server could send a fresh challenge.
+			_connectionTuneBody = null;
+			_connectionSecureBody = null;
+			checkIfValidStateTransition(AMQPState.CONNECTION_NOT_SECURE, _currentState, AMQPState.CONNECTION_NOT_TUNED);
+
+			AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectionSecureOkBody, _correlationId);
+			_phase.messageSent(msg);
+
+			// Wait on the predicate, not on a bare await(): Condition.await() may return spuriously.
+			while (_connectionTuneBody == null && _connectionSecureBody == null && _connectionCloseBody == null)
+			{
+				_connectionNotTuned.await();
+			}
+			checkIfConnectionClosed();
+
+			if (_connectionTuneBody != null)
+			{
+				// Notify before updating, because notifyState() reports _currentState as the old state.
+				notifyState(AMQPState.CONNECTION_NOT_TUNED);
+				_currentState = AMQPState.CONNECTION_NOT_TUNED;
+				return _connectionTuneBody;
+			}
+			else if (_connectionSecureBody != null)
+			{ // the server sent another challenge
+				notifyState(AMQPState.CONNECTION_NOT_SECURE);
+				_currentState = AMQPState.CONNECTION_NOT_SECURE;
+				return _connectionSecureBody;
+			}
+			else
+			{
+				throw new AMQPException("The broker didn't send the ConnectionTuneBody or ConnectionSecureBody in time");
+			}
+		}
+		catch (InterruptedException e)
+		{
+			// Restore the interrupt flag so callers further up can still observe it.
+			Thread.currentThread().interrupt();
+			throw new AMQPException("Error in connection.secureOk", e);
+		}
+		finally
+		{
+			_lock.unlock();
+		}
+	}
+
+	/**
+	 * Sends the given ConnectionTuneOkBody on channel zero and moves to
+	 * CONNECTION_NOT_OPENED. No reply from the broker is awaited here.
+	 *
+	 * @param connectionTuneOkBody the method body to send
+	 * @throws AMQPException propagated from the state check, the send or the state notification
+	 */
+	public void tuneOk(ConnectionTuneOkBody connectionTuneOkBody) throws AMQPException
+	{
+		final AMQPState target = AMQPState.CONNECTION_NOT_OPENED;
+
+		_lock.lock();
+		try
+		{
+			checkIfValidStateTransition(AMQPState.CONNECTION_NOT_TUNED, _currentState, target);
+			_connectionSecureBody = null;
+			_phase.messageSent(new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectionTuneOkBody, _correlationId));
+
+			// Listeners are told first, because notifyState() reports _currentState as the old state.
+			notifyState(target);
+			_currentState = target;
+		}
+		finally
+		{
+			_lock.unlock();
+		}
+	}
+
+	/**
+	 * Sends the given ConnectionOpenBody on channel zero and waits for the broker's
+	 * ConnectionOpenOkBody. On success the state becomes CONNECTION_OPEN.
+	 * <p>
+	 * The wait has no timeout: it ends when methodReceived() records a ConnectionOpenOkBody
+	 * or a ConnectionCloseBody, or when the calling thread is interrupted.
+	 *
+	 * @param connectionOpenBody the method body to send
+	 * @return the ConnectionOpenOkBody received from the broker
+	 * @throws AMQPException if the broker closed the connection (before or after the send),
+	 *         checkIfValidStateTransition rejects the transition from the current state,
+	 *         or the wait was interrupted
+	 */
+	public ConnectionOpenOkBody open(ConnectionOpenBody connectionOpenBody) throws AMQPException
+	{
+		_lock.lock();
+		try
+		{
+			// If the broker sends a connection close due to an error with the
+			// Connection tune ok, then this call will verify that
+			checkIfConnectionClosed();
+
+			_connectionOpenOkBody = null;
+			checkIfValidStateTransition(AMQPState.CONNECTION_NOT_OPENED, _currentState, AMQPState.CONNECTION_OPEN);
+			AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectionOpenBody, QpidConstants.EMPTY_CORRELATION_ID);
+			_phase.messageSent(msg);
+
+			// Wait on the predicate, not on a bare await(): Condition.await() may return spuriously.
+			while (_connectionOpenOkBody == null && _connectionCloseBody == null)
+			{
+				_connectionNotOpened.await();
+			}
+
+			checkIfConnectionClosed();
+			AMQPValidator.throwExceptionOnNull(_connectionOpenOkBody, "The broker didn't send the ConnectionOpenOkBody in time");
+			// Notify before updating, because notifyState() reports _currentState as the old state.
+			notifyState(AMQPState.CONNECTION_OPEN);
+			_currentState = AMQPState.CONNECTION_OPEN;
+			return _connectionOpenOkBody;
+		}
+		catch (InterruptedException e)
+		{
+			// Restore the interrupt flag so callers further up can still observe it.
+			Thread.currentThread().interrupt();
+			throw new AMQPException("Error in connection.open", e);
+		}
+		finally
+		{
+			_lock.unlock();
+		}
+	}
+
+	/**
+	 * Sends the given ConnectionCloseBody on channel zero and waits, for at most the
+	 * server timeout, for the broker's ConnectionCloseOkBody. On success the state
+	 * becomes CONNECTION_CLOSED.
+	 *
+	 * @param connectioncloseBody the method body to send
+	 * @return the ConnectionCloseOkBody received from the broker
+	 * @throws AMQPException if checkIfValidStateTransition rejects the transition from the
+	 *         current state, no ConnectionCloseOkBody arrived within the timeout, or the
+	 *         wait was interrupted
+	 */
+	public ConnectionCloseOkBody close(ConnectionCloseBody connectioncloseBody) throws AMQPException
+	{
+		_lock.lock();
+		try
+		{
+			_connectionCloseOkBody = null;
+			checkIfValidStateTransition(_validCloseStates, _currentState, AMQPState.CONNECTION_CLOSED);
+			AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectioncloseBody, QpidConstants.EMPTY_CORRELATION_ID);
+			_phase.messageSent(msg);
+
+			// awaitNanos() returns the time still left, so a spurious wakeup resumes the
+			// wait for the remainder instead of failing early or restarting the timeout.
+			long remainingNanos = TimeUnit.MILLISECONDS.toNanos(_serverTimeOut);
+			while (_connectionCloseOkBody == null && remainingNanos > 0)
+			{
+				remainingNanos = _connectionNotClosed.awaitNanos(remainingNanos);
+			}
+
+			AMQPValidator.throwExceptionOnNull(_connectionCloseOkBody, "The broker didn't send the ConnectionCloseOkBody in time");
+			// Notify before updating, because notifyState() reports _currentState as the old state.
+			notifyState(AMQPState.CONNECTION_CLOSED);
+			_currentState = AMQPState.CONNECTION_CLOSED;
+			return _connectionCloseOkBody;
+		}
+		catch (InterruptedException e)
+		{
+			// Restore the interrupt flag so callers further up can still observe it.
+			Thread.currentThread().interrupt();
+			throw new AMQPException("Error in connection.close", e);
+		}
+		finally
+		{
+			_lock.unlock();
+		}
+	}
+
+	/**
+	 * ------------------------------------------- 
+	 * AMQMethodListener methods
+	 * --------------------------------------------
+	 */
+	/**
+	 * Records a connection-class method received from the broker and wakes the API
+	 * method waiting for it. The event's correlation id is remembered for every event.
+	 *
+	 * @param evt the received method event
+	 * @return true if the method was one of the connection methods handled here, false otherwise
+	 * @throws AMQPException if handling a broker-initiated ConnectionCloseBody fails
+	 */
+	public <B extends AMQMethodBody> boolean methodReceived(AMQPMethodEvent<B> evt) throws AMQPException
+	{
+		_lock.lock();
+		try
+		{
+			_correlationId = evt.getCorrelationId();
+			final Object body = evt.getMethod();
+
+			if (body instanceof ConnectionStartBody)
+			{
+				_connectionStartBody = (ConnectionStartBody) body;
+				_connectionNotStarted.signalAll();
+				return true;
+			}
+
+			if (body instanceof ConnectionSecureBody)
+			{
+				_connectionSecureBody = (ConnectionSecureBody) body;
+				_connectionNotSecure.signal();
+				// in case the server has sent another challenge
+				_connectionNotTuned.signal();
+				return true;
+			}
+
+			if (body instanceof ConnectionTuneBody)
+			{
+				_connectionTuneBody = (ConnectionTuneBody) body;
+				// if the server does the auth with ConnectionStartOk
+				_connectionNotSecure.signal();
+				_connectionNotTuned.signal();
+				return true;
+			}
+
+			if (body instanceof ConnectionOpenOkBody)
+			{
+				_connectionOpenOkBody = (ConnectionOpenOkBody) body;
+				_connectionNotOpened.signal();
+				return true;
+			}
+
+			if (body instanceof ConnectionCloseOkBody)
+			{
+				_connectionCloseOkBody = (ConnectionCloseOkBody) body;
+				_connectionNotClosed.signal();
+				return true;
+			}
+
+			if (body instanceof ConnectionCloseBody)
+			{
+				_connectionCloseBody = (ConnectionCloseBody) body;
+				// An API method may be blocked on a condition when the broker closes the
+				// connection after an error, so wake it before handling the close.
+				releaseLocks();
+				handleClose();
+				return true;
+			}
+
+			// Not a method this class handles.
+			return false;
+		}
+		finally
+		{
+			_lock.unlock();
+		}
+	}
+
+
+	/**
+	 * @return the Phase obtained from the transport in openTCPConnection()
+	 */
+	public Phase getPhasePipe()
+	{
+		return this._phase;
+	}
+	
+	/**
+	 * Reacts to a ConnectionCloseBody from the broker: notifies the listeners and moves
+	 * to CONNECTION_CLOSING.
+	 *
+	 * @throws AMQPException wrapping any exception raised while notifying or updating the state
+	 */
+	private void handleClose() throws AMQPException
+	{
+		final AMQPState closing = AMQPState.CONNECTION_CLOSING;
+		try
+		{
+			notifyState(closing);
+			_currentState = closing;
+			// do the required cleanup and send a ConnectionCloseOkBody
+		}
+		catch (Exception cause)
+		{
+			throw new AMQPException("Error handling connection.close from broker", cause);
+		}
+	}
+
+	/**
+	 * Fails if a ConnectionCloseBody from the broker has been recorded.
+	 *
+	 * @throws AMQPException carrying the broker's reply text, reply code, class id and method
+	 */
+	private void checkIfConnectionClosed() throws AMQPException
+	{
+		if (_connectionCloseBody == null)
+		{
+			return;
+		}
+
+		throw new AMQPException("Broker has closed connection due to : " + _connectionCloseBody.getReplyText()
+				+ " with reply code (" + _connectionCloseBody.getReplyCode() + ") "
+				+ "caused by class " + _connectionCloseBody.getClassId()
+				+ " and method " + _connectionCloseBody.getMethod());
+	}
+
+	/**
+	 * Signals the condition that an API method would be awaiting in the current state.
+	 * States without a matching condition are ignored.
+	 */
+	private void releaseLocks()
+	{
+		final Condition pending;
+		if (_currentState == AMQPState.CONNECTION_UNDEFINED)
+		{
+			pending = _connectionNotStarted;
+		}
+		else if (_currentState == AMQPState.CONNECTION_NOT_STARTED)
+		{
+			pending = _connectionNotSecure;
+		}
+		else if (_currentState == AMQPState.CONNECTION_NOT_SECURE)
+		{
+			pending = _connectionNotTuned;
+		}
+		else if (_currentState == AMQPState.CONNECTION_NOT_OPENED)
+		{
+			pending = _connectionNotOpened;
+		}
+		else
+		{
+			pending = null;
+		}
+
+		if (pending != null)
+		{
+			pending.signal();
+		}
+	}
+
+	/**
+	 * Publishes a connection state change from the current state to newState.
+	 * The current state itself is not modified here.
+	 *
+	 * @param newState the state being entered
+	 * @throws AMQPException propagated from the state manager
+	 */
+	private void notifyState(AMQPState newState) throws AMQPException
+	{
+		final AMQPStateChangedEvent transition = new AMQPStateChangedEvent(_currentState, newState, AMQPStateType.CONNECTION_STATE);
+		_stateManager.notifyStateChanged(transition);
+	}
 
 }

Modified: incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/QpidStateManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/QpidStateManager.java?view=diff&rev=524144&r1=524143&r2=524144
==============================================================================
--- incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/QpidStateManager.java (original)
+++ incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/QpidStateManager.java Fri Mar 30 08:53:18 2007
@@ -20,23 +20,65 @@
  */
 package org.apache.qpid.nclient.amqp;
 
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
+import org.apache.qpid.nclient.amqp.state.AMQPStateChangedEvent;
 import org.apache.qpid.nclient.amqp.state.AMQPStateListener;
 import org.apache.qpid.nclient.amqp.state.AMQPStateManager;
+import org.apache.qpid.nclient.amqp.state.AMQPStateType;
+import org.apache.qpid.nclient.core.AMQPException;
 
 public class QpidStateManager implements AMQPStateManager
 {
 
-    public void addListener(AMQPStateListener l) throws AMQException
-    {
-	// TODO Auto-generated method stub
+	private static final Logger _logger = Logger.getLogger(QpidStateManager.class);
 
-    }
+    // Listeners per state type. Every list is copy-on-write, so delivery never sees a concurrent modification.
+    private final Map<AMQPStateType, List<AMQPStateListener>> _listernerMap = new ConcurrentHashMap<AMQPStateType, List<AMQPStateListener>>();
 
-    public void removeListener(AMQPStateListener l) throws AMQException
+    /** Registers l for events of stateType; synchronized so that each type's list is created exactly once. */
+    public synchronized void addListener(AMQPStateType stateType, AMQPStateListener l) throws AMQException
     {
-	// TODO Auto-generated method stub
+    	List<AMQPStateListener> list = _listernerMap.get(stateType);
+    	if(list == null)
+    	{
+    		list = new java.util.concurrent.CopyOnWriteArrayList<AMQPStateListener>();
+    		_listernerMap.put(stateType, list);
+    	}
+    	list.add(l);
+    }
 
+    /** Unregisters l from events of stateType; does nothing when no list exists for that type. */
+    public void removeListener(AMQPStateType stateType, AMQPStateListener l) throws AMQException
+    {
+    	List<AMQPStateListener> list = _listernerMap.get(stateType);
+    	if(list != null)
+    	{
+    		list.remove(l);
+    	}
+    }
+
+    /**
+     * Delivers event, in registration order, to the listeners of its state type; warns if none was ever registered.
+     * @throws AMQPException if a listener's stateChanged throws it; later listeners are then not called
+     */
+    public void notifyStateChanged(AMQPStateChangedEvent event) throws AMQPException
+    {
+    	List<AMQPStateListener> list = _listernerMap.get(event.getStateType());
+    	if(list == null)
+    	{
+    		_logger.warn("There are no registered listerners for state type " + event.getStateType());
+    		return;
+    	}
+    	for(AMQPStateListener l: list)
+    	{
+    		l.stateChanged(event);
+    	}
     }
 
 }

Added: incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/StateHelper.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/StateHelper.java?view=auto&rev=524144
==============================================================================
--- incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/StateHelper.java (added)
+++ incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/StateHelper.java Fri Mar 30 08:53:18 2007
@@ -0,0 +1,19 @@
+package org.apache.qpid.nclient.amqp.sample;
+
+import org.apache.qpid.nclient.amqp.state.AMQPStateChangedEvent;
+import org.apache.qpid.nclient.amqp.state.AMQPStateListener;
+import org.apache.qpid.nclient.core.AMQPException;
+
+/** Sample listener that prints each state transition to standard output. */
+public class StateHelper implements AMQPStateListener
+{
+	/**
+	 * Prints the event's state type together with its old and new state.
+	 */
+	public void stateChanged(AMQPStateChangedEvent event) throws AMQPException
+	{
+		System.out.println(event.getStateType() + " changed state from "
+				+ event.getOldState() + " to " + event.getNewState());
+	}
+
+}