You are viewing a plain text version of this content. The canonical link for it is here.
Posted to axis-cvs@ws.apache.org by di...@apache.org on 2005/12/05 22:38:38 UTC

svn commit: r354198 [1/3] - /webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/

Author: dims
Date: Mon Dec  5 13:38:30 2005
New Revision: 354198

URL: http://svn.apache.org/viewcvs?rev=354198&view=rev
Log:
Initial cut, compiles ok, haven't run a single thing yet.


Added:
    webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/BeanVendorAdapter.java
    webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/Handler.java
    webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/InvokeException.java
    webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/InvokeTimeoutException.java
    webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/JMSConnector.java
    webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/JMSConnectorFactory.java
    webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/JMSConnectorManager.java
    webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/JMSConstants.java
    webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/JMSEndpoint.java
    webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/JMSSender.java
    webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/JMSTransport.java
    webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/JMSURLConnection.java
    webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/JMSURLHelper.java
    webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/JMSVendorAdapter.java
    webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/JMSVendorAdapterFactory.java
    webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/JNDIVendorAdapter.java
    webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/MapUtils.java
    webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/QueueConnector.java
    webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/SimpleJMSListener.java
    webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/SimpleJMSWorker.java
    webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/Subscription.java
    webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/TopicConnector.java

Added: webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/BeanVendorAdapter.java
URL: http://svn.apache.org/viewcvs/webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/BeanVendorAdapter.java?rev=354198&view=auto
==============================================================================
--- webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/BeanVendorAdapter.java (added)
+++ webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/BeanVendorAdapter.java Mon Dec  5 13:38:30 2005
@@ -0,0 +1,80 @@
+/*
+ * Copyright 2001, 2002,2004 The Apache Software Foundation.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.axis2.transport.jms;
+
+import org.apache.axis2.util.Loader;
+
+import javax.jms.ConnectionFactory;
+import javax.jms.QueueConnectionFactory;
+import javax.jms.TopicConnectionFactory;
+import java.beans.Introspector;
+import java.beans.PropertyDescriptor;
+import java.util.HashMap;
+
+/**
+ * Vendor adapter that builds JMS connection factories reflectively.  The
+ * class named under {@link #CONNECTION_FACTORY_CLASS} in the configuration
+ * map is loaded with <code>Loader.loadClass</code>, instantiated through its
+ * no-argument constructor and then configured by invoking the bean setters
+ * whose property names appear as keys in the same map.
+ */
+public abstract class BeanVendorAdapter extends JMSVendorAdapter {
+    /** Configuration key holding the connection factory class name. */
+    protected final static String CONNECTION_FACTORY_CLASS =
+            "transport.jms.ConnectionFactoryClass";
+
+    /**
+     * Creates and configures the factory described by <code>cfConfig</code>
+     * and returns it as a queue connection factory.
+     *
+     * @param cfConfig factory class name plus bean property values
+     * @return the configured factory
+     * @throws Exception if the class cannot be loaded, instantiated or
+     *                   configured, or is not a QueueConnectionFactory
+     */
+    public QueueConnectionFactory getQueueConnectionFactory(HashMap cfConfig)
+            throws Exception {
+        return (QueueConnectionFactory) getConnectionFactory(cfConfig);
+    }
+
+    /**
+     * Creates and configures the factory described by <code>cfConfig</code>
+     * and returns it as a topic connection factory.
+     *
+     * @param cfConfig factory class name plus bean property values
+     * @return the configured factory
+     * @throws Exception if the class cannot be loaded, instantiated or
+     *                   configured, or is not a TopicConnectionFactory
+     */
+    public TopicConnectionFactory getTopicConnectionFactory(HashMap cfConfig)
+            throws Exception {
+        return (TopicConnectionFactory) getConnectionFactory(cfConfig);
+    }
+
+    /**
+     * Loads, instantiates and configures the connection factory.
+     *
+     * @throws IllegalArgumentException ("noCFClass") when no class name is
+     *                                  configured
+     */
+    private ConnectionFactory getConnectionFactory(HashMap cfConfig)
+            throws Exception {
+        String classname = (String) cfConfig.get(CONNECTION_FACTORY_CLASS);
+        if (classname == null || classname.trim().length() == 0) {
+            throw new IllegalArgumentException("noCFClass");
+        }
+
+        Class factoryClass = Loader.loadClass(classname);
+        ConnectionFactory factory = (ConnectionFactory) factoryClass.newInstance();
+        callSetters(cfConfig, factoryClass, factory);
+        return factory;
+    }
+
+    /**
+     * Invokes the setter of every bean property of <code>factoryClass</code>
+     * for which <code>cfConfig</code> holds a non-null value.
+     *
+     * @throws IllegalArgumentException ("badType") when the value's class is
+     *                                  not exactly the declared property type,
+     *                                  ("notWriteable") when the property has
+     *                                  no setter
+     */
+    private void callSetters(HashMap cfConfig,
+                             Class factoryClass,
+                             ConnectionFactory factory)
+            throws Exception {
+        PropertyDescriptor[] bpd = Introspector.getBeanInfo(factoryClass).getPropertyDescriptors();
+        for (int i = 0; i < bpd.length; i++) {
+            PropertyDescriptor thisBPD = bpd[i];
+            // a missing key and a null value are both treated as "not configured"
+            Object value = cfConfig.get(thisBPD.getName());
+            if (value == null) {
+                continue;
+            }
+
+            // Validate against the declared property TYPE.  The previous code
+            // compared against the property NAME, which rejected every value.
+            // getPropertyType() is null for indexed-only properties; those
+            // cannot be set from a single value and are rejected as well.
+            // The match is exact by class name, so a primitive-typed property
+            // ("int") does not accept its boxed value ("java.lang.Integer").
+            Class propertyType = thisBPD.getPropertyType();
+            if (propertyType == null
+                    || !value.getClass().getName().equals(propertyType.getName())) {
+                throw new IllegalArgumentException("badType");
+            }
+            if (thisBPD.getWriteMethod() == null) {
+                throw new IllegalArgumentException("notWriteable");
+            }
+            thisBPD.getWriteMethod().invoke(factory, new Object[]{value});
+        }
+    }
+}
\ No newline at end of file

Added: webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/Handler.java
URL: http://svn.apache.org/viewcvs/webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/Handler.java?rev=354198&view=auto
==============================================================================
--- webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/Handler.java (added)
+++ webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/Handler.java Mon Dec  5 13:38:30 2005
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2001, 2002,2004 The Apache Software Foundation.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.axis2.transport.jms;
+
+import java.net.URL;
+import java.net.URLConnection;
+
+/**
+ * URLStreamHandler for the "jms" protocol.  Connections are opened as
+ * {@link JMSURLConnection} instances.
+ */
+public class Handler
+        extends java.net.URLStreamHandler {
+    static {
+        // register the JMSTransport class
+        //org.apache.axis.client.Call.setTransportForProtocol(JMSConstants.PROTOCOL, org.apache.axis2.transport.jms.JMSTransport.class);
+    }
+
+    /**
+     * Reassembles the URL string, in the form "jms:/<dest>?prop1=value1&prop2=value2&...".
+     * The "?..." suffix is emitted only when the URL actually carries a query.
+     *
+     * @param url the jms URL to render
+     * @return the external string form of <code>url</code>
+     */
+    protected String toExternalForm(URL url) {
+
+        // Strip the leading '/' only when it is there: an empty path must not
+        // throw and a path without a slash must not lose its first character.
+        String destination = url.getPath();
+        if (destination.startsWith("/")) {
+            destination = destination.substring(1);
+        }
+        String query = url.getQuery();
+
+        StringBuffer jmsurl = new StringBuffer(JMSConstants.PROTOCOL + ":/");
+        jmsurl.append(destination);
+        // getQuery() returns null when there is no query part; appending it
+        // unconditionally produced a literal "?null"
+        if (query != null) {
+            jmsurl.append("?").append(query);
+        }
+
+        return jmsurl.toString();
+    }
+
+    /**
+     * Opens a connection object for the given jms URL.
+     *
+     * @param url the jms URL
+     * @return a new {@link JMSURLConnection} wrapping <code>url</code>
+     */
+    protected URLConnection openConnection(URL url) {
+        return new JMSURLConnection(url);
+    }
+}
\ No newline at end of file

Added: webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/InvokeException.java
URL: http://svn.apache.org/viewcvs/webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/InvokeException.java?rev=354198&view=auto
==============================================================================
--- webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/InvokeException.java (added)
+++ webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/InvokeException.java Mon Dec  5 13:38:30 2005
@@ -0,0 +1,27 @@
+/*
+ * Copyright 2001, 2002,2004 The Apache Software Foundation.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.axis2.transport.jms;
+
+/**
+ * The <code>InvokeException</code> is thrown when a method encounters a
+ * general exception in the course of processing.
+ */
+public class InvokeException extends RuntimeException {
+    /**
+     * Creates an exception carrying only a description of the failure.
+     *
+     * @param message description of the failure
+     */
+    public InvokeException(String message) {
+        super(message);
+    }
+
+    /**
+     * Creates an exception that keeps the underlying failure attached, so the
+     * original stack trace is not lost when a lower-level error is wrapped.
+     *
+     * @param message description of the failure
+     * @param cause   the underlying failure, may be <code>null</code>
+     */
+    public InvokeException(String message, Throwable cause) {
+        super(message, cause);
+    }
+}

Added: webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/InvokeTimeoutException.java
URL: http://svn.apache.org/viewcvs/webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/InvokeTimeoutException.java?rev=354198&view=auto
==============================================================================
--- webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/InvokeTimeoutException.java (added)
+++ webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/InvokeTimeoutException.java Mon Dec  5 13:38:30 2005
@@ -0,0 +1,32 @@
+/*
+ * Copyright 2001, 2002,2004 The Apache Software Foundation.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.axis2.transport.jms;
+
+/**
+ * Signals that an operation could not finish within its allotted time.
+ * The most common trigger is a failed broker that the client cannot
+ * reconnect to, and any wsclient method that talks to the broker may raise
+ * it.  The time limit comes from the environment parameter passed to the
+ * <code>createConnector</code> method of <code>JMSConnectorFactory</code>,
+ * under the key <code>JMSConstants.INTERACT_TIMEOUT_TIME</code>.
+ */
+public class InvokeTimeoutException extends InvokeException {
+    /**
+     * @param message description of the operation that timed out
+     */
+    public InvokeTimeoutException(final String message) {
+        super(message);
+    }
+}

