You are viewing a plain text version of this content. The canonical link for it is here.
Posted to axis-cvs@ws.apache.org by di...@apache.org on 2005/12/05 22:38:38 UTC

svn commit: r354198 [3/3] - /webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/

Added: webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/SimpleJMSWorker.java
URL: http://svn.apache.org/viewcvs/webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/SimpleJMSWorker.java?rev=354198&view=auto
==============================================================================
--- webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/SimpleJMSWorker.java (added)
+++ webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/SimpleJMSWorker.java Mon Dec  5 13:38:30 2005
@@ -0,0 +1,327 @@
+/*
+ * Copyright 2001, 2002,2004 The Apache Software Foundation.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.axis2.transport.jms;
+
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.Constants;
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.axis2.context.MessageContext;
+import org.apache.axis2.description.TransportOutDescription;
+import org.apache.axis2.engine.AxisEngine;
+import org.apache.axis2.i18n.Messages;
+import org.apache.axis2.om.OMException;
+import org.apache.axis2.om.impl.llom.builder.StAXBuilder;
+import org.apache.axis2.soap.SOAP11Constants;
+import org.apache.axis2.soap.SOAP12Constants;
+import org.apache.axis2.soap.SOAPEnvelope;
+import org.apache.axis2.soap.SOAPProcessingException;
+import org.apache.axis2.soap.impl.llom.builder.StAXSOAPModelBuilder;
+import org.apache.axis2.soap.impl.llom.soap12.SOAP12Factory;
+import org.apache.axis2.transport.http.HTTPConstants;
+import org.apache.axis2.transport.http.HTTPTransportUtils;
+import org.apache.axis2.transport.TransportUtils;
+import org.apache.axis2.util.UUIDGenerator;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import javax.jms.BytesMessage;
+import javax.jms.Destination;
+import javax.xml.namespace.QName;
+import javax.xml.parsers.FactoryConfigurationError;
+import javax.xml.stream.XMLInputFactory;
+import javax.xml.stream.XMLStreamException;
+import javax.xml.stream.XMLStreamReader;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.Reader;
+import java.io.UnsupportedEncodingException;
+
+/**
+ * SimpleJMSWorker is a worker thread that processes messages that are
+ * received by SimpleJMSListener. It creates a new message context, invokes
+ * the server, and sends back response msg to the replyTo destination.
+ */
+public class SimpleJMSWorker implements Runnable {
+    protected static Log log =
+            LogFactory.getLog(SimpleJMSWorker.class.getName());
+
+    SimpleJMSListener listener;
+    BytesMessage message;
+    private ConfigurationContext configurationContext;
+
+    public SimpleJMSWorker(ConfigurationContext configurationContext, SimpleJMSListener listener, BytesMessage message) {
+        this.listener = listener;
+        this.message = message;
+        this.configurationContext = configurationContext;
+    }
+
+    /**
+     * This is where the incoming message is processed.
+     */
+    public void run() {
+        InputStream in = null;
+        try {
+            // get the incoming msg content into a byte array
+            byte[] buffer = new byte[8 * 1024];
+            ByteArrayOutputStream out = new ByteArrayOutputStream();
+            for (int bytesRead = message.readBytes(buffer);
+                 bytesRead != -1; bytesRead = message.readBytes(buffer)) {
+                out.write(buffer, 0, bytesRead);
+            }
+            in = new ByteArrayInputStream(out.toByteArray());
+        }
+        catch (Exception e) {
+            log.error(Messages.getMessage("exception00"), e);
+            e.printStackTrace();
+            return;
+        }
+
+        // if the incoming message has a contentType set,
+        // pass it to my new Message
+        String contentType = null;
+        try {
+            contentType = message.getStringProperty("contentType");
+        }
+        catch (Exception e) {
+            log.error(Messages.getMessage("exception00"), e);
+            e.printStackTrace();
+            return;
+        }
+
+        MessageContext msgContext;
+        try {
+            TransportOutDescription transportOut =
+                    configurationContext.getAxisConfiguration().getTransportOut(
+                            new QName(Constants.TRANSPORT_HTTP));
+            msgContext = new MessageContext(
+                    configurationContext,
+                    configurationContext.getAxisConfiguration().getTransportIn(
+                            new QName(Constants.TRANSPORT_HTTP)),
+                    transportOut);
+            msgContext.setServerSide(true);
+        } catch (Exception e) {
+            log.error(Messages.getMessage("exception00"), e);
+            e.printStackTrace();
+            return;
+        }
+
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        msgContext.setProperty(MessageContext.TRANSPORT_OUT, baos);
+        msgContext.setServiceGroupContextId(UUIDGenerator.getUUID());
+
+        try {
+            processJMSRequest(
+                    msgContext,
+                    in,
+                    baos,
+                    contentType
+            );
+        } catch (Exception e) {
+            log.error(Messages.getMessage("exception00"), e);
+            e.printStackTrace();
+            return;
+        }
+
+//        Message msg = null;
+//        if (contentType != null && !contentType.trim().equals("")) {
+//            msg = new Message(in, true, contentType, null);
+//        } else {
+//            msg = new Message(in);
+//        }
+//
+//        MessageContext msgContext = new MessageContext(server);
+//        msgContext.setRequestMessage(msg);
+//        try {
+//            server.invoke(msgContext);
+//            msg = msgContext.getResponseMessage();
+//        }
+//        catch (AxisFault af) {
+//            msg = new Message(af);
+//            msg.setMessageContext(msgContext);
+//        }
+//        catch (Exception e) {
+//            msg = new Message(new AxisFault(e.toString()));
+//            msg.setMessageContext(msgContext);
+//        }
+
+        try {
+            // now we need to send the response
+            Destination destination = message.getJMSReplyTo();
+            if (destination == null)
+                return;
+            JMSEndpoint replyTo = listener.getConnector().createEndpoint(destination);
+            replyTo.send(baos.toByteArray());
+        }
+        catch (Exception e) {
+            e.printStackTrace();
+        }
+
+//        if (msgContext.getProperty(MessageContext.QUIT_REQUESTED) != null)
+//            // why then, quit!
+//            try {
+//                listener.shutdown();
+//            } catch (Exception e) {
+//            }
+    }
+
+    public static void processJMSRequest(
+            MessageContext msgContext,
+            InputStream in,
+            OutputStream out,
+            String contentType
+    )
+            throws AxisFault {
+        boolean soap11 = false;
+        try {
+
+//            //remove the starting and trailing " from the SOAP Action
+//            if (soapActionHeader != null
+//                    && soapActionHeader.startsWith("\"")
+//                    && soapActionHeader.endsWith("\"")) {
+//
+//                soapActionHeader =
+//                        soapActionHeader.substring(
+//                                1,
+//                                soapActionHeader.length() - 1);
+//            }
+//            //fill up the Message Contexts
+//            msgContext.setWSAAction(soapActionHeader);
+//            msgContext.setSoapAction(soapActionHeader);
+//            msgContext.setTo(new EndpointReference(requestURI));
+            msgContext.setProperty(MessageContext.TRANSPORT_OUT, out);
+            msgContext.setServerSide(true);
+
+            SOAPEnvelope envelope = null;
+            StAXBuilder builder = null;
+            if (contentType != null) {
+                if (contentType
+                        .indexOf(HTTPConstants.HEADER_ACCEPT_MULTIPART_RELATED)
+                        > -1) {
+                    //It is MTOM
+                    builder = HTTPTransportUtils.selectBuilderForMIME(msgContext, in, contentType);
+                    envelope = (SOAPEnvelope) builder.getDocumentElement();
+                } else {
+                    Reader reader = new InputStreamReader(in);
+
+                    XMLStreamReader xmlreader;
+                    //Figure out the char set encoding and create the reader
+
+                    //If charset is not specified
+                    if (TransportUtils.getCharSetEncoding(contentType) == null) {
+                        xmlreader =
+                                XMLInputFactory
+                                        .newInstance()
+                                        .createXMLStreamReader(
+                                                in,
+                                                MessageContext.DEFAULT_CHAR_SET_ENCODING);
+                        //Set the encoding scheme in the message context
+                        msgContext.setProperty(
+                                MessageContext.CHARACTER_SET_ENCODING,
+                                MessageContext.DEFAULT_CHAR_SET_ENCODING);
+                    } else {
+                        //get the type of char encoding
+                        String charSetEnc = TransportUtils.getCharSetEncoding(contentType);
+                        xmlreader =
+                                XMLInputFactory
+                                        .newInstance()
+                                        .createXMLStreamReader(
+                                                in,
+                                                charSetEnc);
+
+                        //Setting the value in msgCtx
+                        msgContext.setProperty(
+                                MessageContext.CHARACTER_SET_ENCODING,
+                                charSetEnc);
+
+                    }
+                    if (contentType
+                            .indexOf(SOAP12Constants.SOAP_12_CONTENT_TYPE)
+                            > -1) {
+                        soap11 = false;
+                        //it is SOAP 1.2
+                        builder =
+                                new StAXSOAPModelBuilder(
+                                        xmlreader,
+                                        SOAP12Constants.SOAP_ENVELOPE_NAMESPACE_URI);
+                        envelope = (SOAPEnvelope) builder.getDocumentElement();
+                    } else if (
+                            contentType.indexOf(
+                                    SOAP11Constants.SOAP_11_CONTENT_TYPE)
+                                    > -1) {
+                        soap11 = true;
+                        builder =
+                                new StAXSOAPModelBuilder(
+                                        xmlreader,
+                                        SOAP11Constants
+                                                .SOAP_ENVELOPE_NAMESPACE_URI);
+                        envelope =
+                                (SOAPEnvelope) builder.getDocumentElement();
+                    }
+
+                }
+
+            }
+
+            String charsetEncoding = builder.getDocument().getCharsetEncoding();
+            if (charsetEncoding != null && !"".equals(charsetEncoding) &&
+                    !((String) msgContext.getProperty(MessageContext.CHARACTER_SET_ENCODING))
+                            .equalsIgnoreCase(charsetEncoding)) {
+                String faultCode;
+                if (SOAP12Constants.SOAP_ENVELOPE_NAMESPACE_URI.equals(envelope.getNamespace().getName())) {
+                    faultCode = SOAP12Constants.FAULT_CODE_SENDER;
+                } else {
+                    faultCode = SOAP11Constants.FAULT_CODE_SENDER;
+                }
+                throw new AxisFault("Character Set Encoding from " +
+                        "transport information do not match with " +
+                        "character set encoding in the received SOAP message", faultCode);
+            }
+
+
+            msgContext.setEnvelope(envelope);
+            AxisEngine engine = new AxisEngine(msgContext.getConfigurationContext());
+            if (envelope.getBody().hasFault()) {
+                engine.receiveFault(msgContext);
+            } else {
+                engine.receive(msgContext);
+            }
+        } catch (SOAPProcessingException e) {
+            throw new AxisFault(e);
+
+        } catch (AxisFault e) {
+            //rethrow
+            throw e;
+        } catch (OMException e) {
+            throw new AxisFault(e);
+        } catch (XMLStreamException e) {
+            throw new AxisFault(e);
+        } catch (FactoryConfigurationError e) {
+            throw new AxisFault(e);
+        } catch (UnsupportedEncodingException e) {
+            throw new AxisFault(e);
+        } finally {
+            if (msgContext.getEnvelope() == null && !soap11) {
+                msgContext.setEnvelope(
+                        new SOAP12Factory().createSOAPEnvelope());
+            }
+
+        }
+    }
+}

