You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ra...@apache.org on 2007/03/30 18:54:26 UTC

svn commit: r524171 [2/2] - in /incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp: ./ qpid/ sample/

Added: incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPClassFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPClassFactory.java?view=auto&rev=524171
==============================================================================
--- incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPClassFactory.java (added)
+++ incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPClassFactory.java Fri Mar 30 09:54:25 2007
@@ -0,0 +1,286 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.nclient.amqp.qpid;
+
+import org.apache.qpid.framing.ChannelCloseBody;
+import org.apache.qpid.framing.ChannelCloseOkBody;
+import org.apache.qpid.framing.ChannelFlowBody;
+import org.apache.qpid.framing.ChannelFlowOkBody;
+import org.apache.qpid.framing.ChannelOkBody;
+import org.apache.qpid.framing.ChannelOpenOkBody;
+import org.apache.qpid.framing.ConnectionCloseBody;
+import org.apache.qpid.framing.ConnectionCloseOkBody;
+import org.apache.qpid.framing.ConnectionOpenOkBody;
+import org.apache.qpid.framing.ConnectionSecureBody;
+import org.apache.qpid.framing.ConnectionStartBody;
+import org.apache.qpid.framing.ConnectionTuneBody;
+import org.apache.qpid.framing.ExchangeDeclareOkBody;
+import org.apache.qpid.framing.ExchangeDeleteOkBody;
+import org.apache.qpid.framing.MessageAppendBody;
+import org.apache.qpid.framing.MessageCancelBody;
+import org.apache.qpid.framing.MessageCheckpointBody;
+import org.apache.qpid.framing.MessageCloseBody;
+import org.apache.qpid.framing.MessageGetBody;
+import org.apache.qpid.framing.MessageOffsetBody;
+import org.apache.qpid.framing.MessageOkBody;
+import org.apache.qpid.framing.MessageOpenBody;
+import org.apache.qpid.framing.MessageQosBody;
+import org.apache.qpid.framing.MessageRecoverBody;
+import org.apache.qpid.framing.MessageRejectBody;
+import org.apache.qpid.framing.MessageResumeBody;
+import org.apache.qpid.framing.MessageTransferBody;
+import org.apache.qpid.framing.QueueBindOkBody;
+import org.apache.qpid.framing.QueueDeclareOkBody;
+import org.apache.qpid.framing.QueueDeleteOkBody;
+import org.apache.qpid.framing.QueuePurgeOkBody;
+import org.apache.qpid.framing.QueueUnbindOkBody;
+import org.apache.qpid.nclient.amqp.AMQPChannel;
+import org.apache.qpid.nclient.amqp.AMQPClassFactory;
+import org.apache.qpid.nclient.amqp.AMQPConnection;
+import org.apache.qpid.nclient.amqp.AMQPExchange;
+import org.apache.qpid.nclient.amqp.AMQPMessage;
+import org.apache.qpid.nclient.amqp.AMQPMessageCallBack;
+import org.apache.qpid.nclient.amqp.AMQPQueue;
+import org.apache.qpid.nclient.amqp.event.AMQPEventManager;
+import org.apache.qpid.nclient.amqp.state.AMQPStateManager;
+import org.apache.qpid.nclient.core.AMQPException;
+import org.apache.qpid.nclient.core.DefaultPhaseContext;
+import org.apache.qpid.nclient.core.Phase;
+import org.apache.qpid.nclient.core.PhaseContext;
+import org.apache.qpid.nclient.core.QpidConstants;
+import org.apache.qpid.nclient.transport.AMQPConnectionURL;
+import org.apache.qpid.nclient.transport.ConnectionURL;
+import org.apache.qpid.nclient.transport.TransportConnection;
+import org.apache.qpid.nclient.transport.TransportConnectionFactory;
+import org.apache.qpid.nclient.transport.TransportConnectionFactory.ConnectionType;
+import org.apache.qpid.url.URLSyntaxException;
+
+/**
+ * The Class Factory creates AMQP Class
+ * equivalents defined in the spec.
+ * 
+ * There should be one instance per connection.
+ * The factory class creates all the support
+ * classes and provides an instance of the 
+ * AMQP class in ready-to-use state.
+ *
+ */
+public class QpidAMQPClassFactory implements AMQPClassFactory
+{
+	// Event manager for this connection; every AMQP class created by this factory registers its method listeners with it
+	private AMQPEventManager _eventManager = new QpidEventManager();
+
+	// State manager for this connection; handed to the connection and channel classes on construction
+	private AMQPStateManager _stateManager = new QpidStateManager();
+
+	// Phase pipe for this connection; fetched lazily from the connection by checkIfConnectionStarted()
+	private Phase _phase;
+
+	// The single connection instance, created on the first call to createConnectionClass()
+	private QpidAMQPConnection _amqpConnection;
+
+	/**
+	 * Creates a factory that owns a fresh event manager and state manager.
+	 * No connection exists until createConnectionClass() is called.
+	 */
+	public QpidAMQPClassFactory()
+	{
+		super();
+	}
+
+	/**
+	 * Builds an AMQPConnectionURL from the given string and delegates to
+	 * createConnectionClass(ConnectionURL, ConnectionType).
+	 *
+	 * @param urlStr the connection URL in string form
+	 * @param type the kind of transport connection to create
+	 * @return the connection class owned by this factory
+	 * @throws AMQPException propagated from the delegate call
+	 * @throws URLSyntaxException propagated from construction of the AMQPConnectionURL
+	 * @see org.apache.qpid.nclient.amqp.AMQPClassFactory
+	 */
+	public AMQPConnection createConnectionClass(String urlStr, ConnectionType type) throws AMQPException, URLSyntaxException
+	{
+		return createConnectionClass(new AMQPConnectionURL(urlStr), type);
+	}
+
+	/**
+	 * Returns the connection class owned by this factory, creating it on the first call.
+	 * On creation a transport connection is obtained from the TransportConnectionFactory
+	 * (with this factory's event manager placed in the phase context) and the new
+	 * connection is registered on channel zero for the Connection method bodies it handles.
+	 * Subsequent calls return the existing instance and ignore both arguments.
+	 *
+	 * @param url the broker URL
+	 * @param type the kind of transport connection to create
+	 * @return the single connection instance of this factory
+	 * @throws AMQPException propagated from transport creation or listener registration
+	 * @see org.apache.qpid.nclient.amqp.AMQPClassFactory#createConnectionClass(org.apache.qpid.nclient.transport.ConnectionURL, org.apache.qpid.nclient.transport.TransportConnectionFactory.ConnectionType)
+	 */
+	public AMQPConnection createConnectionClass(ConnectionURL url, ConnectionType type) throws AMQPException
+	{
+		if (_amqpConnection != null)
+		{
+			// Already created: hand back the existing instance.
+			return _amqpConnection;
+		}
+
+		PhaseContext phaseCtx = new DefaultPhaseContext();
+		phaseCtx.setProperty(QpidConstants.EVENT_MANAGER, _eventManager);
+		TransportConnection transport = TransportConnectionFactory.createTransportConnection(url, type, phaseCtx);
+
+		_amqpConnection = new QpidAMQPConnection(transport, _stateManager);
+
+		// Connection-class methods always travel on channel zero.
+		_eventManager.addMethodEventListener(QpidConstants.CHANNEL_ZERO, ConnectionStartBody.class, _amqpConnection);
+		_eventManager.addMethodEventListener(QpidConstants.CHANNEL_ZERO, ConnectionSecureBody.class, _amqpConnection);
+		_eventManager.addMethodEventListener(QpidConstants.CHANNEL_ZERO, ConnectionTuneBody.class, _amqpConnection);
+		_eventManager.addMethodEventListener(QpidConstants.CHANNEL_ZERO, ConnectionOpenOkBody.class, _amqpConnection);
+		_eventManager.addMethodEventListener(QpidConstants.CHANNEL_ZERO, ConnectionCloseBody.class, _amqpConnection);
+		_eventManager.addMethodEventListener(QpidConstants.CHANNEL_ZERO, ConnectionCloseOkBody.class, _amqpConnection);
+
+		return _amqpConnection;
+	}
+
+	/**
+	 * Creates a channel class for the given channel id and registers it with the
+	 * event manager for the Channel method bodies it listens to.
+	 *
+	 * @param channel the channel id the new class is bound to
+	 * @return the newly created channel class
+	 * @throws AMQPException if the connection's phase pipe is not yet available
+	 * @see org.apache.qpid.nclient.amqp.AMQPClassFactory#createChannelClass(int)
+	 */
+	public AMQPChannel createChannelClass(int channel) throws AMQPException
+	{
+		checkIfConnectionStarted();
+
+		QpidAMQPChannel channelClass = new QpidAMQPChannel(channel, _phase,_stateManager);
+		_eventManager.addMethodEventListener(channel, ChannelOpenOkBody.class, channelClass);
+		_eventManager.addMethodEventListener(channel, ChannelCloseBody.class, channelClass);
+		_eventManager.addMethodEventListener(channel, ChannelCloseOkBody.class, channelClass);
+		_eventManager.addMethodEventListener(channel, ChannelFlowBody.class, channelClass);
+		_eventManager.addMethodEventListener(channel, ChannelFlowOkBody.class, channelClass);
+		_eventManager.addMethodEventListener(channel, ChannelOkBody.class, channelClass);
+
+		return channelClass;
+	}
+
+	/**
+	 * Removes every listener registration that createChannelClass() made for the
+	 * given channel class.
+	 *
+	 * @param channel the channel id the class was registered under
+	 * @param amqpChannel the channel class to unregister
+	 * @throws AMQPException propagated from the event manager
+	 * @see org.apache.qpid.nclient.amqp.AMQPClassFactory#destroyChannelClass(int, org.apache.qpid.nclient.amqp.AMQPChannel)
+	 */
+	public void destroyChannelClass(int channel, QpidAMQPChannel amqpChannel) throws AMQPException
+	{
+		final AMQPEventManager events = _eventManager;
+
+		events.removeMethodEventListener(channel, ChannelOpenOkBody.class, amqpChannel);
+		events.removeMethodEventListener(channel, ChannelCloseBody.class, amqpChannel);
+		events.removeMethodEventListener(channel, ChannelCloseOkBody.class, amqpChannel);
+		events.removeMethodEventListener(channel, ChannelFlowBody.class, amqpChannel);
+		events.removeMethodEventListener(channel, ChannelFlowOkBody.class, amqpChannel);
+		events.removeMethodEventListener(channel, ChannelOkBody.class, amqpChannel);
+	}
+
+	/**
+	 * Creates an exchange class for the given channel id and registers it with the
+	 * event manager for the ExchangeDeclareOk and ExchangeDeleteOk method bodies.
+	 *
+	 * @param channel the channel id the new class is bound to
+	 * @return the newly created exchange class
+	 * @throws AMQPException if the connection's phase pipe is not yet available
+	 * @see org.apache.qpid.nclient.amqp.AMQPClassFactory#createExchangeClass(int)
+	 */
+	public AMQPExchange createExchangeClass(int channel) throws AMQPException
+	{
+		checkIfConnectionStarted();
+
+		QpidAMQPExchange exchangeClass = new QpidAMQPExchange(channel, _phase);
+		_eventManager.addMethodEventListener(channel, ExchangeDeclareOkBody.class, exchangeClass);
+		_eventManager.addMethodEventListener(channel, ExchangeDeleteOkBody.class, exchangeClass);
+
+		return exchangeClass;
+	}
+
+	/**
+	 * Removes every listener registration that createExchangeClass() made for the
+	 * given exchange class.
+	 *
+	 * @param channel the channel id the class was registered under
+	 * @param amqpExchange the exchange class to unregister
+	 * @throws AMQPException propagated from the event manager
+	 * @see org.apache.qpid.nclient.amqp.AMQPClassFactory#destoryExchangeClass(int, org.apache.qpid.nclient.amqp.AMQPExchange)
+	 */
+	public void destoryExchangeClass(int channel, QpidAMQPExchange amqpExchange) throws AMQPException
+	{
+		final AMQPEventManager events = _eventManager;
+
+		events.removeMethodEventListener(channel, ExchangeDeclareOkBody.class, amqpExchange);
+		events.removeMethodEventListener(channel, ExchangeDeleteOkBody.class, amqpExchange);
+	}
+
+	/**
+	 * Creates a queue class for the given channel id and registers it with the
+	 * event manager for the Queue *Ok method bodies it listens to.
+	 *
+	 * @param channel the channel id the new class is bound to
+	 * @return the newly created queue class
+	 * @throws AMQPException if the connection's phase pipe is not yet available
+	 * @see org.apache.qpid.nclient.amqp.AMQPClassFactory#createQueueClass(int)
+	 */
+	public AMQPQueue createQueueClass(int channel) throws AMQPException
+	{
+		checkIfConnectionStarted();
+
+		QpidAMQPQueue queueClass = new QpidAMQPQueue(channel, _phase);
+		_eventManager.addMethodEventListener(channel, QueueDeclareOkBody.class, queueClass);
+		_eventManager.addMethodEventListener(channel, QueueBindOkBody.class, queueClass);
+		_eventManager.addMethodEventListener(channel, QueueUnbindOkBody.class, queueClass);
+		_eventManager.addMethodEventListener(channel, QueuePurgeOkBody.class, queueClass);
+		_eventManager.addMethodEventListener(channel, QueueDeleteOkBody.class, queueClass);
+
+		return queueClass;
+	}
+
+	/**
+	 * Removes every listener registration that createQueueClass() made for the
+	 * given queue class.
+	 *
+	 * @param channel the channel id the class was registered under
+	 * @param amqpQueue the queue class to unregister
+	 * @throws AMQPException propagated from the event manager
+	 * @see org.apache.qpid.nclient.amqp.AMQPClassFactory#destroyQueueClass(int, org.apache.qpid.nclient.amqp.AMQPQueue)
+	 */
+	public void destroyQueueClass(int channel, QpidAMQPQueue amqpQueue) throws AMQPException
+	{
+		final AMQPEventManager events = _eventManager;
+
+		events.removeMethodEventListener(channel, QueueDeclareOkBody.class, amqpQueue);
+		events.removeMethodEventListener(channel, QueueBindOkBody.class, amqpQueue);
+		events.removeMethodEventListener(channel, QueueUnbindOkBody.class, amqpQueue);
+		events.removeMethodEventListener(channel, QueuePurgeOkBody.class, amqpQueue);
+		events.removeMethodEventListener(channel, QueueDeleteOkBody.class, amqpQueue);
+	}
+
+	/**
+	 * Creates a message class for the given channel id and registers it with the
+	 * event manager for the Message method bodies it listens to.
+	 *
+	 * @param channel the channel id the new class is bound to
+	 * @param messageCb the callback handed to the new message class
+	 * @return the newly created message class
+	 * @throws AMQPException if the connection's phase pipe is not yet available
+	 * @see org.apache.qpid.nclient.amqp.AMQPClassFactory#createMessageClass(int, org.apache.qpid.nclient.amqp.AMQPMessageCallBack)
+	 */
+	public AMQPMessage createMessageClass(int channel, AMQPMessageCallBack messageCb) throws AMQPException
+	{
+		checkIfConnectionStarted();
+
+		QpidAMQPMessage messageClass = new QpidAMQPMessage(channel, _phase, messageCb);
+		_eventManager.addMethodEventListener(channel, MessageAppendBody.class, messageClass);
+		_eventManager.addMethodEventListener(channel, MessageCancelBody.class, messageClass);
+		_eventManager.addMethodEventListener(channel, MessageCheckpointBody.class, messageClass);
+		_eventManager.addMethodEventListener(channel, MessageCloseBody.class, messageClass);
+		_eventManager.addMethodEventListener(channel, MessageGetBody.class, messageClass);
+		_eventManager.addMethodEventListener(channel, MessageOffsetBody.class, messageClass);
+		_eventManager.addMethodEventListener(channel, MessageOkBody.class, messageClass);
+		_eventManager.addMethodEventListener(channel, MessageOpenBody.class, messageClass);
+		_eventManager.addMethodEventListener(channel, MessageRecoverBody.class, messageClass);
+		_eventManager.addMethodEventListener(channel, MessageRejectBody.class, messageClass);
+		_eventManager.addMethodEventListener(channel, MessageResumeBody.class, messageClass);
+		_eventManager.addMethodEventListener(channel, MessageQosBody.class, messageClass);
+		_eventManager.addMethodEventListener(channel, MessageTransferBody.class, messageClass);
+
+		return messageClass;
+	}
+
+	/**
+	 * Removes every listener registration that createMessageClass() made for the
+	 * given message class.
+	 *
+	 * @param channel the channel id the class was registered under
+	 * @param amqpMessage the message class to unregister
+	 * @throws AMQPException propagated from the event manager
+	 * @see org.apache.qpid.nclient.amqp.AMQPClassFactory#destoryMessageClass(int, org.apache.qpid.nclient.amqp.AMQPMessage)
+	 */
+	public void destoryMessageClass(int channel, QpidAMQPMessage amqpMessage) throws AMQPException
+	{
+		final AMQPEventManager events = _eventManager;
+
+		events.removeMethodEventListener(channel, MessageAppendBody.class, amqpMessage);
+		events.removeMethodEventListener(channel, MessageCancelBody.class, amqpMessage);
+		events.removeMethodEventListener(channel, MessageCheckpointBody.class, amqpMessage);
+		events.removeMethodEventListener(channel, MessageCloseBody.class, amqpMessage);
+		events.removeMethodEventListener(channel, MessageGetBody.class, amqpMessage);
+		events.removeMethodEventListener(channel, MessageOffsetBody.class, amqpMessage);
+		events.removeMethodEventListener(channel, MessageOkBody.class, amqpMessage);
+		events.removeMethodEventListener(channel, MessageOpenBody.class, amqpMessage);
+		events.removeMethodEventListener(channel, MessageRecoverBody.class, amqpMessage);
+		events.removeMethodEventListener(channel, MessageRejectBody.class, amqpMessage);
+		events.removeMethodEventListener(channel, MessageResumeBody.class, amqpMessage);
+		events.removeMethodEventListener(channel, MessageQosBody.class, amqpMessage);
+		events.removeMethodEventListener(channel, MessageTransferBody.class, amqpMessage);
+	}
+
+	/**
+	 * Ensures the phase pipe is available before an AMQP class is created.
+	 * The pipe is fetched from the connection on first use and cached in _phase.
+	 *
+	 * TODO: this class should register as a state listener for AMQPConnection
+	 * instead of polling the connection for its phase pipe.
+	 *
+	 * @throws AMQPException if no connection class has been created yet, or if the
+	 *         connection has not yet produced a phase pipe
+	 */
+	private void checkIfConnectionStarted() throws AMQPException
+	{
+		if (_phase != null)
+		{
+			return;
+		}
+
+		// Without this guard a create*Class() call made before createConnectionClass()
+		// would fail with a NullPointerException rather than the declared AMQPException.
+		if (_amqpConnection == null)
+		{
+			throw new AMQPException("Cannot create a channel until the connection class has been created");
+		}
+
+		_phase = _amqpConnection.getPhasePipe();
+		if (_phase == null)
+		{
+			throw new AMQPException("Cannot create a channel until connection is ready");
+		}
+	}
+
+	/**
+	 * Returns the event manager owned by this factory.
+	 *
+	 * @return the event manager the created AMQP classes are registered with
+	 * @see org.apache.qpid.nclient.amqp.AMQPClassFactory#getEventManager()
+	 */
+	public AMQPEventManager getEventManager()
+	{
+		return this._eventManager;
+	}
+
+	/**
+	 * Returns the state manager owned by this factory.
+	 *
+	 * @return the state manager passed to the connection and channel classes
+	 * @see org.apache.qpid.nclient.amqp.AMQPClassFactory#getStateManager()
+	 */
+	public AMQPStateManager getStateManager()
+	{
+		return this._stateManager;
+	}
+}

