You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ra...@apache.org on 2007/03/30 00:24:22 UTC

svn commit: r523854 [1/3] - in /incubator/qpid/branches/qpid.0-9/java/newclient: ./ src/main/java/ src/main/java/org/apache/qpid/nclient/amqp/ src/main/java/org/apache/qpid/nclient/amqp/event/ src/main/java/org/apache/qpid/nclient/amqp/sample/ src/main...

Author: rajith
Date: Thu Mar 29 15:24:20 2007
New Revision: 523854

URL: http://svn.apache.org/viewvc?view=rev&rev=523854
Log:
First cut of the AMQP java API

Added:
    incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPClassFactory.java
    incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/QpidEventManager.java
    incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/QpidStateManager.java
    incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/event/
    incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/event/AMQPEventManager.java
    incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/event/AMQPMethodEvent.java
    incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/event/AMQPMethodListener.java
    incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/MessageHelper.java
    incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/message/MessageHeaders.java
    incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/security/amqplain/
    incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/security/amqplain/AmqPlainSaslClient.java
    incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/security/amqplain/AmqPlainSaslClientFactory.java
Removed:
    incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/EventManager.java
    incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/model/AMQPMethodEvent.java
    incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/model/AMQPMethodListener.java
Modified:
    incubator/qpid/branches/qpid.0-9/java/newclient/.project
    incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/client.log4j
    incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPCallBackSupport.java
    incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPChannel.java
    incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPConnection.java
    incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPExchange.java
    incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPMessage.java
    incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPMessageCallBack.java
    incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPQueue.java
    incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/TestClient.java
    incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/config/ClientConfiguration.java
    incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/config/client.xml
    incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/core/PhaseFactory.java
    incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/core/QpidConstants.java
    incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/execution/ExecutionPhase.java
    incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/execution/RequestManager.java
    incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/execution/ResponseManager.java
    incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/message/AMQPApplicationMessage.java
    incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/message/MessageStore.java
    incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/message/TransientMessageStore.java
    incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/model/ModelPhase.java
    incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/security/CallbackHandlerRegistry.java
    incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TCPConnection.java
    incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TransportConnectionFactory.java
    incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TransportPhase.java
    incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/transport/VMConnection.java

Modified: incubator/qpid/branches/qpid.0-9/java/newclient/.project
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/newclient/.project?view=diff&rev=523854&r1=523853&r2=523854
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/newclient/.project (original)
+++ incubator/qpid/branches/qpid.0-9/java/newclient/.project Thu Mar 29 15:24:20 2007
@@ -1,17 +1,14 @@
-<?xml version="1.0" encoding="UTF-8"?>
 <projectDescription>
-	<name>qpid-newclient</name>
-	<comment></comment>
-	<projects>
-	</projects>
-	<buildSpec>
-		<buildCommand>
-			<name>org.eclipse.jdt.core.javabuilder</name>
-			<arguments>
-			</arguments>
-		</buildCommand>
-	</buildSpec>
-	<natures>
-		<nature>org.eclipse.jdt.core.javanature</nature>
-	</natures>
-</projectDescription>
+  <name>qpid-newclient</name>
+  <comment/>
+  <projects/>
+  <buildSpec>
+    <buildCommand>
+      <name>org.eclipse.jdt.core.javabuilder</name>
+      <arguments/>
+    </buildCommand>
+  </buildSpec>
+  <natures>
+    <nature>org.eclipse.jdt.core.javanature</nature>
+  </natures>
+</projectDescription>
\ No newline at end of file

Modified: incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/client.log4j
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/client.log4j?view=diff&rev=523854&r1=523853&r2=523854
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/client.log4j (original)
+++ incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/client.log4j Thu Mar 29 15:24:20 2007
@@ -16,10 +16,11 @@
 # specific language governing permissions and limitations
 # under the License.
 #
-log4j.rootLogger=${root.logging.level}
+log4j.rootLogger=DEBUG
 
 
-log4j.logger.org.apache.qpid=${amqj.logging.level}, console
+#log4j.logger.org.apache.qpid=${amqj.logging.level}, console
+log4j.logger.org.apache.qpid=DEBUG, console
 log4j.additivity.org.apache.qpid=false
 
 log4j.appender.console=org.apache.log4j.ConsoleAppender

Modified: incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPCallBackSupport.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPCallBackSupport.java?view=diff&rev=523854&r1=523853&r2=523854
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPCallBackSupport.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPCallBackSupport.java Thu Mar 29 15:24:20 2007
@@ -1,11 +1,32 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
 package org.apache.qpid.nclient.amqp;
 
 import java.security.SecureRandom;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.nclient.amqp.event.AMQPMethodEvent;
+import org.apache.qpid.nclient.core.AMQPException;
 import org.apache.qpid.nclient.core.QpidConstants;
-import org.apache.qpid.nclient.model.AMQPMethodEvent;
 
 public abstract class AMQPCallBackSupport 
 {
@@ -31,16 +52,16 @@
 	{
 		if(noWait)
 		{
-			// u only need to register if u are expecting a response
-			long localCorrelationId = getNextCorrelationId();
-			AMQPMethodEvent msg = new AMQPMethodEvent(_channelId,methodBody,QpidConstants.EMPTY_CORRELATION_ID,localCorrelationId);
-			_cbMap.put(localCorrelationId, cb);
-			return msg; 
+		    	AMQPMethodEvent msg = new AMQPMethodEvent(_channelId,methodBody,QpidConstants.EMPTY_CORRELATION_ID);
+			return msg;
 		}
 		else
 		{
-			AMQPMethodEvent msg = new AMQPMethodEvent(_channelId,methodBody,QpidConstants.EMPTY_CORRELATION_ID);
-			return msg;			
+		    	// You only need to register if you are expecting a response
+			long localCorrelationId = getNextCorrelationId();
+			AMQPMethodEvent msg = new AMQPMethodEvent(_channelId,methodBody,QpidConstants.EMPTY_CORRELATION_ID,localCorrelationId);
+			_cbMap.put(localCorrelationId, cb);
+			return msg;						
 		}
 	}
 	
@@ -52,12 +73,25 @@
 		return msg;
 	}
 	
-	protected void invokeCallBack(long localCorrelationId, AMQMethodBody methodBody)
+	protected void invokeCallBack(long localCorrelationId, AMQMethodBody methodBody)throws AMQPException
 	{
-		if(_cbMap.contains(localCorrelationId))
+		if(_cbMap.containsKey(localCorrelationId))
 		{
 			AMQPCallBack cb = (AMQPCallBack)_cbMap.get(localCorrelationId);
-			cb.brokerResponded(methodBody);
+			if(cb == null)
+			{
+			    throw new AMQPException("Unable to find the callback object responsible for handling " + methodBody);
+			}
+			else
+			{
+			    	cb.setIsComplete(true);
+				cb.brokerResponded(methodBody);
+			}
+			_cbMap.remove(localCorrelationId);
+		}
+		else
+		{
+		    //ignore, as this event is for another class instance
 		}
 	}
 

Modified: incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPChannel.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPChannel.java?view=diff&rev=523854&r1=523853&r2=523854
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPChannel.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPChannel.java Thu Mar 29 15:24:20 2007
@@ -36,249 +36,317 @@
 import org.apache.qpid.framing.ChannelOpenBody;
 import org.apache.qpid.framing.ChannelOpenOkBody;
 import org.apache.qpid.framing.ChannelResumeBody;
+import org.apache.qpid.framing.ConnectionCloseBody;
+import org.apache.qpid.nclient.amqp.event.AMQPMethodEvent;
+import org.apache.qpid.nclient.amqp.event.AMQPMethodListener;
 import org.apache.qpid.nclient.amqp.state.AMQPState;
 import org.apache.qpid.nclient.amqp.state.AMQPStateMachine;
 import org.apache.qpid.nclient.config.ClientConfiguration;
 import org.apache.qpid.nclient.core.AMQPException;
 import org.apache.qpid.nclient.core.Phase;
 import org.apache.qpid.nclient.core.QpidConstants;
