You are viewing a plain text version of this content. The canonical link for it is here.
Posted to axis-cvs@ws.apache.org by di...@apache.org on 2005/12/16 18:18:08 UTC

svn commit: r357187 [19/25] - in /webservices/axis2/trunk/java/modules/core/src/org/apache/axis2: ./ addressing/ client/ client/async/ context/ deployment/ deployment/listener/ deployment/repository/util/ deployment/scheduler/ deployment/util/ descript...

Modified: webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/http/server/TransparentProxyRequestHandler.java
URL: http://svn.apache.org/viewcvs/webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/http/server/TransparentProxyRequestHandler.java?rev=357187&r1=357186&r2=357187&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/http/server/TransparentProxyRequestHandler.java (original)
+++ webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/http/server/TransparentProxyRequestHandler.java Fri Dec 16 09:13:57 2005
@@ -1,31 +1,32 @@
 /*
- * $Header: /home/jerenkrantz/tmp/commons/commons-convert/cvs/home/cvs/jakarta-commons//httpclient/src/test/org/apache/commons/httpclient/server/TransparentProxyRequestHandler.java,v 1.7 2004/12/11 22:35:26 olegk Exp $
- * $Revision: 155418 $
- * $Date: 2005-02-26 08:01:52 -0500 (Sat, 26 Feb 2005) $
- *
- * ====================================================================
- *
- *  Copyright 1999-2004 The Apache Software Foundation
- *
- *  Licensed under the Apache License, Version 2.0 (the "License");
- *  you may not use this file except in compliance with the License.
- *  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- * ====================================================================
- *
- * This software consists of voluntary contributions made by many
- * individuals on behalf of the Apache Software Foundation.  For more
- * information on the Apache Software Foundation, please see
- * <http://www.apache.org/>.
- *
- */
+* $Header: /home/jerenkrantz/tmp/commons/commons-convert/cvs/home/cvs/jakarta-commons//httpclient/src/test/org/apache/commons/httpclient/server/TransparentProxyRequestHandler.java,v 1.7 2004/12/11 22:35:26 olegk Exp $
+* $Revision: 155418 $
+* $Date: 2005-02-26 08:01:52 -0500 (Sat, 26 Feb 2005) $
+*
+* ====================================================================
+*
+*  Copyright 1999-2004 The Apache Software Foundation
+*
+*  Licensed under the Apache License, Version 2.0 (the "License");
+*  you may not use this file except in compliance with the License.
+*  You may obtain a copy of the License at
+*
+*      http://www.apache.org/licenses/LICENSE-2.0
+*
+*  Unless required by applicable law or agreed to in writing, software
+*  distributed under the License is distributed on an "AS IS" BASIS,
+*  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+*  See the License for the specific language governing permissions and
+*  limitations under the License.
+* ====================================================================
+*
+* This software consists of voluntary contributions made by many
+* individuals on behalf of the Apache Software Foundation.  For more
+* information on the Apache Software Foundation, please see
+* <http://www.apache.org/>.
+*
+*/
+
 
 package org.apache.axis2.transport.http.server;
 
