You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ra...@apache.org on 2007/03/27 18:31:15 UTC
svn commit: r522988 [2/3] - in
/incubator/qpid/branches/qpid.0-9/java/newclient: ./ src/ src/main/
src/main/java/ src/main/java/org/ src/main/java/org/apache/
src/main/java/org/apache/qpid/ src/main/java/org/apache/qpid/nclient/
src/main/java/org/apach...
Added: incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/config/ClientConfiguration.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/config/ClientConfiguration.java?view=auto&rev=522988
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/config/ClientConfiguration.java (added)
+++ incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/config/ClientConfiguration.java Tue Mar 27 09:31:05 2007
@@ -0,0 +1,84 @@
+package org.apache.qpid.nclient.config;
+
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.commons.configuration.CombinedConfiguration;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.configuration.SystemConfiguration;
+import org.apache.commons.configuration.XMLConfiguration;
+import org.apache.log4j.Logger;
+import org.apache.qpid.nclient.core.QpidConstants;
+
+/**
+ * Loads a properties file from classpath.
+ * These values can be overwritten using system properties
+ */
+public class ClientConfiguration extends CombinedConfiguration {
+
+ private static final Logger _logger = Logger.getLogger(ClientConfiguration.class);
+ private static ClientConfiguration _instance = new ClientConfiguration();
+
+ ClientConfiguration()
+ {
+ super();
+ addConfiguration(new SystemConfiguration());
+ try
+ {
+ XMLConfiguration config = new XMLConfiguration();
+ config.load(getInputStream());
+ addConfiguration(config);
+ }
+ catch (ConfigurationException e)
+ {
+ _logger.warn("Client Properties missing, using defaults",e);
+ }
+ }
+
+ public static ClientConfiguration get()
+ {
+ return _instance;
+ }
+
+ private InputStream getInputStream()
+ {
+ if (System.getProperty(QpidConstants.CONFIG_FILE_PATH) != null)
+ {
+ try
+ {
+ return new FileInputStream((String)System.getProperty(QpidConstants.CONFIG_FILE_PATH));
+ }
+ catch(Exception e)
+ {
+ return this.getClass().getResourceAsStream("client.xml");
+ }
+ }
+ else
+ {
+ return this.getClass().getResourceAsStream("client.xml");
+ }
+
+ }
+
+ public static void main(String[] args)
+ {
+ System.out.println(ClientConfiguration.get().getString(QpidConstants.USE_SHARED_READ_WRITE_POOL));
+
+ //System.out.println(ClientConfiguration.get().getString("methodListeners.methodListener(1).[@class]"));
+ int count = ClientConfiguration.get().getMaxIndex(QpidConstants.METHOD_LISTENERS + "." + QpidConstants.METHOD_LISTENER);
+ System.out.println(count);
+
+ for(int i=0 ;i<count;i++)
+ {
+ String methodListener = QpidConstants.METHOD_LISTENERS + "." + QpidConstants.METHOD_LISTENER + "(" + i + ")";
+ System.out.println("\n\n"+ClientConfiguration.get().getString(methodListener + QpidConstants.CLASS));
+ List<String> list = ClientConfiguration.get().getList(methodListener + "." + QpidConstants.METHOD_CLASS);
+ for(String s:list)
+ {
+ System.out.println(s);
+ }
+ }
+ }
+}
Added: incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/config/client.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/config/client.xml?view=auto&rev=522988
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/config/client.xml (added)
+++ incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/config/client.xml Tue Mar 27 09:31:05 2007
@@ -0,0 +1,86 @@
+<?xml version="1.0" encoding="ISO-8859-1" ?>
+<qpidClientConfig>
+
+<security>
+ <saslClientFactoryTypes>
+ <saslClientFactory type="AMQPLAIN">org.apache.qpid.client.security.amqplain.AmqPlainSaslClientFactory</saslClientFactory>
+ </saslClientFactoryTypes>
+ <securityMechanisms>
+ <securityMechanismHandler type="PLAIN">org.apache.qpid.client.security.UsernamePasswordCallbackHandler</securityMechanismHandler>
+ <securityMechanismHandler type="CRAM_MD5">org.apache.qpid.client.security.UsernamePasswordCallbackHandler</securityMechanismHandler>
+ </securityMechanisms>
+</security>
+
+<!-- Transport Layer properties -->
+<useSharedReadWritePool>true</useSharedReadWritePool>
+<enableDirectBuffers>true</enableDirectBuffers>
+<enablePooledAllocator>false</enablePooledAllocator>
+<tcpNoDelay>true</tcpNoDelay>
+<sendBufferSizeInKb>32</sendBufferSizeInKb>
+<reciveBufferSizeInKb>32</reciveBufferSizeInKb>
+<qpidVMBrokerClass>org.apache.qpid.server.protocol.AMQPFastProtocolHandler</qpidVMBrokerClass>
+
+<!-- Execution Layer properties -->
+<maxAccumilatedResponses>20</maxAccumilatedResponses>
+
+<!-- Model Phase properties -->
+<serverTimeoutInMilliSeconds>1000</serverTimeoutInMilliSeconds>
+<maxAccumilatedResponses>20</maxAccumilatedResponses>
+<stateManager></stateManager>
+<stateListerners>
+ <stateType class="org.apache.qpid.nclient.model.state.AMQPStateType.CONNECTION_STATE">
+ <stateListerner></stateListerner>
+ </stateType>
+ <stateType class="org.apache.qpid.nclient.model.state.AMQPStateType.CHANNEL_STATE">
+ <stateListerner></stateListerner>
+ </stateType>
+</stateListerners>
+
+<methodListeners>
+ <methodListener class="org.apache.qpid.nclient.amqp.AMQPConnection">
+ <methodClass>org.apache.qpid.framing.ConnectionStartBody</methodClass>
+ <methodClass>org.apache.qpid.framing.ConnectionSecureBody</methodClass>
+ <methodClass>org.apache.qpid.framing.ConnectionTuneBody</methodClass>
+ <methodClass>org.apache.qpid.framing.ConnectionOpenOkBody</methodClass>
+ <methodClass>org.apache.qpid.framing.ConnectionCloseBody</methodClass>
+ <methodClass>org.apache.qpid.framing.ConnectionCloseOkBody</methodClass>
+
+ <methodClass>org.apache.qpid.framing.ChannelOpenOkBody</methodClass>
+ <methodClass>org.apache.qpid.framing.ChannelCloseBody</methodClass>
+ <methodClass>org.apache.qpid.framing.ChannelCloseOkBody</methodClass>
+ <methodClass>org.apache.qpid.framing.ChannelFlowBody</methodClass>
+ <methodClass>org.apache.qpid.framing.ChannelFlowOkBody</methodClass>
+ <methodClass>org.apache.qpid.framing.ChannelOkBody</methodClass>
+
+ <methodClass>org.apache.qpid.framing.ExchangeDeclareOkBody</methodClass>
+ <methodClass>org.apache.qpid.framing.ExchangeDeleteOkBody</methodClass>
+
+ <methodClass>org.apache.qpid.framing.QueueDeclareOkBody</methodClass>
+ <methodClass>org.apache.qpid.framing.QueueBindOkBody</methodClass>
+ <methodClass>org.apache.qpid.framing.QueueUnbindOkBody</methodClass>
+ <methodClass>org.apache.qpid.framing.QueuePurgeOkBody</methodClass>
+ <methodClass>org.apache.qpid.framing.QueueDeleteOkBody</methodClass>
+
+ <methodClass>org.apache.qpid.framing.MessageAppendBody</methodClass>
+ <methodClass>org.apache.qpid.framing.MessageCancelBody</methodClass>
+ <methodClass>org.apache.qpid.framing.MessageCheckpointBody</methodClass>
+ <methodClass>org.apache.qpid.framing.MessageCloseBody</methodClass>
+ <methodClass>org.apache.qpid.framing.MessageGetBody</methodClass>
+ <methodClass>org.apache.qpid.framing.MessageOffsetBody</methodClass>
+ <methodClass>org.apache.qpid.framing.MessageOkBody</methodClass>
+ <methodClass>org.apache.qpid.framing.MessageOpenBody</methodClass>
+ <methodClass>org.apache.qpid.framing.MessageQosBody</methodClass>
+ <methodClass>org.apache.qpid.framing.MessageRecoverBody</methodClass>
+ <methodClass>org.apache.qpid.framing.MessageRejectBody</methodClass>
+ <methodClass>org.apache.qpid.framing.MessageResumeBody</methodClass>
+ <methodClass>org.apache.qpid.framing.MessageTransferBody</methodClass>
+ </methodListener>
+</methodListeners>
+
+<phasePipe>
+ <phase index="0">org.apache.qpid.nclient.transport.TransportPhase<phase>
+ <phase index="1">org.apache.qpid.nclient.execution.ExecutionPhase<phase>
+ <phase index="2">org.apache.qpid.nclient.model.ModelPhase<phase>
+</phasePipe>
+
+</qpidClientConfig>
\ No newline at end of file
Added: incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/core/AMQPException.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/core/AMQPException.java?view=auto&rev=522988
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/core/AMQPException.java (added)
+++ incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/core/AMQPException.java Tue Mar 27 09:31:05 2007
@@ -0,0 +1,55 @@
+package org.apache.qpid.nclient.core;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+
+
+public class AMQPException extends Exception
+{
+ private int _errorCode;
+
+ public AMQPException(String message)
+ {
+ super(message);
+ }
+
+ public AMQPException(String msg, Throwable t)
+ {
+ super(msg, t);
+ }
+
+ public AMQPException(int errorCode, String msg, Throwable t)
+ {
+ super(msg + " [error code " + errorCode + ']', t);
+ _errorCode = errorCode;
+ }
+
+ public AMQPException(int errorCode, String msg)
+ {
+ super(msg + " [error code " + errorCode + ']');
+ _errorCode = errorCode;
+ }
+
+ public AMQPException(Logger logger, String msg, Throwable t)
+ {
+ this(msg, t);
+ logger.error(getMessage(), this);
+ }
+
+ public AMQPException(Logger logger, String msg)
+ {
+ this(msg);
+ logger.error(getMessage(), this);
+ }
+
+ public AMQPException(Logger logger, int errorCode, String msg)
+ {
+ this(errorCode, msg);
+ logger.error(getMessage(), this);
+ }
+
+ public int getErrorCode()
+ {
+ return _errorCode;
+ }
+ }
Added: incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/core/AbstractPhase.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/core/AbstractPhase.java?view=auto&rev=522988
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/core/AbstractPhase.java (added)
+++ incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/core/AbstractPhase.java Tue Mar 27 09:31:05 2007
@@ -0,0 +1,97 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.nclient.core;
+
+public abstract class AbstractPhase implements Phase {
+
+ protected PhaseContext _ctx;
+ protected Phase _nextInFlowPhase;
+ protected Phase _nextOutFlowPhase;
+
+
+ /**
+ * ------------------------------------------------
+ * Phase - method introduced by Phase
+ * ------------------------------------------------
+ */
+ public void init(PhaseContext ctx,Phase nextInFlowPhase, Phase nextOutFlowPhase) {
+ _nextInFlowPhase = nextInFlowPhase;
+ _nextOutFlowPhase = nextOutFlowPhase;
+ _ctx = ctx;
+ }
+
+ /**
+ * The start is called from the top
+ * of the pipe and is propogated the
+ * bottom.
+ *
+ * Each phase can override this to do
+ * any phase specific logic related
+ * pipe.start()
+ */
+ public void start()throws AMQPException
+ {
+ if(_nextOutFlowPhase != null)
+ {
+ _nextOutFlowPhase.start();
+ }
+ }
+
+ /**
+ * Each phase can override this to do
+ * any phase specific cleanup
+ */
+ public void close()throws AMQPException
+ {
+
+ }
+
+ public void messageReceived(Object frame) throws AMQPException
+ {
+ if(_nextInFlowPhase != null)
+ {
+ _nextInFlowPhase.messageReceived(frame);
+ }
+ }
+
+ public void messageSent(Object frame) throws AMQPException
+ {
+ if (_nextOutFlowPhase != null)
+ {
+ _nextOutFlowPhase.messageSent(frame);
+ }
+ }
+
+ public PhaseContext getPhaseContext()
+ {
+ return _ctx;
+ }
+
+ public Phase getNextInFlowPhase() {
+ return _nextInFlowPhase;
+ }
+
+ public Phase getNextOutFlowPhase() {
+ return _nextOutFlowPhase;
+ }
+
+
+}
Added: incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/core/DefaultPhaseContext.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/core/DefaultPhaseContext.java?view=auto&rev=522988
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/core/DefaultPhaseContext.java (added)
+++ incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/core/DefaultPhaseContext.java Tue Mar 27 09:31:05 2007
@@ -0,0 +1,20 @@
+package org.apache.qpid.nclient.core;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class DefaultPhaseContext implements PhaseContext
+{
+ public Map<String,Object> _props = new ConcurrentHashMap<String,Object>();
+
+ public Object getProperty(String name)
+ {
+ return _props.get(name);
+ }
+
+ public void setProperty(String name, Object value)
+ {
+ _props.put(name, value);
+ }
+
+}
Added: incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/core/Phase.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/core/Phase.java?view=auto&rev=522988
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/core/Phase.java (added)
+++ incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/core/Phase.java Tue Mar 27 09:31:05 2007
@@ -0,0 +1,38 @@
+package org.apache.qpid.nclient.core;
+
+
+public interface Phase
+{
+
+ /**
+ * This method is used to initialize a phase
+ *
+ * @param ctx
+ * @param nextInFlowPhase
+ * @param nextOutFlowPhase
+ */
+ public void init(PhaseContext ctx,Phase nextInFlowPhase, Phase nextOutFlowPhase);
+
+ /**
+ *
+ * Implement logic related to physical opening
+ * of the pipe
+ */
+ public void start()throws AMQPException;
+
+ /**
+ * Implement cleanup in this method.
+ * This indicates the pipe is closing
+ */
+ public void close()throws AMQPException;
+
+ public void messageReceived(Object msg) throws AMQPException;
+
+ public void messageSent(Object msg) throws AMQPException;
+
+ public PhaseContext getPhaseContext();
+
+ public Phase getNextOutFlowPhase();
+
+ public Phase getNextInFlowPhase();
+}
Added: incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/core/PhaseContext.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/core/PhaseContext.java?view=auto&rev=522988
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/core/PhaseContext.java (added)
+++ incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/core/PhaseContext.java Tue Mar 27 09:31:05 2007
@@ -0,0 +1,35 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.nclient.core;
+
+/**
+ * This can be thought of as a session context associated
+ * with the pipe. This is transient and is scoped by the
+ * duration of the physical connection.
+ *
+ */
+public interface PhaseContext {
+
+ public Object getProperty(String name);
+
+ public void setProperty(String name, Object value);
+
+}
Added: incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/core/PhaseFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/core/PhaseFactory.java?view=auto&rev=522988
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/core/PhaseFactory.java (added)
+++ incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/core/PhaseFactory.java Tue Mar 27 09:31:05 2007
@@ -0,0 +1,60 @@
+package org.apache.qpid.nclient.core;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.qpid.nclient.config.ClientConfiguration;
+
+public class PhaseFactory
+{
+ /**
+ * This method will create the pipe and return a reference
+ * to the top of the pipeline.
+ *
+ * The application can then use this (top most) phase and all
+ * calls will propogated down the pipe.
+ *
+ * Simillar calls orginating at the bottom of the pipeline
+ * will be propogated to the top.
+ *
+ * @param ctx
+ * @return
+ * @throws AMQPException
+ */
+ public static Phase createPhasePipe(PhaseContext ctx) throws AMQPException
+ {
+ Map<Integer,Phase> phaseMap = new HashMap<Integer,Phase>();
+ List<String> list = ClientConfiguration.get().getList(QpidConstants.PHASE_PIPE + "." + QpidConstants.PHASE);
+ for(String s:list)
+ {
+ try
+ {
+ Phase temp = (Phase)Class.forName(ClientConfiguration.get().getString(s)).newInstance();
+ phaseMap.put(ClientConfiguration.get().getInt(s + "." + QpidConstants.INDEX),temp) ;
+ }
+ catch(Exception e)
+ {
+ throw new AMQPException("Error loading phase " + ClientConfiguration.get().getString(s),e);
+ }
+ }
+
+ Phase current = null;
+ Phase prev = null;
+ Phase next = null;
+ //Lets build the phase pipe.
+ for (int i=0; i<phaseMap.size();i++)
+ {
+ current = phaseMap.get(i);
+ if (1+1 < phaseMap.size())
+ {
+ next = phaseMap.get(i+1);
+ }
+ current.init(ctx, next, prev);
+ prev = current;
+ next = null;
+ }
+
+ return current;
+ }
+}
\ No newline at end of file
Added: incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/core/QpidConstants.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/core/QpidConstants.java?view=auto&rev=522988
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/core/QpidConstants.java (added)
+++ incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/core/QpidConstants.java Tue Mar 27 09:31:05 2007
@@ -0,0 +1,66 @@
+package org.apache.qpid.nclient.core;
+
+public interface QpidConstants {
+
+ // Common properties
+ public static long EMPTY_CORRELATION_ID = -1;
+ public static int CHANNEL_ZERO = 0;
+ public static String CONFIG_FILE_PATH = "ConfigFilePath";
+
+ // Phase Context properties
+ public final static String AMQP_BROKER_DETAILS = "AMQP_BROKER_DETAILS";
+ public final static String MINA_IO_CONNECTOR = "MINA_IO_CONNECTOR";
+ //public final static String AMQP_MAJOR_VERSION = "AMQP_MAJOR_VERSION";
+ //public final static String AMQP_MINOR_VERSION = "AMQP_MINOR_VERSION";
+ //public final static String AMQP_SASL_CLIENT = "AMQP_SASL_CLIENT";
+ //public final static String AMQP_CLIENT_ID = "AMQP_CLIENT_ID";
+ //public final static String AMQP_CONNECTION_TUNE_PARAMETERS = "AMQP_CONNECTION_TUNE_PARAMETERS";
+ //public final static String AMQP_VIRTUAL_HOST = "AMQP_VIRTUAL_HOST";
+ //public final static String AMQP_MESSAGE_STORE = "AMQP_MESSAGE_STORE";
+
+ /**---------------------------------------------------------------
+ * Configuration file properties
+ * ------------------------------------------------------------
+ */
+
+ // Model Layer properties
+ public final static String STATE_MANAGER = "stateManager";
+ public final static String METHOD_LISTENERS = "methodListeners";
+ public final static String METHOD_LISTENER = "methodListener";
+ public final static String CLASS = "[@class]";
+ public final static String METHOD_CLASS = "methodClass";
+
+ public final static String STATE_LISTENERS = "stateListeners";
+ public final static String STATE_LISTENER = "stateListener";
+ public final static String STATE_TYPE = "stateType";
+
+ public final static String AMQP_MESSAGE_STORE_CLASS = "AMQP_MESSAGE_STORE_CLASS";
+ public final static String SERVER_TIMEOUT_IN_MILLISECONDS = "serverTimeoutInMilliSeconds";
+
+ // MINA properties
+ public final static String USE_SHARED_READ_WRITE_POOL = "useSharedReadWritePool";
+ public final static String ENABLE_DIRECT_BUFFERS = "enableDirectBuffers";
+ public final static String ENABLE_POOLED_ALLOCATOR = "enablePooledAllocator";
+ public final static String TCP_NO_DELAY = "tcpNoDelay";
+ public final static String SEND_BUFFER_SIZE_IN_KB = "sendBufferSizeInKb";
+ public final static String RECEIVE_BUFFER_SIZE_IN_KB = "reciveBufferSizeInKb";
+
+ // Security properties
+ public final static String AMQP_SECURITY_SASL_CLIENT_FACTORY_TYPES = "saslClientFactoryTypes";
+ public final static String AMQP_SECURITY_SASL_CLIENT_FACTORY = "saslClientFactory";
+ public final static String TYPE = "[@type]";
+
+ public final static String AMQP_SECURITY_MECHANISMS = "securityMechanisms";
+ public final static String AMQP_SECURITY_MECHANISM_HANDLER = "securityMechanismHandler";
+
+ // Execution Layer properties
+ public final static String MAX_ACCUMILATED_RESPONSES = "maxAccumilatedResponses";
+
+ //Transport Layer properties
+ public final static String QPID_VM_BROKER_CLASS = "qpidVMBrokerClass";
+
+ //Phase pipe properties
+ public final static String PHASE_PIPE = "phasePipe";
+ public final static String PHASE = "phase";
+ public final static String INDEX = "[@index]";
+}
Added: incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/core/TransientPhaseContext.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/core/TransientPhaseContext.java?view=auto&rev=522988
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/core/TransientPhaseContext.java (added)
+++ incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/core/TransientPhaseContext.java Tue Mar 27 09:31:05 2007
@@ -0,0 +1,19 @@
+package org.apache.qpid.nclient.core;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+public class TransientPhaseContext implements PhaseContext {
+
+ private Map map = new ConcurrentHashMap();
+
+ public Object getProperty(String name) {
+ return map.get(name);
+ }
+
+ public void setProperty(String name, Object value) {
+ map.put(name, value);
+ }
+
+}
Added: incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/execution/ExecutionPhase.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/execution/ExecutionPhase.java?view=auto&rev=522988
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/execution/ExecutionPhase.java (added)
+++ incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/execution/ExecutionPhase.java Tue Mar 27 09:31:05 2007
@@ -0,0 +1,151 @@
+package org.apache.qpid.nclient.execution;
+
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQBody;
+import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.AMQRequestBody;
+import org.apache.qpid.framing.AMQResponseBody;
+import org.apache.qpid.framing.RequestResponseMappingException;
+import org.apache.qpid.nclient.core.AMQPException;
+import org.apache.qpid.nclient.core.AbstractPhase;
+import org.apache.qpid.nclient.core.QpidConstants;
+import org.apache.qpid.nclient.model.AMQPMethodEvent;
+import org.apache.qpid.protocol.AMQMethodEvent;
+
+/**
+ * Corressponds to the Layer 2 in AMQP.
+ * This phase handles the correlation of amqp messages
+ * This class implements the 0.9 spec (request/response)
+ */
+public class ExecutionPhase extends AbstractPhase{
+
+ protected static final Logger _logger = Logger.getLogger(ExecutionPhase.class);
+ protected ConcurrentMap _channelId2RequestMgrMap = new ConcurrentHashMap();
+ protected ConcurrentMap _channelId2ResponseMgrMap = new ConcurrentHashMap();
+
+
+ /**
+ * --------------------------------------------------
+ * Phase related methods
+ * --------------------------------------------------
+ */
+
+ // should add these in the init method
+ //_channelId2RequestMgrMap.put(0, new RequestManager(_ConnectionId, 0, this, false));
+ //_channelId2ResponseMgrMap.put(0, new ResponseManager(_ConnectionId, 0, _stateManager, this, false));
+
+ public void messageReceived(Object msg) throws AMQPException
+ {
+ AMQFrame frame = (AMQFrame) msg;
+ final AMQBody bodyFrame = frame.getBodyFrame();
+
+ if (bodyFrame instanceof AMQRequestBody)
+ {
+ AMQPMethodEvent event;
+ try
+ {
+ event = messageRequestBodyReceived(frame.getChannel(), (AMQRequestBody)bodyFrame);
+ super.messageReceived(event);
+ }
+ catch (Exception e)
+ {
+ _logger.error("Error handling request",e);
+ }
+
+ }
+ else if (bodyFrame instanceof AMQResponseBody)
+ {
+ List<AMQPMethodEvent> events;
+ try
+ {
+ events = messageResponseBodyReceived(frame.getChannel(), (AMQResponseBody)bodyFrame);
+ for (AMQPMethodEvent event: events)
+ {
+ super.messageReceived(event);
+ }
+ }
+ catch (Exception e)
+ {
+ _logger.error("Error handling response",e);
+ }
+ }
+ }
+
+ /**
+ * Need to figure out if the message is a request or a response
+ * that needs to be sent and then delegate it to the Request or response manager
+ * to prepare it.
+ */
+ public void messageSent(Object msg) throws AMQPException
+ {
+ AMQPMethodEvent evt = (AMQPMethodEvent)msg;
+ if(evt.getCorrelationId() == QpidConstants.EMPTY_CORRELATION_ID)
+ {
+ // This is a request
+ AMQFrame frame = handleRequest(evt);
+ super.messageSent(frame);
+ }
+ else
+ {
+// This is a response
+ List<AMQFrame> frames = handleResponse(evt);
+ for(AMQFrame frame: frames)
+ {
+ super.messageSent(frame);
+ }
+ }
+ }
+
+ /**
+ * ------------------------------------------------
+ * Methods to handle request response
+ * -----------------------------------------------
+ */
+ private AMQPMethodEvent messageRequestBodyReceived(int channelId, AMQRequestBody requestBody) throws Exception
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Request frame received: " + requestBody);
+ }
+ ResponseManager responseManager = (ResponseManager)_channelId2ResponseMgrMap.get(channelId);
+ if (responseManager == null)
+ throw new AMQException("Unable to find ResponseManager for channel " + channelId);
+ return responseManager.requestReceived(requestBody);
+ }
+
+ private List<AMQPMethodEvent> messageResponseBodyReceived(int channelId, AMQResponseBody responseBody) throws Exception
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Response frame received: " + responseBody);
+ }
+ RequestManager requestManager = (RequestManager)_channelId2RequestMgrMap.get(channelId);
+ if (requestManager == null)
+ throw new AMQException("Unable to find RequestManager for channel " + channelId);
+ return requestManager.responseReceived(responseBody);
+ }
+
+ private AMQFrame handleRequest(AMQPMethodEvent evt)
+ {
+ RequestManager requestManager = (RequestManager)_channelId2RequestMgrMap.get(evt.getChannelId());
+ return requestManager.sendRequest(evt);
+ }
+
+ private List<AMQFrame> handleResponse(AMQPMethodEvent evt) throws AMQPException
+ {
+ ResponseManager responseManager = (ResponseManager)_channelId2ResponseMgrMap.get(evt.getChannelId());
+ try
+ {
+ return responseManager.sendResponse(evt);
+ }
+ catch(Exception e)
+ {
+ throw new AMQPException("Error handling response",e);
+ }
+ }
+}
Added: incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/execution/RequestManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/execution/RequestManager.java?view=auto&rev=522988
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/execution/RequestManager.java (added)
+++ incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/execution/RequestManager.java Tue Mar 27 09:31:05 2007
@@ -0,0 +1,129 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.nclient.execution;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.AMQRequestBody;
+import org.apache.qpid.framing.AMQResponseBody;
+import org.apache.qpid.framing.RequestResponseMappingException;
+import org.apache.qpid.nclient.model.AMQPMethodEvent;
+
+public class RequestManager
+{
+ private static final Logger logger = Logger.getLogger(RequestManager.class);
+
+ private int channel;
+
+ /**
+ * Used for logging and debugging only - allows the context of this instance
+ * to be known.
+ */
+ private boolean serverFlag;
+ private long connectionId;
+
+ /**
+ * Request and response frames must have a requestID and responseID which
+ * indepenedently increment from 0 on a per-channel basis. These are the
+ * counters, and contain the value of the next (not yet used) frame.
+ */
+ private long requestIdCount;
+
+ /**
+ * These keep track of the last requestId and responseId to be received.
+ */
+ private long lastProcessedResponseId;
+
+ private ConcurrentHashMap<Long, Long> requestSentMap;
+
+ public RequestManager(long connectionId, int channel, boolean serverFlag)
+ {
+ this.channel = channel;
+ this.serverFlag = serverFlag;
+ this.connectionId = connectionId;
+ requestIdCount = 1L;
+ lastProcessedResponseId = 0L;
+ requestSentMap = new ConcurrentHashMap<Long, Long>();
+ }
+
+ // *** Functions to originate a request ***
+
+ public AMQFrame sendRequest(AMQPMethodEvent evt)
+ {
+ long requestId = getNextRequestId(); // Get new request ID
+ AMQFrame requestFrame = AMQRequestBody.createAMQFrame(channel, requestId,
+ lastProcessedResponseId, evt.getMethod());
+ if (logger.isDebugEnabled())
+ {
+ logger.debug((serverFlag ? "SRV[" : "CLI[") + connectionId + "," + channel +
+ "] TX REQ: Req[" + requestId + " " + lastProcessedResponseId + "]; " + evt.getMethod());
+ }
+ requestSentMap.put(requestId, evt.getCorrelationId());
+ return requestFrame;
+ }
+
+ public List<AMQPMethodEvent> responseReceived(AMQResponseBody responseBody)
+ throws Exception
+ {
+ long requestIdStart = responseBody.getRequestId();
+ long requestIdStop = requestIdStart + responseBody.getBatchOffset();
+ if (logger.isDebugEnabled())
+ {
+ logger.debug((serverFlag ? "SRV[" : "CLI[") + connectionId + "," + channel + "] RX RES: " +
+ responseBody + "; " + responseBody.getMethodPayload());
+ }
+
+ List<AMQPMethodEvent> events = new ArrayList<AMQPMethodEvent>();
+ for (long requestId = requestIdStart; requestId <= requestIdStop; requestId++)
+ {
+ if (requestSentMap.get(requestId) == null)
+ {
+ throw new RequestResponseMappingException(requestId,
+ "Failed to locate requestId " + requestId + " in requestSentMap.");
+ }
+ long localCorrelationId = requestSentMap.get(requestId);
+ AMQPMethodEvent methodEvent = new AMQPMethodEvent(channel, responseBody.getMethodPayload(),
+ requestId,localCorrelationId);
+ events.add(methodEvent);
+ requestSentMap.remove(requestId);
+ }
+ lastProcessedResponseId = responseBody.getResponseId();
+ return events;
+ }
+
+ // *** Management functions ***
+
+ public int requestsMapSize()
+ {
+ return requestSentMap.size();
+ }
+
+ // *** Private helper functions ***
+
+ private long getNextRequestId()
+ {
+ return requestIdCount++;
+ }
+}
Added: incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/execution/ResponseManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/execution/ResponseManager.java?view=auto&rev=522988
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/execution/ResponseManager.java (added)
+++ incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/execution/ResponseManager.java Tue Mar 27 09:31:05 2007
@@ -0,0 +1,240 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.nclient.execution;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.AMQRequestBody;
+import org.apache.qpid.framing.AMQResponseBody;
+import org.apache.qpid.framing.RequestResponseMappingException;
+import org.apache.qpid.nclient.config.ClientConfiguration;
+import org.apache.qpid.nclient.core.QpidConstants;
+import org.apache.qpid.nclient.model.AMQPMethodEvent;
+
+public class ResponseManager
+{
+ private static final Logger logger = Logger.getLogger(ResponseManager.class);
+
+ private int channel;
+
+ /**
+ * Used for logging and debugging only - allows the context of this instance
+ * to be known.
+ */
+ private boolean serverFlag;
+ private long connectionId;
+
+ private int maxAccumulatedResponses = 20; // Default
+
+ /**
+ * Request and response frames must have a requestID and responseID which
+ * indepenedently increment from 0 on a per-channel basis. These are the
+ * counters, and contain the value of the next (not yet used) frame.
+ */
+ private long responseIdCount;
+
+ /**
+ * These keep track of the last requestId and responseId to be received.
+ */
+ private long lastReceivedRequestId;
+
+ /**
+ * Last requestID sent in a response (for batching)
+ */
+ private long lastSentRequestId;
+
+ private class ResponseStatus implements Comparable<ResponseStatus>
+ {
+ private long requestId;
+ private AMQMethodBody responseMethodBody;
+
+ public ResponseStatus(long requestId)
+ {
+ this.requestId = requestId;
+ responseMethodBody = null;
+ }
+
+ public int compareTo(ResponseStatus o)
+ {
+ return (int)(requestId - o.requestId);
+ }
+
+ public String toString()
+ {
+ // Need to define this
+ return "";
+ }
+ }
+
+ private ConcurrentHashMap<Long, ResponseStatus> responseMap;
+
+ public ResponseManager(long connectionId, int channel, boolean serverFlag)
+ {
+ this.channel = channel;
+ this.serverFlag = serverFlag;
+ this.connectionId = connectionId;
+ responseIdCount = 1L;
+ lastReceivedRequestId = 0L;
+ maxAccumulatedResponses = ClientConfiguration.get().getInt(QpidConstants.MAX_ACCUMILATED_RESPONSES);
+ responseMap = new ConcurrentHashMap<Long, ResponseStatus>();
+ }
+
+ // *** Functions to handle an incoming request ***
+
+ public AMQPMethodEvent requestReceived(AMQRequestBody requestBody) throws Exception
+ {
+ long requestId = requestBody.getRequestId();
+ if (logger.isDebugEnabled())
+ {
+ logger.debug((serverFlag ? "SRV[" : "CLI[") + connectionId + "," + channel + "] RX REQ: " +
+ requestBody + "; " + requestBody.getMethodPayload());
+ }
+ long responseMark = requestBody.getResponseMark();
+ lastReceivedRequestId = requestId;
+ responseMap.put(requestId, new ResponseStatus(requestId));
+
+ AMQPMethodEvent methodEvent = new AMQPMethodEvent(channel,
+ requestBody.getMethodPayload(), requestId);
+
+ return methodEvent;
+ }
+
+ public List<AMQFrame> sendResponse(AMQPMethodEvent evt)
+ throws RequestResponseMappingException
+ {
+ long requestId = evt.getCorrelationId();
+ AMQMethodBody responseMethodBody = evt.getMethod();
+
+ if (logger.isDebugEnabled())
+ {
+ logger.debug((serverFlag ? "SRV[" : "CLI[") + connectionId + "," + channel +
+ "] TX RES: Res[# " + requestId + "]; " + responseMethodBody);
+ }
+
+ ResponseStatus responseStatus = responseMap.get(requestId);
+ if (responseStatus == null)
+ {
+ throw new RequestResponseMappingException(requestId,
+ "Failed to locate requestId " + requestId + " in responseMap." + responseMap);
+ }
+ if (responseStatus.responseMethodBody != null)
+ {
+ throw new RequestResponseMappingException(requestId, "RequestId " +
+ requestId + " already has a response in responseMap.");
+ }
+ responseStatus.responseMethodBody = responseMethodBody;
+ return doBatches();
+ }
+
+ // *** Management functions ***
+
+ /**
+ * Sends batched responses - i.e. all those members of responseMap that have
+ * received a response.
+ */
+ public synchronized List<AMQFrame> doBatches()
+ {
+ long startRequestId = 0;
+ int numAdditionalRequestIds = 0;
+ Class responseMethodBodyClass = null;
+ List<AMQFrame> frames = new ArrayList<AMQFrame>();
+ Iterator<Long> lItr = responseMap.keySet().iterator();
+ while (lItr.hasNext())
+ {
+ long requestId = lItr.next();
+ ResponseStatus responseStatus = responseMap.get(requestId);
+ if (responseStatus.responseMethodBody != null)
+ {
+ frames.add(sendResponseBatchFrame(requestId, 0, responseStatus.responseMethodBody));
+ lItr.remove();
+ }
+ }
+
+ return frames;
+ }
+
+ /**
+ * Total number of entries in the responseMap - including both those that
+ * are outstanding (i.e. no response has been received) and those that are
+ * batched (those for which responses have been received but have not yet
+ * been collected together and sent).
+ */
+ public int responsesMapSize()
+ {
+ return responseMap.size();
+ }
+
+ /**
+ * As the responseMap may contain both outstanding responses (those with
+ * ResponseStatus.responseMethodBody still null) and responses waiting to
+ * be batched (those with ResponseStatus.responseMethodBody not null), we
+ * need to count only those in the map with responseMethodBody null.
+ */
+ public int outstandingResponses()
+ {
+ int cnt = 0;
+ for (Long requestId : responseMap.keySet())
+ {
+ if (responseMap.get(requestId).responseMethodBody == null)
+ cnt++;
+ }
+ return cnt;
+ }
+
+ /**
+ * As the responseMap may contain both outstanding responses (those with
+ * ResponseStatus.responseMethodBody still null) and responses waiting to
+ * be batched (those with ResponseStatus.responseMethodBody not null), we
+ * need to count only those in the map with responseMethodBody not null.
+ */
+ public int batchedResponses()
+ {
+ int cnt = 0;
+ for (Long requestId : responseMap.keySet())
+ {
+ if (responseMap.get(requestId).responseMethodBody != null)
+ cnt++;
+ }
+ return cnt;
+ }
+
+ // *** Private helper functions ***
+
+ private long getNextResponseId()
+ {
+ return responseIdCount++;
+ }
+
+ private AMQFrame sendResponseBatchFrame(long firstRequestId, int numAdditionalRequests,
+ AMQMethodBody responseMethodBody)
+ {
+ long responseId = getNextResponseId(); // Get new response ID
+ AMQFrame responseFrame = AMQResponseBody.createAMQFrame(channel, responseId,
+ firstRequestId, numAdditionalRequests, responseMethodBody);
+ return responseFrame;
+ }
+}
Added: incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/message/AMQPApplicationMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/message/AMQPApplicationMessage.java?view=auto&rev=522988
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/message/AMQPApplicationMessage.java (added)
+++ incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/message/AMQPApplicationMessage.java Tue Mar 27 09:31:05 2007
@@ -0,0 +1,124 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.nclient.message;
+
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.qpid.client.message.MessageHeaders;
+
+public class AMQPApplicationMessage {
+
+ private int bytesReceived = 0;
+ private int channelId;
+ private byte[] referenceId;
+ private List<byte[]> contents = new LinkedList<byte[]>();
+ private long deliveryTag;
+ private boolean redeliveredFlag;
+ private MessageHeaders messageHeaders;
+
+ public AMQPApplicationMessage(int channelId, byte[] referenceId)
+ {
+ this.channelId = channelId;
+ this.referenceId = referenceId;
+ }
+
+ public AMQPApplicationMessage(int channelId, long deliveryTag, MessageHeaders messageHeaders, boolean redeliveredFlag)
+ {
+ this.channelId = channelId;
+ this.deliveryTag = deliveryTag;
+ this.messageHeaders = messageHeaders;
+ this.redeliveredFlag = redeliveredFlag;
+ }
+
+ public AMQPApplicationMessage(int channelId, long deliveryTag, MessageHeaders messageHeaders, byte[] content, boolean redeliveredFlag)
+ {
+ this.channelId = channelId;
+ this.deliveryTag = deliveryTag;
+ this.messageHeaders = messageHeaders;
+ this.redeliveredFlag = redeliveredFlag;
+ addContent(content);
+ }
+
+ public void addContent(byte[] content)
+ {
+ contents.add(content);
+ bytesReceived += content.length;
+ }
+
+ public int getBytesReceived()
+ {
+ return bytesReceived;
+ }
+
+ public int getChannelId()
+ {
+ return channelId;
+ }
+
+ public byte[] getReferenceId()
+ {
+ return referenceId;
+ }
+
+ public List<byte[]> getContents()
+ {
+ return contents;
+ }
+
+ public long getDeliveryTag()
+ {
+ return deliveryTag;
+ }
+
+ public boolean getRedeliveredFlag()
+ {
+ return redeliveredFlag;
+ }
+
+ public MessageHeaders getMessageHeaders()
+ {
+ return messageHeaders;
+ }
+
+ public String toString()
+ {
+ return "UnprocessedMessage: ch=" + channelId + "; bytesReceived=" + bytesReceived + "; deliveryTag=" +
+ deliveryTag + "; MsgHdrs=" + messageHeaders + "Num contents=" + contents.size() + "; First content=" +
+ new String(contents.get(0));
+ }
+
+ public void setDeliveryTag(long deliveryTag)
+ {
+ this.deliveryTag = deliveryTag;
+ }
+
+ public void setMessageHeaders(MessageHeaders messageHeaders)
+ {
+ this.messageHeaders = messageHeaders;
+ }
+
+ public void setRedeliveredFlag(boolean redeliveredFlag)
+ {
+ this.redeliveredFlag = redeliveredFlag;
+ }
+}
Added: incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/message/MessagePhase.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/message/MessagePhase.java?view=auto&rev=522988
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/message/MessagePhase.java (added)
+++ incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/message/MessagePhase.java Tue Mar 27 09:31:05 2007
@@ -0,0 +1,64 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.nclient.message;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.nclient.core.AMQPException;
+import org.apache.qpid.nclient.core.AbstractPhase;
+import org.apache.qpid.nclient.model.ModelPhase;
+
+public class MessagePhase extends AbstractPhase {
+
+ private final BlockingQueue<AMQPApplicationMessage> _queue = new LinkedBlockingQueue<AMQPApplicationMessage>();
+ private static final Logger _logger = Logger.getLogger(ModelPhase.class);
+
+ public void messageReceived(Object msg) throws AMQPException
+ {
+ try
+ {
+ _queue.put((AMQPApplicationMessage)msg);
+ }
+ catch (InterruptedException e)
+ {
+ _logger.error("Error adding message to queue", e);
+ }
+ super.messageReceived(msg);
+ }
+
+ public void messageSent(Object msg) throws AMQPException
+ {
+ super.messageSent(msg);
+ }
+
+ public AMQPApplicationMessage getNextMessage()
+ {
+ return _queue.poll();
+ }
+
+ public AMQPApplicationMessage getNextMessage(long timeout, TimeUnit tu) throws InterruptedException
+ {
+ return _queue.poll(timeout, tu);
+ }
+}
Added: incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/message/MessageStore.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/message/MessageStore.java?view=auto&rev=522988
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/message/MessageStore.java (added)
+++ incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/message/MessageStore.java Tue Mar 27 09:31:05 2007
@@ -0,0 +1,17 @@
+package org.apache.qpid.nclient.message;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.message.MessageHeaders;
+
+public interface MessageStore {
+
+ public void removeMessage(String identifier);
+
+ public void storeContentBodyChunk(String identifier,byte[] contentBody) throws AMQException;
+
+ public void storeMessageMetaData(String identifier, MessageHeaders messageHeaders) throws AMQException;
+
+ public AMQPApplicationMessage getMessage(String identifier) throws AMQException;
+
+ public void storeMessage(String identifier,AMQPApplicationMessage message)throws AMQException;
+}
Added: incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/message/TransientMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/message/TransientMessageStore.java?view=auto&rev=522988
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/message/TransientMessageStore.java (added)
+++ incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/message/TransientMessageStore.java Tue Mar 27 09:31:05 2007
@@ -0,0 +1,40 @@
+package org.apache.qpid.nclient.message;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.message.MessageHeaders;
+
+public class TransientMessageStore implements MessageStore {
+
+ private Map<String,AMQPApplicationMessage> messageMap = new ConcurrentHashMap<String,AMQPApplicationMessage>();
+
+ public AMQPApplicationMessage getMessage(String identifier)
+ throws AMQException
+ {
+ return messageMap.get(identifier);
+ }
+
+ public void removeMessage(String identifier)
+ {
+ messageMap.remove(identifier);
+ }
+
+ public void storeContentBodyChunk(String identifier, byte[] contentBody)
+ throws AMQException
+ {
+
+ }
+
+ public void storeMessageMetaData(String identifier,
+ MessageHeaders messageHeaders) throws AMQException
+ {
+
+ }
+
+ public void storeMessage(String identifier,AMQPApplicationMessage message)throws AMQException
+ {
+
+ }
+}
Added: incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/model/AMQPMethodEvent.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/model/AMQPMethodEvent.java?view=auto&rev=522988
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/model/AMQPMethodEvent.java (added)
+++ incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/model/AMQPMethodEvent.java Tue Mar 27 09:31:05 2007
@@ -0,0 +1,64 @@
+package org.apache.qpid.nclient.model;
+
+import org.apache.qpid.framing.AMQMethodBody;
+
+/**
+ * This class is exactly the same as the AMQMethod event.
+ * Except I renamed requestId to corelationId, so I could use it both ways.
+ *
+ * I didn't want to modify anything in common so that there is no
+ * impact on the existing code.
+ *
+ */
+public class AMQPMethodEvent <M extends AMQMethodBody> {
+
+ private final M _method;
+ private final int _channelId;
+ private final long _correlationId;
+ private long _localCorrletionId = 0;
+
+ public AMQPMethodEvent(int channelId, M method, long correlationId,long localCorrletionId)
+ {
+ _channelId = channelId;
+ _method = method;
+ _correlationId = correlationId;
+ _localCorrletionId = localCorrletionId;
+ }
+
+ public AMQPMethodEvent(int channelId, M method, long correlationId)
+ {
+ _channelId = channelId;
+ _method = method;
+ _correlationId = correlationId;
+ }
+
+ public M getMethod()
+ {
+ return _method;
+ }
+
+ public int getChannelId()
+ {
+ return _channelId;
+ }
+
+ public long getCorrelationId()
+ {
+ return _correlationId;
+ }
+
+ public long getLocalCorrelationId()
+ {
+ return _localCorrletionId;
+ }
+
+ public String toString()
+ {
+ StringBuilder buf = new StringBuilder("Method event: \n");
+ buf.append("Channel id: \n").append(_channelId);
+ buf.append("Method: \n").append(_method);
+ buf.append("Request Id: ").append(_correlationId);
+ buf.append("Local Correlation Id: ").append(_localCorrletionId);
+ return buf.toString();
+ }
+}
Added: incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/model/AMQPMethodListener.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/model/AMQPMethodListener.java?view=auto&rev=522988
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/model/AMQPMethodListener.java (added)
+++ incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/model/AMQPMethodListener.java Tue Mar 27 09:31:05 2007
@@ -0,0 +1,11 @@
+package org.apache.qpid.nclient.model;
+
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.nclient.core.AMQPException;
+
+public interface AMQPMethodListener
+{
+
+ public <B extends AMQMethodBody> boolean methodReceived(AMQPMethodEvent<B> evt) throws AMQPException;
+
+}
Added: incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/model/ModelPhase.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/model/ModelPhase.java?view=auto&rev=522988
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/model/ModelPhase.java (added)
+++ incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/model/ModelPhase.java Tue Mar 27 09:31:05 2007
@@ -0,0 +1,133 @@
+package org.apache.qpid.nclient.model;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.nclient.amqp.state.AMQPStateManager;
+import org.apache.qpid.nclient.config.ClientConfiguration;
+import org.apache.qpid.nclient.core.AMQPException;
+import org.apache.qpid.nclient.core.AbstractPhase;
+import org.apache.qpid.nclient.core.Phase;
+import org.apache.qpid.nclient.core.PhaseContext;
+import org.apache.qpid.nclient.core.QpidConstants;
+
+/**
+ * This Phase handles Layer 3 functionality of the AMQP spec.
+ * This class acts as the interface between the API and the pipeline
+ */
+public class ModelPhase extends AbstractPhase {
+
+ private static final Logger _logger = Logger.getLogger(ModelPhase.class);
+
+ private Map <Class,List> _methodListners = new HashMap<Class,List>();
+
+ /**
+ * ------------------------------------------------
+ * Phase - methods introduced by Phase
+ * ------------------------------------------------
+ */
+ public void init(PhaseContext ctx, Phase nextInFlowPhase, Phase nextOutFlowPhase)
+ {
+ super.init(ctx, nextInFlowPhase, nextOutFlowPhase);
+ try
+ {
+ loadMethodListeners();
+ }
+ catch(Exception e)
+ {
+ _logger.fatal("Error loading method listeners", e);
+ }
+ }
+
+ public void messageReceived(Object msg) throws AMQPException
+ {
+ notifyMethodListerners((AMQPMethodEvent)msg);
+
+ // not doing super.methodReceived here, as this is the end of
+ // the pipeline
+ //super.messageReceived(msg);
+ }
+
+ /**
+ * This method should only except and pass messages
+ * of Type @see AMQPMethodEvent
+ */
+ public void messageSent(Object msg) throws AMQPException
+ {
+ super.messageSent(msg);
+ }
+
+ /**
+ * ------------------------------------------------
+ * Event Handling
+ * ------------------------------------------------
+ */
+
+ public void notifyMethodListerners(AMQPMethodEvent event) throws AMQPException
+ {
+ if (_methodListners.containsKey(event.getMethod().getClass()))
+ {
+ List<AMQPMethodListener> listeners = _methodListners.get(event.getMethod().getClass());
+
+ if(listeners.size()>0)
+ {
+ throw new AMQPException("There are no registered listeners for this method");
+ }
+
+ for(AMQPMethodListener l : listeners)
+ {
+ try
+ {
+ l.methodReceived(event);
+ }
+ catch (Exception e)
+ {
+ _logger.error("Error handling method event " + event, e);
+ }
+ }
+ }
+ }
+
+ /**
+ * ------------------------------------------------
+ * Configuration
+ * ------------------------------------------------
+ */
+
+ /**
+ * This method loads method listeners from the client.xml file
+ * For each method class there is a list of listeners
+ */
+ private void loadMethodListeners() throws Exception
+ {
+ int count = ClientConfiguration.get().getMaxIndex(QpidConstants.METHOD_LISTENERS + "." + QpidConstants.METHOD_LISTENER);
+ System.out.println(count);
+
+ for(int i=0 ;i<count;i++)
+ {
+ String methodListener = QpidConstants.METHOD_LISTENERS + "." + QpidConstants.METHOD_LISTENER + "(" + i + ")";
+ String className = ClientConfiguration.get().getString(methodListener + "." + QpidConstants.CLASS);
+ Class listenerClass = Class.forName(className);
+ List<String> list = ClientConfiguration.get().getList(methodListener + "." + QpidConstants.METHOD_CLASS);
+ for(String s:list)
+ {
+ List listeners;
+ Class methodClass = Class.forName(s);
+ if (_methodListners.containsKey(methodClass))
+ {
+ listeners = _methodListners.get(methodClass);
+ }
+ else
+ {
+ listeners = new ArrayList();
+ _methodListners.put(methodClass,listeners);
+ }
+ listeners.add(listenerClass);
+ }
+ }
+ }
+
+}
Added: incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/security/AMQPCallbackHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/security/AMQPCallbackHandler.java?view=auto&rev=522988
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/security/AMQPCallbackHandler.java (added)
+++ incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/security/AMQPCallbackHandler.java Tue Mar 27 09:31:05 2007
@@ -0,0 +1,30 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.nclient.security;
+
+import javax.security.auth.callback.CallbackHandler;
+
+import org.apache.qpid.nclient.transport.ConnectionURL;
+
+public interface AMQPCallbackHandler extends CallbackHandler
+{
+ void initialise(ConnectionURL url);
+}
Added: incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/security/CallbackHandlerRegistry.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/security/CallbackHandlerRegistry.java?view=auto&rev=522988
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/security/CallbackHandlerRegistry.java (added)
+++ incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/security/CallbackHandlerRegistry.java Tue Mar 27 09:31:05 2007
@@ -0,0 +1,98 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.nclient.security;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.nclient.config.ClientConfiguration;
+import org.apache.qpid.nclient.core.QpidConstants;
+
+public class CallbackHandlerRegistry
+{
+ private static final Logger _logger = Logger.getLogger(CallbackHandlerRegistry.class);
+
+ private static CallbackHandlerRegistry _instance = new CallbackHandlerRegistry();
+
+ private Map<String,Class> _mechanismToHandlerClassMap = new HashMap<String,Class>();
+
+ private String _mechanisms;
+
+ public static CallbackHandlerRegistry getInstance()
+ {
+ return _instance;
+ }
+
+ public Class getCallbackHandlerClass(String mechanism)
+ {
+ return _mechanismToHandlerClassMap.get(mechanism);
+ }
+
+ public String getMechanisms()
+ {
+ return _mechanisms;
+ }
+
+ private CallbackHandlerRegistry()
+ {
+ // first we register any Sasl client factories
+ DynamicSaslRegistrar.registerSaslProviders();
+ parseProperties();
+ }
+
+ private void parseProperties()
+ {
+ List<String> mechanisms = ClientConfiguration.get().getList(QpidConstants.AMQP_SECURITY_MECHANISMS);
+
+ for (String mechanism : mechanisms)
+ {
+ String className = ClientConfiguration.get().getString(QpidConstants.AMQP_SECURITY_MECHANISM_HANDLER + "_" + mechanism);
+ Class clazz = null;
+ try
+ {
+ clazz = Class.forName(className);
+ if (!AMQPCallbackHandler.class.isAssignableFrom(clazz))
+ {
+ _logger.warn("SASL provider " + clazz + " does not implement " + AMQPCallbackHandler.class +
+ ". Skipping");
+ continue;
+ }
+ _mechanismToHandlerClassMap.put(mechanism, clazz);
+ if (_mechanisms == null)
+ {
+ _mechanisms = mechanism;
+ }
+ else
+ {
+ // one time cost
+ _mechanisms = _mechanisms + " " + mechanism;
+ }
+ }
+ catch (ClassNotFoundException ex)
+ {
+ _logger.warn("Unable to load class " + className + ". Skipping that SASL provider");
+ continue;
+ }
+ }
+ }
+}
Added: incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/security/DynamicSaslRegistrar.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/security/DynamicSaslRegistrar.java?view=auto&rev=522988
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/security/DynamicSaslRegistrar.java (added)
+++ incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/security/DynamicSaslRegistrar.java Tue Mar 27 09:31:05 2007
@@ -0,0 +1,75 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.nclient.security;
+
+import java.security.Security;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import javax.security.sasl.SaslClientFactory;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.nclient.config.ClientConfiguration;
+import org.apache.qpid.nclient.core.QpidConstants;
+
+public class DynamicSaslRegistrar
+{
+ private static final Logger _logger = Logger.getLogger(DynamicSaslRegistrar.class);
+
+ public static void registerSaslProviders()
+ {
+ Map<String, Class<? extends SaslClientFactory>> factories = parseProperties();
+ if (factories.size() > 0)
+ {
+ Security.addProvider(new JCAProvider(factories));
+ _logger.debug("Dynamic SASL provider added as a security provider");
+ }
+ }
+
+ private static Map<String, Class<? extends SaslClientFactory>> parseProperties()
+ {
+ List<String> mechanisms = ClientConfiguration.get().getList(QpidConstants.AMQP_SECURITY_SASL_CLIENT_FACTORY_TYPES);
+ TreeMap<String, Class<? extends SaslClientFactory>> factoriesToRegister =
+ new TreeMap<String, Class<? extends SaslClientFactory>>();
+ for (String mechanism: mechanisms)
+ {
+ String className = ClientConfiguration.get().getString(QpidConstants.AMQP_SECURITY_SASL_CLIENT_FACTORY + "_" + mechanism);
+ try
+ {
+ Class<?> clazz = Class.forName(className);
+ if (!(SaslClientFactory.class.isAssignableFrom(clazz)))
+ {
+ _logger.error("Class " + clazz + " does not implement " + SaslClientFactory.class + " - skipping");
+ continue;
+ }
+ factoriesToRegister.put(mechanism, (Class<? extends SaslClientFactory>) clazz);
+ }
+ catch (Exception ex)
+ {
+ _logger.error("Error instantiating SaslClientFactory calss " + className + " - skipping");
+ }
+ }
+ return factoriesToRegister;
+ }
+
+
+}
Added: incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/security/JCAProvider.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/security/JCAProvider.java?view=auto&rev=522988
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/security/JCAProvider.java (added)
+++ incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/security/JCAProvider.java Tue Mar 27 09:31:05 2007
@@ -0,0 +1,46 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.nclient.security;
+
+import javax.security.sasl.SaslClientFactory;
+import java.security.Provider;
+import java.security.Security;
+import java.util.Map;
+
+public class JCAProvider extends Provider
+{
+ public JCAProvider(Map<String, Class<? extends SaslClientFactory>> providerMap)
+ {
+ super("AMQSASLProvider", 1.0, "A JCA provider that registers all " +
+ "AMQ SASL providers that want to be registered");
+ register(providerMap);
+ Security.addProvider(this);
+ }
+
+ private void register(Map<String, Class<? extends SaslClientFactory>> providerMap)
+ {
+ for (Map.Entry<String, Class<? extends SaslClientFactory>> me :
+ providerMap.entrySet())
+ {
+ put("SaslClientFactory." + me.getKey(), me.getValue().getName());
+ }
+ }
+}
\ No newline at end of file
Added: incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/security/UsernamePasswordCallbackHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/security/UsernamePasswordCallbackHandler.java?view=auto&rev=522988
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/security/UsernamePasswordCallbackHandler.java (added)
+++ incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/security/UsernamePasswordCallbackHandler.java Tue Mar 27 09:31:05 2007
@@ -0,0 +1,60 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.nclient.security;
+
+import java.io.IOException;
+
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.NameCallback;
+import javax.security.auth.callback.PasswordCallback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+
+import org.apache.qpid.nclient.transport.ConnectionURL;
+
+public class UsernamePasswordCallbackHandler implements AMQPCallbackHandler
+{
+ private ConnectionURL _url;
+
+ public void initialise(ConnectionURL url)
+ {
+ _url = url;
+ }
+
+ public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException
+ {
+ for (int i = 0; i < callbacks.length; i++)
+ {
+ Callback cb = callbacks[i];
+ if (cb instanceof NameCallback)
+ {
+ ((NameCallback)cb).setName(_url.getUsername());
+ }
+ else if (cb instanceof PasswordCallback)
+ {
+ ((PasswordCallback)cb).setPassword((_url.getPassword()).toCharArray());
+ }
+ else
+ {
+ throw new UnsupportedCallbackException(cb);
+ }
+ }
+ }
+}
Added: incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/transport/AMQPBrokerDetails.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/transport/AMQPBrokerDetails.java?view=auto&rev=522988
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/transport/AMQPBrokerDetails.java (added)
+++ incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/transport/AMQPBrokerDetails.java Tue Mar 27 09:31:05 2007
@@ -0,0 +1,344 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.nclient.transport;
+
+import org.apache.qpid.url.URLHelper;
+import org.apache.qpid.url.URLSyntaxException;
+
+import java.util.HashMap;
+import java.net.URISyntaxException;
+import java.net.URI;
+
+public class AMQPBrokerDetails implements BrokerDetails
+{
+ private String _host;
+
+ private int _port;
+
+ private String _transport;
+
+ private HashMap<String, String> _options;
+
+ public AMQPBrokerDetails()
+ {
+ _options = new HashMap<String, String>();
+ }
+
+ public AMQPBrokerDetails(String url) throws URLSyntaxException
+ {
+ this();
+ // URL should be of format tcp://host:port?option='value',option='value'
+ try
+ {
+ URI connection = new URI(url);
+
+ String transport = connection.getScheme();
+
+ // Handles some defaults to minimise changes to existing broker URLS e.g. localhost
+ if (transport != null)
+ {
+ //todo this list of valid transports should be enumerated somewhere
+ if ((!(transport.equalsIgnoreCase("vm") || transport.equalsIgnoreCase("tcp"))))
+ {
+ if (transport.equalsIgnoreCase("localhost"))
+ {
+ connection = new URI(DEFAULT_TRANSPORT + "://" + url);
+ transport = connection.getScheme();
+ }
+ else
+ {
+ if (url.charAt(transport.length()) == ':' && url.charAt(transport.length() + 1) != '/')
+ {
+ //Then most likely we have a host:port value
+ connection = new URI(DEFAULT_TRANSPORT + "://" + url);
+ transport = connection.getScheme();
+ }
+ else
+ {
+ URLHelper.parseError(0, transport.length(), "Unknown transport", url);
+ }
+ }
+ }
+ }
+ else
+ {
+ //Default the transport
+ connection = new URI(DEFAULT_TRANSPORT + "://" + url);
+ transport = connection.getScheme();
+ }
+
+ if (transport == null)
+ {
+ URLHelper.parseError(-1, "Unknown transport:'" + transport + "'" + " In broker URL:'" + url
+ + "' Format: " + URL_FORMAT_EXAMPLE, "");
+ }
+
+ setTransport(transport);
+
+ String host = connection.getHost();
+
+ // Fix for Java 1.5
+ if (host == null)
+ {
+ host = "";
+ }
+
+ setHost(host);
+
+ int port = connection.getPort();
+
+ if (port == -1)
+ {
+ // Fix for when there is port data but it is not automatically parseable by getPort().
+ String auth = connection.getAuthority();
+
+ if (auth != null && auth.contains(":"))
+ {
+ int start = auth.indexOf(":") + 1;
+ int end = start;
+ boolean looking = true;
+ boolean found = false;
+ //Walk the authority looking for a port value.
+ while (looking)
+ {
+ try
+ {
+ end++;
+ Integer.parseInt(auth.substring(start, end));
+
+ if (end >= auth.length())
+ {
+ looking = false;
+ found = true;
+ }
+ }
+ catch (NumberFormatException nfe)
+ {
+ looking = false;
+ }
+
+ }
+ if (found)
+ {
+ setPort(Integer.parseInt(auth.substring(start, end)));
+ }
+ else
+ {
+ URLHelper.parseError(connection.toString().indexOf(connection.getAuthority()) + end - 1,
+ "Illegal character in port number", connection.toString());
+ }
+
+ }
+ else
+ {
+ setPort(DEFAULT_PORT);
+ }
+ }
+ else
+ {
+ setPort(port);
+ }
+
+ String queryString = connection.getQuery();
+
+ URLHelper.parseOptions(_options, queryString);
+
+ //Fragment is #string (not used)
+ }
+ catch (URISyntaxException uris)
+ {
+ if (uris instanceof URLSyntaxException)
+ {
+ throw (URLSyntaxException) uris;
+ }
+
+ URLHelper.parseError(uris.getIndex(), uris.getReason(), uris.getInput());
+ }
+ }
+
+ public AMQPBrokerDetails(String host, int port, boolean useSSL)
+ {
+ _host = host;
+ _port = port;
+
+ if (useSSL)
+ {
+ setOption(OPTIONS_SSL, "true");
+ }
+ }
+
+ public String getHost()
+ {
+ return _host;
+ }
+
+ public void setHost(String _host)
+ {
+ this._host = _host;
+ }
+
+ public int getPort()
+ {
+ return _port;
+ }
+
+ public void setPort(int _port)
+ {
+ this._port = _port;
+ }
+
+ public String getTransport()
+ {
+ return _transport;
+ }
+
+ public void setTransport(String _transport)
+ {
+ this._transport = _transport;
+ }
+
+ public String getOption(String key)
+ {
+ return _options.get(key);
+ }
+
+ public void setOption(String key, String value)
+ {
+ _options.put(key, value);
+ }
+
+ public long getTimeout()
+ {
+ if (_options.containsKey(OPTIONS_CONNECT_TIMEOUT))
+ {
+ try
+ {
+ return Long.parseLong(_options.get(OPTIONS_CONNECT_TIMEOUT));
+ }
+ catch (NumberFormatException nfe)
+ {
+ //Do nothing as we will use the default below.
+ }
+ }
+
+ return BrokerDetails.DEFAULT_CONNECT_TIMEOUT;
+ }
+
+ public void setTimeout(long timeout)
+ {
+ setOption(OPTIONS_CONNECT_TIMEOUT, Long.toString(timeout));
+ }
+
+ public String toString()
+ {
+ StringBuffer sb = new StringBuffer();
+
+ sb.append(_transport);
+ sb.append("://");
+
+ if (!(_transport.equalsIgnoreCase("vm")))
+ {
+ sb.append(_host);
+ }
+
+ sb.append(':');
+ sb.append(_port);
+
+ sb.append(printOptionsURL());
+
+ return sb.toString();
+ }
+
+ public boolean equals(Object o)
+ {
+ if (!(o instanceof BrokerDetails))
+ {
+ return false;
+ }
+
+ BrokerDetails bd = (BrokerDetails) o;
+
+ return _host.equalsIgnoreCase(bd.getHost()) && (_port == bd.getPort())
+ && _transport.equalsIgnoreCase(bd.getTransport()) && (useSSL() == bd.useSSL());
+
+ //todo do we need to compare all the options as well?
+ }
+
+ private String printOptionsURL()
+ {
+ StringBuffer optionsURL = new StringBuffer();
+
+ optionsURL.append('?');
+
+ if (!(_options.isEmpty()))
+ {
+
+ for (String key : _options.keySet())
+ {
+ optionsURL.append(key);
+
+ optionsURL.append("='");
+
+ optionsURL.append(_options.get(key));
+
+ optionsURL.append("'");
+
+ optionsURL.append(URLHelper.DEFAULT_OPTION_SEPERATOR);
+ }
+ }
+
+ //removeKey the extra DEFAULT_OPTION_SEPERATOR or the '?' if there are no options
+ optionsURL.deleteCharAt(optionsURL.length() - 1);
+
+ return optionsURL.toString();
+ }
+
+ public boolean useSSL()
+ {
+ // To be friendly to users we should be case insensitive.
+ // or simply force users to conform to OPTIONS_SSL
+ // todo make case insensitive by trying ssl Ssl sSl ssL SSl SsL sSL SSL
+
+ if (_options.containsKey(OPTIONS_SSL))
+ {
+ return _options.get(OPTIONS_SSL).equalsIgnoreCase("true");
+ }
+
+ return USE_SSL_DEFAULT;
+ }
+
+ public void useSSL(boolean ssl)
+ {
+ setOption(OPTIONS_SSL, Boolean.toString(ssl));
+ }
+
+ public static String checkTransport(String broker)
+ {
+ if ((!broker.contains("://")))
+ {
+ return "tcp://" + broker;
+ }
+ else
+ {
+ return broker;
+ }
+ }
+}