-import org.apache.qpid.nclient.model.AMQPMethodEvent;
-import org.apache.qpid.nclient.model.AMQPMethodListener;
 import org.apache.qpid.nclient.util.AMQPValidator;
 
 /**
- * This represents the Channel class defined in the AMQP protocol.
- * This class is a finite state machine and is thread safe by design.
- * Only valid state changes are allowed or else an IllegalStateTransitionException will be thrown.
- * Only one thread can enter the methods that change state, at a given time. 
- * The AMQP protocol recommends one thread per channel by design. 
- *   
- * A JMS Session can wrap an instance of this class.   
+ * This represents the Channel class defined in the AMQP protocol. This class is a finite state machine and is thread
+ * safe by design. Only valid state changes are allowed or else an IllegalStateTransitionException will be thrown. Only
+ * one thread can enter the methods that change state, at a given time. The AMQP protocol recommends one thread per
+ * channel by design.
+ * 
+ * A JMS Session can wrap an instance of this class.
  */
 
 public class AMQPChannel extends AMQPStateMachine implements AMQPMethodListener
 {
-private static final Logger _logger = Logger.getLogger(AMQPChannel.class);
-	
-	//the channelId assigned for this channel
-	private int _channelId;
-	private Phase _phase;
-	private AMQPState _currentState;
-	private final AMQPState[] _validCloseStates = new AMQPState[]{AMQPState.CHANNEL_OPENED,AMQPState.CHANNEL_SUSPEND};
-	private final AMQPState[] _validResumeStates = new AMQPState[]{AMQPState.CHANNEL_CLOSED,AMQPState.CHANNEL_NOT_OPENED};
-	
-	// The wait period until a server sends a respond
-	private long _serverTimeOut = 1000;
-	private final Lock _lock = new ReentrantLock();	
-    private final Condition _channelNotOpend  = _lock.newCondition(); 
-    private final Condition _channelNotClosed  = _lock.newCondition();
-    private final Condition _channelFlowNotResponded  = _lock.newCondition();
-    private final Condition _channelNotResumed  = _lock.newCondition();    
-    
-    private ChannelOpenOkBody _channelOpenOkBody; 
-    private ChannelCloseOkBody _channelCloseOkBody;	
+    private static final Logger _logger = Logger.getLogger(AMQPChannel.class);
+
+    // the channelId assigned for this channel
+    private int _channelId;
+
+    private Phase _phase;
+
+    private AMQPState _currentState;
+
+    private final AMQPState[] _validCloseStates = new AMQPState[] { AMQPState.CHANNEL_OPENED, AMQPState.CHANNEL_SUSPEND };
+
+    private final AMQPState[] _validResumeStates = new AMQPState[] { AMQPState.CHANNEL_CLOSED, AMQPState.CHANNEL_NOT_OPENED };
+
+    // The wait period until the server sends a response
+    private long _serverTimeOut = 1000;
+
+    private final Lock _lock = new ReentrantLock();
+
+    private final Condition _channelNotOpend = _lock.newCondition();
+
+    private final Condition _channelNotClosed = _lock.newCondition();
+
+    private final Condition _channelFlowNotResponded = _lock.newCondition();
+
+    private final Condition _channelNotResumed = _lock.newCondition();
+
+    private ChannelOpenOkBody _channelOpenOkBody;
+
+    private ChannelCloseOkBody _channelCloseOkBody;
+
     private ChannelFlowOkBody _channelFlowOkBody;
+
     private ChannelOkBody _channelOkBody;
     
-	public AMQPChannel(int channelId)
+    private ChannelCloseBody _channelCloseBody;
+
+    protected AMQPChannel(int channelId, Phase phase)
     {
-		_channelId = channelId;
-    	_currentState = AMQPState.CHANNEL_NOT_OPENED;
-    	_serverTimeOut = ClientConfiguration.get().getLong(QpidConstants.SERVER_TIMEOUT_IN_MILLISECONDS);
+	_channelId = channelId;
+	_phase = phase;
+	_currentState = AMQPState.CHANNEL_NOT_OPENED;
+	_serverTimeOut = ClientConfiguration.get().getLong(QpidConstants.SERVER_TIMEOUT_IN_MILLISECONDS);
     }
-    
-    /**-------------------------------------------
+
+    /**
+     * ------------------------------------------- 
      * API Methods
-     *--------------------------------------------
+     *  --------------------------------------------
      */
-    
-	/**
-	 * Opens the channel
-	 */
-	public ChannelOpenOkBody open(ChannelOpenBody channelOpenBody) throws AMQPException 
-	{
-		_lock.lock();			
-		try	{
-			_channelOpenOkBody = null;
-			checkIfValidStateTransition(AMQPState.CHANNEL_NOT_OPENED,_currentState,AMQPState.CHANNEL_OPENED);
-			AMQPMethodEvent msg = new AMQPMethodEvent(_channelId,channelOpenBody,QpidConstants.EMPTY_CORRELATION_ID);
-			_phase.messageSent(msg);
-			_channelNotOpend.await(_serverTimeOut, TimeUnit.MILLISECONDS);
-			AMQPValidator.throwExceptionOnNull(_channelOpenOkBody, "The broker didn't send the ChannelOpenOkBody in time");
-			_currentState = AMQPState.CHANNEL_OPENED;
-			return _channelOpenOkBody;
-		}
-		catch(Exception e)
-		{
-			throw new AMQPException("XXX");
-		}
-		finally
-		{
-			_lock.unlock();
-		}
-	}
-	
-	/**
-	 * Close the channel
-	 */
-	public ChannelCloseOkBody close(ChannelCloseBody channelCloseBody) throws AMQPException 
-	{
-		_lock.lock();	
-		try	{			
-			_channelCloseOkBody = null;
-			checkIfValidStateTransition(_validCloseStates,_currentState,AMQPState.CHANNEL_CLOSED);
-			AMQPMethodEvent msg = new AMQPMethodEvent(_channelId,channelCloseBody,QpidConstants.EMPTY_CORRELATION_ID);
-			_phase.messageSent(msg);
-			_channelNotClosed.await(_serverTimeOut, TimeUnit.MILLISECONDS);
-			AMQPValidator.throwExceptionOnNull(_channelCloseOkBody, "The broker didn't send the ChannelCloseOkBody in time");
-			_currentState = AMQPState.CHANNEL_CLOSED;
-			return _channelCloseOkBody;
-		}
-		catch(Exception e)
-		{
-			throw new AMQPException("XXX");
-		}
-		finally
-		{
-			_lock.unlock();
-		}
-	}
-	
-	/**
-	 * Channel Flow
-	 */
-	public ChannelFlowOkBody close(ChannelFlowBody channelFlowBody) throws AMQPException 
-	{
-		_lock.lock();	
-		try	{
-			_channelFlowOkBody = null;
-			if(channelFlowBody.active)
-			{
-				checkIfValidStateTransition(AMQPState.CHANNEL_SUSPEND,_currentState,AMQPState.CHANNEL_OPENED);
-			}
-			else
-			{
-				checkIfValidStateTransition(AMQPState.CHANNEL_OPENED,_currentState,AMQPState.CHANNEL_SUSPEND);
-			}
-			AMQPMethodEvent msg = new AMQPMethodEvent(_channelId,channelFlowBody,QpidConstants.EMPTY_CORRELATION_ID);
-			_phase.messageSent(msg);
-			_channelFlowNotResponded.await(_serverTimeOut, TimeUnit.MILLISECONDS);
-			AMQPValidator.throwExceptionOnNull(_channelFlowOkBody, "The broker didn't send the ChannelFlowOkBody in time");
-			handleChannelFlowState(_channelFlowOkBody.active);
-			return _channelFlowOkBody;
-		}
-		catch(Exception e)
-		{
-			throw new AMQPException("XXX");
-		}
-		finally
-		{
-			_lock.unlock();
-		}
-	}
-	
-	/**
-	 * Close the channel
-	 */
-	public ChannelOkBody resume(ChannelResumeBody channelResumeBody) throws AMQPException 
-	{
-		_lock.lock();	
-		try	{
-			_channelOkBody = null;
-			checkIfValidStateTransition(_validResumeStates,_currentState,AMQPState.CHANNEL_OPENED);
-			AMQPMethodEvent msg = new AMQPMethodEvent(_channelId,channelResumeBody,QpidConstants.EMPTY_CORRELATION_ID);
-			_phase.messageSent(msg);
-			_channelNotResumed.await(_serverTimeOut, TimeUnit.MILLISECONDS);
-			AMQPValidator.throwExceptionOnNull(_channelOkBody, "The broker didn't send the ChannelOkBody in response to the ChannelResumeBody in time");
-			_currentState = AMQPState.CHANNEL_OPENED;
-			return _channelOkBody;
-		}
-		catch(Exception e)
-		{
-			throw new AMQPException("XXX");
-		}
-		finally
-		{
-			_lock.unlock();
-		}
+
+    /**
+     * Opens the channel
+     */
+    public ChannelOpenOkBody open(ChannelOpenBody channelOpenBody) throws AMQPException
+    {
+	_lock.lock();
+	try
+	{
+	    _channelOpenOkBody = null;
+	    checkIfValidStateTransition(AMQPState.CHANNEL_NOT_OPENED, _currentState, AMQPState.CHANNEL_OPENED);
+	    AMQPMethodEvent msg = new AMQPMethodEvent(_channelId, channelOpenBody, QpidConstants.EMPTY_CORRELATION_ID);
+	    _phase.messageSent(msg);
+	    
+	    //_channelNotOpend.await(_serverTimeOut, TimeUnit.MILLISECONDS);
+	    _channelNotOpend.await();
+	    checkIfConnectionClosed();
+	    AMQPValidator.throwExceptionOnNull(_channelOpenOkBody, "The broker didn't send the ChannelOpenOkBody in time");
+	    _currentState = AMQPState.CHANNEL_OPENED;
+	    return _channelOpenOkBody;
+	}
+	catch (Exception e)
+	{
+	    throw new AMQPException("Error in channel.open", e);
+	}
+	finally
+	{
+	    _lock.unlock();
+	}
+    }
+
+    /**
+     * Close the channel
+     */
+    public ChannelCloseOkBody close(ChannelCloseBody channelCloseBody) throws AMQPException
+    {
+	_lock.lock();
+	try
+	{
+	    _channelCloseOkBody = null;
+	    checkIfValidStateTransition(_validCloseStates, _currentState, AMQPState.CHANNEL_CLOSED);
+	    AMQPMethodEvent msg = new AMQPMethodEvent(_channelId, channelCloseBody, QpidConstants.EMPTY_CORRELATION_ID);
+	    _phase.messageSent(msg);
+	    
+	    //_channelNotClosed.await(_serverTimeOut, TimeUnit.MILLISECONDS);
+	    _channelNotClosed.await();
+	    AMQPValidator.throwExceptionOnNull(_channelCloseOkBody, "The broker didn't send the ChannelCloseOkBody in time");
+	    _currentState = AMQPState.CHANNEL_CLOSED;
+	    return _channelCloseOkBody;
 	}
-	
-	/**-------------------------------------------
+	catch (Exception e)
+	{
+	    throw new AMQPException("Error in channel.close", e);
+	}
+	finally
+	{
+	    _lock.unlock();
+	}
+    }
+
+    /**
+     * Channel Flow
+     */
+    public ChannelFlowOkBody flow(ChannelFlowBody channelFlowBody) throws AMQPException
+    {
+	_lock.lock();
+	try
+	{
+	    _channelFlowOkBody = null;
+	    if (channelFlowBody.active)
+	    {
+		checkIfValidStateTransition(AMQPState.CHANNEL_SUSPEND, _currentState, AMQPState.CHANNEL_OPENED);
+	    }
+	    else
+	    {
+		checkIfValidStateTransition(AMQPState.CHANNEL_OPENED, _currentState, AMQPState.CHANNEL_SUSPEND);
+	    }
+	    AMQPMethodEvent msg = new AMQPMethodEvent(_channelId, channelFlowBody, QpidConstants.EMPTY_CORRELATION_ID);
+	    _phase.messageSent(msg);
+	    
+	    //_channelFlowNotResponded.await(_serverTimeOut, TimeUnit.MILLISECONDS);
+	    _channelFlowNotResponded.await();
+	    checkIfConnectionClosed();
+	    AMQPValidator.throwExceptionOnNull(_channelFlowOkBody, "The broker didn't send the ChannelFlowOkBody in time");
+	    handleChannelFlowState(_channelFlowOkBody.active);
+	    return _channelFlowOkBody;
+	}
+	catch (Exception e)
+	{
+	    throw new AMQPException("Error in channel.flow", e);
+	}
+	finally
+	{
+	    _lock.unlock();
+	}
+    }
+
+    /**
+     * Resume the channel
+     */
+    public ChannelOkBody resume(ChannelResumeBody channelResumeBody) throws AMQPException
+    {
+	_lock.lock();
+	try
+	{
+	    _channelOkBody = null;
+	    checkIfValidStateTransition(_validResumeStates, _currentState, AMQPState.CHANNEL_OPENED);
+	    AMQPMethodEvent msg = new AMQPMethodEvent(_channelId, channelResumeBody, QpidConstants.EMPTY_CORRELATION_ID);
+	    _phase.messageSent(msg);
+	    
+	    //_channelNotResumed.await(_serverTimeOut, TimeUnit.MILLISECONDS);
+	    _channelNotResumed.await();
+	    checkIfConnectionClosed();
+	    AMQPValidator.throwExceptionOnNull(_channelOkBody,
+		    "The broker didn't send the ChannelOkBody in response to the ChannelResumeBody in time");
+	    _currentState = AMQPState.CHANNEL_OPENED;
+	    return _channelOkBody;
+	}
+	catch (Exception e)
+	{
+	    throw new AMQPException("Error in channel.resume", e);
+	}
+	finally
+	{
+	    _lock.unlock();
+	}
+    }
+
+    /**
+     * ------------------------------------------- 
      * AMQPMethodListener methods
-     *--------------------------------------------
+     * --------------------------------------------
      */
-	public <B extends AMQMethodBody> boolean methodReceived(AMQPMethodEvent<B> evt) throws AMQPException
-    {    	
-    	if (evt.getMethod() instanceof ChannelOpenOkBody)
-    	{
-    		_channelOpenOkBody = (ChannelOpenOkBody)evt.getMethod();
-    		_channelNotOpend.signal();
-    		return true;
-    	}
-    	else if (evt.getMethod() instanceof ChannelCloseOkBody)
-    	{
-    		_channelCloseOkBody = (ChannelCloseOkBody)evt.getMethod();
-    		_channelNotClosed.signal();
-    		return true;
-    	}
-    	else if (evt.getMethod() instanceof ChannelCloseBody)
-    	{
-    		handleChannelClose((ChannelCloseBody)evt.getMethod());
-    		return true;
-    	}
-    	else if (evt.getMethod() instanceof ChannelFlowOkBody)
-    	{
-    		_channelFlowOkBody = (ChannelFlowOkBody)evt.getMethod();
-    		_channelFlowNotResponded.signal();
-    		return true;
-    	}
-    	else if (evt.getMethod() instanceof ChannelFlowBody)
-    	{
-    		handleChannelFlow((ChannelFlowBody)evt.getMethod());
-    		return true;
-    	}
-    	else if (evt.getMethod() instanceof ChannelOkBody)
-    	{    		 
-    		_channelOkBody = (ChannelOkBody)evt.getMethod();
-    		 //In this case the only method expecting channel-ok is channel-resume
-    		 // haven't implemented ping and pong.
-    		_channelNotResumed.signal();
-    		return true;
-    	}
-    	else
-    	{
-    		return false;
-    	}
-    }
-	
-	private void handleChannelClose(ChannelCloseBody channelCloseBody)
-	{
-		try
-		{
-			_lock.lock();
-			_currentState = AMQPState.CHANNEL_CLOSED;
-		}
-		finally
-		{
-			_lock.unlock();
-		}
-	}
-	
-	private void handleChannelFlow(ChannelFlowBody channelFlowBody)
-	{
-		_lock.lock();
-		try
-		{
-			handleChannelFlowState(channelFlowBody.active);
-		}
-		finally
-		{
-			_lock.unlock();
-		}
+    public <B extends AMQMethodBody> boolean methodReceived(AMQPMethodEvent<B> evt) throws AMQPException
+    {
+	_lock.lock();
+	try
+	{
+        	if (evt.getMethod() instanceof ChannelOpenOkBody)
+        	{
+        	    _channelOpenOkBody = (ChannelOpenOkBody) evt.getMethod();
+        	    _channelNotOpend.signal();
+        	    return true;
+        	}
+        	else if (evt.getMethod() instanceof ChannelCloseOkBody)
+        	{
+        	    _channelCloseOkBody = (ChannelCloseOkBody) evt.getMethod();
+        	    _channelNotClosed.signal();
+        	    return true;
+        	}
+        	else if (evt.getMethod() instanceof ChannelCloseBody)
+        	{
+        	    _channelCloseBody = (ChannelCloseBody)evt.getMethod();
+    		    // Signal the relevant conditions, as callers may be waiting on them
+    		   // when an error occurred and the broker has sent a close.
+    		    releaseLocks();
+        	    handleChannelClose(_channelCloseBody);
+        	    return true;
+        	}
+        	else if (evt.getMethod() instanceof ChannelFlowOkBody)
+        	{
+        	    _channelFlowOkBody = (ChannelFlowOkBody) evt.getMethod();
+        	    _channelFlowNotResponded.signal();
+        	    return true;
+        	}
+        	else if (evt.getMethod() instanceof ChannelFlowBody)
+        	{
+        	    handleChannelFlow((ChannelFlowBody) evt.getMethod());
+        	    return true;
+        	}
+        	else if (evt.getMethod() instanceof ChannelOkBody)
+        	{
+        	    _channelOkBody = (ChannelOkBody) evt.getMethod();
+        	    // In this case the only method expecting channel-ok is channel-resume
+        	    // haven't implemented ping and pong.
+        	    _channelNotResumed.signal();
+        	    return true;
+        	}
+        	else
+        	{
+        	    return false;
+        	}
+	}
+	finally
+	{
+	    _lock.unlock();
+	}
+    }
+
+    private void handleChannelClose(ChannelCloseBody channelCloseBody)
+    {
+	_currentState = AMQPState.CHANNEL_CLOSED;
+	// handle channel related cleanup
+    }
+    
+    private void releaseLocks()
+    {
+	if(_currentState == AMQPState.CHANNEL_NOT_OPENED)
+	{
+	    _channelNotOpend.signal();
+	    _channelNotResumed.signal(); // It could be a channel.resume call
+	}
+	else if(_currentState == AMQPState.CHANNEL_OPENED || _currentState == AMQPState.CHANNEL_SUSPEND)
+	{
+	    _channelFlowNotResponded.signal();
+	}
+	else if(_currentState == AMQPState.CHANNEL_CLOSED)
+	{
+	    _channelNotResumed.signal();
+	}
+    }
+    
+    private void checkIfConnectionClosed()throws AMQPException
+    {
+	if (_channelCloseBody != null)
+	{
+	    String error = "Broker has closed channel due to : " + _channelCloseBody.getReplyText() +
+	                    " with reply code (" + _channelCloseBody.getReplyCode() + ") " +
+	                    "caused by class " + _channelCloseBody.getClassId() +
+	                    " and method " + _channelCloseBody.getMethod();
+	    
+	    throw new AMQPException(error);	    
 	}
-	
-	private void handleChannelFlowState(boolean flow)
+    }    
+
+    private void handleChannelFlow(ChannelFlowBody channelFlowBody)
+    {
+	_lock.lock();
+	try
 	{
-		_currentState = (flow) ? AMQPState.CHANNEL_OPENED : AMQPState.CHANNEL_SUSPEND; 
+	    handleChannelFlowState(channelFlowBody.active);
 	}
+	finally
+	{
+	    _lock.unlock();
+	}
+    }
+
+    private void handleChannelFlowState(boolean flow)
+    {
+	_currentState = (flow) ? AMQPState.CHANNEL_OPENED : AMQPState.CHANNEL_SUSPEND;
+    }
 }

Added: incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPClassFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPClassFactory.java?view=auto&rev=523854
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPClassFactory.java (added)
+++ incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPClassFactory.java Thu Mar 29 15:24:20 2007
@@ -0,0 +1,255 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.nclient.amqp;
+
+import org.apache.qpid.framing.ChannelCloseBody;
+import org.apache.qpid.framing.ChannelCloseOkBody;
+import org.apache.qpid.framing.ChannelFlowBody;
+import org.apache.qpid.framing.ChannelFlowOkBody;
+import org.apache.qpid.framing.ChannelOkBody;
+import org.apache.qpid.framing.ChannelOpenOkBody;
+import org.apache.qpid.framing.ConnectionCloseBody;
+import org.apache.qpid.framing.ConnectionCloseOkBody;
+import org.apache.qpid.framing.ConnectionOpenOkBody;
+import org.apache.qpid.framing.ConnectionSecureBody;
+import org.apache.qpid.framing.ConnectionStartBody;
+import org.apache.qpid.framing.ConnectionTuneBody;
+import org.apache.qpid.framing.ExchangeDeclareOkBody;
+import org.apache.qpid.framing.ExchangeDeleteOkBody;
+import org.apache.qpid.framing.MessageAppendBody;
+import org.apache.qpid.framing.MessageCancelBody;
+import org.apache.qpid.framing.MessageCheckpointBody;
+import org.apache.qpid.framing.MessageCloseBody;
+import org.apache.qpid.framing.MessageGetBody;
+import org.apache.qpid.framing.MessageOffsetBody;
+import org.apache.qpid.framing.MessageOkBody;
+import org.apache.qpid.framing.MessageOpenBody;
+import org.apache.qpid.framing.MessageQosBody;
+import org.apache.qpid.framing.MessageRecoverBody;
+import org.apache.qpid.framing.MessageRejectBody;
+import org.apache.qpid.framing.MessageResumeBody;
+import org.apache.qpid.framing.MessageTransferBody;
+import org.apache.qpid.framing.QueueBindOkBody;
+import org.apache.qpid.framing.QueueDeclareOkBody;
+import org.apache.qpid.framing.QueueDeleteOkBody;
+import org.apache.qpid.framing.QueuePurgeOkBody;
+import org.apache.qpid.framing.QueueUnbindOkBody;
+import org.apache.qpid.nclient.amqp.event.AMQPEventManager;
+import org.apache.qpid.nclient.amqp.state.AMQPStateManager;
+import org.apache.qpid.nclient.core.AMQPException;
+import org.apache.qpid.nclient.core.DefaultPhaseContext;
+import org.apache.qpid.nclient.core.Phase;
+import org.apache.qpid.nclient.core.PhaseContext;
+import org.apache.qpid.nclient.core.QpidConstants;
+import org.apache.qpid.nclient.transport.AMQPConnectionURL;
+import org.apache.qpid.nclient.transport.ConnectionURL;
+import org.apache.qpid.nclient.transport.TransportConnection;
+import org.apache.qpid.nclient.transport.TransportConnectionFactory;
+import org.apache.qpid.nclient.transport.TransportConnectionFactory.ConnectionType;
+import org.apache.qpid.url.URLSyntaxException;
+
+/**
+ * The Class Factory creates AMQP Class
+ * equivalents defined in the spec.
+ * 
+ * There should be one instance per connection.
+ * The factory class creates all the support
+ * classes and provides an instance of the 
+ * AMQP class in ready-to-use state.
+ *
+ */
+public class AMQPClassFactory
+{
+    //Need an event manager per connection
+    private AMQPEventManager _eventManager = new QpidEventManager();
+    
+    // Need a state manager per connection
+    private AMQPStateManager _stateManager = new QpidStateManager();
+    
+    //Need a phase pipe per connection
+    private Phase _phase;
+    
+    //One instance per connection
+    private AMQPConnection _amqpConnection;
+    
+    public AMQPClassFactory()
+    {
+	
+    }
+    
+    public AMQPConnection createConnection(String urlStr,ConnectionType type)throws AMQPException, URLSyntaxException
+    {	
+	AMQPConnectionURL url = new AMQPConnectionURL(urlStr);
+	return createConnectionClass(url,type);
+    }
+    
+    public AMQPConnection createConnectionClass(ConnectionURL url,ConnectionType type)throws AMQPException
+    {
+	if (_amqpConnection == null)
+	{
+	    PhaseContext ctx = new DefaultPhaseContext();
+	    ctx.setProperty(QpidConstants.EVENT_MANAGER, _eventManager);
+	    
+	    TransportConnection conn = TransportConnectionFactory.createTransportConnection(url, type,ctx);	    
+	    _amqpConnection = new AMQPConnection(conn);
+	    _eventManager.addMethodEventListener(QpidConstants.CHANNEL_ZERO, ConnectionStartBody.class,_amqpConnection);
+	    _eventManager.addMethodEventListener(QpidConstants.CHANNEL_ZERO, ConnectionSecureBody.class,_amqpConnection);
+	    _eventManager.addMethodEventListener(QpidConstants.CHANNEL_ZERO, ConnectionTuneBody.class,_amqpConnection);
+	    _eventManager.addMethodEventListener(QpidConstants.CHANNEL_ZERO, ConnectionOpenOkBody.class,_amqpConnection);
+	    _eventManager.addMethodEventListener(QpidConstants.CHANNEL_ZERO, ConnectionCloseBody.class,_amqpConnection);
+	    _eventManager.addMethodEventListener(QpidConstants.CHANNEL_ZERO, ConnectionCloseOkBody.class,_amqpConnection);
+	}
+	return _amqpConnection;
+    }   
+    
+    public AMQPChannel createChannelClass(int channel)throws AMQPException
+    {
+	checkIfConnectionStarted();
+	AMQPChannel amqpChannel = new AMQPChannel(channel,_phase);
+	_eventManager.addMethodEventListener(channel, ChannelOpenOkBody.class,amqpChannel);
+        _eventManager.addMethodEventListener(channel, ChannelCloseBody.class,amqpChannel);
+        _eventManager.addMethodEventListener(channel, ChannelCloseOkBody.class,amqpChannel);
+        _eventManager.addMethodEventListener(channel, ChannelFlowBody.class,amqpChannel);
+        _eventManager.addMethodEventListener(channel, ChannelFlowOkBody.class,amqpChannel);
+        _eventManager.addMethodEventListener(channel, ChannelOkBody.class,amqpChannel);
+	return amqpChannel;
+    }
+    
+    public void destroyChannelClass(int channel,AMQPChannel amqpChannel)throws AMQPException
+    {
+	_eventManager.removeMethodEventListener(channel, ChannelOpenOkBody.class,amqpChannel);
+        _eventManager.removeMethodEventListener(channel, ChannelCloseBody.class,amqpChannel);
+        _eventManager.removeMethodEventListener(channel, ChannelCloseOkBody.class,amqpChannel);
+        _eventManager.removeMethodEventListener(channel, ChannelFlowBody.class,amqpChannel);
+        _eventManager.removeMethodEventListener(channel, ChannelFlowOkBody.class,amqpChannel);
+        _eventManager.removeMethodEventListener(channel, ChannelOkBody.class,amqpChannel);
+    }
+    
+    public AMQPExchange createExchangeClass(int channel)throws AMQPException
+    {
+	checkIfConnectionStarted();
+	AMQPExchange amqpExchange = new AMQPExchange(channel,_phase);
+	_eventManager.addMethodEventListener(channel, ExchangeDeclareOkBody.class,amqpExchange);
+        _eventManager.addMethodEventListener(channel, ExchangeDeleteOkBody.class,amqpExchange);
+	return amqpExchange;    
+    }
+    
+    public void destoryExchangeClass(int channel, AMQPExchange amqpExchange)throws AMQPException
+    {
+	_eventManager.removeMethodEventListener(channel, ExchangeDeclareOkBody.class,amqpExchange);
+        _eventManager.removeMethodEventListener(channel, ExchangeDeleteOkBody.class,amqpExchange);
+    }
+    
+    public AMQPQueue createQueueClass(int channel)throws AMQPException
+    {
+	checkIfConnectionStarted();
+	AMQPQueue amqpQueue = new AMQPQueue(channel,_phase);
+	_eventManager.addMethodEventListener(channel, QueueDeclareOkBody.class,amqpQueue);
+        _eventManager.addMethodEventListener(channel, QueueBindOkBody.class,amqpQueue);
+        _eventManager.addMethodEventListener(channel, QueueUnbindOkBody.class,amqpQueue);
+        _eventManager.addMethodEventListener(channel, QueuePurgeOkBody.class,amqpQueue);
+        _eventManager.addMethodEventListener(channel, QueueDeleteOkBody.class,amqpQueue);
+	return amqpQueue;
+    }
+    
+    public void destroyQueueClass(int channel,AMQPQueue amqpQueue)throws AMQPException
+    {
+	_eventManager.removeMethodEventListener(channel, QueueDeclareOkBody.class,amqpQueue);
+        _eventManager.removeMethodEventListener(channel, QueueBindOkBody.class,amqpQueue);
+        _eventManager.removeMethodEventListener(channel, QueueUnbindOkBody.class,amqpQueue);
+        _eventManager.removeMethodEventListener(channel, QueuePurgeOkBody.class,amqpQueue);
+        _eventManager.removeMethodEventListener(channel, QueueDeleteOkBody.class,amqpQueue);
+    }
+    
+    public AMQPMessage createMessageClass(int channel,AMQPMessageCallBack messageCb)throws AMQPException
+    {
+	checkIfConnectionStarted();
+	AMQPMessage amqpMessage = new AMQPMessage(channel,_phase,messageCb);
+	_eventManager.addMethodEventListener(channel, MessageAppendBody.class,amqpMessage);
+        _eventManager.addMethodEventListener(channel, MessageCancelBody.class,amqpMessage);
+        _eventManager.addMethodEventListener(channel, MessageCheckpointBody.class,amqpMessage);
+        _eventManager.addMethodEventListener(channel, MessageCloseBody.class,amqpMessage);
+        _eventManager.addMethodEventListener(channel, MessageGetBody.class,amqpMessage);
+        _eventManager.addMethodEventListener(channel, MessageOffsetBody.class,amqpMessage);
+        _eventManager.addMethodEventListener(channel, MessageOkBody.class,amqpMessage);
+        _eventManager.addMethodEventListener(channel, MessageOpenBody.class,amqpMessage);
+        _eventManager.addMethodEventListener(channel, MessageRecoverBody.class,amqpMessage);
+        _eventManager.addMethodEventListener(channel, MessageRejectBody.class,amqpMessage);        
+        _eventManager.addMethodEventListener(channel, MessageResumeBody.class,amqpMessage);
+        _eventManager.addMethodEventListener(channel, MessageQosBody.class,amqpMessage);
+        _eventManager.addMethodEventListener(channel, MessageTransferBody.class,amqpMessage);
+        
+	return amqpMessage;
+    }
+    
+    public void destoryMessageClass(int channel,AMQPMessage amqpMessage)throws AMQPException
+    {
+	_eventManager.removeMethodEventListener(channel, MessageAppendBody.class,amqpMessage);
+        _eventManager.removeMethodEventListener(channel, MessageCancelBody.class,amqpMessage);
+        _eventManager.removeMethodEventListener(channel, MessageCheckpointBody.class,amqpMessage);
+        _eventManager.removeMethodEventListener(channel, MessageCloseBody.class,amqpMessage);
+        _eventManager.removeMethodEventListener(channel, MessageGetBody.class,amqpMessage);
+        _eventManager.removeMethodEventListener(channel, MessageOffsetBody.class,amqpMessage);
+        _eventManager.removeMethodEventListener(channel, MessageOkBody.class,amqpMessage);
+        _eventManager.removeMethodEventListener(channel, MessageOpenBody.class,amqpMessage);
+        _eventManager.removeMethodEventListener(channel, MessageRecoverBody.class,amqpMessage);
+        _eventManager.removeMethodEventListener(channel, MessageRejectBody.class,amqpMessage);        
+        _eventManager.removeMethodEventListener(channel, MessageResumeBody.class,amqpMessage);
+        _eventManager.removeMethodEventListener(channel, MessageQosBody.class,amqpMessage);
+        _eventManager.removeMethodEventListener(channel, MessageTransferBody.class,amqpMessage);        
+    }
+    
+    // Ensures the phase pipe is cached before use (TODO: register as a state listener for AMQPConnection instead).
+    private void checkIfConnectionStarted() throws AMQPException
+    {
+	if (_phase == null)
+	{
+	    // _amqpConnection stays null until createConnectionClass() has run; treat that as "not ready" rather than NPE.
+	    _phase = (_amqpConnection == null) ? null : _amqpConnection.getPhasePipe();
+	    if (_phase == null)
+	    {
+		throw new AMQPException("Cannot create a channel until connection is ready");
+	    }
+	}
+    }
+    
+    /**
+     * Extension point
+     * Other interested parties can obtain a reference to the event manager
+     * and add listeners to get notified of events
+     * 
+     */
+    public AMQPEventManager getEventManager()
+    {
+	return _eventManager;
+    }
+    
+    /**
+     * Extension point
+     * Other interested parties can obtain a reference to the state manager
+     * and add listeners to get notified of state changes
+     * @return the state manager instance owned by this factory
+     */
+    public AMQPStateManager getStateManager()
+    {
+	return _stateManager;
+    }
+}

Modified: incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPConnection.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPConnection.java?view=diff&rev=523854&r1=523853&r2=523854
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPConnection.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPConnection.java Thu Mar 29 15:24:20 2007
@@ -37,14 +37,14 @@
 import org.apache.qpid.framing.ConnectionStartOkBody;
 import org.apache.qpid.framing.ConnectionTuneBody;
 import org.apache.qpid.framing.ConnectionTuneOkBody;
+import org.apache.qpid.nclient.amqp.event.AMQPMethodEvent;
+import org.apache.qpid.nclient.amqp.event.AMQPMethodListener;
 import org.apache.qpid.nclient.amqp.state.AMQPState;
 import org.apache.qpid.nclient.amqp.state.AMQPStateMachine;
 import org.apache.qpid.nclient.config.ClientConfiguration;
 import org.apache.qpid.nclient.core.AMQPException;
 import org.apache.qpid.nclient.core.Phase;
 import org.apache.qpid.nclient.core.QpidConstants;
-import org.apache.qpid.nclient.model.AMQPMethodEvent;
-import org.apache.qpid.nclient.model.AMQPMethodListener;
 import org.apache.qpid.nclient.transport.TransportConnection;
 import org.apache.qpid.nclient.util.AMQPValidator;
 
@@ -65,9 +65,8 @@
 
     private AMQPState _currentState;
 
-    private final AMQPState[] _validCloseStates = new AMQPState[] { AMQPState.CONNECTION_NOT_STARTED,
-	    AMQPState.CONNECTION_NOT_SECURE, AMQPState.CONNECTION_NOT_TUNED, AMQPState.CONNECTION_NOT_OPENED,
-	    AMQPState.CONNECTION_OPEN, };
+    private final AMQPState[] _validCloseStates = new AMQPState[] { AMQPState.CONNECTION_NOT_STARTED, AMQPState.CONNECTION_NOT_SECURE,
+	    AMQPState.CONNECTION_NOT_TUNED, AMQPState.CONNECTION_NOT_OPENED, AMQPState.CONNECTION_OPEN, };
 
     // The wait period until a server sends a respond
     private long _serverTimeOut = 1000;
@@ -93,8 +92,10 @@
     private ConnectionOpenOkBody _connectionOpenOkBody;
 
     private ConnectionCloseOkBody _connectionCloseOkBody;
+    
+    private ConnectionCloseBody _connectionCloseBody;
 
-    public AMQPConnection(TransportConnection connection)
+    protected AMQPConnection(TransportConnection connection)
     {
 	_connection = connection;
 	_currentState = AMQPState.CONNECTION_UNDEFINED;
@@ -102,12 +103,14 @@
     }
 
     /**
-         * ------------------------------------------- API Methods --------------------------------------------
-         */
+     * ------------------------------------------- 
+     * API Methods 
+     * --------------------------------------------
+     */
 
     /**
-         * Opens the TCP connection and let the formalities begin.
-         */
+     * Opens the TCP connection and let the formalities begin.
+     */
     public ConnectionStartBody openTCPConnection() throws AMQPException
     {
 	_lock.lock();
@@ -119,15 +122,17 @@
 	    _phase = _connection.connect();
 
 	    // waiting for ConnectionStartBody or error in connection
-	    _connectionNotStarted.await(_serverTimeOut, TimeUnit.MILLISECONDS);
-	    AMQPValidator.throwExceptionOnNull(_connectionStartBody,
-		    "The broker didn't send the ConnectionStartBody in time");
+	    //_connectionNotStarted.await(_serverTimeOut, TimeUnit.MILLISECONDS);
+	    _connectionNotStarted.await();
+	    
+	    checkIfConnectionClosed();
+	    AMQPValidator.throwExceptionOnNull(_connectionStartBody, "The broker didn't send the ConnectionStartBody in time");
 	    _currentState = AMQPState.CONNECTION_NOT_STARTED;
 	    return _connectionStartBody;
 	}
-	catch (Exception e)
+	catch (InterruptedException e)
 	{
-	    throw new AMQPException("XXX");
+	    throw new AMQPException("Error opening connection to broker", e);
 	}
 	finally
 	{
@@ -135,25 +140,43 @@
 	}
     }
 
-    public ConnectionSecureBody startOk(ConnectionStartOkBody connectionStartOkBody) throws AMQPException
+    /**
+     * The current java broker implementation can send a connection tune body
+     * as a response to the startOk. Not sure if that is the correct behaviour.
+     */
+    public AMQMethodBody startOk(ConnectionStartOkBody connectionStartOkBody) throws AMQPException
     {
 	_lock.lock();
 	try
 	{
 	    _connectionSecureBody = null;
-	    checkIfValidStateTransition(AMQPState.CONNECTION_NOT_STARTED, _currentState,
-		    AMQPState.CONNECTION_NOT_SECURE);
+	    checkIfValidStateTransition(AMQPState.CONNECTION_NOT_STARTED, _currentState, AMQPState.CONNECTION_NOT_SECURE);
 	    AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectionStartOkBody, _correlationId);
 	    _phase.messageSent(msg);
-	    _connectionNotSecure.await(_serverTimeOut, TimeUnit.MILLISECONDS);
-	    AMQPValidator.throwExceptionOnNull(_connectionSecureBody,
-		    "The broker didn't send the ConnectionSecureBody in time");
-	    _currentState = AMQPState.CONNECTION_NOT_SECURE;
-	    return _connectionSecureBody;
+	    //_connectionNotSecure.await(_serverTimeOut, TimeUnit.MILLISECONDS);
+	    _connectionNotSecure.await();
+	    //AMQPValidator.throwExceptionOnNull(_connectionSecureBody, "The broker didn't send the ConnectionSecureBody in time");
+	    //_currentState = AMQPState.CONNECTION_NOT_SECURE;
+	    
+	    checkIfConnectionClosed();
+	    if (_connectionTuneBody != null)
+	    {
+		_currentState = AMQPState.CONNECTION_NOT_TUNED;
+		return _connectionTuneBody;
+	    }
+	    else if (_connectionSecureBody != null)
+	    { // oops the server sent another challenge
+		_currentState = AMQPState.CONNECTION_NOT_SECURE;
+		return _connectionSecureBody;
+	    }
+	    else
+	    {
+		throw new AMQPException("The broker didn't send the ConnectionTuneBody or ConnectionSecureBody in time");
+	    }	    
 	}
-	catch (Exception e)
+	catch (InterruptedException e)
 	{
-	    throw new AMQPException("XXX");
+	    throw new AMQPException("Error in connection.startOk", e);
 	}
 	finally
 	{
@@ -162,9 +185,9 @@
     }
 
     /**
-         * The server will verify the response contained in the secureOK body and send a ConnectionTuneBody or it could
-         * issue a new challenge
-         */
+     * The server will verify the response contained in the secureOK body and send a ConnectionTuneBody or it could
+     * issue a new challenge
+     */
     public AMQMethodBody secureOk(ConnectionSecureOkBody connectionSecureOkBody) throws AMQPException
     {
 	_lock.lock();
@@ -173,11 +196,15 @@
 	    _connectionTuneBody = null;
 	    _connectionSecureBody = null;
 	    checkIfValidStateTransition(AMQPState.CONNECTION_NOT_SECURE, _currentState, AMQPState.CONNECTION_NOT_TUNED);
+
 	    _connectionSecureBody = null; // The server could send a fresh challenge
-	    AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectionSecureOkBody,
-		    _correlationId);
+	    AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectionSecureOkBody, _correlationId);
 	    _phase.messageSent(msg);
-	    _connectionNotTuned.await(_serverTimeOut, TimeUnit.MILLISECONDS);
+	    
+	    //_connectionNotTuned.await(_serverTimeOut, TimeUnit.MILLISECONDS);
+	    _connectionNotTuned.await();
+	    checkIfConnectionClosed();
+
 	    if (_connectionTuneBody != null)
 	    {
 		_currentState = AMQPState.CONNECTION_NOT_TUNED;
@@ -193,9 +220,9 @@
 		throw new AMQPException("The broker didn't send the ConnectionTuneBody or ConnectionSecureBody in time");
 	    }
 	}
