You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ra...@apache.org on 2007/03/30 00:24:22 UTC

svn commit: r523854 [2/3] - in /incubator/qpid/branches/qpid.0-9/java/newclient: ./ src/main/java/ src/main/java/org/apache/qpid/nclient/amqp/ src/main/java/org/apache/qpid/nclient/amqp/event/ src/main/java/org/apache/qpid/nclient/amqp/sample/ src/main...

Modified: incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/TestClient.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/TestClient.java?view=diff&rev=523854&r1=523853&r2=523854
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/TestClient.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/TestClient.java Thu Mar 29 15:24:20 2007
@@ -1,25 +1,72 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
 package org.apache.qpid.nclient.amqp.sample;
 
 import java.util.StringTokenizer;
+import java.util.UUID;
 
 import javax.security.sasl.Sasl;
 import javax.security.sasl.SaslClient;
 
+import org.apache.qpid.AMQException;
 import org.apache.qpid.common.ClientProperties;
+import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.framing.AMQMethodBody;
 import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.ChannelFlowBody;
+import org.apache.qpid.framing.ChannelFlowOkBody;
+import org.apache.qpid.framing.ChannelOpenBody;
+import org.apache.qpid.framing.ChannelOpenOkBody;
+import org.apache.qpid.framing.ConnectionOpenBody;
+import org.apache.qpid.framing.ConnectionOpenOkBody;
 import org.apache.qpid.framing.ConnectionSecureBody;
 import org.apache.qpid.framing.ConnectionSecureOkBody;
 import org.apache.qpid.framing.ConnectionStartBody;
 import org.apache.qpid.framing.ConnectionStartOkBody;
 import org.apache.qpid.framing.ConnectionTuneBody;
 import org.apache.qpid.framing.ConnectionTuneOkBody;
+import org.apache.qpid.framing.Content;
+import org.apache.qpid.framing.ExchangeDeclareBody;
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.framing.FieldTableFactory;
+import org.apache.qpid.framing.MessageCancelBody;
+import org.apache.qpid.framing.MessageConsumeBody;
+import org.apache.qpid.framing.MessageGetBody;
+import org.apache.qpid.framing.MessageTransferBody;
+import org.apache.qpid.framing.QueueBindBody;
+import org.apache.qpid.framing.QueueDeclareBody;
+import org.apache.qpid.framing.QueueDeclareOkBody;
+import org.apache.qpid.framing.QueueDeleteBody;
+import org.apache.qpid.framing.QueueDeleteOkBody;
+import org.apache.qpid.framing.QueuePurgeBody;
+import org.apache.qpid.framing.QueuePurgeOkBody;
+import org.apache.qpid.nclient.amqp.AMQPCallBack;
+import org.apache.qpid.nclient.amqp.AMQPChannel;
+import org.apache.qpid.nclient.amqp.AMQPClassFactory;
 import org.apache.qpid.nclient.amqp.AMQPConnection;
+import org.apache.qpid.nclient.amqp.AMQPExchange;
+import org.apache.qpid.nclient.amqp.AMQPMessage;
+import org.apache.qpid.nclient.amqp.AMQPQueue;
 import org.apache.qpid.nclient.transport.AMQPConnectionURL;
 import org.apache.qpid.nclient.transport.ConnectionURL;
-import org.apache.qpid.nclient.transport.TransportConnection;
-import org.apache.qpid.nclient.transport.TransportConnectionFactory;
 import org.apache.qpid.nclient.transport.TransportConnectionFactory.ConnectionType;
 
 /**
@@ -27,26 +74,34 @@
  * Notes this is just a simple demo.
  * 
  * I have used Helper classes to keep the code cleaner.
+ * Will break this into unit tests later on
  */