Added: incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPConnection.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPConnection.java?view=auto&rev=524171
==============================================================================
--- incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPConnection.java (added)
+++ incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPConnection.java Fri Mar 30 09:54:25 2007
@@ -0,0 +1,448 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.nclient.amqp.qpid;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.ConnectionCloseBody;
+import org.apache.qpid.framing.ConnectionCloseOkBody;
+import org.apache.qpid.framing.ConnectionOpenBody;
+import org.apache.qpid.framing.ConnectionOpenOkBody;
+import org.apache.qpid.framing.ConnectionSecureBody;
+import org.apache.qpid.framing.ConnectionSecureOkBody;
+import org.apache.qpid.framing.ConnectionStartBody;
+import org.apache.qpid.framing.ConnectionStartOkBody;
+import org.apache.qpid.framing.ConnectionTuneBody;
+import org.apache.qpid.framing.ConnectionTuneOkBody;
+import org.apache.qpid.nclient.amqp.AMQPConnection;
+import org.apache.qpid.nclient.amqp.event.AMQPMethodEvent;
+import org.apache.qpid.nclient.amqp.event.AMQPMethodListener;
+import org.apache.qpid.nclient.amqp.state.AMQPState;
+import org.apache.qpid.nclient.amqp.state.AMQPStateChangedEvent;
+import org.apache.qpid.nclient.amqp.state.AMQPStateMachine;
+import org.apache.qpid.nclient.amqp.state.AMQPStateManager;
+import org.apache.qpid.nclient.amqp.state.AMQPStateType;
+import org.apache.qpid.nclient.config.ClientConfiguration;
+import org.apache.qpid.nclient.core.AMQPException;
+import org.apache.qpid.nclient.core.Phase;
+import org.apache.qpid.nclient.core.QpidConstants;
+import org.apache.qpid.nclient.transport.TransportConnection;
+import org.apache.qpid.nclient.util.AMQPValidator;
+
+/**
+ * This maps directly to the Connection class defined in the AMQP protocol. This class is a finite state machine and is
+ * thread safe by design. A particular (state-changing) method can only be invoked once and only in sequence, or else an
+ * IllegalStateTransitionException will be thrown. Also, only one thread can enter those methods at a given time.
+ */
+public class QpidAMQPConnection extends AMQPStateMachine implements AMQPMethodListener, AMQPConnection
+{
+	private static final Logger _logger = Logger.getLogger(QpidAMQPConnection.class);
+
+	private Phase _phase; // assigned in openTCPConnection() from TransportConnection.connect()
+
+	private TransportConnection _connection;
+
+	private long _correlationId; // taken from the most recently received method event
+
+	private AMQPState _currentState; // starts as CONNECTION_UNDEFINED
+
+	private AMQPStateManager _stateManager;
+
+	private final AMQPState[] _validCloseStates = new AMQPState[] // states from which close() may be called
+	{ AMQPState.CONNECTION_NOT_STARTED, AMQPState.CONNECTION_NOT_SECURE, AMQPState.CONNECTION_NOT_TUNED, AMQPState.CONNECTION_NOT_OPENED,
+			AMQPState.CONNECTION_OPEN, };
+
+	// How long, in milliseconds, to wait for the broker to respond; the constructor overrides this default from ClientConfiguration
+	private long _serverTimeOut = 1000;
+
+	private final Lock _lock = new ReentrantLock(); // held by every state-changing method and by methodReceived()
+
+	private final Condition _connectionNotStarted = _lock.newCondition(); // awaited in openTCPConnection()
+
+	private final Condition _connectionNotSecure = _lock.newCondition(); // awaited in startOk()
+
+	private final Condition _connectionNotTuned = _lock.newCondition(); // awaited in secureOk()
+
+	private final Condition _connectionNotOpened = _lock.newCondition(); // awaited in open()
+
+	private final Condition _connectionNotClosed = _lock.newCondition(); // awaited in close()
+
+	private ConnectionStartBody _connectionStartBody; // the response bodies below are stored by methodReceived()
+
+	private ConnectionSecureBody _connectionSecureBody;
+
+	private ConnectionTuneBody _connectionTuneBody;
+
+	private ConnectionOpenOkBody _connectionOpenOkBody;
+
+	private ConnectionCloseOkBody _connectionCloseOkBody;
+
+	private ConnectionCloseBody _connectionCloseBody; // non-null once the broker has sent connection.close
+
+	/**
+	 * Creates a connection in the CONNECTION_UNDEFINED state. The server timeout
+	 * is read from the client configuration, replacing the field's default.
+	 *
+	 * @param connection the transport connection used to reach the broker
+	 * @param stateManager the state manager notified of every state change
+	 */
+	protected QpidAMQPConnection(TransportConnection connection, AMQPStateManager stateManager)
+	{
+		this._connection = connection;
+		this._stateManager = stateManager;
+		this._currentState = AMQPState.CONNECTION_UNDEFINED;
+		this._serverTimeOut = ClientConfiguration.get().getLong(QpidConstants.SERVER_TIMEOUT_IN_MILLISECONDS);
+	}
+
+	/**
+	 * ------------------------------------------- 
+	 * API Methods 
+	 * --------------------------------------------
+	 */
+
+	/**
+	 * Opens the transport connection and blocks until the broker sends
+	 * connection.start or closes the connection.
+	 *
+	 * @return the ConnectionStartBody sent by the broker
+	 * @throws AMQPException if the state transition is invalid, the broker closed the
+	 *         connection, or the waiting thread was interrupted
+	 * @see org.apache.qpid.nclient.amqp.AMQPConnection#openTCPConnection()
+	 */
+	public ConnectionStartBody openTCPConnection() throws AMQPException
+	{
+		_lock.lock();
+		try
+		{
+			_connectionStartBody = null;
+			checkIfValidStateTransition(AMQPState.CONNECTION_UNDEFINED, _currentState, AMQPState.CONNECTION_NOT_STARTED);
+			// open the TCP connection
+			_phase = _connection.connect();
+
+			// Wait for the ConnectionStartBody or for a connection.close from the broker.
+			// The wait is guarded by its predicate so that a spurious wakeup is not mistaken
+			// for a missing response, and so that a response already recorded before the
+			// wait starts is not missed. The wait is unbounded; the timed variant using
+			// _serverTimeOut is not enabled in this revision.
+			while (_connectionStartBody == null && _connectionCloseBody == null)
+			{
+				_connectionNotStarted.await();
+			}
+
+			checkIfConnectionClosed();
+			AMQPValidator.throwExceptionOnNull(_connectionStartBody, "The broker didn't send the ConnectionStartBody in time");
+			// notifyState() reports the transition from _currentState, so it runs before the field is updated
+			notifyState(AMQPState.CONNECTION_NOT_STARTED);
+			_currentState = AMQPState.CONNECTION_NOT_STARTED;
+			return _connectionStartBody;
+		}
+		catch (InterruptedException e)
+		{
+			// Restore the interrupt status so callers further up can still observe it.
+			Thread.currentThread().interrupt();
+			throw new AMQPException("Error opening connection to broker", e);
+		}
+		finally
+		{
+			_lock.unlock();
+		}
+	}
+
+	/**
+	 * Sends connection.start-ok and blocks until the broker answers with either
+	 * connection.tune or connection.secure, or closes the connection.
+	 *
+	 * @param connectionStartOkBody the start-ok body to send
+	 * @return the ConnectionTuneBody if the broker moved on to tuning, otherwise the
+	 *         ConnectionSecureBody carrying a security challenge
+	 * @throws AMQPException if the state transition is invalid, the broker closed the
+	 *         connection, or the waiting thread was interrupted
+	 * @see org.apache.qpid.nclient.amqp.AMQPConnection#startOk(org.apache.qpid.framing.ConnectionStartOkBody)
+	 */
+	public AMQMethodBody startOk(ConnectionStartOkBody connectionStartOkBody) throws AMQPException
+	{
+		_lock.lock();
+		try
+		{
+			_connectionSecureBody = null;
+			checkIfValidStateTransition(AMQPState.CONNECTION_NOT_STARTED, _currentState, AMQPState.CONNECTION_NOT_SECURE);
+			AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectionStartOkBody, _correlationId);
+			_phase.messageSent(msg);
+
+			// Guard the wait with its predicate so a spurious wakeup is not reported as a
+			// missing response. The wait is unbounded; the timed variant using
+			// _serverTimeOut is not enabled in this revision.
+			while (_connectionTuneBody == null && _connectionSecureBody == null && _connectionCloseBody == null)
+			{
+				_connectionNotSecure.await();
+			}
+
+			checkIfConnectionClosed();
+			if (_connectionTuneBody != null)
+			{
+				notifyState(AMQPState.CONNECTION_NOT_TUNED);
+				_currentState = AMQPState.CONNECTION_NOT_TUNED;
+				return _connectionTuneBody;
+			}
+			else if (_connectionSecureBody != null)
+			{ // the server sent a security challenge
+				notifyState(AMQPState.CONNECTION_NOT_SECURE);
+				_currentState = AMQPState.CONNECTION_NOT_SECURE;
+				return _connectionSecureBody;
+			}
+			else
+			{
+				throw new AMQPException("The broker didn't send the ConnectionTuneBody or ConnectionSecureBody in time");
+			}
+		}
+		catch (InterruptedException e)
+		{
+			// Restore the interrupt status so callers further up can still observe it.
+			Thread.currentThread().interrupt();
+			throw new AMQPException("Error in connection.startOk", e);
+		}
+		finally
+		{
+			_lock.unlock();
+		}
+	}
+
+	/**
+	 * Sends connection.secure-ok and blocks until the broker answers with either
+	 * connection.tune or a fresh connection.secure challenge, or closes the connection.
+	 *
+	 * @param connectionSecureOkBody the secure-ok body to send
+	 * @return the ConnectionTuneBody if the broker moved on to tuning, otherwise the
+	 *         ConnectionSecureBody carrying another challenge
+	 * @throws AMQPException if the state transition is invalid, the broker closed the
+	 *         connection, or the waiting thread was interrupted
+	 * @see org.apache.qpid.nclient.amqp.AMQPConnection#secureOk(org.apache.qpid.framing.ConnectionSecureOkBody)
+	 */
+	public AMQMethodBody secureOk(ConnectionSecureOkBody connectionSecureOkBody) throws AMQPException
+	{
+		_lock.lock();
+		try
+		{
+			// Clear both possible responses; the server could send a fresh challenge.
+			_connectionTuneBody = null;
+			_connectionSecureBody = null;
+			checkIfValidStateTransition(AMQPState.CONNECTION_NOT_SECURE, _currentState, AMQPState.CONNECTION_NOT_TUNED);
+
+			AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectionSecureOkBody, _correlationId);
+			_phase.messageSent(msg);
+
+			// Guard the wait with its predicate so a spurious wakeup is not reported as a
+			// missing response. The wait is unbounded; the timed variant using
+			// _serverTimeOut is not enabled in this revision.
+			while (_connectionTuneBody == null && _connectionSecureBody == null && _connectionCloseBody == null)
+			{
+				_connectionNotTuned.await();
+			}
+			checkIfConnectionClosed();
+
+			if (_connectionTuneBody != null)
+			{
+				notifyState(AMQPState.CONNECTION_NOT_TUNED);
+				_currentState = AMQPState.CONNECTION_NOT_TUNED;
+				return _connectionTuneBody;
+			}
+			else if (_connectionSecureBody != null)
+			{ // the server sent another challenge
+				notifyState(AMQPState.CONNECTION_NOT_SECURE);
+				_currentState = AMQPState.CONNECTION_NOT_SECURE;
+				return _connectionSecureBody;
+			}
+			else
+			{
+				throw new AMQPException("The broker didn't send the ConnectionTuneBody or ConnectionSecureBody in time");
+			}
+		}
+		catch (InterruptedException e)
+		{
+			// Restore the interrupt status so callers further up can still observe it.
+			Thread.currentThread().interrupt();
+			throw new AMQPException("Error in connection.secureOk", e);
+		}
+		finally
+		{
+			_lock.unlock();
+		}
+	}
+
+	/**
+	 * Sends connection.tune-ok and moves the connection to CONNECTION_NOT_OPENED.
+	 * This method does not wait for a response from the broker.
+	 *
+	 * @param connectionTuneOkBody the tune-ok body to send
+	 * @throws AMQPException if the state transition is invalid or sending fails
+	 * @see org.apache.qpid.nclient.amqp.AMQPConnection#tuneOk(org.apache.qpid.framing.ConnectionTuneOkBody)
+	 */
+	public void tuneOk(ConnectionTuneOkBody connectionTuneOkBody) throws AMQPException
+	{
+		_lock.lock();
+		try
+		{
+			checkIfValidStateTransition(AMQPState.CONNECTION_NOT_TUNED, _currentState, AMQPState.CONNECTION_NOT_OPENED);
+			_connectionSecureBody = null;
+
+			_phase.messageSent(new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectionTuneOkBody, _correlationId));
+
+			// notifyState() reports the transition from _currentState, so it runs before the field is updated
+			notifyState(AMQPState.CONNECTION_NOT_OPENED);
+			_currentState = AMQPState.CONNECTION_NOT_OPENED;
+		}
+		finally
+		{
+			_lock.unlock();
+		}
+	}
+
+	/**
+	 * Sends connection.open and blocks until the broker answers with
+	 * connection.open-ok or closes the connection.
+	 *
+	 * @param connectionOpenBody the open body to send
+	 * @return the ConnectionOpenOkBody sent by the broker
+	 * @throws AMQPException if the state transition is invalid, the broker closed the
+	 *         connection, or the waiting thread was interrupted
+	 * @see org.apache.qpid.nclient.amqp.AMQPConnection#open(org.apache.qpid.framing.ConnectionOpenBody)
+	 */
+	public ConnectionOpenOkBody open(ConnectionOpenBody connectionOpenBody) throws AMQPException
+	{
+		_lock.lock();
+		try
+		{
+			// If the broker sends a connection close due to an error with the
+			// Connection tune ok, then this call will verify that
+			checkIfConnectionClosed();
+
+			_connectionOpenOkBody = null;
+			checkIfValidStateTransition(AMQPState.CONNECTION_NOT_OPENED, _currentState, AMQPState.CONNECTION_OPEN);
+			AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectionOpenBody, QpidConstants.EMPTY_CORRELATION_ID);
+			_phase.messageSent(msg);
+
+			// Guard the wait with its predicate so a spurious wakeup is not reported as a
+			// missing response. The wait is unbounded; the timed variant using
+			// _serverTimeOut is not enabled in this revision.
+			while (_connectionOpenOkBody == null && _connectionCloseBody == null)
+			{
+				_connectionNotOpened.await();
+			}
+
+			checkIfConnectionClosed();
+			AMQPValidator.throwExceptionOnNull(_connectionOpenOkBody, "The broker didn't send the ConnectionOpenOkBody in time");
+			notifyState(AMQPState.CONNECTION_OPEN);
+			_currentState = AMQPState.CONNECTION_OPEN;
+			return _connectionOpenOkBody;
+		}
+		catch (InterruptedException e)
+		{
+			// Restore the interrupt status so callers further up can still observe it.
+			Thread.currentThread().interrupt();
+			throw new AMQPException("Error in connection.open", e);
+		}
+		finally
+		{
+			_lock.unlock();
+		}
+	}
+
+	/**
+	 * Sends connection.close and waits up to the configured server timeout for the
+	 * broker to answer with connection.close-ok.
+	 *
+	 * @param connectioncloseBody the close body to send
+	 * @return the ConnectionCloseOkBody sent by the broker
+	 * @throws AMQPException if the state transition is invalid, the broker did not
+	 *         answer within the timeout, or the waiting thread was interrupted
+	 * @see org.apache.qpid.nclient.amqp.AMQPConnection#close(org.apache.qpid.framing.ConnectionCloseBody)
+	 */
+	public ConnectionCloseOkBody close(ConnectionCloseBody connectioncloseBody) throws AMQPException
+	{
+		_lock.lock();
+		try
+		{
+			_connectionCloseOkBody = null;
+			checkIfValidStateTransition(_validCloseStates, _currentState, AMQPState.CONNECTION_CLOSED);
+			AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectioncloseBody, QpidConstants.EMPTY_CORRELATION_ID);
+			_phase.messageSent(msg);
+
+			// Wait for the full timeout unless the response arrives first. Re-arming the
+			// wait with the remaining time keeps a spurious wakeup from being reported
+			// as a timeout.
+			long remainingNanos = TimeUnit.MILLISECONDS.toNanos(_serverTimeOut);
+			while (_connectionCloseOkBody == null && remainingNanos > 0)
+			{
+				remainingNanos = _connectionNotClosed.awaitNanos(remainingNanos);
+			}
+
+			AMQPValidator.throwExceptionOnNull(_connectionCloseOkBody, "The broker didn't send the ConnectionCloseOkBody in time");
+			notifyState(AMQPState.CONNECTION_CLOSED);
+			_currentState = AMQPState.CONNECTION_CLOSED;
+			return _connectionCloseOkBody;
+		}
+		catch (InterruptedException e)
+		{
+			// Restore the interrupt status so callers further up can still observe it.
+			Thread.currentThread().interrupt();
+			throw new AMQPException("Error in connection.close", e);
+		}
+		finally
+		{
+			_lock.unlock();
+		}
+	}
+
+	/**
+	 * ------------------------------------------- 
+	 * AMQMethodListener methods
+	 * --------------------------------------------
+	 */
+	/**
+	 * Records a Connection method body received from the broker and wakes the
+	 * API method waiting for it. The event's correlation id is remembered for
+	 * the next outgoing method.
+	 *
+	 * @param evt the received method event
+	 * @return true if the body was one of the Connection bodies handled here, false otherwise
+	 * @throws AMQPException if handling a broker-initiated close fails
+	 */
+	public <B extends AMQMethodBody> boolean methodReceived(AMQPMethodEvent<B> evt) throws AMQPException
+	{
+		_lock.lock();
+		try
+		{
+			_correlationId = evt.getCorrelationId();
+			AMQMethodBody body = evt.getMethod();
+
+			if (body instanceof ConnectionStartBody)
+			{
+				_connectionStartBody = (ConnectionStartBody) body;
+				_connectionNotStarted.signalAll();
+			}
+			else if (body instanceof ConnectionSecureBody)
+			{
+				_connectionSecureBody = (ConnectionSecureBody) body;
+				_connectionNotSecure.signal();
+				_connectionNotTuned.signal(); // in case the server has sent another challenge
+			}
+			else if (body instanceof ConnectionTuneBody)
+			{
+				_connectionTuneBody = (ConnectionTuneBody) body;
+				_connectionNotSecure.signal(); // if the server does the auth with ConnectionStartOk
+				_connectionNotTuned.signal();
+			}
+			else if (body instanceof ConnectionOpenOkBody)
+			{
+				_connectionOpenOkBody = (ConnectionOpenOkBody) body;
+				_connectionNotOpened.signal();
+			}
+			else if (body instanceof ConnectionCloseOkBody)
+			{
+				_connectionCloseOkBody = (ConnectionCloseOkBody) body;
+				_connectionNotClosed.signal();
+			}
+			else if (body instanceof ConnectionCloseBody)
+			{
+				_connectionCloseBody = (ConnectionCloseBody) body;
+				// An API method may be waiting on a condition when the broker
+				// closes the connection after an error, so wake it up first.
+				releaseLocks();
+				handleClose();
+			}
+			else
+			{
+				return false;
+			}
+
+			return true;
+		}
+		finally
+		{
+			_lock.unlock();
+		}
+	}
+
+
+	/**
+	 * Returns the phase pipe obtained when the transport connection was opened.
+	 *
+	 * @return the phase pipe, or null if openTCPConnection() has not yet assigned it
+	 */
+	public Phase getPhasePipe()
+	{
+		return this._phase;
+	}
+	
+	/**
+	 * Reacts to a connection.close sent by the broker by moving the connection
+	 * to CONNECTION_CLOSING.
+	 *
+	 * @throws AMQPException wrapping any exception raised while notifying the state change
+	 */
+	private void handleClose() throws AMQPException
+	{
+		try
+		{
+			// notifyState() reports the transition from _currentState, so it runs before the field is updated
+			notifyState(AMQPState.CONNECTION_CLOSING);
+			_currentState = AMQPState.CONNECTION_CLOSING;
+			// TODO: do the required cleanup and send a ConnectionCloseOkBody
+		}
+		catch (Exception failure)
+		{
+			throw new AMQPException("Error handling connection.close from broker", failure);
+		}
+	}
+
+	/**
+	 * Throws if the broker has already sent connection.close, reporting the reply
+	 * text, reply code, class id and method carried by that close body.
+	 *
+	 * @throws AMQPException if a ConnectionCloseBody has been received
+	 */
+	private void checkIfConnectionClosed() throws AMQPException
+	{
+		if (_connectionCloseBody == null)
+		{
+			return;
+		}
+
+		String reason = "Broker has closed connection due to : " + _connectionCloseBody.getReplyText();
+		String code = " with reply code (" + _connectionCloseBody.getReplyCode() + ") ";
+		String origin = "caused by class " + _connectionCloseBody.getClassId() + " and method " + _connectionCloseBody.getMethod();
+
+		throw new AMQPException(reason + code + origin);
+	}
+
+	/**
+	 * Signals the condition that an API method may be waiting on in the current
+	 * state. Nothing is signalled for states that are not listed here.
+	 */
+	private void releaseLocks()
+	{
+		Condition waiting = null;
+
+		if (_currentState == AMQPState.CONNECTION_NOT_OPENED)
+		{
+			waiting = _connectionNotOpened;
+		}
+		else if (_currentState == AMQPState.CONNECTION_UNDEFINED)
+		{
+			waiting = _connectionNotStarted;
+		}
+		else if (_currentState == AMQPState.CONNECTION_NOT_STARTED)
+		{
+			waiting = _connectionNotSecure;
+		}
+		else if (_currentState == AMQPState.CONNECTION_NOT_SECURE)
+		{
+			waiting = _connectionNotTuned;
+		}
+
+		if (waiting != null)
+		{
+			waiting.signal();
+		}
+	}
+
+	/**
+	 * Tells the state manager that the connection is moving from the current
+	 * state to the given one. The _currentState field itself is not changed here.
+	 *
+	 * @param newState the state being entered
+	 * @throws AMQPException propagated from the state manager
+	 */
+	private void notifyState(AMQPState newState) throws AMQPException
+	{
+		AMQPStateChangedEvent change = new AMQPStateChangedEvent(_currentState, newState,AMQPStateType.CONNECTION_STATE);
+		_stateManager.notifyStateChanged(change);
+	}
+
+}
\ No newline at end of file