-	catch (Exception e)
+	catch (InterruptedException e)
 	{
-	    throw new AMQPException("XXX");
+	    throw new AMQPException("Error in connection.secureOk", e);
 	}
 	finally
 	{
@@ -214,10 +241,6 @@
 	    _phase.messageSent(msg);
 	    _currentState = AMQPState.CONNECTION_NOT_OPENED;
 	}
-	catch (Exception e)
-	{
-	    throw new AMQPException("XXX");
-	}
 	finally
 	{
 	    _lock.unlock();
@@ -229,20 +252,26 @@
 	_lock.lock();
 	try
 	{
+	    // If the broker sends a connection close due to an error with the
+	    // Connection tune ok, then this call will verify that
+	    checkIfConnectionClosed();
+	    
 	    _connectionOpenOkBody = null;
 	    checkIfValidStateTransition(AMQPState.CONNECTION_NOT_OPENED, _currentState, AMQPState.CONNECTION_OPEN);
-	    AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectionOpenBody,
-		    QpidConstants.EMPTY_CORRELATION_ID);
+	    AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectionOpenBody, QpidConstants.EMPTY_CORRELATION_ID);
 	    _phase.messageSent(msg);
-	    _connectionNotOpened.await(_serverTimeOut, TimeUnit.MILLISECONDS);
-	    AMQPValidator.throwExceptionOnNull(_connectionOpenOkBody,
-		    "The broker didn't send the ConnectionOpenOkBody in time");
+	    
+	    //_connectionNotOpened.await(_serverTimeOut, TimeUnit.MILLISECONDS);
+	    _connectionNotOpened.await();
+	    
+	    checkIfConnectionClosed();
+	    AMQPValidator.throwExceptionOnNull(_connectionOpenOkBody, "The broker didn't send the ConnectionOpenOkBody in time");
 	    _currentState = AMQPState.CONNECTION_OPEN;
 	    return _connectionOpenOkBody;
 	}
