You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ra...@apache.org on 2007/03/30 00:24:22 UTC
svn commit: r523854 [2/3] - in
/incubator/qpid/branches/qpid.0-9/java/newclient: ./ src/main/java/
src/main/java/org/apache/qpid/nclient/amqp/
src/main/java/org/apache/qpid/nclient/amqp/event/
src/main/java/org/apache/qpid/nclient/amqp/sample/ src/main...
Modified: incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/TestClient.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/TestClient.java?view=diff&rev=523854&r1=523853&r2=523854
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/TestClient.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/amqp/sample/TestClient.java Thu Mar 29 15:24:20 2007
@@ -1,25 +1,72 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
package org.apache.qpid.nclient.amqp.sample;
import java.util.StringTokenizer;
+import java.util.UUID;
import javax.security.sasl.Sasl;
import javax.security.sasl.SaslClient;
+import org.apache.qpid.AMQException;
import org.apache.qpid.common.ClientProperties;
+import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.ChannelFlowBody;
+import org.apache.qpid.framing.ChannelFlowOkBody;
+import org.apache.qpid.framing.ChannelOpenBody;
+import org.apache.qpid.framing.ChannelOpenOkBody;
+import org.apache.qpid.framing.ConnectionOpenBody;
+import org.apache.qpid.framing.ConnectionOpenOkBody;
import org.apache.qpid.framing.ConnectionSecureBody;
import org.apache.qpid.framing.ConnectionSecureOkBody;
import org.apache.qpid.framing.ConnectionStartBody;
import org.apache.qpid.framing.ConnectionStartOkBody;
import org.apache.qpid.framing.ConnectionTuneBody;
import org.apache.qpid.framing.ConnectionTuneOkBody;
+import org.apache.qpid.framing.Content;
+import org.apache.qpid.framing.ExchangeDeclareBody;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.FieldTableFactory;
+import org.apache.qpid.framing.MessageCancelBody;
+import org.apache.qpid.framing.MessageConsumeBody;
+import org.apache.qpid.framing.MessageGetBody;
+import org.apache.qpid.framing.MessageTransferBody;
+import org.apache.qpid.framing.QueueBindBody;
+import org.apache.qpid.framing.QueueDeclareBody;
+import org.apache.qpid.framing.QueueDeclareOkBody;
+import org.apache.qpid.framing.QueueDeleteBody;
+import org.apache.qpid.framing.QueueDeleteOkBody;
+import org.apache.qpid.framing.QueuePurgeBody;
+import org.apache.qpid.framing.QueuePurgeOkBody;
+import org.apache.qpid.nclient.amqp.AMQPCallBack;
+import org.apache.qpid.nclient.amqp.AMQPChannel;
+import org.apache.qpid.nclient.amqp.AMQPClassFactory;
import org.apache.qpid.nclient.amqp.AMQPConnection;
+import org.apache.qpid.nclient.amqp.AMQPExchange;
+import org.apache.qpid.nclient.amqp.AMQPMessage;
+import org.apache.qpid.nclient.amqp.AMQPQueue;
import org.apache.qpid.nclient.transport.AMQPConnectionURL;
import org.apache.qpid.nclient.transport.ConnectionURL;
-import org.apache.qpid.nclient.transport.TransportConnection;
-import org.apache.qpid.nclient.transport.TransportConnectionFactory;
import org.apache.qpid.nclient.transport.TransportConnectionFactory.ConnectionType;
/**
@@ -27,26 +74,34 @@
* Notes this is just a simple demo.
*
* I have used Helper classes to keep the code cleaner.
+ * Will break this into unit tests later on
*/
+
+@SuppressWarnings("unused")
public class TestClient
{
- private byte major;
- private byte minor;
+ private byte _major;
+ private byte _minor;
private ConnectionURL _url;
+ private static int _channel = 2;
+ // Need a Class factory per connection
+ private AMQPClassFactory _classFactory = new AMQPClassFactory();
+ private int _ticket;
public AMQPConnection openConnection() throws Exception
{
- _url = new AMQPConnectionURL("");
- TransportConnection conn = TransportConnectionFactory.createTransportConnection(_url, ConnectionType.VM);
- return new AMQPConnection(conn);
+ //_url = new AMQPConnectionURL("amqp://guest:guest@test/localhost?brokerlist='vm://:3'");
+
+ _url = new AMQPConnectionURL("amqp://guest:guest@test/test?brokerlist='tcp://localhost:5672?'");
+ return _classFactory.createConnectionClass(_url,ConnectionType.TCP);
}
- public void handleProtocolNegotiation(AMQPConnection con) throws Exception
+ public void handleConnectionNegotiation(AMQPConnection con) throws Exception
{
// ConnectionStartBody
ConnectionStartBody connectionStartBody = con.openTCPConnection();
- major = connectionStartBody.getMajor();
- minor = connectionStartBody.getMajor();
+ _major = connectionStartBody.getMajor();
+ _minor = connectionStartBody.getMinor();
FieldTable clientProperties = FieldTableFactory.newFieldTable();
clientProperties.put(new AMQShortString(ClientProperties.instance.toString()),"Test"); // setting only the client id
@@ -61,33 +116,335 @@
null, SecurityHelper.createCallbackHandler(mechanism,_url));
ConnectionStartOkBody connectionStartOkBody =
- ConnectionStartOkBody.createMethodBody(major, minor, clientProperties,
+ ConnectionStartOkBody.createMethodBody(_major, _minor, clientProperties,
new AMQShortString(tokenizer.nextToken()),
new AMQShortString(mechanism),
(sc.hasInitialResponse() ? sc.evaluateChallenge(new byte[0]) : null));
// ConnectionSecureBody
- ConnectionSecureBody connectionSecureBody = con.startOk(connectionStartOkBody);
+ AMQMethodBody body = con.startOk(connectionStartOkBody);
+ ConnectionTuneBody connectionTuneBody;
- ConnectionSecureOkBody connectionSecureOkBody = ConnectionSecureOkBody.createMethodBody(
- major,minor,sc.evaluateChallenge(connectionSecureBody.getChallenge()));
+ if (body instanceof ConnectionSecureBody)
+ {
+ ConnectionSecureBody connectionSecureBody = (ConnectionSecureBody)body;
+ ConnectionSecureOkBody connectionSecureOkBody = ConnectionSecureOkBody.createMethodBody(
+ _major,_minor,sc.evaluateChallenge(connectionSecureBody.getChallenge()));
+ //Assuming the server is not going to send another challenge
+ connectionTuneBody = (ConnectionTuneBody)con.secureOk(connectionSecureOkBody);
+
+ }
+ else
+ {
+ connectionTuneBody = (ConnectionTuneBody)body;
+ }
- // Assuming the server is not going to send another challenge
- ConnectionTuneBody connectionTuneBody = (ConnectionTuneBody)con.secureOk(connectionSecureOkBody);
// Using broker supplied values
ConnectionTuneOkBody connectionTuneOkBody =
- ConnectionTuneOkBody.createMethodBody(major,minor,
+ ConnectionTuneOkBody.createMethodBody(_major,_minor,
connectionTuneBody.getChannelMax(),
connectionTuneBody.getFrameMax(),
connectionTuneBody.getHeartbeat());
con.tuneOk(connectionTuneOkBody);
+
+ ConnectionOpenBody connectionOpenBody =
+ ConnectionOpenBody.createMethodBody(_major,_minor,null, true,new AMQShortString(_url.getVirtualHost()));
+
+ ConnectionOpenOkBody connectionOpenOkBody = con.open(connectionOpenBody);
+ }
+
+ public void handleChannelNegotiation() throws Exception
+ {
+ AMQPChannel channel = _classFactory.createChannelClass(_channel);
+
+ ChannelOpenBody channelOpenBody = ChannelOpenBody.createMethodBody(_major, _minor, new AMQShortString("myChannel1"));
+ ChannelOpenOkBody channelOpenOkBody = channel.open(channelOpenBody);
+
+ //let's have some fun
+ ChannelFlowBody channelFlowBody = ChannelFlowBody.createMethodBody(_major, _minor, false);
+
+ ChannelFlowOkBody channelFlowOkBody = channel.flow(channelFlowBody);
+ System.out.println("Channel is " + (channelFlowOkBody.getActive()? "active" : "suspend"));
+
+ channelFlowBody = ChannelFlowBody.createMethodBody(_major, _minor, true);
+ channelFlowOkBody = channel.flow(channelFlowBody);
+ System.out.println("Channel is " + (channelFlowOkBody.getActive()? "active" : "suspend"));
+ }
+
+ public void createExchange() throws Exception
+ {
+ AMQPExchange exchange = _classFactory.createExchangeClass(_channel);
+
+ ExchangeDeclareBody exchangeDeclareBody =
+ ExchangeDeclareBody.createMethodBody(_major, _minor,
+ null, // arguments
+ false,//auto delete
+ false,// durable
+ new AMQShortString(ExchangeDefaults.DIRECT_EXCHANGE_NAME),
+ true, //internal
+ false,// nowait
+ false,// passive
+ _ticket,
+ new AMQShortString(ExchangeDefaults.DIRECT_EXCHANGE_CLASS));
+
+ AMQPCallBack cb = createCallBackWithMessage("Broker has created the exchange");
+ exchange.declare(exchangeDeclareBody, cb);
+ // Blocking for response
+ while (!cb.isComplete()){}
+ }
+
+
+ public void createAndBindQueue()throws Exception
+ {
+ AMQPQueue queue = _classFactory.createQueueClass(_channel);
+
+ QueueDeclareBody queueDeclareBody =
+ QueueDeclareBody.createMethodBody(_major, _minor,
+ null, //arguments
+ false,//auto delete
+ false,// durable
+ false, //exclusive,
+ false, //nowait,
+ false, //passive,
+ new AMQShortString("MyTestQueue"),
+ 0);
+
+ AMQPCallBack cb = new AMQPCallBack(){
+
+ @Override
+ public void brokerResponded(AMQMethodBody body)
+ {
+ QueueDeclareOkBody queueDeclareOkBody = (QueueDeclareOkBody)body;
+ System.out.println("[Broker has created the queue, " +
+ "message count " + queueDeclareOkBody.getMessageCount() +
+ "consumer count " + queueDeclareOkBody.getConsumerCount() + "]\n");
+ }
+
+ @Override
+ public void brokerRespondedWithError(AMQException e)
+ {
+ }
+
+ };
+
+ queue.declare(queueDeclareBody, cb);
+ //Blocking for response
+ while (!cb.isComplete()){}
+
+ QueueBindBody queueBindBody =
+ QueueBindBody.createMethodBody(_major, _minor,
+ null, //arguments
+ new AMQShortString(ExchangeDefaults.DIRECT_EXCHANGE_NAME),//exchange
+ false, //nowait
+ new AMQShortString("MyTestQueue"), //queue
+ new AMQShortString("RH"), //routingKey
+ 0 //ticket
+ );
+
+ cb = createCallBackWithMessage("Broker has bound the queue");
+ queue.bind(queueBindBody, cb);
+ //Blocking for response
+ while (!cb.isComplete()){}
+ }
+
+ public void purgeQueue()throws Exception
+ {
+ AMQPQueue queue = _classFactory.createQueueClass(_channel);
+
+ QueuePurgeBody queuePurgeBody =
+ QueuePurgeBody.createMethodBody(_major, _minor,
+ false, //nowait
+ new AMQShortString("MyTestQueue"), //queue
+ 0 //ticket
+ );
+
+ AMQPCallBack cb = new AMQPCallBack(){
+
+ @Override
+ public void brokerResponded(AMQMethodBody body)
+ {
+ QueuePurgeOkBody queuePurgeOkBody = (QueuePurgeOkBody)body;
+ System.out.println("[Broker has purged the queue, message count " + queuePurgeOkBody.getMessageCount() + "]\n");
+ }
+
+ @Override
+ public void brokerRespondedWithError(AMQException e)
+ {
+ }
+
+ };
+
+ queue.purge(queuePurgeBody, cb);
+ //Blocking for response
+ while (!cb.isComplete()){}
+
}
+
+ public void deleteQueue()throws Exception
+ {
+ AMQPQueue queue = _classFactory.createQueueClass(_channel);
+
+ QueueDeleteBody queueDeleteBody =
+ QueueDeleteBody.createMethodBody(_major, _minor,
+ false, //ifEmpty
+ false, //ifUnused
+ false, //nowait
+ new AMQShortString("MyTestQueue"), //queue
+ 0 //ticket
+ );
+
+ AMQPCallBack cb = new AMQPCallBack(){
+
+ @Override
+ public void brokerResponded(AMQMethodBody body)
+ {
+ QueueDeleteOkBody queueDeleteOkBody = (QueueDeleteOkBody)body;
+ System.out.println("[Broker has deleted the queue, message count " + queueDeleteOkBody.getMessageCount() + "]\n");
+ }
+ @Override
+ public void brokerRespondedWithError(AMQException e)
+ {
+ }
+
+ };
+
+ queue.delete(queueDeleteBody, cb);
+ //Blocking for response
+ while (!cb.isComplete()){}
+
+ }
+
+ public void publishAndSubscribe() throws Exception
+ {
+ AMQPMessage message = _classFactory.createMessageClass(_channel,new MessageHelper());
+ MessageConsumeBody messageConsumeBody = MessageConsumeBody.createMethodBody(_major, _minor,
+ new AMQShortString("myClient"),// destination
+ false, //exclusive
+ null, //filter
+ false, //noAck,
+ false, //noLocal,
+ new AMQShortString("MyTestQueue"), //queue
+ 0 //ticket
+ );
+
+ AMQPCallBack cb = createCallBackWithMessage("Broker has accepted the consume");
+ message.consume(messageConsumeBody, cb);
+ //Blocking for response
+ while (!cb.isComplete()){}
+
+ // Sending 5 messages serially
+ for (int i=0; i<5; i++)
+ {
+ cb = createCallBackWithMessage("Broker has accepted msg " + i);
+ message.transfer(createMessages("Test" + i),cb);
+ while (!cb.isComplete()){}
+ }
+
+ MessageCancelBody messageCancelBody = MessageCancelBody.createMethodBody(_major, _minor, new AMQShortString("myClient"));
+
+ AMQPCallBack cb2 = createCallBackWithMessage("Broker has accepted the consume cancel");
+ message.cancel(messageCancelBody, cb2);
+
+ }
+
+ private MessageTransferBody createMessages(String content) throws Exception
+ {
+ FieldTable headers = FieldTableFactory.newFieldTable();
+ headers.setAsciiString(new AMQShortString("Test"), System.currentTimeMillis() + "");
+
+ MessageTransferBody messageTransferBody =
+ MessageTransferBody.createMethodBody(_major, _minor,
+ new AMQShortString("testApp"), //appId
+ headers, //applicationHeaders
+ new Content(Content.TypeEnum.INLINE_T,content.getBytes()), //body
+ new AMQShortString(""), //contentEncoding,
+ new AMQShortString("text/plain"), //contentType
+ new AMQShortString("testApp"), //correlationId
+ (short)1, //deliveryMode non persistant
+ new AMQShortString(ExchangeDefaults.DIRECT_EXCHANGE_NAME),// destination
+ new AMQShortString(ExchangeDefaults.DIRECT_EXCHANGE_NAME),// exchange
+ 0l, //expiration
+ false, //immediate
+ false, //mandatory
+ new AMQShortString(UUID.randomUUID().toString()), //messageId
+ (short)0, //priority
+ false, //redelivered
+ new AMQShortString("RH"), //replyTo
+ new AMQShortString("RH"), //routingKey,
+ "abc".getBytes(), //securityToken
+ 0, //ticket
+ System.currentTimeMillis(), //timestamp
+ new AMQShortString(""), //transactionId
+ 0l, //ttl,
+ new AMQShortString("Hello") //userId
+ );
+
+ return messageTransferBody;
+
+ }
+
+ public void publishAndGet() throws Exception
+ {
+ AMQPMessage message = _classFactory.createMessageClass(_channel,new MessageHelper());
+ AMQPCallBack cb = createCallBackWithMessage("Broker has accepted msg 5");
+
+ MessageGetBody messageGetBody =
+ MessageGetBody.createMethodBody(_major, _minor,
+ new AMQShortString("myClient"),
+ false, //noAck
+ new AMQShortString("MyTestQueue"), //queue
+ 0 //ticket
+ );
+
+ //AMQPMessage message = _classFactory.createMessage(_channel,new MessageHelper());
+ message.transfer(createMessages("Test"),cb);
+ while(!cb.isComplete()){}
+
+ cb = createCallBackWithMessage("Broker has accepted get");
+ message.get(messageGetBody, cb);
+ }
+
+ // Creates a generic callback that prints the given message
+ private AMQPCallBack createCallBackWithMessage(final String msg)
+ {
+ AMQPCallBack cb = new AMQPCallBack(){
+
+ @Override
+ public void brokerResponded(AMQMethodBody body)
+ {
+ System.out.println(msg);
+ }
+
+ @Override
+ public void brokerRespondedWithError(AMQException e)
+ {
+ }
+
+ };
+
+ return cb;
+ }
+
public static void main(String[] args)
{
TestClient test = new TestClient();
- AMQPConnection con = test.openConnection();
-
+ try
+ {
+ AMQPConnection con = test.openConnection();
+ test.handleConnectionNegotiation(con);
+ test.handleChannelNegotiation();
+ test.createExchange();
+ test.createAndBindQueue();
+ test.publishAndSubscribe();
+ test.purgeQueue();
+ test.publishAndGet();
+ test.deleteQueue();
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ }
}
}
Modified: incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/config/ClientConfiguration.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/config/ClientConfiguration.java?view=diff&rev=523854&r1=523853&r2=523854
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/config/ClientConfiguration.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/config/ClientConfiguration.java Thu Mar 29 15:24:20 2007
@@ -4,6 +4,9 @@
import java.io.InputStream;
import java.util.Collection;
import java.util.List;
+import java.util.TreeMap;
+
+import javax.security.sasl.SaslClientFactory;
import org.apache.commons.configuration.CombinedConfiguration;
import org.apache.commons.configuration.ConfigurationException;
@@ -11,6 +14,7 @@
import org.apache.commons.configuration.XMLConfiguration;
import org.apache.log4j.Logger;
import org.apache.qpid.nclient.core.QpidConstants;
+import org.apache.qpid.nclient.security.AMQPCallbackHandler;
/**
* Loads a properties file from classpath.
@@ -64,21 +68,33 @@
public static void main(String[] args)
{
- System.out.println(ClientConfiguration.get().getString(QpidConstants.USE_SHARED_READ_WRITE_POOL));
-
- //System.out.println(ClientConfiguration.get().getString("methodListeners.methodListener(1).[@class]"));
- int count = ClientConfiguration.get().getMaxIndex(QpidConstants.METHOD_LISTENERS + "." + QpidConstants.METHOD_LISTENER);
- System.out.println(count);
-
- for(int i=0 ;i<count;i++)
- {
- String methodListener = QpidConstants.METHOD_LISTENERS + "." + QpidConstants.METHOD_LISTENER + "(" + i + ")";
- System.out.println("\n\n"+ClientConfiguration.get().getString(methodListener + QpidConstants.CLASS));
- List<String> list = ClientConfiguration.get().getList(methodListener + "." + QpidConstants.METHOD_CLASS);
- for(String s:list)
- {
- System.out.println(s);
- }
- }
+ String key = QpidConstants.AMQP_SECURITY + "." +
+ QpidConstants.AMQP_SECURITY_SASL_CLIENT_FACTORY_TYPES + "." +
+ QpidConstants.AMQP_SECURITY_SASL_CLIENT_FACTORY;
+
+ TreeMap<String, Class<? extends SaslClientFactory>> factoriesToRegister =
+ new TreeMap<String, Class<? extends SaslClientFactory>>();
+
+ int index = ClientConfiguration.get().getMaxIndex(key);
+
+ for (int i=0; i<index+1;i++)
+ {
+ String mechanism = ClientConfiguration.get().getString(key + "(" + i + ")[@type]");
+ String className = ClientConfiguration.get().getString(key + "(" + i + ")" );
+ try
+ {
+ Class<?> clazz = Class.forName(className);
+ if (!(SaslClientFactory.class.isAssignableFrom(clazz)))
+ {
+ _logger.error("Class " + clazz + " does not implement " + SaslClientFactory.class + " - skipping");
+ continue;
+ }
+ factoriesToRegister.put(mechanism, (Class<? extends SaslClientFactory>) clazz);
+ }
+ catch (Exception ex)
+ {
+ _logger.error("Error instantiating SaslClientFactory calss " + className + " - skipping");
+ }
+ }
}
}
Modified: incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/config/client.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/config/client.xml?view=diff&rev=523854&r1=523853&r2=523854
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/config/client.xml (original)
+++ incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/config/client.xml Thu Mar 29 15:24:20 2007
@@ -1,86 +1,36 @@
<?xml version="1.0" encoding="ISO-8859-1" ?>
<qpidClientConfig>
-<security>
- <saslClientFactoryTypes>
- <saslClientFactory type="AMQPLAIN">org.apache.qpid.client.security.amqplain.AmqPlainSaslClientFactory</saslClientFactory>
- </saslClientFactoryTypes>
- <securityMechanisms>
- <securityMechanismHandler type="PLAIN">org.apache.qpid.client.security.UsernamePasswordCallbackHandler</securityMechanismHandler>
- <securityMechanismHandler type="CRAM_MD5">org.apache.qpid.client.security.UsernamePasswordCallbackHandler</securityMechanismHandler>
- </securityMechanisms>
-</security>
-
-<!-- Transport Layer properties -->
-<useSharedReadWritePool>true</useSharedReadWritePool>
-<enableDirectBuffers>true</enableDirectBuffers>
-<enablePooledAllocator>false</enablePooledAllocator>
-<tcpNoDelay>true</tcpNoDelay>
-<sendBufferSizeInKb>32</sendBufferSizeInKb>
-<reciveBufferSizeInKb>32</reciveBufferSizeInKb>
-<qpidVMBrokerClass>org.apache.qpid.server.protocol.AMQPFastProtocolHandler</qpidVMBrokerClass>
-
-<!-- Execution Layer properties -->
-<maxAccumilatedResponses>20</maxAccumilatedResponses>
-
-<!-- Model Phase properties -->
-<serverTimeoutInMilliSeconds>1000</serverTimeoutInMilliSeconds>
-<maxAccumilatedResponses>20</maxAccumilatedResponses>
-<stateManager></stateManager>
-<stateListerners>
- <stateType class="org.apache.qpid.nclient.model.state.AMQPStateType.CONNECTION_STATE">
- <stateListerner></stateListerner>
- </stateType>
- <stateType class="org.apache.qpid.nclient.model.state.AMQPStateType.CHANNEL_STATE">
- <stateListerner></stateListerner>
- </stateType>
-</stateListerners>
-
-<methodListeners>
- <methodListener class="org.apache.qpid.nclient.amqp.AMQPConnection">
- <methodClass>org.apache.qpid.framing.ConnectionStartBody</methodClass>
- <methodClass>org.apache.qpid.framing.ConnectionSecureBody</methodClass>
- <methodClass>org.apache.qpid.framing.ConnectionTuneBody</methodClass>
- <methodClass>org.apache.qpid.framing.ConnectionOpenOkBody</methodClass>
- <methodClass>org.apache.qpid.framing.ConnectionCloseBody</methodClass>
- <methodClass>org.apache.qpid.framing.ConnectionCloseOkBody</methodClass>
-
- <methodClass>org.apache.qpid.framing.ChannelOpenOkBody</methodClass>
- <methodClass>org.apache.qpid.framing.ChannelCloseBody</methodClass>
- <methodClass>org.apache.qpid.framing.ChannelCloseOkBody</methodClass>
- <methodClass>org.apache.qpid.framing.ChannelFlowBody</methodClass>
- <methodClass>org.apache.qpid.framing.ChannelFlowOkBody</methodClass>
- <methodClass>org.apache.qpid.framing.ChannelOkBody</methodClass>
-
- <methodClass>org.apache.qpid.framing.ExchangeDeclareOkBody</methodClass>
- <methodClass>org.apache.qpid.framing.ExchangeDeleteOkBody</methodClass>
-
- <methodClass>org.apache.qpid.framing.QueueDeclareOkBody</methodClass>
- <methodClass>org.apache.qpid.framing.QueueBindOkBody</methodClass>
- <methodClass>org.apache.qpid.framing.QueueUnbindOkBody</methodClass>
- <methodClass>org.apache.qpid.framing.QueuePurgeOkBody</methodClass>
- <methodClass>org.apache.qpid.framing.QueueDeleteOkBody</methodClass>
-
- <methodClass>org.apache.qpid.framing.MessageAppendBody</methodClass>
- <methodClass>org.apache.qpid.framing.MessageCancelBody</methodClass>
- <methodClass>org.apache.qpid.framing.MessageCheckpointBody</methodClass>
- <methodClass>org.apache.qpid.framing.MessageCloseBody</methodClass>
- <methodClass>org.apache.qpid.framing.MessageGetBody</methodClass>
- <methodClass>org.apache.qpid.framing.MessageOffsetBody</methodClass>
- <methodClass>org.apache.qpid.framing.MessageOkBody</methodClass>
- <methodClass>org.apache.qpid.framing.MessageOpenBody</methodClass>
- <methodClass>org.apache.qpid.framing.MessageQosBody</methodClass>
- <methodClass>org.apache.qpid.framing.MessageRecoverBody</methodClass>
- <methodClass>org.apache.qpid.framing.MessageRejectBody</methodClass>
- <methodClass>org.apache.qpid.framing.MessageResumeBody</methodClass>
- <methodClass>org.apache.qpid.framing.MessageTransferBody</methodClass>
- </methodListener>
-</methodListeners>
-
-<phasePipe>
- <phase index="0">org.apache.qpid.nclient.transport.TransportPhase<phase>
- <phase index="1">org.apache.qpid.nclient.execution.ExecutionPhase<phase>
- <phase index="2">org.apache.qpid.nclient.model.ModelPhase<phase>
-</phasePipe>
+ <security>
+ <saslClientFactoryTypes>
+ <saslClientFactory type="AMQPLAIN">org.apache.qpid.nclient.security.amqplain.AmqPlainSaslClientFactory</saslClientFactory>
+ </saslClientFactoryTypes>
+ <securityMechanisms>
+ <securityMechanismHandler type="PLAIN">org.apache.qpid.nclient.security.UsernamePasswordCallbackHandler</securityMechanismHandler>
+ <securityMechanismHandler type="CRAM_MD5">org.apache.qpid.nclient.security.UsernamePasswordCallbackHandler</securityMechanismHandler>
+ </securityMechanisms>
+ </security>
+
+ <!-- Transport Layer properties -->
+ <useSharedReadWritePool>false</useSharedReadWritePool>
+ <enableDirectBuffers>true</enableDirectBuffers>
+ <enablePooledAllocator>false</enablePooledAllocator>
+ <tcpNoDelay>true</tcpNoDelay>
+ <sendBufferSizeInKb>32</sendBufferSizeInKb>
+ <reciveBufferSizeInKb>32</reciveBufferSizeInKb>
+ <qpidVMBrokerClass>org.apache.qpid.server.protocol.AMQPFastProtocolHandler</qpidVMBrokerClass>
+
+ <!-- Execution Layer properties -->
+ <maxAccumilatedResponses>20</maxAccumilatedResponses>
+
+ <!-- Model Phase properties -->
+ <serverTimeoutInMilliSeconds>60000</serverTimeoutInMilliSeconds>
+ <maxAccumilatedResponses>20</maxAccumilatedResponses>
+
+ <phasePipe>
+ <phase index="0">org.apache.qpid.nclient.transport.TransportPhase</phase>
+ <phase index="1">org.apache.qpid.nclient.execution.ExecutionPhase</phase>
+ <phase index="2">org.apache.qpid.nclient.model.ModelPhase</phase>
+ </phasePipe>
</qpidClientConfig>
Modified: incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/core/PhaseFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/core/PhaseFactory.java?view=diff&rev=523854&r1=523853&r2=523854
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/core/PhaseFactory.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/core/PhaseFactory.java Thu Mar 29 15:24:20 2007
@@ -24,19 +24,22 @@
*/
public static Phase createPhasePipe(PhaseContext ctx) throws AMQPException
{
+ String key = QpidConstants.PHASE_PIPE + "." + QpidConstants.PHASE;
Map<Integer,Phase> phaseMap = new HashMap<Integer,Phase>();
- List<String> list = ClientConfiguration.get().getList(QpidConstants.PHASE_PIPE + "." + QpidConstants.PHASE);
+ List<String> list = ClientConfiguration.get().getList(key);
+ int index = 0;
for(String s:list)
{
try
{
- Phase temp = (Phase)Class.forName(ClientConfiguration.get().getString(s)).newInstance();
- phaseMap.put(ClientConfiguration.get().getInt(s + "." + QpidConstants.INDEX),temp) ;
+ Phase temp = (Phase)Class.forName(s).newInstance();
+ phaseMap.put(ClientConfiguration.get().getInt(key + "(" + index + ")." + QpidConstants.INDEX),temp) ;
}
catch(Exception e)
{
throw new AMQPException("Error loading phase " + ClientConfiguration.get().getString(s),e);
}
+ index++;
}
Phase current = null;
@@ -46,7 +49,7 @@
for (int i=0; i<phaseMap.size();i++)
{
current = phaseMap.get(i);
- if (1+1 < phaseMap.size())
+ if (i+1 < phaseMap.size())
{
next = phaseMap.get(i+1);
}
Modified: incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/core/QpidConstants.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/core/QpidConstants.java?view=diff&rev=523854&r1=523853&r2=523854
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/core/QpidConstants.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/core/QpidConstants.java Thu Mar 29 15:24:20 2007
@@ -10,13 +10,7 @@
// Phase Context properties
public final static String AMQP_BROKER_DETAILS = "AMQP_BROKER_DETAILS";
public final static String MINA_IO_CONNECTOR = "MINA_IO_CONNECTOR";
- //public final static String AMQP_MAJOR_VERSION = "AMQP_MAJOR_VERSION";
- //public final static String AMQP_MINOR_VERSION = "AMQP_MINOR_VERSION";
- //public final static String AMQP_SASL_CLIENT = "AMQP_SASL_CLIENT";
- //public final static String AMQP_CLIENT_ID = "AMQP_CLIENT_ID";
- //public final static String AMQP_CONNECTION_TUNE_PARAMETERS = "AMQP_CONNECTION_TUNE_PARAMETERS";
- //public final static String AMQP_VIRTUAL_HOST = "AMQP_VIRTUAL_HOST";
- //public final static String AMQP_MESSAGE_STORE = "AMQP_MESSAGE_STORE";
+ public final static String EVENT_MANAGER = "EVENT_MANAGER";
/**---------------------------------------------------------------
* Configuration file properties
@@ -24,17 +18,7 @@
*/
// Model Layer properties
- public final static String STATE_MANAGER = "stateManager";
- public final static String METHOD_LISTENERS = "methodListeners";
- public final static String METHOD_LISTENER = "methodListener";
- public final static String CLASS = "[@class]";
- public final static String METHOD_CLASS = "methodClass";
- public final static String STATE_LISTENERS = "stateListeners";
- public final static String STATE_LISTENER = "stateListener";
- public final static String STATE_TYPE = "stateType";
-
- public final static String AMQP_MESSAGE_STORE_CLASS = "AMQP_MESSAGE_STORE_CLASS";
public final static String SERVER_TIMEOUT_IN_MILLISECONDS = "serverTimeoutInMilliSeconds";
// MINA properties
@@ -50,6 +34,7 @@
public final static String AMQP_SECURITY_SASL_CLIENT_FACTORY = "saslClientFactory";
public final static String TYPE = "[@type]";
+ public final static String AMQP_SECURITY = "security";
public final static String AMQP_SECURITY_MECHANISMS = "securityMechanisms";
public final static String AMQP_SECURITY_MECHANISM_HANDLER = "securityMechanismHandler";
Modified: incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/execution/ExecutionPhase.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/execution/ExecutionPhase.java?view=diff&rev=523854&r1=523853&r2=523854
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/execution/ExecutionPhase.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/execution/ExecutionPhase.java Thu Mar 29 15:24:20 2007
@@ -10,142 +10,179 @@
import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.AMQRequestBody;
import org.apache.qpid.framing.AMQResponseBody;
-import org.apache.qpid.framing.RequestResponseMappingException;
+import org.apache.qpid.nclient.amqp.event.AMQPMethodEvent;
import org.apache.qpid.nclient.core.AMQPException;
import org.apache.qpid.nclient.core.AbstractPhase;
import org.apache.qpid.nclient.core.QpidConstants;
-import org.apache.qpid.nclient.model.AMQPMethodEvent;
-import org.apache.qpid.protocol.AMQMethodEvent;
/**
* Corressponds to the Layer 2 in AMQP.
* This phase handles the correlation of amqp messages
* This class implements the 0.9 spec (request/response)
*/
-public class ExecutionPhase extends AbstractPhase{
+public class ExecutionPhase extends AbstractPhase
+{
+
+ protected static final Logger _logger = Logger.getLogger(ExecutionPhase.class);
- protected static final Logger _logger = Logger.getLogger(ExecutionPhase.class);
protected ConcurrentMap _channelId2RequestMgrMap = new ConcurrentHashMap();
+
protected ConcurrentMap _channelId2ResponseMgrMap = new ConcurrentHashMap();
-
- /**
- * --------------------------------------------------
- * Phase related methods
- * --------------------------------------------------
- */
-
+ /**
+ * --------------------------------------------------
+ * Phase related methods
+ * --------------------------------------------------
+ */
+
// should add these in the init method
//_channelId2RequestMgrMap.put(0, new RequestManager(_ConnectionId, 0, this, false));
//_channelId2ResponseMgrMap.put(0, new ResponseManager(_ConnectionId, 0, _stateManager, this, false));
-
- public void messageReceived(Object msg) throws AMQPException
- {
- AMQFrame frame = (AMQFrame) msg;
- final AMQBody bodyFrame = frame.getBodyFrame();
-
- if (bodyFrame instanceof AMQRequestBody)
- {
- AMQPMethodEvent event;
- try
- {
- event = messageRequestBodyReceived(frame.getChannel(), (AMQRequestBody)bodyFrame);
- super.messageReceived(event);
- }
- catch (Exception e)
- {
- _logger.error("Error handling request",e);
- }
-
- }
- else if (bodyFrame instanceof AMQResponseBody)
- {
- List<AMQPMethodEvent> events;
- try
- {
- events = messageResponseBodyReceived(frame.getChannel(), (AMQResponseBody)bodyFrame);
- for (AMQPMethodEvent event: events)
- {
- super.messageReceived(event);
- }
- }
- catch (Exception e)
- {
- _logger.error("Error handling response",e);
- }
- }
- }
-
- /**
- * Need to figure out if the message is a request or a response
- * that needs to be sent and then delegate it to the Request or response manager
- * to prepare it.
- */
- public void messageSent(Object msg) throws AMQPException
+ public void messageReceived(Object msg) throws AMQPException
+ {
+ AMQFrame frame = (AMQFrame) msg;
+ final AMQBody bodyFrame = frame.getBodyFrame();
+
+ if (bodyFrame instanceof AMQRequestBody)
{
- AMQPMethodEvent evt = (AMQPMethodEvent)msg;
- if(evt.getCorrelationId() == QpidConstants.EMPTY_CORRELATION_ID)
+ AMQPMethodEvent event;
+ try
+ {
+ event = messageRequestBodyReceived(frame.getChannel(), (AMQRequestBody) bodyFrame);
+ super.messageReceived(event);
+ }
+ catch (Exception e)
+ {
+ _logger.error("Error handling request", e);
+ }
+
+ }
+ else if (bodyFrame instanceof AMQResponseBody)
+ {
+ List<AMQPMethodEvent> events;
+ try
+ {
+ events = messageResponseBodyReceived(frame.getChannel(), (AMQResponseBody) bodyFrame);
+ for (AMQPMethodEvent event : events)
{
- // This is a request
- AMQFrame frame = handleRequest(evt);
- super.messageSent(frame);
+ super.messageReceived(event);
}
- else
- {
-// This is a response
- List<AMQFrame> frames = handleResponse(evt);
- for(AMQFrame frame: frames)
- {
- super.messageSent(frame);
- }
- }
+ }
+ catch (Exception e)
+ {
+ _logger.error("Error handling response", e);
+ }
+ }
+ }
+
+ /**
+ * Need to figure out if the message is a request or a response
+ * that needs to be sent and then delegate it to the Request or response manager
+ * to prepare it.
+ */
+ public void messageSent(Object msg) throws AMQPException
+ {
+ AMQPMethodEvent evt = (AMQPMethodEvent) msg;
+ if (evt.getCorrelationId() == QpidConstants.EMPTY_CORRELATION_ID)
+ {
+ // This is a request
+ AMQFrame frame = handleRequest(evt);
+ super.messageSent(frame);
}
+ else
+ {
+ // This is a response
+ List<AMQFrame> frames = handleResponse(evt);
+ for (AMQFrame frame : frames)
+ {
+ super.messageSent(frame);
+ }
+ }
+ }
- /**
- * ------------------------------------------------
- * Methods to handle request response
- * -----------------------------------------------
- */
+ /**
+ * ------------------------------------------------
+ * Methods to handle request response
+ * -----------------------------------------------
+ */
private AMQPMethodEvent messageRequestBodyReceived(int channelId, AMQRequestBody requestBody) throws Exception
{
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Request frame received: " + requestBody);
- }
- ResponseManager responseManager = (ResponseManager)_channelId2ResponseMgrMap.get(channelId);
- if (responseManager == null)
- throw new AMQException("Unable to find ResponseManager for channel " + channelId);
- return responseManager.requestReceived(requestBody);
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Request frame received: " + requestBody);
+ }
+
+ ResponseManager responseManager;
+ if(_channelId2ResponseMgrMap.containsKey(channelId))
+ {
+ responseManager = (ResponseManager) _channelId2ResponseMgrMap.get(channelId);
+ }
+ else
+ {
+ responseManager = new ResponseManager(0,channelId,false);
+ _channelId2ResponseMgrMap.put(channelId, responseManager);
+ }
+ return responseManager.requestReceived(requestBody);
}
-
- private List<AMQPMethodEvent> messageResponseBodyReceived(int channelId, AMQResponseBody responseBody) throws Exception
+
+ private List<AMQPMethodEvent> messageResponseBodyReceived(int channelId, AMQResponseBody responseBody)
+ throws Exception
{
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Response frame received: " + responseBody);
- }
- RequestManager requestManager = (RequestManager)_channelId2RequestMgrMap.get(channelId);
- if (requestManager == null)
- throw new AMQException("Unable to find RequestManager for channel " + channelId);
- return requestManager.responseReceived(responseBody);
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Response frame received: " + responseBody);
+ }
+
+ RequestManager requestManager;
+ if (_channelId2RequestMgrMap.containsKey(channelId))
+ {
+ requestManager = (RequestManager) _channelId2RequestMgrMap.get(channelId);
+ }
+ else
+ {
+ requestManager = new RequestManager(0,channelId,false);
+ _channelId2RequestMgrMap.put(channelId, requestManager);
+ }
+
+ return requestManager.responseReceived(responseBody);
}
-
+
private AMQFrame handleRequest(AMQPMethodEvent evt)
{
- RequestManager requestManager = (RequestManager)_channelId2RequestMgrMap.get(evt.getChannelId());
- return requestManager.sendRequest(evt);
+ int channelId = evt.getChannelId();
+ RequestManager requestManager;
+ if (_channelId2RequestMgrMap.containsKey(channelId))
+ {
+ requestManager = (RequestManager) _channelId2RequestMgrMap.get(channelId);
+ }
+ else
+ {
+ requestManager = new RequestManager(0,channelId,false);
+ _channelId2RequestMgrMap.put(channelId, requestManager);
+ }
+ return requestManager.sendRequest(evt);
}
-
+
private List<AMQFrame> handleResponse(AMQPMethodEvent evt) throws AMQPException
{
- ResponseManager responseManager = (ResponseManager)_channelId2ResponseMgrMap.get(evt.getChannelId());
- try
- {
- return responseManager.sendResponse(evt);
- }
- catch(Exception e)
- {
- throw new AMQPException("Error handling response",e);
- }
+ int channelId = evt.getChannelId();
+ ResponseManager responseManager;
+ if(_channelId2ResponseMgrMap.containsKey(channelId))
+ {
+ responseManager = (ResponseManager) _channelId2ResponseMgrMap.get(channelId);
+ }
+ else
+ {
+ responseManager = new ResponseManager(0,channelId,false);
+ _channelId2ResponseMgrMap.put(channelId, responseManager);
+ }
+ try
+ {
+ return responseManager.sendResponse(evt);
+ }
+ catch (Exception e)
+ {
+ throw new AMQPException("Error handling response", e);
+ }
}
}
Modified: incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/execution/RequestManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/execution/RequestManager.java?view=diff&rev=523854&r1=523853&r2=523854
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/execution/RequestManager.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/execution/RequestManager.java Thu Mar 29 15:24:20 2007
@@ -29,7 +29,7 @@
import org.apache.qpid.framing.AMQRequestBody;
import org.apache.qpid.framing.AMQResponseBody;
import org.apache.qpid.framing.RequestResponseMappingException;
-import org.apache.qpid.nclient.model.AMQPMethodEvent;
+import org.apache.qpid.nclient.amqp.event.AMQPMethodEvent;
public class RequestManager
{
@@ -56,7 +56,7 @@
*/
private long lastProcessedResponseId;
- private ConcurrentHashMap<Long, Long> requestSentMap;
+ private ConcurrentHashMap<Long, CorrelationID> requestSentMap;
public RequestManager(long connectionId, int channel, boolean serverFlag)
{
@@ -65,7 +65,7 @@
this.connectionId = connectionId;
requestIdCount = 1L;
lastProcessedResponseId = 0L;
- requestSentMap = new ConcurrentHashMap<Long, Long>();
+ requestSentMap = new ConcurrentHashMap<Long, CorrelationID>();
}
// *** Functions to originate a request ***
@@ -80,7 +80,7 @@
logger.debug((serverFlag ? "SRV[" : "CLI[") + connectionId + "," + channel +
"] TX REQ: Req[" + requestId + " " + lastProcessedResponseId + "]; " + evt.getMethod());
}
- requestSentMap.put(requestId, evt.getCorrelationId());
+ requestSentMap.put(requestId, new CorrelationID(evt.getCorrelationId(), evt.getLocalCorrelationId()));
return requestFrame;
}
@@ -103,9 +103,9 @@
throw new RequestResponseMappingException(requestId,
"Failed to locate requestId " + requestId + " in requestSentMap.");
}
- long localCorrelationId = requestSentMap.get(requestId);
+ CorrelationID correlationID = requestSentMap.get(requestId);
AMQPMethodEvent methodEvent = new AMQPMethodEvent(channel, responseBody.getMethodPayload(),
- requestId,localCorrelationId);
+ correlationID.getSystemCorrelationID(),correlationID.getLocalCorrelationID());
events.add(methodEvent);
requestSentMap.remove(requestId);
}
@@ -125,5 +125,41 @@
private long getNextRequestId()
{
return requestIdCount++;
+ }
+
+    /**
+     * Pairs the two correlation identifiers that are remembered for a request
+     * while its response is outstanding.
+     * <p>
+     * Declared static because nothing in this class reads the state of the
+     * enclosing instance, so no hidden reference to it needs to be kept.
+     */
+    private static class CorrelationID
+    {
+        // Used for the request/response correlation
+        private long _systemCorrelationID;
+        // Used internally to track callbacks
+        private long _localCorrelationID;
+
+        /**
+         * @param systemCorrelationID identifier used for request/response correlation
+         * @param localCorrelationID identifier used internally to track callbacks
+         */
+        CorrelationID(long systemCorrelationID, long localCorrelationID)
+        {
+            _localCorrelationID = localCorrelationID;
+            _systemCorrelationID = systemCorrelationID;
+        }
+
+        public long getLocalCorrelationID()
+        {
+            return _localCorrelationID;
+        }
+
+        public void setLocalCorrelationID(long correlationID)
+        {
+            _localCorrelationID = correlationID;
+        }
+
+        public long getSystemCorrelationID()
+        {
+            return _systemCorrelationID;
+        }
+
+        public void setSystemCorrelationID(long correlationID)
+        {
+            _systemCorrelationID = correlationID;
+        }
+
+
     }
}
Modified: incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/execution/ResponseManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/execution/ResponseManager.java?view=diff&rev=523854&r1=523853&r2=523854
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/execution/ResponseManager.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/execution/ResponseManager.java Thu Mar 29 15:24:20 2007
@@ -31,9 +31,9 @@
import org.apache.qpid.framing.AMQRequestBody;
import org.apache.qpid.framing.AMQResponseBody;
import org.apache.qpid.framing.RequestResponseMappingException;
+import org.apache.qpid.nclient.amqp.event.AMQPMethodEvent;
import org.apache.qpid.nclient.config.ClientConfiguration;
import org.apache.qpid.nclient.core.QpidConstants;
-import org.apache.qpid.nclient.model.AMQPMethodEvent;
public class ResponseManager
{
Modified: incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/message/AMQPApplicationMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/message/AMQPApplicationMessage.java?view=diff&rev=523854&r1=523853&r2=523854
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/message/AMQPApplicationMessage.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/message/AMQPApplicationMessage.java Thu Mar 29 15:24:20 2007
@@ -24,8 +24,6 @@
import java.util.LinkedList;
import java.util.List;
-import org.apache.qpid.client.message.MessageHeaders;
-
public class AMQPApplicationMessage {
private int bytesReceived = 0;
Added: incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/message/MessageHeaders.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/message/MessageHeaders.java?view=auto&rev=523854
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/message/MessageHeaders.java (added)
+++ incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/message/MessageHeaders.java Thu Mar 29 15:24:20 2007
@@ -0,0 +1,679 @@
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+
+package org.apache.qpid.nclient.message;
+
+import org.apache.log4j.Logger;
+import org.apache.mina.common.ByteBuffer;
+import org.apache.qpid.AMQPInvalidClassException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.FieldTableFactory;
+
+import javax.jms.JMSException;
+import javax.jms.MessageFormatException;
+import java.util.Enumeration;
+
+/**
+ * Holds the header fields of a message together with a {@link FieldTable} of
+ * named properties, and offers typed, JMS-style accessors over that table.
+ * <p>
+ * The property table is created lazily the first time {@link #getJMSHeaders()}
+ * is called without one having been set.
+ */
+public class MessageHeaders
+{
+    private static final Logger _logger = Logger.getLogger(MessageHeaders.class);
+
+    private AMQShortString _contentType;
+
+    private AMQShortString _encoding;
+
+    private AMQShortString _destination;
+
+    private AMQShortString _exchange;
+
+    private FieldTable _jmsHeaders;
+
+    private short _deliveryMode;
+
+    private short _priority;
+
+    private AMQShortString _correlationId;
+
+    private AMQShortString _replyTo;
+
+    private long _expiration;
+
+    private AMQShortString _messageId;
+
+    private long _timestamp;
+
+    private AMQShortString _type;
+
+    private AMQShortString _userId;
+
+    private AMQShortString _appId;
+
+    private AMQShortString _transactionId;
+
+    private AMQShortString _routingKey;
+
+    private int _size;
+
+    // Plain header accessors
+
+    public int getSize()
+    {
+        return _size;
+    }
+
+    public void setSize(int size)
+    {
+        this._size = size;
+    }
+
+    public MessageHeaders()
+    {
+    }
+
+    public AMQShortString getContentType()
+    {
+        return _contentType;
+    }
+
+    public void setContentType(AMQShortString contentType)
+    {
+        _contentType = contentType;
+    }
+
+    public AMQShortString getEncoding()
+    {
+        return _encoding;
+    }
+
+    public void setEncoding(AMQShortString encoding)
+    {
+        _encoding = encoding;
+    }
+
+    /**
+     * Returns the property table, creating an empty one through
+     * {@link FieldTableFactory#newFieldTable()} if none has been set yet.
+     *
+     * @return the property table, never {@code null} unless the factory returns {@code null}
+     */
+    public FieldTable getJMSHeaders()
+    {
+        if (_jmsHeaders == null)
+        {
+            setJMSHeaders(FieldTableFactory.newFieldTable());
+        }
+
+        return _jmsHeaders;
+    }
+
+    public void setJMSHeaders(FieldTable headers)
+    {
+        _jmsHeaders = headers;
+    }
+
+
+    public short getDeliveryMode()
+    {
+        return _deliveryMode;
+    }
+
+    public void setDeliveryMode(short deliveryMode)
+    {
+        _deliveryMode = deliveryMode;
+    }
+
+    public short getPriority()
+    {
+        return _priority;
+    }
+
+    public void setPriority(short priority)
+    {
+        _priority = priority;
+    }
+
+    public AMQShortString getCorrelationId()
+    {
+        return _correlationId;
+    }
+
+    public void setCorrelationId(AMQShortString correlationId)
+    {
+        _correlationId = correlationId;
+    }
+
+    public AMQShortString getReplyTo()
+    {
+        return _replyTo;
+    }
+
+    public void setReplyTo(AMQShortString replyTo)
+    {
+        _replyTo = replyTo;
+    }
+
+    public long getExpiration()
+    {
+        return _expiration;
+    }
+
+    public void setExpiration(long expiration)
+    {
+        _expiration = expiration;
+    }
+
+
+    public AMQShortString getMessageId()
+    {
+        return _messageId;
+    }
+
+    public void setMessageId(AMQShortString messageId)
+    {
+        _messageId = messageId;
+    }
+
+    public long getTimestamp()
+    {
+        return _timestamp;
+    }
+
+    public void setTimestamp(long timestamp)
+    {
+        _timestamp = timestamp;
+    }
+
+    public AMQShortString getType()
+    {
+        return _type;
+    }
+
+    public void setType(AMQShortString type)
+    {
+        _type = type;
+    }
+
+    public AMQShortString getUserId()
+    {
+        return _userId;
+    }
+
+    public void setUserId(AMQShortString userId)
+    {
+        _userId = userId;
+    }
+
+    public AMQShortString getAppId()
+    {
+        return _appId;
+    }
+
+    public void setAppId(AMQShortString appId)
+    {
+        _appId = appId;
+    }
+
+    // MapMessage Interface
+
+    /**
+     * Reads a property as a boolean.
+     * A value stored as an {@link AMQShortString} is parsed with
+     * {@link Boolean#valueOf(String)}; a missing property yields {@code false}.
+     *
+     * @param string the property name
+     * @return the boolean value
+     * @throws MessageFormatException if the property exists but is neither a boolean nor a string
+     */
+    public boolean getBoolean(AMQShortString string) throws JMSException
+    {
+        Boolean b = getJMSHeaders().getBoolean(string);
+        if (b != null)
+        {
+            return b;
+        }
+
+        if (!getJMSHeaders().containsKey(string))
+        {
+            // Same result as Boolean.valueOf((String) null).
+            return false;
+        }
+
+        Object str = getJMSHeaders().getObject(string);
+        if (!(str instanceof AMQShortString))
+        {
+            throw new MessageFormatException("getBoolean can't use " + string + " item.");
+        }
+
+        return Boolean.valueOf(((AMQShortString) str).asString());
+    }
+
+    /**
+     * Reads a property as a char.
+     *
+     * @param string the property name
+     * @return the char value
+     * @throws NullPointerException if the table reports a null string value for the name
+     * @throws MessageFormatException if no char value can be read for any other reason
+     */
+    public char getCharacter(AMQShortString string) throws JMSException
+    {
+        Character c = getJMSHeaders().getCharacter(string);
+        if (c != null)
+        {
+            return c;
+        }
+
+        if (getJMSHeaders().isNullStringValue(string.asString()))
+        {
+            throw new NullPointerException("Cannot convert null char");
+        }
+
+        throw new MessageFormatException("getChar can't use " + string + " item.");
+    }
+
+    /**
+     * Reads a property as a byte array.
+     *
+     * @param string the property name
+     * @return the stored bytes
+     * @throws MessageFormatException if the table returns no byte array for the name
+     */
+    public byte[] getBytes(AMQShortString string) throws JMSException
+    {
+        byte[] bs = getJMSHeaders().getBytes(string);
+        if (bs == null)
+        {
+            throw new MessageFormatException("getBytes can't use " + string + " item.");
+        }
+
+        return bs;
+    }
+
+    /**
+     * Reads a property as a byte.
+     * A value stored as an {@link AMQShortString} is parsed with {@link Byte#valueOf(String)}.
+     *
+     * @param string the property name
+     * @return the byte value
+     * @throws MessageFormatException if the property exists but is neither a byte nor a string
+     * @throws NumberFormatException if the property is missing or its string form is not a valid byte
+     */
+    public byte getByte(AMQShortString string) throws JMSException
+    {
+        Byte b = getJMSHeaders().getByte(string);
+        if (b != null)
+        {
+            return b;
+        }
+
+        if (!getJMSHeaders().containsKey(string))
+        {
+            // Deliberately routed through valueOf(String) so a missing value
+            // fails exactly as parsing a null string does.
+            return Byte.valueOf((String) null);
+        }
+
+        Object str = getJMSHeaders().getObject(string);
+        if (!(str instanceof AMQShortString))
+        {
+            throw new MessageFormatException("getByte can't use " + string + " item.");
+        }
+
+        return Byte.valueOf(((AMQShortString) str).asString());
+    }
+
+    /**
+     * Reads a property as a short, falling back to {@link #getByte(AMQShortString)}.
+     */
+    public short getShort(AMQShortString string) throws JMSException
+    {
+        Short s = getJMSHeaders().getShort(string);
+
+        if (s == null)
+        {
+            s = Short.valueOf(getByte(string));
+        }
+
+        return s;
+    }
+
+    /**
+     * Reads a property as an int, falling back to {@link #getShort(AMQShortString)}.
+     */
+    public int getInteger(AMQShortString string) throws JMSException
+    {
+        Integer i = getJMSHeaders().getInteger(string);
+
+        if (i == null)
+        {
+            i = Integer.valueOf(getShort(string));
+        }
+
+        return i;
+    }
+
+    /**
+     * Reads a property as a long, falling back to {@link #getInteger(AMQShortString)}.
+     */
+    public long getLong(AMQShortString string) throws JMSException
+    {
+        Long l = getJMSHeaders().getLong(string);
+
+        if (l == null)
+        {
+            l = Long.valueOf(getInteger(string));
+        }
+
+        return l;
+    }
+
+    /**
+     * Reads a property as a float.
+     * A value stored as an {@link AMQShortString} is parsed with {@link Float#valueOf(String)}.
+     * A missing property is passed to {@link Float#valueOf(String)} as a null
+     * string, which rejects it with an exception.
+     *
+     * @param string the property name
+     * @return the float value
+     * @throws MessageFormatException if the property exists but is neither a float nor a string
+     */
+    public float getFloat(AMQShortString string) throws JMSException
+    {
+        Float f = getJMSHeaders().getFloat(string);
+        if (f != null)
+        {
+            return f;
+        }
+
+        if (!getJMSHeaders().containsKey(string))
+        {
+            // Deliberately routed through valueOf(String) so a missing value
+            // fails exactly as parsing a null string does.
+            return Float.valueOf((String) null);
+        }
+
+        Object str = getJMSHeaders().getObject(string);
+        if (!(str instanceof AMQShortString))
+        {
+            throw new MessageFormatException("getFloat can't use " + string + " item.");
+        }
+
+        return Float.valueOf(((AMQShortString) str).asString());
+    }
+
+    /**
+     * Reads a property as a double, falling back to {@link #getFloat(AMQShortString)}.
+     */
+    public double getDouble(AMQShortString string) throws JMSException
+    {
+        Double d = getJMSHeaders().getDouble(string);
+
+        if (d == null)
+        {
+            d = Double.valueOf(getFloat(string));
+        }
+
+        return d;
+    }
+
+    /**
+     * Reads a property as a string.
+     * <p>
+     * The table's string lookup is tested for {@code null} before it is wrapped,
+     * so the fallback below is actually reachable: a property held under another
+     * type is converted to its string form, and a missing property yields {@code null}.
+     *
+     * @param string the property name
+     * @return the value as an {@link AMQShortString}, or {@code null} if absent or stored as null
+     * @throws MessageFormatException if the property is stored as a byte array
+     */
+    public AMQShortString getString(AMQShortString string) throws JMSException
+    {
+        String value = getJMSHeaders().getString(string.asString());
+        if (value != null)
+        {
+            return new AMQShortString(value);
+        }
+
+        if (!getJMSHeaders().containsKey(string))
+        {
+            return null;
+        }
+
+        Object o = getJMSHeaders().getObject(string);
+        if (o instanceof byte[])
+        {
+            throw new MessageFormatException("getObject couldn't find " + string + " item.");
+        }
+
+        if (o == null)
+        {
+            return null;
+        }
+
+        if (o instanceof AMQShortString)
+        {
+            return (AMQShortString) o;
+        }
+
+        // Any other stored type is rendered as text rather than failing on a blind cast.
+        return new AMQShortString(String.valueOf(o));
+    }
+
+    public Object getObject(AMQShortString string) throws JMSException
+    {
+        return getJMSHeaders().getObject(string);
+    }
+
+    // Typed setters: each validates the property name before storing the value.
+
+    public void setBoolean(AMQShortString string, boolean b) throws JMSException
+    {
+        checkPropertyName(string);
+        getJMSHeaders().setBoolean(string, b);
+    }
+
+    public void setChar(AMQShortString string, char c) throws JMSException
+    {
+        checkPropertyName(string);
+        getJMSHeaders().setChar(string, c);
+    }
+
+    // NOTE(review): unlike the other setters, the two setBytes variants do not
+    // validate the property name -- confirm whether that is intentional.
+    public Object setBytes(AMQShortString string, byte[] bytes)
+    {
+        return getJMSHeaders().setBytes(string, bytes);
+    }
+
+    public Object setBytes(AMQShortString string, byte[] bytes, int start, int length)
+    {
+        return getJMSHeaders().setBytes(string, bytes, start, length);
+    }
+
+    public void setByte(AMQShortString string, byte b) throws JMSException
+    {
+        checkPropertyName(string);
+        getJMSHeaders().setByte(string, b);
+    }
+
+    public void setShort(AMQShortString string, short i) throws JMSException
+    {
+        checkPropertyName(string);
+        getJMSHeaders().setShort(string, i);
+    }
+
+    public void setInteger(AMQShortString string, int i) throws JMSException
+    {
+        checkPropertyName(string);
+        getJMSHeaders().setInteger(string, i);
+    }
+
+    public void setLong(AMQShortString string, long l) throws JMSException
+    {
+        checkPropertyName(string);
+        getJMSHeaders().setLong(string, l);
+    }
+
+    public void setFloat(AMQShortString string, float v) throws JMSException
+    {
+        checkPropertyName(string);
+        getJMSHeaders().setFloat(string, v);
+    }
+
+    public void setDouble(AMQShortString string, double v) throws JMSException
+    {
+        checkPropertyName(string);
+        getJMSHeaders().setDouble(string, v);
+    }
+
+    public void setString(AMQShortString string, AMQShortString string1) throws JMSException
+    {
+        checkPropertyName(string);
+        getJMSHeaders().setString(string.asString(), string1.asString());
+    }
+
+    /**
+     * Stores an arbitrary value under the given property name.
+     *
+     * @param string the property name
+     * @param object the value to store
+     * @throws MessageFormatException if the table rejects the value's class; the
+     *         rejection is attached as the cause
+     */
+    public void setObject(AMQShortString string, Object object) throws JMSException
+    {
+        checkPropertyName(string);
+        try
+        {
+            getJMSHeaders().setObject(string, object);
+        }
+        catch (AMQPInvalidClassException aice)
+        {
+            // Guard the class lookup so a null value cannot mask the real failure.
+            MessageFormatException mfe = new MessageFormatException(
+                    "Only primatives are allowed object is:" + (object == null ? null : object.getClass()));
+            mfe.initCause(aice);
+            throw mfe;
+        }
+    }
+
+    public boolean itemExists(AMQShortString string) throws JMSException
+    {
+        return getJMSHeaders().containsKey(string);
+    }
+
+    public Enumeration getPropertyNames()
+    {
+        return getJMSHeaders().getPropertyNames();
+    }
+
+    public void clear()
+    {
+        getJMSHeaders().clear();
+    }
+
+    public boolean propertyExists(AMQShortString propertyName)
+    {
+        return getJMSHeaders().propertyExists(propertyName);
+    }
+
+    public Object put(Object key, Object value)
+    {
+        return getJMSHeaders().setObject(key.toString(), value);
+    }
+
+    public Object remove(AMQShortString propertyName)
+    {
+        return getJMSHeaders().remove(propertyName);
+    }
+
+    public boolean isEmpty()
+    {
+        return getJMSHeaders().isEmpty();
+    }
+
+    public void writeToBuffer(ByteBuffer data)
+    {
+        getJMSHeaders().writeToBuffer(data);
+    }
+
+    public Enumeration getMapNames()
+    {
+        return getPropertyNames();
+    }
+
+    /**
+     * Validates a property name: it must be non-null and non-empty, and must
+     * pass {@link #checkIdentiferFormat(CharSequence)}.
+     *
+     * @param propertyName the name to validate
+     * @throws IllegalArgumentException if the name is null, empty or malformed
+     */
+    protected static void checkPropertyName(CharSequence propertyName)
+    {
+        if (propertyName == null)
+        {
+            throw new IllegalArgumentException("Property name must not be null");
+        }
+        else if (propertyName.length() == 0)
+        {
+            throw new IllegalArgumentException("Property name must not be the empty string");
+        }
+
+        checkIdentiferFormat(propertyName);
+    }
+
+    /**
+     * Enforces the JMS identifier rules on a property name, but only when the
+     * system property {@code strict-jms} is set to {@code true}; otherwise it
+     * does nothing.
+     *
+     * @param propertyName the name to validate; callers are expected to have
+     *        rejected null and empty names already
+     * @throws IllegalArgumentException if strict checking is on and the name is
+     *         not a valid identifier or is a reserved word
+     */
+    protected static void checkIdentiferFormat(CharSequence propertyName)
+    {
+        // JMS requirements 3.5.1 Property Names
+        // Identifiers:
+        // - An identifier is an unlimited-length character sequence that must begin
+        //   with a Java identifier start character; all following characters must be Java
+        //   identifier part characters. An identifier start character is any character for
+        //   which the method Character.isJavaIdentifierStart returns true. This includes
+        //   '_' and '$'. An identifier part character is any character for which the
+        //   method Character.isJavaIdentifierPart returns true.
+        // - Identifiers cannot be the names NULL, TRUE, or FALSE.
+        // - Identifiers cannot be NOT, AND, OR, BETWEEN, LIKE, IN, IS, or
+        //   ESCAPE.
+        // - Identifiers are either header field references or property references. The
+        //   type of a property value in a message selector corresponds to the type
+        //   used to set the property. If a property that does not exist in a message is
+        //   referenced, its value is NULL. The semantics of evaluating NULL values
+        //   in a selector are described in Section 3.8.1.2, "Null Values."
+        // - The conversions that apply to the get methods for properties do not
+        //   apply when a property is used in a message selector expression. For
+        //   example, suppose you set a property as a string value, as in the
+        //   following:
+        //       myMessage.setStringProperty("NumberOfOrders", "2");
+        //   The following expression in a message selector would evaluate to false,
+        //   because a string cannot be used in an arithmetic expression:
+        //       "NumberOfOrders > 1"
+        // - Identifiers are case sensitive.
+        // - Message header field references are restricted to JMSDeliveryMode,
+        //   JMSPriority, JMSMessageID, JMSTimestamp, JMSCorrelationID, and
+        //   JMSType. JMSMessageID, JMSCorrelationID, and JMSType values may be
+        //   null and if so are treated as a NULL value.
+
+        if (Boolean.getBoolean("strict-jms"))
+        {
+            // JMS start character
+            if (!(Character.isJavaIdentifierStart(propertyName.charAt(0))))
+            {
+                throw new IllegalArgumentException("Identifier '" + propertyName + "' does not start with a valid JMS identifier start character");
+            }
+
+            // JMS part character
+            int length = propertyName.length();
+            for (int c = 1; c < length; c++)
+            {
+                if (!(Character.isJavaIdentifierPart(propertyName.charAt(c))))
+                {
+                    throw new IllegalArgumentException("Identifier '" + propertyName + "' contains an invalid JMS identifier character");
+                }
+            }
+
+            // JMS invalid names.
+            // Compare as a String: CharSequence does not define equals, so a
+            // non-String implementation is not guaranteed to equal a literal.
+            String name = propertyName.toString();
+            if (name.equals("NULL")
+                || name.equals("TRUE")
+                || name.equals("FALSE")
+                || name.equals("NOT")
+                || name.equals("AND")
+                || name.equals("OR")
+                || name.equals("BETWEEN")
+                || name.equals("LIKE")
+                || name.equals("IN")
+                || name.equals("IS")
+                || name.equals("ESCAPE"))
+            {
+                throw new IllegalArgumentException("Identifier '" + propertyName + "' is not allowed in JMS");
+            }
+        }
+
+    }
+
+    public AMQShortString getTransactionId()
+    {
+        return _transactionId;
+    }
+
+    public void setTransactionId(AMQShortString id)
+    {
+        _transactionId = id;
+    }
+
+    public AMQShortString getDestination()
+    {
+        return _destination;
+    }
+
+    public void setDestination(AMQShortString destination)
+    {
+        this._destination = destination;
+    }
+
+    public AMQShortString getExchange()
+    {
+        return _exchange;
+    }
+
+    public void setExchange(AMQShortString exchange)
+    {
+        this._exchange = exchange;
+    }
+
+    public AMQShortString getRoutingKey()
+    {
+        return _routingKey;
+    }
+
+    public void setRoutingKey(AMQShortString routingKey)
+    {
+        this._routingKey = routingKey;
+    }
+}
+
+
Modified: incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/message/MessageStore.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/message/MessageStore.java?view=diff&rev=523854&r1=523853&r2=523854
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/message/MessageStore.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/message/MessageStore.java Thu Mar 29 15:24:20 2007
@@ -1,7 +1,6 @@
package org.apache.qpid.nclient.message;
import org.apache.qpid.AMQException;
-import org.apache.qpid.client.message.MessageHeaders;
public interface MessageStore {
Modified: incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/message/TransientMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/message/TransientMessageStore.java?view=diff&rev=523854&r1=523853&r2=523854
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/message/TransientMessageStore.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/message/TransientMessageStore.java Thu Mar 29 15:24:20 2007
@@ -4,7 +4,6 @@
import java.util.concurrent.ConcurrentHashMap;
import org.apache.qpid.AMQException;
-import org.apache.qpid.client.message.MessageHeaders;
public class TransientMessageStore implements MessageStore {
Modified: incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/model/ModelPhase.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/model/ModelPhase.java?view=diff&rev=523854&r1=523853&r2=523854
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/model/ModelPhase.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/model/ModelPhase.java Thu Mar 29 15:24:20 2007
@@ -1,13 +1,12 @@
package org.apache.qpid.nclient.model;
-import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.log4j.Logger;
-import org.apache.qpid.nclient.amqp.state.AMQPStateManager;
-import org.apache.qpid.nclient.config.ClientConfiguration;
+import org.apache.qpid.nclient.amqp.event.AMQPEventManager;
+import org.apache.qpid.nclient.amqp.event.AMQPMethodEvent;
import org.apache.qpid.nclient.core.AMQPException;
import org.apache.qpid.nclient.core.AbstractPhase;
import org.apache.qpid.nclient.core.Phase;
@@ -31,15 +30,7 @@
*/
public void init(PhaseContext ctx, Phase nextInFlowPhase, Phase nextOutFlowPhase)
{
- super.init(ctx, nextInFlowPhase, nextOutFlowPhase);
- try
- {
- loadMethodListeners();
- }
- catch(Exception e)
- {
- _logger.fatal("Error loading method listeners", e);
- }
+ super.init(ctx, nextInFlowPhase, nextOutFlowPhase);
}
public void messageReceived(Object msg) throws AMQPException
@@ -68,66 +59,13 @@
public void notifyMethodListerners(AMQPMethodEvent event) throws AMQPException
{
- if (_methodListners.containsKey(event.getMethod().getClass()))
- {
- List<AMQPMethodListener> listeners = _methodListners.get(event.getMethod().getClass());
-
- if(listeners.size()>0)
- {
- throw new AMQPException("There are no registered listeners for this method");
- }
-
- for(AMQPMethodListener l : listeners)
- {
- try
- {
- l.methodReceived(event);
- }
- catch (Exception e)
- {
- _logger.error("Error handling method event " + event, e);
- }
- }
- }
+ AMQPEventManager eventManager = (AMQPEventManager)_ctx.getProperty(QpidConstants.EVENT_MANAGER);
+ eventManager.notifyEvent(event);
}
/**
* ------------------------------------------------
* Configuration
* ------------------------------------------------
- */
-
- /**
- * This method loads method listeners from the client.xml file
- * For each method class there is a list of listeners
- */
- private void loadMethodListeners() throws Exception
- {
- int count = ClientConfiguration.get().getMaxIndex(QpidConstants.METHOD_LISTENERS + "." + QpidConstants.METHOD_LISTENER);
- System.out.println(count);
-
- for(int i=0 ;i<count;i++)
- {
- String methodListener = QpidConstants.METHOD_LISTENERS + "." + QpidConstants.METHOD_LISTENER + "(" + i + ")";
- String className = ClientConfiguration.get().getString(methodListener + "." + QpidConstants.CLASS);
- Class listenerClass = Class.forName(className);
- List<String> list = ClientConfiguration.get().getList(methodListener + "." + QpidConstants.METHOD_CLASS);
- for(String s:list)
- {
- List listeners;
- Class methodClass = Class.forName(s);
- if (_methodListners.containsKey(methodClass))
- {
- listeners = _methodListners.get(methodClass);
- }
- else
- {
- listeners = new ArrayList();
- _methodListners.put(methodClass,listeners);
- }
- listeners.add(listenerClass);
- }
- }
- }
-
+ */
}
Modified: incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/security/CallbackHandlerRegistry.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/security/CallbackHandlerRegistry.java?view=diff&rev=523854&r1=523853&r2=523854
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/security/CallbackHandlerRegistry.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/security/CallbackHandlerRegistry.java Thu Mar 29 15:24:20 2007
@@ -62,11 +62,16 @@
private void parseProperties()
{
- List<String> mechanisms = ClientConfiguration.get().getList(QpidConstants.AMQP_SECURITY_MECHANISMS);
-
- for (String mechanism : mechanisms)
+ String key = QpidConstants.AMQP_SECURITY + "." +
+ QpidConstants.AMQP_SECURITY_MECHANISMS + "." +
+ QpidConstants.AMQP_SECURITY_MECHANISM_HANDLER;
+
+ int index = ClientConfiguration.get().getMaxIndex(key);
+
+ for (int i=0; i<index+1;i++)
{
- String className = ClientConfiguration.get().getString(QpidConstants.AMQP_SECURITY_MECHANISM_HANDLER + "_" + mechanism);
+ String mechanism = ClientConfiguration.get().getString(key + "(" + i + ")[@type]");
+ String className = ClientConfiguration.get().getString(key + "(" + i + ")" );
Class clazz = null;
try
{
Added: incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/security/amqplain/AmqPlainSaslClient.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/security/amqplain/AmqPlainSaslClient.java?view=auto&rev=523854
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/security/amqplain/AmqPlainSaslClient.java (added)
+++ incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/security/amqplain/AmqPlainSaslClient.java Thu Mar 29 15:24:20 2007
@@ -0,0 +1,105 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.nclient.security.amqplain;
+
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.FieldTableFactory;
+
+import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslException;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.callback.NameCallback;
+import javax.security.auth.callback.PasswordCallback;
+import javax.security.auth.callback.Callback;
+
+/**
+ * Implements the "AMQPLAIN" authentication protocol, which uses a FieldTable to send the
+ * user name and password.
+ * <p>
+ * The credentials are requested from the CallbackHandler given to the constructor and are
+ * stored in the table under the keys "LOGIN" and "PASSWORD". No security layer is
+ * provided: {@link #wrap} and {@link #unwrap} always fail.
+ */
+public class AmqPlainSaslClient implements SaslClient
+{
+    /**
+     * The name of this mechanism
+     */
+    public static final String MECHANISM = "AMQPLAIN";
+
+    /** Supplies the user name and password; set to null by {@link #dispose()}. */
+    private CallbackHandler _cbh;
+
+    /**
+     * @param cbh the handler that will be asked for the user name and password
+     */
+    public AmqPlainSaslClient(CallbackHandler cbh)
+    {
+        _cbh = cbh;
+    }
+
+    /**
+     * @return the mechanism name, {@link #MECHANISM}
+     */
+    public String getMechanismName()
+    {
+        // use the constant so the advertised name cannot drift from the one the factory matches on
+        return MECHANISM;
+    }
+
+    /**
+     * @return always true: the credentials are sent without waiting for a server challenge
+     */
+    public boolean hasInitialResponse()
+    {
+        return true;
+    }
+
+    /**
+     * Builds the response carrying the credentials obtained from the callback handler.
+     *
+     * @param challenge ignored by this mechanism
+     * @return the bytes of a FieldTable holding the "LOGIN" and "PASSWORD" entries
+     * @throws SaslException if the callback handler fails (including when this client has
+     *                       already been disposed) or if it supplies no password
+     */
+    public byte[] evaluateChallenge(byte[] challenge) throws SaslException
+    {
+        // we do not care about the prompt or the default name
+        NameCallback nameCallback = new NameCallback("prompt", "defaultName");
+        PasswordCallback pwdCallback = new PasswordCallback("prompt", false);
+        Callback[] callbacks = new Callback[]{nameCallback, pwdCallback};
+        try
+        {
+            _cbh.handle(callbacks);
+        }
+        catch (Exception e)
+        {
+            throw new SaslException("Error handling SASL callbacks: " + e, e);
+        }
+
+        // getPassword() returns a copy, so the callback's own copy can be wiped immediately
+        char[] password = pwdCallback.getPassword();
+        pwdCallback.clearPassword();
+        if (password == null)
+        {
+            // report the problem through the declared exception type instead of a bare NPE
+            throw new SaslException("No password was supplied by the callback handler");
+        }
+
+        FieldTable table = FieldTableFactory.newFieldTable();
+        table.setString("LOGIN", nameCallback.getName());
+        table.setString("PASSWORD", new String(password));
+        return table.getDataAsBytes();
+    }
+
+    /**
+     * @return always true
+     */
+    public boolean isComplete()
+    {
+        return true;
+    }
+
+    /**
+     * Not supported by this mechanism.
+     *
+     * @throws SaslException always
+     */
+    public byte[] unwrap(byte[] incoming, int offset, int len) throws SaslException
+    {
+        throw new SaslException("Not supported");
+    }
+
+    /**
+     * Not supported by this mechanism.
+     *
+     * @throws SaslException always
+     */
+    public byte[] wrap(byte[] outgoing, int offset, int len) throws SaslException
+    {
+        throw new SaslException("Not supported");
+    }
+
+    /**
+     * @param propName ignored
+     * @return always null; no properties are negotiated
+     */
+    public Object getNegotiatedProperty(String propName)
+    {
+        return null;
+    }
+
+    /**
+     * Drops the reference to the callback handler.
+     */
+    public void dispose() throws SaslException
+    {
+        _cbh = null;
+    }
+}
Added: incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/security/amqplain/AmqPlainSaslClientFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/security/amqplain/AmqPlainSaslClientFactory.java?view=auto&rev=523854
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/security/amqplain/AmqPlainSaslClientFactory.java (added)
+++ incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/security/amqplain/AmqPlainSaslClientFactory.java Thu Mar 29 15:24:20 2007
@@ -0,0 +1,62 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.nclient.security.amqplain;
+
+import javax.security.sasl.SaslClientFactory;
+import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslException;
+import javax.security.sasl.Sasl;
+import javax.security.auth.callback.CallbackHandler;
+import java.util.Map;
+
+/**
+ * SaslClientFactory that creates {@link AmqPlainSaslClient} instances for the mechanism
+ * named by {@link AmqPlainSaslClient#MECHANISM}.
+ */
+public class AmqPlainSaslClientFactory implements SaslClientFactory
+{
+    /**
+     * Creates a client if the requested mechanisms include {@link AmqPlainSaslClient#MECHANISM}.
+     *
+     * @param mechanisms      the candidate mechanism names; may be null or contain null entries
+     * @param authorizationId not used
+     * @param protocol        not used
+     * @param serverName      not used
+     * @param props           not used
+     * @param cbh             the handler the client will use to obtain the credentials
+     * @return a new client, or null if the mechanism was not requested
+     * @throws SaslException if the mechanism was requested but cbh is null
+     */
+    public SaslClient createSaslClient(String[] mechanisms, String authorizationId, String protocol, String serverName, Map props, CallbackHandler cbh) throws SaslException
+    {
+        if (mechanisms == null)
+        {
+            return null;
+        }
+        for (int i = 0; i < mechanisms.length; i++)
+        {
+            // constant-first comparison tolerates null entries in the array
+            if (AmqPlainSaslClient.MECHANISM.equals(mechanisms[i]))
+            {
+                if (cbh == null)
+                {
+                    throw new SaslException("CallbackHandler must not be null");
+                }
+                return new AmqPlainSaslClient(cbh);
+            }
+        }
+        return null;
+    }
+
+    /**
+     * Lists the mechanisms this factory can provide under the given policy.
+     *
+     * @param props the policy properties; may be null, which means no restrictions
+     * @return an empty array if the noplaintext, nodictionary or noactive policy is set to
+     *         "true", otherwise an array holding {@link AmqPlainSaslClient#MECHANISM}
+     */
+    public String[] getMechanismNames(Map props)
+    {
+        if (props != null &&
+            (isPolicyEnabled(props, Sasl.POLICY_NOPLAINTEXT) ||
+             isPolicyEnabled(props, Sasl.POLICY_NODICTIONARY) ||
+             isPolicyEnabled(props, Sasl.POLICY_NOACTIVE)))
+        {
+            // returned array must be non null according to interface documentation
+            return new String[0];
+        }
+        else
+        {
+            return new String[]{AmqPlainSaslClient.MECHANISM};
+        }
+    }
+
+    /**
+     * Tells whether a policy property is switched on. The Sasl policy properties carry
+     * "true" or "false", so the mere presence of the key is not enough.
+     */
+    private static boolean isPolicyEnabled(Map props, String policy)
+    {
+        Object value = props.get(policy);
+        return value != null && "true".equalsIgnoreCase(value.toString());
+    }
+}
Modified: incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TCPConnection.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TCPConnection.java?view=diff&rev=523854&r1=523853&r2=523854
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TCPConnection.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TCPConnection.java Thu Mar 29 15:24:20 2007
@@ -22,10 +22,12 @@
private BrokerDetails _brokerDetails;
private IoConnector _ioConnector;
private Phase _phase;
+ private PhaseContext _ctx;
- protected TCPConnection(ConnectionURL url)
+ protected TCPConnection(ConnectionURL url, PhaseContext ctx)
{
_brokerDetails = url.getBrokerDetails(0);
+ _ctx = ctx;
ByteBuffer.setUseDirectBuffers(ClientConfiguration.get().getBoolean(QpidConstants.ENABLE_DIRECT_BUFFERS));
@@ -41,8 +43,8 @@
ByteBuffer.setAllocator(new SimpleByteBufferAllocator());
}
- final IoConnector ioConnector = new SocketConnector();
- SocketConnectorConfig cfg = (SocketConnectorConfig) ioConnector.getDefaultConfig();
+ _ioConnector = new SocketConnector();
+ SocketConnectorConfig cfg = (SocketConnectorConfig) _ioConnector.getDefaultConfig();
// if we do not use our own thread model we get the MINA default which is to use
// its own leader-follower model
@@ -59,12 +61,11 @@
// Returns the phase pipe
public Phase connect() throws AMQPException
- {
- PhaseContext ctx = new DefaultPhaseContext();
- ctx.setProperty(QpidConstants.AMQP_BROKER_DETAILS,_brokerDetails);
- ctx.setProperty(QpidConstants.MINA_IO_CONNECTOR,_ioConnector);
+ {
+ _ctx.setProperty(QpidConstants.AMQP_BROKER_DETAILS,_brokerDetails);
+ _ctx.setProperty(QpidConstants.MINA_IO_CONNECTOR,_ioConnector);
- _phase = PhaseFactory.createPhasePipe(ctx);
+ _phase = PhaseFactory.createPhasePipe(_ctx);
_phase.start();
return _phase;
Modified: incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TransportConnectionFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TransportConnectionFactory.java?view=diff&rev=523854&r1=523853&r2=523854
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TransportConnectionFactory.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TransportConnectionFactory.java Thu Mar 29 15:24:20 2007
@@ -2,6 +2,8 @@
import java.net.URISyntaxException;
+import org.apache.qpid.nclient.core.PhaseContext;
+
public class TransportConnectionFactory
{
public enum ConnectionType
@@ -9,36 +11,26 @@
TCP,VM
}
- public static TransportConnection createTransportConnection(String url,ConnectionType type) throws URISyntaxException
+ public static TransportConnection createTransportConnection(String url,ConnectionType type, PhaseContext ctx) throws URISyntaxException
{
- return createTransportConnection(new AMQPConnectionURL(url),type);
+ return createTransportConnection(new AMQPConnectionURL(url),type,ctx);
}
- public static TransportConnection createTransportConnection(ConnectionURL url,ConnectionType type)
+ public static TransportConnection createTransportConnection(ConnectionURL url,ConnectionType type, PhaseContext ctx)
{
switch (type)
{
case TCP : default:
{
- return createTCPConnection(url);
+ return new TCPConnection(url,ctx);
}
case VM :
{
- return createVMConnection(url);
+ return new VMConnection(url,ctx);
}
}
- }
-
- private static TransportConnection createTCPConnection(ConnectionURL url)
- {
- return new TCPConnection(url);
- }
-
- private static TransportConnection createVMConnection(ConnectionURL url)
- {
- return null;
}
}