+
+@SuppressWarnings("unused")
 public class TestClient
 {
-    private byte major;
-    private byte minor;
+    private byte _major;
+    private byte _minor;
     private ConnectionURL _url;
+    private static int _channel = 2;   
+    // Need a Class factory per connection
+    private AMQPClassFactory _classFactory = new AMQPClassFactory();
+    private int _ticket;    
     
     public AMQPConnection openConnection() throws Exception
     {
-	_url = new AMQPConnectionURL("");
-	TransportConnection conn = TransportConnectionFactory.createTransportConnection(_url, ConnectionType.VM);
-	return new AMQPConnection(conn);
+	//_url = new AMQPConnectionURL("amqp://guest:guest@test/localhost?brokerlist='vm://:3'");
+	
+	_url = new AMQPConnectionURL("amqp://guest:guest@test/test?brokerlist='tcp://localhost:5672?'");
+	return _classFactory.createConnectionClass(_url,ConnectionType.TCP);
     }
     
-    public void handleProtocolNegotiation(AMQPConnection con) throws Exception
+    public void handleConnectionNegotiation(AMQPConnection con) throws Exception
     {
 	// ConnectionStartBody
 	ConnectionStartBody connectionStartBody = con.openTCPConnection();
-	major = connectionStartBody.getMajor();
-	minor = connectionStartBody.getMajor();
+	_major = connectionStartBody.getMajor();
+	_minor = connectionStartBody.getMinor();
 	
 	FieldTable clientProperties = FieldTableFactory.newFieldTable();        
         clientProperties.put(new AMQShortString(ClientProperties.instance.toString()),"Test"); // setting only the client id
@@ -61,33 +116,335 @@
                 null, SecurityHelper.createCallbackHandler(mechanism,_url));
         
 	ConnectionStartOkBody connectionStartOkBody = 
-	    ConnectionStartOkBody.createMethodBody(major, minor, clientProperties, 
+	    ConnectionStartOkBody.createMethodBody(_major, _minor, clientProperties, 
 		                                   new AMQShortString(tokenizer.nextToken()), 
 		                                   new AMQShortString(mechanism), 
 		                                   (sc.hasInitialResponse() ? sc.evaluateChallenge(new byte[0]) : null));
 	// ConnectionSecureBody 
-	ConnectionSecureBody connectionSecureBody = con.startOk(connectionStartOkBody);
+	AMQMethodBody body = con.startOk(connectionStartOkBody);
+	ConnectionTuneBody connectionTuneBody;
 	
-	ConnectionSecureOkBody connectionSecureOkBody = ConnectionSecureOkBody.createMethodBody(
-							major,minor,sc.evaluateChallenge(connectionSecureBody.getChallenge()));
+	if (body instanceof ConnectionSecureBody)
+	{
+        	ConnectionSecureBody connectionSecureBody = (ConnectionSecureBody)body;
+        	ConnectionSecureOkBody connectionSecureOkBody = ConnectionSecureOkBody.createMethodBody(
+        							_major,_minor,sc.evaluateChallenge(connectionSecureBody.getChallenge()));
+        	//Assuming the server is not going to send another challenge
+        	connectionTuneBody = (ConnectionTuneBody)con.secureOk(connectionSecureOkBody);
+        	
+	}
+	else 
+	{
+	    connectionTuneBody = (ConnectionTuneBody)body;
+	}
 	
-	// Assuming the server is not going to send another challenge
-	ConnectionTuneBody connectionTuneBody = (ConnectionTuneBody)con.secureOk(connectionSecureOkBody);
 	
 	// Using broker supplied values
 	ConnectionTuneOkBody connectionTuneOkBody = 
-	    	ConnectionTuneOkBody.createMethodBody(major,minor,
+	    	ConnectionTuneOkBody.createMethodBody(_major,_minor,
 	    					      connectionTuneBody.getChannelMax(),
 	    					      connectionTuneBody.getFrameMax(),
 	    					      connectionTuneBody.getHeartbeat());
 	con.tuneOk(connectionTuneOkBody);
+	
+	ConnectionOpenBody connectionOpenBody = 
+	    ConnectionOpenBody.createMethodBody(_major,_minor,null, true,new AMQShortString(_url.getVirtualHost()));
+	
+	ConnectionOpenOkBody connectionOpenOkBody = con.open(connectionOpenBody);
+    }
+    
+    public void handleChannelNegotiation() throws Exception
+    {
+	AMQPChannel channel = _classFactory.createChannelClass(_channel);
+	
+	ChannelOpenBody channelOpenBody = ChannelOpenBody.createMethodBody(_major, _minor, new AMQShortString("myChannel1"));	
+	ChannelOpenOkBody channelOpenOkBody = channel.open(channelOpenBody);
+	
+	//lets have some fun
+	ChannelFlowBody channelFlowBody = ChannelFlowBody.createMethodBody(_major, _minor, false);
+	
+	ChannelFlowOkBody channelFlowOkBody = channel.flow(channelFlowBody);
+	System.out.println("Channel is " + (channelFlowOkBody.getActive()? "active" : "suspend"));
+	
+	channelFlowBody = ChannelFlowBody.createMethodBody(_major, _minor, true);
+	channelFlowOkBody = channel.flow(channelFlowBody);
+	System.out.println("Channel is " + (channelFlowOkBody.getActive()? "active" : "suspend"));
+    }
+    
+    public void createExchange() throws Exception
+    {
+	AMQPExchange exchange = _classFactory.createExchangeClass(_channel);
+	
+	ExchangeDeclareBody exchangeDeclareBody = 
+	    ExchangeDeclareBody.createMethodBody(_major, _minor, 
+		    				  null, // arguments
+		    				  false,//auto delete
+		    				  false,// durable 
+		    				  new AMQShortString(ExchangeDefaults.DIRECT_EXCHANGE_NAME),
+		    				  true, //internal
+		    				  false,// nowait
+		    				  false,// passive
+		    				  _ticket,
+		    				  new AMQShortString(ExchangeDefaults.DIRECT_EXCHANGE_CLASS));
+	
+	AMQPCallBack cb = createCallBackWithMessage("Broker has created the exchange");
+	exchange.declare(exchangeDeclareBody, cb);
+	// Blocking for response
+	while (!cb.isComplete()){}
+    }
+         
+    
+    public void createAndBindQueue()throws Exception
+    {
+	AMQPQueue queue = _classFactory.createQueueClass(_channel);
+	
+	QueueDeclareBody queueDeclareBody = 
+	    QueueDeclareBody.createMethodBody(_major, _minor,
+		                             null, //arguments
+		                             false,//auto delete
+		                             false,// durable
+		                             false, //exclusive,
+		                             false, //nowait, 
+		                             false, //passive,
+		                             new AMQShortString("MyTestQueue"),
+		                             0);
+	
+	AMQPCallBack cb = new AMQPCallBack(){
+
+	    @Override
+	    public void brokerResponded(AMQMethodBody body)
+	    {		
+		QueueDeclareOkBody queueDeclareOkBody = (QueueDeclareOkBody)body;
+		System.out.println("[Broker has created the queue, " +
+			"message count " + queueDeclareOkBody.getMessageCount() + 
+			"consumer count " + queueDeclareOkBody.getConsumerCount() + "]\n");
+	    }
+
+	    @Override
+	    public void brokerRespondedWithError(AMQException e)
+	    {		
+	    }
+	    
+	};
+	
+	queue.declare(queueDeclareBody, cb);
+        //Blocking for response
+	while (!cb.isComplete()){}
+	
+	QueueBindBody queueBindBody = 
+	    QueueBindBody.createMethodBody(_major, _minor,
+		                           null, //arguments
+		                           new AMQShortString(ExchangeDefaults.DIRECT_EXCHANGE_NAME),//exchange
+		                           false, //nowait
+		                           new AMQShortString("MyTestQueue"), //queue
+		                           new AMQShortString("RH"), //routingKey
+		                           0 //ticket
+		                           );
+	
+	cb = createCallBackWithMessage("Broker has bound the queue");
+	queue.bind(queueBindBody, cb);
+	//Blocking for response
+	while (!cb.isComplete()){}	
+    }
+    
+    public void purgeQueue()throws Exception
+    {
+	AMQPQueue queue = _classFactory.createQueueClass(_channel);
+	
+	QueuePurgeBody queuePurgeBody = 
+	    QueuePurgeBody.createMethodBody(_major, _minor, 
+		                            false, //nowait
+		                            new AMQShortString("MyTestQueue"), //queue
+		                            0 //ticket
+		                            );
+	    
+	AMQPCallBack cb = new AMQPCallBack(){
+
+	    @Override
+	    public void brokerResponded(AMQMethodBody body)
+	    {		
+		QueuePurgeOkBody queuePurgeOkBody = (QueuePurgeOkBody)body;
+		System.out.println("[Broker has purged the queue, message count " + queuePurgeOkBody.getMessageCount() + "]\n");
+	    }
+
+	    @Override
+	    public void brokerRespondedWithError(AMQException e)
+	    {		
+	    }
+	    
+	};
+	
+	queue.purge(queuePurgeBody, cb);
+        //Blocking for response
+	while (!cb.isComplete()){}
+	    
     }
+    
+    public void deleteQueue()throws Exception
+    {
+	AMQPQueue queue = _classFactory.createQueueClass(_channel);
+	
+	QueueDeleteBody queueDeleteBody = 
+	    QueueDeleteBody.createMethodBody(_major, _minor,
+	    				    false, //ifEmpty
+	    				    false, //ifUnused
+		                            false, //nowait
+		                            new AMQShortString("MyTestQueue"), //queue
+		                            0 //ticket
+		                            );
+	    
+	AMQPCallBack cb = new AMQPCallBack(){
+
+	    @Override
+	    public void brokerResponded(AMQMethodBody body)
+	    {		
+		QueueDeleteOkBody queueDeleteOkBody = (QueueDeleteOkBody)body;
+		System.out.println("[Broker has deleted the queue, message count " + queueDeleteOkBody.getMessageCount() + "]\n");
+	    }
 
+	    @Override
+	    public void brokerRespondedWithError(AMQException e)
+	    {		
+	    }
+	    
+	};
+	
+	queue.delete(queueDeleteBody, cb);
+        //Blocking for response
+	while (!cb.isComplete()){}
+	    
+    }
+
+    public void publishAndSubscribe() throws Exception
+    {
+	AMQPMessage message = _classFactory.createMessageClass(_channel,new MessageHelper());
+	MessageConsumeBody messageConsumeBody = MessageConsumeBody.createMethodBody(_major, _minor,
+		                                                                   new AMQShortString("myClient"),// destination
+		                                                                   false, //exclusive
+		                                                                   null, //filter
+		                                                                   false, //noAck,
+		                                                                   false, //noLocal, 
+		                                                                   new AMQShortString("MyTestQueue"), //queue
+		                                                                   0 //ticket
+		                                                                   );
+	
+	AMQPCallBack cb = createCallBackWithMessage("Broker has accepted the consume");	
+	message.consume(messageConsumeBody, cb);
+	//Blocking for response
+	while (!cb.isComplete()){}
+	
+	// Sending 5 messages serially
+	for (int i=0; i<5; i++)
+	{
+        	cb = createCallBackWithMessage("Broker has accepted msg " + i);
+        	message.transfer(createMessages("Test" + i),cb);
+        	while (!cb.isComplete()){}	
+	}
+	
+    	MessageCancelBody messageCancelBody = MessageCancelBody.createMethodBody(_major, _minor, new AMQShortString("myClient"));
+    
+    	AMQPCallBack cb2 = createCallBackWithMessage("Broker has accepted the consume cancel");	
+    	message.cancel(messageCancelBody, cb2);
+	
+    }
+    
+    private MessageTransferBody createMessages(String content) throws Exception
+    {
+	FieldTable headers = FieldTableFactory.newFieldTable();  
+	headers.setAsciiString(new AMQShortString("Test"), System.currentTimeMillis() + "");
+		
+	MessageTransferBody messageTransferBody = 
+	    MessageTransferBody.createMethodBody(_major, _minor,
+		    				 new AMQShortString("testApp"), //appId
+		                                 headers, //applicationHeaders
+		                                 new Content(Content.TypeEnum.INLINE_T,content.getBytes()), //body
+		                                 new AMQShortString(""), //contentEncoding, 
+		                                 new AMQShortString("text/plain"), //contentType
+		                                 new AMQShortString("testApp"), //correlationId
+		                                 (short)1, //deliveryMode non persistant
+		                                 new AMQShortString(ExchangeDefaults.DIRECT_EXCHANGE_NAME),// destination
+		                                 new AMQShortString(ExchangeDefaults.DIRECT_EXCHANGE_NAME),// exchange
+		                                 0l, //expiration
+		                                 false, //immediate
+		                                 false, //mandatory
+		                                 new AMQShortString(UUID.randomUUID().toString()), //messageId
+		                                 (short)0, //priority
+		                                 false, //redelivered
+		                                 new AMQShortString("RH"), //replyTo
+		                                 new AMQShortString("RH"), //routingKey, 
+		                                 "abc".getBytes(), //securityToken
+		                                 0, //ticket
+		                                 System.currentTimeMillis(), //timestamp
+		                                 new AMQShortString(""), //transactionId
+		                                 0l, //ttl, 
+		                                 new AMQShortString("Hello") //userId
+		                                 );
+	
+	return messageTransferBody;
+		                                 
+    }
+    
+    public void publishAndGet() throws Exception
+    {
+	AMQPMessage message = _classFactory.createMessageClass(_channel,new MessageHelper());
+	AMQPCallBack cb = createCallBackWithMessage("Broker has accepted msg 5");
+	
+	MessageGetBody messageGetBody = 
+	    	MessageGetBody.createMethodBody(_major, _minor, 
+						new AMQShortString("myClient"), 
+						false, //noAck
+						new AMQShortString("MyTestQueue"), //queue
+						0 //ticket
+						);
+	
+	//AMQPMessage message = _classFactory.createMessage(_channel,new MessageHelper());
+	message.transfer(createMessages("Test"),cb);
+	while(!cb.isComplete()){}
+	
+	cb = createCallBackWithMessage("Broker has accepted get");
+	message.get(messageGetBody, cb);	
+    }
+    
+    // Creates a gneric call back and prints the given message
+    private AMQPCallBack createCallBackWithMessage(final String msg)
+    {
+	AMQPCallBack cb = new AMQPCallBack(){
+
+	    @Override
+	    public void brokerResponded(AMQMethodBody body)
+	    {		
+		System.out.println(msg);
+	    }
+
+	    @Override
+	    public void brokerRespondedWithError(AMQException e)
+	    {		
+	    }
+	    
+	};
+	
+	return cb;
+    }
+    
     public static void main(String[] args)
     {
 	TestClient test = new TestClient();
-	AMQPConnection con = test.openConnection();
-        
+	try
+	{
+	    AMQPConnection con = test.openConnection();
+	    test.handleConnectionNegotiation(con);
+	    test.handleChannelNegotiation();
+	    test.createExchange();
+	    test.createAndBindQueue();
+	    test.publishAndSubscribe();
+	    test.purgeQueue();	    
+	    test.publishAndGet();
+	    test.deleteQueue();
+	}
+	catch (Exception e)
+	{
+	    e.printStackTrace();
+	}
     }
 
 }

Modified: incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/config/ClientConfiguration.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/config/ClientConfiguration.java?view=diff&rev=523854&r1=523853&r2=523854
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/config/ClientConfiguration.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/config/ClientConfiguration.java Thu Mar 29 15:24:20 2007
@@ -4,6 +4,9 @@
 import java.io.InputStream;
 import java.util.Collection;
 import java.util.List;
+import java.util.TreeMap;
+
+import javax.security.sasl.SaslClientFactory;
 
 import org.apache.commons.configuration.CombinedConfiguration;
 import org.apache.commons.configuration.ConfigurationException;
@@ -11,6 +14,7 @@
 import org.apache.commons.configuration.XMLConfiguration;
 import org.apache.log4j.Logger;
 import org.apache.qpid.nclient.core.QpidConstants;
+import org.apache.qpid.nclient.security.AMQPCallbackHandler;
 
 /**
  * Loads a properties file from classpath.
@@ -64,21 +68,33 @@
 	
 	public static void main(String[] args)
 	{
-		System.out.println(ClientConfiguration.get().getString(QpidConstants.USE_SHARED_READ_WRITE_POOL));
-			
-		//System.out.println(ClientConfiguration.get().getString("methodListeners.methodListener(1).[@class]"));
-		int count = ClientConfiguration.get().getMaxIndex(QpidConstants.METHOD_LISTENERS + "." + QpidConstants.METHOD_LISTENER);
-		System.out.println(count);
-				
-		for(int i=0 ;i<count;i++)
-		{
-			String methodListener = QpidConstants.METHOD_LISTENERS + "." + QpidConstants.METHOD_LISTENER + "(" + i + ")";
-			System.out.println("\n\n"+ClientConfiguration.get().getString(methodListener + QpidConstants.CLASS));
-			List<String> list = ClientConfiguration.get().getList(methodListener + "." + QpidConstants.METHOD_CLASS);
-			for(String s:list)
-			{
-				System.out.println(s);
-			}
-		}
+	    String key = QpidConstants.AMQP_SECURITY + "." + 
+	        QpidConstants.AMQP_SECURITY_SASL_CLIENT_FACTORY_TYPES + "." +
+	        QpidConstants.AMQP_SECURITY_SASL_CLIENT_FACTORY;
+	        
+	    TreeMap<String, Class<? extends SaslClientFactory>> factoriesToRegister =
+                new TreeMap<String, Class<? extends SaslClientFactory>>();
+	    
+	        int index = ClientConfiguration.get().getMaxIndex(key);                                       
+	        	
+	        for (int i=0; i<index+1;i++)
+	        {
+	            String mechanism = ClientConfiguration.get().getString(key + "(" + i + ")[@type]");
+	            String className = ClientConfiguration.get().getString(key + "(" + i + ")" );
+	            try
+	            {
+	                Class<?> clazz = Class.forName(className);
+	                if (!(SaslClientFactory.class.isAssignableFrom(clazz)))
+	                {
+	                    _logger.error("Class " + clazz + " does not implement " + SaslClientFactory.class + " - skipping");
+	                    continue;
+	                }
+	                factoriesToRegister.put(mechanism, (Class<? extends SaslClientFactory>) clazz);
+	            }
+	            catch (Exception ex)
+	            {
+	                _logger.error("Error instantiating SaslClientFactory calss " + className  + " - skipping");
+	            }
+	        }
 	}
 }

Modified: incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/config/client.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/config/client.xml?view=diff&rev=523854&r1=523853&r2=523854
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/config/client.xml (original)
+++ incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/config/client.xml Thu Mar 29 15:24:20 2007
@@ -1,86 +1,36 @@
 <?xml version="1.0" encoding="ISO-8859-1" ?>
 <qpidClientConfig>
 
-<security>
-	<saslClientFactoryTypes>
-		<saslClientFactory type="AMQPLAIN">org.apache.qpid.client.security.amqplain.AmqPlainSaslClientFactory</saslClientFactory>
-	</saslClientFactoryTypes>
-	<securityMechanisms>
-		<securityMechanismHandler type="PLAIN">org.apache.qpid.client.security.UsernamePasswordCallbackHandler</securityMechanismHandler>
-		<securityMechanismHandler type="CRAM_MD5">org.apache.qpid.client.security.UsernamePasswordCallbackHandler</securityMechanismHandler>
-	</securityMechanisms>
-</security>
-
-<!-- Transport Layer properties -->
-<useSharedReadWritePool>true</useSharedReadWritePool>
-<enableDirectBuffers>true</enableDirectBuffers>
-<enablePooledAllocator>false</enablePooledAllocator>
-<tcpNoDelay>true</tcpNoDelay>
-<sendBufferSizeInKb>32</sendBufferSizeInKb>
-<reciveBufferSizeInKb>32</reciveBufferSizeInKb>
-<qpidVMBrokerClass>org.apache.qpid.server.protocol.AMQPFastProtocolHandler</qpidVMBrokerClass>
-
-<!-- Execution Layer properties -->
-<maxAccumilatedResponses>20</maxAccumilatedResponses>
-
-<!-- Model Phase properties -->
-<serverTimeoutInMilliSeconds>1000</serverTimeoutInMilliSeconds>
-<maxAccumilatedResponses>20</maxAccumilatedResponses>
-<stateManager></stateManager>
-<stateListerners>
-	<stateType class="org.apache.qpid.nclient.model.state.AMQPStateType.CONNECTION_STATE">
-		<stateListerner></stateListerner>
-    </stateType>			
-   	<stateType class="org.apache.qpid.nclient.model.state.AMQPStateType.CHANNEL_STATE">
-		<stateListerner></stateListerner>
-    </stateType>			
-</stateListerners>
-
-<methodListeners>
-  <methodListener class="org.apache.qpid.nclient.amqp.AMQPConnection">
-  	<methodClass>org.apache.qpid.framing.ConnectionStartBody</methodClass>
-  	<methodClass>org.apache.qpid.framing.ConnectionSecureBody</methodClass>
-  	<methodClass>org.apache.qpid.framing.ConnectionTuneBody</methodClass>
-  	<methodClass>org.apache.qpid.framing.ConnectionOpenOkBody</methodClass>
-  	<methodClass>org.apache.qpid.framing.ConnectionCloseBody</methodClass>
-    <methodClass>org.apache.qpid.framing.ConnectionCloseOkBody</methodClass>  	
-    
-  	<methodClass>org.apache.qpid.framing.ChannelOpenOkBody</methodClass>
-  	<methodClass>org.apache.qpid.framing.ChannelCloseBody</methodClass>
-  	<methodClass>org.apache.qpid.framing.ChannelCloseOkBody</methodClass>
-  	<methodClass>org.apache.qpid.framing.ChannelFlowBody</methodClass>
-  	<methodClass>org.apache.qpid.framing.ChannelFlowOkBody</methodClass>
-    <methodClass>org.apache.qpid.framing.ChannelOkBody</methodClass>    	
-  
-  	<methodClass>org.apache.qpid.framing.ExchangeDeclareOkBody</methodClass>
-  	<methodClass>org.apache.qpid.framing.ExchangeDeleteOkBody</methodClass>  	
-  
-	<methodClass>org.apache.qpid.framing.QueueDeclareOkBody</methodClass>
-  	<methodClass>org.apache.qpid.framing.QueueBindOkBody</methodClass>
-  	<methodClass>org.apache.qpid.framing.QueueUnbindOkBody</methodClass>
-  	<methodClass>org.apache.qpid.framing.QueuePurgeOkBody</methodClass>
-  	<methodClass>org.apache.qpid.framing.QueueDeleteOkBody</methodClass>	
-
-  	<methodClass>org.apache.qpid.framing.MessageAppendBody</methodClass>
-	<methodClass>org.apache.qpid.framing.MessageCancelBody</methodClass>
-	<methodClass>org.apache.qpid.framing.MessageCheckpointBody</methodClass>
-	<methodClass>org.apache.qpid.framing.MessageCloseBody</methodClass>
-	<methodClass>org.apache.qpid.framing.MessageGetBody</methodClass>
-	<methodClass>org.apache.qpid.framing.MessageOffsetBody</methodClass>
-	<methodClass>org.apache.qpid.framing.MessageOkBody</methodClass>
-	<methodClass>org.apache.qpid.framing.MessageOpenBody</methodClass>
-	<methodClass>org.apache.qpid.framing.MessageQosBody</methodClass>
-	<methodClass>org.apache.qpid.framing.MessageRecoverBody</methodClass>
-	<methodClass>org.apache.qpid.framing.MessageRejectBody</methodClass>
-	<methodClass>org.apache.qpid.framing.MessageResumeBody</methodClass>
-	<methodClass>org.apache.qpid.framing.MessageTransferBody</methodClass>
-  </methodListener>  
-</methodListeners>
-
-<phasePipe>
-	<phase index="0">org.apache.qpid.nclient.transport.TransportPhase<phase>
-	<phase index="1">org.apache.qpid.nclient.execution.ExecutionPhase<phase>
-	<phase index="2">org.apache.qpid.nclient.model.ModelPhase<phase>
-</phasePipe>
+	<security>
+		<saslClientFactoryTypes>
+			<saslClientFactory type="AMQPLAIN">org.apache.qpid.nclient.security.amqplain.AmqPlainSaslClientFactory</saslClientFactory>
+		</saslClientFactoryTypes>
+		<securityMechanisms>
+			<securityMechanismHandler type="PLAIN">org.apache.qpid.nclient.security.UsernamePasswordCallbackHandler</securityMechanismHandler>
+			<securityMechanismHandler type="CRAM_MD5">org.apache.qpid.nclient.security.UsernamePasswordCallbackHandler</securityMechanismHandler>
+		</securityMechanisms>
+	</security>
+	
+	<!-- Transport Layer properties -->
+	<useSharedReadWritePool>false</useSharedReadWritePool>
+	<enableDirectBuffers>true</enableDirectBuffers>
+	<enablePooledAllocator>false</enablePooledAllocator>
+	<tcpNoDelay>true</tcpNoDelay>
+	<sendBufferSizeInKb>32</sendBufferSizeInKb>
+	<reciveBufferSizeInKb>32</reciveBufferSizeInKb>
+	<qpidVMBrokerClass>org.apache.qpid.server.protocol.AMQPFastProtocolHandler</qpidVMBrokerClass>
+	
+	<!-- Execution Layer properties -->
+	<maxAccumilatedResponses>20</maxAccumilatedResponses>
+	
+	<!-- Model Phase properties -->
+	<serverTimeoutInMilliSeconds>60000</serverTimeoutInMilliSeconds>
+	<maxAccumilatedResponses>20</maxAccumilatedResponses>
+	
+	<phasePipe>
+		<phase index="0">org.apache.qpid.nclient.transport.TransportPhase</phase>
+		<phase index="1">org.apache.qpid.nclient.execution.ExecutionPhase</phase>
+		<phase index="2">org.apache.qpid.nclient.model.ModelPhase</phase>
+	</phasePipe>
 
 </qpidClientConfig>

Modified: incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/core/PhaseFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/core/PhaseFactory.java?view=diff&rev=523854&r1=523853&r2=523854
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/core/PhaseFactory.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/core/PhaseFactory.java Thu Mar 29 15:24:20 2007
@@ -24,19 +24,22 @@
      */
     public static Phase createPhasePipe(PhaseContext ctx) throws AMQPException
     {
+	String key = QpidConstants.PHASE_PIPE + "." + QpidConstants.PHASE;
 	Map<Integer,Phase> phaseMap = new HashMap<Integer,Phase>();
-	List<String> list = ClientConfiguration.get().getList(QpidConstants.PHASE_PIPE + "." + QpidConstants.PHASE);
+	List<String> list = ClientConfiguration.get().getList(key);
+	int index = 0;
 	for(String s:list)
 	{
 	    try
 	    {
-		Phase temp = (Phase)Class.forName(ClientConfiguration.get().getString(s)).newInstance();
-		phaseMap.put(ClientConfiguration.get().getInt(s + "." + QpidConstants.INDEX),temp) ;
+		Phase temp = (Phase)Class.forName(s).newInstance();
+		phaseMap.put(ClientConfiguration.get().getInt(key + "(" + index +  ")." + QpidConstants.INDEX),temp) ;
 	    }
 	    catch(Exception e)
 	    {
 		throw new AMQPException("Error loading phase " + ClientConfiguration.get().getString(s),e);
 	    }    
+	    index++;
 	}
 	
 	Phase current = null;
@@ -46,7 +49,7 @@
 	for (int i=0; i<phaseMap.size();i++)
 	{
 	   current = phaseMap.get(i);	   
-	   if (1+1 < phaseMap.size())
+	   if (i+1 < phaseMap.size())
 	   {
 	       next = phaseMap.get(i+1);
 	   }

Modified: incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/core/QpidConstants.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/core/QpidConstants.java?view=diff&rev=523854&r1=523853&r2=523854
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/core/QpidConstants.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/core/QpidConstants.java Thu Mar 29 15:24:20 2007
@@ -10,13 +10,7 @@
 	// Phase Context properties
 	public final static String AMQP_BROKER_DETAILS = "AMQP_BROKER_DETAILS";
 	public final static String MINA_IO_CONNECTOR = "MINA_IO_CONNECTOR";
-	//public final static String AMQP_MAJOR_VERSION = "AMQP_MAJOR_VERSION";
-	//public final static String AMQP_MINOR_VERSION = "AMQP_MINOR_VERSION";
-	//public final static String AMQP_SASL_CLIENT = "AMQP_SASL_CLIENT";
-	//public final static String AMQP_CLIENT_ID = "AMQP_CLIENT_ID";
-	//public final static String AMQP_CONNECTION_TUNE_PARAMETERS = "AMQP_CONNECTION_TUNE_PARAMETERS";
-	//public final static String AMQP_VIRTUAL_HOST = "AMQP_VIRTUAL_HOST";	
-	//public final static String AMQP_MESSAGE_STORE = "AMQP_MESSAGE_STORE";
+	public final static String EVENT_MANAGER = "EVENT_MANAGER";
 	
 	/**---------------------------------------------------------------
 	 * 	Configuration file properties
@@ -24,17 +18,7 @@
 	*/
 	
 	// Model Layer properties
-	public final static String STATE_MANAGER = "stateManager";
-	public final static String METHOD_LISTENERS = "methodListeners";
-	public final static String METHOD_LISTENER = "methodListener";
-	public final static String CLASS = "[@class]";
-	public final static String METHOD_CLASS = "methodClass";
 	
-	public final static String STATE_LISTENERS = "stateListeners";
-	public final static String STATE_LISTENER = "stateListener";
-	public final static String STATE_TYPE = "stateType";
-	
-	public final static String AMQP_MESSAGE_STORE_CLASS = "AMQP_MESSAGE_STORE_CLASS";
 	public final static String SERVER_TIMEOUT_IN_MILLISECONDS = "serverTimeoutInMilliSeconds";
 
 	// MINA properties
@@ -50,6 +34,7 @@
 	public final static String AMQP_SECURITY_SASL_CLIENT_FACTORY  = "saslClientFactory";
 	public final static String TYPE = "[@type]";
 	
+	public final static String AMQP_SECURITY = "security"; 
 	public final static String AMQP_SECURITY_MECHANISMS = "securityMechanisms";
 	public final static String AMQP_SECURITY_MECHANISM_HANDLER = "securityMechanismHandler";
 		

Modified: incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/execution/ExecutionPhase.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/execution/ExecutionPhase.java?view=diff&rev=523854&r1=523853&r2=523854
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/execution/ExecutionPhase.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/execution/ExecutionPhase.java Thu Mar 29 15:24:20 2007
@@ -10,142 +10,179 @@
 import org.apache.qpid.framing.AMQFrame;
 import org.apache.qpid.framing.AMQRequestBody;
 import org.apache.qpid.framing.AMQResponseBody;
-import org.apache.qpid.framing.RequestResponseMappingException;
+import org.apache.qpid.nclient.amqp.event.AMQPMethodEvent;
 import org.apache.qpid.nclient.core.AMQPException;
 import org.apache.qpid.nclient.core.AbstractPhase;
 import org.apache.qpid.nclient.core.QpidConstants;
-import org.apache.qpid.nclient.model.AMQPMethodEvent;
-import org.apache.qpid.protocol.AMQMethodEvent;
 
 /**
  * Corressponds to the Layer 2 in AMQP.
  * This phase handles the correlation of amqp messages
  * This class implements the 0.9 spec (request/response) 
  */
-public class ExecutionPhase extends AbstractPhase{
+public class ExecutionPhase extends AbstractPhase
+{
+
+    protected static final Logger _logger = Logger.getLogger(ExecutionPhase.class);
 
-	protected static final Logger _logger = Logger.getLogger(ExecutionPhase.class);
     protected ConcurrentMap _channelId2RequestMgrMap = new ConcurrentHashMap();
+
     protected ConcurrentMap _channelId2ResponseMgrMap = new ConcurrentHashMap();
 
-    
-	/**
-	 * --------------------------------------------------
-	 * Phase related methods
-	 * --------------------------------------------------
-	 */	
-    
+    /**
+     * --------------------------------------------------
+     * Phase related methods
+     * --------------------------------------------------
+     */
+
     // should add these in the init method
     //_channelId2RequestMgrMap.put(0, new RequestManager(_ConnectionId, 0, this, false));
     //_channelId2ResponseMgrMap.put(0, new ResponseManager(_ConnectionId, 0, _stateManager, this, false));
-    
-	public void messageReceived(Object msg) throws AMQPException 
-	{		
-		AMQFrame frame = (AMQFrame) msg;
-        final AMQBody bodyFrame = frame.getBodyFrame();
-
-        if (bodyFrame instanceof AMQRequestBody)
-        {   
-        	AMQPMethodEvent event;
-			try 
-			{
-				event = messageRequestBodyReceived(frame.getChannel(), (AMQRequestBody)bodyFrame);
-				super.messageReceived(event);
-			} 
-			catch (Exception e) 
-			{
-				_logger.error("Error handling request",e);
-			}
-            
-        }
-        else if (bodyFrame instanceof AMQResponseBody)
-        {
-           List<AMQPMethodEvent> events;
-		   try 
-		   {
-			   events = messageResponseBodyReceived(frame.getChannel(), (AMQResponseBody)bodyFrame);
-			   for (AMQPMethodEvent event: events)
-			   {
-		        	  super.messageReceived(event);  
-		       }
-		   }
-		   catch (Exception e) 
-		   {
-			   _logger.error("Error handling response",e);
-		   }           
-        }
-	}
-
-	/**
-	 * Need to figure out if the message is a request or a response 
-	 * that needs to be sent and then delegate it to the Request or response manager 
-	 * to prepare it.
-	 */
-	public void messageSent(Object msg) throws AMQPException 
+    public void messageReceived(Object msg) throws AMQPException
+    {
+	AMQFrame frame = (AMQFrame) msg;
+	final AMQBody bodyFrame = frame.getBodyFrame();
+
+	if (bodyFrame instanceof AMQRequestBody)
 	{
-		AMQPMethodEvent evt = (AMQPMethodEvent)msg;		
-		if(evt.getCorrelationId() == QpidConstants.EMPTY_CORRELATION_ID)
+	    AMQPMethodEvent event;
+	    try
+	    {
+		event = messageRequestBodyReceived(frame.getChannel(), (AMQRequestBody) bodyFrame);
+		super.messageReceived(event);
+	    }
+	    catch (Exception e)
+	    {
+		_logger.error("Error handling request", e);
+	    }
+
+	}
+	else if (bodyFrame instanceof AMQResponseBody)
+	{
+	    List<AMQPMethodEvent> events;
+	    try
+	    {
+		events = messageResponseBodyReceived(frame.getChannel(), (AMQResponseBody) bodyFrame);
+		for (AMQPMethodEvent event : events)
 		{
-			// This is a request
-			AMQFrame frame = handleRequest(evt);
-			super.messageSent(frame);
+		    super.messageReceived(event);
 		}
-		else
-		{
-//			 This is a response
-			List<AMQFrame> frames = handleResponse(evt);
-			for(AMQFrame frame: frames)
-			{
-				super.messageSent(frame);
-			}
-		}		
+	    }
+	    catch (Exception e)
+	    {
+		_logger.error("Error handling response", e);
+	    }
+	}
+    }
+
+    /**
+     * Need to figure out if the message is a request or a response 
+     * that needs to be sent and then delegate it to the Request or response manager 
+     * to prepare it.
+     */
+    public void messageSent(Object msg) throws AMQPException
+    {
+	AMQPMethodEvent evt = (AMQPMethodEvent) msg;
+	if (evt.getCorrelationId() == QpidConstants.EMPTY_CORRELATION_ID)
+	{
+	    // This is a request
+	    AMQFrame frame = handleRequest(evt);
+	    super.messageSent(frame);
 	}
+	else
+	{
+	    //			 This is a response
+	    List<AMQFrame> frames = handleResponse(evt);
+	    for (AMQFrame frame : frames)
+	    {
+		super.messageSent(frame);
+	    }
+	}
+    }
 
-	/**
-	 * ------------------------------------------------
-	 * Methods to handle request response
-	 * -----------------------------------------------
-	 */
+    /**
+     * ------------------------------------------------
+     * Methods to handle request response
+     * -----------------------------------------------
+     */
     private AMQPMethodEvent messageRequestBodyReceived(int channelId, AMQRequestBody requestBody) throws Exception
     {
-        if (_logger.isDebugEnabled())
-        {
-            _logger.debug("Request frame received: " + requestBody);
-        }
-        ResponseManager responseManager = (ResponseManager)_channelId2ResponseMgrMap.get(channelId);
-        if (responseManager == null)
-            throw new AMQException("Unable to find ResponseManager for channel " + channelId);
-        return responseManager.requestReceived(requestBody);
+	if (_logger.isDebugEnabled())
+	{
+	    _logger.debug("Request frame received: " + requestBody);
+	}
+	
+	ResponseManager responseManager;
+	if(_channelId2ResponseMgrMap.containsKey(channelId))
+	{
+	    responseManager = (ResponseManager) _channelId2ResponseMgrMap.get(channelId);		
+	}	
+	else
+	{
+	    responseManager = new ResponseManager(0,channelId,false);
+	    _channelId2ResponseMgrMap.put(channelId, responseManager);
+	}
+	return responseManager.requestReceived(requestBody);
     }
-    
-    private List<AMQPMethodEvent> messageResponseBodyReceived(int channelId, AMQResponseBody responseBody) throws Exception
+
+    private List<AMQPMethodEvent> messageResponseBodyReceived(int channelId, AMQResponseBody responseBody)
+	    throws Exception
     {
-        if (_logger.isDebugEnabled())
-        {
-            _logger.debug("Response frame received: " + responseBody);
-        }
-        RequestManager requestManager = (RequestManager)_channelId2RequestMgrMap.get(channelId);
-        if (requestManager == null)
-            throw new AMQException("Unable to find RequestManager for channel " + channelId);
-        return requestManager.responseReceived(responseBody);
+	if (_logger.isDebugEnabled())
+	{
+	    _logger.debug("Response frame received: " + responseBody);
+	}
+
+	RequestManager requestManager;
+	if (_channelId2RequestMgrMap.containsKey(channelId))
+	{
+	    requestManager = (RequestManager) _channelId2RequestMgrMap.get(channelId);
+	}
+	else
+	{
+	    requestManager = new RequestManager(0,channelId,false);
+	    _channelId2RequestMgrMap.put(channelId, requestManager);
+	}
+	    
+	return requestManager.responseReceived(responseBody);
     }
-    
+
     private AMQFrame handleRequest(AMQPMethodEvent evt)
     {
-    	RequestManager requestManager = (RequestManager)_channelId2RequestMgrMap.get(evt.getChannelId());
-    	return requestManager.sendRequest(evt);
+	int channelId =  evt.getChannelId();
+	RequestManager requestManager;
+	if (_channelId2RequestMgrMap.containsKey(channelId))
+	{
+	    requestManager = (RequestManager) _channelId2RequestMgrMap.get(channelId);
+	}
+	else
+	{
+	    requestManager = new RequestManager(0,channelId,false);
+	    _channelId2RequestMgrMap.put(channelId, requestManager);
+	}
+	return requestManager.sendRequest(evt);
     }
-    
+
     private List<AMQFrame> handleResponse(AMQPMethodEvent evt) throws AMQPException
     {
-    	ResponseManager responseManager = (ResponseManager)_channelId2ResponseMgrMap.get(evt.getChannelId());
-    	try
-    	{
-    		return responseManager.sendResponse(evt);
-    	}
-    	catch(Exception e)
-    	{
-    		throw new AMQPException("Error handling response",e);
-    	}
+	int channelId =  evt.getChannelId();
+	ResponseManager responseManager;	
+	if(_channelId2ResponseMgrMap.containsKey(channelId))
+	{
+	    responseManager = (ResponseManager) _channelId2ResponseMgrMap.get(channelId);		
+	}	
+	else
+	{
+	    responseManager = new ResponseManager(0,channelId,false);
+	    _channelId2ResponseMgrMap.put(channelId, responseManager);
+	}
+	try
+	{
+	    return responseManager.sendResponse(evt);
+	}
+	catch (Exception e)
+	{
+	    throw new AMQPException("Error handling response", e);
+	}
     }
 }

Modified: incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/execution/RequestManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/execution/RequestManager.java?view=diff&rev=523854&r1=523853&r2=523854
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/execution/RequestManager.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/execution/RequestManager.java Thu Mar 29 15:24:20 2007
@@ -29,7 +29,7 @@
 import org.apache.qpid.framing.AMQRequestBody;
 import org.apache.qpid.framing.AMQResponseBody;
 import org.apache.qpid.framing.RequestResponseMappingException;
-import org.apache.qpid.nclient.model.AMQPMethodEvent;
+import org.apache.qpid.nclient.amqp.event.AMQPMethodEvent;
 
 public class RequestManager
 {
@@ -56,7 +56,7 @@
      */
     private long lastProcessedResponseId;
 
-    private ConcurrentHashMap<Long, Long> requestSentMap;
+    private ConcurrentHashMap<Long, CorrelationID> requestSentMap;
 
     public RequestManager(long connectionId, int channel, boolean serverFlag)
     {
@@ -65,7 +65,7 @@
         this.connectionId = connectionId;
         requestIdCount = 1L;
         lastProcessedResponseId = 0L;
-        requestSentMap = new ConcurrentHashMap<Long, Long>();
+        requestSentMap = new ConcurrentHashMap<Long, CorrelationID>();
     }
 
     // *** Functions to originate a request ***
@@ -80,7 +80,7 @@
             logger.debug((serverFlag ? "SRV[" : "CLI[") + connectionId + "," + channel +
                 "] TX REQ: Req[" + requestId + " " + lastProcessedResponseId + "]; " + evt.getMethod());
         }
-        requestSentMap.put(requestId, evt.getCorrelationId());
+        requestSentMap.put(requestId, new CorrelationID(evt.getCorrelationId(), evt.getLocalCorrelationId()));
         return requestFrame;
     }
 
@@ -103,9 +103,9 @@
                 throw new RequestResponseMappingException(requestId,
                     "Failed to locate requestId " + requestId + " in requestSentMap.");
             }
-            long localCorrelationId = requestSentMap.get(requestId);
+            CorrelationID correlationID = requestSentMap.get(requestId);
             AMQPMethodEvent methodEvent = new AMQPMethodEvent(channel, responseBody.getMethodPayload(),
-                requestId,localCorrelationId);
+        	    correlationID.getSystemCorrelationID(),correlationID.getLocalCorrelationID());
             events.add(methodEvent);
             requestSentMap.remove(requestId);
         }