-	catch (Exception e)
+	catch (InterruptedException e)
 	{
-	    throw new AMQPException("XXX");
+	    throw new AMQPException("Error in connection.open", e);
 	}
 	finally
 	{
@@ -257,18 +286,16 @@
 	{
 	    _connectionCloseOkBody = null;
 	    checkIfValidStateTransition(_validCloseStates, _currentState, AMQPState.CONNECTION_CLOSED);
-	    AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectioncloseBody,
-		    QpidConstants.EMPTY_CORRELATION_ID);
+	    AMQPMethodEvent msg = new AMQPMethodEvent(QpidConstants.CHANNEL_ZERO, connectioncloseBody, QpidConstants.EMPTY_CORRELATION_ID);
 	    _phase.messageSent(msg);
 	    _connectionNotClosed.await(_serverTimeOut, TimeUnit.MILLISECONDS);
-	    AMQPValidator.throwExceptionOnNull(_connectionCloseOkBody,
-		    "The broker didn't send the ConnectionCloseOkBody in time");
+	    AMQPValidator.throwExceptionOnNull(_connectionCloseOkBody, "The broker didn't send the ConnectionCloseOkBody in time");
 	    _currentState = AMQPState.CONNECTION_CLOSED;
 	    return _connectionCloseOkBody;
 	}