Added: webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/Subscription.java
URL: http://svn.apache.org/viewcvs/webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/Subscription.java?rev=354198&view=auto
==============================================================================
--- webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/Subscription.java (added)
+++ webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/Subscription.java Mon Dec  5 13:38:30 2005
@@ -0,0 +1,71 @@
+/*
+ * Copyright 2001, 2002,2004 The Apache Software Foundation.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.axis2.transport.jms;
+
+import javax.jms.MessageListener;
+import java.util.HashMap;
+
+/*
+ * Subscription class holds information about a subscription
+ */
+
+public class Subscription {
+    MessageListener m_listener;
+    JMSEndpoint m_endpoint;
+    String m_messageSelector;
+    int m_ackMode;
+
+    Subscription(MessageListener listener,
+                 JMSEndpoint endpoint,
+                 HashMap properties) {
+        m_listener = listener;
+        m_endpoint = endpoint;
+        m_messageSelector = MapUtils.removeStringProperty(
+                properties,
+                JMSConstants.MESSAGE_SELECTOR,
+                null);
+        m_ackMode = MapUtils.removeIntProperty(properties,
+                JMSConstants.ACKNOWLEDGE_MODE,
+                JMSConstants.DEFAULT_ACKNOWLEDGE_MODE);
+    }
+
+    public int hashCode() {
+        return toString().hashCode();
+    }
+
+    public boolean equals(Object obj) {
+        if (obj == null || !(obj instanceof Subscription))
+            return false;
+        Subscription other = (Subscription) obj;
+        if (m_messageSelector == null) {
+            if (other.m_messageSelector != null)
+                return false;
+        } else {
+            if (other.m_messageSelector == null ||
+                    !other.m_messageSelector.equals(m_messageSelector))
+                return false;
+        }
+        return m_ackMode == other.m_ackMode &&
+                m_endpoint.equals(other.m_endpoint) &&
+                other.m_listener.equals(m_listener);
+    }
+
+    public String toString() {
+        return m_listener.toString();
+    }
+
+}
\ No newline at end of file

