You are viewing a plain text version of this content. The canonical link for it is here.
Posted to axis-cvs@ws.apache.org by di...@apache.org on 2005/12/16 18:18:08 UTC
svn commit: r357187 [22/25] - in
/webservices/axis2/trunk/java/modules/core/src/org/apache/axis2: ./
addressing/ client/ client/async/ context/ deployment/ deployment/listener/
deployment/repository/util/ deployment/scheduler/ deployment/util/
descript...
Modified: webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/QueueConnector.java
URL: http://svn.apache.org/viewcvs/webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/QueueConnector.java?rev=357187&r1=357186&r2=357187&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/QueueConnector.java (original)
+++ webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/QueueConnector.java Fri Dec 16 09:13:57 2005
@@ -1,18 +1,19 @@
/*
- * Copyright 2001, 2002,2004 The Apache Software Foundation.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
+* Copyright 2001, 2002,2004 The Apache Software Foundation.
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
package org.apache.axis2.transport.jms;
@@ -35,27 +36,22 @@
* connections to queues (ptp domain).
*/
public class QueueConnector extends JMSConnector {
-
- public QueueConnector(ConnectionFactory factory,
- int numRetries,
- int numSessions,
- long connectRetryInterval,
- long interactRetryInterval,
- long timeoutTime,
- boolean allowReceive,
- String clientID,
- String username,
- String password,
- JMSVendorAdapter adapter,
- JMSURLHelper jmsurl)
+ public QueueConnector(ConnectionFactory factory, int numRetries, int numSessions,
+ long connectRetryInterval, long interactRetryInterval, long timeoutTime,
+ boolean allowReceive, String clientID, String username, String password,
+ JMSVendorAdapter adapter, JMSURLHelper jmsurl)
throws JMSException {
- super(factory, numRetries, numSessions, connectRetryInterval,
- interactRetryInterval, timeoutTime, allowReceive, clientID,
- username, password, adapter, jmsurl);
+ super(factory, numRetries, numSessions, connectRetryInterval, interactRetryInterval,
+ timeoutTime, allowReceive, clientID, username, password, adapter, jmsurl);
}
- public JMSEndpoint createEndpoint(String destination) {
- return new QueueEndpoint(destination);
+ protected AsyncConnection createAsyncConnection(ConnectionFactory factory,
+ javax.jms.Connection connection, String threadName, String clientID, String username,
+ String password)
+ throws JMSException {
+ return new QueueAsyncConnection((QueueConnectionFactory) factory,
+ (QueueConnection) connection, threadName, clientID,
+ username, password);
}
/**
@@ -65,37 +61,20 @@
* @return
* @throws JMSException
*/
- public JMSEndpoint createEndpoint(Destination destination)
- throws JMSException {
- if (!(destination instanceof Queue))
+ public JMSEndpoint createEndpoint(Destination destination) throws JMSException {
+ if (!(destination instanceof Queue)) {
throw new IllegalArgumentException("The input must be a queue for this connector");
+ }
+
return new QueueDestinationEndpoint((Queue) destination);
}
- protected javax.jms.Connection internalConnect(ConnectionFactory connectionFactory,
- String username,
- String password)
- throws JMSException {
- QueueConnectionFactory qcf = (QueueConnectionFactory) connectionFactory;
- if (username == null)
- return qcf.createQueueConnection();
-
- return qcf.createQueueConnection(username, password);
+ public JMSEndpoint createEndpoint(String destination) {
+ return new QueueEndpoint(destination);
}
-
- protected SyncConnection createSyncConnection(ConnectionFactory factory,
- javax.jms.Connection connection,
- int numSessions,
- String threadName,
- String clientID,
- String username,
- String password)
-
- throws JMSException {
- return new QueueSyncConnection((QueueConnectionFactory) factory,
- (QueueConnection) connection, numSessions,
- threadName, clientID, username, password);
+ private Queue createQueue(QueueSession session, String subject) throws Exception {
+ return m_adapter.getQueue(session, subject);
}
private QueueSession createQueueSession(QueueConnection connection, int ackMode)
@@ -103,74 +82,68 @@
return connection.createQueueSession(false, ackMode);
}
- private Queue createQueue(QueueSession session, String subject)
- throws Exception {
- return m_adapter.getQueue(session, subject);
+ private QueueReceiver createReceiver(QueueSession session, Queue queue, String messageSelector)
+ throws JMSException {
+ return session.createReceiver(queue, messageSelector);
}
- private QueueReceiver createReceiver(QueueSession session,
- Queue queue,
- String messageSelector)
+ protected SyncConnection createSyncConnection(ConnectionFactory factory,
+ javax.jms.Connection connection, int numSessions, String threadName, String clientID,
+ String username, String password)
throws JMSException {
- return session.createReceiver(queue, messageSelector);
+ return new QueueSyncConnection((QueueConnectionFactory) factory,
+ (QueueConnection) connection, numSessions, threadName,
+ clientID, username, password);
}
- private final class QueueSyncConnection extends SyncConnection {
- QueueSyncConnection(QueueConnectionFactory connectionFactory,
- QueueConnection connection,
- int numSessions,
- String threadName,
- String clientID,
- String username,
- String password)
- throws JMSException {
- super(connectionFactory, connection, numSessions, threadName,
- clientID, username, password);
+ protected javax.jms.Connection internalConnect(ConnectionFactory connectionFactory,
+ String username, String password)
+ throws JMSException {
+ QueueConnectionFactory qcf = (QueueConnectionFactory) connectionFactory;
+
+ if (username == null) {
+ return qcf.createQueueConnection();
}
- protected SendSession createSendSession(javax.jms.Connection connection)
+ return qcf.createQueueConnection(username, password);
+ }
+
+ private final class QueueAsyncConnection extends AsyncConnection {
+ QueueAsyncConnection(QueueConnectionFactory connectionFactory, QueueConnection connection,
+ String threadName, String clientID, String username, String password)
throws JMSException {
- QueueSession session = createQueueSession((QueueConnection) connection,
- JMSConstants.DEFAULT_ACKNOWLEDGE_MODE);
- QueueSender sender = session.createSender(null);
- return new QueueSendSession(session, sender);
+ super(connectionFactory, connection, threadName, clientID, username, password);
}
- private final class QueueSendSession extends SendSession {
- QueueSendSession(QueueSession session,
- QueueSender sender)
- throws JMSException {
- super(session, sender);
- }
-
- protected MessageConsumer createConsumer(Destination destination)
- throws JMSException {
- return createReceiver((QueueSession) m_session, (Queue) destination, null);
- }
+ protected ListenerSession createListenerSession(javax.jms.Connection connection,
+ Subscription subscription)
+ throws Exception {
+ QueueSession session = createQueueSession((QueueConnection) connection,
+ subscription.m_ackMode);
+ QueueReceiver receiver = createReceiver(session,
+ (Queue) subscription.m_endpoint.getDestination(session),
+ subscription.m_messageSelector);
+ return new ListenerSession(session, receiver, subscription);
+ }
+ }
- protected Destination createTemporaryDestination()
- throws JMSException {
- return ((QueueSession) m_session).createTemporaryQueue();
- }
- protected void deleteTemporaryDestination(Destination destination)
- throws JMSException {
- ((TemporaryQueue) destination).delete();
- }
+ private final class QueueDestinationEndpoint extends QueueEndpoint {
+ Queue m_queue;
- protected void send(Destination destination, Message message,
- int deliveryMode, int priority, long timeToLive)
- throws JMSException {
- ((QueueSender) m_producer).send((Queue) destination, message,
- deliveryMode, priority, timeToLive);
- }
+ QueueDestinationEndpoint(Queue queue) throws JMSException {
+ super(queue.getQueueName());
+ m_queue = queue;
+ }
+ Destination getDestination(Session session) {
+ return m_queue;
}
}
- private class QueueEndpoint
- extends JMSEndpoint {
+
+ private class QueueEndpoint extends JMSEndpoint {
String m_queueName;
QueueEndpoint(String queueName) {
@@ -178,80 +151,73 @@
m_queueName = queueName;
}
- Destination getDestination(Session session)
- throws Exception {
- return createQueue((QueueSession) session, m_queueName);
- }
-
- public String toString() {
- StringBuffer buffer = new StringBuffer("QueueEndpoint:");
- buffer.append(m_queueName);
- return buffer.toString();
- }
-
public boolean equals(Object object) {
- if (!super.equals(object))
+ if (!super.equals(object)) {
return false;
+ }
- if (!(object instanceof QueueEndpoint))
+ if (!(object instanceof QueueEndpoint)) {
return false;
+ }
return m_queueName.equals(((QueueEndpoint) object).m_queueName);
}
- }
+ public String toString() {
+ StringBuffer buffer = new StringBuffer("QueueEndpoint:");
- private final class QueueDestinationEndpoint
- extends QueueEndpoint {
- Queue m_queue;
+ buffer.append(m_queueName);
- QueueDestinationEndpoint(Queue queue)
- throws JMSException {
- super(queue.getQueueName());
- m_queue = queue;
+ return buffer.toString();
}
- Destination getDestination(Session session) {
- return m_queue;
+ Destination getDestination(Session session) throws Exception {
+ return createQueue((QueueSession) session, m_queueName);
}
-
- }
-
- protected AsyncConnection createAsyncConnection(ConnectionFactory factory,
- javax.jms.Connection connection,
- String threadName,
- String clientID,
- String username,
- String password)
- throws JMSException {
- return new QueueAsyncConnection((QueueConnectionFactory) factory,
- (QueueConnection) connection, threadName,
- clientID, username, password);
}
- private final class QueueAsyncConnection extends AsyncConnection {
- QueueAsyncConnection(QueueConnectionFactory connectionFactory,
- QueueConnection connection,
- String threadName,
- String clientID,
- String username,
- String password)
+ private final class QueueSyncConnection extends SyncConnection {
+ QueueSyncConnection(QueueConnectionFactory connectionFactory, QueueConnection connection,
+ int numSessions, String threadName, String clientID, String username,
+ String password)
throws JMSException {
- super(connectionFactory, connection, threadName, clientID, username, password);
+ super(connectionFactory, connection, numSessions, threadName, clientID, username,
+ password);
}
- protected ListenerSession createListenerSession(javax.jms.Connection connection,
- Subscription subscription)
- throws Exception {
+ protected SendSession createSendSession(javax.jms.Connection connection)
+ throws JMSException {
QueueSession session = createQueueSession((QueueConnection) connection,
- subscription.m_ackMode);
- QueueReceiver receiver = createReceiver(session,
- (Queue) subscription.m_endpoint.getDestination(session),
- subscription.m_messageSelector);
- return new ListenerSession(session, receiver, subscription);
+ JMSConstants.DEFAULT_ACKNOWLEDGE_MODE);
+ QueueSender sender = session.createSender(null);
+
+ return new QueueSendSession(session, sender);
}
- }
+ private final class QueueSendSession extends SendSession {
+ QueueSendSession(QueueSession session, QueueSender sender) throws JMSException {
+ super(session, sender);
+ }
-}
\ No newline at end of file
+ protected MessageConsumer createConsumer(Destination destination) throws JMSException {
+ return createReceiver((QueueSession) m_session, (Queue) destination, null);
+ }
+
+ protected Destination createTemporaryDestination() throws JMSException {
+ return ((QueueSession) m_session).createTemporaryQueue();
+ }
+
+ protected void deleteTemporaryDestination(Destination destination) throws JMSException {
+ ((TemporaryQueue) destination).delete();
+ }
+
+ protected void send(Destination destination, Message message, int deliveryMode,
+ int priority, long timeToLive)
+ throws JMSException {
+ ((QueueSender) m_producer).send((Queue) destination, message, deliveryMode,
+ priority, timeToLive);
+ }
+ }
+ }
+}
Modified: webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/SimpleJMSListener.java
URL: http://svn.apache.org/viewcvs/webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/SimpleJMSListener.java?rev=357187&r1=357186&r2=357187&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/SimpleJMSListener.java (original)
+++ webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/SimpleJMSListener.java Fri Dec 16 09:13:57 2005
@@ -1,18 +1,19 @@
/*
- * Copyright 2001, 2002,2004 The Apache Software Foundation.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
+* Copyright 2001, 2002,2004 The Apache Software Foundation.
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
package org.apache.axis2.transport.jms;
@@ -37,7 +38,6 @@
import java.util.Iterator;
import java.util.Properties;
-
/**
* SimpleJMSListener implements the javax.jms.MessageListener interface. Its
* basic purpose is to listen asynchronously for messages and to pass them off
@@ -48,72 +48,112 @@
* for production code, but for demos, debugging, and performance profiling.
*/
public class SimpleJMSListener extends TransportListener implements MessageListener {
- protected static Log log =
- LogFactory.getLog(SimpleJMSListener.class.getName());
+ protected static Log log = LogFactory.getLog(SimpleJMSListener.class.getName());
// Do we use (multiple) threads to process incoming messages?
private boolean doThreads = true;
-
+ protected ConfigurationContext configurationContext;
private JMSConnector connector;
+ private String destination;
private JMSEndpoint endpoint;
private HashMap properties;
- private String destination;
- protected ConfigurationContext configurationContext;
public SimpleJMSListener() {
+ }
+
+ public SimpleJMSListener(String repositoryDirectory, HashMap connectorMap, HashMap cfMap,
+ String destination, String username, String password,
+ boolean doThreads)
+ throws Exception {
+ ConfigurationContextFactory erfac = new ConfigurationContextFactory();
+
+ this.configurationContext = erfac.buildConfigurationContext(repositoryDirectory);
+ this.doThreads = doThreads;
+ initListener(connectorMap, cfMap, username, password, destination);
+ }
+
+ public static final HashMap createCFMap(OptionsParser optionsParser) throws IOException {
+ String cfFile = optionsParser.isValueSet('c');
+ if (cfFile == null) {
+ return null;
+ }
+
+ Properties cfProps = new Properties();
+
+ cfProps.load(new BufferedInputStream(new FileInputStream(cfFile)));
+
+ HashMap cfMap = new HashMap(cfProps);
+
+ return cfMap;
+ }
+
+ public static final HashMap createConnectorMap(
+ org.apache.axis2.util.OptionsParser optionsParser) {
+ HashMap connectorMap = new HashMap();
+
+ if (optionsParser.isFlagSet('t') > 0) {
+
+ // queue is default so only setup map if topic domain is required
+ connectorMap.put(JMSConstants.DOMAIN, JMSConstants.DOMAIN_TOPIC);
+ }
+
+ return connectorMap;
}
- public void init(ConfigurationContext axisConf, TransportInDescription transprtIn) throws AxisFault {
+ public void init(ConfigurationContext axisConf, TransportInDescription transprtIn)
+ throws AxisFault {
try {
this.configurationContext = axisConf;
+
HashMap params = new HashMap();
Iterator iterator = transprtIn.getParameters().iterator();
+
while (iterator.hasNext()) {
Parameter param = (Parameter) iterator.next();
+
params.put(param.getName(), param.getValue());
}
- String user = null, password = null, destination = null;
+
+ String user = null,
+ password = null,
+ destination = null;
+
if (transprtIn.getParameter(JNDIVendorAdapter.USER) != null) {
user = (String) transprtIn.getParameter(JNDIVendorAdapter.USER).getValue();
}
+
if (transprtIn.getParameter(JNDIVendorAdapter.PASSWORD) != null) {
password = (String) transprtIn.getParameter(JNDIVendorAdapter.PASSWORD).getValue();
}
+
if (transprtIn.getParameter(JNDIVendorAdapter.DESTINATION) != null) {
- destination = (String) transprtIn.getParameter(JNDIVendorAdapter.DESTINATION).getValue();
+ destination =
+ (String) transprtIn.getParameter(JNDIVendorAdapter.DESTINATION).getValue();
}
+
initListener(params, params, user, password, destination);
} catch (Exception e1) {
throw new AxisFault(e1);
}
}
- public SimpleJMSListener(String repositoryDirectory, HashMap connectorMap, HashMap cfMap,
- String destination, String username,
- String password, boolean doThreads)
+ private void initListener(HashMap connectorMap, HashMap cfMap, String username,
+ String password, String destination)
throws Exception {
- ConfigurationContextFactory erfac = new ConfigurationContextFactory();
- this.configurationContext = erfac.buildConfigurationContext(repositoryDirectory);
- this.doThreads = doThreads;
-
- initListener(connectorMap, cfMap, username, password, destination);
- }
-
- private void initListener(HashMap connectorMap, HashMap cfMap, String username, String password, String destination) throws Exception {
try {
+
// create a JMS connector using the default vendor adapter
JMSVendorAdapter adapter = JMSVendorAdapterFactory.getJMSVendorAdapter();
- this.connector = JMSConnectorFactory.createServerConnector(connectorMap,
- cfMap,
- username,
- password,
- adapter);
+
+ this.connector = JMSConnectorFactory.createServerConnector(connectorMap, cfMap,
+ username, password, adapter);
this.properties = new HashMap(connectorMap);
this.properties.putAll(cfMap);
this.destination = destination;
} catch (Exception e) {
log.error(Messages.getMessage("exception00"), e);
+
throw e;
}
@@ -121,12 +161,21 @@
endpoint = connector.createEndpoint(destination);
}
- protected JMSConnector getConnector() {
- return connector;
- }
+ public static void main(String[] args) throws Exception {
+ OptionsParser optionsParser = new OptionsParser(args);
- public ConfigurationContext getConfigurationContext() {
- return this.configurationContext;
+ // first check if we should print usage
+ if ((optionsParser.isFlagSet('?') > 0) || (optionsParser.isFlagSet('h') > 0)) {
+ printUsage();
+ }
+
+ SimpleJMSListener listener = new SimpleJMSListener(optionsParser.isValueSet('r'),
+ createConnectorMap(optionsParser),
+ createCFMap(optionsParser), optionsParser.isValueSet('d'),
+ optionsParser.getUser(), optionsParser.getPassword(),
+ optionsParser.isFlagSet('s') > 0);
+
+ listener.start();
}
/**
@@ -136,24 +185,44 @@
*/
public void onMessage(javax.jms.Message message) {
try {
+
// pass off the message to a worker as a BytesMessage
- SimpleJMSWorker worker = new SimpleJMSWorker(configurationContext, this, (BytesMessage) message);
+ SimpleJMSWorker worker = new SimpleJMSWorker(configurationContext, this,
+ (BytesMessage) message);
// do we allow multi-threaded workers?
if (doThreads) {
Thread t = new Thread(worker);
+
t.start();
} else {
worker.run();
}
- }
- catch (ClassCastException cce) {
+ } catch (ClassCastException cce) {
log.error(Messages.getMessage("exception00"), cce);
cce.printStackTrace();
+
return;
}
}
+ public static void printUsage() {
+ System.out.println("Usage: SimpleJMSListener [options]");
+ System.out.println(" Opts: -? this message");
+ System.out.println();
+ System.out.println(" -r repository directory location");
+ System.out.println(" -c connection factory properties filename");
+ System.out.println(" -d destination");
+ System.out.println(" -t topic [absence of -t indicates queue]");
+ System.out.println();
+ System.out.println(" -u username");
+ System.out.println(" -w password");
+ System.out.println();
+ System.out.println(" -s single-threaded listener");
+ System.out.println(" [absence of option => multithreaded]");
+ System.exit(1);
+ }
+
public void start() {
try {
endpoint.registerListener(this, properties);
@@ -161,6 +230,7 @@
log.error(Messages.getMessage("exception00"), e);
e.printStackTrace();
}
+
connector.start();
}
@@ -175,75 +245,29 @@
}
}
- public EndpointReference getReplyToEPR(String serviceName) throws AxisFault {
- try {
- JMSURLHelper url = new JMSURLHelper("jms:/" + destination);
- url.getProperties().putAll(properties);
- return new EndpointReference(url.getURLString());
- } catch (Exception e) {
- log.error(Messages.getMessage("exception00"), e);
- throw AxisFault.makeFault(e);
- }
+ public ConfigurationContext getConfigurationContext() {
+ return this.configurationContext;
}
- public static final HashMap createConnectorMap(org.apache.axis2.util.OptionsParser optionsParser) {
- HashMap connectorMap = new HashMap();
- if (optionsParser.isFlagSet('t') > 0) {
- //queue is default so only setup map if topic domain is required
- connectorMap.put(JMSConstants.DOMAIN, JMSConstants.DOMAIN_TOPIC);
- }
- return connectorMap;
+ protected JMSConnector getConnector() {
+ return connector;
}
- public static final HashMap createCFMap(OptionsParser optionsParser)
- throws IOException {
- String cfFile = optionsParser.isValueSet('c');
- if (cfFile == null)
- return null;
-
- Properties cfProps = new Properties();
- cfProps.load(new BufferedInputStream(new FileInputStream(cfFile)));
- HashMap cfMap = new HashMap(cfProps);
- return cfMap;
+ public HashMap getProperties() {
+ return properties;
}
- public static void main(String[] args) throws Exception {
- OptionsParser optionsParser = new OptionsParser(args);
-
- // first check if we should print usage
- if ((optionsParser.isFlagSet('?') > 0) || (optionsParser.isFlagSet('h') > 0))
- printUsage();
-
- SimpleJMSListener listener = new SimpleJMSListener(
- optionsParser.isValueSet('r'),
- createConnectorMap(optionsParser),
- createCFMap(optionsParser),
- optionsParser.isValueSet('d'),
- optionsParser.getUser(),
- optionsParser.getPassword(),
- optionsParser.isFlagSet('s') > 0);
- listener.start();
- }
+ public EndpointReference getReplyToEPR(String serviceName) throws AxisFault {
+ try {
+ JMSURLHelper url = new JMSURLHelper("jms:/" + destination);
- public static void printUsage() {
- System.out.println("Usage: SimpleJMSListener [options]");
- System.out.println(" Opts: -? this message");
- System.out.println();
- System.out.println(" -r repository directory location");
- System.out.println(" -c connection factory properties filename");
- System.out.println(" -d destination");
- System.out.println(" -t topic [absence of -t indicates queue]");
- System.out.println();
- System.out.println(" -u username");
- System.out.println(" -w password");
- System.out.println();
- System.out.println(" -s single-threaded listener");
- System.out.println(" [absence of option => multithreaded]");
+ url.getProperties().putAll(properties);
- System.exit(1);
- }
+ return new EndpointReference(url.getURLString());
+ } catch (Exception e) {
+ log.error(Messages.getMessage("exception00"), e);
- public HashMap getProperties() {
- return properties;
+ throw AxisFault.makeFault(e);
+ }
}
}
Modified: webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/SimpleJMSWorker.java
URL: http://svn.apache.org/viewcvs/webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/SimpleJMSWorker.java?rev=357187&r1=357186&r2=357187&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/SimpleJMSWorker.java (original)
+++ webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/SimpleJMSWorker.java Fri Dec 16 09:13:57 2005
@@ -1,18 +1,19 @@
/*
- * Copyright 2001, 2002,2004 The Apache Software Foundation.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
+* Copyright 2001, 2002,2004 The Apache Software Foundation.
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
package org.apache.axis2.transport.jms;
@@ -20,8 +21,8 @@
import org.apache.axis2.Constants;
import org.apache.axis2.context.ConfigurationContext;
import org.apache.axis2.context.MessageContext;
-import org.apache.axis2.description.TransportOutDescription;
import org.apache.axis2.description.TransportInDescription;
+import org.apache.axis2.description.TransportOutDescription;
import org.apache.axis2.engine.AxisEngine;
import org.apache.axis2.i18n.Messages;
import org.apache.axis2.om.OMException;
@@ -58,65 +59,181 @@
* the server, and sends back response msg to the replyTo destination.
*/
public class SimpleJMSWorker implements Runnable {
- protected static Log log =
- LogFactory.getLog(SimpleJMSWorker.class.getName());
-
+ protected static Log log = LogFactory.getLog(SimpleJMSWorker.class.getName());
+ private ConfigurationContext configurationContext;
SimpleJMSListener listener;
BytesMessage message;
- private ConfigurationContext configurationContext;
- public SimpleJMSWorker(ConfigurationContext configurationContext, SimpleJMSListener listener, BytesMessage message) {
+ public SimpleJMSWorker(ConfigurationContext configurationContext, SimpleJMSListener listener,
+ BytesMessage message) {
this.listener = listener;
this.message = message;
this.configurationContext = configurationContext;
}
+ public static void processJMSRequest(MessageContext msgContext, InputStream in,
+ String contentType)
+ throws AxisFault {
+ boolean soap11 = false;
+
+ try {
+ msgContext.setServerSide(true);
+
+ SOAPEnvelope envelope = null;
+ StAXBuilder builder = null;
+
+ if (contentType != null) {
+ if (contentType.indexOf(HTTPConstants.HEADER_ACCEPT_MULTIPART_RELATED) > -1) {
+
+ // It is MTOM
+ builder = HTTPTransportUtils.selectBuilderForMIME(msgContext, in, contentType);
+ envelope = (SOAPEnvelope) builder.getDocumentElement();
+ } else {
+ Reader reader = new InputStreamReader(in);
+ XMLStreamReader xmlreader;
+
+ // Figure out the char set encoding and create the reader
+
+ // If charset is not specified
+ if (TransportUtils.getCharSetEncoding(contentType) == null) {
+ xmlreader = XMLInputFactory.newInstance().createXMLStreamReader(in,
+ MessageContext.DEFAULT_CHAR_SET_ENCODING);
+
+ // Set the encoding scheme in the message context
+ msgContext.setProperty(MessageContext.CHARACTER_SET_ENCODING,
+ MessageContext.DEFAULT_CHAR_SET_ENCODING);
+ } else {
+
+ // get the type of char encoding
+ String charSetEnc = TransportUtils.getCharSetEncoding(contentType);
+
+ xmlreader = XMLInputFactory.newInstance().createXMLStreamReader(in,
+ charSetEnc);
+
+ // Setting the value in msgCtx
+ msgContext.setProperty(MessageContext.CHARACTER_SET_ENCODING, charSetEnc);
+ }
+
+ if (contentType.indexOf(SOAP12Constants.SOAP_12_CONTENT_TYPE) > -1) {
+ soap11 = false;
+
+ // it is SOAP 1.2
+ builder =
+ new StAXSOAPModelBuilder(xmlreader,
+ SOAP12Constants.SOAP_ENVELOPE_NAMESPACE_URI);
+ envelope = (SOAPEnvelope) builder.getDocumentElement();
+ } else if (contentType.indexOf(SOAP11Constants.SOAP_11_CONTENT_TYPE) > -1) {
+ soap11 = true;
+ builder =
+ new StAXSOAPModelBuilder(xmlreader,
+ SOAP11Constants.SOAP_ENVELOPE_NAMESPACE_URI);
+ envelope = (SOAPEnvelope) builder.getDocumentElement();
+ }
+ }
+ }
+
+ String charsetEncoding = builder.getDocument().getCharsetEncoding();
+
+ if ((charsetEncoding != null) && !"".equals(charsetEncoding)
+ && !((String) msgContext.getProperty(
+ MessageContext.CHARACTER_SET_ENCODING)).equalsIgnoreCase(charsetEncoding)) {
+ String faultCode;
+
+ if (SOAP12Constants.SOAP_ENVELOPE_NAMESPACE_URI.equals(
+ envelope.getNamespace().getName())) {
+ faultCode = SOAP12Constants.FAULT_CODE_SENDER;
+ } else {
+ faultCode = SOAP11Constants.FAULT_CODE_SENDER;
+ }
+
+ throw new AxisFault(
+ "Character Set Encoding from " + "transport information do not match with "
+ + "character set encoding in the received SOAP message", faultCode);
+ }
+
+ msgContext.setEnvelope(envelope);
+
+ AxisEngine engine = new AxisEngine(msgContext.getConfigurationContext());
+
+ if (envelope.getBody().hasFault()) {
+ engine.receiveFault(msgContext);
+ } else {
+ engine.receive(msgContext);
+ }
+ } catch (SOAPProcessingException e) {
+ throw new AxisFault(e);
+ } catch (AxisFault e) {
+
+ // rethrow
+ throw e;
+ } catch (OMException e) {
+ throw new AxisFault(e);
+ } catch (XMLStreamException e) {
+ throw new AxisFault(e);
+ } catch (FactoryConfigurationError e) {
+ throw new AxisFault(e);
+ } catch (UnsupportedEncodingException e) {
+ throw new AxisFault(e);
+ } finally {
+ if ((msgContext.getEnvelope() == null) && !soap11) {
+ msgContext.setEnvelope(new SOAP12Factory().createSOAPEnvelope());
+ }
+ }
+ }
+
/**
* This is where the incoming message is processed.
*/
public void run() {
InputStream in = null;
+
try {
+
// get the incoming msg content into a byte array
- byte[] buffer = new byte[8 * 1024];
+ byte[] buffer = new byte[8 * 1024];
ByteArrayOutputStream out = new ByteArrayOutputStream();
- for (int bytesRead = message.readBytes(buffer);
- bytesRead != -1; bytesRead = message.readBytes(buffer)) {
+
+ for (int bytesRead = message.readBytes(buffer); bytesRead != -1;
+ bytesRead = message.readBytes(buffer)) {
out.write(buffer, 0, bytesRead);
}
+
in = new ByteArrayInputStream(out.toByteArray());
- }
- catch (Exception e) {
+ } catch (Exception e) {
log.error(Messages.getMessage("exception00"), e);
e.printStackTrace();
+
return;
}
// if the incoming message has a contentType set,
// pass it to my new Message
String contentType = null;
+
try {
contentType = message.getStringProperty("contentType");
- }
- catch (Exception e) {
+ } catch (Exception e) {
log.error(Messages.getMessage("exception00"), e);
e.printStackTrace();
+
return;
}
// if the incoming message has a contentType set,
// pass it to my new Message
String soapAction = null;
+
try {
soapAction = message.getStringProperty("SOAPAction");
- }
- catch (Exception e) {
+ } catch (Exception e) {
log.error(Messages.getMessage("exception00"), e);
e.printStackTrace();
+
return;
}
MessageContext msgContext;
+
try {
TransportInDescription transportIn =
configurationContext.getAxisConfiguration().getTransportIn(
@@ -124,163 +241,33 @@
TransportOutDescription transportOut =
configurationContext.getAxisConfiguration().getTransportOut(
new QName(Constants.TRANSPORT_JMS));
- msgContext = new MessageContext(
- configurationContext,
- transportIn,
- transportOut);
- msgContext.setProperty(
- Constants.OUT_TRANSPORT_INFO,
- new JMSOutTransportInfo(message.getJMSReplyTo(), listener.getProperties()));
+
+ msgContext = new MessageContext(configurationContext, transportIn, transportOut);
+ msgContext.setProperty(Constants.OUT_TRANSPORT_INFO,
+ new JMSOutTransportInfo(message.getJMSReplyTo(),
+ listener.getProperties()));
msgContext.setTransportOut(transportOut);
msgContext.setServerSide(true);
} catch (Exception e) {
log.error(Messages.getMessage("exception00"), e);
e.printStackTrace();
+
return;
}
msgContext.setServiceGroupContextId(UUIDGenerator.getUUID());
+
if (soapAction != null) {
msgContext.setSoapAction(soapAction);
}
try {
- processJMSRequest(
- msgContext,
- in,
- contentType
- );
+ processJMSRequest(msgContext, in, contentType);
} catch (Exception e) {
log.error(Messages.getMessage("exception00"), e);
e.printStackTrace();
- return;
- }
- }
-
- public static void processJMSRequest(
- MessageContext msgContext,
- InputStream in,
- String contentType
- )
- throws AxisFault {
- boolean soap11 = false;
- try {
- msgContext.setServerSide(true);
-
- SOAPEnvelope envelope = null;
- StAXBuilder builder = null;
- if (contentType != null) {
- if (contentType
- .indexOf(HTTPConstants.HEADER_ACCEPT_MULTIPART_RELATED)
- > -1) {
- //It is MTOM
- builder = HTTPTransportUtils.selectBuilderForMIME(msgContext, in, contentType);
- envelope = (SOAPEnvelope) builder.getDocumentElement();
- } else {
- Reader reader = new InputStreamReader(in);
-
- XMLStreamReader xmlreader;
- //Figure out the char set encoding and create the reader
-
- //If charset is not specified
- if (TransportUtils.getCharSetEncoding(contentType) == null) {
- xmlreader =
- XMLInputFactory
- .newInstance()
- .createXMLStreamReader(
- in,
- MessageContext.DEFAULT_CHAR_SET_ENCODING);
- //Set the encoding scheme in the message context
- msgContext.setProperty(
- MessageContext.CHARACTER_SET_ENCODING,
- MessageContext.DEFAULT_CHAR_SET_ENCODING);
- } else {
- //get the type of char encoding
- String charSetEnc = TransportUtils.getCharSetEncoding(contentType);
- xmlreader =
- XMLInputFactory
- .newInstance()
- .createXMLStreamReader(
- in,
- charSetEnc);
-
- //Setting the value in msgCtx
- msgContext.setProperty(
- MessageContext.CHARACTER_SET_ENCODING,
- charSetEnc);
-
- }
- if (contentType
- .indexOf(SOAP12Constants.SOAP_12_CONTENT_TYPE)
- > -1) {
- soap11 = false;
- //it is SOAP 1.2
- builder =
- new StAXSOAPModelBuilder(
- xmlreader,
- SOAP12Constants.SOAP_ENVELOPE_NAMESPACE_URI);
- envelope = (SOAPEnvelope) builder.getDocumentElement();
- } else if (
- contentType.indexOf(
- SOAP11Constants.SOAP_11_CONTENT_TYPE)
- > -1) {
- soap11 = true;
- builder =
- new StAXSOAPModelBuilder(
- xmlreader,
- SOAP11Constants
- .SOAP_ENVELOPE_NAMESPACE_URI);
- envelope =
- (SOAPEnvelope) builder.getDocumentElement();
- }
-
- }
-
- }
-
- String charsetEncoding = builder.getDocument().getCharsetEncoding();
- if (charsetEncoding != null && !"".equals(charsetEncoding) &&
- !((String) msgContext.getProperty(MessageContext.CHARACTER_SET_ENCODING))
- .equalsIgnoreCase(charsetEncoding)) {
- String faultCode;
- if (SOAP12Constants.SOAP_ENVELOPE_NAMESPACE_URI.equals(envelope.getNamespace().getName())) {
- faultCode = SOAP12Constants.FAULT_CODE_SENDER;
- } else {
- faultCode = SOAP11Constants.FAULT_CODE_SENDER;
- }
- throw new AxisFault("Character Set Encoding from " +
- "transport information do not match with " +
- "character set encoding in the received SOAP message", faultCode);
- }
-
-
- msgContext.setEnvelope(envelope);
- AxisEngine engine = new AxisEngine(msgContext.getConfigurationContext());
- if (envelope.getBody().hasFault()) {
- engine.receiveFault(msgContext);
- } else {
- engine.receive(msgContext);
- }
- } catch (SOAPProcessingException e) {
- throw new AxisFault(e);
-
- } catch (AxisFault e) {
- //rethrow
- throw e;
- } catch (OMException e) {
- throw new AxisFault(e);
- } catch (XMLStreamException e) {
- throw new AxisFault(e);
- } catch (FactoryConfigurationError e) {
- throw new AxisFault(e);
- } catch (UnsupportedEncodingException e) {
- throw new AxisFault(e);
- } finally {
- if (msgContext.getEnvelope() == null && !soap11) {
- msgContext.setEnvelope(
- new SOAP12Factory().createSOAPEnvelope());
- }
+ return;
}
}
}
Modified: webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/Subscription.java
URL: http://svn.apache.org/viewcvs/webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/Subscription.java?rev=357187&r1=357186&r2=357187&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/Subscription.java (original)
+++ webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/Subscription.java Fri Dec 16 09:13:57 2005
@@ -1,18 +1,19 @@
/*
- * Copyright 2001, 2002,2004 The Apache Software Foundation.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
+* Copyright 2001, 2002,2004 The Apache Software Foundation.
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
package org.apache.axis2.transport.jms;
@@ -24,48 +25,47 @@
*/
public class Subscription {
- MessageListener m_listener;
+ int m_ackMode;
JMSEndpoint m_endpoint;
+ MessageListener m_listener;
String m_messageSelector;
- int m_ackMode;
- Subscription(MessageListener listener,
- JMSEndpoint endpoint,
- HashMap properties) {
+ Subscription(MessageListener listener, JMSEndpoint endpoint, HashMap properties) {
m_listener = listener;
m_endpoint = endpoint;
- m_messageSelector = MapUtils.removeStringProperty(
- properties,
- JMSConstants.MESSAGE_SELECTOR,
- null);
- m_ackMode = MapUtils.removeIntProperty(properties,
- JMSConstants.ACKNOWLEDGE_MODE,
+ m_messageSelector = MapUtils.removeStringProperty(properties,
+ JMSConstants.MESSAGE_SELECTOR, null);
+ m_ackMode = MapUtils.removeIntProperty(properties, JMSConstants.ACKNOWLEDGE_MODE,
JMSConstants.DEFAULT_ACKNOWLEDGE_MODE);
}
- public int hashCode() {
- return toString().hashCode();
- }
-
public boolean equals(Object obj) {
- if (obj == null || !(obj instanceof Subscription))
+ if ((obj == null) || !(obj instanceof Subscription)) {
return false;
+ }
+
Subscription other = (Subscription) obj;
+
if (m_messageSelector == null) {
- if (other.m_messageSelector != null)
+ if (other.m_messageSelector != null) {
return false;
+ }
} else {
- if (other.m_messageSelector == null ||
- !other.m_messageSelector.equals(m_messageSelector))
+ if ((other.m_messageSelector == null)
+ || !other.m_messageSelector.equals(m_messageSelector)) {
return false;
+ }
}
- return m_ackMode == other.m_ackMode &&
- m_endpoint.equals(other.m_endpoint) &&
- other.m_listener.equals(m_listener);
+
+ return (m_ackMode == other.m_ackMode) && m_endpoint.equals(other.m_endpoint)
+ && other.m_listener.equals(m_listener);
+ }
+
+ public int hashCode() {
+ return toString().hashCode();
}
public String toString() {
return m_listener.toString();
}
-
-}
\ No newline at end of file
+}
Modified: webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/TopicConnector.java
URL: http://svn.apache.org/viewcvs/webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/TopicConnector.java?rev=357187&r1=357186&r2=357187&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/TopicConnector.java (original)
+++ webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/TopicConnector.java Fri Dec 16 09:13:57 2005
@@ -1,18 +1,19 @@
/*
- * Copyright 2001, 2002,2004 The Apache Software Foundation.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
+* Copyright 2001, 2002,2004 The Apache Software Foundation.
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
package org.apache.axis2.transport.jms;
@@ -37,62 +38,28 @@
* connections to topics (pub-sub domain).
*/
public class TopicConnector extends JMSConnector {
- public TopicConnector(TopicConnectionFactory factory,
- int numRetries,
- int numSessions,
- long connectRetryInterval,
- long interactRetryInterval,
- long timeoutTime,
- boolean allowReceive,
- String clientID,
- String username,
- String password,
- JMSVendorAdapter adapter,
- JMSURLHelper jmsurl)
- throws JMSException {
- super(factory, numRetries, numSessions, connectRetryInterval,
- interactRetryInterval, timeoutTime, allowReceive,
- clientID, username, password, adapter, jmsurl);
- }
-
- protected javax.jms.Connection internalConnect(ConnectionFactory connectionFactory,
- String username, String password)
- throws JMSException {
- TopicConnectionFactory tcf = (TopicConnectionFactory) connectionFactory;
- if (username == null)
- return tcf.createTopicConnection();
-
- return tcf.createTopicConnection(username, password);
- }
-
-
- protected SyncConnection createSyncConnection(ConnectionFactory factory,
- javax.jms.Connection connection,
- int numSessions,
- String threadName,
- String clientID,
- String username,
- String password)
+ public TopicConnector(TopicConnectionFactory factory, int numRetries, int numSessions,
+ long connectRetryInterval, long interactRetryInterval, long timeoutTime,
+ boolean allowReceive, String clientID, String username, String password,
+ JMSVendorAdapter adapter, JMSURLHelper jmsurl)
throws JMSException {
- return new TopicSyncConnection((TopicConnectionFactory) factory,
- (TopicConnection) connection, numSessions,
- threadName, clientID, username, password);
+ super(factory, numRetries, numSessions, connectRetryInterval, interactRetryInterval,
+ timeoutTime, allowReceive, clientID, username, password, adapter, jmsurl);
}
protected AsyncConnection createAsyncConnection(ConnectionFactory factory,
- javax.jms.Connection connection,
- String threadName,
- String clientID,
- String username,
+ javax.jms.Connection connection, String threadName, String clientID, String username,
String password)
throws JMSException {
return new TopicAsyncConnection((TopicConnectionFactory) factory,
- (TopicConnection) connection, threadName,
- clientID, username, password);
+ (TopicConnection) connection, threadName, clientID,
+ username, password);
}
- public JMSEndpoint createEndpoint(String destination) {
- return new TopicEndpoint(destination);
+ private TopicSubscriber createDurableSubscriber(TopicSession session, Topic topic,
+ String subscriptionName, String messageSelector, boolean noLocal)
+ throws JMSException {
+ return session.createDurableSubscriber(topic, subscriptionName, messageSelector, noLocal);
}
/**
@@ -102,71 +69,73 @@
* @return
* @throws JMSException
*/
- public JMSEndpoint createEndpoint(Destination destination)
- throws JMSException {
- if (!(destination instanceof Topic))
+ public JMSEndpoint createEndpoint(Destination destination) throws JMSException {
+ if (!(destination instanceof Topic)) {
throw new IllegalArgumentException("The input be a topic for this connector");
- return new TopicDestinationEndpoint((Topic) destination);
- }
+ }
- private TopicSession createTopicSession(TopicConnection connection, int ackMode)
- throws JMSException {
- return connection.createTopicSession(false,
- ackMode);
+ return new TopicDestinationEndpoint((Topic) destination);
}
- private Topic createTopic(TopicSession session, String subject)
- throws Exception {
- return m_adapter.getTopic(session, subject);
+ public JMSEndpoint createEndpoint(String destination) {
+ return new TopicEndpoint(destination);
}
- private TopicSubscriber createSubscriber(TopicSession session,
- TopicSubscription subscription)
+ private TopicSubscriber createSubscriber(TopicSession session, TopicSubscription subscription)
throws Exception {
- if (subscription.isDurable())
+ if (subscription.isDurable()) {
return createDurableSubscriber(session,
(Topic) subscription.m_endpoint.getDestination(session),
subscription.m_subscriptionName,
- subscription.m_messageSelector,
- subscription.m_noLocal);
- else
+ subscription.m_messageSelector, subscription.m_noLocal);
+ } else {
return createSubscriber(session,
(Topic) subscription.m_endpoint.getDestination(session),
- subscription.m_messageSelector,
- subscription.m_noLocal);
+ subscription.m_messageSelector, subscription.m_noLocal);
+ }
}
- private TopicSubscriber createDurableSubscriber(TopicSession session,
- Topic topic,
- String subscriptionName,
- String messageSelector,
- boolean noLocal)
+ private TopicSubscriber createSubscriber(TopicSession session, Topic topic,
+ String messageSelector, boolean noLocal)
throws JMSException {
- return session.createDurableSubscriber(topic, subscriptionName,
- messageSelector, noLocal);
+ return session.createSubscriber(topic, messageSelector, noLocal);
}
- private TopicSubscriber createSubscriber(TopicSession session,
- Topic topic,
- String messageSelector,
- boolean noLocal)
+ protected SyncConnection createSyncConnection(ConnectionFactory factory,
+ javax.jms.Connection connection, int numSessions, String threadName, String clientID,
+ String username, String password)
throws JMSException {
- return session.createSubscriber(topic, messageSelector, noLocal);
+ return new TopicSyncConnection((TopicConnectionFactory) factory,
+ (TopicConnection) connection, numSessions, threadName,
+ clientID, username, password);
}
+ private Topic createTopic(TopicSession session, String subject) throws Exception {
+ return m_adapter.getTopic(session, subject);
+ }
- private final class TopicAsyncConnection extends AsyncConnection {
+ private TopicSession createTopicSession(TopicConnection connection, int ackMode)
+ throws JMSException {
+ return connection.createTopicSession(false, ackMode);
+ }
+
+ protected javax.jms.Connection internalConnect(ConnectionFactory connectionFactory,
+ String username, String password)
+ throws JMSException {
+ TopicConnectionFactory tcf = (TopicConnectionFactory) connectionFactory;
- TopicAsyncConnection(TopicConnectionFactory connectionFactory,
- TopicConnection connection,
- String threadName,
- String clientID,
- String username,
- String password)
+ if (username == null) {
+ return tcf.createTopicConnection();
+ }
+
+ return tcf.createTopicConnection(username, password);
+ }
+ private final class TopicAsyncConnection extends AsyncConnection {
+ TopicAsyncConnection(TopicConnectionFactory connectionFactory, TopicConnection connection,
+ String threadName, String clientID, String username, String password)
throws JMSException {
- super(connectionFactory, connection, threadName,
- clientID, username, password);
+ super(connectionFactory, connection, threadName, clientID, username, password);
}
protected ListenerSession createListenerSession(javax.jms.Connection connection,
@@ -176,14 +145,12 @@
subscription.m_ackMode);
TopicSubscriber subscriber = createSubscriber(session,
(TopicSubscription) subscription);
- return new TopicListenerSession(session, subscriber,
- (TopicSubscription) subscription);
+
+ return new TopicListenerSession(session, subscriber, (TopicSubscription) subscription);
}
private final class TopicListenerSession extends ListenerSession {
-
- TopicListenerSession(TopicSession session,
- TopicSubscriber subscriber,
+ TopicListenerSession(TopicSession session, TopicSubscriber subscriber,
TopicSubscription subscription)
throws Exception {
super(session, subscriber, subscription);
@@ -194,83 +161,40 @@
m_consumer.close();
} catch (Exception ignore) {
}
+
try {
TopicSubscription sub = (TopicSubscription) m_subscription;
+
if (sub.isDurable() && sub.m_unsubscribe) {
((TopicSession) m_session).unsubscribe(sub.m_subscriptionName);
}
+ } catch (Exception ignore) {
}
- catch (Exception ignore) {
- }
+
try {
m_session.close();
} catch (Exception ignore) {
}
-
}
}
}
- private final class TopicSyncConnection extends SyncConnection {
- TopicSyncConnection(TopicConnectionFactory connectionFactory,
- TopicConnection connection,
- int numSessions,
- String threadName,
- String clientID,
- String username,
- String password)
- throws JMSException {
- super(connectionFactory, connection, numSessions, threadName,
- clientID, username, password);
- }
+ private final class TopicDestinationEndpoint extends TopicEndpoint {
+ Topic m_topic;
- protected SendSession createSendSession(javax.jms.Connection connection)
- throws JMSException {
- TopicSession session = createTopicSession((TopicConnection) connection,
- JMSConstants.DEFAULT_ACKNOWLEDGE_MODE);
- TopicPublisher publisher = session.createPublisher(null);
- return new TopicSendSession(session, publisher);
+ TopicDestinationEndpoint(Topic topic) throws JMSException {
+ super(topic.getTopicName());
+ m_topic = topic;
}
- private final class TopicSendSession extends SendSession {
- TopicSendSession(TopicSession session,
- TopicPublisher publisher)
- throws JMSException {
- super(session, publisher);
- }
-
-
- protected MessageConsumer createConsumer(Destination destination)
- throws JMSException {
- return createSubscriber((TopicSession) m_session, (Topic) destination,
- null, JMSConstants.DEFAULT_NO_LOCAL);
- }
-
- protected void deleteTemporaryDestination(Destination destination)
- throws JMSException {
- ((TemporaryTopic) destination).delete();
- }
-
-
- protected Destination createTemporaryDestination()
- throws JMSException {
- return ((TopicSession) m_session).createTemporaryTopic();
- }
-
- protected void send(Destination destination, Message message,
- int deliveryMode, int priority, long timeToLive)
- throws JMSException {
- ((TopicPublisher) m_producer).publish((Topic) destination, message,
- deliveryMode, priority, timeToLive);
- }
-
+ Destination getDestination(Session session) {
+ return m_topic;
}
}
- private class TopicEndpoint
- extends JMSEndpoint {
+ private class TopicEndpoint extends JMSEndpoint {
String m_topicName;
TopicEndpoint(String topicName) {
@@ -278,102 +202,136 @@
m_topicName = topicName;
}
- Destination getDestination(Session session)
- throws Exception {
- return createTopic((TopicSession) session, m_topicName);
+ protected Subscription createSubscription(MessageListener listener, HashMap properties) {
+ return new TopicSubscription(listener, this, properties);
}
- protected Subscription createSubscription(MessageListener listener,
- HashMap properties) {
- return new TopicSubscription(listener, this, properties);
+ public boolean equals(Object object) {
+ if (!super.equals(object)) {
+ return false;
+ }
+
+ if (!(object instanceof TopicEndpoint)) {
+ return false;
+ }
+
+ return m_topicName.equals(((TopicEndpoint) object).m_topicName);
}
public String toString() {
StringBuffer buffer = new StringBuffer("TopicEndpoint:");
+
buffer.append(m_topicName);
+
return buffer.toString();
}
- public boolean equals(Object object) {
- if (!super.equals(object))
- return false;
-
- if (!(object instanceof TopicEndpoint))
- return false;
-
- return m_topicName.equals(((TopicEndpoint) object).m_topicName);
+ Destination getDestination(Session session) throws Exception {
+ return createTopic((TopicSession) session, m_topicName);
}
}
+
private final class TopicSubscription extends Subscription {
+ boolean m_noLocal;
String m_subscriptionName;
boolean m_unsubscribe;
- boolean m_noLocal;
- TopicSubscription(MessageListener listener,
- JMSEndpoint endpoint,
- HashMap properties) {
+ TopicSubscription(MessageListener listener, JMSEndpoint endpoint, HashMap properties) {
super(listener, endpoint, properties);
m_subscriptionName = MapUtils.removeStringProperty(properties,
- JMSConstants.SUBSCRIPTION_NAME,
- null);
- m_unsubscribe = MapUtils.removeBooleanProperty(properties,
- JMSConstants.UNSUBSCRIBE,
+ JMSConstants.SUBSCRIPTION_NAME, null);
+ m_unsubscribe = MapUtils.removeBooleanProperty(properties, JMSConstants.UNSUBSCRIBE,
JMSConstants.DEFAULT_UNSUBSCRIBE);
- m_noLocal = MapUtils.removeBooleanProperty(properties,
- JMSConstants.NO_LOCAL,
+ m_noLocal = MapUtils.removeBooleanProperty(properties, JMSConstants.NO_LOCAL,
JMSConstants.DEFAULT_NO_LOCAL);
}
- boolean isDurable() {
- return m_subscriptionName != null;
- }
-
public boolean equals(Object obj) {
- if (!super.equals(obj))
+ if (!super.equals(obj)) {
return false;
- if (!(obj instanceof TopicSubscription))
+ }
+
+ if (!(obj instanceof TopicSubscription)) {
return false;
+ }
TopicSubscription other = (TopicSubscription) obj;
- if (other.m_unsubscribe != m_unsubscribe || other.m_noLocal != m_noLocal)
+
+ if ((other.m_unsubscribe != m_unsubscribe) || (other.m_noLocal != m_noLocal)) {
return false;
+ }
if (isDurable()) {
return other.isDurable() && other.m_subscriptionName.equals(m_subscriptionName);
- } else if (other.isDurable())
+ } else if (other.isDurable()) {
return false;
- else
+ } else {
return true;
+ }
}
public String toString() {
StringBuffer buffer = new StringBuffer(super.toString());
+
buffer.append(":").append(m_noLocal).append(":").append(m_unsubscribe);
+
if (isDurable()) {
buffer.append(":");
buffer.append(m_subscriptionName);
}
+
return buffer.toString();
}
+ boolean isDurable() {
+ return m_subscriptionName != null;
+ }
}
- private final class TopicDestinationEndpoint
- extends TopicEndpoint {
- Topic m_topic;
- TopicDestinationEndpoint(Topic topic)
+ private final class TopicSyncConnection extends SyncConnection {
+ TopicSyncConnection(TopicConnectionFactory connectionFactory, TopicConnection connection,
+ int numSessions, String threadName, String clientID, String username,
+ String password)
throws JMSException {
- super(topic.getTopicName());
- m_topic = topic;
+ super(connectionFactory, connection, numSessions, threadName, clientID, username,
+ password);
}
- Destination getDestination(Session session) {
- return m_topic;
+ protected SendSession createSendSession(javax.jms.Connection connection)
+ throws JMSException {
+ TopicSession session = createTopicSession((TopicConnection) connection,
+ JMSConstants.DEFAULT_ACKNOWLEDGE_MODE);
+ TopicPublisher publisher = session.createPublisher(null);
+
+ return new TopicSendSession(session, publisher);
}
- }
+ private final class TopicSendSession extends SendSession {
+ TopicSendSession(TopicSession session, TopicPublisher publisher) throws JMSException {
+ super(session, publisher);
+ }
+ protected MessageConsumer createConsumer(Destination destination) throws JMSException {
+ return createSubscriber((TopicSession) m_session, (Topic) destination, null,
+ JMSConstants.DEFAULT_NO_LOCAL);
+ }
-}
\ No newline at end of file
+ protected Destination createTemporaryDestination() throws JMSException {
+ return ((TopicSession) m_session).createTemporaryTopic();
+ }
+
+ protected void deleteTemporaryDestination(Destination destination) throws JMSException {
+ ((TemporaryTopic) destination).delete();
+ }
+
+ protected void send(Destination destination, Message message, int deliveryMode,
+ int priority, long timeToLive)
+ throws JMSException {
+ ((TopicPublisher) m_producer).publish((Topic) destination, message, deliveryMode,
+ priority, timeToLive);
+ }
+ }
+ }
+}
Modified: webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/local/LocalResponder.java
URL: http://svn.apache.org/viewcvs/webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/local/LocalResponder.java?rev=357187&r1=357186&r2=357187&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/local/LocalResponder.java (original)
+++ webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/local/LocalResponder.java Fri Dec 16 09:13:57 2005
@@ -13,6 +13,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
+
package org.apache.axis2.transport.local;
import org.apache.axis2.AxisFault;
@@ -32,30 +34,37 @@
this.sender = sender;
}
- public OutputStream startSendWithToAddress(MessageContext msgContext, OutputStream out) throws AxisFault {
- return out;
+ /**
+ * Clean up
+ *
+ * @param msgContext
+ * @throws org.apache.axis2.AxisFault
+ */
+ public void cleanUp(MessageContext msgContext) throws AxisFault {
}
- public void finalizeSendWithToAddress(MessageContext msgContext, OutputStream out) throws AxisFault {
+ public void finalizeSendWithOutputStreamFromIncomingConnection(MessageContext msgContext,
+ OutputStream out)
+ throws AxisFault {
}
- public OutputStream startSendWithOutputStreamFromIncomingConnection(MessageContext msgContext, OutputStream out) throws AxisFault {
- return null;
+ public void finalizeSendWithToAddress(MessageContext msgContext, OutputStream out)
+ throws AxisFault {
}
- public void finalizeSendWithOutputStreamFromIncomingConnection(MessageContext msgContext, OutputStream out) throws AxisFault {
+ protected OutputStream openTheConnection(EndpointReference epr, MessageContext msgctx)
+ throws AxisFault {
+ return sender.getResponse();
}
- protected OutputStream openTheConnection(EndpointReference epr, MessageContext msgctx) throws AxisFault {
- return sender.getResponse();
+ public OutputStream startSendWithOutputStreamFromIncomingConnection(MessageContext msgContext,
+ OutputStream out)
+ throws AxisFault {
+ return null;
}
- /**
- * Clean up
- *
- * @param msgContext
- * @throws org.apache.axis2.AxisFault
- */
- public void cleanUp(MessageContext msgContext) throws AxisFault {
+ public OutputStream startSendWithToAddress(MessageContext msgContext, OutputStream out)
+ throws AxisFault {
+ return out;
}
}
Modified: webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/local/LocalTransportReceiver.java
URL: http://svn.apache.org/viewcvs/webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/local/LocalTransportReceiver.java?rev=357187&r1=357186&r2=357187&view=diff
==============================================================================
--- webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/local/LocalTransportReceiver.java (original)
+++ webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/local/LocalTransportReceiver.java Fri Dec 16 09:13:57 2005
@@ -1,18 +1,19 @@
/*
- * Copyright 2004,2005 The Apache Software Foundation.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
+* Copyright 2004,2005 The Apache Software Foundation.
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
package org.apache.axis2.transport.local;
@@ -39,7 +40,6 @@
public class LocalTransportReceiver {
public static ConfigurationContext CONFIG_CONTEXT;
-
private ConfigurationContext confContext;
private LocalTransportSender sender;
@@ -52,30 +52,28 @@
this.sender = sender;
}
- public void processMessage(InputStream in,
- EndpointReference to) throws AxisFault {
+ public void processMessage(InputStream in, EndpointReference to) throws AxisFault {
try {
- TransportInDescription tIn =
- confContext.getAxisConfiguration().getTransportIn(
+ TransportInDescription tIn = confContext.getAxisConfiguration().getTransportIn(
new QName(Constants.TRANSPORT_LOCAL));
- TransportOutDescription tOut =
- confContext.getAxisConfiguration().getTransportOut(
+ TransportOutDescription tOut = confContext.getAxisConfiguration().getTransportOut(
new QName(Constants.TRANSPORT_LOCAL));
+
tOut.setSender(new LocalResponder(sender));
MessageContext msgCtx = new MessageContext(confContext, tIn, tOut);
+
msgCtx.setTo(to);
msgCtx.setServerSide(true);
-
msgCtx.setProperty(MessageContext.TRANSPORT_OUT, sender.getResponse());
- XMLStreamReader reader =
- XMLInputFactory.newInstance().createXMLStreamReader(
+ XMLStreamReader reader = XMLInputFactory.newInstance().createXMLStreamReader(
new BufferedReader(new InputStreamReader(in)));
-
StAXBuilder builder = new StAXSOAPModelBuilder(reader, null);
SOAPEnvelope envelope = (SOAPEnvelope) builder.getDocumentElement();
+
msgCtx.setEnvelope(envelope);
+
AxisEngine engine = new AxisEngine(confContext);
if (envelope.getBody().hasFault()) {
@@ -89,5 +87,4 @@
throw new AxisFault(e);
}
}
-
}