-	catch (Exception e)
+	catch (InterruptedException e)
 	{
-	    throw new AMQPException("XXX");
+	    throw new AMQPException("Error in connection.close", e);
 	}
 	finally
 	{
@@ -277,72 +304,117 @@
     }
 
     /**
-         * ------------------------------------------- AMQMethodListener methods
-         * --------------------------------------------
-         */
+     * ------------------------------------------- AMQMethodListener methods
+     * --------------------------------------------
+     */
     public <B extends AMQMethodBody> boolean methodReceived(AMQPMethodEvent<B> evt) throws AMQPException
     {
-	_correlationId = evt.getCorrelationId();
-
-	if (evt.getMethod() instanceof ConnectionStartBody)
+	_lock.lock();
+	try
 	{
-	    _connectionStartBody = (ConnectionStartBody) evt.getMethod();
-	    _connectionNotStarted.signal();
-	    return true;
+	    _correlationId = evt.getCorrelationId();
+
+	    if (evt.getMethod() instanceof ConnectionStartBody)
+	    {
+		_connectionStartBody = (ConnectionStartBody) evt.getMethod();
+		_connectionNotStarted.signalAll();
+		return true;
+	    }
+	    else if (evt.getMethod() instanceof ConnectionSecureBody)
+	    {
+		_connectionSecureBody = (ConnectionSecureBody) evt.getMethod();
+		_connectionNotSecure.signal();
+		_connectionNotTuned.signal(); // in case the server has sent another chanllenge
+		return true;
+	    }
+	    else if (evt.getMethod() instanceof ConnectionTuneBody)
+	    {
+		_connectionTuneBody = (ConnectionTuneBody) evt.getMethod();
+		_connectionNotSecure.signal(); //if the server does the auth with ConntectionStartOk 
+		_connectionNotTuned.signal();
+		return true;
+	    }
+	    else if (evt.getMethod() instanceof ConnectionOpenOkBody)
+	    {
+		_connectionOpenOkBody = (ConnectionOpenOkBody) evt.getMethod();
+		_connectionNotOpened.signal();
+		return true;
+	    }
+	    else if (evt.getMethod() instanceof ConnectionCloseOkBody)
+	    {
+		_connectionCloseOkBody = (ConnectionCloseOkBody) evt.getMethod();
+		_connectionNotClosed.signal();
+		return true;
+	    }
+	    else if (evt.getMethod() instanceof ConnectionCloseBody)
+	    {
+		_connectionCloseBody = (ConnectionCloseBody)evt.getMethod();
+		// Signal the condition matching the current state, as a caller may be
+		// waiting on it when an error occurred and the broker has sent a close.
+		releaseLocks();
+		handleClose();
+		return true;
+	    }
+	    else
+	    {
+		return false;
+	    }
 	}
-	else if (evt.getMethod() instanceof ConnectionSecureBody)
+	finally
 	{
-	    _connectionSecureBody = (ConnectionSecureBody) evt.getMethod();
-	    _connectionNotSecure.signal();
-	    _connectionNotTuned.signal(); // in case the server has sent another chanllenge
-	    return true;
+	    _lock.unlock();
 	}
-	else if (evt.getMethod() instanceof ConnectionTuneBody)
+    }
+
+    private void handleClose() throws AMQPException
+    {
+	try
+	{	    
+	    _currentState = AMQPState.CONNECTION_CLOSING;
+	    // do the required cleanup and send a ConnectionCloseOkBody
+	}
+	catch (Exception e)
 	{
-	    _connectionTuneBody = (ConnectionTuneBody) evt.getMethod();
-	    _connectionNotTuned.signal();
-	    return true;
+	    throw new AMQPException("Error handling connection.close from broker", e);
 	}
-	else if (evt.getMethod() instanceof ConnectionOpenOkBody)
+    }
+    
+    private void checkIfConnectionClosed()throws AMQPException
+    {
+	if (_connectionCloseBody != null)
+	{
+	    String error = "Broker has closed connection due to : " + _connectionCloseBody.getReplyText() +
+	                    " with reply code (" + _connectionCloseBody.getReplyCode() + ") " +
+	                    "caused by class " + _connectionCloseBody.getClassId() +
+	                    " and method " + _connectionCloseBody.getMethod();
+	    
+	    throw new AMQPException(error);	    
+	}
+    }    
+    
+    private void releaseLocks()
+    {
+	if(_currentState == AMQPState.CONNECTION_NOT_OPENED)
 	{
-	    _connectionOpenOkBody = (ConnectionOpenOkBody) evt.getMethod();
 	    _connectionNotOpened.signal();
-	    return true;
 	}
-	else if (evt.getMethod() instanceof ConnectionCloseOkBody)
+	else if(_currentState == AMQPState.CONNECTION_UNDEFINED)
 	{
-	    _connectionCloseOkBody = (ConnectionCloseOkBody) evt.getMethod();
-	    _connectionNotClosed.signal();
-	    return true;
+	    _connectionNotStarted.signal();
 	}
-	else if (evt.getMethod() instanceof ConnectionCloseBody)
+	else if(_currentState == AMQPState.CONNECTION_NOT_STARTED)
 	{
-	    handleClose();
-	    return true;
+	    _connectionNotSecure.signal();
 	}
-	else
+	else if(_currentState == AMQPState.CONNECTION_NOT_SECURE)
 	{
-	    return false;
+	    _connectionNotTuned.signal();
 	}
     }
 