Added: webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/TopicConnector.java
URL: http://svn.apache.org/viewcvs/webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/TopicConnector.java?rev=354198&view=auto
==============================================================================
--- webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/TopicConnector.java (added)
+++ webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/TopicConnector.java Mon Dec  5 13:38:30 2005
@@ -0,0 +1,379 @@
+/*
+ * Copyright 2001, 2002,2004 The Apache Software Foundation.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.axis2.transport.jms;
+
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.Session;
+import javax.jms.TemporaryTopic;
+import javax.jms.Topic;
+import javax.jms.TopicConnection;
+import javax.jms.TopicConnectionFactory;
+import javax.jms.TopicPublisher;
+import javax.jms.TopicSession;
+import javax.jms.TopicSubscriber;
+import java.util.HashMap;
+
+/**
+ * TopicConnector is a concrete JMSConnector subclass that specifically handles
+ * connections to topics (pub-sub domain).
+ */
+public class TopicConnector extends JMSConnector {
+    public TopicConnector(TopicConnectionFactory factory,
+                          int numRetries,
+                          int numSessions,
+                          long connectRetryInterval,
+                          long interactRetryInterval,
+                          long timeoutTime,
+                          boolean allowReceive,
+                          String clientID,
+                          String username,
+                          String password,
+                          JMSVendorAdapter adapter,
+                          JMSURLHelper jmsurl)
+            throws JMSException {
+        super(factory, numRetries, numSessions, connectRetryInterval,
+                interactRetryInterval, timeoutTime, allowReceive,
+                clientID, username, password, adapter, jmsurl);
+    }
+
+    protected javax.jms.Connection internalConnect(ConnectionFactory connectionFactory,
+                                                   String username, String password)
+            throws JMSException {
+        TopicConnectionFactory tcf = (TopicConnectionFactory) connectionFactory;
+        if (username == null)
+            return tcf.createTopicConnection();
+
+        return tcf.createTopicConnection(username, password);
+    }
+
+
+    protected SyncConnection createSyncConnection(ConnectionFactory factory,
+                                                  javax.jms.Connection connection,
+                                                  int numSessions,
+                                                  String threadName,
+                                                  String clientID,
+                                                  String username,
+                                                  String password)
+            throws JMSException {
+        return new TopicSyncConnection((TopicConnectionFactory) factory,
+                (TopicConnection) connection, numSessions,
+                threadName, clientID, username, password);
+    }
+
+    protected AsyncConnection createAsyncConnection(ConnectionFactory factory,
+                                                    javax.jms.Connection connection,
+                                                    String threadName,
+                                                    String clientID,
+                                                    String username,
+                                                    String password)
+            throws JMSException {
+        return new TopicAsyncConnection((TopicConnectionFactory) factory,
+                (TopicConnection) connection, threadName,
+                clientID, username, password);
+    }
+
+    public JMSEndpoint createEndpoint(String destination) {
+        return new TopicEndpoint(destination);
+    }
+
+    /**
+     * Create an endpoint for a queue destination.
+     *
+     * @param destination
+     * @return
+     * @throws JMSException
+     */
+    public JMSEndpoint createEndpoint(Destination destination)
+            throws JMSException {
+        if (!(destination instanceof Topic))
+            throw new IllegalArgumentException("The input be a topic for this connector");
+        return new TopicDestinationEndpoint((Topic) destination);
+    }
+
+    private TopicSession createTopicSession(TopicConnection connection, int ackMode)
+            throws JMSException {
+        return connection.createTopicSession(false,
+                ackMode);
+    }
+
+    private Topic createTopic(TopicSession session, String subject)
+            throws Exception {
+        return m_adapter.getTopic(session, subject);
+    }
+
+    private TopicSubscriber createSubscriber(TopicSession session,
+                                             TopicSubscription subscription)
+            throws Exception {
+        if (subscription.isDurable())
+            return createDurableSubscriber(session,
+                    (Topic) subscription.m_endpoint.getDestination(session),
+                    subscription.m_subscriptionName,
+                    subscription.m_messageSelector,
+                    subscription.m_noLocal);
+        else
+            return createSubscriber(session,
+                    (Topic) subscription.m_endpoint.getDestination(session),
+                    subscription.m_messageSelector,
+                    subscription.m_noLocal);
+    }
+
+    private TopicSubscriber createDurableSubscriber(TopicSession session,
+                                                    Topic topic,
+                                                    String subscriptionName,
+                                                    String messageSelector,
+                                                    boolean noLocal)
+            throws JMSException {
+        return session.createDurableSubscriber(topic, subscriptionName,
+                messageSelector, noLocal);
+    }
+
+    private TopicSubscriber createSubscriber(TopicSession session,
+                                             Topic topic,
+                                             String messageSelector,
+                                             boolean noLocal)
+            throws JMSException {
+        return session.createSubscriber(topic, messageSelector, noLocal);
+    }
+
+
+    private final class TopicAsyncConnection extends AsyncConnection {
+
+        TopicAsyncConnection(TopicConnectionFactory connectionFactory,
+                             TopicConnection connection,
+                             String threadName,
+                             String clientID,
+                             String username,
+                             String password)
+
+                throws JMSException {
+            super(connectionFactory, connection, threadName,
+                    clientID, username, password);
+        }
+
+        protected ListenerSession createListenerSession(javax.jms.Connection connection,
+                                                        Subscription subscription)
+                throws Exception {
+            TopicSession session = createTopicSession((TopicConnection) connection,
+                    subscription.m_ackMode);
+            TopicSubscriber subscriber = createSubscriber(session,
+                    (TopicSubscription) subscription);
+            return new TopicListenerSession(session, subscriber,
+                    (TopicSubscription) subscription);
+        }
+
+        private final class TopicListenerSession extends ListenerSession {
+
+            TopicListenerSession(TopicSession session,
+                                 TopicSubscriber subscriber,
+                                 TopicSubscription subscription)
+                    throws Exception {
+                super(session, subscriber, subscription);
+            }
+
+            void cleanup() {
+                try {
+                    m_consumer.close();
+                } catch (Exception ignore) {
+                }
+                try {
+                    TopicSubscription sub = (TopicSubscription) m_subscription;
+                    if (sub.isDurable() && sub.m_unsubscribe) {
+                        ((TopicSession) m_session).unsubscribe(sub.m_subscriptionName);
+                    }
+                }
+                catch (Exception ignore) {
+                }
+                try {
+                    m_session.close();
+                } catch (Exception ignore) {
+                }
+
+            }
+        }
+    }
+
+    private final class TopicSyncConnection extends SyncConnection {
+        TopicSyncConnection(TopicConnectionFactory connectionFactory,
+                            TopicConnection connection,
+                            int numSessions,
+                            String threadName,
+                            String clientID,
+                            String username,
+                            String password)
+
+                throws JMSException {
+            super(connectionFactory, connection, numSessions, threadName,
+                    clientID, username, password);
+        }
+
+        protected SendSession createSendSession(javax.jms.Connection connection)
+                throws JMSException {
+            TopicSession session = createTopicSession((TopicConnection) connection,
+                    JMSConstants.DEFAULT_ACKNOWLEDGE_MODE);
+            TopicPublisher publisher = session.createPublisher(null);
+            return new TopicSendSession(session, publisher);
+        }
+
+        private final class TopicSendSession extends SendSession {
+            TopicSendSession(TopicSession session,
+                             TopicPublisher publisher)
+                    throws JMSException {
+                super(session, publisher);
+            }
+
+
+            protected MessageConsumer createConsumer(Destination destination)
+                    throws JMSException {
+                return createSubscriber((TopicSession) m_session, (Topic) destination,
+                        null, JMSConstants.DEFAULT_NO_LOCAL);
+            }
+
+            protected void deleteTemporaryDestination(Destination destination)
+                    throws JMSException {
+                ((TemporaryTopic) destination).delete();
+            }
+
+
+            protected Destination createTemporaryDestination()
+                    throws JMSException {
+                return ((TopicSession) m_session).createTemporaryTopic();
+            }
+
+            protected void send(Destination destination, Message message,
+                                int deliveryMode, int priority, long timeToLive)
+                    throws JMSException {
+                ((TopicPublisher) m_producer).publish((Topic) destination, message,
+                        deliveryMode, priority, timeToLive);
+            }
+
+        }
+    }
+
+
+    private class TopicEndpoint
+            extends JMSEndpoint {
+        String m_topicName;
+
+        TopicEndpoint(String topicName) {
+            super(TopicConnector.this);
+            m_topicName = topicName;
+        }
+
+        Destination getDestination(Session session)
+                throws Exception {
+            return createTopic((TopicSession) session, m_topicName);
+        }
+
+        protected Subscription createSubscription(MessageListener listener,
+                                                  HashMap properties) {
+            return new TopicSubscription(listener, this, properties);
+        }
+
+        public String toString() {
+            StringBuffer buffer = new StringBuffer("TopicEndpoint:");
+            buffer.append(m_topicName);
+            return buffer.toString();
+        }
+
+        public boolean equals(Object object) {
+            if (!super.equals(object))
+                return false;
+
+            if (!(object instanceof TopicEndpoint))
+                return false;
+
+            return m_topicName.equals(((TopicEndpoint) object).m_topicName);
+        }
+    }
+
+    /**
+     * Subscription variant for topics. On top of whatever the parent
+     * Subscription tracks, it carries an optional subscription name plus
+     * the "unsubscribe" and "noLocal" flags. A subscription is treated as
+     * durable exactly when a subscription name is present.
+     */
+    private final class TopicSubscription extends Subscription {
+        String m_subscriptionName;
+        boolean m_unsubscribe;
+        boolean m_noLocal;
+
+        /**
+         * Builds the subscription, pulling the topic-specific settings out
+         * of the supplied property map through the MapUtils helpers.
+         *
+         * @param listener   passed through to the parent constructor
+         * @param endpoint   passed through to the parent constructor
+         * @param properties source of SUBSCRIPTION_NAME (default null),
+         *                   UNSUBSCRIBE and NO_LOCAL (defaults taken from
+         *                   JMSConstants)
+         */
+        TopicSubscription(MessageListener listener, JMSEndpoint endpoint, HashMap properties) {
+            super(listener, endpoint, properties);
+            m_subscriptionName =
+                    MapUtils.removeStringProperty(properties, JMSConstants.SUBSCRIPTION_NAME, null);
+            m_unsubscribe =
+                    MapUtils.removeBooleanProperty(properties, JMSConstants.UNSUBSCRIBE,
+                            JMSConstants.DEFAULT_UNSUBSCRIBE);
+            m_noLocal =
+                    MapUtils.removeBooleanProperty(properties, JMSConstants.NO_LOCAL,
+                            JMSConstants.DEFAULT_NO_LOCAL);
+        }
+
+        /** @return true when a subscription name has been set */
+        boolean isDurable() {
+            return m_subscriptionName != null;
+        }
+
+        /**
+         * Two topic subscriptions are equal when the parent considers them
+         * equal, both flags agree, and either neither is durable or both are
+         * durable with the same subscription name.
+         */
+        public boolean equals(Object obj) {
+            // The parent check runs first; the type test only matters if it passes.
+            if (!(super.equals(obj) && obj instanceof TopicSubscription)) {
+                return false;
+            }
+
+            final TopicSubscription that = (TopicSubscription) obj;
+            final boolean sameFlags =
+                    that.m_unsubscribe == m_unsubscribe && that.m_noLocal == m_noLocal;
+            if (!sameFlags) {
+                return false;
+            }
+
+            if (!isDurable()) {
+                // Non-durable only matches non-durable.
+                return !that.isDurable();
+            }
+            return that.isDurable() && that.m_subscriptionName.equals(m_subscriptionName);
+        }
+
+        /**
+         * Appends ":noLocal:unsubscribe" to the parent's string form, and
+         * ":subscriptionName" as well when the subscription is durable.
+         */
+        public String toString() {
+            final StringBuffer text = new StringBuffer(super.toString());
+            text.append(":");
+            text.append(m_noLocal);
+            text.append(":");
+            text.append(m_unsubscribe);
+            if (isDurable()) {
+                text.append(":").append(m_subscriptionName);
+            }
+            return text.toString();
+        }
+    }
+
+    /**
+     * Topic endpoint that wraps a Topic instance supplied by the caller.
+     * The topic's own name is handed to the TopicEndpoint constructor.
+     */
+    private final class TopicDestinationEndpoint extends TopicEndpoint {
+        Topic m_topic;
+
+        /**
+         * @param topic the topic to wrap; its name is read via getTopicName()
+         * @throws JMSException if reading the topic name fails
+         */
+        TopicDestinationEndpoint(Topic topic) throws JMSException {
+            super(topic.getTopicName());
+            this.m_topic = topic;
+        }
+
+        /** Returns the wrapped topic; the session argument is not consulted. */
+        Destination getDestination(Session session) {
+            return this.m_topic;
+        }
+    }
+
+
+}
\ No newline at end of file