Added: webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/JMSConnector.java
URL: http://svn.apache.org/viewcvs/webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/JMSConnector.java?rev=354198&view=auto
==============================================================================
--- webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/JMSConnector.java (added)
+++ webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/JMSConnector.java Mon Dec  5 13:38:30 2005
@@ -0,0 +1,883 @@
+/*
+ * Copyright 2001, 2002,2004 The Apache Software Foundation.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.axis2.transport.jms;
+
+import javax.jms.BytesMessage;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import java.io.ByteArrayOutputStream;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Map;
+
+/**
+ * JMSConnector is an abstract class that encapsulates the work of connecting
+ * to JMS destinations. Its subclasses are TopicConnector and QueueConnector
+ * which further specialize connections to the pub-sub and the ptp domains.
+ * It also implements the capability to retry connections in the event of
+ * failures.
+ */
+public abstract class JMSConnector {
+    protected int m_numRetries;
+    protected long m_connectRetryInterval;
+    protected long m_interactRetryInterval;
+    protected long m_timeoutTime;
+    protected long m_poolTimeout;
+    protected AsyncConnection m_receiveConnection;
+    protected SyncConnection m_sendConnection;
+    protected int m_numSessions;
+    protected boolean m_allowReceive;
+    protected JMSVendorAdapter m_adapter;
+    protected JMSURLHelper m_jmsurl;
+
+    /**
+     * Stores the retry/timeout configuration, then eagerly opens the send
+     * connection (and, when <code>allowReceive</code> is set, the receive
+     * connection) so that irrecoverable errors surface immediately.
+     *
+     * @param connectionFactory     factory used to open the JMS connections
+     * @param numRetries            attempt limit used by
+     *                              <code>createConnectionWithRetry</code>; also
+     *                              divides <code>timeoutTime</code> to obtain
+     *                              the session-pool wait time
+     * @param numSessions           number of send sessions per connection
+     * @param connectRetryInterval  sleep between connection attempts, in ms
+     * @param interactRetryInterval stored in <code>m_interactRetryInterval</code>
+     * @param timeoutTime           overall time limit for a send, in ms
+     * @param allowReceive          whether a receive connection is created
+     * @param clientID              JMS client id, may be <code>null</code>
+     * @param username              user name handed to the connect call
+     * @param password              password handed to the connect call
+     * @param adapter               vendor adapter that classifies JMS errors
+     * @param jmsurl                URL helper kept for {@link #getJMSURL()}
+     * @throws JMSException if a connection cannot be established
+     */
+    public JMSConnector(ConnectionFactory connectionFactory,
+                        int numRetries,
+                        int numSessions,
+                        long connectRetryInterval,
+                        long interactRetryInterval,
+                        long timeoutTime,
+                        boolean allowReceive,
+                        String clientID,
+                        String username,
+                        String password,
+                        JMSVendorAdapter adapter,
+                        JMSURLHelper jmsurl)
+            throws JMSException {
+        m_numRetries = numRetries;
+        m_connectRetryInterval = connectRetryInterval;
+        m_interactRetryInterval = interactRetryInterval;
+        m_timeoutTime = timeoutTime;
+        // A retry count of zero used to raise ArithmeticException here; with
+        // no positive retry count the whole timeout is used as the pool wait.
+        m_poolTimeout = (numRetries > 0) ? timeoutTime / (long) numRetries : timeoutTime;
+        m_numSessions = numSessions;
+        m_allowReceive = allowReceive;
+        m_adapter = adapter;
+        m_jmsurl = jmsurl;
+
+        // try to connect initially so we can fail fast
+        // in the case of irrecoverable errors.
+        // If we fail in a recoverable fashion we will retry
+        javax.jms.Connection sendConnection = createConnectionWithRetry(
+                connectionFactory,
+                username,
+                password);
+        m_sendConnection = createSyncConnection(connectionFactory, sendConnection,
+                m_numSessions, "SendThread",
+                clientID,
+                username,
+                password);
+
+        m_sendConnection.start();
+
+        if (m_allowReceive) {
+            javax.jms.Connection receiveConnection = createConnectionWithRetry(
+                    connectionFactory,
+                    username,
+                    password);
+            m_receiveConnection = createAsyncConnection(connectionFactory,
+                    receiveConnection,
+                    "ReceiveThread",
+                    clientID,
+                    username,
+                    password);
+            m_receiveConnection.start();
+        }
+    }
+
+    /**
+     * @return the retry count this connector was constructed with
+     */
+    public int getNumRetries() {
+        return this.m_numRetries;
+    }
+
+    /**
+     * @return the number of sessions this connector was constructed with
+     */
+    public int numSessions() {
+        return this.m_numSessions;
+    }
+
+    public ConnectionFactory getConnectionFactory() {
+        // there is always a send connection
+        return getSendConnection().getConnectionFactory();
+    }
+
+    public String getClientID() {
+        return getSendConnection().getClientID();
+    }
+
+    public String getUsername() {
+        return getSendConnection().getUsername();
+    }
+
+    public String getPassword() {
+        return getSendConnection().getPassword();
+    }
+
+    /**
+     * @return the vendor adapter supplied at construction time
+     */
+    public JMSVendorAdapter getVendorAdapter() {
+        return this.m_adapter;
+    }
+
+    /**
+     * @return the JMS URL helper supplied at construction time
+     */
+    public JMSURLHelper getJMSURL() {
+        return this.m_jmsurl;
+    }
+
+    /**
+     * Opens a connection through {@link #internalConnect}, retrying after
+     * <code>m_connectRetryInterval</code> milliseconds while the vendor
+     * adapter classifies the failure as recoverable and the attempt count
+     * has not reached <code>m_numRetries</code>.
+     *
+     * @param connectionFactory factory used to open the connection
+     * @param username          user name handed to the connect call
+     * @param password          password handed to the connect call
+     * @return the established connection
+     * @throws JMSException the last connect failure, when it is not
+     *                      recoverable, the attempts are used up, or the
+     *                      calling thread is interrupted while backing off
+     */
+    protected javax.jms.Connection createConnectionWithRetry(
+            ConnectionFactory connectionFactory,
+            String username,
+            String password)
+            throws JMSException {
+        javax.jms.Connection connection = null;
+        for (int numTries = 1; connection == null; numTries++) {
+            try {
+                connection = internalConnect(connectionFactory, username, password);
+            }
+            catch (JMSException jmse) {
+                if (!m_adapter.isRecoverable(jmse, JMSVendorAdapter.CONNECT_ACTION)
+                        || numTries == m_numRetries) {
+                    throw jmse;
+                }
+                try {
+                    Thread.sleep(m_connectRetryInterval);
+                } catch (InterruptedException ie) {
+                    // Do not lose the interrupt: restore the flag for the
+                    // caller and stop retrying instead of sleeping again.
+                    Thread.currentThread().interrupt();
+                    throw jmse;
+                }
+            }
+        }
+        return connection;
+    }
+
+    public void stop() {
+        JMSConnectorManager.getInstance().removeConnectorFromPool(this);
+
+        m_sendConnection.stopConnection();
+        if (m_allowReceive)
+            m_receiveConnection.stopConnection();
+    }
+
+    public void start() {
+        m_sendConnection.startConnection();
+        if (m_allowReceive)
+            m_receiveConnection.startConnection();
+
+        JMSConnectorManager.getInstance().addConnectorToPool(this);
+    }
+
+    /**
+     * Asks the send connection thread to terminate and, when receiving is
+     * enabled, the receive connection thread as well.
+     */
+    public void shutdown() {
+        m_sendConnection.shutdown();
+        if (m_allowReceive) {
+            m_receiveConnection.shutdown();
+        }
+    }
+
+    /**
+     * Creates an endpoint for a destination identified by name.
+     * Implemented by the domain-specific subclasses.
+     *
+     * @param destinationName name of the destination
+     * @return the endpoint for that destination
+     * @throws JMSException declared for implementations to report JMS failures
+     */
+    public abstract JMSEndpoint createEndpoint(String destinationName)
+            throws JMSException;
+
+    /**
+     * Creates an endpoint for an already resolved destination.
+     * Implemented by the domain-specific subclasses.
+     *
+     * @param destination the destination object
+     * @return the endpoint for that destination
+     * @throws JMSException declared for implementations to report JMS failures
+     */
+    public abstract JMSEndpoint createEndpoint(Destination destination)
+            throws JMSException;
+
+
+    /**
+     * Opens one JMS connection.  Called by
+     * <code>createConnectionWithRetry</code> for the initial connect and by
+     * the connection threads whenever they need to reconnect.
+     *
+     * @param connectionFactory factory to connect through
+     * @param username          user name for the connection
+     * @param password          password for the connection
+     * @return the new connection
+     * @throws JMSException if the connection cannot be opened
+     */
+    protected abstract javax.jms.Connection internalConnect(
+            ConnectionFactory connectionFactory,
+            String username,
+            String password)
+            throws JMSException;
+
+    /**
+     * Thread that owns one <code>javax.jms.Connection</code> and keeps it
+     * usable: it registers itself as the connection's exception listener,
+     * and its {@link #run()} loop reconnects through
+     * <code>internalConnect</code> whenever it is woken up after a failure.
+     * Subclasses supply the <code>onConnect</code>, <code>onShutdown</code>
+     * and <code>onException</code> hooks.
+     */
+    private abstract class Connection extends Thread implements ExceptionListener {
+        private ConnectionFactory m_connectionFactory;
+        protected javax.jms.Connection m_connection;
+
+        // loop condition of run(); cleared by shutdown()
+        // NOTE(review): written in shutdown() and read in run() without
+        // volatile or a common lock -- confirm the visibility is acceptable.
+        protected boolean m_isActive;
+        // true when run() has to call internalConnect before continuing
+        private boolean m_needsToConnect;
+        // remembers whether startConnection() was requested, so that a
+        // reconnect can start the new connection too; guarded by m_lifecycleLock
+        private boolean m_startConnection;
+        private String m_clientID;
+        private String m_username;
+        private String m_password;
+
+        // monitor run() parks on; notified by shutdown() and onException(JMSException)
+        private Object m_jmsLock;
+        // guards m_startConnection and the start/stop calls on m_connection
+        private Object m_lifecycleLock;
+
+        /**
+         * Records the connection parameters and, when a connection is
+         * supplied, adopts it (exception listener and client id are set).
+         * With a <code>null</code> connection the first pass of
+         * {@link #run()} connects instead.
+         *
+         * @param connectionFactory factory used for (re)connecting
+         * @param connection        already opened connection, or <code>null</code>
+         * @param threadName        name given to this thread
+         * @param clientID          client id to set, may be <code>null</code>
+         * @param username          user name for (re)connecting
+         * @param password          password for (re)connecting
+         * @throws JMSException if configuring the supplied connection fails
+         */
+        protected Connection(ConnectionFactory connectionFactory,
+                             javax.jms.Connection connection,
+                             String threadName,
+                             String clientID,
+                             String username,
+                             String password)
+                throws JMSException {
+            super(threadName);
+            m_connectionFactory = connectionFactory;
+
+            m_clientID = clientID;
+            m_username = username;
+            m_password = password;
+
+            m_jmsLock = new Object();
+            m_lifecycleLock = new Object();
+
+            if (connection != null) {
+                m_needsToConnect = false;
+                m_connection = connection;
+                m_connection.setExceptionListener(this);
+                if (m_clientID != null)
+                    m_connection.setClientID(m_clientID);
+            } else {
+                m_needsToConnect = true;
+            }
+
+            m_isActive = true;
+        }
+
+        /** @return the factory this connection (re)connects through */
+        public ConnectionFactory getConnectionFactory() {
+            return m_connectionFactory;
+        }
+
+        /** @return the client id given at construction, may be <code>null</code> */
+        public String getClientID() {
+            return m_clientID;
+        }
+
+        /** @return the user name given at construction */
+        public String getUsername() {
+            return m_username;
+        }
+
+        /** @return the password given at construction */
+        public String getPassword() {
+            return m_password;
+        }
+
+        /**
+         * Connection maintenance loop.  While active it (re)connects when
+         * needed, runs the <code>onConnect</code> hook and then waits on
+         * <code>m_jmsLock</code> until {@link #shutdown()} or
+         * {@link #onException(JMSException)} wakes it.  After the loop ends
+         * the shutdown hook runs and the connection is closed.
+         *
+         * @todo handle non-recoverable errors
+         */
+
+        public void run() {
+            // loop until a connection is made and when a connection is made (re)establish
+            // any subscriptions
+            while (m_isActive) {
+                if (m_needsToConnect) {
+                    m_connection = null;
+                    try {
+                        m_connection = internalConnect(m_connectionFactory,
+                                m_username, m_password);
+                        m_connection.setExceptionListener(this);
+                        if (m_clientID != null)
+                            m_connection.setClientID(m_clientID);
+                    }
+                    catch (JMSException e) {
+                        // simply backoff for a while and then retry
+                        try {
+                            Thread.sleep(m_connectRetryInterval);
+                        } catch (InterruptedException ie) {
+                        }
+                        continue;
+                    }
+                } else
+                    m_needsToConnect = true; // When we'll get to the "if (needsToConnect)" statement the next time it will be because
+                // we lost the connection
+
+                // we now have a valid connection so establish some context
+                try {
+                    internalOnConnect();
+                }
+                catch (Exception e) {
+                    // insert code to handle non recoverable errors
+                    // simply retry
+                    // NOTE(review): this retries immediately (no sleep) and the
+                    // next pass replaces m_connection without closing it.
+                    continue;
+                }
+
+                // park until shutdown() or onException(JMSException) notifies
+                synchronized (m_jmsLock) {
+                    try {
+                        m_jmsLock.wait();
+                    } catch (InterruptedException ie) {
+                    } // until notified due to some change in status
+                }
+            }
+
+            // no longer staying connected, so see what we can cleanup
+            internalOnShutdown();
+        }
+
+
+        /**
+         * Starts message delivery on the connection unless it was already
+         * requested.  Any failure of the underlying start call is ignored.
+         */
+        void startConnection() {
+            synchronized (m_lifecycleLock) {
+                if (m_startConnection)
+                    return;
+                m_startConnection = true;
+                try {
+                    m_connection.start();
+                } catch (Throwable e) {
+                } // ignore
+            }
+        }
+
+        /**
+         * Stops message delivery on the connection if it had been started.
+         * Any failure of the underlying stop call is ignored.
+         */
+        void stopConnection() {
+            synchronized (m_lifecycleLock) {
+                if (!m_startConnection)
+                    return;
+                m_startConnection = false;
+                try {
+                    m_connection.stop();
+                } catch (Throwable e) {
+                } // ignore
+            }
+        }
+
+        /**
+         * Ends the maintenance loop: clears the active flag and wakes the
+         * thread waiting in {@link #run()}.
+         */
+        void shutdown() {
+            m_isActive = false;
+            synchronized (m_jmsLock) {
+                m_jmsLock.notifyAll();
+            }
+        }
+
+
+        /**
+         * Exception listener callback.  Failures the vendor adapter reports
+         * as recoverable are ignored; for all others the subclass hook runs
+         * and the thread waiting in {@link #run()} is woken up.
+         *
+         * @param exception the failure reported for the connection
+         */
+        public void onException(JMSException exception) {
+            if (m_adapter.isRecoverable(exception,
+                    JMSVendorAdapter.ON_EXCEPTION_ACTION))
+                return;
+            onException();
+            synchronized (m_jmsLock) {
+                m_jmsLock.notifyAll();
+            }
+        }
+
+        /**
+         * Runs the subclass connect hook and then starts the connection if a
+         * start had been requested earlier.
+         */
+        private final void internalOnConnect()
+                throws Exception {
+            onConnect();
+            synchronized (m_lifecycleLock) {
+                if (m_startConnection) {
+                    try {
+                        m_connection.start();
+                    } catch (Throwable e) {
+                    } // ignore
+                }
+            }
+        }
+
+        /**
+         * Stops the connection, runs the subclass shutdown hook and closes
+         * the connection; a failing close is ignored.
+         */
+        private final void internalOnShutdown() {
+            stopConnection();
+            onShutdown();
+            try {
+                m_connection.close();
+            } catch (Throwable e) {
+            } // ignore
+        }
+
+        /** Hook invoked from run() each time a connection is available. */
+        protected abstract void onConnect() throws Exception;
+
+        /** Hook invoked once when the maintenance loop has ended. */
+        protected abstract void onShutdown();
+
+        /** Hook invoked when a non-recoverable connection failure is reported. */
+        protected abstract void onException();
+    }
+
+    /**
+     * Factory hook for the send-side connection wrapper.  The constructor
+     * calls it with the freshly opened send connection and the thread name
+     * "SendThread".
+     *
+     * @param factory     factory the connection was opened through
+     * @param connection  the opened connection to wrap
+     * @param numSessions number of send sessions to keep
+     * @param threadName  name for the connection thread
+     * @param clientID    JMS client id, may be <code>null</code>
+     * @param username    user name for reconnects
+     * @param password    password for reconnects
+     * @return the send connection wrapper
+     * @throws JMSException declared for implementations to report JMS failures
+     */
+    protected abstract SyncConnection createSyncConnection(ConnectionFactory factory,
+                                                           javax.jms.Connection connection,
+                                                           int numSessions,
+                                                           String threadName,
+                                                           String clientID,
+                                                           String username,
+                                                           String password)
+
+            throws JMSException;
+
+    /**
+     * @return the send-side connection created by the constructor
+     */
+    SyncConnection getSendConnection() {
+        return this.m_sendConnection;
+    }
+
+    protected abstract class SyncConnection extends Connection {
+        LinkedList m_senders;
+        int m_numSessions;
+        Object m_senderLock;
+
+        /**
+         * Hands the connection parameters to the base class and prepares an
+         * empty sender pool together with the lock that guards it.
+         *
+         * @param numSessions number of send sessions created on each connect
+         */
+        SyncConnection(ConnectionFactory connectionFactory,
+                       javax.jms.Connection connection,
+                       int numSessions,
+                       String threadName,
+                       String clientID,
+                       String username,
+                       String password)
+                throws JMSException {
+            super(connectionFactory, connection, threadName,
+                    clientID, username, password);
+            this.m_numSessions = numSessions;
+            this.m_senderLock = new Object();
+            this.m_senders = new LinkedList();
+        }
+
+        /**
+         * Creates one send session on the given connection.  Called from
+         * <code>onConnect</code> once per pooled session.
+         *
+         * @param connection the connection to create the session on
+         * @return the new send session
+         * @throws JMSException if the session cannot be created
+         */
+        protected abstract SendSession createSendSession(javax.jms.Connection connection)
+                throws JMSException;
+
+        /**
+         * Fills the sender pool with <code>m_numSessions</code> new sessions
+         * on the current connection and wakes every thread waiting for one.
+         *
+         * @throws JMSException if a session cannot be created
+         */
+        protected void onConnect()
+                throws JMSException {
+            synchronized (m_senderLock) {
+                int created = 0;
+                while (created < m_numSessions) {
+                    SendSession fresh = createSendSession(m_connection);
+                    m_senders.add(fresh);
+                    created++;
+                }
+                m_senderLock.notifyAll();
+            }
+        }
+
+        /**
+         * Performs a request/response exchange, retrying on recoverable JMS
+         * failures until <code>timeout</code> milliseconds have elapsed.
+         *
+         * @param endpoint   destination of the request
+         * @param message    request payload
+         * @param timeout    overall time limit in milliseconds
+         * @param properties message properties passed to the session
+         * @return the response payload, never <code>null</code>
+         * @throws InvokeTimeoutException if the limit passes or the session
+         *                                returns no response
+         * @throws JMSException           on a failure the vendor adapter
+         *                                does not consider recoverable
+         * @throws Exception              anything else raised by the session
+         */
+        byte[] call(JMSEndpoint endpoint, byte[] message, long timeout, HashMap properties)
+                throws Exception {
+            long timeoutTime = System.currentTimeMillis() + timeout;
+            while (true) {
+                if (System.currentTimeMillis() > timeoutTime) {
+                    throw new InvokeTimeoutException("Unable to complete call in time allotted");
+                }
+
+                SendSession sendSession = getSessionFromPool(m_poolTimeout);
+                if (sendSession == null) {
+                    // Pool is empty: give the reconnect thread a chance to
+                    // fill it.  This is tested explicitly instead of catching
+                    // NullPointerException, which also hid genuine NPEs
+                    // raised inside the session call.
+                    Thread.yield();
+                    continue;
+                }
+                try {
+                    byte[] response = sendSession.call(endpoint,
+                            message,
+                            timeoutTime - System.currentTimeMillis(),
+                            properties);
+                    returnSessionToPool(sendSession);
+                    if (response == null) {
+                        throw new InvokeTimeoutException("Unable to complete call in time allotted");
+                    }
+                    return response;
+                }
+                catch (JMSException jmse) {
+                    if (!m_adapter.isRecoverable(jmse, JMSVendorAdapter.SEND_ACTION)) {
+                        //this we cannot recover from
+                        //but it does not invalidate the session
+                        returnSessionToPool(sendSession);
+                        throw jmse;
+                    }
+
+                    //for now we will assume this is a reconnect related issue
+                    //and let the sender be collected
+                    //give the reconnect thread a chance to fill the pool
+                    Thread.yield();
+                }
+            }
+        }
+
+        /**
+         * Sends a one-way message, retrying on recoverable JMS failures until
+         * <code>m_timeoutTime</code> milliseconds have elapsed.
+         *
+         * @param endpoint   destination of the message
+         * @param message    message payload
+         * @param properties message properties passed to the session
+         * @throws InvokeTimeoutException if the time limit passes first
+         * @throws JMSException           on a failure the vendor adapter
+         *                                does not consider recoverable
+         * @throws Exception              anything else raised by the session
+         * @todo add in handling for security exceptions
+         * @todo add support for timeouts
+         */
+        void send(JMSEndpoint endpoint, byte[] message, HashMap properties)
+                throws Exception {
+            long timeoutTime = System.currentTimeMillis() + m_timeoutTime;
+            while (true) {
+                if (System.currentTimeMillis() > timeoutTime) {
+                    throw new InvokeTimeoutException("Cannot complete send in time allotted");
+                }
+
+                SendSession sendSession = getSessionFromPool(m_poolTimeout);
+                if (sendSession == null) {
+                    // Pool is empty: give the reconnect thread a chance to
+                    // fill it.  This is tested explicitly instead of catching
+                    // NullPointerException, which also hid genuine NPEs
+                    // raised inside the session send.
+                    Thread.yield();
+                    continue;
+                }
+                try {
+                    sendSession.send(endpoint, message, properties);
+                    returnSessionToPool(sendSession);
+                    return;
+                }
+                catch (JMSException jmse) {
+                    if (!m_adapter.isRecoverable(jmse, JMSVendorAdapter.SEND_ACTION)) {
+                        //this we cannot recover from
+                        //but it does not invalidate the session
+                        returnSessionToPool(sendSession);
+                        throw jmse;
+                    }
+                    //for now we will assume this is a reconnect related issue
+                    //and let the sender be collected
+                    //give the reconnect thread a chance to fill the pool
+                    Thread.yield();
+                }
+            }
+        }
+
+        protected void onException() {
+            synchronized (m_senderLock) {
+                m_senders.clear();
+            }
+        }
+
+        /**
+         * Cleans up every pooled send session and then empties the pool.
+         */
+        protected void onShutdown() {
+            synchronized (m_senderLock) {
+                for (Iterator pooled = m_senders.iterator(); pooled.hasNext();) {
+                    ((SendSession) pooled.next()).cleanup();
+                }
+                m_senders.clear();
+            }
+        }
+
+        /**
+         * Takes the first session out of the pool.  When the pool is empty
+         * it waits once, for at most <code>timeout</code> milliseconds, for
+         * a session to appear.
+         *
+         * @param timeout wait limit passed to <code>Object.wait</code>
+         * @return a session, or <code>null</code> when the pool is still
+         *         empty after the wait or the wait was interrupted
+         */
+        private SendSession getSessionFromPool(long timeout) {
+            synchronized (m_senderLock) {
+                if (m_senders.isEmpty()) {
+                    try {
+                        m_senderLock.wait(timeout);
+                    }
+                    catch (InterruptedException ignore) {
+                        return null;
+                    }
+                    if (m_senders.isEmpty()) {
+                        return null;
+                    }
+                }
+                Object first = m_senders.removeFirst();
+                return (SendSession) first;
+            }
+        }
+
+        /**
+         * Appends a session to the tail of the pool and wakes every thread
+         * waiting on the pool lock.
+         *
+         * @param sendSession the session to hand back; null is ignored so that a
+         *                    failed checkout can never put a null entry in the
+         *                    pool for a later caller to receive
+         */
+        private void returnSessionToPool(SendSession sendSession) {
+            if (sendSession == null) {
+                return;
+            }
+            synchronized (m_senderLock) {
+                m_senders.addLast(sendSession);
+                m_senderLock.notifyAll();
+            }
+        }
+
+        protected abstract class SendSession extends ConnectorSession {
+            MessageProducer m_producer;
+
+            SendSession(Session session,
+                        MessageProducer producer)
+                    throws JMSException {
+                super(session);
+                m_producer = producer;
+            }
+
+            protected abstract Destination createTemporaryDestination()
+                    throws JMSException;
+
+            protected abstract void deleteTemporaryDestination(Destination destination)
+                    throws JMSException;
+
+            protected abstract MessageConsumer createConsumer(Destination destination)
+                    throws JMSException;
+
+            protected abstract void send(Destination destination,
+                                         Message message,
+                                         int deliveryMode,
+                                         int priority,
+                                         long timeToLive)
+                    throws JMSException;
+
+            void send(JMSEndpoint endpoint, byte[] message, HashMap properties)
+                    throws Exception {
+                BytesMessage jmsMessage = m_session.createBytesMessage();
+                jmsMessage.writeBytes(message);
+                int deliveryMode = extractDeliveryMode(properties);
+                int priority = extractPriority(properties);
+                long timeToLive = extractTimeToLive(properties);
+
+                if (properties != null && !properties.isEmpty())
+                    setProperties(properties, jmsMessage);
+
+                send(endpoint.getDestination(m_session), jmsMessage, deliveryMode,
+                        priority, timeToLive);
+            }
+
+
+            void cleanup() {
+                try {
+                    m_producer.close();
+                } catch (Throwable t) {
+                }
+                try {
+                    m_session.close();
+                } catch (Throwable t) {
+                }
+            }
+
+            byte[] call(JMSEndpoint endpoint, byte[] message, long timeout,
+                        HashMap properties)
+                    throws Exception {
+                Destination reply = createTemporaryDestination();
+                MessageConsumer subscriber = createConsumer(reply);
+                BytesMessage jmsMessage = m_session.createBytesMessage();
+                jmsMessage.writeBytes(message);
+                jmsMessage.setJMSReplyTo(reply);
+
+                int deliveryMode = extractDeliveryMode(properties);
+                int priority = extractPriority(properties);
+                long timeToLive = extractTimeToLive(properties);
+
+                if (properties != null && !properties.isEmpty())
+                    setProperties(properties, jmsMessage);
+
+                send(endpoint.getDestination(m_session), jmsMessage, deliveryMode,
+                        priority, timeToLive);
+                BytesMessage response = null;
+                try {
+                    response = (BytesMessage) subscriber.receive(timeout);
+                } catch (ClassCastException cce) {
+                    throw new InvokeException
+                            ("Error: unexpected message type received - expected BytesMessage");
+                }
+                byte[] respBytes = null;
+                if (response != null) {
+                    byte[] buffer = new byte[8 * 1024];
+                    ByteArrayOutputStream out = new ByteArrayOutputStream();
+                    for (int bytesRead = response.readBytes(buffer);
+                         bytesRead != -1; bytesRead = response.readBytes(buffer)) {
+                        out.write(buffer, 0, bytesRead);
+                    }
+                    respBytes = out.toByteArray();
+                }
+                subscriber.close();
+                deleteTemporaryDestination(reply);
+                return respBytes;
+            }
+
+            private int extractPriority(HashMap properties) {
+                return MapUtils.removeIntProperty(properties, JMSConstants.PRIORITY,
+                        JMSConstants.DEFAULT_PRIORITY);
+            }
+
+            private int extractDeliveryMode(HashMap properties) {
+                return MapUtils.removeIntProperty(properties, JMSConstants.DELIVERY_MODE,
+                        JMSConstants.DEFAULT_DELIVERY_MODE);
+            }
+
+            private long extractTimeToLive(HashMap properties) {
+                return MapUtils.removeLongProperty(properties, JMSConstants.TIME_TO_LIVE,
+                        JMSConstants.DEFAULT_TIME_TO_LIVE);
+            }
+
+            private void setProperties(HashMap properties, Message message)
+                    throws JMSException {
+                Iterator propertyIter = properties.entrySet().iterator();
+                while (propertyIter.hasNext()) {
+                    Map.Entry property = (Map.Entry) propertyIter.next();
+                    setProperty((String) property.getKey(), property.getValue(),
+                            message);
+                }
+            }
+
+            private void setProperty(String property, Object value, Message message)
+                    throws JMSException {
+                if (property == null)
+                    return;
+                if (property.equals(JMSConstants.JMS_CORRELATION_ID))
+                    message.setJMSCorrelationID((String) value);
+                else if (property.equals(JMSConstants.JMS_CORRELATION_ID_AS_BYTES))
+                    message.setJMSCorrelationIDAsBytes((byte[]) value);
+                else if (property.equals(JMSConstants.JMS_TYPE))
+                    message.setJMSType((String) value);
+                else
+                    message.setObjectProperty(property, value);
+            }
+        }
+    }
+
+    /**
+     * @return the AsyncConnection currently stored in m_receiveConnection
+     */
+    AsyncConnection getReceiveConnection() {
+        return m_receiveConnection;
+    }
+
+    /**
+     * Factory hook implemented by subclasses to build the AsyncConnection.
+     * The parameters mirror those of the AsyncConnection constructor.
+     *
+     * @throws JMSException if the connection cannot be created
+     */
+    protected abstract AsyncConnection createAsyncConnection(ConnectionFactory factory,
+                                                             javax.jms.Connection connection,
+                                                             String threadName,
+                                                             String clientID,
+                                                             String username,
+                                                             String password)
+
+            throws JMSException;
+
+    /**
+     * Connection that tracks listener subscriptions. Every Subscription key
+     * maps to its ListenerSession, or to null after onException() has
+     * invalidated the sessions and before onConnect() has re-created them.
+     * All access to the map happens under m_subscriptionLock.
+     */
+    protected abstract class AsyncConnection extends Connection {
+        HashMap m_subscriptions;
+        Object m_subscriptionLock;
+
+        protected AsyncConnection(ConnectionFactory connectionFactory,
+                                  javax.jms.Connection connection,
+                                  String threadName,
+                                  String clientID,
+                                  String username,
+                                  String password)
+                throws JMSException {
+            super(connectionFactory, connection, threadName,
+                    clientID, username, password);
+            m_subscriptions = new HashMap();
+            m_subscriptionLock = new Object();
+        }
+
+        protected abstract ListenerSession createListenerSession(
+                javax.jms.Connection connection,
+                Subscription subscription)
+                throws Exception;
+
+        /**
+         * Cleans up every live listener session and forgets all subscriptions.
+         */
+        protected void onShutdown() {
+            synchronized (m_subscriptionLock) {
+                Iterator sessions = m_subscriptions.values().iterator();
+                while (sessions.hasNext()) {
+                    ListenerSession session = (ListenerSession) sessions.next();
+                    if (session != null) {
+                        session.cleanup();
+                    }
+                }
+                m_subscriptions.clear();
+            }
+        }
+
+        /**
+         * Registers a subscription, retrying while failures are recoverable
+         * until m_timeoutTime milliseconds have passed. Does nothing if the
+         * subscription is already registered.
+         *
+         * @param subscription
+         * @throws InvokeTimeoutException if no session could be created in time
+         * @todo add in security exception propagation
+         */
+        void subscribe(Subscription subscription)
+                throws Exception {
+            long timeoutTime = System.currentTimeMillis() + m_timeoutTime;
+            synchronized (m_subscriptionLock) {
+                if (m_subscriptions.containsKey(subscription))
+                    return;
+                while (true) {
+                    if (System.currentTimeMillis() > timeoutTime) {
+                        throw new InvokeTimeoutException("Cannot subscribe listener");
+                    }
+
+                    try {
+                        ListenerSession session = createListenerSession(m_connection,
+                                subscription);
+                        m_subscriptions.put(subscription, session);
+                        return;
+                    }
+                    catch (JMSException jmse) {
+                        if (!m_adapter.isRecoverable(jmse, JMSVendorAdapter.SUBSCRIBE_ACTION)) {
+                            throw jmse;
+                        }
+                    }
+                    catch (NullPointerException reconnecting) {
+                        //we ARE reconnecting
+                    }
+
+                    pauseForReconnect();
+                    //give reconnect a chance
+                    Thread.yield();
+                }
+            }
+        }
+
+        /**
+         * Cleans up and removes the session of a subscription. While the
+         * subscription has no session (a reconnect is pending) the call waits
+         * and retries until m_timeoutTime milliseconds have passed. Does
+         * nothing if the subscription is not registered.
+         *
+         * @throws InvokeTimeoutException if no session became available in time
+         */
+        void unsubscribe(Subscription subscription) {
+            long timeoutTime = System.currentTimeMillis() + m_timeoutTime;
+            synchronized (m_subscriptionLock) {
+                if (!m_subscriptions.containsKey(subscription))
+                    return;
+                while (true) {
+                    if (System.currentTimeMillis() > timeoutTime) {
+                        throw new InvokeTimeoutException("Cannot unsubscribe listener");
+                    }
+
+                    //give reconnect a chance
+                    Thread.yield();
+                    ListenerSession session = (ListenerSession)
+                            m_subscriptions.get(subscription);
+                    if (session != null) {
+                        session.cleanup();
+                        m_subscriptions.remove(subscription);
+                        return;
+                    }
+
+                    //we are reconnecting
+                    pauseForReconnect();
+                }
+            }
+        }
+
+        /**
+         * Re-creates the listener session of every subscription that currently
+         * has none and wakes the threads waiting in subscribe()/unsubscribe().
+         */
+        protected void onConnect()
+                throws Exception {
+            synchronized (m_subscriptionLock) {
+                Iterator entries = m_subscriptions.entrySet().iterator();
+                while (entries.hasNext()) {
+                    Map.Entry entry = (Map.Entry) entries.next();
+                    if (entry.getValue() == null) {
+                        entry.setValue(createListenerSession(m_connection,
+                                (Subscription) entry.getKey()));
+                    }
+                }
+                m_subscriptionLock.notifyAll();
+            }
+        }
+
+        /**
+         * Marks every subscription as having no session; the keys are kept so
+         * that onConnect() can re-create the sessions.
+         */
+        protected void onException() {
+            synchronized (m_subscriptionLock) {
+                Iterator entries = m_subscriptions.entrySet().iterator();
+                while (entries.hasNext()) {
+                    ((Map.Entry) entries.next()).setValue(null);
+                }
+            }
+        }
+
+        /**
+         * Waits on the subscription lock for one retry interval. Must be
+         * called with m_subscriptionLock held. An interrupt merely ends the
+         * wait early; the callers' timeout checks bound the retry loops.
+         */
+        private void pauseForReconnect() {
+            try {
+                m_subscriptionLock.wait(m_interactRetryInterval);
+            }
+            catch (InterruptedException ignore) {
+            }
+        }
+
+
+        /**
+         * A session with the consumer that delivers messages to the
+         * subscription's listener.
+         */
+        protected class ListenerSession extends ConnectorSession {
+            protected MessageConsumer m_consumer;
+            protected Subscription m_subscription;
+
+            ListenerSession(Session session,
+                            MessageConsumer consumer,
+                            Subscription subscription)
+                    throws Exception {
+                super(session);
+                m_subscription = subscription;
+                m_consumer = consumer;
+                // result unused; the call is kept because it may throw for an
+                // endpoint whose destination cannot be obtained
+                subscription.m_endpoint.getDestination(m_session);
+                m_consumer.setMessageListener(subscription.m_listener);
+            }
+
+            /**
+             * Closes the consumer and the session, ignoring failures so that
+             * both closes are always attempted.
+             */
+            void cleanup() {
+                try {
+                    m_consumer.close();
+                } catch (Exception ignore) {
+                }
+                try {
+                    m_session.close();
+                } catch (Exception ignore) {
+                }
+            }
+
+        }
+    }
+
+    /**
+     * Common base of the session wrappers in this file; it only holds the
+     * JMS Session that the subclasses operate on.
+     */
+    private abstract class ConnectorSession {
+        Session m_session;
+
+        // stores the session; nothing in this body actually throws
+        ConnectorSession(Session session)
+                throws JMSException {
+            m_session = session;
+        }
+    }
+}
\ No newline at end of file