Added: incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPExchange.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPExchange.java?view=auto&rev=524171
==============================================================================
--- incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPExchange.java (added)
+++ incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPExchange.java Fri Mar 30 09:54:25 2007
@@ -0,0 +1,92 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.nclient.amqp.qpid;
+
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.ExchangeDeclareBody;
+import org.apache.qpid.framing.ExchangeDeclareOkBody;
+import org.apache.qpid.framing.ExchangeDeleteBody;
+import org.apache.qpid.framing.ExchangeDeleteOkBody;
+import org.apache.qpid.nclient.amqp.AMQPCallBack;
+import org.apache.qpid.nclient.amqp.AMQPCallBackSupport;
+import org.apache.qpid.nclient.amqp.AMQPExchange;
+import org.apache.qpid.nclient.amqp.event.AMQPMethodEvent;
+import org.apache.qpid.nclient.amqp.event.AMQPMethodListener;
+import org.apache.qpid.nclient.core.AMQPException;
+import org.apache.qpid.nclient.core.Phase;
+
+/**
+ * Qpid implementation of the Exchange class defined in AMQP, bound to one channel.
+ * <p>
+ * Each operation accepts an {@link AMQPCallBack} for callers that want to know the
+ * broker's response to that particular method. Clients can handle the response
+ * asynchronously, or block for it by polling AMQPCallBack.isComplete() in a loop.
+ */
+public class QpidAMQPExchange extends AMQPCallBackSupport implements AMQPMethodListener, AMQPExchange
+{
+	/** Pipeline stage that outgoing method events are handed to. */
+	private Phase _phase;
+
+	protected QpidAMQPExchange(int channelId,Phase phase)
+	{
+		super(channelId);
+		_phase = phase;
+	}
+
+	/**
+	 * Sends Exchange.Declare.
+	 * The body's nowait flag is passed to the inherited handleNoWait together with the callback.
+	 *
+	 * @see org.apache.qpid.nclient.amqp.AMQPExchange#declare(org.apache.qpid.framing.ExchangeDeclareBody, org.apache.qpid.nclient.amqp.AMQPCallBack)
+	 */
+	public void declare(ExchangeDeclareBody exchangeDeclareBody,AMQPCallBack cb) throws AMQPException
+	{
+		AMQPMethodEvent request = handleNoWait(exchangeDeclareBody.nowait, exchangeDeclareBody, cb);
+		_phase.messageSent(request);
+	}
+
+	/**
+	 * Sends Exchange.Delete.
+	 * The body's nowait flag is passed to the inherited handleNoWait together with the callback.
+	 *
+	 * @see org.apache.qpid.nclient.amqp.AMQPExchange#delete(org.apache.qpid.framing.ExchangeDeleteBody, org.apache.qpid.nclient.amqp.AMQPCallBack)
+	 */
+	public void delete(ExchangeDeleteBody exchangeDeleteBody,AMQPCallBack cb) throws AMQPException
+	{
+		AMQPMethodEvent request = handleNoWait(exchangeDeleteBody.nowait, exchangeDeleteBody, cb);
+		_phase.messageSent(request);
+	}
+
+	/**
+	 * AMQPMethodListener entry point.
+	 * Exchange.Declare-Ok and Exchange.Delete-Ok are forwarded to the callback
+	 * identified by the event's local correlation id; anything else is left unhandled.
+	 *
+	 * @return true when the event was one of the two Ok responses, false otherwise
+	 */
+	public <B extends AMQMethodBody> boolean methodReceived(AMQPMethodEvent<B> evt) throws AMQPException
+	{
+		long correlationKey = evt.getLocalCorrelationId();
+		AMQMethodBody body = evt.getMethod();
+
+		boolean isExchangeResponse =
+				body instanceof ExchangeDeclareOkBody || body instanceof ExchangeDeleteOkBody;
+		if (!isExchangeResponse)
+		{
+			return false;
+		}
+
+		invokeCallBack(correlationKey, body);
+		return true;
+	}
+}

