You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ra...@apache.org on 2007/03/30 17:53:19 UTC

svn commit: r524144 [2/2] - in /incubator/qpid/branches/client_restructure/java: ./ common/src/main/java/org/apache/qpid/framing/ newclient/ newclient/src/main/java/org/apache/qpid/nclient/amqp/ newclient/src/main/java/org/apache/qpid/nclient/amqp/samp...

Modified: incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/TestClient.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/TestClient.java?view=diff&rev=524144&r1=524143&r2=524144
==============================================================================
--- incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/TestClient.java (original)
+++ incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/TestClient.java Fri Mar 30 08:53:18 2007
@@ -65,6 +65,7 @@
 import org.apache.qpid.nclient.amqp.AMQPExchange;
 import org.apache.qpid.nclient.amqp.AMQPMessage;
 import org.apache.qpid.nclient.amqp.AMQPQueue;
+import org.apache.qpid.nclient.amqp.state.AMQPStateType;
 import org.apache.qpid.nclient.transport.AMQPConnectionURL;
 import org.apache.qpid.nclient.transport.ConnectionURL;
 import org.apache.qpid.nclient.transport.TransportConnectionFactory.ConnectionType;
@@ -80,371 +81,371 @@
 @SuppressWarnings("unused")
 public class TestClient
 {
-    private byte _major;
-    private byte _minor;
-    private ConnectionURL _url;
-    private static int _channel = 2;   
-    // Need a Class factory per connection
-    private AMQPClassFactory _classFactory = new AMQPClassFactory();
-    private int _ticket;    
-    
-    public AMQPConnection openConnection() throws Exception
-    {
-	//_url = new AMQPConnectionURL("amqp://guest:guest@test/localhost?brokerlist='vm://:3'");
-	
-	_url = new AMQPConnectionURL("amqp://guest:guest@test/test?brokerlist='tcp://localhost:5672?'");
-	return _classFactory.createConnectionClass(_url,ConnectionType.TCP);
-    }
-    
-    public void handleConnectionNegotiation(AMQPConnection con) throws Exception
-    {
-	// ConnectionStartBody
-	ConnectionStartBody connectionStartBody = con.openTCPConnection();
-	_major = connectionStartBody.getMajor();
-	_minor = connectionStartBody.getMinor();
-	
-	FieldTable clientProperties = FieldTableFactory.newFieldTable();        
-        clientProperties.put(new AMQShortString(ClientProperties.instance.toString()),"Test"); // setting only the client id
-        
-        final String locales = new String(connectionStartBody.getLocales(), "utf8");
-        final StringTokenizer tokenizer = new StringTokenizer(locales, " ");
-        
-        final String mechanism = SecurityHelper.chooseMechanism(connectionStartBody.getMechanisms()); 
-            
-        SaslClient sc = Sasl.createSaslClient(new String[]{mechanism},
-                null, "AMQP", "localhost",
-                null, SecurityHelper.createCallbackHandler(mechanism,_url));
-        
-	ConnectionStartOkBody connectionStartOkBody = 
-	    ConnectionStartOkBody.createMethodBody(_major, _minor, clientProperties, 
-		                                   new AMQShortString(tokenizer.nextToken()), 
-		                                   new AMQShortString(mechanism), 
-		                                   (sc.hasInitialResponse() ? sc.evaluateChallenge(new byte[0]) : null));
-	// ConnectionSecureBody 
-	AMQMethodBody body = con.startOk(connectionStartOkBody);
-	ConnectionTuneBody connectionTuneBody;
-	
-	if (body instanceof ConnectionSecureBody)
-	{
-        	ConnectionSecureBody connectionSecureBody = (ConnectionSecureBody)body;
-        	ConnectionSecureOkBody connectionSecureOkBody = ConnectionSecureOkBody.createMethodBody(
-        							_major,_minor,sc.evaluateChallenge(connectionSecureBody.getChallenge()));
-        	//Assuming the server is not going to send another challenge
-        	connectionTuneBody = (ConnectionTuneBody)con.secureOk(connectionSecureOkBody);
-        	
-	}
-	else 
-	{
-	    connectionTuneBody = (ConnectionTuneBody)body;
-	}
-	
-	
-	// Using broker supplied values
-	ConnectionTuneOkBody connectionTuneOkBody = 
-	    	ConnectionTuneOkBody.createMethodBody(_major,_minor,
-	    					      connectionTuneBody.getChannelMax(),
-	    					      connectionTuneBody.getFrameMax(),
-	    					      connectionTuneBody.getHeartbeat());
-	con.tuneOk(connectionTuneOkBody);
-	
-	ConnectionOpenBody connectionOpenBody = 
-	    ConnectionOpenBody.createMethodBody(_major,_minor,null, true,new AMQShortString(_url.getVirtualHost()));
-	
-	ConnectionOpenOkBody connectionOpenOkBody = con.open(connectionOpenBody);
-    }
-    
-    public void handleChannelNegotiation() throws Exception
-    {
-	AMQPChannel channel = _classFactory.createChannelClass(_channel);
-	
-	ChannelOpenBody channelOpenBody = ChannelOpenBody.createMethodBody(_major, _minor, new AMQShortString("myChannel1"));	
-	ChannelOpenOkBody channelOpenOkBody = channel.open(channelOpenBody);
-	
-	//lets have some fun
-	ChannelFlowBody channelFlowBody = ChannelFlowBody.createMethodBody(_major, _minor, false);
-	
-	ChannelFlowOkBody channelFlowOkBody = channel.flow(channelFlowBody);
-	System.out.println("Channel is " + (channelFlowOkBody.getActive()? "active" : "suspend"));
-	
-	channelFlowBody = ChannelFlowBody.createMethodBody(_major, _minor, true);
-	channelFlowOkBody = channel.flow(channelFlowBody);
-	System.out.println("Channel is " + (channelFlowOkBody.getActive()? "active" : "suspend"));
-    }
-    
-    public void createExchange() throws Exception
-    {
-	AMQPExchange exchange = _classFactory.createExchangeClass(_channel);
-	
-	ExchangeDeclareBody exchangeDeclareBody = 
-	    ExchangeDeclareBody.createMethodBody(_major, _minor, 
-		    				  null, // arguments
-		    				  false,//auto delete
-		    				  false,// durable 
-		    				  new AMQShortString(ExchangeDefaults.DIRECT_EXCHANGE_NAME),
-		    				  true, //internal
-		    				  false,// nowait
-		    				  false,// passive
-		    				  _ticket,
-		    				  new AMQShortString(ExchangeDefaults.DIRECT_EXCHANGE_CLASS));
-	
-	AMQPCallBack cb = createCallBackWithMessage("Broker has created the exchange");
-	exchange.declare(exchangeDeclareBody, cb);
-	// Blocking for response
-	while (!cb.isComplete()){}
-    }
-         
-    
-    public void createAndBindQueue()throws Exception
-    {
-	AMQPQueue queue = _classFactory.createQueueClass(_channel);
-	
-	QueueDeclareBody queueDeclareBody = 
-	    QueueDeclareBody.createMethodBody(_major, _minor,
-		                             null, //arguments
-		                             false,//auto delete
-		                             false,// durable
-		                             false, //exclusive,
-		                             false, //nowait, 
-		                             false, //passive,
-		                             new AMQShortString("MyTestQueue"),
-		                             0);
-	
-	AMQPCallBack cb = new AMQPCallBack(){
-
-	    @Override
-	    public void brokerResponded(AMQMethodBody body)
-	    {		
-		QueueDeclareOkBody queueDeclareOkBody = (QueueDeclareOkBody)body;
-		System.out.println("[Broker has created the queue, " +
-			"message count " + queueDeclareOkBody.getMessageCount() + 
-			"consumer count " + queueDeclareOkBody.getConsumerCount() + "]\n");
-	    }
-
-	    @Override
-	    public void brokerRespondedWithError(AMQException e)
-	    {		
-	    }
-	    
-	};
-	
-	queue.declare(queueDeclareBody, cb);
-        //Blocking for response
-	while (!cb.isComplete()){}
-	
-	QueueBindBody queueBindBody = 
-	    QueueBindBody.createMethodBody(_major, _minor,
-		                           null, //arguments
-		                           new AMQShortString(ExchangeDefaults.DIRECT_EXCHANGE_NAME),//exchange
-		                           false, //nowait
-		                           new AMQShortString("MyTestQueue"), //queue
-		                           new AMQShortString("RH"), //routingKey
-		                           0 //ticket
-		                           );
-	
-	cb = createCallBackWithMessage("Broker has bound the queue");
-	queue.bind(queueBindBody, cb);
-	//Blocking for response
-	while (!cb.isComplete()){}	
-    }
-    
-    public void purgeQueue()throws Exception
-    {
-	AMQPQueue queue = _classFactory.createQueueClass(_channel);
-	
-	QueuePurgeBody queuePurgeBody = 
-	    QueuePurgeBody.createMethodBody(_major, _minor, 
-		                            false, //nowait
-		                            new AMQShortString("MyTestQueue"), //queue
-		                            0 //ticket
-		                            );
-	    
-	AMQPCallBack cb = new AMQPCallBack(){
-
-	    @Override
-	    public void brokerResponded(AMQMethodBody body)
-	    {		
-		QueuePurgeOkBody queuePurgeOkBody = (QueuePurgeOkBody)body;
-		System.out.println("[Broker has purged the queue, message count " + queuePurgeOkBody.getMessageCount() + "]\n");
-	    }
-
-	    @Override
-	    public void brokerRespondedWithError(AMQException e)
-	    {		
-	    }
-	    
-	};
-	
-	queue.purge(queuePurgeBody, cb);
-        //Blocking for response
-	while (!cb.isComplete()){}
-	    
-    }
-    
-    public void deleteQueue()throws Exception
-    {
-	AMQPQueue queue = _classFactory.createQueueClass(_channel);
-	
-	QueueDeleteBody queueDeleteBody = 
-	    QueueDeleteBody.createMethodBody(_major, _minor,
-	    				    false, //ifEmpty
-	    				    false, //ifUnused
-		                            false, //nowait
-		                            new AMQShortString("MyTestQueue"), //queue
-		                            0 //ticket
-		                            );
-	    
-	AMQPCallBack cb = new AMQPCallBack(){
-
-	    @Override
-	    public void brokerResponded(AMQMethodBody body)
-	    {		
-		QueueDeleteOkBody queueDeleteOkBody = (QueueDeleteOkBody)body;
-		System.out.println("[Broker has deleted the queue, message count " + queueDeleteOkBody.getMessageCount() + "]\n");
-	    }
-
-	    @Override
-	    public void brokerRespondedWithError(AMQException e)
-	    {		
-	    }
-	    
-	};
-	
-	queue.delete(queueDeleteBody, cb);
-        //Blocking for response
-	while (!cb.isComplete()){}
-	    
-    }
-
-    public void publishAndSubscribe() throws Exception
-    {
-	AMQPMessage message = _classFactory.createMessageClass(_channel,new MessageHelper());
-	MessageConsumeBody messageConsumeBody = MessageConsumeBody.createMethodBody(_major, _minor,
-		                                                                   new AMQShortString("myClient"),// destination
-		                                                                   false, //exclusive
-		                                                                   null, //filter
-		                                                                   false, //noAck,
-		                                                                   false, //noLocal, 
-		                                                                   new AMQShortString("MyTestQueue"), //queue
-		                                                                   0 //ticket
-		                                                                   );
-	
-	AMQPCallBack cb = createCallBackWithMessage("Broker has accepted the consume");	
-	message.consume(messageConsumeBody, cb);
-	//Blocking for response
-	while (!cb.isComplete()){}
-	
-	// Sending 5 messages serially
-	for (int i=0; i<5; i++)
-	{
-        	cb = createCallBackWithMessage("Broker has accepted msg " + i);
-        	message.transfer(createMessages("Test" + i),cb);
-        	while (!cb.isComplete()){}	
-	}
-	
-    	MessageCancelBody messageCancelBody = MessageCancelBody.createMethodBody(_major, _minor, new AMQShortString("myClient"));
-    
-    	AMQPCallBack cb2 = createCallBackWithMessage("Broker has accepted the consume cancel");	
-    	message.cancel(messageCancelBody, cb2);
-	
-    }
-    
-    private MessageTransferBody createMessages(String content) throws Exception
-    {
-	FieldTable headers = FieldTableFactory.newFieldTable();  
-	headers.setAsciiString(new AMQShortString("Test"), System.currentTimeMillis() + "");
-		
-	MessageTransferBody messageTransferBody = 
-	    MessageTransferBody.createMethodBody(_major, _minor,
-		    				 new AMQShortString("testApp"), //appId
-		                                 headers, //applicationHeaders
-		                                 new Content(Content.TypeEnum.INLINE_T,content.getBytes()), //body
-		                                 new AMQShortString(""), //contentEncoding, 
-		                                 new AMQShortString("text/plain"), //contentType
-		                                 new AMQShortString("testApp"), //correlationId
-		                                 (short)1, //deliveryMode non persistant
-		                                 new AMQShortString(ExchangeDefaults.DIRECT_EXCHANGE_NAME),// destination
-		                                 new AMQShortString(ExchangeDefaults.DIRECT_EXCHANGE_NAME),// exchange
-		                                 0l, //expiration
-		                                 false, //immediate
-		                                 false, //mandatory
-		                                 new AMQShortString(UUID.randomUUID().toString()), //messageId
-		                                 (short)0, //priority
-		                                 false, //redelivered
-		                                 new AMQShortString("RH"), //replyTo
-		                                 new AMQShortString("RH"), //routingKey, 
-		                                 "abc".getBytes(), //securityToken
-		                                 0, //ticket
-		                                 System.currentTimeMillis(), //timestamp
-		                                 new AMQShortString(""), //transactionId
-		                                 0l, //ttl, 
-		                                 new AMQShortString("Hello") //userId
-		                                 );
-	
-	return messageTransferBody;
-		                                 
-    }
-    
-    public void publishAndGet() throws Exception
-    {
-	AMQPMessage message = _classFactory.createMessageClass(_channel,new MessageHelper());
-	AMQPCallBack cb = createCallBackWithMessage("Broker has accepted msg 5");
-	
-	MessageGetBody messageGetBody = 
-	    	MessageGetBody.createMethodBody(_major, _minor, 
-						new AMQShortString("myClient"), 
-						false, //noAck
-						new AMQShortString("MyTestQueue"), //queue
-						0 //ticket
-						);
-	
-	//AMQPMessage message = _classFactory.createMessage(_channel,new MessageHelper());
-	message.transfer(createMessages("Test"),cb);
-	while(!cb.isComplete()){}
-	
-	cb = createCallBackWithMessage("Broker has accepted get");
-	message.get(messageGetBody, cb);	
-    }
-    
-    // Creates a gneric call back and prints the given message
-    private AMQPCallBack createCallBackWithMessage(final String msg)
-    {
-	AMQPCallBack cb = new AMQPCallBack(){
-
-	    @Override
-	    public void brokerResponded(AMQMethodBody body)
-	    {		
-		System.out.println(msg);
-	    }
-
-	    @Override
-	    public void brokerRespondedWithError(AMQException e)
-	    {		
-	    }
-	    
-	};
-	
-	return cb;
-    }
-    
-    public static void main(String[] args)
-    {
-	TestClient test = new TestClient();
-	try
-	{
-	    AMQPConnection con = test.openConnection();
-	    test.handleConnectionNegotiation(con);
-	    test.handleChannelNegotiation();
-	    test.createExchange();
-	    test.createAndBindQueue();
-	    test.publishAndSubscribe();
-	    test.purgeQueue();	    
-	    test.publishAndGet();
-	    test.deleteQueue();
+	private byte _major;
+
+	private byte _minor;
+
+	private ConnectionURL _url;
+
+	private static int _channel = 2;
+
+	// Need a Class factory per connection
+	private AMQPClassFactory _classFactory = new AMQPClassFactory();
+
+	private int _ticket;
+
+	public AMQPConnection openConnection() throws Exception
+	{
+		//_url = new AMQPConnectionURL("amqp://guest:guest@test/localhost?brokerlist='vm://:3'");
+
+		_url = new AMQPConnectionURL("amqp://guest:guest@test/test?brokerlist='tcp://localhost:5672?'");
+		return _classFactory.createConnectionClass(_url, ConnectionType.TCP);
+	}
+
+	public void handleConnectionNegotiation(AMQPConnection con) throws Exception
+	{
+		StateHelper stateHelper = new StateHelper();
+		_classFactory.getStateManager().addListener(AMQPStateType.CONNECTION_STATE, stateHelper);
+		_classFactory.getStateManager().addListener(AMQPStateType.CHANNEL_STATE, stateHelper);
+
+		//ConnectionStartBody
+		ConnectionStartBody connectionStartBody = con.openTCPConnection();
+		_major = connectionStartBody.getMajor();
+		_minor = connectionStartBody.getMinor();
+
+		FieldTable clientProperties = FieldTableFactory.newFieldTable();
+		clientProperties.put(new AMQShortString(ClientProperties.instance.toString()), "Test"); // setting only the client id
+
+		final String locales = new String(connectionStartBody.getLocales(), "utf8");
+		final StringTokenizer tokenizer = new StringTokenizer(locales, " ");
+
+		final String mechanism = SecurityHelper.chooseMechanism(connectionStartBody.getMechanisms());
+
+		SaslClient sc = Sasl.createSaslClient(new String[]
+		{ mechanism }, null, "AMQP", "localhost", null, SecurityHelper.createCallbackHandler(mechanism, _url));
+
+		ConnectionStartOkBody connectionStartOkBody = ConnectionStartOkBody.createMethodBody(_major, _minor, clientProperties, new AMQShortString(
+				tokenizer.nextToken()), new AMQShortString(mechanism), (sc.hasInitialResponse() ? sc.evaluateChallenge(new byte[0]) : null));
+		// ConnectionSecureBody 
+		AMQMethodBody body = con.startOk(connectionStartOkBody);
+		ConnectionTuneBody connectionTuneBody;
+
+		if (body instanceof ConnectionSecureBody)
+		{
+			ConnectionSecureBody connectionSecureBody = (ConnectionSecureBody) body;
+			ConnectionSecureOkBody connectionSecureOkBody = ConnectionSecureOkBody.createMethodBody(_major, _minor, sc
+					.evaluateChallenge(connectionSecureBody.getChallenge()));
+			//Assuming the server is not going to send another challenge
+			connectionTuneBody = (ConnectionTuneBody) con.secureOk(connectionSecureOkBody);
+
+		}
+		else
+		{
+			connectionTuneBody = (ConnectionTuneBody) body;
+		}
+
+		// Using broker supplied values
+		ConnectionTuneOkBody connectionTuneOkBody = ConnectionTuneOkBody.createMethodBody(_major, _minor, connectionTuneBody.getChannelMax(),
+				connectionTuneBody.getFrameMax(), connectionTuneBody.getHeartbeat());
+		con.tuneOk(connectionTuneOkBody);
+
+		ConnectionOpenBody connectionOpenBody = ConnectionOpenBody.createMethodBody(_major, _minor, null, true, new AMQShortString(_url
+				.getVirtualHost()));
+
+		ConnectionOpenOkBody connectionOpenOkBody = con.open(connectionOpenBody);
+	}
+
+	public void handleChannelNegotiation() throws Exception
+	{
+		AMQPChannel channel = _classFactory.createChannelClass(_channel);
+
+		ChannelOpenBody channelOpenBody = ChannelOpenBody.createMethodBody(_major, _minor, new AMQShortString("myChannel1"));
+		ChannelOpenOkBody channelOpenOkBody = channel.open(channelOpenBody);
+
+		//lets have some fun
+		ChannelFlowBody channelFlowBody = ChannelFlowBody.createMethodBody(_major, _minor, false);
+
+		ChannelFlowOkBody channelFlowOkBody = channel.flow(channelFlowBody);
+		System.out.println("Channel is " + (channelFlowOkBody.getActive() ? "active" : "suspend"));
+
+		channelFlowBody = ChannelFlowBody.createMethodBody(_major, _minor, true);
+		channelFlowOkBody = channel.flow(channelFlowBody);
+		System.out.println("Channel is " + (channelFlowOkBody.getActive() ? "active" : "suspend"));
+	}
+
+	public void createExchange() throws Exception
+	{
+		AMQPExchange exchange = _classFactory.createExchangeClass(_channel);
+
+		ExchangeDeclareBody exchangeDeclareBody = ExchangeDeclareBody.createMethodBody(_major, _minor, null, // arguments
+				false,//auto delete
+				false,// durable 
+				new AMQShortString(ExchangeDefaults.DIRECT_EXCHANGE_NAME), true, //internal
+				false,// nowait
+				false,// passive
+				_ticket, new AMQShortString(ExchangeDefaults.DIRECT_EXCHANGE_CLASS));
+
+		AMQPCallBack cb = createCallBackWithMessage("Broker has created the exchange");
+		exchange.declare(exchangeDeclareBody, cb);
+		// Blocking for response
+		while (!cb.isComplete())
+		{
+		}
+	}
+
+	public void createAndBindQueue() throws Exception
+	{
+		AMQPQueue queue = _classFactory.createQueueClass(_channel);
+
+		QueueDeclareBody queueDeclareBody = QueueDeclareBody.createMethodBody(_major, _minor, null, //arguments
+				false,//auto delete
+				false,// durable
+				false, //exclusive,
+				false, //nowait, 
+				false, //passive,
+				new AMQShortString("MyTestQueue"), 0);
+
+		AMQPCallBack cb = new AMQPCallBack()
+		{
+
+			@Override
+			public void brokerResponded(AMQMethodBody body)
+			{
+				QueueDeclareOkBody queueDeclareOkBody = (QueueDeclareOkBody) body;
+				System.out.println("[Broker has created the queue, " + "message count " + queueDeclareOkBody.getMessageCount() + "consumer count "
+						+ queueDeclareOkBody.getConsumerCount() + "]\n");
+			}
+
+			@Override
+			public void brokerRespondedWithError(AMQException e)
+			{
+			}
+
+		};
+
+		queue.declare(queueDeclareBody, cb);
+		//Blocking for response
+		while (!cb.isComplete())
+		{
+		}
+
+		QueueBindBody queueBindBody = QueueBindBody.createMethodBody(_major, _minor, null, //arguments
+				new AMQShortString(ExchangeDefaults.DIRECT_EXCHANGE_NAME),//exchange
+				false, //nowait
+				new AMQShortString("MyTestQueue"), //queue
+				new AMQShortString("RH"), //routingKey
+				0 //ticket
+				);
+
+		cb = createCallBackWithMessage("Broker has bound the queue");
+		queue.bind(queueBindBody, cb);
+		//Blocking for response
+		while (!cb.isComplete())
+		{
+		}
+	}
+
+	public void purgeQueue() throws Exception
+	{
+		AMQPQueue queue = _classFactory.createQueueClass(_channel);
+
+		QueuePurgeBody queuePurgeBody = QueuePurgeBody.createMethodBody(_major, _minor, false, //nowait
+				new AMQShortString("MyTestQueue"), //queue
+				0 //ticket
+				);
+
+		AMQPCallBack cb = new AMQPCallBack()
+		{
+
+			@Override
+			public void brokerResponded(AMQMethodBody body)
+			{
+				QueuePurgeOkBody queuePurgeOkBody = (QueuePurgeOkBody) body;
+				System.out.println("[Broker has purged the queue, message count " + queuePurgeOkBody.getMessageCount() + "]\n");
+			}
+
+			@Override
+			public void brokerRespondedWithError(AMQException e)
+			{
+			}
+
+		};
+
+		queue.purge(queuePurgeBody, cb);
+		//Blocking for response
+		while (!cb.isComplete())
+		{
+		}
+
+	}
+
+	public void deleteQueue() throws Exception
+	{
+		AMQPQueue queue = _classFactory.createQueueClass(_channel);
+
+		QueueDeleteBody queueDeleteBody = QueueDeleteBody.createMethodBody(_major, _minor, false, //ifEmpty
+				false, //ifUnused
+				false, //nowait
+				new AMQShortString("MyTestQueue"), //queue
+				0 //ticket
+				);
+
+		AMQPCallBack cb = new AMQPCallBack()
+		{
+
+			@Override
+			public void brokerResponded(AMQMethodBody body)
+			{
+				QueueDeleteOkBody queueDeleteOkBody = (QueueDeleteOkBody) body;
+				System.out.println("[Broker has deleted the queue, message count " + queueDeleteOkBody.getMessageCount() + "]\n");
+			}
+
+			@Override
+			public void brokerRespondedWithError(AMQException e)
+			{
+			}
+
+		};
+
+		queue.delete(queueDeleteBody, cb);
+		//Blocking for response
+		while (!cb.isComplete())
+		{
+		}
+
+	}
+
+	public void publishAndSubscribe() throws Exception
+	{
+		AMQPMessage message = _classFactory.createMessageClass(_channel, new MessageHelper());
+		MessageConsumeBody messageConsumeBody = MessageConsumeBody.createMethodBody(_major, _minor, new AMQShortString("myClient"),// destination
+				false, //exclusive
+				null, //filter
+				false, //noAck,
+				false, //noLocal, 
+				new AMQShortString("MyTestQueue"), //queue
+				0 //ticket
+				);
+
+		AMQPCallBack cb = createCallBackWithMessage("Broker has accepted the consume");
+		message.consume(messageConsumeBody, cb);
+		//Blocking for response
+		while (!cb.isComplete())
+		{
+		}
+
+		// Sending 5 messages serially
+		for (int i = 0; i < 5; i++)
+		{
+			cb = createCallBackWithMessage("Broker has accepted msg " + i);
+			message.transfer(createMessages("Test" + i), cb);
+			while (!cb.isComplete())
+			{
+			}
+		}
+
+		MessageCancelBody messageCancelBody = MessageCancelBody.createMethodBody(_major, _minor, new AMQShortString("myClient"));
+
+		AMQPCallBack cb2 = createCallBackWithMessage("Broker has accepted the consume cancel");
+		message.cancel(messageCancelBody, cb2);
+
 	}
-	catch (Exception e)
+
+	private MessageTransferBody createMessages(String content) throws Exception
+	{
+		FieldTable headers = FieldTableFactory.newFieldTable();
+		headers.setAsciiString(new AMQShortString("Test"), System.currentTimeMillis() + "");
+
+		MessageTransferBody messageTransferBody = MessageTransferBody.createMethodBody(_major, _minor, new AMQShortString("testApp"), //appId
+				headers, //applicationHeaders
+				new Content(Content.TypeEnum.INLINE_T, content.getBytes()), //body
+				new AMQShortString(""), //contentEncoding, 
+				new AMQShortString("text/plain"), //contentType
+				new AMQShortString("testApp"), //correlationId
+				(short) 1, //deliveryMode non-persistent
+				new AMQShortString(ExchangeDefaults.DIRECT_EXCHANGE_NAME),// destination
+				new AMQShortString(ExchangeDefaults.DIRECT_EXCHANGE_NAME),// exchange
+				0l, //expiration
+				false, //immediate
+				false, //mandatory
+				new AMQShortString(UUID.randomUUID().toString()), //messageId
+				(short) 0, //priority
+				false, //redelivered
+				new AMQShortString("RH"), //replyTo
+				new AMQShortString("RH"), //routingKey, 
+				"abc".getBytes(), //securityToken
+				0, //ticket
+				System.currentTimeMillis(), //timestamp
+				new AMQShortString(""), //transactionId
+				0l, //ttl, 
+				new AMQShortString("Hello") //userId
+				);
+
+		return messageTransferBody;
+
+	}
+
+	public void publishAndGet() throws Exception
+	{
+		AMQPMessage message = _classFactory.createMessageClass(_channel, new MessageHelper());
+		AMQPCallBack cb = createCallBackWithMessage("Broker has accepted msg 5");
+
+		MessageGetBody messageGetBody = MessageGetBody.createMethodBody(_major, _minor, new AMQShortString("myClient"), false, //noAck
+				new AMQShortString("MyTestQueue"), //queue
+				0 //ticket
+				);
+
+		//AMQPMessage message = _classFactory.createMessage(_channel,new MessageHelper());
+		message.transfer(createMessages("Test"), cb);
+		while (!cb.isComplete())
+		{
+		}
+
+		cb = createCallBackWithMessage("Broker has accepted get");
+		message.get(messageGetBody, cb);
+	}
+
+	// Creates a generic callback that prints the given message when the broker responds
+	private AMQPCallBack createCallBackWithMessage(final String msg)
+	{
+		AMQPCallBack cb = new AMQPCallBack()
+		{
+
+			@Override
+			public void brokerResponded(AMQMethodBody body)
+			{
+				System.out.println(msg);
+			}
+
+			@Override
+			public void brokerRespondedWithError(AMQException e)
+			{
+			}
+
+		};
+
+		return cb;
+	}
+
+	public static void main(String[] args)
 	{
-	    e.printStackTrace();
+		TestClient test = new TestClient();
+		try
+		{
+			AMQPConnection con = test.openConnection();
+			test.handleConnectionNegotiation(con);
+			test.handleChannelNegotiation();
+			test.createExchange();
+			test.createAndBindQueue();
+			test.publishAndSubscribe();
+			test.purgeQueue();
+			test.publishAndGet();
+			test.deleteQueue();
+		}
+		catch (Exception e)
+		{
+			e.printStackTrace();
+		}
 	}
-    }
 
 }

Modified: incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPState.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPState.java?view=diff&rev=524144&r1=524143&r2=524144
==============================================================================
--- incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPState.java (original)
+++ incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPState.java Fri Mar 30 08:53:18 2007
@@ -38,7 +38,8 @@
 
     public String toString()
     {
-        return "AMQState: id = " + _id + " name: " + _name;
+        //return "AMQState: id = " + _id + " name: " + _name;
+    	return _name; // looks better with logging
     }
 
     // Connection state

Added: incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateChangedEvent.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateChangedEvent.java?view=auto&rev=524144
==============================================================================
--- incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateChangedEvent.java (added)
+++ incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateChangedEvent.java Fri Mar 30 08:53:18 2007
@@ -0,0 +1,68 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.nclient.amqp.state;
+
+public class AMQPStateChangedEvent 
+{
+	private AMQPState _oldState;
+	
+	private AMQPState _newState;
+
+	private AMQPStateType _stateType;
+
+	public AMQPStateChangedEvent(AMQPState oldState, AMQPState newState, AMQPStateType stateType)
+	{
+		_oldState  = oldState;
+		_newState  = newState;
+		_stateType = stateType; 
+	}
+	
+	public AMQPState getNewState() 
+	{
+		return _newState;
+	}
+
+	public void setNewState(AMQPState newState) 
+	{
+		this._newState = newState;
+	}
+
+	public AMQPState getOldState() 
+	{
+		return _oldState;
+	}
+
+	public void setOldState(AMQPState oldState) 
+	{
+		this._oldState = oldState;
+	}
+
+	public AMQPStateType getStateType() 
+	{
+		return _stateType;
+	}
+
+	public void setStateType(AMQPStateType stateType) 
+	{
+		this._stateType = stateType;
+	}
+	
+}

Modified: incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateListener.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateListener.java?view=diff&rev=524144&r1=524143&r2=524144
==============================================================================
--- incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateListener.java (original)
+++ incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateListener.java Fri Mar 30 08:53:18 2007
@@ -4,5 +4,5 @@
 
 public interface AMQPStateListener 
 {
-	public void stateChanged(AMQPState oldState, AMQPState newState) throws AMQPException;
+	public void stateChanged(AMQPStateChangedEvent event) throws AMQPException;
 }

Modified: incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateManager.java?view=diff&rev=524144&r1=524143&r2=524144
==============================================================================
--- incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateManager.java (original)
+++ incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateManager.java Fri Mar 30 08:53:18 2007
@@ -1,11 +1,13 @@
 package org.apache.qpid.nclient.amqp.state;
 
 import org.apache.qpid.AMQException;
+import org.apache.qpid.nclient.core.AMQPException;
 
 public interface AMQPStateManager 
 {
-
-	public void addListener(AMQPStateListener l)throws AMQException;
+	public void addListener(AMQPStateType stateType, AMQPStateListener l)throws AMQException;
+	
+	public void removeListener(AMQPStateType stateType, AMQPStateListener l)throws AMQException;
 	
-	public void removeListener(AMQPStateListener l)throws AMQException;
+	public void notifyStateChanged(AMQPStateChangedEvent event) throws AMQPException;
 }

Modified: incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateType.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateType.java?view=diff&rev=524144&r1=524143&r2=524144
==============================================================================
--- incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateType.java (original)
+++ incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateType.java Fri Mar 30 08:53:18 2007
@@ -40,7 +40,7 @@
 
     public String toString()
     {
-        return "AMQState: id = " + _typeId + " name: " + _typeName;
+        return _typeName;
     }
 
     // Connection state

Modified: incubator/qpid/branches/client_restructure/java/pom.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/client_restructure/java/pom.xml?view=diff&rev=524144&r1=524143&r2=524144
==============================================================================
--- incubator/qpid/branches/client_restructure/java/pom.xml (original)
+++ incubator/qpid/branches/client_restructure/java/pom.xml Fri Mar 30 08:53:18 2007
@@ -130,6 +130,7 @@
         <module>common</module>
         <module>broker</module>
         <module>client</module>
+        <module>newclient</module>
         <module>cluster</module>
         <module>systests</module>
         <module>perftests</module>
@@ -382,7 +383,7 @@
             <dependency>
                 <groupId>commons-configuration</groupId>
                 <artifactId>commons-configuration</artifactId>
-                <version>1.2</version>
+                <version>1.3</version>
             </dependency>
             <dependency>
                 <groupId>commons-lang</groupId>
@@ -471,6 +472,11 @@
             <dependency>
                 <groupId>org.apache.qpid</groupId>
                 <artifactId>qpid-client</artifactId>
+                <version>${project.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.qpid</groupId>
+                <artifactId>qpid-newclient</artifactId>
                 <version>${project.version}</version>
             </dependency>
             <dependency>