@@ -125,5 +125,41 @@
     private long getNextRequestId()
     {
         return requestIdCount++;
+    }
+    
+    private class CorrelationID
+    {
+	// Use for the request/response stuff
+	private long _systemCorrelationID;
+	// used internally to track callbacks
+	private long _localCorrelationID;
+	
+	CorrelationID(long systemCorrelationID,long localCorrelationID)
+	{
+	    _localCorrelationID = localCorrelationID;
+	    _systemCorrelationID = systemCorrelationID;
+	}
+
+	public long getLocalCorrelationID()
+	{
+	    return _localCorrelationID;
+	}
+
+	public void setLocalCorrelationID(long correlationID)
+	{
+	    _localCorrelationID = correlationID;
+	}
+
+	public long getSystemCorrelationID()
+	{
+	    return _systemCorrelationID;
+	}
+
+	public void setSystemCorrelationID(long correlationID)
+	{
+	    _systemCorrelationID = correlationID;
+	}
+	
+	
     }
 } 

Modified: incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/execution/ResponseManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/execution/ResponseManager.java?view=diff&rev=523854&r1=523853&r2=523854
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/execution/ResponseManager.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/execution/ResponseManager.java Thu Mar 29 15:24:20 2007
@@ -31,9 +31,9 @@
 import org.apache.qpid.framing.AMQRequestBody;
 import org.apache.qpid.framing.AMQResponseBody;
 import org.apache.qpid.framing.RequestResponseMappingException;