Added: webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/JMSConnectorFactory.java
URL: http://svn.apache.org/viewcvs/webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/JMSConnectorFactory.java?rev=354198&view=auto
==============================================================================
--- webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/JMSConnectorFactory.java (added)
+++ webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/JMSConnectorFactory.java Mon Dec  5 13:38:30 2005
@@ -0,0 +1,225 @@
+/*
+ * Copyright 2001, 2002,2004 The Apache Software Foundation.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.axis2.transport.jms;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.util.HashMap;
+
+/**
+ * JMSConnectorFactory is a factory class for creating JMSConnectors. It can
+ * create both client connectors and server connectors.  A server connector
+ * is configured to allow asynchronous message receipt, while a client
+ * connector is not.
+ * <p/>
+ * JMSConnectorFactory can also be used to select an appropriately configured
+ * JMSConnector from an existing pool of connectors.
+ */
+public abstract class JMSConnectorFactory {
+    protected static Log log =
+            LogFactory.getLog(JMSConnectorFactory.class.getName());
+
+    /**
+     * Performs an initial check on the connector properties, and then defers
+     * to the vendor adapter for matching on the vendor-specific connection factory.
+     *
+     * @param connectors     the list of potential matches
+     * @param connectorProps the set of properties to be used for matching the connector
+     * @param cfProps        the set of properties to be used for matching the connection factory
+     * @param username       the user requesting the connector
+     * @param password       the password associated with the requesting user
+     * @param adapter        the vendor adapter specified in the JMS URL
+     * @return a JMSConnector that matches the specified properties
+     */
+    public static JMSConnector matchConnector(java.util.Set connectors,
+                                              HashMap connectorProps,
+                                              HashMap cfProps,
+                                              String username,
+                                              String password,
+                                              JMSVendorAdapter adapter) {
+        java.util.Iterator iter = connectors.iterator();
+        while (iter.hasNext()) {
+            JMSConnector conn = (JMSConnector) iter.next();
+
+            // username
+            String connectorUsername = conn.getUsername();
+            if (!(((connectorUsername == null) && (username == null)) ||
+                    ((connectorUsername != null) && (username != null) && (connectorUsername.equals(username)))))
+                continue;
+
+            // password
+            String connectorPassword = conn.getPassword();
+            if (!(((connectorPassword == null) && (password == null)) ||
+                    ((connectorPassword != null) && (password != null) && (connectorPassword.equals(password)))))
+                continue;
+
+            // num retries
+            int connectorNumRetries = conn.getNumRetries();
+            String propertyNumRetries = (String) connectorProps.get(JMSConstants.NUM_RETRIES);
+            int numRetries = JMSConstants.DEFAULT_NUM_RETRIES;
+            if (propertyNumRetries != null)
+                numRetries = Integer.parseInt(propertyNumRetries);
+            if (connectorNumRetries != numRetries)
+                continue;
+
+            // client id
+            String connectorClientID = conn.getClientID();
+            String clientID = (String) connectorProps.get(JMSConstants.CLIENT_ID);
+            if (!(((connectorClientID == null) && (clientID == null))
+                    ||
+                    ((connectorClientID != null) && (clientID != null) && connectorClientID.equals(clientID))))
+                continue;
+
+            // domain
+            String connectorDomain = (conn instanceof QueueConnector) ? JMSConstants.DOMAIN_QUEUE : JMSConstants.DOMAIN_TOPIC;
+            String propertyDomain = (String) connectorProps.get(JMSConstants.DOMAIN);
+            String domain = JMSConstants.DOMAIN_DEFAULT;
+            if (propertyDomain != null)
+                domain = propertyDomain;
+            if (!(((connectorDomain == null) && (domain == null))
+                    ||
+                    ((connectorDomain != null) && (domain != null) && connectorDomain.equalsIgnoreCase(domain))))
+                continue;
+
+            // the connection factory must also match for the connector to be reused
+            JMSURLHelper jmsurl = conn.getJMSURL();
+            if (adapter.isMatchingConnectionFactory(conn.getConnectionFactory(), jmsurl, cfProps)) {
+                // attempt to reserve the connector
+                try {
+                    JMSConnectorManager.getInstance().reserve(conn);
+
+                    if (log.isDebugEnabled()) {
+                        log.debug("JMSConnectorFactory: Found matching connector");
+                    }
+                }
+                catch (Exception e) {
+                    // ignore. the connector may be in the process of shutting down, so try the next element
+                    continue;
+                }
+
+                return conn;
+            }
+        }
+
+        if (log.isDebugEnabled()) {
+            log.debug("JMSConnectorFactory: No matching connectors found");
+        }
+
+        return null;
+    }
+
+    /**
+     * Static method to create a server connector. Server connectors can
+     * accept incoming requests.
+     *
+     * @param connectorConfig
+     * @param cfConfig
+     * @param username
+     * @param password
+     * @return
+     * @throws Exception
+     */
+    public static JMSConnector createServerConnector(HashMap connectorConfig,
+                                                     HashMap cfConfig,
+                                                     String username,
+                                                     String password,
+                                                     JMSVendorAdapter adapter)
+            throws Exception {
+        return createConnector(connectorConfig, cfConfig, true,
+                username, password, adapter);
+    }
+
+    /**
+     * Static method to create a client connector. Client connectors cannot
+     * accept incoming requests.
+     *
+     * @param connectorConfig
+     * @param cfConfig
+     * @param username
+     * @param password
+     * @return
+     * @throws Exception
+     */
+    public static JMSConnector createClientConnector(HashMap connectorConfig,
+                                                     HashMap cfConfig,
+                                                     String username,
+                                                     String password,
+                                                     JMSVendorAdapter adapter)
+            throws Exception {
+        return createConnector(connectorConfig, cfConfig, false,
+                username, password, adapter);
+    }
+
+    private static JMSConnector createConnector(HashMap connectorConfig,
+                                                HashMap cfConfig,
+                                                boolean allowReceive,
+                                                String username,
+                                                String password,
+                                                JMSVendorAdapter adapter)
+            throws Exception {
+        if (connectorConfig != null)
+            connectorConfig = (HashMap) connectorConfig.clone();
+        int numRetries = MapUtils.removeIntProperty(connectorConfig,
+                JMSConstants.NUM_RETRIES,
+                JMSConstants.DEFAULT_NUM_RETRIES);
+
+        int numSessions = MapUtils.removeIntProperty(connectorConfig,
+                JMSConstants.NUM_SESSIONS,
+                JMSConstants.DEFAULT_NUM_SESSIONS);
+
+        long connectRetryInterval = MapUtils.removeLongProperty(connectorConfig,
+                JMSConstants.CONNECT_RETRY_INTERVAL,
+                JMSConstants.DEFAULT_CONNECT_RETRY_INTERVAL);
+
+        long interactRetryInterval = MapUtils.removeLongProperty(connectorConfig,
+                JMSConstants.INTERACT_RETRY_INTERVAL,
+                JMSConstants.DEFAULT_INTERACT_RETRY_INTERVAL);
+
+        long timeoutTime = MapUtils.removeLongProperty(connectorConfig,
+                JMSConstants.TIMEOUT_TIME,
+                JMSConstants.DEFAULT_TIMEOUT_TIME);
+
+        String clientID = MapUtils.removeStringProperty(connectorConfig,
+                JMSConstants.CLIENT_ID,
+                null);
+        String domain = MapUtils.removeStringProperty(connectorConfig,
+                JMSConstants.DOMAIN,
+                JMSConstants.DOMAIN_DEFAULT);
+
+        // this will be set if the target endpoint address was set on the Axis call
+        JMSURLHelper jmsurl = (JMSURLHelper) connectorConfig.get(JMSConstants.JMS_URL);
+
+        if (cfConfig == null)
+            throw new IllegalArgumentException("noCfConfig");
+
+        if (domain.equals(JMSConstants.DOMAIN_QUEUE)) {
+            return new QueueConnector(adapter.getQueueConnectionFactory(cfConfig),
+                    numRetries, numSessions, connectRetryInterval,
+                    interactRetryInterval, timeoutTime,
+                    allowReceive, clientID, username, password,
+                    adapter, jmsurl);
+        } else // domain is Topic
+        {
+            return new TopicConnector(adapter.getTopicConnectionFactory(cfConfig),
+                    numRetries, numSessions, connectRetryInterval,
+                    interactRetryInterval, timeoutTime,
+                    allowReceive, clientID, username, password,
+                    adapter, jmsurl);
+        }
+    }
+}
\ No newline at end of file