@@ -44,47 +45,74 @@
  * other HTTP methods.
  */
 public class TransparentProxyRequestHandler implements HttpRequestHandler {
+    private static Socket connect(final String host) throws IOException {
+        String hostname = null;
+        int port;
+        int i = host.indexOf(':');
+
+        if (i != -1) {
+            hostname = host.substring(0, i);
+
+            try {
+                port = Integer.parseInt(host.substring(i + 1));
+            } catch (NumberFormatException ex) {
+                throw new IOException("Invalid host address: " + host);
+            }
+        } else {
+            hostname = host;
+            port = 80;
+        }
+
+        return new Socket(hostname, port);
+    }
 
     /*
      * (non-Javadoc)
-     * 
+     *
      * @see org.apache.commons.httpclient.server.HttpRequestHandler#processRequest(org.apache.commons.httpclient.server.SimpleHttpServerConnection)
      */
-    public boolean processRequest(
-        final SimpleHttpServerConnection conn,
-        final SimpleRequest request) throws IOException
-    {
-
+    public boolean processRequest(final SimpleHttpServerConnection conn,
+                                  final SimpleRequest request)
+            throws IOException {
         RequestLine line = request.getRequestLine();
         HttpVersion ver = line.getHttpVersion();
         String method = line.getMethod();
+
         if (!"CONNECT".equalsIgnoreCase(method)) {
             return false;
         }
+
         Socket targetSocket = null;
+
         try {
             targetSocket = connect(line.getUri());
         } catch (IOException e) {
             SimpleResponse response = new SimpleResponse();
+
             response.setStatusLine(ver, HttpStatus.SC_NOT_FOUND);
             response.setHeader(new Header("Server", "test proxy"));
             response.setBodyString("Cannot connect to " + line.getUri());
             conn.writeResponse(response);
+
             return true;
         }
+
         SimpleResponse response = new SimpleResponse();
+
         response.setHeader(new Header("Server", "test proxy"));
         response.setStatusLine(ver, HttpStatus.SC_OK, "Connection established");
         conn.writeResponse(response);
-        
-        SimpleHttpServerConnection target = new SimpleHttpServerConnection(targetSocket); 
+
+        SimpleHttpServerConnection target = new SimpleHttpServerConnection(targetSocket);
+
         pump(conn, target);
+
         return true;
     }
 
-    private void pump(final SimpleHttpServerConnection source, final SimpleHttpServerConnection target)
-        throws IOException {
-
+    private void pump(final SimpleHttpServerConnection source,
+                      final SimpleHttpServerConnection target)
+            throws IOException {
         source.setSocketTimeout(100);
         target.setSocketTimeout(100);
 
@@ -92,29 +120,35 @@
         OutputStream sourceOut = source.getOutputStream();
         InputStream targetIn = target.getInputStream();
         OutputStream targetOut = target.getOutputStream();
-        
-        byte[] tmp = new byte[1024];
+        byte[]       tmp = new byte[1024];
         int l;
-        for (;;) {
-            if (!source.isOpen() || !target.isOpen()) { 
+
+        for (; ;) {
+            if (!source.isOpen() || !target.isOpen()) {
                 break;
             }
+
             try {
                 l = sourceIn.read(tmp);
+
                 if (l == -1) {
                     break;
                 }
+
                 targetOut.write(tmp, 0, l);
             } catch (InterruptedIOException ignore) {
                 if (Thread.interrupted()) {
                     break;
                 }
             }
+
             try {
                 l = targetIn.read(tmp);
+
                 if (l == -1) {
                     break;
                 }
+
                 sourceOut.write(tmp, 0, l);
             } catch (InterruptedIOException ignore) {
                 if (Thread.interrupted()) {
@@ -123,23 +157,4 @@
             }
         }
     }
-    
-    private static Socket connect(final String host) throws IOException {
-        String hostname = null; 
-        int port; 
-        int i = host.indexOf(':');
-        if (i != -1) {
-            hostname = host.substring(0, i);
-            try {
-                port = Integer.parseInt(host.substring(i + 1));
-            } catch (NumberFormatException ex) {
-                throw new IOException("Invalid host address: " + host);
-            }
-        } else {
-            hostname = host;
-            port = 80;
-        }
-        return new Socket(hostname, port);        
-    }
-    
-}
\ No newline at end of file
+}

Modified: webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/BeanVendorAdapter.java
URL: http://svn.apache.org/viewcvs/webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/BeanVendorAdapter.java?rev=357187&r1=357186&r2=357187&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/BeanVendorAdapter.java (original)
+++ webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/BeanVendorAdapter.java Fri Dec 16 09:13:57 2005
@@ -1,18 +1,19 @@
 /*
- * Copyright 2001, 2002,2004 The Apache Software Foundation.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
+* Copyright 2001, 2002,2004 The Apache Software Foundation.
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*      http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
 
 package org.apache.axis2.transport.jms;
 
@@ -30,51 +31,58 @@
  * the input sessions to create destinations.
  */
 public abstract class BeanVendorAdapter extends JMSVendorAdapter {
-    protected final static String CONNECTION_FACTORY_CLASS =
-            "transport.jms.ConnectionFactoryClass";
-
-    public QueueConnectionFactory getQueueConnectionFactory(HashMap cfConfig)
-            throws Exception {
-        return (QueueConnectionFactory) getConnectionFactory(cfConfig);
-    }
+    protected final static String CONNECTION_FACTORY_CLASS = "transport.jms.ConnectionFactoryClass";
 
-    public TopicConnectionFactory getTopicConnectionFactory(HashMap cfConfig)
-            throws Exception {
-        return (TopicConnectionFactory) getConnectionFactory(cfConfig);
-    }
-
-    private ConnectionFactory getConnectionFactory(HashMap cfConfig)
-            throws Exception {
-        String classname = (String) cfConfig.get(CONNECTION_FACTORY_CLASS);
-        if (classname == null || classname.trim().length() == 0)
-            throw new IllegalArgumentException("noCFClass");
-
-        Class factoryClass = Loader.loadClass(classname);
-        ConnectionFactory factory = (ConnectionFactory) factoryClass.newInstance();
-        callSetters(cfConfig, factoryClass, factory);
-        return factory;
-    }
-
-    private void callSetters(HashMap cfConfig,
-                             Class factoryClass,
-                             ConnectionFactory factory)
+    private void callSetters(HashMap cfConfig, Class factoryClass, ConnectionFactory factory)
             throws Exception {
         PropertyDescriptor[] bpd = Introspector.getBeanInfo(factoryClass).getPropertyDescriptors();
+
         for (int i = 0; i < bpd.length; i++) {
             PropertyDescriptor thisBPD = bpd[i];
             String propName = thisBPD.getName();
+
             if (cfConfig.containsKey(propName)) {
                 Object value = cfConfig.get(propName);
-                if (value == null)
+
+                if (value == null) {
                     continue;
+                }
 
                 String validType = thisBPD.getName();
-                if (!value.getClass().getName().equals(validType))
+
+                if (!value.getClass().getName().equals(validType)) {
                     throw new IllegalArgumentException("badType");
-                if (thisBPD.getWriteMethod() == null)
+                }
+
+                if (thisBPD.getWriteMethod() == null) {
                     throw new IllegalArgumentException("notWriteable");
+                }
+
                 thisBPD.getWriteMethod().invoke(factory, new Object[]{value});
             }
         }
     }
-}
\ No newline at end of file
+
+    private ConnectionFactory getConnectionFactory(HashMap cfConfig) throws Exception {
+        String classname = (String) cfConfig.get(CONNECTION_FACTORY_CLASS);
+
+        if ((classname == null) || (classname.trim().length() == 0)) {
+            throw new IllegalArgumentException("noCFClass");
+        }
+
+        Class factoryClass = Loader.loadClass(classname);
+        ConnectionFactory factory = (ConnectionFactory) factoryClass.newInstance();
+
+        callSetters(cfConfig, factoryClass, factory);
+
+        return factory;
+    }
+
+    public QueueConnectionFactory getQueueConnectionFactory(HashMap cfConfig) throws Exception {
+        return (QueueConnectionFactory) getConnectionFactory(cfConfig);
+    }
+
+    public TopicConnectionFactory getTopicConnectionFactory(HashMap cfConfig) throws Exception {
+        return (TopicConnectionFactory) getConnectionFactory(cfConfig);
+    }
+}

Modified: webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/InvokeException.java
URL: http://svn.apache.org/viewcvs/webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/InvokeException.java?rev=357187&r1=357186&r2=357187&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/InvokeException.java (original)
+++ webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/InvokeException.java Fri Dec 16 09:13:57 2005
@@ -1,18 +1,19 @@
 /*
- * Copyright 2001, 2002,2004 The Apache Software Foundation.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
+* Copyright 2001, 2002,2004 The Apache Software Foundation.
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*      http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
 
 package org.apache.axis2.transport.jms;
 

Modified: webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/InvokeTimeoutException.java
URL: http://svn.apache.org/viewcvs/webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/InvokeTimeoutException.java?rev=357187&r1=357186&r2=357187&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/InvokeTimeoutException.java (original)
+++ webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/InvokeTimeoutException.java Fri Dec 16 09:13:57 2005
@@ -1,18 +1,19 @@
 /*
- * Copyright 2001, 2002,2004 The Apache Software Foundation.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
+* Copyright 2001, 2002,2004 The Apache Software Foundation.
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*      http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
 
 package org.apache.axis2.transport.jms;
 

Modified: webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/JMSConnector.java
URL: http://svn.apache.org/viewcvs/webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/JMSConnector.java?rev=357187&r1=357186&r2=357187&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/JMSConnector.java (original)
+++ webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/JMSConnector.java Fri Dec 16 09:13:57 2005
@@ -1,18 +1,19 @@
 /*
- * Copyright 2001, 2002,2004 The Apache Software Foundation.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
+* Copyright 2001, 2002,2004 The Apache Software Foundation.
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*      http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
 
 package org.apache.axis2.transport.jms;
 
@@ -39,30 +40,22 @@
  * failures.
  */
 public abstract class JMSConnector {
-    protected int m_numRetries;
+    protected JMSVendorAdapter m_adapter;
+    protected boolean m_allowReceive;
     protected long m_connectRetryInterval;
     protected long m_interactRetryInterval;
-    protected long m_timeoutTime;
+    protected JMSURLHelper m_jmsurl;
+    protected int m_numRetries;
+    protected int m_numSessions;
     protected long m_poolTimeout;
     protected AsyncConnection m_receiveConnection;
     protected SyncConnection m_sendConnection;
-    protected int m_numSessions;
-    protected boolean m_allowReceive;
-    protected JMSVendorAdapter m_adapter;
-    protected JMSURLHelper m_jmsurl;
+    protected long m_timeoutTime;
 
-    public JMSConnector(ConnectionFactory connectionFactory,
-                        int numRetries,
-                        int numSessions,
-                        long connectRetryInterval,
-                        long interactRetryInterval,
-                        long timeoutTime,
-                        boolean allowReceive,
-                        String clientID,
-                        String username,
-                        String password,
-                        JMSVendorAdapter adapter,
-                        JMSURLHelper jmsurl)
+    public JMSConnector(ConnectionFactory connectionFactory, int numRetries, int numSessions,
+                        long connectRetryInterval, long interactRetryInterval, long timeoutTime,
+                        boolean allowReceive, String clientID, String username, String password,
+                        JMSVendorAdapter adapter, JMSURLHelper jmsurl)
             throws JMSException {
         m_numRetries = numRetries;
         m_connectRetryInterval = connectRetryInterval;
@@ -77,288 +70,368 @@
         // try to connect initially so we can fail fast
         // in the case of irrecoverable errors.
         // If we fail in a recoverable fashion we will retry
-        javax.jms.Connection sendConnection = createConnectionWithRetry(
-                connectionFactory,
-                username,
-                password);
-        m_sendConnection = createSyncConnection(connectionFactory, sendConnection,
-                m_numSessions, "SendThread",
-                clientID,
-                username,
-                password);
+        javax.jms.Connection sendConnection = createConnectionWithRetry(connectionFactory,
+                username, password);
 
+        m_sendConnection = createSyncConnection(connectionFactory, sendConnection, m_numSessions,
+                "SendThread", clientID, username, password);
         m_sendConnection.start();
 
         if (m_allowReceive) {
-            javax.jms.Connection receiveConnection = createConnectionWithRetry(
-                    connectionFactory,
-                    username,
-                    password);
-            m_receiveConnection = createAsyncConnection(connectionFactory,
-                    receiveConnection,
-                    "ReceiveThread",
-                    clientID,
-                    username,
-                    password);
+            javax.jms.Connection receiveConnection = createConnectionWithRetry(connectionFactory,
+                    username, password);
+
+            m_receiveConnection = createAsyncConnection(connectionFactory, receiveConnection,
+                    "ReceiveThread", clientID, username, password);
             m_receiveConnection.start();
         }
     }
 
-    public int getNumRetries() {
-        return m_numRetries;
-    }
-
-    public int numSessions() {
-        return m_numSessions;
-    }
-
-    public ConnectionFactory getConnectionFactory() {
-        // there is always a send connection
-        return getSendConnection().getConnectionFactory();
-    }
-
-    public String getClientID() {
-        return getSendConnection().getClientID();
-    }
-
-    public String getUsername() {
-        return getSendConnection().getUsername();
-    }
-
-    public String getPassword() {
-        return getSendConnection().getPassword();
-    }
-
-    public JMSVendorAdapter getVendorAdapter() {
-        return m_adapter;
-    }
-
-    public JMSURLHelper getJMSURL() {
-        return m_jmsurl;
-    }
+    protected abstract AsyncConnection createAsyncConnection(ConnectionFactory factory,
+                                                             javax.jms.Connection connection, String threadName, String clientID, String username,
+                                                             String password)
+            throws JMSException;
 
-    protected javax.jms.Connection createConnectionWithRetry(
-            ConnectionFactory connectionFactory,
-            String username,
-            String password)
+    protected javax.jms.Connection createConnectionWithRetry(ConnectionFactory connectionFactory,
+                                                             String username, String password)
             throws JMSException {
         javax.jms.Connection connection = null;
+
         for (int numTries = 1; connection == null; numTries++) {
             try {
                 connection = internalConnect(connectionFactory, username, password);
-            }
-            catch (JMSException jmse) {
-                if (!m_adapter.isRecoverable(jmse, JMSVendorAdapter.CONNECT_ACTION) || numTries == m_numRetries)
+            } catch (JMSException jmse) {
+                if (!m_adapter.isRecoverable(jmse, JMSVendorAdapter.CONNECT_ACTION)
+                        || (numTries == m_numRetries)) {
                     throw jmse;
-                else
+                } else {
                     try {
                         Thread.sleep(m_connectRetryInterval);
                     } catch (InterruptedException ie) {
                     }
+                }
                 ;
             }
         }
+
         return connection;
     }
 
-    public void stop() {
-        JMSConnectorManager.getInstance().removeConnectorFromPool(this);
+    public abstract JMSEndpoint createEndpoint(Destination destination) throws JMSException;
 
-        m_sendConnection.stopConnection();
-        if (m_allowReceive)
-            m_receiveConnection.stopConnection();
-    }
+    public abstract JMSEndpoint createEndpoint(String destinationName) throws JMSException;
 
-    public void start() {
-        m_sendConnection.startConnection();
-        if (m_allowReceive)
-            m_receiveConnection.startConnection();
+    protected abstract SyncConnection createSyncConnection(ConnectionFactory factory,
+                                                           javax.jms.Connection connection, int numSessions, String threadName, String clientID,
+                                                           String username, String password)
+            throws JMSException;
 
-        JMSConnectorManager.getInstance().addConnectorToPool(this);
+    protected abstract javax.jms.Connection internalConnect(ConnectionFactory connectionFactory,
+                                                            String username, String password)
+            throws JMSException;
+
+    public int numSessions() {
+        return m_numSessions;
     }
 
     public void shutdown() {
         m_sendConnection.shutdown();
-        if (m_allowReceive)
+
+        if (m_allowReceive) {
             m_receiveConnection.shutdown();
+        }
     }
 
-    public abstract JMSEndpoint createEndpoint(String destinationName)
-            throws JMSException;
+    public void start() {
+        m_sendConnection.startConnection();
 
-    public abstract JMSEndpoint createEndpoint(Destination destination)
-            throws JMSException;
+        if (m_allowReceive) {
+            m_receiveConnection.startConnection();
+        }
 
+        JMSConnectorManager.getInstance().addConnectorToPool(this);
+    }
 
-    protected abstract javax.jms.Connection internalConnect(
-            ConnectionFactory connectionFactory,
-            String username,
-            String password)
-            throws JMSException;
+    public void stop() {
+        JMSConnectorManager.getInstance().removeConnectorFromPool(this);
+        m_sendConnection.stopConnection();
 
-    private abstract class Connection extends Thread implements ExceptionListener {
-        private ConnectionFactory m_connectionFactory;
-        protected javax.jms.Connection m_connection;
+        if (m_allowReceive) {
+            m_receiveConnection.stopConnection();
+        }
+    }
 
-        protected boolean m_isActive;
-        private boolean m_needsToConnect;
-        private boolean m_startConnection;
-        private String m_clientID;
-        private String m_username;
-        private String m_password;
+    public String getClientID() {
+        return getSendConnection().getClientID();
+    }
 
-        private Object m_jmsLock;
-        private Object m_lifecycleLock;
+    public ConnectionFactory getConnectionFactory() {
 
-        protected Connection(ConnectionFactory connectionFactory,
-                             javax.jms.Connection connection,
-                             String threadName,
-                             String clientID,
-                             String username,
-                             String password)
-                throws JMSException {
-            super(threadName);
-            m_connectionFactory = connectionFactory;
+        // there is always a send connection
+        return getSendConnection().getConnectionFactory();
+    }
 
-            m_clientID = clientID;
-            m_username = username;
-            m_password = password;
+    public JMSURLHelper getJMSURL() {
+        return m_jmsurl;
+    }
 
-            m_jmsLock = new Object();
-            m_lifecycleLock = new Object();
+    public int getNumRetries() {
+        return m_numRetries;
+    }
 
-            if (connection != null) {
-                m_needsToConnect = false;
-                m_connection = connection;
-                m_connection.setExceptionListener(this);
-                if (m_clientID != null)
-                    m_connection.setClientID(m_clientID);
-            } else {
-                m_needsToConnect = true;
-            }
+    public String getPassword() {
+        return getSendConnection().getPassword();
+    }
 
-            m_isActive = true;
-        }
+    AsyncConnection getReceiveConnection() {
+        return m_receiveConnection;
+    }
 
-        public ConnectionFactory getConnectionFactory() {
-            return m_connectionFactory;
-        }
+    SyncConnection getSendConnection() {
+        return m_sendConnection;
+    }
 
-        public String getClientID() {
-            return m_clientID;
-        }
+    public String getUsername() {
+        return getSendConnection().getUsername();
+    }
 
-        public String getUsername() {
-            return m_username;
-        }
+    public JMSVendorAdapter getVendorAdapter() {
+        return m_adapter;
+    }
 
-        public String getPassword() {
-            return m_password;
+    protected abstract class AsyncConnection extends Connection {
+        Object m_subscriptionLock;
+        HashMap m_subscriptions;
+
+        protected AsyncConnection(ConnectionFactory connectionFactory,
+                                  javax.jms.Connection connection, String threadName,
+                                  String clientID, String username, String password)
+                throws JMSException {
+            super(connectionFactory, connection, threadName, clientID, username, password);
+            m_subscriptions = new HashMap();
+            m_subscriptionLock = new Object();
         }
 
-        /**
-         * @todo handle non-recoverable errors
-         */
+        protected abstract ListenerSession createListenerSession(javax.jms.Connection connection,
+                                                                 Subscription subscription)
+                throws Exception;
 
-        public void run() {
-            // loop until a connection is made and when a connection is made (re)establish
-            // any subscriptions
-            while (m_isActive) {
-                if (m_needsToConnect) {
-                    m_connection = null;
-                    try {
-                        m_connection = internalConnect(m_connectionFactory,
-                                m_username, m_password);
-                        m_connection.setExceptionListener(this);
-                        if (m_clientID != null)
-                            m_connection.setClientID(m_clientID);
-                    }
-                    catch (JMSException e) {
-                        // simply backoff for a while and then retry
-                        try {
-                            Thread.sleep(m_connectRetryInterval);
-                        } catch (InterruptedException ie) {
-                        }
-                        continue;
-                    }
-                } else
-                    m_needsToConnect = true; // When we'll get to the "if (needsToConnect)" statement the next time it will be because
-                // we lost the connection
+        protected void onConnect() throws Exception {
+            synchronized (m_subscriptionLock) {
+                Iterator subscriptions = m_subscriptions.keySet().iterator();
 
-                // we now have a valid connection so establish some context
-                try {
-                    internalOnConnect();
-                }
-                catch (Exception e) {
-                    // insert code to handle non recoverable errors
-                    // simply retry
-                    continue;
-                }
+                while (subscriptions.hasNext()) {
+                    Subscription subscription = (Subscription) subscriptions.next();
 
-                synchronized (m_jmsLock) {
-                    try {
-                        m_jmsLock.wait();
-                    } catch (InterruptedException ie) {
-                    } // until notified due to some change in status
+                    if (m_subscriptions.get(subscription) == null) {
+                        m_subscriptions.put(subscription,
+                                createListenerSession(m_connection, subscription));
+                    }
                 }
-            }
 
-            // no longer staying connected, so see what we can cleanup
-            internalOnShutdown();
+                m_subscriptionLock.notifyAll();
+            }
         }
 
+        protected void onException() {
+            synchronized (m_subscriptionLock) {
+                Iterator subscriptions = m_subscriptions.keySet().iterator();
 
-        void startConnection() {
-            synchronized (m_lifecycleLock) {
-                if (m_startConnection)
-                    return;
-                m_startConnection = true;
-                try {
-                    m_connection.start();
-                } catch (Throwable e) {
-                } // ignore
-            }
-        }
+                while (subscriptions.hasNext()) {
+                    Subscription subscription = (Subscription) subscriptions.next();
 
-        void stopConnection() {
-            synchronized (m_lifecycleLock) {
-                if (!m_startConnection)
-                    return;
-                m_startConnection = false;
-                try {
-                    m_connection.stop();
-                } catch (Throwable e) {
-                } // ignore
+                    m_subscriptions.put(subscription, null);
+                }
             }
         }
 
-        void shutdown() {
-            m_isActive = false;
-            synchronized (m_jmsLock) {
-                m_jmsLock.notifyAll();
+        protected void onShutdown() {
+            synchronized (m_subscriptionLock) {
+                Iterator subscriptions = m_subscriptions.keySet().iterator();
+
+                while (subscriptions.hasNext()) {
+                    Subscription subscription = (Subscription) subscriptions.next();
+                    ListenerSession session =
+                            (ListenerSession) m_subscriptions.get(subscription);
+
+                    if (session != null) {
+                        session.cleanup();
+                    }
+                }
+
+                m_subscriptions.clear();
             }
         }
 
-
-        public void onException(JMSException exception) {
-            if (m_adapter.isRecoverable(exception,
-                    JMSVendorAdapter.ON_EXCEPTION_ACTION))
-                return;
-            onException();
-            synchronized (m_jmsLock) {
-                m_jmsLock.notifyAll();
+        /**
+         * @param subscription the subscription to register; ignored if it is already registered
+         * @todo add in security exception propagation
+         */
+        void subscribe(Subscription subscription) throws Exception {
+            long timeoutTime = System.currentTimeMillis() + m_timeoutTime;
+
+            synchronized (m_subscriptionLock) {
+                if (m_subscriptions.containsKey(subscription)) {
+                    return;
+                }
+
+                while (true) {
+                    if (System.currentTimeMillis() > timeoutTime) {
+                        throw new InvokeTimeoutException("Cannot subscribe listener");
+                    }
+
+                    try {
+                        ListenerSession session = createListenerSession(m_connection, subscription);
+
+                        m_subscriptions.put(subscription, session);
+
+                        break;
+                    } catch (JMSException jmse) {
+                        if (!m_adapter.isRecoverable(jmse, JMSVendorAdapter.SUBSCRIBE_ACTION)) {
+                            throw jmse;
+                        }
+
+                        try {
+                            m_subscriptionLock.wait(m_interactRetryInterval);
+                        } catch (InterruptedException ignore) {
+                        }
+
+                        // give reconnect a chance
+                        Thread.yield();
+
+                        continue;
+                    } catch (NullPointerException jmse) {
+
+                        // we ARE reconnecting
+                        try {
+                            m_subscriptionLock.wait(m_interactRetryInterval);
+                        } catch (InterruptedException ignore) {
+                        }
+
+                        // give reconnect a chance
+                        Thread.yield();
+
+                        continue;
+                    }
+                }
             }
         }
 
-        private final void internalOnConnect()
-                throws Exception {
+        void unsubscribe(Subscription subscription) {
+            long timeoutTime = System.currentTimeMillis() + m_timeoutTime;
+
+            synchronized (m_subscriptionLock) {
+                if (!m_subscriptions.containsKey(subscription)) {
+                    return;
+                }
+
+                while (true) {
+                    if (System.currentTimeMillis() > timeoutTime) {
+                        throw new InvokeTimeoutException("Cannot unsubscribe listener");
+                    }
+
+                    // give reconnect a chance
+                    Thread.yield();
+
+                    try {
+                        ListenerSession session =
+                                (ListenerSession) m_subscriptions.get(subscription);
+
+                        session.cleanup();
+                        m_subscriptions.remove(subscription);
+
+                        break;
+                    } catch (NullPointerException jmse) {
+
+                        // we are reconnecting
+                        try {
+                            m_subscriptionLock.wait(m_interactRetryInterval);
+                        } catch (InterruptedException ignore) {
+                        }
+
+                        continue;
+                    }
+                }
+            }
+        }
+
+        protected class ListenerSession extends ConnectorSession {
+            protected MessageConsumer m_consumer;
+            protected Subscription m_subscription;
+
+            ListenerSession(Session session, MessageConsumer consumer, Subscription subscription)
+                    throws Exception {
+                super(session);
+                m_subscription = subscription;
+                m_consumer = consumer;
+
+                Destination destination = subscription.m_endpoint.getDestination(m_session);
+
+                m_consumer.setMessageListener(subscription.m_listener);
+            }
+
+            void cleanup() {
+                try {
+                    m_consumer.close();
+                } catch (Exception ignore) {
+                }
+
+                try {
+                    m_session.close();
+                } catch (Exception ignore) {
+                }
+            }
+        }
+    }
+
+
+    private abstract class Connection extends Thread implements ExceptionListener {
+        private String m_clientID;
+        protected javax.jms.Connection m_connection;
+        private ConnectionFactory m_connectionFactory;
+        protected boolean m_isActive;
+        private Object m_jmsLock;
+        private Object m_lifecycleLock;
+        private boolean m_needsToConnect;
+        private String m_password;
+        private boolean m_startConnection;
+        private String m_username;
+
+        protected Connection(ConnectionFactory connectionFactory, javax.jms.Connection connection,
+                             String threadName, String clientID, String username, String password)
+                throws JMSException {
+            super(threadName);
+            m_connectionFactory = connectionFactory;
+            m_clientID = clientID;
+            m_username = username;
+            m_password = password;
+            m_jmsLock = new Object();
+            m_lifecycleLock = new Object();
+
+            if (connection != null) {
+                m_needsToConnect = false;
+                m_connection = connection;
+                m_connection.setExceptionListener(this);
+
+                if (m_clientID != null) {
+                    m_connection.setClientID(m_clientID);
+                }
+            } else {
+                m_needsToConnect = true;
+            }
+
+            m_isActive = true;
+        }
+
+        private final void internalOnConnect() throws Exception {
             onConnect();
+
             synchronized (m_lifecycleLock) {
                 if (m_startConnection) {
                     try {
                         m_connection.start();
                     } catch (Throwable e) {
-                    } // ignore
+                    }    // ignore
                 }
             }
         }
@@ -366,162 +439,300 @@
         private final void internalOnShutdown() {
             stopConnection();
             onShutdown();
+
             try {
                 m_connection.close();
             } catch (Throwable e) {
-            } // ignore
+            }    // ignore
         }
 
         protected abstract void onConnect() throws Exception;
 
+        protected abstract void onException();
+
+        public void onException(JMSException exception) {
+            if (m_adapter.isRecoverable(exception, JMSVendorAdapter.ON_EXCEPTION_ACTION)) {
+                return;
+            }
+
+            onException();
+
+            synchronized (m_jmsLock) {
+                m_jmsLock.notifyAll();
+            }
+        }
+
         protected abstract void onShutdown();
 
-        protected abstract void onException();
+        /**
+         * @todo handle non-recoverable errors
+         */
+        public void run() {
+
+            // loop until a connection is made; each time a connection is made, (re)establish
+            // any subscriptions
+            while (m_isActive) {
+                if (m_needsToConnect) {
+                    m_connection = null;
+
+                    try {
+                        m_connection = internalConnect(m_connectionFactory, m_username, m_password);
+                        m_connection.setExceptionListener(this);
+
+                        if (m_clientID != null) {
+                            m_connection.setClientID(m_clientID);
+                        }
+                    } catch (JMSException e) {
+
+                        // simply back off for a while and then retry
+                        try {
+                            Thread.sleep(m_connectRetryInterval);
+                        } catch (InterruptedException ie) {
+                        }
+
+                        continue;
+                    }
+                } else {
+                    m_needsToConnect = true;    // if we reach the "if (m_needsToConnect)" check again, it is because we lost the connection
+                }
+
+                // (any later pass through this loop therefore reconnects)
+
+                // we now have a valid connection so establish some context
+                try {
+                    internalOnConnect();
+                } catch (Exception e) {
+
+                    // TODO: insert code to handle non-recoverable errors;
+                    // for now, simply retry
+                    continue;
+                }
+
+                synchronized (m_jmsLock) {
+                    try {
+                        m_jmsLock.wait();
+                    } catch (InterruptedException ie) {
+                    }    // until notified due to some change in status
+                }
+            }
+
+            // no longer staying connected, so see what we can cleanup
+            internalOnShutdown();
+        }
+
+        void shutdown() {
+            m_isActive = false;
+
+            synchronized (m_jmsLock) {
+                m_jmsLock.notifyAll();
+            }
+        }
+
+        void startConnection() {
+            synchronized (m_lifecycleLock) {
+                if (m_startConnection) {
+                    return;
+                }
+
+                m_startConnection = true;
+
+                try {
+                    m_connection.start();
+                } catch (Throwable e) {
+                }    // ignore
+            }
+        }
+
+        void stopConnection() {
+            synchronized (m_lifecycleLock) {
+                if (!m_startConnection) {
+                    return;
+                }
+
+                m_startConnection = false;
+
+                try {
+                    m_connection.stop();
+                } catch (Throwable e) {
+                }    // ignore
+            }
+        }
+
+        public String getClientID() {
+            return m_clientID;
+        }
+
+        public ConnectionFactory getConnectionFactory() {
+            return m_connectionFactory;
+        }
+
+        public String getPassword() {
+            return m_password;
+        }
+
+        public String getUsername() {
+            return m_username;
+        }
     }
 
-    protected abstract SyncConnection createSyncConnection(ConnectionFactory factory,
-                                                           javax.jms.Connection connection,
-                                                           int numSessions,
-                                                           String threadName,
-                                                           String clientID,
-                                                           String username,
-                                                           String password)
 
-            throws JMSException;
+    private abstract class ConnectorSession {
+        Session m_session;
 
-    SyncConnection getSendConnection() {
-        return m_sendConnection;
+        ConnectorSession(Session session) throws JMSException {
+            m_session = session;
+        }
     }
 
+
     protected abstract class SyncConnection extends Connection {
-        LinkedList m_senders;
         int m_numSessions;
         Object m_senderLock;
+        LinkedList m_senders;
 
-        SyncConnection(ConnectionFactory connectionFactory,
-                       javax.jms.Connection connection,
-                       int numSessions,
-                       String threadName,
-                       String clientID,
-                       String username,
+        SyncConnection(ConnectionFactory connectionFactory, javax.jms.Connection connection,
+                       int numSessions, String threadName, String clientID, String username,
                        String password)
                 throws JMSException {
-            super(connectionFactory, connection, threadName,
-                    clientID, username, password);
+            super(connectionFactory, connection, threadName, clientID, username, password);
             m_senders = new LinkedList();
             m_numSessions = numSessions;
             m_senderLock = new Object();
         }
 
-        protected abstract SendSession createSendSession(javax.jms.Connection connection)
-                throws JMSException;
-
-        protected void onConnect()
-                throws JMSException {
-            synchronized (m_senderLock) {
-                for (int i = 0; i < m_numSessions; i++) {
-                    m_senders.add(createSendSession(m_connection));
-                }
-                m_senderLock.notifyAll();
-            }
-        }
-
         byte[] call(JMSEndpoint endpoint, byte[] message, long timeout, HashMap properties)
                 throws Exception {
             long timeoutTime = System.currentTimeMillis() + timeout;
+
             while (true) {
                 if (System.currentTimeMillis() > timeoutTime) {
                     throw new InvokeTimeoutException("Unable to complete call in time allotted");
                 }
 
                 SendSession sendSession = null;
+
                 try {
                     sendSession = getSessionFromPool(m_poolTimeout);
-                    byte[] response = sendSession.call(endpoint,
-                            message,
+
+                    byte[] response = sendSession.call(endpoint, message,
                             timeoutTime - System.currentTimeMillis(),
                             properties);
+
                     returnSessionToPool(sendSession);
+
                     if (response == null) {
-                        throw new InvokeTimeoutException("Unable to complete call in time allotted");
+                        throw new InvokeTimeoutException(
+                                "Unable to complete call in time allotted");
                     }
+
                     return response;
-                }
-                catch (JMSException jmse) {
+                } catch (JMSException jmse) {
                     if (!m_adapter.isRecoverable(jmse, JMSVendorAdapter.SEND_ACTION)) {
-                        //this we cannot recover from
-                        //but it does not invalidate the session
+
+                        // this we cannot recover from
+                        // but it does not invalidate the session
                         returnSessionToPool(sendSession);
+
                         throw jmse;
                     }
 
-                    //for now we will assume this is a reconnect related issue
-                    //and let the sender be collected
-                    //give the reconnect thread a chance to fill the pool
+                    // for now we will assume this is a reconnect related issue
+                    // and let the sender be collected
+                    // give the reconnect thread a chance to fill the pool
                     Thread.yield();
+
                     continue;
-                }
-                catch (NullPointerException npe) {
+                } catch (NullPointerException npe) {
                     Thread.yield();
+
                     continue;
                 }
             }
         }
 
+        protected abstract SendSession createSendSession(javax.jms.Connection connection)
+                throws JMSException;
+
+        protected void onConnect() throws JMSException {
+            synchronized (m_senderLock) {
+                for (int i = 0; i < m_numSessions; i++) {
+                    m_senders.add(createSendSession(m_connection));
+                }
+
+                m_senderLock.notifyAll();
+            }
+        }
+
+        protected void onException() {
+            synchronized (m_senderLock) {
+                m_senders.clear();
+            }
+        }
+
+        protected void onShutdown() {
+            synchronized (m_senderLock) {
+                Iterator senders = m_senders.iterator();
+
+                while (senders.hasNext()) {
+                    SendSession session = (SendSession) senders.next();
+
+                    session.cleanup();
+                }
+
+                m_senders.clear();
+            }
+        }
+
+        private void returnSessionToPool(SendSession sendSession) {
+            synchronized (m_senderLock) {
+                m_senders.addLast(sendSession);
+                m_senderLock.notifyAll();
+            }
+        }
+
         /**
          * @todo add in handling for security exceptions
          * @todo add support for timeouts
          */
-        void send(JMSEndpoint endpoint, byte[] message, HashMap properties)
-                throws Exception {
+        void send(JMSEndpoint endpoint, byte[] message, HashMap properties) throws Exception {
             long timeoutTime = System.currentTimeMillis() + m_timeoutTime;
+
             while (true) {
                 if (System.currentTimeMillis() > timeoutTime) {
                     throw new InvokeTimeoutException("Cannot complete send in time allotted");
                 }
 
                 SendSession sendSession = null;
+
                 try {
                     sendSession = getSessionFromPool(m_poolTimeout);
                     sendSession.send(endpoint, message, properties);
                     returnSessionToPool(sendSession);
-                }
-                catch (JMSException jmse) {
+                } catch (JMSException jmse) {
                     if (!m_adapter.isRecoverable(jmse, JMSVendorAdapter.SEND_ACTION)) {
-                        //this we cannot recover from
-                        //but it does not invalidate the session
+
+                        // this we cannot recover from
+                        // but it does not invalidate the session
                         returnSessionToPool(sendSession);
+
                         throw jmse;
                     }
-                    //for now we will assume this is a reconnect related issue
-                    //and let the sender be collected
-                    //give the reconnect thread a chance to fill the pool
-                    Thread.yield();
-                    continue;
-                }
-                catch (NullPointerException npe) {
-                    //give the reconnect thread a chance to fill the pool
+
+                    // for now we will assume this is a reconnect related issue
+                    // and let the sender be collected
+                    // give the reconnect thread a chance to fill the pool
                     Thread.yield();
+
                     continue;
-                }
-                break;
-            }
-        }
+                } catch (NullPointerException npe) {
 
-        protected void onException() {
-            synchronized (m_senderLock) {
-                m_senders.clear();
-            }
-        }
+                    // give the reconnect thread a chance to fill the pool
+                    Thread.yield();
 
-        protected void onShutdown() {
-            synchronized (m_senderLock) {
-                Iterator senders = m_senders.iterator();
-                while (senders.hasNext()) {
-                    SendSession session = (SendSession) senders.next();
-                    session.cleanup();
+                    continue;
                 }
-                m_senders.clear();
+
+                break;
             }
         }
 
@@ -530,84 +741,33 @@
                 while (m_senders.size() == 0) {
                     try {
                         m_senderLock.wait(timeout);
+
                         if (m_senders.size() == 0) {
                             return null;
                         }
-                    }
-                    catch (InterruptedException ignore) {
+                    } catch (InterruptedException ignore) {
                         return null;
                     }
                 }
-                return (SendSession) m_senders.removeFirst();
-            }
-        }
 
-        private void returnSessionToPool(SendSession sendSession) {
-            synchronized (m_senderLock) {
-                m_senders.addLast(sendSession);
-                m_senderLock.notifyAll();
+                return (SendSession) m_senders.removeFirst();
             }
         }
 
         protected abstract class SendSession extends ConnectorSession {
             MessageProducer m_producer;
 
-            SendSession(Session session,
-                        MessageProducer producer)
-                    throws JMSException {
+            SendSession(Session session, MessageProducer producer) throws JMSException {
                 super(session);
                 m_producer = producer;
             }
 
-            protected abstract Destination createTemporaryDestination()
-                    throws JMSException;
-
-            protected abstract void deleteTemporaryDestination(Destination destination)
-                    throws JMSException;
-
-            protected abstract MessageConsumer createConsumer(Destination destination)
-                    throws JMSException;
-
-            protected abstract void send(Destination destination,
-                                         Message message,
-                                         int deliveryMode,
-                                         int priority,
-                                         long timeToLive)
-                    throws JMSException;
-
-            void send(JMSEndpoint endpoint, byte[] message, HashMap properties)
-                    throws Exception {
-                BytesMessage jmsMessage = m_session.createBytesMessage();
-                jmsMessage.writeBytes(message);
-                int deliveryMode = extractDeliveryMode(properties);
-                int priority = extractPriority(properties);
-                long timeToLive = extractTimeToLive(properties);
-
-                if (properties != null && !properties.isEmpty())
-                    setProperties(properties, jmsMessage);
-
-                send(endpoint.getDestination(m_session), jmsMessage, deliveryMode,
-                        priority, timeToLive);
-            }
-
-
-            void cleanup() {
-                try {
-                    m_producer.close();
-                } catch (Throwable t) {
-                }
-                try {
-                    m_session.close();
-                } catch (Throwable t) {
-                }
-            }
-
-            byte[] call(JMSEndpoint endpoint, byte[] message, long timeout,
-                        HashMap properties)
+            byte[] call(JMSEndpoint endpoint, byte[] message, long timeout, HashMap properties)
                     throws Exception {
                 Destination reply = createTemporaryDestination();
                 MessageConsumer subscriber = createConsumer(reply);
                 BytesMessage jmsMessage = m_session.createBytesMessage();
+
                 jmsMessage.writeBytes(message);
                 jmsMessage.setJMSReplyTo(reply);
 
@@ -615,269 +775,124 @@
                 int priority = extractPriority(properties);
                 long timeToLive = extractTimeToLive(properties);
 
-                if (properties != null && !properties.isEmpty())
+                if ((properties != null) && !properties.isEmpty()) {
                     setProperties(properties, jmsMessage);
+                }
+
+                send(endpoint.getDestination(m_session), jmsMessage, deliveryMode, priority,
+                        timeToLive);
 
-                send(endpoint.getDestination(m_session), jmsMessage, deliveryMode,
-                        priority, timeToLive);
                 BytesMessage response = null;
+
                 try {
                     response = (BytesMessage) subscriber.receive(timeout);
                 } catch (ClassCastException cce) {
-                    throw new InvokeException
-                            ("Error: unexpected message type received - expected BytesMessage");
+                    throw new InvokeException(
+                            "Error: unexpected message type received - expected BytesMessage");
                 }
+
                 byte[] respBytes = null;
+
                 if (response != null) {
-                    byte[] buffer = new byte[8 * 1024];
+                    byte[]                buffer = new byte[8 * 1024];
                     ByteArrayOutputStream out = new ByteArrayOutputStream();
-                    for (int bytesRead = response.readBytes(buffer);
-                         bytesRead != -1; bytesRead = response.readBytes(buffer)) {
+
+                    for (int bytesRead = response.readBytes(buffer); bytesRead != -1;
+                         bytesRead = response.readBytes(buffer)) {
                         out.write(buffer, 0, bytesRead);
                     }
+
                     respBytes = out.toByteArray();
                 }
+
                 subscriber.close();
                 deleteTemporaryDestination(reply);
+
                 return respBytes;
             }
 
-            private int extractPriority(HashMap properties) {
-                return MapUtils.removeIntProperty(properties, JMSConstants.PRIORITY,
-                        JMSConstants.DEFAULT_PRIORITY);
+            void cleanup() {
+                try {
+                    m_producer.close();
+                } catch (Throwable t) {
+                }
+
+                try {
+                    m_session.close();
+                } catch (Throwable t) {
+                }
             }
 
+            protected abstract MessageConsumer createConsumer(Destination destination)
+                    throws JMSException;
+
+            protected abstract Destination createTemporaryDestination() throws JMSException;
+
+            protected abstract void deleteTemporaryDestination(Destination destination)
+                    throws JMSException;
+
             private int extractDeliveryMode(HashMap properties) {
                 return MapUtils.removeIntProperty(properties, JMSConstants.DELIVERY_MODE,
                         JMSConstants.DEFAULT_DELIVERY_MODE);
             }
 
+            private int extractPriority(HashMap properties) {
+                return MapUtils.removeIntProperty(properties, JMSConstants.PRIORITY,
+                        JMSConstants.DEFAULT_PRIORITY);
+            }
+
             private long extractTimeToLive(HashMap properties) {
                 return MapUtils.removeLongProperty(properties, JMSConstants.TIME_TO_LIVE,
                         JMSConstants.DEFAULT_TIME_TO_LIVE);
             }
 
-            private void setProperties(HashMap properties, Message message)
-                    throws JMSException {
-                Iterator propertyIter = properties.entrySet().iterator();
-                while (propertyIter.hasNext()) {
-                    Map.Entry property = (Map.Entry) propertyIter.next();
-                    setProperty((String) property.getKey(), property.getValue(),
-                            message);
-                }
-            }
-
-            private void setProperty(String property, Object value, Message message)
-                    throws JMSException {
-                if (property == null)
-                    return;
-                if (property.equals(JMSConstants.JMS_CORRELATION_ID))
-                    message.setJMSCorrelationID((String) value);
-                else if (property.equals(JMSConstants.JMS_CORRELATION_ID_AS_BYTES))
-                    message.setJMSCorrelationIDAsBytes((byte[]) value);
-                else if (property.equals(JMSConstants.JMS_TYPE))
-                    message.setJMSType((String) value);
-                else
-                    message.setObjectProperty(property, value);
-            }
-        }
-    }
-
-    AsyncConnection getReceiveConnection() {
-        return m_receiveConnection;
-    }
-
-    protected abstract AsyncConnection createAsyncConnection(ConnectionFactory factory,
-                                                             javax.jms.Connection connection,
-                                                             String threadName,
-                                                             String clientID,
-                                                             String username,
-                                                             String password)
-
-            throws JMSException;
-
-    protected abstract class AsyncConnection extends Connection {
-        HashMap m_subscriptions;
-        Object m_subscriptionLock;
-
-        protected AsyncConnection(ConnectionFactory connectionFactory,
-                                  javax.jms.Connection connection,
-                                  String threadName,
-                                  String clientID,
-                                  String username,
-                                  String password)
-                throws JMSException {
-            super(connectionFactory, connection, threadName,
-                    clientID, username, password);
-            m_subscriptions = new HashMap();
-            m_subscriptionLock = new Object();
-        }
+            void send(JMSEndpoint endpoint, byte[] message, HashMap properties) throws Exception {
+                BytesMessage jmsMessage = m_session.createBytesMessage();
 
-        protected abstract ListenerSession createListenerSession(
-                javax.jms.Connection connection,
-                Subscription subscription)
-                throws Exception;
+                jmsMessage.writeBytes(message);
 
-        protected void onShutdown() {
-            synchronized (m_subscriptionLock) {
-                Iterator subscriptions = m_subscriptions.keySet().iterator();
-                while (subscriptions.hasNext()) {
-                    Subscription subscription = (Subscription) subscriptions.next();
-                    ListenerSession session = (ListenerSession)
-                            m_subscriptions.get(subscription);
-                    if (session != null) {
-                        session.cleanup();
-                    }
+                int deliveryMode = extractDeliveryMode(properties);
+                int priority = extractPriority(properties);
+                long timeToLive = extractTimeToLive(properties);
 
+                if ((properties != null) && !properties.isEmpty()) {
+                    setProperties(properties, jmsMessage);
                 }
-                m_subscriptions.clear();
-            }
-        }
-
-        /**
-         * @param subscription
-         * @todo add in security exception propagation
-         */
-        void subscribe(Subscription subscription)
-                throws Exception {
-            long timeoutTime = System.currentTimeMillis() + m_timeoutTime;
-            synchronized (m_subscriptionLock) {
-                if (m_subscriptions.containsKey(subscription))
-                    return;
-                while (true) {
-                    if (System.currentTimeMillis() > timeoutTime) {
-                        throw new InvokeTimeoutException("Cannot subscribe listener");
-                    }
-
-                    try {
-                        ListenerSession session = createListenerSession(m_connection,
-                                subscription);
-                        m_subscriptions.put(subscription, session);
-                        break;
-                    }
-                    catch (JMSException jmse) {
-                        if (!m_adapter.isRecoverable(jmse, JMSVendorAdapter.SUBSCRIBE_ACTION)) {
-                            throw jmse;
-                        }
 
-                        try {
-                            m_subscriptionLock.wait(m_interactRetryInterval);
-                        }
-                        catch (InterruptedException ignore) {
-                        }
-                        //give reconnect a chance
-                        Thread.yield();
-                        continue;
-                    }
-                    catch (NullPointerException jmse) {
-                        //we ARE reconnecting
-                        try {
-                            m_subscriptionLock.wait(m_interactRetryInterval);
-                        }
-                        catch (InterruptedException ignore) {
-                        }
-                        //give reconnect a chance
-                        Thread.yield();
-                        continue;
-                    }
-                }
+                send(endpoint.getDestination(m_session), jmsMessage, deliveryMode, priority,
+                        timeToLive);
             }
-        }
 
-        void unsubscribe(Subscription subscription) {
-            long timeoutTime = System.currentTimeMillis() + m_timeoutTime;
-            synchronized (m_subscriptionLock) {
-                if (!m_subscriptions.containsKey(subscription))
-                    return;
-                while (true) {
-                    if (System.currentTimeMillis() > timeoutTime) {
-                        throw new InvokeTimeoutException("Cannot unsubscribe listener");
-                    }
+            protected abstract void send(Destination destination, Message message,
+                                         int deliveryMode, int priority, long timeToLive)
+                    throws JMSException;
 
-                    //give reconnect a chance
-                    Thread.yield();
-                    try {
-                        ListenerSession session = (ListenerSession)
-                                m_subscriptions.get(subscription);
-                        session.cleanup();
-                        m_subscriptions.remove(subscription);
-                        break;
-                    }
-                    catch (NullPointerException jmse) {
-                        //we are reconnecting
-                        try {
-                            m_subscriptionLock.wait(m_interactRetryInterval);
-                        }
-                        catch (InterruptedException ignore) {
-                        }
-                        continue;
-                    }
-                }
-            }
-        }
+            private void setProperties(HashMap properties, Message message) throws JMSException {
+                Iterator propertyIter = properties.entrySet().iterator();
 
-        protected void onConnect()
-                throws Exception {
-            synchronized (m_subscriptionLock) {
-                Iterator subscriptions = m_subscriptions.keySet().iterator();
-                while (subscriptions.hasNext()) {
-                    Subscription subscription = (Subscription) subscriptions.next();
+                while (propertyIter.hasNext()) {
+                    Map.Entry property = (Map.Entry) propertyIter.next();
 
-                    if (m_subscriptions.get(subscription) == null) {
-                        m_subscriptions.put(subscription,
-                                createListenerSession(m_connection, subscription));
-                    }
+                    setProperty((String) property.getKey(), property.getValue(), message);
                 }
-                m_subscriptionLock.notifyAll();
             }
-        }
 
-        protected void onException() {
-            synchronized (m_subscriptionLock) {
-                Iterator subscriptions = m_subscriptions.keySet().iterator();
-                while (subscriptions.hasNext()) {
-                    Subscription subscription = (Subscription) subscriptions.next();
-                    m_subscriptions.put(subscription, null);
+            private void setProperty(String property, Object value, Message message)
+                    throws JMSException {
+                if (property == null) {
+                    return;
                 }
-            }
-        }
-
-
-        protected class ListenerSession extends ConnectorSession {
-            protected MessageConsumer m_consumer;
-            protected Subscription m_subscription;
-
-            ListenerSession(Session session,
-                            MessageConsumer consumer,
-                            Subscription subscription)
-                    throws Exception {
-                super(session);
-                m_subscription = subscription;
-                m_consumer = consumer;
-                Destination destination = subscription.m_endpoint.getDestination(m_session);
-                m_consumer.setMessageListener(subscription.m_listener);
-            }
 
-            void cleanup() {
-                try {
-                    m_consumer.close();
-                } catch (Exception ignore) {
-                }
-                try {
-                    m_session.close();
-                } catch (Exception ignore) {
+                if (property.equals(JMSConstants.JMS_CORRELATION_ID)) {
+                    message.setJMSCorrelationID((String) value);
+                } else if (property.equals(JMSConstants.JMS_CORRELATION_ID_AS_BYTES)) {
+                    message.setJMSCorrelationIDAsBytes((byte[]) value);
+                } else if (property.equals(JMSConstants.JMS_TYPE)) {
+                    message.setJMSType((String) value);
+                } else {
+                    message.setObjectProperty(property, value);
                 }
             }
-
-        }
-    }
-
-    private abstract class ConnectorSession {
-        Session m_session;
-
-        ConnectorSession(Session session)
-                throws JMSException {
-            m_session = session;
         }
     }
-}
\ No newline at end of file
+}