Added: incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPMessage.java?view=auto&rev=524171
==============================================================================
--- incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPMessage.java (added)
+++ incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPMessage.java Fri Mar 30 09:54:25 2007
@@ -0,0 +1,256 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.nclient.amqp.qpid;
+
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.MessageAppendBody;
+import org.apache.qpid.framing.MessageCancelBody;
+import org.apache.qpid.framing.MessageCheckpointBody;
+import org.apache.qpid.framing.MessageCloseBody;
+import org.apache.qpid.framing.MessageConsumeBody;
+import org.apache.qpid.framing.MessageEmptyBody;
+import org.apache.qpid.framing.MessageGetBody;
+import org.apache.qpid.framing.MessageOffsetBody;
+import org.apache.qpid.framing.MessageOkBody;
+import org.apache.qpid.framing.MessageOpenBody;
+import org.apache.qpid.framing.MessageQosBody;
+import org.apache.qpid.framing.MessageRecoverBody;
+import org.apache.qpid.framing.MessageRejectBody;
+import org.apache.qpid.framing.MessageResumeBody;
+import org.apache.qpid.framing.MessageTransferBody;
+import org.apache.qpid.nclient.amqp.AMQPCallBack;
+import org.apache.qpid.nclient.amqp.AMQPCallBackSupport;
+import org.apache.qpid.nclient.amqp.AMQPMessage;
+import org.apache.qpid.nclient.amqp.AMQPMessageCallBack;
+import org.apache.qpid.nclient.amqp.event.AMQPMethodEvent;
+import org.apache.qpid.nclient.amqp.event.AMQPMethodListener;
+import org.apache.qpid.nclient.core.AMQPException;
+import org.apache.qpid.nclient.core.Phase;
+
+/**
+ * This class represents the AMQP Message class.
+ * You need an instance of this class per channel.
+ * An {@link AMQPMessageCallBack} is taken as an argument in the constructor.
+ * <p>
+ * A client can use this class to issue Message class methods on the broker.
+ * When the broker issues Message class methods on the client, the client is notified
+ * via the AMQPMessageCallBack interface. Responses from the broker (ok, reject,
+ * empty, offset) are instead handed to the {@link AMQPCallBack} supplied with the
+ * originating request.
+ * <p>
+ * A JMS Message producer implementation can wrap an instance of this and map
+ * JMS method calls to the appropriate AMQP methods.
+ * <p>
+ * AMQPMessageCallBack can be implemented by the JMS MessageConsumer implementation.
+ */
+public class QpidAMQPMessage extends AMQPCallBackSupport implements AMQPMethodListener, AMQPMessage
+{
+	/** Pipeline stage that outgoing method events are handed to. */
+	private Phase _phase;
+
+	/** Receiver for Message class methods that the broker issues on this client. */
+	private AMQPMessageCallBack _messageCb;
+
+	protected QpidAMQPMessage(int channelId,Phase phase,AMQPMessageCallBack messageCb)
+	{
+		super(channelId);
+		_phase = phase;
+		_messageCb = messageCb;
+	}
+
+	/* (non-Javadoc)
+	 * @see org.apache.qpid.nclient.amqp.AMQPMessage#transfer(org.apache.qpid.framing.MessageTransferBody, org.apache.qpid.nclient.amqp.AMQPCallBack)
+	 */
+	public void transfer(MessageTransferBody messageTransferBody,AMQPCallBack cb) throws AMQPException
+	{
+		AMQPMethodEvent msg = handleAsynchronousCall(messageTransferBody,cb);
+		_phase.messageSent(msg);
+	}
+
+	/* (non-Javadoc)
+	 * @see org.apache.qpid.nclient.amqp.AMQPMessage#consume(org.apache.qpid.framing.MessageConsumeBody, org.apache.qpid.nclient.amqp.AMQPCallBack)
+	 */
+	public void consume(MessageConsumeBody messageConsumeBody,AMQPCallBack cb) throws AMQPException
+	{
+		AMQPMethodEvent msg = handleAsynchronousCall(messageConsumeBody,cb);
+		_phase.messageSent(msg);
+	}
+
+	/* (non-Javadoc)
+	 * @see org.apache.qpid.nclient.amqp.AMQPMessage#cancel(org.apache.qpid.framing.MessageCancelBody, org.apache.qpid.nclient.amqp.AMQPCallBack)
+	 */
+	public void cancel(MessageCancelBody messageCancelBody,AMQPCallBack cb) throws AMQPException
+	{
+		AMQPMethodEvent msg = handleAsynchronousCall(messageCancelBody,cb);
+		_phase.messageSent(msg);
+	}
+
+	/* (non-Javadoc)
+	 * @see org.apache.qpid.nclient.amqp.AMQPMessage#get(org.apache.qpid.framing.MessageGetBody, org.apache.qpid.nclient.amqp.AMQPCallBack)
+	 */
+	public void get(MessageGetBody messageGetBody,AMQPCallBack cb) throws AMQPException
+	{
+		AMQPMethodEvent msg = handleAsynchronousCall(messageGetBody,cb);
+		_phase.messageSent(msg);
+	}
+
+	/* (non-Javadoc)
+	 * @see org.apache.qpid.nclient.amqp.AMQPMessage#recover(org.apache.qpid.framing.MessageRecoverBody, org.apache.qpid.nclient.amqp.AMQPCallBack)
+	 */
+	public void recover(MessageRecoverBody messageRecoverBody,AMQPCallBack cb) throws AMQPException
+	{
+		AMQPMethodEvent msg = handleAsynchronousCall(messageRecoverBody,cb);
+		_phase.messageSent(msg);
+	}
+
+	/* (non-Javadoc)
+	 * @see org.apache.qpid.nclient.amqp.AMQPMessage#open(org.apache.qpid.framing.MessageOpenBody, org.apache.qpid.nclient.amqp.AMQPCallBack)
+	 */
+	public void open(MessageOpenBody messageOpenBody,AMQPCallBack cb) throws AMQPException
+	{
+		AMQPMethodEvent msg = handleAsynchronousCall(messageOpenBody,cb);
+		_phase.messageSent(msg);
+	}
+
+	/* (non-Javadoc)
+	 * @see org.apache.qpid.nclient.amqp.AMQPMessage#close(org.apache.qpid.framing.MessageCloseBody, org.apache.qpid.nclient.amqp.AMQPCallBack)
+	 */
+	public void close(MessageCloseBody messageCloseBody,AMQPCallBack cb) throws AMQPException
+	{
+		AMQPMethodEvent msg = handleAsynchronousCall(messageCloseBody,cb);
+		_phase.messageSent(msg);
+	}
+
+	/* (non-Javadoc)
+	 * @see org.apache.qpid.nclient.amqp.AMQPMessage#append(org.apache.qpid.framing.MessageAppendBody, org.apache.qpid.nclient.amqp.AMQPCallBack)
+	 */
+	public void append(MessageAppendBody messageAppendBody,AMQPCallBack cb) throws AMQPException
+	{
+		AMQPMethodEvent msg = handleAsynchronousCall(messageAppendBody,cb);
+		_phase.messageSent(msg);
+	}
+
+	/* (non-Javadoc)
+	 * @see org.apache.qpid.nclient.amqp.AMQPMessage#checkpoint(org.apache.qpid.framing.MessageCheckpointBody, org.apache.qpid.nclient.amqp.AMQPCallBack)
+	 */
+	public void checkpoint(MessageCheckpointBody messageCheckpointBody,AMQPCallBack cb) throws AMQPException
+	{
+		AMQPMethodEvent msg = handleAsynchronousCall(messageCheckpointBody,cb);
+		_phase.messageSent(msg);
+	}
+
+	/* (non-Javadoc)
+	 * @see org.apache.qpid.nclient.amqp.AMQPMessage#resume(org.apache.qpid.framing.MessageResumeBody, org.apache.qpid.nclient.amqp.AMQPCallBack)
+	 */
+	public void resume(MessageResumeBody messageResumeBody,AMQPCallBack cb) throws AMQPException
+	{
+		AMQPMethodEvent msg = handleAsynchronousCall(messageResumeBody,cb);
+		_phase.messageSent(msg);
+	}
+
+	/* (non-Javadoc)
+	 * @see org.apache.qpid.nclient.amqp.AMQPMessage#qos(org.apache.qpid.framing.MessageQosBody, org.apache.qpid.nclient.amqp.AMQPCallBack)
+	 */
+	public void qos(MessageQosBody messageQosBody,AMQPCallBack cb) throws AMQPException
+	{
+		AMQPMethodEvent msg = handleAsynchronousCall(messageQosBody,cb);
+		_phase.messageSent(msg);
+	}
+
+	/**
+	 * Sends Message.Ok as a response; the event is built with the supplied
+	 * correlation id and no callback is involved.
+	 *
+	 * @see org.apache.qpid.nclient.amqp.AMQPMessage#ok(org.apache.qpid.framing.MessageOkBody, long)
+	 */
+	public void ok(MessageOkBody messageOkBody,long correlationId) throws AMQPException
+	{
+		AMQPMethodEvent msg = new AMQPMethodEvent(_channelId,messageOkBody,correlationId);
+		_phase.messageSent(msg);
+	}
+
+	/**
+	 * Sends Message.Reject as a response; the event is built with the supplied
+	 * correlation id and no callback is involved.
+	 *
+	 * @see org.apache.qpid.nclient.amqp.AMQPMessage#reject(org.apache.qpid.framing.MessageRejectBody, long)
+	 */
+	public void reject(MessageRejectBody messageRejectBody,long correlationId) throws AMQPException
+	{
+		AMQPMethodEvent msg = new AMQPMethodEvent(_channelId,messageRejectBody,correlationId);
+		_phase.messageSent(msg);
+	}
+
+	/**
+	 * Sends Message.Offset as a response; the event is built with the supplied
+	 * correlation id and no callback is involved.
+	 *
+	 * @see org.apache.qpid.nclient.amqp.AMQPMessage#offset(org.apache.qpid.framing.MessageOffsetBody, long)
+	 */
+	public void offset(MessageOffsetBody messageOffsetBody,long correlationId) throws AMQPException
+	{
+		AMQPMethodEvent msg = new AMQPMethodEvent(_channelId,messageOffsetBody,correlationId);
+		_phase.messageSent(msg);
+	}
+
+	/**
+	 * AMQPMethodListener entry point.
+	 * <p>
+	 * Response bodies (ok, reject, empty, offset) are delivered to the callback
+	 * identified by the event's local correlation id. Request bodies issued by
+	 * the broker (transfer, append, open, close, checkpoint, recover, resume) are
+	 * forwarded to the AMQPMessageCallBack together with the event's correlation id.
+	 *
+	 * @return true if the method body was handled here, false otherwise
+	 */
+	public <B extends AMQMethodBody> boolean methodReceived(AMQPMethodEvent<B> evt) throws AMQPException
+	{
+		long localCorrelationId = evt.getLocalCorrelationId();
+		AMQMethodBody methodBody = evt.getMethod();
+		// Message.Offset is the broker's response to Message.Resume (AMQP 0-9), so it is
+		// routed to the pending callback like the other responses; without it a callback
+		// passed to resume() would never be invoked.
+		if ( methodBody instanceof MessageOkBody     ||
+			 methodBody instanceof MessageRejectBody ||
+			 methodBody instanceof MessageEmptyBody  ||
+			 methodBody instanceof MessageOffsetBody)
+		{
+			invokeCallBack(localCorrelationId,methodBody);
+			return true;
+		}
+		else if (methodBody instanceof MessageTransferBody)
+		{
+			_messageCb.transfer((MessageTransferBody)methodBody, evt.getCorrelationId());
+			return true;
+		}
+		else if (methodBody instanceof MessageAppendBody)
+		{
+			_messageCb.append((MessageAppendBody)methodBody, evt.getCorrelationId());
+			return true;
+		}
+		else if (methodBody instanceof MessageOpenBody)
+		{
+			_messageCb.open((MessageOpenBody)methodBody, evt.getCorrelationId());
+			return true;
+		}
+		else if (methodBody instanceof MessageCloseBody)
+		{
+			_messageCb.close((MessageCloseBody)methodBody, evt.getCorrelationId());
+			return true;
+		}
+		else if (methodBody instanceof MessageCheckpointBody)
+		{
+			_messageCb.checkpoint((MessageCheckpointBody)methodBody, evt.getCorrelationId());
+			return true;
+		}
+		else if (methodBody instanceof MessageRecoverBody)
+		{
+			_messageCb.recover((MessageRecoverBody)methodBody, evt.getCorrelationId());
+			return true;
+		}
+		else if (methodBody instanceof MessageResumeBody)
+		{
+			_messageCb.resume((MessageResumeBody)methodBody, evt.getCorrelationId());
+			return true;
+		}
+		else
+		{
+			return false;
+		}
+	}
+}