Added: webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/JMSConnectorManager.java
URL: http://svn.apache.org/viewcvs/webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/JMSConnectorManager.java?rev=354198&view=auto
==============================================================================
--- webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/JMSConnectorManager.java (added)
+++ webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/JMSConnectorManager.java Mon Dec  5 13:38:30 2005
@@ -0,0 +1,408 @@
+/*
+ * Copyright 2001, 2002,2004 The Apache Software Foundation.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.axis2.transport.jms;
+
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.i18n.Messages;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.util.HashMap;
+import java.util.Iterator;
+
+/**
+ * JMSConnectorManager manages a pool of connectors and works with the
+ * vendor adapters to support the reuse of JMS connections.
+ */
+public class JMSConnectorManager {
+    protected static Log log =
+            LogFactory.getLog(JMSConnectorManager.class.getName());
+
+    // the single instance handed out by getInstance()
+    private static JMSConnectorManager s_instance = new JMSConnectorManager();
+
+    // vendor id -> ShareableObjectPool holding that vendor's JMSConnectors
+    private static HashMap vendorConnectorPools = new HashMap();
+    private int DEFAULT_WAIT_FOR_SHUTDOWN = 90000; // 1.5 minutes
+
+    // private: instances are only obtained through getInstance()
+    private JMSConnectorManager() {
+    }
+
+    /**
+     * Returns the shared JMSConnectorManager instance.
+     */
+    public static JMSConnectorManager getInstance() {
+        return s_instance;
+    }
+
+    /**
+     * Returns the pool of JMSConnectors for a particular vendor, or null if
+     * no pool has been registered for that vendor id.
+     * NOTE(review): the lookup itself takes no lock on vendorConnectorPools;
+     * callers that need a consistent view synchronize on the map themselves.
+     */
+    public ShareableObjectPool getVendorPool(String vendorId) {
+        return (ShareableObjectPool) vendorConnectorPools.get(vendorId);
+    }
+
+    /**
+     * Retrieves a JMSConnector that satisfies the provided connector criteria.
+     * A connector from the vendor's pool is reused when one matches; otherwise
+     * a new client connector is created and started.
+     *
+     * @throws AxisFault if no connector could be obtained; other exceptions
+     *                   are wrapped
+     */
+    public JMSConnector getConnector(HashMap connectorProperties,
+                                     HashMap connectionFactoryProperties,
+                                     String username,
+                                     String password,
+                                     JMSVendorAdapter vendorAdapter)
+            throws AxisFault {
+        JMSConnector connector = null;
+
+        try {
+            String vendorId = vendorAdapter.getVendorId();
+
+            // check for a vendor-specific pool, and create if necessary.
+            // vendorConnectorPools is a plain HashMap, so the lookup has to
+            // happen under the same lock as the insertion.
+            ShareableObjectPool vendorConnectors;
+            synchronized (vendorConnectorPools) {
+                vendorConnectors = getVendorPool(vendorId);
+                if (vendorConnectors == null) {
+                    vendorConnectors = new ShareableObjectPool();
+                    vendorConnectorPools.put(vendorId, vendorConnectors);
+                }
+            }
+
+            // look for a matching JMSConnector among existing connectors
+            synchronized (vendorConnectors) {
+                try {
+
+                    connector = JMSConnectorFactory.matchConnector(vendorConnectors.getElements(),
+                            connectorProperties,
+                            connectionFactoryProperties,
+                            username,
+                            password,
+                            vendorAdapter);
+                }
+                catch (Exception e) {
+                    // not fatal. a new connector will be created if no match is found
+                    if (log.isDebugEnabled()) {
+                        log.debug("JMSConnectorManager: connector matching failed, creating a new connector", e);
+                    }
+                }
+
+                if (connector == null) {
+                    connector = JMSConnectorFactory.createClientConnector(connectorProperties,
+                            connectionFactoryProperties,
+                            username,
+                            password,
+                            vendorAdapter);
+                    connector.start();
+                }
+            }
+        }
+        catch (Exception e) {
+            log.error(Messages.getMessage("cannotConnectError"), e);
+
+            if (e instanceof AxisFault)
+                throw (AxisFault) e;
+            throw new AxisFault("cannotConnect", e);
+        }
+
+        return connector;
+    }
+
+    /**
+     * Closes JMSConnectors in all pools. Each connector is reserved and then
+     * stopped and shut down; a connector that cannot be reserved or closed is
+     * skipped so that the remaining ones are still processed.
+     */
+    void closeAllConnectors() {
+        if (log.isDebugEnabled()) {
+            log.debug("Enter: JMSConnectorManager::closeAllConnectors");
+        }
+
+        synchronized (vendorConnectorPools) {
+            Iterator iter = vendorConnectorPools.values().iterator();
+            while (iter.hasNext()) {
+                // close all connectors in the vendor pool
+                ShareableObjectPool pool = (ShareableObjectPool) iter.next();
+                synchronized (pool) {
+                    // iterate over a snapshot: closing a connector may change
+                    // the pool's contents, which must not disturb this loop
+                    java.util.Iterator connectors =
+                            new java.util.ArrayList(pool.getElements()).iterator();
+                    while (connectors.hasNext()) {
+                        JMSConnector conn = (JMSConnector) connectors.next();
+                        try {
+                            // shutdown automatically decrements the ref count of a connector before closing it
+                            // call reserve() to simulate the checkout
+                            reserve(conn);
+                            closeConnector(conn);
+                        }
+                        catch (Exception e) {
+                            // not fatal. the connector is already being deactivated
+                            if (log.isDebugEnabled()) {
+                                log.debug("JMSConnectorManager: skipping connector that could not be closed", e);
+                            }
+                        }
+                    }
+                }
+            }
+        }
+
+        if (log.isDebugEnabled()) {
+            log.debug("Exit: JMSConnectorManager::closeAllConnectors");
+        }
+    }
+
+    /**
+     * Closes JMS connectors that match the specified endpoint address.
+     * Matching connectors are taken from the adapter's vendor pool one at a
+     * time and closed until the pool is empty or nothing matches any more.
+     * Failures are logged as a warning and not propagated.
+     */
+    void closeMatchingJMSConnectors(HashMap connectorProps, HashMap cfProps,
+                                    String username, String password,
+                                    JMSVendorAdapter vendorAdapter) {
+        if (log.isDebugEnabled()) {
+            log.debug("Enter: JMSConnectorManager::closeMatchingJMSConnectors");
+        }
+
+        try {
+            String vendorId = vendorAdapter.getVendorId();
+
+            // fetch the pool that belongs to this vendor
+            ShareableObjectPool pool;
+            synchronized (vendorConnectorPools) {
+                pool = getVendorPool(vendorId);
+            }
+
+            // nothing to do when the vendor has no pool
+            if (pool == null)
+                return;
+
+            synchronized (pool) {
+                while (pool.size() > 0) {
+                    JMSConnector match = JMSConnectorFactory.matchConnector(pool.getElements(),
+                            connectorProps,
+                            cfProps,
+                            username,
+                            password,
+                            vendorAdapter);
+                    if (match == null) {
+                        break;
+                    }
+                    closeConnector(match);
+                }
+            }
+        }
+        catch (Exception e) {
+            log.warn(Messages.getMessage("failedJMSConnectorShutdown"), e);
+        }
+
+        if (log.isDebugEnabled()) {
+            log.debug("Exit: JMSConnectorManager::closeMatchingJMSConnectors");
+        }
+    }
+
+    /**
+     * Stops the given connector and then shuts it down.
+     */
+    private void closeConnector(JMSConnector connector) {
+        connector.stop();
+        connector.shutdown();
+    }
+
+    /**
+     * Registers a JMSConnector with the pool of its vendor, creating that pool
+     * on demand.
+     *
+     * @param conn the connector to add; its vendor adapter supplies the vendor id
+     */
+    public void addConnectorToPool(JMSConnector conn) {
+        if (log.isDebugEnabled()) {
+            log.debug("Enter: JMSConnectorManager::addConnectorToPool");
+        }
+
+        final ShareableObjectPool pool;
+        synchronized (vendorConnectorPools) {
+            final String id = conn.getVendorAdapter().getVendorId();
+            ShareableObjectPool existing = getVendorPool(id);
+            if (existing == null) {
+                // the pool may not exist yet, e.g. when the connector is created before
+                // the call/JMSTransport is invoked (as SimpleJMSListener does)
+                existing = new ShareableObjectPool();
+                vendorConnectorPools.put(id, existing);
+            }
+            pool = existing;
+        }
+
+        synchronized (pool) {
+            pool.addObject(conn);
+        }
+
+        if (log.isDebugEnabled()) {
+            log.debug("Exit: JMSConnectorManager::addConnectorToPool");
+        }
+    }
+
+    /**
+     * Takes a JMSConnector out of the pool of its vendor.  Does nothing when no
+     * pool exists for that vendor.
+     *
+     * @param conn the connector to remove; its vendor adapter supplies the vendor id
+     */
+    public void removeConnectorFromPool(JMSConnector conn) {
+        if (log.isDebugEnabled()) {
+            log.debug("Enter: JMSConnectorManager::removeConnectorFromPool");
+        }
+
+        final ShareableObjectPool pool;
+        synchronized (vendorConnectorPools) {
+            final String id = conn.getVendorAdapter().getVendorId();
+            pool = getVendorPool(id);
+        }
+        if (pool == null) {
+            return;
+        }
+
+        synchronized (pool) {
+            // the ref count was bumped when the connector was matched, so give that
+            // reference back before asking the pool to drop the connector
+            pool.release(conn);
+            pool.removeObject(conn);
+        }
+
+        if (log.isDebugEnabled()) {
+            log.debug("Exit: JMSConnectorManager::removeConnectorFromPool");
+        }
+    }
+
+    /**
+     * Checks the JMSConnector out of its vendor pool without exclusivity.
+     * Does nothing when no pool exists for the connector's vendor.
+     *
+     * @param connector the connector to reserve
+     * @throws Exception whatever the pool's <code>reserve</code> raises
+     */
+    public void reserve(JMSConnector connector) throws Exception {
+        final ShareableObjectPool vendorPool;
+        synchronized (vendorConnectorPools) {
+            final String id = connector.getVendorAdapter().getVendorId();
+            vendorPool = getVendorPool(id);
+        }
+        if (vendorPool == null) {
+            return;
+        }
+        vendorPool.reserve(connector);
+    }
+
+    /**
+     * Checks the JMSConnector back into its vendor pool without exclusivity.
+     * Does nothing when no pool exists for the connector's vendor.
+     *
+     * @param connector the connector to release
+     */
+    public void release(JMSConnector connector) {
+        final ShareableObjectPool vendorPool;
+        synchronized (vendorConnectorPools) {
+            final String id = connector.getVendorAdapter().getVendorId();
+            vendorPool = getVendorPool(id);
+        }
+        if (vendorPool == null) {
+            return;
+        }
+        vendorPool.release(connector);
+    }
+
+    /**
+     * A simple pool for objects that can be shared; reserving and releasing never block.
+     * Only a ref count is necessary to prevent collisions at shutdown.
+     * <p/>
+     * Both internal maps are guarded by the monitor of <code>m_elements</code>.
+     * Todo: max size, cleanup stale connections
+     */
+    public class ShareableObjectPool {
+        // longest single sleep (in ms) while waiting for an expiring object to be released
+        private static final long EXPIRATION_POLL_INTERVAL = 5000;
+
+        // maps object to ref count wrapper
+        private java.util.HashMap m_elements;
+
+        // holds objects which should no longer be leased (pending removal)
+        private java.util.HashMap m_expiring;
+
+        // NOTE(review): neither read nor written anywhere inside this class
+        private int m_numElements = 0;
+
+        public ShareableObjectPool() {
+            m_elements = new java.util.HashMap();
+            m_expiring = new java.util.HashMap();
+        }
+
+        /**
+         * Adds the object to the pool, if not already added.  An object that is
+         * currently pending removal is not added again.
+         */
+        public void addObject(Object obj) {
+            ReferenceCountedObject ref = new ReferenceCountedObject(obj);
+            synchronized (m_elements) {
+                if (!m_elements.containsKey(obj) && !m_expiring.containsKey(obj))
+                    m_elements.put(obj, ref);
+            }
+        }
+
+        /**
+         * Removes the object from the pool.  If the object is still reserved it
+         * can no longer be reserved anew, and this call waits until every
+         * reservation has been released, the wait time has elapsed, or the
+         * calling thread is interrupted - whichever comes first.
+         * Todo: check expirations with the next request instead of holding up the current request
+         *
+         * @param obj      the object to remove; unknown objects are ignored
+         * @param waitTime maximum time to wait for outstanding reservations, in milliseconds
+         */
+        public void removeObject(Object obj, long waitTime) {
+            ReferenceCountedObject ref = null;
+            synchronized (m_elements) {
+                ref = (ReferenceCountedObject) m_elements.remove(obj);
+                if (ref == null)
+                    return;
+
+                if (ref.count() == 0)
+                    return;
+
+                // still in use: mark the object for expiration
+                m_expiring.put(obj, ref);
+            }
+
+            // the object is now marked for expiration. wait for the ref count to drop to zero
+            long expiration = System.currentTimeMillis() + waitTime;
+            try {
+                while (ref.count() > 0) {
+                    long remaining = expiration - System.currentTimeMillis();
+                    if (remaining <= 0)
+                        break;
+                    try {
+                        // never sleep past the deadline
+                        Thread.sleep(Math.min(remaining, EXPIRATION_POLL_INTERVAL));
+                    }
+                    catch (InterruptedException e) {
+                        // stop waiting and keep the interrupt visible to the caller
+                        Thread.currentThread().interrupt();
+                        break;
+                    }
+                }
+            }
+            finally {
+                // also clear from the expiring list, under the same lock as every other access
+                synchronized (m_elements) {
+                    m_expiring.remove(obj);
+                }
+            }
+        }
+
+        /**
+         * Removes the object from the pool, waiting up to the default shutdown
+         * wait time for outstanding reservations.
+         */
+        public void removeObject(Object obj) {
+            removeObject(obj, DEFAULT_WAIT_FOR_SHUTDOWN);
+        }
+
+        /**
+         * Marks the object as in use by incrementing its reference count.
+         *
+         * @throws Exception if the object is pending removal or is not in the pool
+         */
+        public void reserve(Object obj) throws Exception {
+            synchronized (m_elements) {
+                if (m_expiring.containsKey(obj))
+                    throw new Exception("resourceUnavailable");
+
+                ReferenceCountedObject ref = (ReferenceCountedObject) m_elements.get(obj);
+                // an object that was never added (or is already gone) cannot be leased
+                if (ref == null)
+                    throw new Exception("resourceUnavailable");
+                ref.increment();
+            }
+        }
+
+        /**
+         * Decrements the object's reference count.  Objects pending removal are
+         * honoured as well, so that a waiting <code>removeObject</code> can finish
+         * as soon as the last user lets go.  Unknown objects are ignored.
+         */
+        public void release(Object obj) {
+            synchronized (m_elements) {
+                ReferenceCountedObject ref = (ReferenceCountedObject) m_elements.get(obj);
+                if (ref == null)
+                    ref = (ReferenceCountedObject) m_expiring.get(obj);
+                if (ref != null)
+                    ref.decrement();
+            }
+        }
+
+        /**
+         * Returns the pooled objects.  The result is the live key set of the
+         * pool, not a copy: it changes as objects are added and removed.
+         */
+        public synchronized java.util.Set getElements() {
+            synchronized (m_elements) {
+                return m_elements.keySet();
+            }
+        }
+
+        /**
+         * Returns the number of pooled objects, not counting those pending removal.
+         */
+        public synchronized int size() {
+            synchronized (m_elements) {
+                return m_elements.size();
+            }
+        }
+
+        /**
+         * Wrapper to track the use count of an object
+         */
+        public class ReferenceCountedObject {
+            private Object m_object;
+            private int m_refCount;
+
+            public ReferenceCountedObject(Object obj) {
+                m_object = obj;
+                m_refCount = 0;
+            }
+
+            public synchronized void increment() {
+                m_refCount++;
+            }
+
+            // never goes below zero
+            public synchronized void decrement() {
+                if (m_refCount > 0)
+                    m_refCount--;
+            }
+
+            public synchronized int count() {
+                return m_refCount;
+            }
+        }
+    }
+}
\ No newline at end of file

