You are viewing a plain text version of this content. The canonical link for it is here.
Posted to axis-cvs@ws.apache.org by di...@apache.org on 2005/12/05 22:38:38 UTC
svn commit: r354198 [3/3] -
/webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/
Added: webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/SimpleJMSWorker.java
URL: http://svn.apache.org/viewcvs/webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/SimpleJMSWorker.java?rev=354198&view=auto
==============================================================================
--- webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/SimpleJMSWorker.java (added)
+++ webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/SimpleJMSWorker.java Mon Dec 5 13:38:30 2005
@@ -0,0 +1,327 @@
+/*
+ * Copyright 2001, 2002,2004 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.axis2.transport.jms;
+
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.Constants;
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.axis2.context.MessageContext;
+import org.apache.axis2.description.TransportOutDescription;
+import org.apache.axis2.engine.AxisEngine;
+import org.apache.axis2.i18n.Messages;
+import org.apache.axis2.om.OMException;
+import org.apache.axis2.om.impl.llom.builder.StAXBuilder;
+import org.apache.axis2.soap.SOAP11Constants;
+import org.apache.axis2.soap.SOAP12Constants;
+import org.apache.axis2.soap.SOAPEnvelope;
+import org.apache.axis2.soap.SOAPProcessingException;
+import org.apache.axis2.soap.impl.llom.builder.StAXSOAPModelBuilder;
+import org.apache.axis2.soap.impl.llom.soap12.SOAP12Factory;
+import org.apache.axis2.transport.http.HTTPConstants;
+import org.apache.axis2.transport.http.HTTPTransportUtils;
+import org.apache.axis2.transport.TransportUtils;
+import org.apache.axis2.util.UUIDGenerator;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import javax.jms.BytesMessage;
+import javax.jms.Destination;
+import javax.xml.namespace.QName;
+import javax.xml.parsers.FactoryConfigurationError;
+import javax.xml.stream.XMLInputFactory;
+import javax.xml.stream.XMLStreamException;
+import javax.xml.stream.XMLStreamReader;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.Reader;
+import java.io.UnsupportedEncodingException;
+
+/**
+ * SimpleJMSWorker is a worker thread that processes messages that are
+ * received by SimpleJMSListener. It creates a new message context, invokes
+ * the server, and sends back response msg to the replyTo destination.
+ */
+public class SimpleJMSWorker implements Runnable {
+ protected static Log log =
+ LogFactory.getLog(SimpleJMSWorker.class.getName());
+
+ SimpleJMSListener listener;
+ BytesMessage message;
+ private ConfigurationContext configurationContext;
+
+ public SimpleJMSWorker(ConfigurationContext configurationContext, SimpleJMSListener listener, BytesMessage message) {
+ this.listener = listener;
+ this.message = message;
+ this.configurationContext = configurationContext;
+ }
+
+ /**
+ * This is where the incoming message is processed.
+ */
+ public void run() {
+ InputStream in = null;
+ try {
+ // get the incoming msg content into a byte array
+ byte[] buffer = new byte[8 * 1024];
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ for (int bytesRead = message.readBytes(buffer);
+ bytesRead != -1; bytesRead = message.readBytes(buffer)) {
+ out.write(buffer, 0, bytesRead);
+ }
+ in = new ByteArrayInputStream(out.toByteArray());
+ }
+ catch (Exception e) {
+ log.error(Messages.getMessage("exception00"), e);
+ e.printStackTrace();
+ return;
+ }
+
+ // if the incoming message has a contentType set,
+ // pass it to my new Message
+ String contentType = null;
+ try {
+ contentType = message.getStringProperty("contentType");
+ }
+ catch (Exception e) {
+ log.error(Messages.getMessage("exception00"), e);
+ e.printStackTrace();
+ return;
+ }
+
+ MessageContext msgContext;
+ try {
+ TransportOutDescription transportOut =
+ configurationContext.getAxisConfiguration().getTransportOut(
+ new QName(Constants.TRANSPORT_HTTP));
+ msgContext = new MessageContext(
+ configurationContext,
+ configurationContext.getAxisConfiguration().getTransportIn(
+ new QName(Constants.TRANSPORT_HTTP)),
+ transportOut);
+ msgContext.setServerSide(true);
+ } catch (Exception e) {
+ log.error(Messages.getMessage("exception00"), e);
+ e.printStackTrace();
+ return;
+ }
+
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ msgContext.setProperty(MessageContext.TRANSPORT_OUT, baos);
+ msgContext.setServiceGroupContextId(UUIDGenerator.getUUID());
+
+ try {
+ processJMSRequest(
+ msgContext,
+ in,
+ baos,
+ contentType
+ );
+ } catch (Exception e) {
+ log.error(Messages.getMessage("exception00"), e);
+ e.printStackTrace();
+ return;
+ }
+
+// Message msg = null;
+// if (contentType != null && !contentType.trim().equals("")) {
+// msg = new Message(in, true, contentType, null);
+// } else {
+// msg = new Message(in);
+// }
+//
+// MessageContext msgContext = new MessageContext(server);
+// msgContext.setRequestMessage(msg);
+// try {
+// server.invoke(msgContext);
+// msg = msgContext.getResponseMessage();
+// }
+// catch (AxisFault af) {
+// msg = new Message(af);
+// msg.setMessageContext(msgContext);
+// }
+// catch (Exception e) {
+// msg = new Message(new AxisFault(e.toString()));
+// msg.setMessageContext(msgContext);
+// }
+
+ try {
+ // now we need to send the response
+ Destination destination = message.getJMSReplyTo();
+ if (destination == null)
+ return;
+ JMSEndpoint replyTo = listener.getConnector().createEndpoint(destination);
+ replyTo.send(baos.toByteArray());
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ }
+
+// if (msgContext.getProperty(MessageContext.QUIT_REQUESTED) != null)
+// // why then, quit!
+// try {
+// listener.shutdown();
+// } catch (Exception e) {
+// }
+ }
+
+    /**
+     * Builds a SOAP envelope from a raw request stream and dispatches it
+     * through the Axis2 engine.
+     * <p>
+     * The content type selects the parser: a multipart/related content type
+     * goes through the MIME builder; otherwise a StAX reader is created with
+     * the charset named in the content type (or the default charset when none
+     * is named) and a SOAP 1.2 or SOAP 1.1 model builder is chosen by content
+     * type. The envelope is handed to <code>AxisEngine.receiveFault</code>
+     * when its body carries a fault and to <code>AxisEngine.receive</code>
+     * otherwise.
+     * <p>
+     * On exit, if no envelope was set on the context and the request was not
+     * recognised as SOAP 1.1, an empty SOAP 1.2 envelope is set on the context.
+     *
+     * @param msgContext  message context to populate and dispatch
+     * @param in          stream holding the request payload
+     * @param out         stream registered on the context as
+     *                    <code>MessageContext.TRANSPORT_OUT</code>
+     * @param contentType content type of the request; may be <code>null</code>,
+     *                    in which case the request is rejected with a fault
+     * @throws AxisFault if the content type is missing or unsupported, if the
+     *                   transport charset disagrees with the charset declared
+     *                   in the document, or if parsing or dispatch fails
+     */
+    public static void processJMSRequest(
+            MessageContext msgContext,
+            InputStream in,
+            OutputStream out,
+            String contentType
+    )
+            throws AxisFault {
+        boolean soap11 = false;
+        try {
+
+//            //remove the starting and trailing " from the SOAP Action
+//            if (soapActionHeader != null
+//                && soapActionHeader.startsWith("\"")
+//                && soapActionHeader.endsWith("\"")) {
+//
+//                soapActionHeader =
+//                    soapActionHeader.substring(
+//                        1,
+//                        soapActionHeader.length() - 1);
+//            }
+//            //fill up the Message Contexts
+//            msgContext.setWSAAction(soapActionHeader);
+//            msgContext.setSoapAction(soapActionHeader);
+//            msgContext.setTo(new EndpointReference(requestURI));
+            msgContext.setProperty(MessageContext.TRANSPORT_OUT, out);
+            msgContext.setServerSide(true);
+
+            SOAPEnvelope envelope = null;
+            StAXBuilder builder = null;
+            if (contentType != null) {
+                if (contentType
+                        .indexOf(HTTPConstants.HEADER_ACCEPT_MULTIPART_RELATED)
+                        > -1) {
+                    //It is MTOM
+                    builder = HTTPTransportUtils.selectBuilderForMIME(msgContext, in, contentType);
+                    envelope = (SOAPEnvelope) builder.getDocumentElement();
+                } else {
+                    // Resolve the charset once; fall back to the default when
+                    // the content type does not name one.
+                    String charSetEnc = TransportUtils.getCharSetEncoding(contentType);
+                    if (charSetEnc == null) {
+                        charSetEnc = MessageContext.DEFAULT_CHAR_SET_ENCODING;
+                    }
+                    XMLStreamReader xmlreader =
+                            XMLInputFactory
+                                    .newInstance()
+                                    .createXMLStreamReader(in, charSetEnc);
+                    //Set the encoding scheme in the message context
+                    msgContext.setProperty(
+                            MessageContext.CHARACTER_SET_ENCODING,
+                            charSetEnc);
+
+                    if (contentType
+                            .indexOf(SOAP12Constants.SOAP_12_CONTENT_TYPE)
+                            > -1) {
+                        soap11 = false;
+                        //it is SOAP 1.2
+                        builder =
+                                new StAXSOAPModelBuilder(
+                                        xmlreader,
+                                        SOAP12Constants.SOAP_ENVELOPE_NAMESPACE_URI);
+                        envelope = (SOAPEnvelope) builder.getDocumentElement();
+                    } else if (
+                            contentType.indexOf(
+                                    SOAP11Constants.SOAP_11_CONTENT_TYPE)
+                                    > -1) {
+                        soap11 = true;
+                        builder =
+                                new StAXSOAPModelBuilder(
+                                        xmlreader,
+                                        SOAP11Constants
+                                                .SOAP_ENVELOPE_NAMESPACE_URI);
+                        envelope =
+                                (SOAPEnvelope) builder.getDocumentElement();
+                    }
+                }
+            }
+
+            // No builder means the content type was absent or named neither
+            // MIME, SOAP 1.2 nor SOAP 1.1. Fail with a fault instead of a
+            // NullPointerException on the dereferences below.
+            if (builder == null || envelope == null) {
+                throw new AxisFault("Unable to build a SOAP envelope: missing or " +
+                        "unsupported content type '" + contentType + "'");
+            }
+
+            String charsetEncoding = builder.getDocument().getCharsetEncoding();
+            // The transport encoding is only set above on the non-MIME path;
+            // skip the comparison when nothing was recorded.
+            String transportEncoding =
+                    (String) msgContext.getProperty(MessageContext.CHARACTER_SET_ENCODING);
+            if (charsetEncoding != null && !"".equals(charsetEncoding) &&
+                    transportEncoding != null &&
+                    !transportEncoding.equalsIgnoreCase(charsetEncoding)) {
+                String faultCode;
+                if (SOAP12Constants.SOAP_ENVELOPE_NAMESPACE_URI.equals(envelope.getNamespace().getName())) {
+                    faultCode = SOAP12Constants.FAULT_CODE_SENDER;
+                } else {
+                    faultCode = SOAP11Constants.FAULT_CODE_SENDER;
+                }
+                throw new AxisFault("Character Set Encoding from " +
+                        "transport information do not match with " +
+                        "character set encoding in the received SOAP message", faultCode);
+            }
+
+
+            msgContext.setEnvelope(envelope);
+            AxisEngine engine = new AxisEngine(msgContext.getConfigurationContext());
+            if (envelope.getBody().hasFault()) {
+                engine.receiveFault(msgContext);
+            } else {
+                engine.receive(msgContext);
+            }
+        } catch (SOAPProcessingException e) {
+            throw new AxisFault(e);
+
+        } catch (AxisFault e) {
+            //rethrow
+            throw e;
+        } catch (OMException e) {
+            throw new AxisFault(e);
+        } catch (XMLStreamException e) {
+            throw new AxisFault(e);
+        } catch (FactoryConfigurationError e) {
+            throw new AxisFault(e);
+        } catch (javax.xml.stream.FactoryConfigurationError e) {
+            // XMLInputFactory.newInstance() reports failures with the
+            // javax.xml.stream flavour of this error, not the javax.xml.parsers
+            // one imported by this file.
+            throw new AxisFault(e);
+        } catch (UnsupportedEncodingException e) {
+            throw new AxisFault(e);
+        } finally {
+            // Make sure the context carries an envelope even when parsing
+            // failed, unless the request was identified as SOAP 1.1.
+            if (msgContext.getEnvelope() == null && !soap11) {
+                msgContext.setEnvelope(
+                        new SOAP12Factory().createSOAPEnvelope());
+            }
+
+        }
+    }
+}
Added: webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/Subscription.java
URL: http://svn.apache.org/viewcvs/webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/Subscription.java?rev=354198&view=auto
==============================================================================
--- webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/Subscription.java (added)
+++ webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/Subscription.java Mon Dec 5 13:38:30 2005
@@ -0,0 +1,71 @@
+/*
+ * Copyright 2001, 2002,2004 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.axis2.transport.jms;
+
+import javax.jms.MessageListener;
+import java.util.HashMap;
+
+/*
+ * Subscription class holds information about a subscription
+ */
+
+public class Subscription {
+ MessageListener m_listener;
+ JMSEndpoint m_endpoint;
+ String m_messageSelector;
+ int m_ackMode;
+
+ Subscription(MessageListener listener,
+ JMSEndpoint endpoint,
+ HashMap properties) {
+ m_listener = listener;
+ m_endpoint = endpoint;
+ m_messageSelector = MapUtils.removeStringProperty(
+ properties,
+ JMSConstants.MESSAGE_SELECTOR,
+ null);
+ m_ackMode = MapUtils.removeIntProperty(properties,
+ JMSConstants.ACKNOWLEDGE_MODE,
+ JMSConstants.DEFAULT_ACKNOWLEDGE_MODE);
+ }
+
+ public int hashCode() {
+ return toString().hashCode();
+ }
+
+ public boolean equals(Object obj) {
+ if (obj == null || !(obj instanceof Subscription))
+ return false;
+ Subscription other = (Subscription) obj;
+ if (m_messageSelector == null) {
+ if (other.m_messageSelector != null)
+ return false;
+ } else {
+ if (other.m_messageSelector == null ||
+ !other.m_messageSelector.equals(m_messageSelector))
+ return false;
+ }
+ return m_ackMode == other.m_ackMode &&
+ m_endpoint.equals(other.m_endpoint) &&
+ other.m_listener.equals(m_listener);
+ }
+
+ public String toString() {
+ return m_listener.toString();
+ }
+
+}
\ No newline at end of file
Added: webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/TopicConnector.java
URL: http://svn.apache.org/viewcvs/webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/TopicConnector.java?rev=354198&view=auto
==============================================================================
--- webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/TopicConnector.java (added)
+++ webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/TopicConnector.java Mon Dec 5 13:38:30 2005
@@ -0,0 +1,379 @@
+/*
+ * Copyright 2001, 2002,2004 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.axis2.transport.jms;
+
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.Session;
+import javax.jms.TemporaryTopic;
+import javax.jms.Topic;
+import javax.jms.TopicConnection;
+import javax.jms.TopicConnectionFactory;
+import javax.jms.TopicPublisher;
+import javax.jms.TopicSession;
+import javax.jms.TopicSubscriber;
+import java.util.HashMap;
+
+/**
+ * TopicConnector is a concrete JMSConnector subclass that specifically handles
+ * connections to topics (pub-sub domain).
+ */
+public class TopicConnector extends JMSConnector {
+ public TopicConnector(TopicConnectionFactory factory,
+ int numRetries,
+ int numSessions,
+ long connectRetryInterval,
+ long interactRetryInterval,
+ long timeoutTime,
+ boolean allowReceive,
+ String clientID,
+ String username,
+ String password,
+ JMSVendorAdapter adapter,
+ JMSURLHelper jmsurl)
+ throws JMSException {
+ super(factory, numRetries, numSessions, connectRetryInterval,
+ interactRetryInterval, timeoutTime, allowReceive,
+ clientID, username, password, adapter, jmsurl);
+ }
+
+    /**
+     * Opens a topic connection from the given factory, anonymously when
+     * <code>username</code> is <code>null</code> and with the supplied
+     * credentials otherwise.
+     *
+     * @param connectionFactory factory to connect with; cast to
+     *                          <code>TopicConnectionFactory</code>
+     * @param username          user name, or <code>null</code> for an
+     *                          anonymous connection
+     * @param password          password used together with <code>username</code>
+     * @return the new topic connection
+     * @throws JMSException if the factory fails to create the connection
+     */
+    protected javax.jms.Connection internalConnect(ConnectionFactory connectionFactory,
+                                                   String username, String password)
+            throws JMSException {
+        TopicConnectionFactory topicFactory = (TopicConnectionFactory) connectionFactory;
+        return (username == null)
+                ? topicFactory.createTopicConnection()
+                : topicFactory.createTopicConnection(username, password);
+    }
+
+
+ protected SyncConnection createSyncConnection(ConnectionFactory factory,
+ javax.jms.Connection connection,
+ int numSessions,
+ String threadName,
+ String clientID,
+ String username,
+ String password)
+ throws JMSException {
+ return new TopicSyncConnection((TopicConnectionFactory) factory,
+ (TopicConnection) connection, numSessions,
+ threadName, clientID, username, password);
+ }
+
+ protected AsyncConnection createAsyncConnection(ConnectionFactory factory,
+ javax.jms.Connection connection,
+ String threadName,
+ String clientID,
+ String username,
+ String password)
+ throws JMSException {
+ return new TopicAsyncConnection((TopicConnectionFactory) factory,
+ (TopicConnection) connection, threadName,
+ clientID, username, password);
+ }
+
+ public JMSEndpoint createEndpoint(String destination) {
+ return new TopicEndpoint(destination);
+ }
+
+    /**
+     * Create an endpoint for a topic destination.
+     *
+     * @param destination the destination to wrap; must be a <code>Topic</code>
+     * @return an endpoint bound to the given topic
+     * @throws JMSException             if the topic name cannot be read
+     * @throws IllegalArgumentException if <code>destination</code> is not a
+     *                                  <code>Topic</code> (this includes
+     *                                  <code>null</code>)
+     */
+    public JMSEndpoint createEndpoint(Destination destination)
+            throws JMSException {
+        if (!(destination instanceof Topic)) {
+            throw new IllegalArgumentException("The input must be a topic for this connector");
+        }
+        return new TopicDestinationEndpoint((Topic) destination);
+    }
+
+ private TopicSession createTopicSession(TopicConnection connection, int ackMode)
+ throws JMSException {
+ return connection.createTopicSession(false,
+ ackMode);
+ }
+
+ private Topic createTopic(TopicSession session, String subject)
+ throws Exception {
+ return m_adapter.getTopic(session, subject);
+ }
+
+    /**
+     * Creates the subscriber described by a topic subscription: a durable
+     * subscriber when the subscription is durable, a plain one otherwise.
+     * The topic is resolved from the subscription's endpoint.
+     */
+    private TopicSubscriber createSubscriber(TopicSession session,
+                                             TopicSubscription subscription)
+            throws Exception {
+        Topic topic = (Topic) subscription.m_endpoint.getDestination(session);
+        if (!subscription.isDurable()) {
+            return createSubscriber(session,
+                    topic,
+                    subscription.m_messageSelector,
+                    subscription.m_noLocal);
+        }
+        return createDurableSubscriber(session,
+                topic,
+                subscription.m_subscriptionName,
+                subscription.m_messageSelector,
+                subscription.m_noLocal);
+    }
+
+ private TopicSubscriber createDurableSubscriber(TopicSession session,
+ Topic topic,
+ String subscriptionName,
+ String messageSelector,
+ boolean noLocal)
+ throws JMSException {
+ return session.createDurableSubscriber(topic, subscriptionName,
+ messageSelector, noLocal);
+ }
+
+ private TopicSubscriber createSubscriber(TopicSession session,
+ Topic topic,
+ String messageSelector,
+ boolean noLocal)
+ throws JMSException {
+ return session.createSubscriber(topic, messageSelector, noLocal);
+ }
+
+
+ private final class TopicAsyncConnection extends AsyncConnection {
+
+ TopicAsyncConnection(TopicConnectionFactory connectionFactory,
+ TopicConnection connection,
+ String threadName,
+ String clientID,
+ String username,
+ String password)
+
+ throws JMSException {
+ super(connectionFactory, connection, threadName,
+ clientID, username, password);
+ }
+
+ protected ListenerSession createListenerSession(javax.jms.Connection connection,
+ Subscription subscription)
+ throws Exception {
+ TopicSession session = createTopicSession((TopicConnection) connection,
+ subscription.m_ackMode);
+ TopicSubscriber subscriber = createSubscriber(session,
+ (TopicSubscription) subscription);
+ return new TopicListenerSession(session, subscriber,
+ (TopicSubscription) subscription);
+ }
+
+ private final class TopicListenerSession extends ListenerSession {
+
+ TopicListenerSession(TopicSession session,
+ TopicSubscriber subscriber,
+ TopicSubscription subscription)
+ throws Exception {
+ super(session, subscriber, subscription);
+ }
+
+ void cleanup() {
+ try {
+ m_consumer.close();
+ } catch (Exception ignore) {
+ }
+ try {
+ TopicSubscription sub = (TopicSubscription) m_subscription;
+ if (sub.isDurable() && sub.m_unsubscribe) {
+ ((TopicSession) m_session).unsubscribe(sub.m_subscriptionName);
+ }
+ }
+ catch (Exception ignore) {
+ }
+ try {
+ m_session.close();
+ } catch (Exception ignore) {
+ }
+
+ }
+ }
+ }
+
+ private final class TopicSyncConnection extends SyncConnection {
+ TopicSyncConnection(TopicConnectionFactory connectionFactory,
+ TopicConnection connection,
+ int numSessions,
+ String threadName,
+ String clientID,
+ String username,
+ String password)
+
+ throws JMSException {
+ super(connectionFactory, connection, numSessions, threadName,
+ clientID, username, password);
+ }
+
+ protected SendSession createSendSession(javax.jms.Connection connection)
+ throws JMSException {
+ TopicSession session = createTopicSession((TopicConnection) connection,
+ JMSConstants.DEFAULT_ACKNOWLEDGE_MODE);
+ TopicPublisher publisher = session.createPublisher(null);
+ return new TopicSendSession(session, publisher);
+ }
+
+ private final class TopicSendSession extends SendSession {
+ TopicSendSession(TopicSession session,
+ TopicPublisher publisher)
+ throws JMSException {
+ super(session, publisher);
+ }
+
+
+ protected MessageConsumer createConsumer(Destination destination)
+ throws JMSException {
+ return createSubscriber((TopicSession) m_session, (Topic) destination,
+ null, JMSConstants.DEFAULT_NO_LOCAL);
+ }
+
+ protected void deleteTemporaryDestination(Destination destination)
+ throws JMSException {
+ ((TemporaryTopic) destination).delete();
+ }
+
+
+ protected Destination createTemporaryDestination()
+ throws JMSException {
+ return ((TopicSession) m_session).createTemporaryTopic();
+ }
+
+ protected void send(Destination destination, Message message,
+ int deliveryMode, int priority, long timeToLive)
+ throws JMSException {
+ ((TopicPublisher) m_producer).publish((Topic) destination, message,
+ deliveryMode, priority, timeToLive);
+ }
+
+ }
+ }
+
+
+    /**
+     * Endpoint identified by a topic name; the topic itself is resolved through
+     * the connector when a destination is requested.
+     */
+    private class TopicEndpoint
+            extends JMSEndpoint {
+        String m_topicName;
+
+        TopicEndpoint(String topicName) {
+            super(TopicConnector.this);
+            m_topicName = topicName;
+        }
+
+        /**
+         * Resolves the topic for this endpoint's name using the given session.
+         */
+        Destination getDestination(Session session)
+                throws Exception {
+            TopicSession topicSession = (TopicSession) session;
+            return createTopic(topicSession, m_topicName);
+        }
+
+        protected Subscription createSubscription(MessageListener listener,
+                                                  HashMap properties) {
+            return new TopicSubscription(listener, this, properties);
+        }
+
+        /**
+         * Returns <code>TopicEndpoint:</code> followed by the topic name.
+         */
+        public String toString() {
+            return "TopicEndpoint:" + m_topicName;
+        }
+
+        /**
+         * Equal when the superclass considers the objects equal, the other
+         * object is a topic endpoint, and both carry the same topic name.
+         */
+        public boolean equals(Object object) {
+            return super.equals(object)
+                    && object instanceof TopicEndpoint
+                    && m_topicName.equals(((TopicEndpoint) object).m_topicName);
+        }
+    }
+
+    /**
+     * Subscription with the topic-specific settings: an optional durable
+     * subscription name, the unsubscribe flag and the no-local flag.
+     */
+    private final class TopicSubscription extends Subscription {
+        String m_subscriptionName;
+        boolean m_unsubscribe;
+        boolean m_noLocal;
+
+        /**
+         * Reads the subscription name, unsubscribe flag and no-local flag out
+         * of <code>properties</code> after the superclass has taken its own
+         * entries.
+         */
+        TopicSubscription(MessageListener listener,
+                          JMSEndpoint endpoint,
+                          HashMap properties) {
+            super(listener, endpoint, properties);
+            m_subscriptionName =
+                    MapUtils.removeStringProperty(properties, JMSConstants.SUBSCRIPTION_NAME, null);
+            m_unsubscribe =
+                    MapUtils.removeBooleanProperty(properties,
+                            JMSConstants.UNSUBSCRIBE,
+                            JMSConstants.DEFAULT_UNSUBSCRIBE);
+            m_noLocal =
+                    MapUtils.removeBooleanProperty(properties,
+                            JMSConstants.NO_LOCAL,
+                            JMSConstants.DEFAULT_NO_LOCAL);
+        }
+
+        /**
+         * A subscription is durable when it has a subscription name.
+         */
+        boolean isDurable() {
+            return m_subscriptionName != null;
+        }
+
+        /**
+         * Equal when the superclass comparison succeeds, both flags match, and
+         * either neither side is durable or both are durable under the same
+         * subscription name.
+         */
+        public boolean equals(Object obj) {
+            if (!super.equals(obj) || !(obj instanceof TopicSubscription)) {
+                return false;
+            }
+
+            TopicSubscription that = (TopicSubscription) obj;
+            if (that.m_unsubscribe != m_unsubscribe || that.m_noLocal != m_noLocal) {
+                return false;
+            }
+
+            if (!isDurable()) {
+                return !that.isDurable();
+            }
+            return that.isDurable() && that.m_subscriptionName.equals(m_subscriptionName);
+        }
+
+        /**
+         * Superclass string form followed by the no-local flag, the unsubscribe
+         * flag and, for durable subscriptions, the subscription name, each
+         * separated by a colon.
+         */
+        public String toString() {
+            StringBuffer text = new StringBuffer(super.toString());
+            text.append(":");
+            text.append(m_noLocal);
+            text.append(":");
+            text.append(m_unsubscribe);
+            if (isDurable()) {
+                text.append(":").append(m_subscriptionName);
+            }
+            return text.toString();
+        }
+
+    }
+
+ private final class TopicDestinationEndpoint
+ extends TopicEndpoint {
+ Topic m_topic;
+
+ TopicDestinationEndpoint(Topic topic)
+ throws JMSException {
+ super(topic.getTopicName());
+ m_topic = topic;
+ }
+
+ Destination getDestination(Session session) {
+ return m_topic;
+ }
+
+ }
+
+
+}
\ No newline at end of file
Re: svn commit: r354198 [3/3] - /webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/
Posted by Davanum Srinivas <da...@gmail.com>.
yep. am still trying to get something to work before i start cleaning up.
-- dims
On 12/6/05, Sanjiva Weerawarana <sa...@opensource.lk> wrote:
> Hi Dims,
>
> I know you said this is the initial cut .. but .. :):
>
> > + public static void processJMSRequest(
> > + MessageContext msgContext,
> > + InputStream in,
> > + OutputStream out,
> > + String contentType
> > + )
> > + throws AxisFault {
> > + boolean soap11 = false;
> > + try {
> > +
> > +// //remove the starting and trailing " from the SOAP Action
> > +// if (soapActionHeader != null
> > +// && soapActionHeader.startsWith("\"")
> > +// && soapActionHeader.endsWith("\"")) {
> > +//
> > +// soapActionHeader =
> > +// soapActionHeader.substring(
> > +// 1,
> > +// soapActionHeader.length() - 1);
> > +// }
> > +// //fill up the Message Contexts
> > +// msgContext.setWSAAction(soapActionHeader);
> > +// msgContext.setSoapAction(soapActionHeader);
> > +// msgContext.setTo(new EndpointReference(requestURI));
> > + msgContext.setProperty(MessageContext.TRANSPORT_OUT, out);
> > + msgContext.setServerSide(true);
> > +
> > + SOAPEnvelope envelope = null;
> > + StAXBuilder builder = null;
> ...
>
> Can this code not be shared with HTTP?
>
> Sanjiva.
>
>
>
--
Davanum Srinivas : http://wso2.com/blogs/
Re: svn commit: r354198 [3/3] - /webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/
Posted by Davanum Srinivas <da...@gmail.com>.
yep. am still trying to get something to work before i start cleaning up.
-- dims
On 12/6/05, Sanjiva Weerawarana <sa...@opensource.lk> wrote:
> Hi Dims,
>
> I know you said this is the initial cut .. but .. :):
>
> > + public static void processJMSRequest(
> > + MessageContext msgContext,
> > + InputStream in,
> > + OutputStream out,
> > + String contentType
> > + )
> > + throws AxisFault {
> > + boolean soap11 = false;
> > + try {
> > +
> > +// //remove the starting and trailing " from the SOAP Action
> > +// if (soapActionHeader != null
> > +// && soapActionHeader.startsWith("\"")
> > +// && soapActionHeader.endsWith("\"")) {
> > +//
> > +// soapActionHeader =
> > +// soapActionHeader.substring(
> > +// 1,
> > +// soapActionHeader.length() - 1);
> > +// }
> > +// //fill up the Message Contexts
> > +// msgContext.setWSAAction(soapActionHeader);
> > +// msgContext.setSoapAction(soapActionHeader);
> > +// msgContext.setTo(new EndpointReference(requestURI));
> > + msgContext.setProperty(MessageContext.TRANSPORT_OUT, out);
> > + msgContext.setServerSide(true);
> > +
> > + SOAPEnvelope envelope = null;
> > + StAXBuilder builder = null;
> ...
>
> Can this code not be shared with HTTP?
>
> Sanjiva.
>
>
>
--
Davanum Srinivas : http://wso2.com/blogs/
Re: svn commit: r354198 [3/3] -
/webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/
Posted by Sanjiva Weerawarana <sa...@opensource.lk>.
Hi Dims,
I know you said this is the initial cut .. but .. :):
> + public static void processJMSRequest(
> + MessageContext msgContext,
> + InputStream in,
> + OutputStream out,
> + String contentType
> + )
> + throws AxisFault {
> + boolean soap11 = false;
> + try {
> +
> +// //remove the starting and trailing " from the SOAP Action
> +// if (soapActionHeader != null
> +// && soapActionHeader.startsWith("\"")
> +// && soapActionHeader.endsWith("\"")) {
> +//
> +// soapActionHeader =
> +// soapActionHeader.substring(
> +// 1,
> +// soapActionHeader.length() - 1);
> +// }
> +// //fill up the Message Contexts
> +// msgContext.setWSAAction(soapActionHeader);
> +// msgContext.setSoapAction(soapActionHeader);
> +// msgContext.setTo(new EndpointReference(requestURI));
> + msgContext.setProperty(MessageContext.TRANSPORT_OUT, out);
> + msgContext.setServerSide(true);
> +
> + SOAPEnvelope envelope = null;
> + StAXBuilder builder = null;
...
Can this code not be shared with HTTP?
Sanjiva.
Re: svn commit: r354198 [3/3] -
/webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/
Posted by Sanjiva Weerawarana <sa...@opensource.lk>.
Hi Dims,
I know you said this is the initial cut .. but .. :):
> + public static void processJMSRequest(
> + MessageContext msgContext,
> + InputStream in,
> + OutputStream out,
> + String contentType
> + )
> + throws AxisFault {
> + boolean soap11 = false;
> + try {
> +
> +// //remove the starting and trailing " from the SOAP Action
> +// if (soapActionHeader != null
> +// && soapActionHeader.startsWith("\"")
> +// && soapActionHeader.endsWith("\"")) {
> +//
> +// soapActionHeader =
> +// soapActionHeader.substring(
> +// 1,
> +// soapActionHeader.length() - 1);
> +// }
> +// //fill up the Message Contexts
> +// msgContext.setWSAAction(soapActionHeader);
> +// msgContext.setSoapAction(soapActionHeader);
> +// msgContext.setTo(new EndpointReference(requestURI));
> + msgContext.setProperty(MessageContext.TRANSPORT_OUT, out);
> + msgContext.setServerSide(true);
> +
> + SOAPEnvelope envelope = null;
> + StAXBuilder builder = null;
...
Can this code not be shared with HTTP?
Sanjiva.