-    public void handleClose() throws AMQPException
+    public Phase getPhasePipe()
     {
-	_lock.lock();
-	try
-	{
-	    checkIfValidStateTransition(AMQPState.CONNECTION_OPEN, _currentState, AMQPState.CONNECTION_CLOSING);
-	    _currentState = AMQPState.CONNECTION_CLOSING;
-	    // do the required cleanup and send a ConnectionCloseOkBody
-	}
-	catch (Exception e)
-	{
-	    throw new AMQPException("XXX");
-	}
-	finally
-	{
-	    _lock.unlock();
-	}
+	return _phase;
     }
 
 }

Modified: incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPExchange.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPExchange.java?view=diff&rev=523854&r1=523853&r2=523854
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPExchange.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPExchange.java Thu Mar 29 15:24:20 2007
@@ -26,10 +26,10 @@
 import org.apache.qpid.framing.ExchangeDeclareOkBody;
 import org.apache.qpid.framing.ExchangeDeleteBody;
 import org.apache.qpid.framing.ExchangeDeleteOkBody;
+import org.apache.qpid.nclient.amqp.event.AMQPMethodEvent;
+import org.apache.qpid.nclient.amqp.event.AMQPMethodListener;
 import org.apache.qpid.nclient.core.AMQPException;
 import org.apache.qpid.nclient.core.Phase;