Added: incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPQueue.java?view=auto&rev=524171
==============================================================================
--- incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPQueue.java (added)
+++ incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidAMQPQueue.java Fri Mar 30 09:54:25 2007
@@ -0,0 +1,130 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.nclient.amqp.qpid;
+
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.QueueBindBody;
+import org.apache.qpid.framing.QueueBindOkBody;
+import org.apache.qpid.framing.QueueDeclareBody;
+import org.apache.qpid.framing.QueueDeclareOkBody;
+import org.apache.qpid.framing.QueueDeleteBody;
+import org.apache.qpid.framing.QueueDeleteOkBody;
+import org.apache.qpid.framing.QueuePurgeBody;
+import org.apache.qpid.framing.QueuePurgeOkBody;
+import org.apache.qpid.framing.QueueUnbindBody;
+import org.apache.qpid.framing.QueueUnbindOkBody;
+import org.apache.qpid.nclient.amqp.AMQPCallBack;
+import org.apache.qpid.nclient.amqp.AMQPCallBackSupport;
+import org.apache.qpid.nclient.amqp.AMQPQueue;
+import org.apache.qpid.nclient.amqp.event.AMQPMethodEvent;
+import org.apache.qpid.nclient.amqp.event.AMQPMethodListener;
+import org.apache.qpid.nclient.core.AMQPException;
+import org.apache.qpid.nclient.core.Phase;
+
+/**
+ * Qpid implementation of the Queue class defined in AMQP, bound to one channel.
+ * <p>
+ * Each operation accepts an {@link AMQPCallBack} for callers that want to know the
+ * broker's response to that particular method. Clients can handle the response
+ * asynchronously, or block for it by polling AMQPCallBack.isComplete() in a loop.
+ */
+public class QpidAMQPQueue extends AMQPCallBackSupport implements AMQPMethodListener, AMQPQueue
+{
+	/** Pipeline stage that outgoing method events are handed to. */
+	private Phase _phase;
+
+	protected QpidAMQPQueue(int channelId,Phase phase)
+	{
+		super(channelId);
+		_phase = phase;
+	}
+
+	/**
+	 * Sends Queue.Declare, passing the body's nowait flag to the inherited handleNoWait.
+	 *
+	 * @see org.apache.qpid.nclient.amqp.AMQPQueue#declare(org.apache.qpid.framing.QueueDeclareBody, org.apache.qpid.nclient.amqp.AMQPCallBack)
+	 */
+	public void declare(QueueDeclareBody queueDeclareBody,AMQPCallBack cb) throws AMQPException
+	{
+		AMQPMethodEvent request = handleNoWait(queueDeclareBody.nowait, queueDeclareBody, cb);
+		_phase.messageSent(request);
+	}
+
+	/**
+	 * Sends Queue.Bind, passing the body's nowait flag to the inherited handleNoWait.
+	 *
+	 * @see org.apache.qpid.nclient.amqp.AMQPQueue#bind(org.apache.qpid.framing.QueueBindBody, org.apache.qpid.nclient.amqp.AMQPCallBack)
+	 */
+	public void bind(QueueBindBody queueBindBody,AMQPCallBack cb) throws AMQPException
+	{
+		AMQPMethodEvent request = handleNoWait(queueBindBody.nowait, queueBindBody, cb);
+		_phase.messageSent(request);
+	}
+
+	/**
+	 * Sends Queue.Unbind. Queue.unbind doesn't have nowait, so the request goes
+	 * through handleAsynchronousCall rather than handleNoWait.
+	 *
+	 * @see org.apache.qpid.nclient.amqp.AMQPQueue#unbind(org.apache.qpid.framing.QueueUnbindBody, org.apache.qpid.nclient.amqp.AMQPCallBack)
+	 */
+	public void unbind(QueueUnbindBody queueUnbindBody,AMQPCallBack cb) throws AMQPException
+	{
+		AMQPMethodEvent request = handleAsynchronousCall(queueUnbindBody, cb);
+		_phase.messageSent(request);
+	}
+
+	/**
+	 * Sends Queue.Purge, passing the body's nowait flag to the inherited handleNoWait.
+	 *
+	 * @see org.apache.qpid.nclient.amqp.AMQPQueue#purge(org.apache.qpid.framing.QueuePurgeBody, org.apache.qpid.nclient.amqp.AMQPCallBack)
+	 */
+	public void purge(QueuePurgeBody queuePurgeBody,AMQPCallBack cb) throws AMQPException
+	{
+		AMQPMethodEvent request = handleNoWait(queuePurgeBody.nowait, queuePurgeBody, cb);
+		_phase.messageSent(request);
+	}
+
+	/**
+	 * Sends Queue.Delete, passing the body's nowait flag to the inherited handleNoWait.
+	 *
+	 * @see org.apache.qpid.nclient.amqp.AMQPQueue#delete(org.apache.qpid.framing.QueueDeleteBody, org.apache.qpid.nclient.amqp.AMQPCallBack)
+	 */
+	public void delete(QueueDeleteBody queueDeleteBody,AMQPCallBack cb) throws AMQPException
+	{
+		AMQPMethodEvent request = handleNoWait(queueDeleteBody.nowait, queueDeleteBody, cb);
+		_phase.messageSent(request);
+	}
+
+	/**
+	 * AMQPMethodListener entry point.
+	 * The five Queue "-Ok" responses (declare, bind, unbind, purge, delete) are forwarded
+	 * to the callback identified by the event's local correlation id; anything else is
+	 * left unhandled.
+	 *
+	 * @return true when the event was one of the Queue Ok responses, false otherwise
+	 */
+	public <B extends AMQMethodBody> boolean methodReceived(AMQPMethodEvent<B> evt) throws AMQPException
+	{
+		long correlationKey = evt.getLocalCorrelationId();
+		AMQMethodBody body = evt.getMethod();
+
+		boolean isQueueResponse = body instanceof QueueDeclareOkBody
+				|| body instanceof QueueBindOkBody
+				|| body instanceof QueueUnbindOkBody
+				|| body instanceof QueuePurgeOkBody
+				|| body instanceof QueueDeleteOkBody;
+		if (!isQueueResponse)
+		{
+			return false;
+		}
+
+		invokeCallBack(correlationKey, body);
+		return true;
+	}
+}