Added: webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/JMSConstants.java
URL: http://svn.apache.org/viewcvs/webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/JMSConstants.java?rev=354198&view=auto
==============================================================================
--- webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/JMSConstants.java (added)
+++ webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/JMSConstants.java Mon Dec  5 13:38:30 2005
@@ -0,0 +1,263 @@
+/*
+ * Copyright 2001, 2002,2004 The Apache Software Foundation.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.axis2.transport.jms;
+
+import javax.jms.DeliveryMode;
+import javax.jms.Message;
+import javax.jms.Session;
+
+/**
+ * JMSConstants contains constants that apply to all JMS providers.
+ * <p/>
+ * <code>JMSConstants</code> contains the constant definitions for
+ * interacting with the WSClient.  The most important constants are the
+ * <code>HashMap</code> keys for use in the arguments to the
+ * <code>send, call, registerListener, unregisterListener</code> methods of
+ * <code>JMSEndpoint</code> and the <code>createConnector</code> method of
+ * <code>JMSConnectorFactory</code>.
+ */
+
+public interface JMSConstants {
+    public final static String PROTOCOL = "jms";
+
+    // Short names of all constants (each one is described at its full-name counterpart below).
+    // The short name is used in the JMS URL; the full name is used in the Axis call.
+    final static String _WAIT_FOR_RESPONSE = "waitForResponse";
+    final static String _CLIENT_ID = "clientID";
+    final static String _VENDOR = "vendor";
+    final static String _DOMAIN = "domain";
+    final static String _JMS_CORRELATION_ID = "jmsCorrelationID";
+    final static String _JMS_CORRELATION_ID_AS_BYTES = "jmsCorrelationIDAsBytes";
+    final static String _JMS_TYPE = "jmsType";
+    final static String _TIME_TO_LIVE = "ttl";
+    final static String _PRIORITY = "priority";
+    final static String _DELIVERY_MODE = "deliveryMode";
+    final static String _MESSAGE_SELECTOR = "messageSelector";
+    final static String _ACKNOWLEDGE_MODE = "acknowledgeMode";
+    final static String _SUBSCRIPTION_NAME = "subscriptionName";
+    final static String _UNSUBSCRIBE = "unsubscribe";
+    final static String _NO_LOCAL = "noLocal";
+    final static String _NUM_RETRIES = "numRetries";
+    final static String _NUM_SESSIONS = "numSessions";
+    final static String _CONNECT_RETRY_INTERVAL = "connectRetryInterval";
+    final static String _INTERACT_RETRY_INTERVAL = "interactRetryInterval";
+    final static String _TIMEOUT_TIME = "timeoutTime";
+    final static String _MIN_TIMEOUT_TIME = "minTimeoutTime";
+    /**
+     * Defines a prefix added to each application-specific property in the
+     * JMS URL that should be added to the JMS Message when issued.
+     */
+    final static String _MSG_PROP_PREFIX = "msgProp.";
+
+    public static String JMS_PROPERTY_PREFIX = "transport.jms."; // prepended to a short name to form its full name
+
+    /**
+     * This is used as a key in the Call properties telling the JMS transport
+     * to wait for a response from the service.  The default value is true.
+     * If false is specified, the message will be delivered without specifying
+     * a ReplyTo.  The client will always return null from invoke unless
+     * a client-side exception is thrown (similar to invokeOneWay in semantics)
+     * The value must be a <code>java.lang.Boolean</code>.
+     * See the javax.jms javadoc for information on this property.
+     */
+    final static String WAIT_FOR_RESPONSE = JMS_PROPERTY_PREFIX + _WAIT_FOR_RESPONSE;
+
+    /**
+     * <code>JMSConnectorFactory</code> parameter valid for either domain.  This should
+     * be used as a key in the environment map passed into calls to
+     * <code>createConnector</code> in <code>JMSConnectorFactory</code>
+     * This is a required property for durable subscribers.
+     * The value must be a <code>java.lang.String</code>.
+     * See the javax.jms javadoc for information on this property.
+     */
+    final static String CLIENT_ID = JMS_PROPERTY_PREFIX + _CLIENT_ID;
+
+    // full name only: there is no short version of this key
+    final static String DESTINATION = JMS_PROPERTY_PREFIX + "Destination";
+
+    final static String VENDOR = JMS_PROPERTY_PREFIX + _VENDOR;
+    public final static String JNDI_VENDOR_ID = "JNDI";
+
+    final static String DOMAIN = JMS_PROPERTY_PREFIX + _DOMAIN;
+
+    final static String DOMAIN_QUEUE = "QUEUE";
+    final static String DOMAIN_TOPIC = "TOPIC";
+    final static String DOMAIN_DEFAULT = DOMAIN_QUEUE; // the queue domain is the default
+
+    /**
+     * Key for properties used in the <code>send</code> and <code>call</code>
+     * methods.  It is valid for either domain.
+     * The value must be a <code>java.lang.String</code>.
+     * See the javax.jms javadoc for information on this property.
+     */
+    final static String JMS_CORRELATION_ID = JMS_PROPERTY_PREFIX + _JMS_CORRELATION_ID;
+    /**
+     * Key for properties used in the <code>send</code> and <code>call</code>
+     * methods.  It is valid for either domain.
+     * The value must be a <code>byte[]</code>.
+     * See the javax.jms javadoc for information on this property.
+     */
+    final static String JMS_CORRELATION_ID_AS_BYTES = JMS_PROPERTY_PREFIX + _JMS_CORRELATION_ID_AS_BYTES;
+    /**
+     * Key for properties used in the <code>send</code> and <code>call</code>
+     * methods.  It is valid for either domain.
+     * The value must be a <code>java.lang.String</code>.
+     * See the javax.jms javadoc for information on this property.
+     */
+    final static String JMS_TYPE = JMS_PROPERTY_PREFIX + _JMS_TYPE;
+    /**
+     * Key for properties used in the <code>send</code> and <code>call</code>
+     * methods.  It is valid for either domain.
+     * The value must be a <code>java.lang.Long</code>.
+     * See the javax.jms javadoc for information on this property.
+     */
+    final static String TIME_TO_LIVE = JMS_PROPERTY_PREFIX + _TIME_TO_LIVE;
+    /**
+     * Key for properties used in the <code>send</code> and <code>call</code>
+     * methods.  It is valid for either domain.
+     * The value must be a <code>java.lang.Integer</code>.
+     * See the javax.jms javadoc for information on this property.
+     */
+    final static String PRIORITY = JMS_PROPERTY_PREFIX + _PRIORITY;
+    /**
+     * Key for properties used in the <code>send</code> and <code>call</code>
+     * methods.  It is valid for either domain.
+     * The value must be a <code>java.lang.Integer</code> equal to
+     * DeliveryMode.NON_PERSISTENT or DeliveryMode.PERSISTENT.
+     * See the javax.jms javadoc for information on this property.
+     */
+    final static String DELIVERY_MODE = JMS_PROPERTY_PREFIX + _DELIVERY_MODE;
+
+    final static String DELIVERY_MODE_PERSISTENT = "Persistent";
+    final static String DELIVERY_MODE_NONPERSISTENT = "Nonpersistent";
+    final static String DELIVERY_MODE_DISCARDABLE = "Discardable";
+    final static int DEFAULT_DELIVERY_MODE = DeliveryMode.NON_PERSISTENT;
+
+    final static int DEFAULT_PRIORITY = Message.DEFAULT_PRIORITY;
+    final static long DEFAULT_TIME_TO_LIVE = Message.DEFAULT_TIME_TO_LIVE;
+
+    /**
+     * Key for properties used in the <code>registerListener</code>
+     * method.  It is valid for either domain.
+     * The value must be a <code>java.lang.String</code>.
+     * See the javax.jms javadoc for information on this property.
+     */
+    final static String MESSAGE_SELECTOR = JMS_PROPERTY_PREFIX + _MESSAGE_SELECTOR;
+    /**
+     * Key for properties used in the <code>registerListener</code>
+     * method.  It is valid for either domain.
+     * The value must be a <code>java.lang.Integer</code> that is one of
+     * Session.AUTO_ACKNOWLEDGE, Session.DUPS_OK_ACKNOWLEDGE,
+     * or Session.CLIENT_ACKNOWLEDGE.
+     * See the javax.jms javadoc for information on this property.
+     */
+    final static String ACKNOWLEDGE_MODE = JMS_PROPERTY_PREFIX + _ACKNOWLEDGE_MODE;
+
+    /**
+     * Value used for ACKNOWLEDGE_MODE if it is left unset.  It is equal to
+     * Session.DUPS_OK_ACKNOWLEDGE.
+     */
+    final static int DEFAULT_ACKNOWLEDGE_MODE = Session.DUPS_OK_ACKNOWLEDGE;
+
+    /**
+     * Specifies the name of a durable subscription
+     * Key for properties used in the <code>registerListener</code>
+     * method.  It is valid for the PubSub domain.
+     * The value must be a <code>java.lang.String</code>.
+     */
+    final static String SUBSCRIPTION_NAME = JMS_PROPERTY_PREFIX + _SUBSCRIPTION_NAME;
+    /**
+     * Key for properties used in the <code>registerListener</code>
+     * method.  It is valid for the PubSub domain.
+     * Specifies that the durable subscription should be unsubscribed
+     * (deleted from the broker) when unregistered.
+     * The value must be a <code>java.lang.Boolean</code>.
+     */
+    final static String UNSUBSCRIBE = JMS_PROPERTY_PREFIX + _UNSUBSCRIBE;
+    /**
+     * Key for properties used in the <code>registerListener</code>
+     * method.  It is valid for the PubSub domain.
+     * The value must be a <code>java.lang.Boolean</code>.
+     */
+    final static String NO_LOCAL = JMS_PROPERTY_PREFIX + _NO_LOCAL;
+
+    final static boolean DEFAULT_NO_LOCAL = false;
+    final static boolean DEFAULT_UNSUBSCRIBE = false;
+
+    /**
+     * Key for properties used in the <code>createConnector</code>
+     * method.  It changes the behavior of the wsclient.
+     * The value must be a <code>java.lang.Integer</code>.
+     */
+    final static String NUM_RETRIES = JMS_PROPERTY_PREFIX + _NUM_RETRIES;
+    /**
+     * Key for properties used in the <code>createConnector</code>
+     * method.  It changes the behavior of the wsclient.
+     * The value must be a <code>java.lang.Integer</code>.
+     */
+    final static String NUM_SESSIONS = JMS_PROPERTY_PREFIX + _NUM_SESSIONS;
+    /**
+     * Key for properties used in the <code>createConnector</code>
+     * method.  It changes the behavior of the wsclient.
+     * The value must be a <code>java.lang.Long</code>.
+     */
+    final static String CONNECT_RETRY_INTERVAL = JMS_PROPERTY_PREFIX + _CONNECT_RETRY_INTERVAL;
+    /**
+     * Key for properties used in the <code>createConnector</code>
+     * method.  It changes the behavior of the wsclient.
+     * The value must be a <code>java.lang.Long</code>.
+     */
+    final static String INTERACT_RETRY_INTERVAL = JMS_PROPERTY_PREFIX + _INTERACT_RETRY_INTERVAL;
+    /**
+     * Key for properties used in the <code>createConnector</code>
+     * method.  It changes the behavior of the wsclient.
+     * The value must be a <code>java.lang.Long</code>.
+     */
+    final static String TIMEOUT_TIME = JMS_PROPERTY_PREFIX + _TIMEOUT_TIME;
+    /**
+     * Key for properties used in the <code>createConnector</code>
+     * method.  It changes the behavior of the wsclient.
+     * The value must be a <code>java.lang.Long</code>.
+     */
+    final static String MIN_TIMEOUT_TIME = JMS_PROPERTY_PREFIX + _MIN_TIMEOUT_TIME;
+
+    final static int DEFAULT_NUM_RETRIES = 5;
+    final static int DEFAULT_NUM_SESSIONS = 5;
+
+    final static long DEFAULT_CONNECT_RETRY_INTERVAL = 2000; // NOTE(review): intervals/timeouts presumably in ms - confirm
+    final static long DEFAULT_TIMEOUT_TIME = 5000;
+    final static long DEFAULT_MIN_TIMEOUT_TIME = 1000;
+    final static long DEFAULT_INTERACT_RETRY_INTERVAL = 250;
+
+    // key used to store the JMS connector in the message context
+    final static String CONNECTOR = JMS_PROPERTY_PREFIX + "Connector";
+
+    // key used to store the JMS vendor adapter in the message context
+    final static String VENDOR_ADAPTER = JMS_PROPERTY_PREFIX + "VendorAdapter";
+
+    // key used to store the JMS URL string in the message context
+    final static String JMS_URL = JMS_PROPERTY_PREFIX + "EndpointAddress";
+
+    /**
+     * A property that carries a Map of application-specific properties to be
+     * added to the JMS messages when issued.
+     */
+    final static String JMS_APPLICATION_MSG_PROPS =
+            JMS_PROPERTY_PREFIX + "msgProps";
+
+    final static String ADAPTER_POSTFIX = "VendorAdapter";
+}
\ No newline at end of file