You are viewing a plain text version of this content. The canonical link for it is here.
Posted to axis-cvs@ws.apache.org by di...@apache.org on 2005/12/16 18:18:08 UTC

svn commit: r357187 [22/25] - in /webservices/axis2/trunk/java/modules/core/src/org/apache/axis2: ./ addressing/ client/ client/async/ context/ deployment/ deployment/listener/ deployment/repository/util/ deployment/scheduler/ deployment/util/ descript...

Modified: webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/QueueConnector.java
URL: http://svn.apache.org/viewcvs/webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/QueueConnector.java?rev=357187&r1=357186&r2=357187&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/QueueConnector.java (original)
+++ webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/QueueConnector.java Fri Dec 16 09:13:57 2005
@@ -1,18 +1,19 @@
 /*
- * Copyright 2001, 2002,2004 The Apache Software Foundation.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
+* Copyright 2001, 2002,2004 The Apache Software Foundation.
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*      http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
 
 package org.apache.axis2.transport.jms;
 
@@ -35,27 +36,22 @@
  * connections to queues (ptp domain).
  */
 public class QueueConnector extends JMSConnector {
-
-    public QueueConnector(ConnectionFactory factory,
-                          int numRetries,
-                          int numSessions,
-                          long connectRetryInterval,
-                          long interactRetryInterval,
-                          long timeoutTime,
-                          boolean allowReceive,
-                          String clientID,
-                          String username,
-                          String password,
-                          JMSVendorAdapter adapter,
-                          JMSURLHelper jmsurl)
+    public QueueConnector(ConnectionFactory factory, int numRetries, int numSessions,
+                          long connectRetryInterval, long interactRetryInterval, long timeoutTime,
+                          boolean allowReceive, String clientID, String username, String password,
+                          JMSVendorAdapter adapter, JMSURLHelper jmsurl)
             throws JMSException {
-        super(factory, numRetries, numSessions, connectRetryInterval,
-                interactRetryInterval, timeoutTime, allowReceive, clientID,
-                username, password, adapter, jmsurl);
+        super(factory, numRetries, numSessions, connectRetryInterval, interactRetryInterval,
+                timeoutTime, allowReceive, clientID, username, password, adapter, jmsurl);
     }
 
-    public JMSEndpoint createEndpoint(String destination) {
-        return new QueueEndpoint(destination);
+    protected AsyncConnection createAsyncConnection(ConnectionFactory factory,
+                                                    javax.jms.Connection connection, String threadName, String clientID, String username,
+                                                    String password)
+            throws JMSException {
+        return new QueueAsyncConnection((QueueConnectionFactory) factory,
+                (QueueConnection) connection, threadName, clientID,
+                username, password);
     }
 
     /**
@@ -65,37 +61,20 @@
      * @return
      * @throws JMSException
      */
-    public JMSEndpoint createEndpoint(Destination destination)
-            throws JMSException {
-        if (!(destination instanceof Queue))
+    public JMSEndpoint createEndpoint(Destination destination) throws JMSException {
+        if (!(destination instanceof Queue)) {
             throw new IllegalArgumentException("The input must be a queue for this connector");
+        }
+
         return new QueueDestinationEndpoint((Queue) destination);
     }
 
-    protected javax.jms.Connection internalConnect(ConnectionFactory connectionFactory,
-                                                   String username,
-                                                   String password)
-            throws JMSException {
-        QueueConnectionFactory qcf = (QueueConnectionFactory) connectionFactory;
-        if (username == null)
-            return qcf.createQueueConnection();
-
-        return qcf.createQueueConnection(username, password);
+    public JMSEndpoint createEndpoint(String destination) {
+        return new QueueEndpoint(destination);
     }
 
-
-    protected SyncConnection createSyncConnection(ConnectionFactory factory,
-                                                  javax.jms.Connection connection,
-                                                  int numSessions,
-                                                  String threadName,
-                                                  String clientID,
-                                                  String username,
-                                                  String password)
-
-            throws JMSException {
-        return new QueueSyncConnection((QueueConnectionFactory) factory,
-                (QueueConnection) connection, numSessions,
-                threadName, clientID, username, password);
+    private Queue createQueue(QueueSession session, String subject) throws Exception {
+        return m_adapter.getQueue(session, subject);
     }
 
     private QueueSession createQueueSession(QueueConnection connection, int ackMode)
@@ -103,74 +82,68 @@
         return connection.createQueueSession(false, ackMode);
     }
 
-    private Queue createQueue(QueueSession session, String subject)
-            throws Exception {
-        return m_adapter.getQueue(session, subject);
+    private QueueReceiver createReceiver(QueueSession session, Queue queue, String messageSelector)
+            throws JMSException {
+        return session.createReceiver(queue, messageSelector);
     }
 
-    private QueueReceiver createReceiver(QueueSession session,
-                                         Queue queue,
-                                         String messageSelector)
+    protected SyncConnection createSyncConnection(ConnectionFactory factory,
+                                                  javax.jms.Connection connection, int numSessions, String threadName, String clientID,
+                                                  String username, String password)
             throws JMSException {
-        return session.createReceiver(queue, messageSelector);
+        return new QueueSyncConnection((QueueConnectionFactory) factory,
+                (QueueConnection) connection, numSessions, threadName,
+                clientID, username, password);
     }
 
-    private final class QueueSyncConnection extends SyncConnection {
-        QueueSyncConnection(QueueConnectionFactory connectionFactory,
-                            QueueConnection connection,
-                            int numSessions,
-                            String threadName,
-                            String clientID,
-                            String username,
-                            String password)
-                throws JMSException {
-            super(connectionFactory, connection, numSessions, threadName,
-                    clientID, username, password);
+    protected javax.jms.Connection internalConnect(ConnectionFactory connectionFactory,
+                                                   String username, String password)
+            throws JMSException {
+        QueueConnectionFactory qcf = (QueueConnectionFactory) connectionFactory;
+
+        if (username == null) {
+            return qcf.createQueueConnection();
         }
 
-        protected SendSession createSendSession(javax.jms.Connection connection)
+        return qcf.createQueueConnection(username, password);
+    }
+
+    private final class QueueAsyncConnection extends AsyncConnection {
+        QueueAsyncConnection(QueueConnectionFactory connectionFactory, QueueConnection connection,
+                             String threadName, String clientID, String username, String password)
                 throws JMSException {
-            QueueSession session = createQueueSession((QueueConnection) connection,
-                    JMSConstants.DEFAULT_ACKNOWLEDGE_MODE);
-            QueueSender sender = session.createSender(null);
-            return new QueueSendSession(session, sender);
+            super(connectionFactory, connection, threadName, clientID, username, password);
         }
 
-        private final class QueueSendSession extends SendSession {
-            QueueSendSession(QueueSession session,
-                             QueueSender sender)
-                    throws JMSException {
-                super(session, sender);
-            }
-
-            protected MessageConsumer createConsumer(Destination destination)
-                    throws JMSException {
-                return createReceiver((QueueSession) m_session, (Queue) destination, null);
-            }
+        protected ListenerSession createListenerSession(javax.jms.Connection connection,
+                                                        Subscription subscription)
+                throws Exception {
+            QueueSession session = createQueueSession((QueueConnection) connection,
+                    subscription.m_ackMode);
+            QueueReceiver receiver = createReceiver(session,
+                    (Queue) subscription.m_endpoint.getDestination(session),
+                    subscription.m_messageSelector);
 
+            return new ListenerSession(session, receiver, subscription);
+        }
+    }
 
-            protected Destination createTemporaryDestination()
-                    throws JMSException {
-                return ((QueueSession) m_session).createTemporaryQueue();
-            }
 
-            protected void deleteTemporaryDestination(Destination destination)
-                    throws JMSException {
-                ((TemporaryQueue) destination).delete();
-            }
+    private final class QueueDestinationEndpoint extends QueueEndpoint {
+        Queue m_queue;
 
-            protected void send(Destination destination, Message message,
-                                int deliveryMode, int priority, long timeToLive)
-                    throws JMSException {
-                ((QueueSender) m_producer).send((Queue) destination, message,
-                        deliveryMode, priority, timeToLive);
-            }
+        QueueDestinationEndpoint(Queue queue) throws JMSException {
+            super(queue.getQueueName());
+            m_queue = queue;
+        }
 
+        Destination getDestination(Session session) {
+            return m_queue;
         }
     }
 
-    private class QueueEndpoint
-            extends JMSEndpoint {
+
+    private class QueueEndpoint extends JMSEndpoint {
         String m_queueName;
 
         QueueEndpoint(String queueName) {
@@ -178,80 +151,73 @@
             m_queueName = queueName;
         }
 
-        Destination getDestination(Session session)
-                throws Exception {
-            return createQueue((QueueSession) session, m_queueName);
-        }
-
-        public String toString() {
-            StringBuffer buffer = new StringBuffer("QueueEndpoint:");
-            buffer.append(m_queueName);
-            return buffer.toString();
-        }
-
         public boolean equals(Object object) {
-            if (!super.equals(object))
+            if (!super.equals(object)) {
                 return false;
+            }
 
-            if (!(object instanceof QueueEndpoint))
+            if (!(object instanceof QueueEndpoint)) {
                 return false;
+            }
 
             return m_queueName.equals(((QueueEndpoint) object).m_queueName);
         }
-    }
 
+        public String toString() {
+            StringBuffer buffer = new StringBuffer("QueueEndpoint:");
 
-    private final class QueueDestinationEndpoint
-            extends QueueEndpoint {
-        Queue m_queue;
+            buffer.append(m_queueName);
 
-        QueueDestinationEndpoint(Queue queue)
-                throws JMSException {
-            super(queue.getQueueName());
-            m_queue = queue;
+            return buffer.toString();
         }
 
-        Destination getDestination(Session session) {
-            return m_queue;
+        Destination getDestination(Session session) throws Exception {
+            return createQueue((QueueSession) session, m_queueName);
         }
-
-    }
-
-    protected AsyncConnection createAsyncConnection(ConnectionFactory factory,
-                                                    javax.jms.Connection connection,
-                                                    String threadName,
-                                                    String clientID,
-                                                    String username,
-                                                    String password)
-            throws JMSException {
-        return new QueueAsyncConnection((QueueConnectionFactory) factory,
-                (QueueConnection) connection, threadName,
-                clientID, username, password);
     }
 
-    private final class QueueAsyncConnection extends AsyncConnection {
 
-        QueueAsyncConnection(QueueConnectionFactory connectionFactory,
-                             QueueConnection connection,
-                             String threadName,
-                             String clientID,
-                             String username,
-                             String password)
+    private final class QueueSyncConnection extends SyncConnection {
+        QueueSyncConnection(QueueConnectionFactory connectionFactory, QueueConnection connection,
+                            int numSessions, String threadName, String clientID, String username,
+                            String password)
                 throws JMSException {
-            super(connectionFactory, connection, threadName, clientID, username, password);
+            super(connectionFactory, connection, numSessions, threadName, clientID, username,
+                    password);
         }
 
-        protected ListenerSession createListenerSession(javax.jms.Connection connection,
-                                                        Subscription subscription)
-                throws Exception {
+        protected SendSession createSendSession(javax.jms.Connection connection)
+                throws JMSException {
             QueueSession session = createQueueSession((QueueConnection) connection,
-                    subscription.m_ackMode);
-            QueueReceiver receiver = createReceiver(session,
-                    (Queue) subscription.m_endpoint.getDestination(session),
-                    subscription.m_messageSelector);
-            return new ListenerSession(session, receiver, subscription);
+                    JMSConstants.DEFAULT_ACKNOWLEDGE_MODE);
+            QueueSender sender = session.createSender(null);
+
+            return new QueueSendSession(session, sender);
         }
 
-    }
+        private final class QueueSendSession extends SendSession {
+            QueueSendSession(QueueSession session, QueueSender sender) throws JMSException {
+                super(session, sender);
+            }
 
-}
\ No newline at end of file
+            protected MessageConsumer createConsumer(Destination destination) throws JMSException {
+                return createReceiver((QueueSession) m_session, (Queue) destination, null);
+            }
+
+            protected Destination createTemporaryDestination() throws JMSException {
+                return ((QueueSession) m_session).createTemporaryQueue();
+            }
+
+            protected void deleteTemporaryDestination(Destination destination) throws JMSException {
+                ((TemporaryQueue) destination).delete();
+            }
+
+            protected void send(Destination destination, Message message, int deliveryMode,
+                                int priority, long timeToLive)
+                    throws JMSException {
+                ((QueueSender) m_producer).send((Queue) destination, message, deliveryMode,
+                        priority, timeToLive);
+            }
+        }
+    }
+}

Modified: webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/SimpleJMSListener.java
URL: http://svn.apache.org/viewcvs/webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/SimpleJMSListener.java?rev=357187&r1=357186&r2=357187&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/SimpleJMSListener.java (original)
+++ webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/SimpleJMSListener.java Fri Dec 16 09:13:57 2005
@@ -1,18 +1,19 @@
 /*
- * Copyright 2001, 2002,2004 The Apache Software Foundation.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
+* Copyright 2001, 2002,2004 The Apache Software Foundation.
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*      http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
 
 package org.apache.axis2.transport.jms;
 
@@ -37,7 +38,6 @@
 import java.util.Iterator;
 import java.util.Properties;
 
-
 /**
  * SimpleJMSListener implements the javax.jms.MessageListener interface. Its
  * basic purpose is listen asynchronously for messages and to pass them off
@@ -48,72 +48,112 @@
  * for production code, but for demos, debugging, and performance profiling.
  */
 public class SimpleJMSListener extends TransportListener implements MessageListener {
-    protected static Log log =
-            LogFactory.getLog(SimpleJMSListener.class.getName());
+    protected static Log log = LogFactory.getLog(SimpleJMSListener.class.getName());
 
     // Do we use (multiple) threads to process incoming messages?
     private boolean doThreads = true;
-
+    protected ConfigurationContext configurationContext;
     private JMSConnector connector;
+    private String destination;
     private JMSEndpoint endpoint;
     private HashMap properties;
-    private String destination;
-    protected ConfigurationContext configurationContext;
 
     public SimpleJMSListener() {
+    }
+
+    public SimpleJMSListener(String repositoryDirectory, HashMap connectorMap, HashMap cfMap,
+                             String destination, String username, String password,
+                             boolean doThreads)
+            throws Exception {
+        ConfigurationContextFactory erfac = new ConfigurationContextFactory();
+
+        this.configurationContext = erfac.buildConfigurationContext(repositoryDirectory);
+        this.doThreads = doThreads;
+        initListener(connectorMap, cfMap, username, password, destination);
+    }
+
+    public static final HashMap createCFMap(OptionsParser optionsParser) throws IOException {
+        String cfFile = optionsParser.isValueSet('c');
 
+        if (cfFile == null) {
+            return null;
+        }
+
+        Properties cfProps = new Properties();
+
+        cfProps.load(new BufferedInputStream(new FileInputStream(cfFile)));
+
+        HashMap cfMap = new HashMap(cfProps);
+
+        return cfMap;
+    }
+
+    public static final HashMap createConnectorMap(
+            org.apache.axis2.util.OptionsParser optionsParser) {
+        HashMap connectorMap = new HashMap();
+
+        if (optionsParser.isFlagSet('t') > 0) {
+
+            // queue is default so only setup map if topic domain is required
+            connectorMap.put(JMSConstants.DOMAIN, JMSConstants.DOMAIN_TOPIC);
+        }
+
+        return connectorMap;
     }
 
-    public void init(ConfigurationContext axisConf, TransportInDescription transprtIn) throws AxisFault {
+    public void init(ConfigurationContext axisConf, TransportInDescription transprtIn)
+            throws AxisFault {
         try {
             this.configurationContext = axisConf;
+
             HashMap params = new HashMap();
             Iterator iterator = transprtIn.getParameters().iterator();
+
             while (iterator.hasNext()) {
                 Parameter param = (Parameter) iterator.next();
+
                 params.put(param.getName(), param.getValue());
             }
-            String user = null, password = null, destination = null;
+
+            String user = null,
+                    password = null,
+                    destination = null;
+
             if (transprtIn.getParameter(JNDIVendorAdapter.USER) != null) {
                 user = (String) transprtIn.getParameter(JNDIVendorAdapter.USER).getValue();
             }
+
             if (transprtIn.getParameter(JNDIVendorAdapter.PASSWORD) != null) {
                 password = (String) transprtIn.getParameter(JNDIVendorAdapter.PASSWORD).getValue();
             }
+
             if (transprtIn.getParameter(JNDIVendorAdapter.DESTINATION) != null) {
-                destination = (String) transprtIn.getParameter(JNDIVendorAdapter.DESTINATION).getValue();
+                destination =
+                        (String) transprtIn.getParameter(JNDIVendorAdapter.DESTINATION).getValue();
             }
+
             initListener(params, params, user, password, destination);
         } catch (Exception e1) {
             throw new AxisFault(e1);
         }
     }
 
-    public SimpleJMSListener(String repositoryDirectory, HashMap connectorMap, HashMap cfMap,
-                             String destination, String username,
-                             String password, boolean doThreads)
+    private void initListener(HashMap connectorMap, HashMap cfMap, String username,
+                              String password, String destination)
             throws Exception {
-        ConfigurationContextFactory erfac = new ConfigurationContextFactory();
-        this.configurationContext = erfac.buildConfigurationContext(repositoryDirectory);
-        this.doThreads = doThreads;
-
-        initListener(connectorMap, cfMap, username, password, destination);
-    }
-
-    private void initListener(HashMap connectorMap, HashMap cfMap, String username, String password, String destination) throws Exception {
         try {
+
             // create a JMS connector using the default vendor adapter
             JMSVendorAdapter adapter = JMSVendorAdapterFactory.getJMSVendorAdapter();
-            this.connector = JMSConnectorFactory.createServerConnector(connectorMap,
-                    cfMap,
-                    username,
-                    password,
-                    adapter);
+
+            this.connector = JMSConnectorFactory.createServerConnector(connectorMap, cfMap,
+                    username, password, adapter);
             this.properties = new HashMap(connectorMap);
             this.properties.putAll(cfMap);
             this.destination = destination;
         } catch (Exception e) {
             log.error(Messages.getMessage("exception00"), e);
+
             throw e;
         }
 
@@ -121,12 +161,21 @@
         endpoint = connector.createEndpoint(destination);
     }
 
-    protected JMSConnector getConnector() {
-        return connector;
-    }
+    public static void main(String[] args) throws Exception {
+        OptionsParser optionsParser = new OptionsParser(args);
 
-    public ConfigurationContext getConfigurationContext() {
-        return this.configurationContext;
+        // first check if we should print usage
+        if ((optionsParser.isFlagSet('?') > 0) || (optionsParser.isFlagSet('h') > 0)) {
+            printUsage();
+        }
+
+        SimpleJMSListener listener = new SimpleJMSListener(optionsParser.isValueSet('r'),
+                createConnectorMap(optionsParser),
+                createCFMap(optionsParser), optionsParser.isValueSet('d'),
+                optionsParser.getUser(), optionsParser.getPassword(),
+                optionsParser.isFlagSet('s') > 0);
+
+        listener.start();
     }
 
     /**
@@ -136,24 +185,44 @@
      */
     public void onMessage(javax.jms.Message message) {
         try {
+
             // pass off the message to a worker as a BytesMessage
-            SimpleJMSWorker worker = new SimpleJMSWorker(configurationContext, this, (BytesMessage) message);
+            SimpleJMSWorker worker = new SimpleJMSWorker(configurationContext, this,
+                    (BytesMessage) message);
 
             // do we allow multi-threaded workers?
             if (doThreads) {
                 Thread t = new Thread(worker);
+
                 t.start();
             } else {
                 worker.run();
             }
-        }
-        catch (ClassCastException cce) {
+        } catch (ClassCastException cce) {
             log.error(Messages.getMessage("exception00"), cce);
             cce.printStackTrace();
+
             return;
         }
     }
 
+    public static void printUsage() {
+        System.out.println("Usage: SimpleJMSListener [options]");
+        System.out.println(" Opts: -? this message");
+        System.out.println();
+        System.out.println("       -r repository directory location");
+        System.out.println("       -c connection factory properties filename");
+        System.out.println("       -d destination");
+        System.out.println("       -t topic [absence of -t indicates queue]");
+        System.out.println();
+        System.out.println("       -u username");
+        System.out.println("       -w password");
+        System.out.println();
+        System.out.println("       -s single-threaded listener");
+        System.out.println("          [absence of option => multithreaded]");
+        System.exit(1);
+    }
+
     public void start() {
         try {
             endpoint.registerListener(this, properties);
@@ -161,6 +230,7 @@
             log.error(Messages.getMessage("exception00"), e);
             e.printStackTrace();
         }
+
         connector.start();
     }
 
@@ -175,75 +245,29 @@
         }
     }
 
-    public EndpointReference getReplyToEPR(String serviceName) throws AxisFault {
-        try {
-            JMSURLHelper url = new JMSURLHelper("jms:/" + destination);
-            url.getProperties().putAll(properties);
-            return new EndpointReference(url.getURLString());
-        } catch (Exception e) {
-            log.error(Messages.getMessage("exception00"), e);
-            throw AxisFault.makeFault(e);
-        }
+    public ConfigurationContext getConfigurationContext() {
+        return this.configurationContext;
     }
 
-    public static final HashMap createConnectorMap(org.apache.axis2.util.OptionsParser optionsParser) {
-        HashMap connectorMap = new HashMap();
-        if (optionsParser.isFlagSet('t') > 0) {
-            //queue is default so only setup map if topic domain is required
-            connectorMap.put(JMSConstants.DOMAIN, JMSConstants.DOMAIN_TOPIC);
-        }
-        return connectorMap;
+    protected JMSConnector getConnector() {
+        return connector;
     }
 
-    public static final HashMap createCFMap(OptionsParser optionsParser)
-            throws IOException {
-        String cfFile = optionsParser.isValueSet('c');
-        if (cfFile == null)
-            return null;
-
-        Properties cfProps = new Properties();
-        cfProps.load(new BufferedInputStream(new FileInputStream(cfFile)));
-        HashMap cfMap = new HashMap(cfProps);
-        return cfMap;
+    public HashMap getProperties() {
+        return properties;
     }
 
-    public static void main(String[] args) throws Exception {
-        OptionsParser optionsParser = new OptionsParser(args);
-
-        // first check if we should print usage
-        if ((optionsParser.isFlagSet('?') > 0) || (optionsParser.isFlagSet('h') > 0))
-            printUsage();
-
-        SimpleJMSListener listener = new SimpleJMSListener(
-                optionsParser.isValueSet('r'),
-                createConnectorMap(optionsParser),
-                createCFMap(optionsParser),
-                optionsParser.isValueSet('d'),
-                optionsParser.getUser(),
-                optionsParser.getPassword(),
-                optionsParser.isFlagSet('s') > 0);
-        listener.start();
-    }
+    public EndpointReference getReplyToEPR(String serviceName) throws AxisFault {
+        try {
+            JMSURLHelper url = new JMSURLHelper("jms:/" + destination);
 
-    public static void printUsage() {
-        System.out.println("Usage: SimpleJMSListener [options]");
-        System.out.println(" Opts: -? this message");
-        System.out.println();
-        System.out.println("       -r repository directory location");
-        System.out.println("       -c connection factory properties filename");
-        System.out.println("       -d destination");
-        System.out.println("       -t topic [absence of -t indicates queue]");
-        System.out.println();
-        System.out.println("       -u username");
-        System.out.println("       -w password");
-        System.out.println();
-        System.out.println("       -s single-threaded listener");
-        System.out.println("          [absence of option => multithreaded]");
+            url.getProperties().putAll(properties);
 
-        System.exit(1);
-    }
+            return new EndpointReference(url.getURLString());
+        } catch (Exception e) {
+            log.error(Messages.getMessage("exception00"), e);
 
-    public HashMap getProperties() {
-        return properties;
+            throw AxisFault.makeFault(e);
+        }
     }
 }

Modified: webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/SimpleJMSWorker.java
URL: http://svn.apache.org/viewcvs/webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/SimpleJMSWorker.java?rev=357187&r1=357186&r2=357187&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/SimpleJMSWorker.java (original)
+++ webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/SimpleJMSWorker.java Fri Dec 16 09:13:57 2005
@@ -1,18 +1,19 @@
 /*
- * Copyright 2001, 2002,2004 The Apache Software Foundation.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
+* Copyright 2001, 2002,2004 The Apache Software Foundation.
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*      http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
 
 package org.apache.axis2.transport.jms;
 
@@ -20,8 +21,8 @@
 import org.apache.axis2.Constants;
 import org.apache.axis2.context.ConfigurationContext;
 import org.apache.axis2.context.MessageContext;
-import org.apache.axis2.description.TransportOutDescription;
 import org.apache.axis2.description.TransportInDescription;
+import org.apache.axis2.description.TransportOutDescription;
 import org.apache.axis2.engine.AxisEngine;
 import org.apache.axis2.i18n.Messages;
 import org.apache.axis2.om.OMException;
@@ -58,65 +59,181 @@
  * the server, and sends back response msg to the replyTo destination.
  */
 public class SimpleJMSWorker implements Runnable {
-    protected static Log log =
-            LogFactory.getLog(SimpleJMSWorker.class.getName());
-
+    protected static Log log = LogFactory.getLog(SimpleJMSWorker.class.getName());
+    private ConfigurationContext configurationContext;
     SimpleJMSListener listener;
     BytesMessage message;
-    private ConfigurationContext configurationContext;
 
-    public SimpleJMSWorker(ConfigurationContext configurationContext, SimpleJMSListener listener, BytesMessage message) {
+    public SimpleJMSWorker(ConfigurationContext configurationContext, SimpleJMSListener listener,
+                           BytesMessage message) {
         this.listener = listener;
         this.message = message;
         this.configurationContext = configurationContext;
     }
 
+    public static void processJMSRequest(MessageContext msgContext, InputStream in,
+                                         String contentType)
+            throws AxisFault {
+        boolean soap11 = false;
+
+        try {
+            msgContext.setServerSide(true);
+
+            SOAPEnvelope envelope = null;
+            StAXBuilder builder = null;
+
+            if (contentType != null) {
+                if (contentType.indexOf(HTTPConstants.HEADER_ACCEPT_MULTIPART_RELATED) > -1) {
+
+                    // It is MTOM
+                    builder = HTTPTransportUtils.selectBuilderForMIME(msgContext, in, contentType);
+                    envelope = (SOAPEnvelope) builder.getDocumentElement();
+                } else {
+                    Reader reader = new InputStreamReader(in);
+                    XMLStreamReader xmlreader;
+
+                    // Figure out the char set encoding and create the reader
+
+                    // If charset is not specified
+                    if (TransportUtils.getCharSetEncoding(contentType) == null) {
+                        xmlreader = XMLInputFactory.newInstance().createXMLStreamReader(in,
+                                MessageContext.DEFAULT_CHAR_SET_ENCODING);
+
+                        // Set the encoding scheme in the message context
+                        msgContext.setProperty(MessageContext.CHARACTER_SET_ENCODING,
+                                MessageContext.DEFAULT_CHAR_SET_ENCODING);
+                    } else {
+
+                        // get the type of char encoding
+                        String charSetEnc = TransportUtils.getCharSetEncoding(contentType);
+
+                        xmlreader = XMLInputFactory.newInstance().createXMLStreamReader(in,
+                                charSetEnc);
+
+                        // Setting the value in msgCtx
+                        msgContext.setProperty(MessageContext.CHARACTER_SET_ENCODING, charSetEnc);
+                    }
+
+                    if (contentType.indexOf(SOAP12Constants.SOAP_12_CONTENT_TYPE) > -1) {
+                        soap11 = false;
+
+                        // it is SOAP 1.2
+                        builder =
+                                new StAXSOAPModelBuilder(xmlreader,
+                                        SOAP12Constants.SOAP_ENVELOPE_NAMESPACE_URI);
+                        envelope = (SOAPEnvelope) builder.getDocumentElement();
+                    } else if (contentType.indexOf(SOAP11Constants.SOAP_11_CONTENT_TYPE) > -1) {
+                        soap11 = true;
+                        builder =
+                                new StAXSOAPModelBuilder(xmlreader,
+                                        SOAP11Constants.SOAP_ENVELOPE_NAMESPACE_URI);
+                        envelope = (SOAPEnvelope) builder.getDocumentElement();
+                    }
+                }
+            }
+
+            String charsetEncoding = builder.getDocument().getCharsetEncoding();
+
+            if ((charsetEncoding != null) && !"".equals(charsetEncoding)
+                    && !((String) msgContext.getProperty(
+                    MessageContext.CHARACTER_SET_ENCODING)).equalsIgnoreCase(charsetEncoding)) {
+                String faultCode;
+
+                if (SOAP12Constants.SOAP_ENVELOPE_NAMESPACE_URI.equals(
+                        envelope.getNamespace().getName())) {
+                    faultCode = SOAP12Constants.FAULT_CODE_SENDER;
+                } else {
+                    faultCode = SOAP11Constants.FAULT_CODE_SENDER;
+                }
+
+                throw new AxisFault(
+                        "Character Set Encoding from " + "transport information do not match with "
+                                + "character set encoding in the received SOAP message", faultCode);
+            }
+
+            msgContext.setEnvelope(envelope);
+
+            AxisEngine engine = new AxisEngine(msgContext.getConfigurationContext());
+
+            if (envelope.getBody().hasFault()) {
+                engine.receiveFault(msgContext);
+            } else {
+                engine.receive(msgContext);
+            }
+        } catch (SOAPProcessingException e) {
+            throw new AxisFault(e);
+        } catch (AxisFault e) {
+
+            // rethrow
+            throw e;
+        } catch (OMException e) {
+            throw new AxisFault(e);
+        } catch (XMLStreamException e) {
+            throw new AxisFault(e);
+        } catch (FactoryConfigurationError e) {
+            throw new AxisFault(e);
+        } catch (UnsupportedEncodingException e) {
+            throw new AxisFault(e);
+        } finally {
+            if ((msgContext.getEnvelope() == null) && !soap11) {
+                msgContext.setEnvelope(new SOAP12Factory().createSOAPEnvelope());
+            }
+        }
+    }
+
     /**
      * This is where the incoming message is processed.
      */
     public void run() {
         InputStream in = null;
+
         try {
+
             // get the incoming msg content into a byte array
-            byte[] buffer = new byte[8 * 1024];
+            byte[]                buffer = new byte[8 * 1024];
             ByteArrayOutputStream out = new ByteArrayOutputStream();
-            for (int bytesRead = message.readBytes(buffer);
-                 bytesRead != -1; bytesRead = message.readBytes(buffer)) {
+
+            for (int bytesRead = message.readBytes(buffer); bytesRead != -1;
+                 bytesRead = message.readBytes(buffer)) {
                 out.write(buffer, 0, bytesRead);
             }
+
             in = new ByteArrayInputStream(out.toByteArray());
-        }
-        catch (Exception e) {
+        } catch (Exception e) {
             log.error(Messages.getMessage("exception00"), e);
             e.printStackTrace();
+
             return;
         }
 
         // if the incoming message has a contentType set,
         // pass it to my new Message
         String contentType = null;
+
         try {
             contentType = message.getStringProperty("contentType");
-        }
-        catch (Exception e) {
+        } catch (Exception e) {
             log.error(Messages.getMessage("exception00"), e);
             e.printStackTrace();
+
             return;
         }
 
         // if the incoming message has a contentType set,
         // pass it to my new Message
         String soapAction = null;
+
         try {
             soapAction = message.getStringProperty("SOAPAction");
-        }
-        catch (Exception e) {
+        } catch (Exception e) {
             log.error(Messages.getMessage("exception00"), e);
             e.printStackTrace();
+
             return;
         }
 
         MessageContext msgContext;
+
         try {
             TransportInDescription transportIn =
                     configurationContext.getAxisConfiguration().getTransportIn(
@@ -124,163 +241,33 @@
             TransportOutDescription transportOut =
                     configurationContext.getAxisConfiguration().getTransportOut(
                             new QName(Constants.TRANSPORT_JMS));
-            msgContext = new MessageContext(
-                    configurationContext,
-                    transportIn,
-                    transportOut);
-            msgContext.setProperty(
-                    Constants.OUT_TRANSPORT_INFO,
-                    new JMSOutTransportInfo(message.getJMSReplyTo(), listener.getProperties()));
+
+            msgContext = new MessageContext(configurationContext, transportIn, transportOut);
+            msgContext.setProperty(Constants.OUT_TRANSPORT_INFO,
+                    new JMSOutTransportInfo(message.getJMSReplyTo(),
+                            listener.getProperties()));
             msgContext.setTransportOut(transportOut);
             msgContext.setServerSide(true);
         } catch (Exception e) {
             log.error(Messages.getMessage("exception00"), e);
             e.printStackTrace();
+
             return;
         }
 
         msgContext.setServiceGroupContextId(UUIDGenerator.getUUID());
+
         if (soapAction != null) {
             msgContext.setSoapAction(soapAction);
         }
 
         try {
-            processJMSRequest(
-                    msgContext,
-                    in,
-                    contentType
-            );
+            processJMSRequest(msgContext, in, contentType);
         } catch (Exception e) {
             log.error(Messages.getMessage("exception00"), e);
             e.printStackTrace();
-            return;
-        }
-    }
-
-    public static void processJMSRequest(
-            MessageContext msgContext,
-            InputStream in,
-            String contentType
-    )
-            throws AxisFault {
-        boolean soap11 = false;
-        try {
-            msgContext.setServerSide(true);
-
-            SOAPEnvelope envelope = null;
-            StAXBuilder builder = null;
-            if (contentType != null) {
-                if (contentType
-                        .indexOf(HTTPConstants.HEADER_ACCEPT_MULTIPART_RELATED)
-                        > -1) {
-                    //It is MTOM
-                    builder = HTTPTransportUtils.selectBuilderForMIME(msgContext, in, contentType);
-                    envelope = (SOAPEnvelope) builder.getDocumentElement();
-                } else {
-                    Reader reader = new InputStreamReader(in);
-
-                    XMLStreamReader xmlreader;
-                    //Figure out the char set encoding and create the reader
-
-                    //If charset is not specified
-                    if (TransportUtils.getCharSetEncoding(contentType) == null) {
-                        xmlreader =
-                                XMLInputFactory
-                                        .newInstance()
-                                        .createXMLStreamReader(
-                                                in,
-                                                MessageContext.DEFAULT_CHAR_SET_ENCODING);
-                        //Set the encoding scheme in the message context
-                        msgContext.setProperty(
-                                MessageContext.CHARACTER_SET_ENCODING,
-                                MessageContext.DEFAULT_CHAR_SET_ENCODING);
-                    } else {
-                        //get the type of char encoding
-                        String charSetEnc = TransportUtils.getCharSetEncoding(contentType);
-                        xmlreader =
-                                XMLInputFactory
-                                        .newInstance()
-                                        .createXMLStreamReader(
-                                                in,
-                                                charSetEnc);
-
-                        //Setting the value in msgCtx
-                        msgContext.setProperty(
-                                MessageContext.CHARACTER_SET_ENCODING,
-                                charSetEnc);
-
-                    }
-                    if (contentType
-                            .indexOf(SOAP12Constants.SOAP_12_CONTENT_TYPE)
-                            > -1) {
-                        soap11 = false;
-                        //it is SOAP 1.2
-                        builder =
-                                new StAXSOAPModelBuilder(
-                                        xmlreader,
-                                        SOAP12Constants.SOAP_ENVELOPE_NAMESPACE_URI);
-                        envelope = (SOAPEnvelope) builder.getDocumentElement();
-                    } else if (
-                            contentType.indexOf(
-                                    SOAP11Constants.SOAP_11_CONTENT_TYPE)
-                                    > -1) {
-                        soap11 = true;
-                        builder =
-                                new StAXSOAPModelBuilder(
-                                        xmlreader,
-                                        SOAP11Constants
-                                                .SOAP_ENVELOPE_NAMESPACE_URI);
-                        envelope =
-                                (SOAPEnvelope) builder.getDocumentElement();
-                    }
-
-                }
-
-            }
-
-            String charsetEncoding = builder.getDocument().getCharsetEncoding();
-            if (charsetEncoding != null && !"".equals(charsetEncoding) &&
-                    !((String) msgContext.getProperty(MessageContext.CHARACTER_SET_ENCODING))
-                            .equalsIgnoreCase(charsetEncoding)) {
-                String faultCode;
-                if (SOAP12Constants.SOAP_ENVELOPE_NAMESPACE_URI.equals(envelope.getNamespace().getName())) {
-                    faultCode = SOAP12Constants.FAULT_CODE_SENDER;
-                } else {
-                    faultCode = SOAP11Constants.FAULT_CODE_SENDER;
-                }
-                throw new AxisFault("Character Set Encoding from " +
-                        "transport information do not match with " +
-                        "character set encoding in the received SOAP message", faultCode);
-            }
-
-
-            msgContext.setEnvelope(envelope);
-            AxisEngine engine = new AxisEngine(msgContext.getConfigurationContext());
-            if (envelope.getBody().hasFault()) {
-                engine.receiveFault(msgContext);
-            } else {
-                engine.receive(msgContext);
-            }
-        } catch (SOAPProcessingException e) {
-            throw new AxisFault(e);
-
-        } catch (AxisFault e) {
-            //rethrow
-            throw e;
-        } catch (OMException e) {
-            throw new AxisFault(e);
-        } catch (XMLStreamException e) {
-            throw new AxisFault(e);
-        } catch (FactoryConfigurationError e) {
-            throw new AxisFault(e);
-        } catch (UnsupportedEncodingException e) {
-            throw new AxisFault(e);
-        } finally {
-            if (msgContext.getEnvelope() == null && !soap11) {
-                msgContext.setEnvelope(
-                        new SOAP12Factory().createSOAPEnvelope());
-            }
 
+            return;
         }
     }
 }

Modified: webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/Subscription.java
URL: http://svn.apache.org/viewcvs/webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/Subscription.java?rev=357187&r1=357186&r2=357187&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/Subscription.java (original)
+++ webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/Subscription.java Fri Dec 16 09:13:57 2005
@@ -1,18 +1,19 @@
 /*
- * Copyright 2001, 2002,2004 The Apache Software Foundation.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
+* Copyright 2001, 2002,2004 The Apache Software Foundation.
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*      http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
 
 package org.apache.axis2.transport.jms;
 
@@ -24,48 +25,47 @@
  */
 
 public class Subscription {
-    MessageListener m_listener;
+    int m_ackMode;
     JMSEndpoint m_endpoint;
+    MessageListener m_listener;
     String m_messageSelector;
-    int m_ackMode;
 
-    Subscription(MessageListener listener,
-                 JMSEndpoint endpoint,
-                 HashMap properties) {
+    Subscription(MessageListener listener, JMSEndpoint endpoint, HashMap properties) {
         m_listener = listener;
         m_endpoint = endpoint;
-        m_messageSelector = MapUtils.removeStringProperty(
-                properties,
-                JMSConstants.MESSAGE_SELECTOR,
-                null);
-        m_ackMode = MapUtils.removeIntProperty(properties,
-                JMSConstants.ACKNOWLEDGE_MODE,
+        m_messageSelector = MapUtils.removeStringProperty(properties,
+                JMSConstants.MESSAGE_SELECTOR, null);
+        m_ackMode = MapUtils.removeIntProperty(properties, JMSConstants.ACKNOWLEDGE_MODE,
                 JMSConstants.DEFAULT_ACKNOWLEDGE_MODE);
     }
 
-    public int hashCode() {
-        return toString().hashCode();
-    }
-
     public boolean equals(Object obj) {
-        if (obj == null || !(obj instanceof Subscription))
+        if ((obj == null) || !(obj instanceof Subscription)) {
             return false;
+        }
+
         Subscription other = (Subscription) obj;
+
         if (m_messageSelector == null) {
-            if (other.m_messageSelector != null)
+            if (other.m_messageSelector != null) {
                 return false;
+            }
         } else {
-            if (other.m_messageSelector == null ||
-                    !other.m_messageSelector.equals(m_messageSelector))
+            if ((other.m_messageSelector == null)
+                    || !other.m_messageSelector.equals(m_messageSelector)) {
                 return false;
+            }
         }
-        return m_ackMode == other.m_ackMode &&
-                m_endpoint.equals(other.m_endpoint) &&
-                other.m_listener.equals(m_listener);
+
+        return (m_ackMode == other.m_ackMode) && m_endpoint.equals(other.m_endpoint)
+                && other.m_listener.equals(m_listener);
+    }
+
+    public int hashCode() {
+        return toString().hashCode();
     }
 
     public String toString() {
         return m_listener.toString();
     }
-
-}
\ No newline at end of file
+}

Modified: webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/TopicConnector.java
URL: http://svn.apache.org/viewcvs/webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/TopicConnector.java?rev=357187&r1=357186&r2=357187&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/TopicConnector.java (original)
+++ webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/TopicConnector.java Fri Dec 16 09:13:57 2005
@@ -1,18 +1,19 @@
 /*
- * Copyright 2001, 2002,2004 The Apache Software Foundation.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
+* Copyright 2001, 2002,2004 The Apache Software Foundation.
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*      http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
 
 package org.apache.axis2.transport.jms;
 
@@ -37,62 +38,28 @@
  * connections to topics (pub-sub domain).
  */
 public class TopicConnector extends JMSConnector {
-    public TopicConnector(TopicConnectionFactory factory,
-                          int numRetries,
-                          int numSessions,
-                          long connectRetryInterval,
-                          long interactRetryInterval,
-                          long timeoutTime,
-                          boolean allowReceive,
-                          String clientID,
-                          String username,
-                          String password,
-                          JMSVendorAdapter adapter,
-                          JMSURLHelper jmsurl)
-            throws JMSException {
-        super(factory, numRetries, numSessions, connectRetryInterval,
-                interactRetryInterval, timeoutTime, allowReceive,
-                clientID, username, password, adapter, jmsurl);
-    }
-
-    protected javax.jms.Connection internalConnect(ConnectionFactory connectionFactory,
-                                                   String username, String password)
-            throws JMSException {
-        TopicConnectionFactory tcf = (TopicConnectionFactory) connectionFactory;
-        if (username == null)
-            return tcf.createTopicConnection();
-
-        return tcf.createTopicConnection(username, password);
-    }
-
-
-    protected SyncConnection createSyncConnection(ConnectionFactory factory,
-                                                  javax.jms.Connection connection,
-                                                  int numSessions,
-                                                  String threadName,
-                                                  String clientID,
-                                                  String username,
-                                                  String password)
+    public TopicConnector(TopicConnectionFactory factory, int numRetries, int numSessions,
+                          long connectRetryInterval, long interactRetryInterval, long timeoutTime,
+                          boolean allowReceive, String clientID, String username, String password,
+                          JMSVendorAdapter adapter, JMSURLHelper jmsurl)
             throws JMSException {
-        return new TopicSyncConnection((TopicConnectionFactory) factory,
-                (TopicConnection) connection, numSessions,
-                threadName, clientID, username, password);
+        super(factory, numRetries, numSessions, connectRetryInterval, interactRetryInterval,
+                timeoutTime, allowReceive, clientID, username, password, adapter, jmsurl);
     }
 
     protected AsyncConnection createAsyncConnection(ConnectionFactory factory,
-                                                    javax.jms.Connection connection,
-                                                    String threadName,
-                                                    String clientID,
-                                                    String username,
+                                                    javax.jms.Connection connection, String threadName, String clientID, String username,
                                                     String password)
             throws JMSException {
         return new TopicAsyncConnection((TopicConnectionFactory) factory,
-                (TopicConnection) connection, threadName,
-                clientID, username, password);
+                (TopicConnection) connection, threadName, clientID,
+                username, password);
     }
 
-    public JMSEndpoint createEndpoint(String destination) {
-        return new TopicEndpoint(destination);
+    private TopicSubscriber createDurableSubscriber(TopicSession session, Topic topic,
+                                                    String subscriptionName, String messageSelector, boolean noLocal)
+            throws JMSException {
+        return session.createDurableSubscriber(topic, subscriptionName, messageSelector, noLocal);
     }
 
     /**
@@ -102,71 +69,73 @@
      * @return
      * @throws JMSException
      */
-    public JMSEndpoint createEndpoint(Destination destination)
-            throws JMSException {
-        if (!(destination instanceof Topic))
+    public JMSEndpoint createEndpoint(Destination destination) throws JMSException {
+        if (!(destination instanceof Topic)) {
             throw new IllegalArgumentException("The input be a topic for this connector");
-        return new TopicDestinationEndpoint((Topic) destination);
-    }
+        }
 
-    private TopicSession createTopicSession(TopicConnection connection, int ackMode)
-            throws JMSException {
-        return connection.createTopicSession(false,
-                ackMode);
+        return new TopicDestinationEndpoint((Topic) destination);
     }
 
-    private Topic createTopic(TopicSession session, String subject)
-            throws Exception {
-        return m_adapter.getTopic(session, subject);
+    public JMSEndpoint createEndpoint(String destination) {
+        return new TopicEndpoint(destination);
     }
 
-    private TopicSubscriber createSubscriber(TopicSession session,
-                                             TopicSubscription subscription)
+    private TopicSubscriber createSubscriber(TopicSession session, TopicSubscription subscription)
             throws Exception {
-        if (subscription.isDurable())
+        if (subscription.isDurable()) {
             return createDurableSubscriber(session,
                     (Topic) subscription.m_endpoint.getDestination(session),
                     subscription.m_subscriptionName,
-                    subscription.m_messageSelector,
-                    subscription.m_noLocal);
-        else
+                    subscription.m_messageSelector, subscription.m_noLocal);
+        } else {
             return createSubscriber(session,
                     (Topic) subscription.m_endpoint.getDestination(session),
-                    subscription.m_messageSelector,
-                    subscription.m_noLocal);
+                    subscription.m_messageSelector, subscription.m_noLocal);
+        }
     }
 
-    private TopicSubscriber createDurableSubscriber(TopicSession session,
-                                                    Topic topic,
-                                                    String subscriptionName,
-                                                    String messageSelector,
-                                                    boolean noLocal)
+    private TopicSubscriber createSubscriber(TopicSession session, Topic topic,
+                                             String messageSelector, boolean noLocal)
             throws JMSException {
-        return session.createDurableSubscriber(topic, subscriptionName,
-                messageSelector, noLocal);
+        return session.createSubscriber(topic, messageSelector, noLocal);
     }
 
-    private TopicSubscriber createSubscriber(TopicSession session,
-                                             Topic topic,
-                                             String messageSelector,
-                                             boolean noLocal)
+    protected SyncConnection createSyncConnection(ConnectionFactory factory,
+                                                  javax.jms.Connection connection, int numSessions, String threadName, String clientID,
+                                                  String username, String password)
             throws JMSException {
-        return session.createSubscriber(topic, messageSelector, noLocal);
+        return new TopicSyncConnection((TopicConnectionFactory) factory,
+                (TopicConnection) connection, numSessions, threadName,
+                clientID, username, password);
     }
 
+    private Topic createTopic(TopicSession session, String subject) throws Exception {
+        return m_adapter.getTopic(session, subject);
+    }
 
-    private final class TopicAsyncConnection extends AsyncConnection {
+    private TopicSession createTopicSession(TopicConnection connection, int ackMode)
+            throws JMSException {
+        return connection.createTopicSession(false, ackMode);
+    }
+
+    protected javax.jms.Connection internalConnect(ConnectionFactory connectionFactory,
+                                                   String username, String password)
+            throws JMSException {
+        TopicConnectionFactory tcf = (TopicConnectionFactory) connectionFactory;
 
-        TopicAsyncConnection(TopicConnectionFactory connectionFactory,
-                             TopicConnection connection,
-                             String threadName,
-                             String clientID,
-                             String username,
-                             String password)
+        if (username == null) {
+            return tcf.createTopicConnection();
+        }
+
+        return tcf.createTopicConnection(username, password);
+    }
 
+    private final class TopicAsyncConnection extends AsyncConnection {
+        TopicAsyncConnection(TopicConnectionFactory connectionFactory, TopicConnection connection,
+                             String threadName, String clientID, String username, String password)
                 throws JMSException {
-            super(connectionFactory, connection, threadName,
-                    clientID, username, password);
+            super(connectionFactory, connection, threadName, clientID, username, password);
         }
 
         protected ListenerSession createListenerSession(javax.jms.Connection connection,
@@ -176,14 +145,12 @@
                     subscription.m_ackMode);
             TopicSubscriber subscriber = createSubscriber(session,
                     (TopicSubscription) subscription);
-            return new TopicListenerSession(session, subscriber,
-                    (TopicSubscription) subscription);
+
+            return new TopicListenerSession(session, subscriber, (TopicSubscription) subscription);
         }
 
         private final class TopicListenerSession extends ListenerSession {
-
-            TopicListenerSession(TopicSession session,
-                                 TopicSubscriber subscriber,
+            TopicListenerSession(TopicSession session, TopicSubscriber subscriber,
                                  TopicSubscription subscription)
                     throws Exception {
                 super(session, subscriber, subscription);
@@ -194,83 +161,40 @@
                     m_consumer.close();
                 } catch (Exception ignore) {
                 }
+
                 try {
                     TopicSubscription sub = (TopicSubscription) m_subscription;
+
                     if (sub.isDurable() && sub.m_unsubscribe) {
                         ((TopicSession) m_session).unsubscribe(sub.m_subscriptionName);
                     }
+                } catch (Exception ignore) {
                 }
-                catch (Exception ignore) {
-                }
+
                 try {
                     m_session.close();
                 } catch (Exception ignore) {
                 }
-
             }
         }
     }
 
-    private final class TopicSyncConnection extends SyncConnection {
-        TopicSyncConnection(TopicConnectionFactory connectionFactory,
-                            TopicConnection connection,
-                            int numSessions,
-                            String threadName,
-                            String clientID,
-                            String username,
-                            String password)
 
-                throws JMSException {
-            super(connectionFactory, connection, numSessions, threadName,
-                    clientID, username, password);
-        }
+    private final class TopicDestinationEndpoint extends TopicEndpoint {
+        Topic m_topic;
 
-        protected SendSession createSendSession(javax.jms.Connection connection)
-                throws JMSException {
-            TopicSession session = createTopicSession((TopicConnection) connection,
-                    JMSConstants.DEFAULT_ACKNOWLEDGE_MODE);
-            TopicPublisher publisher = session.createPublisher(null);
-            return new TopicSendSession(session, publisher);
+        TopicDestinationEndpoint(Topic topic) throws JMSException {
+            super(topic.getTopicName());
+            m_topic = topic;
         }
 
-        private final class TopicSendSession extends SendSession {
-            TopicSendSession(TopicSession session,
-                             TopicPublisher publisher)
-                    throws JMSException {
-                super(session, publisher);
-            }
-
-
-            protected MessageConsumer createConsumer(Destination destination)
-                    throws JMSException {
-                return createSubscriber((TopicSession) m_session, (Topic) destination,
-                        null, JMSConstants.DEFAULT_NO_LOCAL);
-            }
-
-            protected void deleteTemporaryDestination(Destination destination)
-                    throws JMSException {
-                ((TemporaryTopic) destination).delete();
-            }
-
-
-            protected Destination createTemporaryDestination()
-                    throws JMSException {
-                return ((TopicSession) m_session).createTemporaryTopic();
-            }
-
-            protected void send(Destination destination, Message message,
-                                int deliveryMode, int priority, long timeToLive)
-                    throws JMSException {
-                ((TopicPublisher) m_producer).publish((Topic) destination, message,
-                        deliveryMode, priority, timeToLive);
-            }
-
+        Destination getDestination(Session session) {
+            return m_topic;
         }
     }
 
 
-    private class TopicEndpoint
-            extends JMSEndpoint {
+    private class TopicEndpoint extends JMSEndpoint {
         String m_topicName;
 
         TopicEndpoint(String topicName) {
@@ -278,102 +202,136 @@
             m_topicName = topicName;
         }
 
-        Destination getDestination(Session session)
-                throws Exception {
-            return createTopic((TopicSession) session, m_topicName);
+        protected Subscription createSubscription(MessageListener listener, HashMap properties) {
+            return new TopicSubscription(listener, this, properties);
         }
 
-        protected Subscription createSubscription(MessageListener listener,
-                                                  HashMap properties) {
-            return new TopicSubscription(listener, this, properties);
+        public boolean equals(Object object) {
+            if (!super.equals(object)) {
+                return false;
+            }
+
+            if (!(object instanceof TopicEndpoint)) {
+                return false;
+            }
+
+            return m_topicName.equals(((TopicEndpoint) object).m_topicName);
         }
 
         public String toString() {
             StringBuffer buffer = new StringBuffer("TopicEndpoint:");
+
             buffer.append(m_topicName);
+
             return buffer.toString();
         }
 
-        public boolean equals(Object object) {
-            if (!super.equals(object))
-                return false;
-
-            if (!(object instanceof TopicEndpoint))
-                return false;
-
-            return m_topicName.equals(((TopicEndpoint) object).m_topicName);
+        Destination getDestination(Session session) throws Exception {
+            return createTopic((TopicSession) session, m_topicName);
         }
     }
 
+
     private final class TopicSubscription extends Subscription {
+        boolean m_noLocal;
         String m_subscriptionName;
         boolean m_unsubscribe;
-        boolean m_noLocal;
 
-        TopicSubscription(MessageListener listener,
-                          JMSEndpoint endpoint,
-                          HashMap properties) {
+        TopicSubscription(MessageListener listener, JMSEndpoint endpoint, HashMap properties) {
             super(listener, endpoint, properties);
             m_subscriptionName = MapUtils.removeStringProperty(properties,
-                    JMSConstants.SUBSCRIPTION_NAME,
-                    null);
-            m_unsubscribe = MapUtils.removeBooleanProperty(properties,
-                    JMSConstants.UNSUBSCRIBE,
+                    JMSConstants.SUBSCRIPTION_NAME, null);
+            m_unsubscribe = MapUtils.removeBooleanProperty(properties, JMSConstants.UNSUBSCRIBE,
                     JMSConstants.DEFAULT_UNSUBSCRIBE);
-            m_noLocal = MapUtils.removeBooleanProperty(properties,
-                    JMSConstants.NO_LOCAL,
+            m_noLocal = MapUtils.removeBooleanProperty(properties, JMSConstants.NO_LOCAL,
                     JMSConstants.DEFAULT_NO_LOCAL);
         }
 
-        boolean isDurable() {
-            return m_subscriptionName != null;
-        }
-
         public boolean equals(Object obj) {
-            if (!super.equals(obj))
+            if (!super.equals(obj)) {
                 return false;
-            if (!(obj instanceof TopicSubscription))
+            }
+
+            if (!(obj instanceof TopicSubscription)) {
                 return false;
+            }
 
             TopicSubscription other = (TopicSubscription) obj;
-            if (other.m_unsubscribe != m_unsubscribe || other.m_noLocal != m_noLocal)
+
+            if ((other.m_unsubscribe != m_unsubscribe) || (other.m_noLocal != m_noLocal)) {
                 return false;
+            }
 
             if (isDurable()) {
                 return other.isDurable() && other.m_subscriptionName.equals(m_subscriptionName);
-            } else if (other.isDurable())
+            } else if (other.isDurable()) {
                 return false;
-            else
+            } else {
                 return true;
+            }
         }
 
         public String toString() {
             StringBuffer buffer = new StringBuffer(super.toString());
+
             buffer.append(":").append(m_noLocal).append(":").append(m_unsubscribe);
+
             if (isDurable()) {
                 buffer.append(":");
                 buffer.append(m_subscriptionName);
             }
+
             return buffer.toString();
         }
 
+        boolean isDurable() {
+            return m_subscriptionName != null;
+        }
     }
 
-    private final class TopicDestinationEndpoint
-            extends TopicEndpoint {
-        Topic m_topic;
 
-        TopicDestinationEndpoint(Topic topic)
+    private final class TopicSyncConnection extends SyncConnection {
+        TopicSyncConnection(TopicConnectionFactory connectionFactory, TopicConnection connection,
+                            int numSessions, String threadName, String clientID, String username,
+                            String password)
                 throws JMSException {
-            super(topic.getTopicName());
-            m_topic = topic;
+            super(connectionFactory, connection, numSessions, threadName, clientID, username,
+                    password);
         }
 
-        Destination getDestination(Session session) {
-            return m_topic;
+        protected SendSession createSendSession(javax.jms.Connection connection)
+                throws JMSException {
+            TopicSession session = createTopicSession((TopicConnection) connection,
+                    JMSConstants.DEFAULT_ACKNOWLEDGE_MODE);
+            TopicPublisher publisher = session.createPublisher(null);
+
+            return new TopicSendSession(session, publisher);
         }
 
-    }
+        private final class TopicSendSession extends SendSession {
+            TopicSendSession(TopicSession session, TopicPublisher publisher) throws JMSException {
+                super(session, publisher);
+            }
 
+            protected MessageConsumer createConsumer(Destination destination) throws JMSException {
+                return createSubscriber((TopicSession) m_session, (Topic) destination, null,
+                        JMSConstants.DEFAULT_NO_LOCAL);
+            }
 
-}
\ No newline at end of file
+            protected Destination createTemporaryDestination() throws JMSException {
+                return ((TopicSession) m_session).createTemporaryTopic();
+            }
+
+            protected void deleteTemporaryDestination(Destination destination) throws JMSException {
+                ((TemporaryTopic) destination).delete();
+            }
+
+            protected void send(Destination destination, Message message, int deliveryMode,
+                                int priority, long timeToLive)
+                    throws JMSException {
+                ((TopicPublisher) m_producer).publish((Topic) destination, message, deliveryMode,
+                        priority, timeToLive);
+            }
+        }
+    }
+}

Modified: webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/local/LocalResponder.java
URL: http://svn.apache.org/viewcvs/webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/local/LocalResponder.java?rev=357187&r1=357186&r2=357187&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/local/LocalResponder.java (original)
+++ webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/local/LocalResponder.java Fri Dec 16 09:13:57 2005
@@ -13,6 +13,8 @@
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
+
+
 package org.apache.axis2.transport.local;
 
 import org.apache.axis2.AxisFault;
@@ -32,30 +34,37 @@
         this.sender = sender;
     }
 
-    public OutputStream startSendWithToAddress(MessageContext msgContext, OutputStream out) throws AxisFault {
-        return out;
+    /**
+     * Clean up
+     *
+     * @param msgContext
+     * @throws org.apache.axis2.AxisFault
+     */
+    public void cleanUp(MessageContext msgContext) throws AxisFault {
     }
 
-    public void finalizeSendWithToAddress(MessageContext msgContext, OutputStream out) throws AxisFault {
+    public void finalizeSendWithOutputStreamFromIncomingConnection(MessageContext msgContext,
+                                                                   OutputStream out)
+            throws AxisFault {
     }
 
-    public OutputStream startSendWithOutputStreamFromIncomingConnection(MessageContext msgContext, OutputStream out) throws AxisFault {
-        return null;  
+    public void finalizeSendWithToAddress(MessageContext msgContext, OutputStream out)
+            throws AxisFault {
     }
 
-    public void finalizeSendWithOutputStreamFromIncomingConnection(MessageContext msgContext, OutputStream out) throws AxisFault {
+    protected OutputStream openTheConnection(EndpointReference epr, MessageContext msgctx)
+            throws AxisFault {
+        return sender.getResponse();
     }
 
-    protected OutputStream openTheConnection(EndpointReference epr, MessageContext msgctx) throws AxisFault {
-        return sender.getResponse();
+    public OutputStream startSendWithOutputStreamFromIncomingConnection(MessageContext msgContext,
+                                                                        OutputStream out)
+            throws AxisFault {
+        return null;
     }
 
-    /**
-     * Clean up
-     *
-     * @param msgContext
-     * @throws org.apache.axis2.AxisFault
-     */
-    public void cleanUp(MessageContext msgContext) throws AxisFault {
+    public OutputStream startSendWithToAddress(MessageContext msgContext, OutputStream out)
+            throws AxisFault {
+        return out;
     }
 }

Modified: webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/local/LocalTransportReceiver.java
URL: http://svn.apache.org/viewcvs/webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/local/LocalTransportReceiver.java?rev=357187&r1=357186&r2=357187&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/local/LocalTransportReceiver.java (original)
+++ webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/local/LocalTransportReceiver.java Fri Dec 16 09:13:57 2005
@@ -1,18 +1,19 @@
 /*
- * Copyright 2004,2005 The Apache Software Foundation.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
+* Copyright 2004,2005 The Apache Software Foundation.
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*      http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
 
 package org.apache.axis2.transport.local;
 
@@ -39,7 +40,6 @@
 
 public class LocalTransportReceiver {
     public static ConfigurationContext CONFIG_CONTEXT;
-
     private ConfigurationContext confContext;
     private LocalTransportSender sender;
 
@@ -52,30 +52,28 @@
         this.sender = sender;
     }
 
-    public void processMessage(InputStream in,
-                               EndpointReference to) throws AxisFault {
+    public void processMessage(InputStream in, EndpointReference to) throws AxisFault {
         try {
-            TransportInDescription tIn =
-                confContext.getAxisConfiguration().getTransportIn(
+            TransportInDescription tIn = confContext.getAxisConfiguration().getTransportIn(
                     new QName(Constants.TRANSPORT_LOCAL));
-            TransportOutDescription tOut =
-                confContext.getAxisConfiguration().getTransportOut(
+            TransportOutDescription tOut = confContext.getAxisConfiguration().getTransportOut(
                     new QName(Constants.TRANSPORT_LOCAL));
+
             tOut.setSender(new LocalResponder(sender));
 
             MessageContext msgCtx = new MessageContext(confContext, tIn, tOut);
+
             msgCtx.setTo(to);
             msgCtx.setServerSide(true);
-
             msgCtx.setProperty(MessageContext.TRANSPORT_OUT, sender.getResponse());
 
-            XMLStreamReader reader =
-                XMLInputFactory.newInstance().createXMLStreamReader(
+            XMLStreamReader reader = XMLInputFactory.newInstance().createXMLStreamReader(
                     new BufferedReader(new InputStreamReader(in)));
-
             StAXBuilder builder = new StAXSOAPModelBuilder(reader, null);
             SOAPEnvelope envelope = (SOAPEnvelope) builder.getDocumentElement();
+
             msgCtx.setEnvelope(envelope);
+
             AxisEngine engine = new AxisEngine(confContext);
 
             if (envelope.getBody().hasFault()) {
@@ -89,5 +87,4 @@
             throw new AxisFault(e);
         }
     }
-
 }