-import org.apache.qpid.nclient.model.AMQPMethodEvent;
-import org.apache.qpid.nclient.model.AMQPMethodListener;
 
 /**
  * 
@@ -43,7 +43,7 @@
 {
 	private Phase _phase;
 	
-	public AMQPExchange(int channelId,Phase phase)
+	protected AMQPExchange(int channelId,Phase phase)
 	{
 		super(channelId);
 		_phase = phase;

Modified: incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPMessage.java?view=diff&rev=523854&r1=523853&r2=523854
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPMessage.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPMessage.java Thu Mar 29 15:24:20 2007
@@ -25,6 +25,7 @@
 import org.apache.qpid.framing.MessageCancelBody;
 import org.apache.qpid.framing.MessageCheckpointBody;
 import org.apache.qpid.framing.MessageCloseBody;
+import org.apache.qpid.framing.MessageConsumeBody;
 import org.apache.qpid.framing.MessageEmptyBody;
 import org.apache.qpid.framing.MessageGetBody;
 import org.apache.qpid.framing.MessageOffsetBody;
@@ -35,10 +36,10 @@
 import org.apache.qpid.framing.MessageRejectBody;
 import org.apache.qpid.framing.MessageResumeBody;
 import org.apache.qpid.framing.MessageTransferBody;
+import org.apache.qpid.nclient.amqp.event.AMQPMethodEvent;
+import org.apache.qpid.nclient.amqp.event.AMQPMethodListener;
 import org.apache.qpid.nclient.core.AMQPException;
 import org.apache.qpid.nclient.core.Phase;
-import org.apache.qpid.nclient.model.AMQPMethodEvent;
-import org.apache.qpid.nclient.model.AMQPMethodListener;
 
 /**
  * This class represents the AMQP Message class.
@@ -59,7 +60,7 @@
 	private Phase _phase;
 	private AMQPMessageCallBack _messageCb;
 	
-	public AMQPMessage(int channelId,Phase phase,AMQPMessageCallBack messageCb)
+	protected AMQPMessage(int channelId,Phase phase,AMQPMessageCallBack messageCb)
 	{
 		super(channelId);
 		_phase = phase;
@@ -78,9 +79,9 @@
 		_phase.messageSent(msg);
 	}
 
-	public void consume(MessageCancelBody messageCancelBody,AMQPCallBack cb) throws AMQPException 
+	public void consume(MessageConsumeBody messageConsumeBody,AMQPCallBack cb) throws AMQPException 
 	{
-		AMQPMethodEvent msg = handleAsynchronousCall(messageCancelBody,cb);
+		AMQPMethodEvent msg = handleAsynchronousCall(messageConsumeBody,cb);
 		_phase.messageSent(msg);
 	}
 	

Modified: incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPMessageCallBack.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPMessageCallBack.java?view=diff&rev=523854&r1=523853&r2=523854
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPMessageCallBack.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPMessageCallBack.java Thu Mar 29 15:24:20 2007
@@ -21,10 +21,8 @@
 package org.apache.qpid.nclient.amqp;
 
 import org.apache.qpid.framing.MessageAppendBody;
-import org.apache.qpid.framing.MessageCancelBody;
 import org.apache.qpid.framing.MessageCheckpointBody;
 import org.apache.qpid.framing.MessageCloseBody;
-import org.apache.qpid.framing.MessageGetBody;
 import org.apache.qpid.framing.MessageOpenBody;
 import org.apache.qpid.framing.MessageRecoverBody;
 import org.apache.qpid.framing.MessageResumeBody;

Modified: incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPQueue.java?view=diff&rev=523854&r1=523853&r2=523854
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPQueue.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/AMQPQueue.java Thu Mar 29 15:24:20 2007
@@ -31,10 +31,10 @@
 import org.apache.qpid.framing.QueuePurgeOkBody;
 import org.apache.qpid.framing.QueueUnbindBody;
 import org.apache.qpid.framing.QueueUnbindOkBody;
+import org.apache.qpid.nclient.amqp.event.AMQPMethodEvent;
+import org.apache.qpid.nclient.amqp.event.AMQPMethodListener;
 import org.apache.qpid.nclient.core.AMQPException;
 import org.apache.qpid.nclient.core.Phase;
-import org.apache.qpid.nclient.model.AMQPMethodEvent;
-import org.apache.qpid.nclient.model.AMQPMethodListener;
 
 /**
  * 
@@ -48,7 +48,7 @@
 {
 	private Phase _phase;
 
-	public AMQPQueue(int channelId,Phase phase)
+	protected AMQPQueue(int channelId,Phase phase)
 	{
 		super(channelId);
 		_phase = phase;

Added: incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/QpidEventManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/QpidEventManager.java?view=auto&rev=523854
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/QpidEventManager.java (added)
+++ incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/QpidEventManager.java Thu Mar 29 15:24:20 2007
@@ -0,0 +1,123 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.nclient.amqp;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.nclient.amqp.event.AMQPEventManager;
+import org.apache.qpid.nclient.amqp.event.AMQPMethodEvent;
+import org.apache.qpid.nclient.amqp.event.AMQPMethodListener;
+import org.apache.qpid.nclient.core.AMQPException;
+
+/**
+ * This class registers with the ModelPhase as an AMQMethodListener
+ * to receive method events, and then distributes those methods to other listeners
+ * using filtering criteria. The criteria are channel id and method body class.
+ * The method listeners are added and removed dynamically.
+ * 
+ * <p/>
+ */
+public class QpidEventManager implements AMQPEventManager
+{
+    private static final Logger _logger = Logger.getLogger(QpidEventManager.class);
+
+    private Map<Integer, Map> _channelMap = new ConcurrentHashMap<Integer, Map>();
+
+    /**
+     * ------------------------------------------------
+     * methods introduced by AMQPEventManager
+     * ------------------------------------------------
+     */
+    public void addMethodEventListener(int channelId, Class clazz, AMQPMethodListener l)
+    {
+	Map<Class, List> _methodListenerMap;
+	if (_channelMap.containsKey(channelId))
+	{
+	    _methodListenerMap = _channelMap.get(channelId);
+
+	}
+	else
+	{
+	    _methodListenerMap = new ConcurrentHashMap<Class, List>();
+	    _channelMap.put(channelId, _methodListenerMap);
+	}
+
+	List<AMQPMethodListener> _listeners;
+	if (_methodListenerMap.containsKey(clazz))
+	{
+	    _listeners = _methodListenerMap.get(clazz);
+	}
+	else
+	{
+	    _listeners = new ArrayList<AMQPMethodListener>();
+	    _methodListenerMap.put(clazz, _listeners);
+	}
+
+	_listeners.add(l);
+
+    }
+
+    public void removeMethodEventListener(int channelId, Class clazz, AMQPMethodListener l)
+    {
+	if (_channelMap.containsKey(channelId))
+	{
+	    Map<Class, List> _methodListenerMap = _channelMap.get(channelId);
+
+	    if (_methodListenerMap.containsKey(clazz))
+	    {
+		List<AMQPMethodListener> _listeners = _methodListenerMap.get(clazz);
+		_listeners.remove(l);
+	    }
+
+	}
+    }
+
+    /* (non-Javadoc)
+     * @see org.apache.qpid.nclient.amqp.event.AMQPEventManager#notifyEvent(org.apache.qpid.nclient.amqp.event.AMQPMethodEvent)
+     */
+    public <B extends AMQMethodBody> boolean notifyEvent(AMQPMethodEvent<B> evt) throws AMQPException
+    {
+	if (_channelMap.containsKey(evt.getChannelId()))
+	{
+	    Map<Class, List> _methodListenerMap = _channelMap.get(evt.getChannelId());
+
+	    if (_methodListenerMap.containsKey(evt.getMethod().getClass()))
+	    {
+
+		List<AMQPMethodListener> _listeners = _methodListenerMap.get(evt.getMethod().getClass());
+		for (AMQPMethodListener l : _listeners)
+		{
+		    l.methodReceived(evt);
+		}
+
+		return (_listeners.size() > 0);
+	    }
+
+	}
+
+	return false;
+    }
+}