+import org.apache.qpid.nclient.amqp.event.AMQPMethodEvent;
 import org.apache.qpid.nclient.config.ClientConfiguration;
 import org.apache.qpid.nclient.core.QpidConstants;
-import org.apache.qpid.nclient.model.AMQPMethodEvent;
 
 public class ResponseManager
 {

Modified: incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/message/AMQPApplicationMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/message/AMQPApplicationMessage.java?view=diff&rev=523854&r1=523853&r2=523854
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/message/AMQPApplicationMessage.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/message/AMQPApplicationMessage.java Thu Mar 29 15:24:20 2007
@@ -24,8 +24,6 @@
 import java.util.LinkedList;
 import java.util.List;
 
-import org.apache.qpid.client.message.MessageHeaders;
-
 public class AMQPApplicationMessage {
 
 	private int bytesReceived = 0;

Added: incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/message/MessageHeaders.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/message/MessageHeaders.java?view=auto&rev=523854
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/message/MessageHeaders.java (added)
+++ incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/message/MessageHeaders.java Thu Mar 29 15:24:20 2007
@@ -0,0 +1,679 @@
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+
+package org.apache.qpid.nclient.message;
+
+import org.apache.log4j.Logger;
+import org.apache.mina.common.ByteBuffer;
+import org.apache.qpid.AMQPInvalidClassException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.FieldTableFactory;
+
+import javax.jms.JMSException;
+import javax.jms.MessageFormatException;
+import java.util.Enumeration;
+
+public class MessageHeaders
+{
+    private static final Logger _logger = Logger.getLogger(MessageHeaders.class);
+    
+    private AMQShortString _contentType;
+
+    private AMQShortString _encoding;
+    
+    private AMQShortString _destination;
+    
+    private AMQShortString _exchange;
+
+    private FieldTable _jmsHeaders;
+
+    private short _deliveryMode;
+
+    private short _priority;
+
+    private AMQShortString _correlationId;
+
+    private AMQShortString _replyTo;
+
+    private long _expiration;
+
+    private AMQShortString _messageId;
+
+    private long _timestamp;
+
+    private AMQShortString _type;
+
+    private AMQShortString _userId;
+
+    private AMQShortString _appId;
+
+    private AMQShortString _transactionId;
+
+    private AMQShortString _routingKey;
+    
+    private int _size;
+    
+    public int getSize()
+    {
+		return _size;
+	}
+
+	public void setSize(int size)
+    {
+		this._size = size;
+	}
+
+	public MessageHeaders()
+    {
+    }
+
+    public AMQShortString getContentType()
+    {
+        return _contentType;
+    }
+
+    public void setContentType(AMQShortString contentType)
+    {
+        _contentType = contentType;
+    }
+
+    public AMQShortString getEncoding()
+    {
+        return _encoding;
+    }
+
+    public void setEncoding(AMQShortString encoding)
+    {
+        _encoding = encoding;
+    }
+
+    public FieldTable getJMSHeaders()
+    {
+        if (_jmsHeaders == null)
+        {
+            setJMSHeaders(FieldTableFactory.newFieldTable());
+        }
+
+        return _jmsHeaders;
+    }
+
+    public void setJMSHeaders(FieldTable headers)
+    {
+        _jmsHeaders = headers;
+    }
+
+
+    public short getDeliveryMode()
+    {
+        return _deliveryMode;
+    }
+
+    public void setDeliveryMode(short deliveryMode)
+    {
+        _deliveryMode = deliveryMode;
+    }
+
+    public short getPriority()
+    {
+        return _priority;
+    }
+
+    public void setPriority(short priority)
+    {
+        _priority = priority;
+    }
+
+    public AMQShortString getCorrelationId()
+    {
+        return _correlationId;
+    }
+
+    public void setCorrelationId(AMQShortString correlationId)
+    {
+        _correlationId = correlationId;
+    }
+
+    public AMQShortString getReplyTo()
+    {
+        return _replyTo;
+    }
+    
+    public void setReplyTo(AMQShortString replyTo)
+    {
+        _replyTo = replyTo;
+    }
+
+    public long getExpiration()
+    {
+        return _expiration;
+    }
+
+    public void setExpiration(long expiration)
+    {
+        _expiration = expiration;
+    }
+
+
+    public AMQShortString getMessageId()
+    {
+        return _messageId;
+    }
+
+    public void setMessageId(AMQShortString messageId)
+    {
+        _messageId = messageId;
+    }
+
+    public long getTimestamp()
+    {
+        return _timestamp;
+    }
+
+    public void setTimestamp(long timestamp)
+    {
+        _timestamp = timestamp;
+    }
+
+    public AMQShortString getType()
+    {
+        return _type;
+    }
+
+    public void setType(AMQShortString type)
+    {
+        _type = type;
+    }
+
+    public AMQShortString getUserId()
+    {
+        return _userId;
+    }
+
+    public void setUserId(AMQShortString userId)
+    {
+        _userId = userId;
+    }
+
+    public AMQShortString getAppId()
+    {
+        return _appId;
+    }
+
+    public void setAppId(AMQShortString appId)
+    {
+        _appId = appId;
+    }
+
+    // MapMessage  Interface
+
+    public boolean getBoolean(AMQShortString string) throws JMSException
+    {
+        Boolean b = getJMSHeaders().getBoolean(string);
+
+        if (b == null)
+        {
+            if (getJMSHeaders().containsKey(string))
+            {
+                Object str = getJMSHeaders().getObject(string);
+
+                if (str == null || !(str instanceof AMQShortString))
+                {
+                    throw new MessageFormatException("getBoolean can't use " + string + " item.");
+                }
+                else
+                {
+                    return Boolean.valueOf(((AMQShortString)str).asString());
+                }
+            }
+            else
+            {
+                b = Boolean.valueOf(null);
+            }
+        }
+
+        return b;
+    }
+
+    public char getCharacter(AMQShortString string) throws JMSException
+    {
+        Character c = getJMSHeaders().getCharacter(string);
+
+        if (c == null)
+        {
+            if (getJMSHeaders().isNullStringValue(string.asString()))
+            {
+                throw new NullPointerException("Cannot convert null char");
+            }
+            else
+            {
+                throw new MessageFormatException("getChar can't use " + string + " item.");
+            }
+        }
+        else
+        {
+            return (char) c;
+        }
+    }
+
+    public byte[] getBytes(AMQShortString string) throws JMSException
+    {
+        byte[] bs = getJMSHeaders().getBytes(string);
+
+        if (bs == null)
+        {
+            throw new MessageFormatException("getBytes can't use " + string + " item.");
+        }
+        else
+        {
+            return bs;
+        }
+    }
+
+    public byte getByte(AMQShortString string) throws JMSException
+    {
+            Byte b = getJMSHeaders().getByte(string);
+            if (b == null)
+            {
+                if (getJMSHeaders().containsKey(string))
+                {
+                    Object str = getJMSHeaders().getObject(string);
+
+                    if (str == null || !(str instanceof AMQShortString))
+                    {
+                        throw new MessageFormatException("getByte can't use " + string + " item.");
+                    }
+                    else
+                    {
+                        return Byte.valueOf(((AMQShortString)str).asString());
+                    }
+                }
+                else
+                {
+                    b = Byte.valueOf(null);
+                }
+            }
+
+            return b;
+    }
+
+    public short getShort(AMQShortString string) throws JMSException
+    {
+            Short s = getJMSHeaders().getShort(string);
+
+            if (s == null)
+            {
+                s = Short.valueOf(getByte(string));
+            }
+
+            return s;
+    }
+
+    public int getInteger(AMQShortString string) throws JMSException
+    {
+            Integer i = getJMSHeaders().getInteger(string);
+
+            if (i == null)
+            {
+                i = Integer.valueOf(getShort(string));
+            }
+
+            return i;
+    }
+
+    public long getLong(AMQShortString string) throws JMSException
+    {
+            Long l = getJMSHeaders().getLong(string);
+
+            if (l == null)
+            {
+                l = Long.valueOf(getInteger(string));
+            }
+
+            return l;
+    }
+
+    public float getFloat(AMQShortString string) throws JMSException
+    {
+            Float f = getJMSHeaders().getFloat(string);
+
+            if (f == null)
+            {
+                if (getJMSHeaders().containsKey(string))
+                {
+                    Object str = getJMSHeaders().getObject(string);
+
+                    if (str == null || !(str instanceof AMQShortString))
+                    {
+                        throw new MessageFormatException("getFloat can't use " + string + " item.");
+                    }
+                    else
+                    {
+                        return Float.valueOf(((AMQShortString)str).asString());
+                    }
+                }
+                else
+                {
+                    f = Float.valueOf(null);
+                }
+
+            }
+
+            return f;
+    }
+
+    public double getDouble(AMQShortString string) throws JMSException
+    {
+            Double d = getJMSHeaders().getDouble(string);
+
+            if (d == null)
+            {
+                d = Double.valueOf(getFloat(string));
+            }
+
+            return d;
+    }
+
+    public AMQShortString getString(AMQShortString string) throws JMSException
+    {
+        AMQShortString s = new AMQShortString(getJMSHeaders().getString(string.asString()));
+
+        if (s == null)
+        {
+            if (getJMSHeaders().containsKey(string))
+            {
+                Object o = getJMSHeaders().getObject(string);
+                if (o instanceof byte[])
+                {
+                    throw new MessageFormatException("getObject couldn't find " + string + " item.");
+                }
+                else
+                {
+                    if (o == null)
+                    {
+                        return null;
+                    }
+                    else
+                    {
+                        s = (AMQShortString) o;
+                    }
+                }
+            }
+        }
+
+        return s;
+    }
+
+    public Object getObject(AMQShortString string) throws JMSException
+    {
+        return getJMSHeaders().getObject(string);
+    }
+
+    public void setBoolean(AMQShortString string, boolean b) throws JMSException
+    {
+        checkPropertyName(string);
+        getJMSHeaders().setBoolean(string, b);
+    }
+
+    public void setChar(AMQShortString string, char c) throws JMSException
+    {
+        checkPropertyName(string);
+        getJMSHeaders().setChar(string, c);
+    }
+
+    public Object setBytes(AMQShortString string, byte[] bytes)
+    {
+        return getJMSHeaders().setBytes(string, bytes);
+    }
+
+    public Object setBytes(AMQShortString string, byte[] bytes, int start, int length)
+    {
+        return getJMSHeaders().setBytes(string, bytes, start, length);
+    }
+
+    public void setByte(AMQShortString string, byte b) throws JMSException
+    {
+        checkPropertyName(string);
+        getJMSHeaders().setByte(string, b);
+    }
+
+    public void setShort(AMQShortString string, short i) throws JMSException
+    {
+        checkPropertyName(string);
+        getJMSHeaders().setShort(string, i);
+    }
+
+    public void setInteger(AMQShortString string, int i) throws JMSException
+    {
+        checkPropertyName(string);
+        getJMSHeaders().setInteger(string, i);
+    }
+
+    public void setLong(AMQShortString string, long l) throws JMSException
+    {
+        checkPropertyName(string);
+        getJMSHeaders().setLong(string, l);
+    }
+
+    public void setFloat(AMQShortString string, float v) throws JMSException
+    {
+        checkPropertyName(string);
+        getJMSHeaders().setFloat(string, v);
+    }
+
+    public void setDouble(AMQShortString string, double v) throws JMSException
+    {
+        checkPropertyName(string);
+        getJMSHeaders().setDouble(string, v);
+    }
+
+    public void setString(AMQShortString string, AMQShortString string1) throws JMSException
+    {
+        checkPropertyName(string);
+        getJMSHeaders().setString(string.asString(), string1.asString());
+    }
+
+    public void setObject(AMQShortString string, Object object) throws JMSException
+    {
+        checkPropertyName(string);
+        try
+        {
+            getJMSHeaders().setObject(string, object);
+        }
+        catch (AMQPInvalidClassException aice)
+        {
+            throw new MessageFormatException("Only primatives are allowed object is:" + object.getClass());
+        }
+    }
+
+    public boolean itemExists(AMQShortString string) throws JMSException
+    {
+        return getJMSHeaders().containsKey(string);
+    }
+
+    public Enumeration getPropertyNames()
+    {
+        return getJMSHeaders().getPropertyNames();
+    }
+
+    public void clear()
+    {
+        getJMSHeaders().clear();
+    }
+
+    public boolean propertyExists(AMQShortString propertyName)
+    {
+        return getJMSHeaders().propertyExists(propertyName);
+    }
+
+    public Object put(Object key, Object value)
+    {
+        return getJMSHeaders().setObject(key.toString(), value);
+    }
+
+    public Object remove(AMQShortString propertyName)
+    {
+        return getJMSHeaders().remove(propertyName);
+    }
+
+    public boolean isEmpty()
+    {
+        return getJMSHeaders().isEmpty();
+    }
+
+    public void writeToBuffer(ByteBuffer data)
+    {
+        getJMSHeaders().writeToBuffer(data);
+    }
+
+    public Enumeration getMapNames()
+    {
+        return getPropertyNames();
+    }
+
+    protected static void checkPropertyName(CharSequence propertyName)
+    {
+        if (propertyName == null)
+        {
+            throw new IllegalArgumentException("Property name must not be null");
+        }
+        else if (propertyName.length() == 0)
+        {
+            throw new IllegalArgumentException("Property name must not be the empty string");
+        }
+
+        checkIdentiferFormat(propertyName);
+    }
+
+    protected static void checkIdentiferFormat(CharSequence propertyName)
+    {
+//        JMS requirements 3.5.1 Property Names
+//        Identifiers:
+//        - An identifier is an unlimited-length character sequence that must begin
+//          with a Java identifier start character; all following characters must be Java
+//          identifier part characters. An identifier start character is any character for
+//          which the method Character.isJavaIdentifierStart returns true. This includes
+//          '_' and '$'. An identifier part character is any character for which the
+//          method Character.isJavaIdentifierPart returns true.
+//        - Identifiers cannot be the names NULL, TRUE, or FALSE.
+//        – Identifiers cannot be NOT, AND, OR, BETWEEN, LIKE, IN, IS, or
+//          ESCAPE.
+//        – Identifiers are either header field references or property references. The
+//          type of a property value in a message selector corresponds to the type
+//          used to set the property. If a property that does not exist in a message is
+//          referenced, its value is NULL. The semantics of evaluating NULL values
+//          in a selector are described in Section 3.8.1.2, “Null Values.”
+//        – The conversions that apply to the get methods for properties do not
+//          apply when a property is used in a message selector expression. For
+//          example, suppose you set a property as a string value, as in the
+//          following:
+//              myMessage.setStringProperty("NumberOfOrders", "2");
+//          The following expression in a message selector would evaluate to false,
+//          because a string cannot be used in an arithmetic expression:
+//          "NumberOfOrders > 1"
+//        – Identifiers are case sensitive.
+//        – Message header field references are restricted to JMSDeliveryMode,
+//          JMSPriority, JMSMessageID, JMSTimestamp, JMSCorrelationID, and
+//          JMSType. JMSMessageID, JMSCorrelationID, and JMSType values may be
+//          null and if so are treated as a NULL value.
+
+        if (Boolean.getBoolean("strict-jms"))
+        {
+            // JMS start character
+            if (!(Character.isJavaIdentifierStart(propertyName.charAt(0))))
+            {
+                throw new IllegalArgumentException("Identifier '" + propertyName + "' does not start with a valid JMS identifier start character");
+            }
+
+            // JMS part character
+            int length = propertyName.length();
+            for (int c = 1; c < length; c++)
+            {
+                if (!(Character.isJavaIdentifierPart(propertyName.charAt(c))))
+                {
+                    throw new IllegalArgumentException("Identifier '" + propertyName + "' contains an invalid JMS identifier character");
+                }
+            }
+
+
+
+
+            // JMS invalid names
+            if ((propertyName.equals("NULL")
+                 || propertyName.equals("TRUE")
+                 || propertyName.equals("FALSE")
+                 || propertyName.equals("NOT")
+                 || propertyName.equals("AND")
+                 || propertyName.equals("OR")
+                 || propertyName.equals("BETWEEN")
+                 || propertyName.equals("LIKE")
+                 || propertyName.equals("IN")
+                 || propertyName.equals("IS")
+                 || propertyName.equals("ESCAPE")))
+            {
+                throw new IllegalArgumentException("Identifier '" + propertyName + "' is not allowed in JMS");
+            }
+        }
+
+    }
+
+	public AMQShortString getTransactionId()
+    {
+		return _transactionId;
+	}
+
+	public void setTransactionId(AMQShortString id)
+    {
+		_transactionId = id;
+	}
+
+	public AMQShortString getDestination()
+    {
+		return _destination;
+	}
+
+	public void setDestination(AMQShortString destination)
+    {
+		this._destination = destination;
+	}
+
+	public AMQShortString getExchange()
+    {
+		return _exchange;
+	}
+
+	public void setExchange(AMQShortString exchange)
+    {
+		this._exchange = exchange;
+	}
+
+	public AMQShortString getRoutingKey()
+    {
+		return _routingKey;
+	}
+
+	public void setRoutingKey(AMQShortString routingKey)
+    {
+		this._routingKey = routingKey;
+	}
+}
+
+   

Modified: incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/message/MessageStore.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/message/MessageStore.java?view=diff&rev=523854&r1=523853&r2=523854
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/message/MessageStore.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/message/MessageStore.java Thu Mar 29 15:24:20 2007
@@ -1,7 +1,6 @@
 package org.apache.qpid.nclient.message;
 
 import org.apache.qpid.AMQException;
-import org.apache.qpid.client.message.MessageHeaders;
 
 public interface MessageStore {
 	

Modified: incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/message/TransientMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/message/TransientMessageStore.java?view=diff&rev=523854&r1=523853&r2=523854
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/message/TransientMessageStore.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/message/TransientMessageStore.java Thu Mar 29 15:24:20 2007
@@ -4,7 +4,6 @@
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.qpid.AMQException;
-import org.apache.qpid.client.message.MessageHeaders;
 
 public class TransientMessageStore implements MessageStore {
 

Modified: incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/model/ModelPhase.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/model/ModelPhase.java?view=diff&rev=523854&r1=523853&r2=523854
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/model/ModelPhase.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/model/ModelPhase.java Thu Mar 29 15:24:20 2007
@@ -1,13 +1,12 @@
 package org.apache.qpid.nclient.model;
 
-import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
 import org.apache.log4j.Logger;
-import org.apache.qpid.nclient.amqp.state.AMQPStateManager;
-import org.apache.qpid.nclient.config.ClientConfiguration;
+import org.apache.qpid.nclient.amqp.event.AMQPEventManager;
+import org.apache.qpid.nclient.amqp.event.AMQPMethodEvent;
 import org.apache.qpid.nclient.core.AMQPException;
 import org.apache.qpid.nclient.core.AbstractPhase;
 import org.apache.qpid.nclient.core.Phase;
@@ -31,15 +30,7 @@
 	 */    
 	public void init(PhaseContext ctx, Phase nextInFlowPhase, Phase nextOutFlowPhase) 
 	{
-		super.init(ctx, nextInFlowPhase, nextOutFlowPhase);
-		try
-		{
-			loadMethodListeners();
-		}
-		catch(Exception e)
-		{
-			_logger.fatal("Error loading method listeners", e);
-		}
+		super.init(ctx, nextInFlowPhase, nextOutFlowPhase);		
 	}
 
 	public void messageReceived(Object msg) throws AMQPException 
@@ -68,66 +59,13 @@
 	
 	public void notifyMethodListerners(AMQPMethodEvent event) throws AMQPException
 	{
-		if (_methodListners.containsKey(event.getMethod().getClass()))
-		{
-			List<AMQPMethodListener> listeners = _methodListners.get(event.getMethod().getClass()); 
-		
-			if(listeners.size()>0)
-			{
-				throw new AMQPException("There are no registered listeners for this method");
-			}
-			
-			for(AMQPMethodListener l : listeners)
-			{
-				try 
-				{
-					l.methodReceived(event);
-				} 
-				catch (Exception e) 
-				{
-					_logger.error("Error handling method event " +  event, e);
-				}
-			}
-		}
+	    AMQPEventManager eventManager = (AMQPEventManager)_ctx.getProperty(QpidConstants.EVENT_MANAGER);
+	    eventManager.notifyEvent(event);	    
 	}
 	
 	/**
 	 * ------------------------------------------------
 	 *  Configuration 
 	 * ------------------------------------------------
-	 */    
-
-	/**
-	 * This method loads method listeners from the client.xml file
-	 * For each method class there is a list of listeners
-	 */
-	private void loadMethodListeners() throws Exception
-	{
-		int count = ClientConfiguration.get().getMaxIndex(QpidConstants.METHOD_LISTENERS + "." + QpidConstants.METHOD_LISTENER);
-		System.out.println(count);
-				
-		for(int i=0 ;i<count;i++)
-		{
-			String methodListener = QpidConstants.METHOD_LISTENERS + "." + QpidConstants.METHOD_LISTENER + "(" + i + ")";
-			String className =  ClientConfiguration.get().getString(methodListener + "." + QpidConstants.CLASS);
-			Class listenerClass = Class.forName(className);
-			List<String> list = ClientConfiguration.get().getList(methodListener + "." + QpidConstants.METHOD_CLASS);
-			for(String s:list)
-			{
-				List listeners;
-				Class methodClass = Class.forName(s);
-				if (_methodListners.containsKey(methodClass))
-				{
-					listeners = _methodListners.get(methodClass); 
-				}
-				else
-				{
-					listeners = new ArrayList();
-					_methodListners.put(methodClass,listeners);
-				}
-				listeners.add(listenerClass);
-			}
-		}		
-	}
-	
+	 */	
 }

Modified: incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/security/CallbackHandlerRegistry.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/security/CallbackHandlerRegistry.java?view=diff&rev=523854&r1=523853&r2=523854
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/security/CallbackHandlerRegistry.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/security/CallbackHandlerRegistry.java Thu Mar 29 15:24:20 2007
@@ -62,11 +62,16 @@
 
     private void parseProperties()
     {
-    	List<String> mechanisms = ClientConfiguration.get().getList(QpidConstants.AMQP_SECURITY_MECHANISMS);
-    	
-        for (String mechanism : mechanisms)
+	String key = QpidConstants.AMQP_SECURITY + "." + 
+        QpidConstants.AMQP_SECURITY_MECHANISMS + "." +
+        QpidConstants.AMQP_SECURITY_MECHANISM_HANDLER;
+        
+        int index = ClientConfiguration.get().getMaxIndex(key);                                       
+        	
+        for (int i=0; i<index+1;i++)
         {
-            String className = ClientConfiguration.get().getString(QpidConstants.AMQP_SECURITY_MECHANISM_HANDLER + "_" + mechanism);
+            String mechanism = ClientConfiguration.get().getString(key + "(" + i + ")[@type]");
+            String className = ClientConfiguration.get().getString(key + "(" + i + ")" );
             Class clazz = null;
             try
             {

Added: incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/security/amqplain/AmqPlainSaslClient.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/security/amqplain/AmqPlainSaslClient.java?view=auto&rev=523854
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/security/amqplain/AmqPlainSaslClient.java (added)
+++ incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/security/amqplain/AmqPlainSaslClient.java Thu Mar 29 15:24:20 2007
@@ -0,0 +1,105 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.nclient.security.amqplain;
+
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.FieldTableFactory;
+
+import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslException;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.callback.NameCallback;
+import javax.security.auth.callback.PasswordCallback;
+import javax.security.auth.callback.Callback;
+
+/**
+ * Implements the "AMQPlain" authentication protocol that uses FieldTables to send username and pwd.
+ *
+ */
+public class AmqPlainSaslClient implements SaslClient
+{
+    /**
+     *  The name of this mechanism
+     */
+    public static final String MECHANISM = "AMQPLAIN";
+
+    private CallbackHandler _cbh;
+
+    public AmqPlainSaslClient(CallbackHandler cbh)
+    {
+        _cbh = cbh;
+    }
+
+    public String getMechanismName()
+    {
+        return "AMQPLAIN";
+    }
+
+    public boolean hasInitialResponse()
+    {
+        return true;
+    }
+
+    public byte[] evaluateChallenge(byte[] challenge) throws SaslException
+    {
+        // we do not care about the prompt or the default name
+        NameCallback nameCallback = new NameCallback("prompt", "defaultName");
+        PasswordCallback pwdCallback = new PasswordCallback("prompt", false);
+        Callback[] callbacks = new Callback[]{nameCallback, pwdCallback};
+        try
+        {
+            _cbh.handle(callbacks);
+        }
+        catch (Exception e)
+        {
+            throw new SaslException("Error handling SASL callbacks: " + e, e);
+        }
+        FieldTable table = FieldTableFactory.newFieldTable();
+        table.setString("LOGIN", nameCallback.getName());
+        table.setString("PASSWORD", new String(pwdCallback.getPassword()));
+        return table.getDataAsBytes();
+    }
+
+    public boolean isComplete()
+    {
+        return true;
+    }
+
+    public byte[] unwrap(byte[] incoming, int offset, int len) throws SaslException
+    {
+        throw new SaslException("Not supported");
+    }
+
+    public byte[] wrap(byte[] outgoing, int offset, int len) throws SaslException
+    {
+        throw new SaslException("Not supported");
+    }
+
+    public Object getNegotiatedProperty(String propName)
+    {
+        return null;
+    }
+
+    public void dispose() throws SaslException
+    {
+        _cbh = null;
+    }
+}

Added: incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/security/amqplain/AmqPlainSaslClientFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/security/amqplain/AmqPlainSaslClientFactory.java?view=auto&rev=523854
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/security/amqplain/AmqPlainSaslClientFactory.java (added)
+++ incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/security/amqplain/AmqPlainSaslClientFactory.java Thu Mar 29 15:24:20 2007
@@ -0,0 +1,62 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.nclient.security.amqplain;
+
+import javax.security.sasl.SaslClientFactory;
+import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslException;
+import javax.security.sasl.Sasl;
+import javax.security.auth.callback.CallbackHandler;
+import java.util.Map;
+
+public class AmqPlainSaslClientFactory implements SaslClientFactory
+{
+    public SaslClient createSaslClient(String[] mechanisms, String authorizationId, String protocol, String serverName, Map props, CallbackHandler cbh) throws SaslException
+    {
+        for (int i = 0; i < mechanisms.length; i++)
+        {
+            if (mechanisms[i].equals(AmqPlainSaslClient.MECHANISM))
+            {
+                if (cbh == null)
+                {
+                    throw new SaslException("CallbackHandler must not be null");
+                }
+                return new AmqPlainSaslClient(cbh);
+            }
+        }
+        return null;
+    }
+
+    public String[] getMechanismNames(Map props)
+    {
+        if (props.containsKey(Sasl.POLICY_NOPLAINTEXT) ||
+            props.containsKey(Sasl.POLICY_NODICTIONARY) ||
+            props.containsKey(Sasl.POLICY_NOACTIVE))
+        {
+            // returned array must be non null according to interface documentation
+            return new String[0];
+        }
+        else
+        {
+            return new String[]{AmqPlainSaslClient.MECHANISM};
+        }
+    }
+}

Modified: incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TCPConnection.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TCPConnection.java?view=diff&rev=523854&r1=523853&r2=523854
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TCPConnection.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TCPConnection.java Thu Mar 29 15:24:20 2007
@@ -22,10 +22,12 @@
     private BrokerDetails _brokerDetails;
     private IoConnector _ioConnector;
     private Phase _phase;  
+    private PhaseContext _ctx;
     
-    protected TCPConnection(ConnectionURL url)
+    protected TCPConnection(ConnectionURL url, PhaseContext ctx)
     {
 	_brokerDetails = url.getBrokerDetails(0);
+	_ctx = ctx;
 	
 	ByteBuffer.setUseDirectBuffers(ClientConfiguration.get().getBoolean(QpidConstants.ENABLE_DIRECT_BUFFERS));
 
@@ -41,8 +43,8 @@
             ByteBuffer.setAllocator(new SimpleByteBufferAllocator());
         }
 
-        final IoConnector ioConnector = new SocketConnector();
-        SocketConnectorConfig cfg = (SocketConnectorConfig) ioConnector.getDefaultConfig();
+        _ioConnector = new SocketConnector();
+        SocketConnectorConfig cfg = (SocketConnectorConfig) _ioConnector.getDefaultConfig();
 
         // if we do not use our own thread model we get the MINA default which is to use
         // its own leader-follower model
@@ -59,12 +61,11 @@
 
     // Returns the phase pipe
     public Phase connect() throws AMQPException
-    {	
-	PhaseContext ctx = new DefaultPhaseContext();
-	ctx.setProperty(QpidConstants.AMQP_BROKER_DETAILS,_brokerDetails);
-	ctx.setProperty(QpidConstants.MINA_IO_CONNECTOR,_ioConnector);
+    {		
+	_ctx.setProperty(QpidConstants.AMQP_BROKER_DETAILS,_brokerDetails);
+	_ctx.setProperty(QpidConstants.MINA_IO_CONNECTOR,_ioConnector);
 	
-	_phase = PhaseFactory.createPhasePipe(ctx);
+	_phase = PhaseFactory.createPhasePipe(_ctx);
 	_phase.start();
 	
 	return _phase;

Modified: incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TransportConnectionFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TransportConnectionFactory.java?view=diff&rev=523854&r1=523853&r2=523854
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TransportConnectionFactory.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TransportConnectionFactory.java Thu Mar 29 15:24:20 2007
@@ -2,6 +2,8 @@
 
 import java.net.URISyntaxException;
 
+import org.apache.qpid.nclient.core.PhaseContext;
+
 public class TransportConnectionFactory
 {
     public enum ConnectionType 
@@ -9,36 +11,26 @@
 	TCP,VM
     }
     
-    public static TransportConnection createTransportConnection(String url,ConnectionType type) throws URISyntaxException
+    public static TransportConnection createTransportConnection(String url,ConnectionType type, PhaseContext ctx) throws URISyntaxException
     {
-	return createTransportConnection(new AMQPConnectionURL(url),type);
+	return createTransportConnection(new AMQPConnectionURL(url),type,ctx);
 	
     }
     
-    public static TransportConnection createTransportConnection(ConnectionURL url,ConnectionType type)
+    public static TransportConnection createTransportConnection(ConnectionURL url,ConnectionType type, PhaseContext ctx)
     {
 	switch (type)
 	{
 	    case TCP : default:
 	    {
-		return createTCPConnection(url);
+		return new TCPConnection(url,ctx);
 	    }
 	    
 	    case VM :
 	    {
-		return createVMConnection(url);
+		return new VMConnection(url,ctx);
 	    }
 	}
 	
-    }
-
-    private static TransportConnection createTCPConnection(ConnectionURL url)
-    {
-	return new TCPConnection(url);
-    }
-    
-    private static TransportConnection createVMConnection(ConnectionURL url)
-    {
-	return null;
     }
 }