Re: svn commit: r354198 [3/3] - /webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/

Posted by Davanum Srinivas <da...@gmail.com>.
yep. am still trying to get something to work before i start cleaning up.

-- dims

On 12/6/05, Sanjiva Weerawarana <sa...@opensource.lk> wrote:
> Hi Dims,
>
> I know you said this is the initial cut .. but .. :):
>
> > +    public static void processJMSRequest(
> > +            MessageContext msgContext,
> > +            InputStream in,
> > +            OutputStream out,
> > +            String contentType
> > +    )
> > +            throws AxisFault {
> > +        boolean soap11 = false;
> > +        try {
> > +
> > +//            //remove the starting and trailing " from the SOAP Action
> > +//            if (soapActionHeader != null
> > +//                    && soapActionHeader.startsWith("\"")
> > +//                    && soapActionHeader.endsWith("\"")) {
> > +//
> > +//                soapActionHeader =
> > +//                        soapActionHeader.substring(
> > +//                                1,
> > +//                                soapActionHeader.length() - 1);
> > +//            }
> > +//            //fill up the Message Contexts
> > +//            msgContext.setWSAAction(soapActionHeader);
> > +//            msgContext.setSoapAction(soapActionHeader);
> > +//            msgContext.setTo(new EndpointReference(requestURI));
> > +            msgContext.setProperty(MessageContext.TRANSPORT_OUT, out);
> > +            msgContext.setServerSide(true);
> > +
> > +            SOAPEnvelope envelope = null;
> > +            StAXBuilder builder = null;
> ...
>
> Can this code not be shared with HTTP?
>
> Sanjiva.
>
>
>


--
Davanum Srinivas : http://wso2.com/blogs/

Re: svn commit: r354198 [3/3] - /webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/

Posted by Davanum Srinivas <da...@gmail.com>.
yep. am still trying to get something to work before i start cleaning up.

-- dims

On 12/6/05, Sanjiva Weerawarana <sa...@opensource.lk> wrote:
> Hi Dims,
>
> I know you said this is the initial cut .. but .. :):
>
> > +    public static void processJMSRequest(
> > +            MessageContext msgContext,
> > +            InputStream in,
> > +            OutputStream out,
> > +            String contentType
> > +    )
> > +            throws AxisFault {
> > +        boolean soap11 = false;
> > +        try {
> > +
> > +//            //remove the starting and trailing " from the SOAP Action
> > +//            if (soapActionHeader != null
> > +//                    && soapActionHeader.startsWith("\"")
> > +//                    && soapActionHeader.endsWith("\"")) {
> > +//
> > +//                soapActionHeader =
> > +//                        soapActionHeader.substring(
> > +//                                1,
> > +//                                soapActionHeader.length() - 1);
> > +//            }
> > +//            //fill up the Message Contexts
> > +//            msgContext.setWSAAction(soapActionHeader);
> > +//            msgContext.setSoapAction(soapActionHeader);
> > +//            msgContext.setTo(new EndpointReference(requestURI));
> > +            msgContext.setProperty(MessageContext.TRANSPORT_OUT, out);
> > +            msgContext.setServerSide(true);
> > +
> > +            SOAPEnvelope envelope = null;
> > +            StAXBuilder builder = null;
> ...
>
> Can this code not be shared with HTTP?
>
> Sanjiva.
>
>
>


--
Davanum Srinivas : http://wso2.com/blogs/

Re: svn commit: r354198 [3/3] - /webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/

Posted by Sanjiva Weerawarana <sa...@opensource.lk>.
Hi Dims,

I know you said this is the initial cut .. but .. :):

> +    public static void processJMSRequest(
> +            MessageContext msgContext,
> +            InputStream in,
> +            OutputStream out,
> +            String contentType
> +    )
> +            throws AxisFault {
> +        boolean soap11 = false;
> +        try {
> +
> +//            //remove the starting and trailing " from the SOAP Action
> +//            if (soapActionHeader != null
> +//                    && soapActionHeader.startsWith("\"")
> +//                    && soapActionHeader.endsWith("\"")) {
> +//
> +//                soapActionHeader =
> +//                        soapActionHeader.substring(
> +//                                1,
> +//                                soapActionHeader.length() - 1);
> +//            }
> +//            //fill up the Message Contexts
> +//            msgContext.setWSAAction(soapActionHeader);
> +//            msgContext.setSoapAction(soapActionHeader);
> +//            msgContext.setTo(new EndpointReference(requestURI));
> +            msgContext.setProperty(MessageContext.TRANSPORT_OUT, out);
> +            msgContext.setServerSide(true);
> +
> +            SOAPEnvelope envelope = null;
> +            StAXBuilder builder = null;
...

Can this code not be shared with HTTP?

Sanjiva.



Re: svn commit: r354198 [3/3] - /webservices/axis2/trunk/java/modules/core/src/org/apache/axis2/transport/jms/

Posted by Sanjiva Weerawarana <sa...@opensource.lk>.
Hi Dims,

I know you said this is the initial cut .. but .. :):

> +    public static void processJMSRequest(
> +            MessageContext msgContext,
> +            InputStream in,
> +            OutputStream out,
> +            String contentType
> +    )
> +            throws AxisFault {
> +        boolean soap11 = false;
> +        try {
> +
> +//            //remove the starting and trailing " from the SOAP Action
> +//            if (soapActionHeader != null
> +//                    && soapActionHeader.startsWith("\"")
> +//                    && soapActionHeader.endsWith("\"")) {
> +//
> +//                soapActionHeader =
> +//                        soapActionHeader.substring(
> +//                                1,
> +//                                soapActionHeader.length() - 1);
> +//            }
> +//            //fill up the Message Contexts
> +//            msgContext.setWSAAction(soapActionHeader);
> +//            msgContext.setSoapAction(soapActionHeader);
> +//            msgContext.setTo(new EndpointReference(requestURI));
> +            msgContext.setProperty(MessageContext.TRANSPORT_OUT, out);
> +            msgContext.setServerSide(true);
> +
> +            SOAPEnvelope envelope = null;
> +            StAXBuilder builder = null;
...

Can this code not be shared with HTTP?

Sanjiva.