Added: incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidEventManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidEventManager.java?view=auto&rev=524171
==============================================================================
--- incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidEventManager.java (added)
+++ incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidEventManager.java Fri Mar 30 09:54:25 2007
@@ -0,0 +1,123 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.nclient.amqp.qpid;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.nclient.amqp.event.AMQPEventManager;
+import org.apache.qpid.nclient.amqp.event.AMQPMethodEvent;
+import org.apache.qpid.nclient.amqp.event.AMQPMethodListener;
+import org.apache.qpid.nclient.core.AMQPException;
+
+/**
+ * This class registeres with the ModelPhase as a AMQMethodListener, 
+ * to receive method events and then it distributes methods to other listerners
+ * using a filtering criteria. The criteria is channel id and method body class.
+ * The method listeners are added and removed dynamically
+ * 
+ * <p/>
+ */
+public class QpidEventManager implements AMQPEventManager
+{
+    private static final Logger _logger = Logger.getLogger(QpidEventManager.class);
+
+    private Map<Integer, Map> _channelMap = new ConcurrentHashMap<Integer, Map>();
+
+    /**
+     * ------------------------------------------------
+     * methods introduced by AMQEventManager
+     * ------------------------------------------------
+     */
+    public void addMethodEventListener(int channelId, Class clazz, AMQPMethodListener l)
+    {
+	Map<Class, List> _methodListenerMap;
+	if (_channelMap.containsKey(channelId))
+	{
+	    _methodListenerMap = _channelMap.get(channelId);
+
+	}
+	else
+	{
+	    _methodListenerMap = new ConcurrentHashMap<Class, List>();
+	    _channelMap.put(channelId, _methodListenerMap);
+	}
+
+	List<AMQPMethodListener> _listeners;
+	if (_methodListenerMap.containsKey(clazz))
+	{
+	    _listeners = _methodListenerMap.get(clazz);
+	}
+	else
+	{
+	    _listeners = new ArrayList<AMQPMethodListener>();
+	    _methodListenerMap.put(clazz, _listeners);
+	}
+
+	_listeners.add(l);
+
+    }
+
+    public void removeMethodEventListener(int channelId, Class clazz, AMQPMethodListener l)
+    {
+	if (_channelMap.containsKey(channelId))
+	{
+	    Map<Class, List> _methodListenerMap = _channelMap.get(channelId);
+
+	    if (_methodListenerMap.containsKey(clazz))
+	    {
+		List<AMQPMethodListener> _listeners = _methodListenerMap.get(clazz);
+		_listeners.remove(l);
+	    }
+
+	}
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.qpid.nclient.model.AMQStateManager#methodReceived(org.apache.qpid.protocol.AMQMethodEvent)
+     */
+    public <B extends AMQMethodBody> boolean notifyEvent(AMQPMethodEvent<B> evt) throws AMQPException
+    {
+	if (_channelMap.containsKey(evt.getChannelId()))
+	{
+	    Map<Class, List> _methodListenerMap = _channelMap.get(evt.getChannelId());
+
+	    if (_methodListenerMap.containsKey(evt.getMethod().getClass()))
+	    {
+
+		List<AMQPMethodListener> _listeners = _methodListenerMap.get(evt.getMethod().getClass());
+		for (AMQPMethodListener l : _listeners)
+		{
+		    l.methodReceived(evt);
+		}
+
+		return (_listeners.size() > 0);
+	    }
+
+	}
+
+	return false;
+    }
+}

Added: incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidStateManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidStateManager.java?view=auto&rev=524171
==============================================================================
--- incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidStateManager.java (added)
+++ incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/qpid/QpidStateManager.java Fri Mar 30 09:54:25 2007
@@ -0,0 +1,84 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.nclient.amqp.qpid;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.nclient.amqp.state.AMQPStateChangedEvent;
+import org.apache.qpid.nclient.amqp.state.AMQPStateListener;
+import org.apache.qpid.nclient.amqp.state.AMQPStateManager;
+import org.apache.qpid.nclient.amqp.state.AMQPStateType;
+import org.apache.qpid.nclient.core.AMQPException;
+
+/**
+ * Keeps the state listeners registered per {@link AMQPStateType} and notifies them
+ * of state change events.
+ * <p>
+ * Registration uses putIfAbsent so that racing registrations for the same state type
+ * cannot overwrite each other, and listeners are kept in copy-on-write lists so that
+ * a listener may add or remove registrations while a notification is in progress.
+ */
+public class QpidStateManager implements AMQPStateManager
+{
+
+	private static final Logger _logger = Logger.getLogger(QpidStateManager.class);
+
+	/** state type -> listeners interested in changes of that type. */
+	private final ConcurrentHashMap<AMQPStateType, List<AMQPStateListener>> _listenerMap =
+			new ConcurrentHashMap<AMQPStateType, List<AMQPStateListener>>();
+
+	/**
+	 * Registers a listener for the given state type.
+	 * Registering the same listener twice results in it being notified twice.
+	 */
+	public void addListener(AMQPStateType stateType, AMQPStateListener l) throws AMQException
+	{
+		List<AMQPStateListener> listeners = _listenerMap.get(stateType);
+		if (listeners == null)
+		{
+			List<AMQPStateListener> created = new java.util.concurrent.CopyOnWriteArrayList<AMQPStateListener>();
+			// putIfAbsent returns the list another thread installed first, if any.
+			listeners = _listenerMap.putIfAbsent(stateType, created);
+			if (listeners == null)
+			{
+				listeners = created;
+			}
+		}
+		listeners.add(l);
+	}
+
+	/**
+	 * Removes one registration of the listener for the given state type.
+	 * Unknown state types or listeners are ignored.
+	 */
+	public void removeListener(AMQPStateType stateType, AMQPStateListener l) throws AMQException
+	{
+		List<AMQPStateListener> listeners = _listenerMap.get(stateType);
+		if (listeners != null)
+		{
+			listeners.remove(l);
+		}
+	}
+
+	/**
+	 * Delivers the event to every listener registered for the event's state type.
+	 * A warning is logged when no listener list has ever been created for that type.
+	 *
+	 * @throws AMQPException propagated from the first listener that fails; later
+	 *         listeners are not invoked in that case
+	 */
+	public void notifyStateChanged(AMQPStateChangedEvent event) throws AMQPException
+	{
+		List<AMQPStateListener> listeners = _listenerMap.get(event.getStateType());
+		if (listeners == null)
+		{
+			// Trailing space added so the state type no longer runs into the preceding word.
+			_logger.warn("There are no registered listerners for state type " + event.getStateType());
+			return;
+		}
+
+		// Iterates over a snapshot, so listeners may (un)register during the callback.
+		for (AMQPStateListener l : listeners)
+		{
+			l.stateChanged(event);
+		}
+	}
+
+}

Modified: incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/TestClient.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/TestClient.java?view=diff&rev=524171&r1=524170&r2=524171
==============================================================================
--- incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/TestClient.java (original)
+++ incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/TestClient.java Fri Mar 30 09:54:25 2007
@@ -65,6 +65,7 @@
 import org.apache.qpid.nclient.amqp.AMQPExchange;
 import org.apache.qpid.nclient.amqp.AMQPMessage;
 import org.apache.qpid.nclient.amqp.AMQPQueue;
+import org.apache.qpid.nclient.amqp.qpid.QpidAMQPClassFactory;
 import org.apache.qpid.nclient.amqp.state.AMQPStateType;
 import org.apache.qpid.nclient.transport.AMQPConnectionURL;
 import org.apache.qpid.nclient.transport.ConnectionURL;
@@ -90,7 +91,7 @@
 	private static int _channel = 2;
 
 	// Need a Class factory per connection
-	private AMQPClassFactory _classFactory = new AMQPClassFactory();
+	private AMQPClassFactory _classFactory = new QpidAMQPClassFactory();
 
 	private int _ticket;