You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ra...@apache.org on 2007/03/30 17:53:19 UTC
svn commit: r524144 [2/2] - in
/incubator/qpid/branches/client_restructure/java: ./
common/src/main/java/org/apache/qpid/framing/ newclient/
newclient/src/main/java/org/apache/qpid/nclient/amqp/
newclient/src/main/java/org/apache/qpid/nclient/amqp/samp...
Modified: incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/TestClient.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/TestClient.java?view=diff&rev=524144&r1=524143&r2=524144
==============================================================================
--- incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/TestClient.java (original)
+++ incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/TestClient.java Fri Mar 30 08:53:18 2007
@@ -65,6 +65,7 @@
import org.apache.qpid.nclient.amqp.AMQPExchange;
import org.apache.qpid.nclient.amqp.AMQPMessage;
import org.apache.qpid.nclient.amqp.AMQPQueue;
+import org.apache.qpid.nclient.amqp.state.AMQPStateType;
import org.apache.qpid.nclient.transport.AMQPConnectionURL;
import org.apache.qpid.nclient.transport.ConnectionURL;
import org.apache.qpid.nclient.transport.TransportConnectionFactory.ConnectionType;
@@ -80,371 +81,371 @@
@SuppressWarnings("unused")
public class TestClient
{
- private byte _major;
- private byte _minor;
- private ConnectionURL _url;
- private static int _channel = 2;
- // Need a Class factory per connection
- private AMQPClassFactory _classFactory = new AMQPClassFactory();
- private int _ticket;
-
- public AMQPConnection openConnection() throws Exception
- {
- //_url = new AMQPConnectionURL("amqp://guest:guest@test/localhost?brokerlist='vm://:3'");
-
- _url = new AMQPConnectionURL("amqp://guest:guest@test/test?brokerlist='tcp://localhost:5672?'");
- return _classFactory.createConnectionClass(_url,ConnectionType.TCP);
- }
-
- public void handleConnectionNegotiation(AMQPConnection con) throws Exception
- {
- // ConnectionStartBody
- ConnectionStartBody connectionStartBody = con.openTCPConnection();
- _major = connectionStartBody.getMajor();
- _minor = connectionStartBody.getMinor();
-
- FieldTable clientProperties = FieldTableFactory.newFieldTable();
- clientProperties.put(new AMQShortString(ClientProperties.instance.toString()),"Test"); // setting only the client id
-
- final String locales = new String(connectionStartBody.getLocales(), "utf8");
- final StringTokenizer tokenizer = new StringTokenizer(locales, " ");
-
- final String mechanism = SecurityHelper.chooseMechanism(connectionStartBody.getMechanisms());
-
- SaslClient sc = Sasl.createSaslClient(new String[]{mechanism},
- null, "AMQP", "localhost",
- null, SecurityHelper.createCallbackHandler(mechanism,_url));
-
- ConnectionStartOkBody connectionStartOkBody =
- ConnectionStartOkBody.createMethodBody(_major, _minor, clientProperties,
- new AMQShortString(tokenizer.nextToken()),
- new AMQShortString(mechanism),
- (sc.hasInitialResponse() ? sc.evaluateChallenge(new byte[0]) : null));
- // ConnectionSecureBody
- AMQMethodBody body = con.startOk(connectionStartOkBody);
- ConnectionTuneBody connectionTuneBody;
-
- if (body instanceof ConnectionSecureBody)
- {
- ConnectionSecureBody connectionSecureBody = (ConnectionSecureBody)body;
- ConnectionSecureOkBody connectionSecureOkBody = ConnectionSecureOkBody.createMethodBody(
- _major,_minor,sc.evaluateChallenge(connectionSecureBody.getChallenge()));
- //Assuming the server is not going to send another challenge
- connectionTuneBody = (ConnectionTuneBody)con.secureOk(connectionSecureOkBody);
-
- }
- else
- {
- connectionTuneBody = (ConnectionTuneBody)body;
- }
-
-
- // Using broker supplied values
- ConnectionTuneOkBody connectionTuneOkBody =
- ConnectionTuneOkBody.createMethodBody(_major,_minor,
- connectionTuneBody.getChannelMax(),
- connectionTuneBody.getFrameMax(),
- connectionTuneBody.getHeartbeat());
- con.tuneOk(connectionTuneOkBody);
-
- ConnectionOpenBody connectionOpenBody =
- ConnectionOpenBody.createMethodBody(_major,_minor,null, true,new AMQShortString(_url.getVirtualHost()));
-
- ConnectionOpenOkBody connectionOpenOkBody = con.open(connectionOpenBody);
- }
-
- public void handleChannelNegotiation() throws Exception
- {
- AMQPChannel channel = _classFactory.createChannelClass(_channel);
-
- ChannelOpenBody channelOpenBody = ChannelOpenBody.createMethodBody(_major, _minor, new AMQShortString("myChannel1"));
- ChannelOpenOkBody channelOpenOkBody = channel.open(channelOpenBody);
-
- //lets have some fun
- ChannelFlowBody channelFlowBody = ChannelFlowBody.createMethodBody(_major, _minor, false);
-
- ChannelFlowOkBody channelFlowOkBody = channel.flow(channelFlowBody);
- System.out.println("Channel is " + (channelFlowOkBody.getActive()? "active" : "suspend"));
-
- channelFlowBody = ChannelFlowBody.createMethodBody(_major, _minor, true);
- channelFlowOkBody = channel.flow(channelFlowBody);
- System.out.println("Channel is " + (channelFlowOkBody.getActive()? "active" : "suspend"));
- }
-
- public void createExchange() throws Exception
- {
- AMQPExchange exchange = _classFactory.createExchangeClass(_channel);
-
- ExchangeDeclareBody exchangeDeclareBody =
- ExchangeDeclareBody.createMethodBody(_major, _minor,
- null, // arguments
- false,//auto delete
- false,// durable
- new AMQShortString(ExchangeDefaults.DIRECT_EXCHANGE_NAME),
- true, //internal
- false,// nowait
- false,// passive
- _ticket,
- new AMQShortString(ExchangeDefaults.DIRECT_EXCHANGE_CLASS));
-
- AMQPCallBack cb = createCallBackWithMessage("Broker has created the exchange");
- exchange.declare(exchangeDeclareBody, cb);
- // Blocking for response
- while (!cb.isComplete()){}
- }
-
-
- public void createAndBindQueue()throws Exception
- {
- AMQPQueue queue = _classFactory.createQueueClass(_channel);
-
- QueueDeclareBody queueDeclareBody =
- QueueDeclareBody.createMethodBody(_major, _minor,
- null, //arguments
- false,//auto delete
- false,// durable
- false, //exclusive,
- false, //nowait,
- false, //passive,
- new AMQShortString("MyTestQueue"),
- 0);
-
- AMQPCallBack cb = new AMQPCallBack(){
-
- @Override
- public void brokerResponded(AMQMethodBody body)
- {
- QueueDeclareOkBody queueDeclareOkBody = (QueueDeclareOkBody)body;
- System.out.println("[Broker has created the queue, " +
- "message count " + queueDeclareOkBody.getMessageCount() +
- "consumer count " + queueDeclareOkBody.getConsumerCount() + "]\n");
- }
-
- @Override
- public void brokerRespondedWithError(AMQException e)
- {
- }
-
- };
-
- queue.declare(queueDeclareBody, cb);
- //Blocking for response
- while (!cb.isComplete()){}
-
- QueueBindBody queueBindBody =
- QueueBindBody.createMethodBody(_major, _minor,
- null, //arguments
- new AMQShortString(ExchangeDefaults.DIRECT_EXCHANGE_NAME),//exchange
- false, //nowait
- new AMQShortString("MyTestQueue"), //queue
- new AMQShortString("RH"), //routingKey
- 0 //ticket
- );
-
- cb = createCallBackWithMessage("Broker has bound the queue");
- queue.bind(queueBindBody, cb);
- //Blocking for response
- while (!cb.isComplete()){}
- }
-
- public void purgeQueue()throws Exception
- {
- AMQPQueue queue = _classFactory.createQueueClass(_channel);
-
- QueuePurgeBody queuePurgeBody =
- QueuePurgeBody.createMethodBody(_major, _minor,
- false, //nowait
- new AMQShortString("MyTestQueue"), //queue
- 0 //ticket
- );
-
- AMQPCallBack cb = new AMQPCallBack(){
-
- @Override
- public void brokerResponded(AMQMethodBody body)
- {
- QueuePurgeOkBody queuePurgeOkBody = (QueuePurgeOkBody)body;
- System.out.println("[Broker has purged the queue, message count " + queuePurgeOkBody.getMessageCount() + "]\n");
- }
-
- @Override
- public void brokerRespondedWithError(AMQException e)
- {
- }
-
- };
-
- queue.purge(queuePurgeBody, cb);
- //Blocking for response
- while (!cb.isComplete()){}
-
- }
-
- public void deleteQueue()throws Exception
- {
- AMQPQueue queue = _classFactory.createQueueClass(_channel);
-
- QueueDeleteBody queueDeleteBody =
- QueueDeleteBody.createMethodBody(_major, _minor,
- false, //ifEmpty
- false, //ifUnused
- false, //nowait
- new AMQShortString("MyTestQueue"), //queue
- 0 //ticket
- );
-
- AMQPCallBack cb = new AMQPCallBack(){
-
- @Override
- public void brokerResponded(AMQMethodBody body)
- {
- QueueDeleteOkBody queueDeleteOkBody = (QueueDeleteOkBody)body;
- System.out.println("[Broker has deleted the queue, message count " + queueDeleteOkBody.getMessageCount() + "]\n");
- }
-
- @Override
- public void brokerRespondedWithError(AMQException e)
- {
- }
-
- };
-
- queue.delete(queueDeleteBody, cb);
- //Blocking for response
- while (!cb.isComplete()){}
-
- }
-
- public void publishAndSubscribe() throws Exception
- {
- AMQPMessage message = _classFactory.createMessageClass(_channel,new MessageHelper());
- MessageConsumeBody messageConsumeBody = MessageConsumeBody.createMethodBody(_major, _minor,
- new AMQShortString("myClient"),// destination
- false, //exclusive
- null, //filter
- false, //noAck,
- false, //noLocal,
- new AMQShortString("MyTestQueue"), //queue
- 0 //ticket
- );
-
- AMQPCallBack cb = createCallBackWithMessage("Broker has accepted the consume");
- message.consume(messageConsumeBody, cb);
- //Blocking for response
- while (!cb.isComplete()){}
-
- // Sending 5 messages serially
- for (int i=0; i<5; i++)
- {
- cb = createCallBackWithMessage("Broker has accepted msg " + i);
- message.transfer(createMessages("Test" + i),cb);
- while (!cb.isComplete()){}
- }
-
- MessageCancelBody messageCancelBody = MessageCancelBody.createMethodBody(_major, _minor, new AMQShortString("myClient"));
-
- AMQPCallBack cb2 = createCallBackWithMessage("Broker has accepted the consume cancel");
- message.cancel(messageCancelBody, cb2);
-
- }
-
- private MessageTransferBody createMessages(String content) throws Exception
- {
- FieldTable headers = FieldTableFactory.newFieldTable();
- headers.setAsciiString(new AMQShortString("Test"), System.currentTimeMillis() + "");
-
- MessageTransferBody messageTransferBody =
- MessageTransferBody.createMethodBody(_major, _minor,
- new AMQShortString("testApp"), //appId
- headers, //applicationHeaders
- new Content(Content.TypeEnum.INLINE_T,content.getBytes()), //body
- new AMQShortString(""), //contentEncoding,
- new AMQShortString("text/plain"), //contentType
- new AMQShortString("testApp"), //correlationId
- (short)1, //deliveryMode non persistant
- new AMQShortString(ExchangeDefaults.DIRECT_EXCHANGE_NAME),// destination
- new AMQShortString(ExchangeDefaults.DIRECT_EXCHANGE_NAME),// exchange
- 0l, //expiration
- false, //immediate
- false, //mandatory
- new AMQShortString(UUID.randomUUID().toString()), //messageId
- (short)0, //priority
- false, //redelivered
- new AMQShortString("RH"), //replyTo
- new AMQShortString("RH"), //routingKey,
- "abc".getBytes(), //securityToken
- 0, //ticket
- System.currentTimeMillis(), //timestamp
- new AMQShortString(""), //transactionId
- 0l, //ttl,
- new AMQShortString("Hello") //userId
- );
-
- return messageTransferBody;
-
- }
-
- public void publishAndGet() throws Exception
- {
- AMQPMessage message = _classFactory.createMessageClass(_channel,new MessageHelper());
- AMQPCallBack cb = createCallBackWithMessage("Broker has accepted msg 5");
-
- MessageGetBody messageGetBody =
- MessageGetBody.createMethodBody(_major, _minor,
- new AMQShortString("myClient"),
- false, //noAck
- new AMQShortString("MyTestQueue"), //queue
- 0 //ticket
- );
-
- //AMQPMessage message = _classFactory.createMessage(_channel,new MessageHelper());
- message.transfer(createMessages("Test"),cb);
- while(!cb.isComplete()){}
-
- cb = createCallBackWithMessage("Broker has accepted get");
- message.get(messageGetBody, cb);
- }
-
- // Creates a gneric call back and prints the given message
- private AMQPCallBack createCallBackWithMessage(final String msg)
- {
- AMQPCallBack cb = new AMQPCallBack(){
-
- @Override
- public void brokerResponded(AMQMethodBody body)
- {
- System.out.println(msg);
- }
-
- @Override
- public void brokerRespondedWithError(AMQException e)
- {
- }
-
- };
-
- return cb;
- }
-
- public static void main(String[] args)
- {
- TestClient test = new TestClient();
- try
- {
- AMQPConnection con = test.openConnection();
- test.handleConnectionNegotiation(con);
- test.handleChannelNegotiation();
- test.createExchange();
- test.createAndBindQueue();
- test.publishAndSubscribe();
- test.purgeQueue();
- test.publishAndGet();
- test.deleteQueue();
+ private byte _major;
+
+ private byte _minor;
+
+ private ConnectionURL _url;
+
+ private static int _channel = 2;
+
+ // Need a Class factory per connection
+ private AMQPClassFactory _classFactory = new AMQPClassFactory();
+
+ private int _ticket;
+
+ public AMQPConnection openConnection() throws Exception
+ {
+ //_url = new AMQPConnectionURL("amqp://guest:guest@test/localhost?brokerlist='vm://:3'");
+
+ _url = new AMQPConnectionURL("amqp://guest:guest@test/test?brokerlist='tcp://localhost:5672?'");
+ return _classFactory.createConnectionClass(_url, ConnectionType.TCP);
+ }
+
+ public void handleConnectionNegotiation(AMQPConnection con) throws Exception
+ {
+ StateHelper stateHelper = new StateHelper();
+ _classFactory.getStateManager().addListener(AMQPStateType.CONNECTION_STATE, stateHelper);
+ _classFactory.getStateManager().addListener(AMQPStateType.CHANNEL_STATE, stateHelper);
+
+ //ConnectionStartBody
+ ConnectionStartBody connectionStartBody = con.openTCPConnection();
+ _major = connectionStartBody.getMajor();
+ _minor = connectionStartBody.getMinor();
+
+ FieldTable clientProperties = FieldTableFactory.newFieldTable();
+ clientProperties.put(new AMQShortString(ClientProperties.instance.toString()), "Test"); // setting only the client id
+
+ final String locales = new String(connectionStartBody.getLocales(), "utf8");
+ final StringTokenizer tokenizer = new StringTokenizer(locales, " ");
+
+ final String mechanism = SecurityHelper.chooseMechanism(connectionStartBody.getMechanisms());
+
+ SaslClient sc = Sasl.createSaslClient(new String[]
+ { mechanism }, null, "AMQP", "localhost", null, SecurityHelper.createCallbackHandler(mechanism, _url));
+
+ ConnectionStartOkBody connectionStartOkBody = ConnectionStartOkBody.createMethodBody(_major, _minor, clientProperties, new AMQShortString(
+ tokenizer.nextToken()), new AMQShortString(mechanism), (sc.hasInitialResponse() ? sc.evaluateChallenge(new byte[0]) : null));
+ // ConnectionSecureBody
+ AMQMethodBody body = con.startOk(connectionStartOkBody);
+ ConnectionTuneBody connectionTuneBody;
+
+ if (body instanceof ConnectionSecureBody)
+ {
+ ConnectionSecureBody connectionSecureBody = (ConnectionSecureBody) body;
+ ConnectionSecureOkBody connectionSecureOkBody = ConnectionSecureOkBody.createMethodBody(_major, _minor, sc
+ .evaluateChallenge(connectionSecureBody.getChallenge()));
+ //Assuming the server is not going to send another challenge
+ connectionTuneBody = (ConnectionTuneBody) con.secureOk(connectionSecureOkBody);
+
+ }
+ else
+ {
+ connectionTuneBody = (ConnectionTuneBody) body;
+ }
+
+ // Using broker supplied values
+ ConnectionTuneOkBody connectionTuneOkBody = ConnectionTuneOkBody.createMethodBody(_major, _minor, connectionTuneBody.getChannelMax(),
+ connectionTuneBody.getFrameMax(), connectionTuneBody.getHeartbeat());
+ con.tuneOk(connectionTuneOkBody);
+
+ ConnectionOpenBody connectionOpenBody = ConnectionOpenBody.createMethodBody(_major, _minor, null, true, new AMQShortString(_url
+ .getVirtualHost()));
+
+ ConnectionOpenOkBody connectionOpenOkBody = con.open(connectionOpenBody);
+ }
+
+ public void handleChannelNegotiation() throws Exception
+ {
+ AMQPChannel channel = _classFactory.createChannelClass(_channel);
+
+ ChannelOpenBody channelOpenBody = ChannelOpenBody.createMethodBody(_major, _minor, new AMQShortString("myChannel1"));
+ ChannelOpenOkBody channelOpenOkBody = channel.open(channelOpenBody);
+
+ //lets have some fun
+ ChannelFlowBody channelFlowBody = ChannelFlowBody.createMethodBody(_major, _minor, false);
+
+ ChannelFlowOkBody channelFlowOkBody = channel.flow(channelFlowBody);
+ System.out.println("Channel is " + (channelFlowOkBody.getActive() ? "active" : "suspend"));
+
+ channelFlowBody = ChannelFlowBody.createMethodBody(_major, _minor, true);
+ channelFlowOkBody = channel.flow(channelFlowBody);
+ System.out.println("Channel is " + (channelFlowOkBody.getActive() ? "active" : "suspend"));
+ }
+
+ public void createExchange() throws Exception
+ {
+ AMQPExchange exchange = _classFactory.createExchangeClass(_channel);
+
+ ExchangeDeclareBody exchangeDeclareBody = ExchangeDeclareBody.createMethodBody(_major, _minor, null, // arguments
+ false,//auto delete
+ false,// durable
+ new AMQShortString(ExchangeDefaults.DIRECT_EXCHANGE_NAME), true, //internal
+ false,// nowait
+ false,// passive
+ _ticket, new AMQShortString(ExchangeDefaults.DIRECT_EXCHANGE_CLASS));
+
+ AMQPCallBack cb = createCallBackWithMessage("Broker has created the exchange");
+ exchange.declare(exchangeDeclareBody, cb);
+ // Blocking for response
+ while (!cb.isComplete())
+ {
+ }
+ }
+
+ public void createAndBindQueue() throws Exception
+ {
+ AMQPQueue queue = _classFactory.createQueueClass(_channel);
+
+ QueueDeclareBody queueDeclareBody = QueueDeclareBody.createMethodBody(_major, _minor, null, //arguments
+ false,//auto delete
+ false,// durable
+ false, //exclusive,
+ false, //nowait,
+ false, //passive,
+ new AMQShortString("MyTestQueue"), 0);
+
+ AMQPCallBack cb = new AMQPCallBack()
+ {
+
+ @Override
+ public void brokerResponded(AMQMethodBody body)
+ {
+ QueueDeclareOkBody queueDeclareOkBody = (QueueDeclareOkBody) body;
+ System.out.println("[Broker has created the queue, " + "message count " + queueDeclareOkBody.getMessageCount() + "consumer count "
+ + queueDeclareOkBody.getConsumerCount() + "]\n");
+ }
+
+ @Override
+ public void brokerRespondedWithError(AMQException e)
+ {
+ }
+
+ };
+
+ queue.declare(queueDeclareBody, cb);
+ //Blocking for response
+ while (!cb.isComplete())
+ {
+ }
+
+ QueueBindBody queueBindBody = QueueBindBody.createMethodBody(_major, _minor, null, //arguments
+ new AMQShortString(ExchangeDefaults.DIRECT_EXCHANGE_NAME),//exchange
+ false, //nowait
+ new AMQShortString("MyTestQueue"), //queue
+ new AMQShortString("RH"), //routingKey
+ 0 //ticket
+ );
+
+ cb = createCallBackWithMessage("Broker has bound the queue");
+ queue.bind(queueBindBody, cb);
+ //Blocking for response
+ while (!cb.isComplete())
+ {
+ }
+ }
+
+ public void purgeQueue() throws Exception
+ {
+ AMQPQueue queue = _classFactory.createQueueClass(_channel);
+
+ QueuePurgeBody queuePurgeBody = QueuePurgeBody.createMethodBody(_major, _minor, false, //nowait
+ new AMQShortString("MyTestQueue"), //queue
+ 0 //ticket
+ );
+
+ AMQPCallBack cb = new AMQPCallBack()
+ {
+
+ @Override
+ public void brokerResponded(AMQMethodBody body)
+ {
+ QueuePurgeOkBody queuePurgeOkBody = (QueuePurgeOkBody) body;
+ System.out.println("[Broker has purged the queue, message count " + queuePurgeOkBody.getMessageCount() + "]\n");
+ }
+
+ @Override
+ public void brokerRespondedWithError(AMQException e)
+ {
+ }
+
+ };
+
+ queue.purge(queuePurgeBody, cb);
+ //Blocking for response
+ while (!cb.isComplete())
+ {
+ }
+
+ }
+
+ public void deleteQueue() throws Exception
+ {
+ AMQPQueue queue = _classFactory.createQueueClass(_channel);
+
+ QueueDeleteBody queueDeleteBody = QueueDeleteBody.createMethodBody(_major, _minor, false, //ifEmpty
+ false, //ifUnused
+ false, //nowait
+ new AMQShortString("MyTestQueue"), //queue
+ 0 //ticket
+ );
+
+ AMQPCallBack cb = new AMQPCallBack()
+ {
+
+ @Override
+ public void brokerResponded(AMQMethodBody body)
+ {
+ QueueDeleteOkBody queueDeleteOkBody = (QueueDeleteOkBody) body;
+ System.out.println("[Broker has deleted the queue, message count " + queueDeleteOkBody.getMessageCount() + "]\n");
+ }
+
+ @Override
+ public void brokerRespondedWithError(AMQException e)
+ {
+ }
+
+ };
+
+ queue.delete(queueDeleteBody, cb);
+ //Blocking for response
+ while (!cb.isComplete())
+ {
+ }
+
+ }
+
+ public void publishAndSubscribe() throws Exception
+ {
+ AMQPMessage message = _classFactory.createMessageClass(_channel, new MessageHelper());
+ MessageConsumeBody messageConsumeBody = MessageConsumeBody.createMethodBody(_major, _minor, new AMQShortString("myClient"),// destination
+ false, //exclusive
+ null, //filter
+ false, //noAck,
+ false, //noLocal,
+ new AMQShortString("MyTestQueue"), //queue
+ 0 //ticket
+ );
+
+ AMQPCallBack cb = createCallBackWithMessage("Broker has accepted the consume");
+ message.consume(messageConsumeBody, cb);
+ //Blocking for response
+ while (!cb.isComplete())
+ {
+ }
+
+ // Sending 5 messages serially
+ for (int i = 0; i < 5; i++)
+ {
+ cb = createCallBackWithMessage("Broker has accepted msg " + i);
+ message.transfer(createMessages("Test" + i), cb);
+ while (!cb.isComplete())
+ {
+ }
+ }
+
+ MessageCancelBody messageCancelBody = MessageCancelBody.createMethodBody(_major, _minor, new AMQShortString("myClient"));
+
+ AMQPCallBack cb2 = createCallBackWithMessage("Broker has accepted the consume cancel");
+ message.cancel(messageCancelBody, cb2);
+
}
- catch (Exception e)
+
+ private MessageTransferBody createMessages(String content) throws Exception
+ {
+ FieldTable headers = FieldTableFactory.newFieldTable();
+ headers.setAsciiString(new AMQShortString("Test"), System.currentTimeMillis() + "");
+
+ MessageTransferBody messageTransferBody = MessageTransferBody.createMethodBody(_major, _minor, new AMQShortString("testApp"), //appId
+ headers, //applicationHeaders
+ new Content(Content.TypeEnum.INLINE_T, content.getBytes()), //body
+ new AMQShortString(""), //contentEncoding,
+ new AMQShortString("text/plain"), //contentType
+ new AMQShortString("testApp"), //correlationId
+ (short) 1, //deliveryMode non persistant
+ new AMQShortString(ExchangeDefaults.DIRECT_EXCHANGE_NAME),// destination
+ new AMQShortString(ExchangeDefaults.DIRECT_EXCHANGE_NAME),// exchange
+ 0l, //expiration
+ false, //immediate
+ false, //mandatory
+ new AMQShortString(UUID.randomUUID().toString()), //messageId
+ (short) 0, //priority
+ false, //redelivered
+ new AMQShortString("RH"), //replyTo
+ new AMQShortString("RH"), //routingKey,
+ "abc".getBytes(), //securityToken
+ 0, //ticket
+ System.currentTimeMillis(), //timestamp
+ new AMQShortString(""), //transactionId
+ 0l, //ttl,
+ new AMQShortString("Hello") //userId
+ );
+
+ return messageTransferBody;
+
+ }
+
+ public void publishAndGet() throws Exception
+ {
+ AMQPMessage message = _classFactory.createMessageClass(_channel, new MessageHelper());
+ AMQPCallBack cb = createCallBackWithMessage("Broker has accepted msg 5");
+
+ MessageGetBody messageGetBody = MessageGetBody.createMethodBody(_major, _minor, new AMQShortString("myClient"), false, //noAck
+ new AMQShortString("MyTestQueue"), //queue
+ 0 //ticket
+ );
+
+ //AMQPMessage message = _classFactory.createMessage(_channel,new MessageHelper());
+ message.transfer(createMessages("Test"), cb);
+ while (!cb.isComplete())
+ {
+ }
+
+ cb = createCallBackWithMessage("Broker has accepted get");
+ message.get(messageGetBody, cb);
+ }
+
+ // Creates a gneric call back and prints the given message
+ private AMQPCallBack createCallBackWithMessage(final String msg)
+ {
+ AMQPCallBack cb = new AMQPCallBack()
+ {
+
+ @Override
+ public void brokerResponded(AMQMethodBody body)
+ {
+ System.out.println(msg);
+ }
+
+ @Override
+ public void brokerRespondedWithError(AMQException e)
+ {
+ }
+
+ };
+
+ return cb;
+ }
+
+ public static void main(String[] args)
{
- e.printStackTrace();
+ TestClient test = new TestClient();
+ try
+ {
+ AMQPConnection con = test.openConnection();
+ test.handleConnectionNegotiation(con);
+ test.handleChannelNegotiation();
+ test.createExchange();
+ test.createAndBindQueue();
+ test.publishAndSubscribe();
+ test.purgeQueue();
+ test.publishAndGet();
+ test.deleteQueue();
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ }
}
- }
}
Modified: incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPState.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPState.java?view=diff&rev=524144&r1=524143&r2=524144
==============================================================================
--- incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPState.java (original)
+++ incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPState.java Fri Mar 30 08:53:18 2007
@@ -38,7 +38,8 @@
public String toString()
{
- return "AMQState: id = " + _id + " name: " + _name;
+ //return "AMQState: id = " + _id + " name: " + _name;
+ return _name; // looks better with loggin
}
// Connection state
Added: incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateChangedEvent.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateChangedEvent.java?view=auto&rev=524144
==============================================================================
--- incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateChangedEvent.java (added)
+++ incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateChangedEvent.java Fri Mar 30 08:53:18 2007
@@ -0,0 +1,68 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.nclient.amqp.state;
+
+public class AMQPStateChangedEvent
+{
+ private AMQPState _oldState;
+
+ private AMQPState _newState;
+
+ private AMQPStateType _stateType;
+
+ public AMQPStateChangedEvent(AMQPState oldState, AMQPState newState, AMQPStateType stateType)
+ {
+ _oldState = oldState;
+ _newState = newState;
+ _stateType = stateType;
+ }
+
+ public AMQPState getNewState()
+ {
+ return _newState;
+ }
+
+ public void setNewState(AMQPState newState)
+ {
+ this._newState = newState;
+ }
+
+ public AMQPState getOldState()
+ {
+ return _oldState;
+ }
+
+ public void setOldState(AMQPState oldState)
+ {
+ this._oldState = oldState;
+ }
+
+ public AMQPStateType getStateType()
+ {
+ return _stateType;
+ }
+
+ public void setStateType(AMQPStateType stateType)
+ {
+ this._stateType = stateType;
+ }
+
+}
Modified: incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateListener.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateListener.java?view=diff&rev=524144&r1=524143&r2=524144
==============================================================================
--- incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateListener.java (original)
+++ incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateListener.java Fri Mar 30 08:53:18 2007
@@ -4,5 +4,5 @@
public interface AMQPStateListener
{
- public void stateChanged(AMQPState oldState, AMQPState newState) throws AMQPException;
+ public void stateChanged(AMQPStateChangedEvent event) throws AMQPException;
}
Modified: incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateManager.java?view=diff&rev=524144&r1=524143&r2=524144
==============================================================================
--- incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateManager.java (original)
+++ incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateManager.java Fri Mar 30 08:53:18 2007
@@ -1,11 +1,13 @@
package org.apache.qpid.nclient.amqp.state;
import org.apache.qpid.AMQException;
+import org.apache.qpid.nclient.core.AMQPException;
public interface AMQPStateManager
{
-
- public void addListener(AMQPStateListener l)throws AMQException;
+ public void addListener(AMQPStateType stateType, AMQPStateListener l)throws AMQException;
+
+ public void removeListener(AMQPStateType stateType, AMQPStateListener l)throws AMQException;
- public void removeListener(AMQPStateListener l)throws AMQException;
+ public void notifyStateChanged(AMQPStateChangedEvent event) throws AMQPException;
}
Modified: incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateType.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateType.java?view=diff&rev=524144&r1=524143&r2=524144
==============================================================================
--- incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateType.java (original)
+++ incubator/qpid/branches/client_restructure/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/state/AMQPStateType.java Fri Mar 30 08:53:18 2007
@@ -40,7 +40,7 @@
public String toString()
{
- return "AMQState: id = " + _typeId + " name: " + _typeName;
+ return _typeName;
}
// Connection state
Modified: incubator/qpid/branches/client_restructure/java/pom.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/client_restructure/java/pom.xml?view=diff&rev=524144&r1=524143&r2=524144
==============================================================================
--- incubator/qpid/branches/client_restructure/java/pom.xml (original)
+++ incubator/qpid/branches/client_restructure/java/pom.xml Fri Mar 30 08:53:18 2007
@@ -130,6 +130,7 @@
<module>common</module>
<module>broker</module>
<module>client</module>
+ <module>newclient</module>
<module>cluster</module>
<module>systests</module>
<module>perftests</module>
@@ -382,7 +383,7 @@
<dependency>
<groupId>commons-configuration</groupId>
<artifactId>commons-configuration</artifactId>
- <version>1.2</version>
+ <version>1.3</version>
</dependency>
<dependency>
<groupId>commons-lang</groupId>
@@ -471,6 +472,11 @@
<dependency>
<groupId>org.apache.qpid</groupId>
<artifactId>qpid-client</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.qpid</groupId>
+ <artifactId>qpid-newclient</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>