Added: incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/QpidStateManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/QpidStateManager.java?view=auto&rev=523854
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/QpidStateManager.java (added)
+++ incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/QpidStateManager.java Thu Mar 29 15:24:20 2007
@@ -0,0 +1,42 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.nclient.amqp;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.nclient.amqp.state.AMQPStateListener;
+import org.apache.qpid.nclient.amqp.state.AMQPStateManager;
+
+public class QpidStateManager implements AMQPStateManager
+{
+
+    public void addListener(AMQPStateListener l) throws AMQException
+    {
+	// TODO Auto-generated method stub
+
+    }
+
+    public void removeListener(AMQPStateListener l) throws AMQException
+    {
+	// TODO Auto-generated method stub
+
+    }
+
+}

Added: incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/event/AMQPEventManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/event/AMQPEventManager.java?view=auto&rev=523854
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/event/AMQPEventManager.java (added)
+++ incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/event/AMQPEventManager.java Thu Mar 29 15:24:20 2007
@@ -0,0 +1,11 @@
+package org.apache.qpid.nclient.amqp.event;
+
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.nclient.core.AMQPException;
+
+public interface AMQPEventManager
+{
+    public void addMethodEventListener(int channelId, Class clazz, AMQPMethodListener l);
+    public void removeMethodEventListener(int channelId, Class clazz, AMQPMethodListener l);
+    public <B extends AMQMethodBody> boolean notifyEvent(AMQPMethodEvent<B> evt) throws AMQPException;
+}
\ No newline at end of file

Added: incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/event/AMQPMethodEvent.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/event/AMQPMethodEvent.java?view=auto&rev=523854
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/event/AMQPMethodEvent.java (added)
+++ incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/event/AMQPMethodEvent.java Thu Mar 29 15:24:20 2007
@@ -0,0 +1,78 @@
+package org.apache.qpid.nclient.amqp.event;
+
+import org.apache.qpid.framing.AMQMethodBody;
+
+/**
+ * This class is exactly the same as the AMQMethod event,
+ * except that requestId is renamed to correlationId so it can be used both ways.
+ * 
+ * I didn't want to modify anything in common so that there is no
+ * impact on the existing code.
+ *
+ */
+public class AMQPMethodEvent<M extends AMQMethodBody>
+{
+
+    private final M _method;
+
+    private final int _channelId;
+
+    /**
+     * This is the request id the broker supplied when it sent me a request.
+     * When I respond, I remember this id and copy it to the outgoing
+     * response.
+     */
+    private final long _correlationId;
+
+    /**
+     * I could use _correlationId, because when I send a request
+     * that field is blank and is only used internally. But I
+     * used a separate field to make it clearer.
+     */
+    private long _localCorrletionId = 0;
+
+    public AMQPMethodEvent(int channelId, M method, long correlationId, long localCorrletionId)
+    {
+	_channelId = channelId;
+	_method = method;
+	_correlationId = correlationId;
+	_localCorrletionId = localCorrletionId;
+    }
+
+    public AMQPMethodEvent(int channelId, M method, long correlationId)
+    {
+	_channelId = channelId;
+	_method = method;
+	_correlationId = correlationId;
+    }
+
+    public M getMethod()
+    {
+	return _method;
+    }
+
+    public int getChannelId()
+    {
+	return _channelId;
+    }
+
+    public long getCorrelationId()
+    {
+	return _correlationId;
+    }
+
+    public long getLocalCorrelationId()
+    {
+	return _localCorrletionId;
+    }
+
+    public String toString()
+    {
+	StringBuilder buf = new StringBuilder("Method event: \n");
+	buf.append("Channel id: \n").append(_channelId);
+	buf.append("Method: \n").append(_method);
+	buf.append("Request Id: ").append(_correlationId);
+	buf.append("Local Correlation Id: ").append(_localCorrletionId);
+	return buf.toString();
+    }
+}

Added: incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/event/AMQPMethodListener.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/event/AMQPMethodListener.java?view=auto&rev=523854
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/event/AMQPMethodListener.java (added)
+++ incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/event/AMQPMethodListener.java Thu Mar 29 15:24:20 2007
@@ -0,0 +1,11 @@
+package org.apache.qpid.nclient.amqp.event;
+
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.nclient.core.AMQPException;
+
+public interface AMQPMethodListener 
+{
+
+	public <B extends AMQMethodBody> boolean methodReceived(AMQPMethodEvent<B> evt) throws AMQPException;
+	
+}

Added: incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/MessageHelper.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/MessageHelper.java?view=auto&rev=523854
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/MessageHelper.java (added)
+++ incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/MessageHelper.java Thu Mar 29 15:24:20 2007
@@ -0,0 +1,57 @@
+package org.apache.qpid.nclient.amqp.sample;
+
+import org.apache.qpid.framing.MessageAppendBody;
+import org.apache.qpid.framing.MessageCheckpointBody;
+import org.apache.qpid.framing.MessageCloseBody;
+import org.apache.qpid.framing.MessageOpenBody;
+import org.apache.qpid.framing.MessageRecoverBody;
+import org.apache.qpid.framing.MessageResumeBody;
+import org.apache.qpid.framing.MessageTransferBody;
+import org.apache.qpid.nclient.amqp.AMQPMessageCallBack;
+import org.apache.qpid.nclient.core.AMQPException;
+
+public class MessageHelper implements AMQPMessageCallBack
+{
+
+    public void append(MessageAppendBody messageAppendBody, long correlationId) throws AMQPException
+    {
+	// TODO Auto-generated method stub
+
+    }
+
+    public void checkpoint(MessageCheckpointBody messageCheckpointBody, long correlationId) throws AMQPException
+    {
+	// TODO Auto-generated method stub
+
+    }
+
+    public void close(MessageCloseBody messageCloseBody, long correlationId) throws AMQPException
+    {
+	// TODO Auto-generated method stub
+
+    }
+
+    public void open(MessageOpenBody messageOpenBody, long correlationId) throws AMQPException
+    {
+	// TODO Auto-generated method stub
+
+    }
+
+    public void recover(MessageRecoverBody messageRecoverBody, long correlationId) throws AMQPException
+    {
+	// TODO Auto-generated method stub
+
+    }
+
+    public void resume(MessageResumeBody messageResumeBody, long correlationId) throws AMQPException
+    {
+	// TODO Auto-generated method stub
+
+    }
+
+    public void transfer(MessageTransferBody messageTransferBody, long correlationId) throws AMQPException
+    {
+	System.out.println("The Broker has sent a message" + messageTransferBody.toString());
+    }
+
+}