You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by dk...@apache.org on 2013/08/26 17:52:05 UTC

svn commit: r1517577 - in /cxf/branches/2.7.x-fixes: rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/ rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/ testutils/src/main/resources/wsdl/

Author: dkulp
Date: Mon Aug 26 15:52:04 2013
New Revision: 1517577

URL: http://svn.apache.org/r1517577
Log:
Merged revision 1517549 via git cherry-pick from
https://svn.apache.org/repos/asf/cxf/trunk

........
  r1517549 | dkulp | 2013-08-26 10:44:47 -0400 (Mon, 26 Aug 2013) | 2 lines

  [CXF-5233] Allow responses to TOPICs. When multiple services are listening on a TOPIC, the client may be sent multiple responses (it will really just get the first one delivered to it), but this should be an allowable use case, since you could have a single service on the TOPIC.

........

Added:
    cxf/branches/2.7.x-fixes/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/RequestResponseTest.java
Modified:
    cxf/branches/2.7.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java
    cxf/branches/2.7.x-fixes/testutils/src/main/resources/wsdl/jms_spec_testsuite.wsdl

Modified: cxf/branches/2.7.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java
URL: http://svn.apache.org/viewvc/cxf/branches/2.7.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java?rev=1517577&r1=1517576&r2=1517577&view=diff
==============================================================================
--- cxf/branches/2.7.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java (original)
+++ cxf/branches/2.7.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java Mon Aug 26 15:52:04 2013
@@ -281,15 +281,7 @@ public class JMSDestination extends Abst
         }
         Message inMessage = exchange.getInMessage();
         final Message outMessage = exchange.getOutMessage();
-        if (jmsConfig.isPubSubDomain()) {
-            // we will never receive a non-oneway invocation in pub-sub
-            // domain from CXF client - however a mis-behaving pure JMS
-            // client could conceivably make suce an invocation, in which
-            // case we silently discard the reply
-            getLogger().log(Level.WARNING, "discarding reply for non-oneway invocation ",
-                            "with 'topic' destinationStyle");
-            return;
-        }
+
         try {
             final JMSMessageHeadersType messageProperties = (JMSMessageHeadersType)outMessage
                 .get(JMSConstants.JMS_SERVER_RESPONSE_HEADERS);

Added: cxf/branches/2.7.x-fixes/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/RequestResponseTest.java
URL: http://svn.apache.org/viewvc/cxf/branches/2.7.x-fixes/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/RequestResponseTest.java?rev=1517577&view=auto
==============================================================================
--- cxf/branches/2.7.x-fixes/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/RequestResponseTest.java (added)
+++ cxf/branches/2.7.x-fixes/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/RequestResponseTest.java Mon Aug 26 15:52:04 2013
@@ -0,0 +1,246 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.transport.jms;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.Reader;
+import java.io.StringReader;
+
+import javax.jms.DeliveryMode;
+
+import org.apache.cxf.helpers.IOUtils;
+import org.apache.cxf.message.Exchange;
+import org.apache.cxf.message.ExchangeImpl;
+import org.apache.cxf.message.Message;
+import org.apache.cxf.message.MessageImpl;
+import org.apache.cxf.transport.Conduit;
+import org.apache.cxf.transport.MessageObserver;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class RequestResponseTest extends AbstractJMSTester {
+    private static final int MAX_RECEIVE_TIME = 10;
+
+    public RequestResponseTest() {
+    }
+    
+    @BeforeClass
+    public static void createAndStartBroker() throws Exception {
+        startBroker(new JMSBrokerSetup("tcp://localhost:" + JMS_PORT));
+    }
+
+    private void waitForReceiveInMessage() {
+        int waitTime = 0;
+        while (inMessage == null && waitTime < MAX_RECEIVE_TIME) {
+            try {
+                Thread.sleep(1000);
+            } catch (InterruptedException e) {
+                // do nothing here
+            }
+            waitTime++;
+        }
+        assertTrue("Can't receive the Conduit Message in " + MAX_RECEIVE_TIME + " seconds",
+                   inMessage != null);
+    }
+
+    private JMSDestination setupJMSDestination(boolean send) throws IOException {
+
+        adjustEndpointInfoURL();
+        JMSConfiguration jmsConfig = new JMSOldConfigHolder()
+            .createJMSConfigurationFromEndpointInfo(bus, endpointInfo, null, false);
+        
+        JMSDestination jmsDestination = new JMSDestination(bus, endpointInfo, jmsConfig);
+
+        if (send) {
+            // setMessageObserver
+            observer = new MessageObserver() {
+                public void onMessage(Message m) {
+                    Exchange exchange = new ExchangeImpl();
+                    exchange.setInMessage(m);
+                    m.setExchange(exchange);
+                }
+            };
+            jmsDestination.setMessageObserver(observer);
+        }
+        return jmsDestination;
+    }
+    
+    private void setupMessageHeader(Message outMessage, String correlationId, String replyTo) {
+        JMSMessageHeadersType header = new JMSMessageHeadersType();
+        header.setJMSCorrelationID(correlationId);
+        header.setJMSDeliveryMode(DeliveryMode.PERSISTENT);
+        header.setJMSPriority(1);
+        header.setTimeToLive(1000);
+        header.setJMSReplyTo(replyTo != null ? replyTo : null);
+        outMessage.put(JMSConstants.JMS_CLIENT_REQUEST_HEADERS, header);
+        outMessage.put(Message.ENCODING, "US-ASCII");
+    }
+
+    private void setupMessageHeader(Message outMessage, String correlationId) {
+        setupMessageHeader(outMessage, correlationId, null);
+    }
+
+    private void verifyReceivedMessage(Message message) {
+        ByteArrayInputStream bis = (ByteArrayInputStream)message.getContent(InputStream.class);
+        String response = "<not found>";
+        if (bis != null) {
+            byte bytes[] = new byte[bis.available()];
+            try {
+                bis.read(bytes);
+            } catch (IOException ex) {
+                assertFalse("Read the Destination recieved Message error ", false);
+                ex.printStackTrace();
+            }
+            response = IOUtils.newStringFromBytes(bytes);
+        } else {
+            StringReader reader = (StringReader)message.getContent(Reader.class);
+            char buffer[] = new char[5000];
+            try {
+                int i = reader.read(buffer);
+                response = new String(buffer, 0 , i);
+            } catch (IOException e) {
+                assertFalse("Read the Destination recieved Message error ", false);
+                e.printStackTrace();
+            }
+        }
+        assertEquals("The response content should be equal", AbstractJMSTester.MESSAGE_CONTENT, response);
+    }
+
+    private void verifyHeaders(Message msgIn, Message msgOut) {
+        JMSMessageHeadersType outHeader = (JMSMessageHeadersType)msgOut
+            .get(JMSConstants.JMS_CLIENT_REQUEST_HEADERS);
+
+        JMSMessageHeadersType inHeader = (JMSMessageHeadersType)msgIn
+            .get(JMSConstants.JMS_SERVER_REQUEST_HEADERS);
+
+        verifyJmsHeaderEquality(outHeader, inHeader);
+
+    }
+
+    private void verifyJmsHeaderEquality(JMSMessageHeadersType outHeader, JMSMessageHeadersType inHeader) {
+        /*
+         * if (outHeader.getJMSCorrelationID() != null) { // only check if the correlation id was explicitly
+         * set as // otherwise the in header will contain an automatically // generated correlation id
+         * assertEquals("The inMessage and outMessage JMS Header's CorrelationID should be equals", outHeader
+         * .getJMSCorrelationID(), inHeader.getJMSCorrelationID()); }
+         */
+        assertEquals("The inMessage and outMessage JMS Header's JMSPriority should be equals", outHeader
+            .getJMSPriority(), inHeader.getJMSPriority());
+        assertEquals("The inMessage and outMessage JMS Header's JMSDeliveryMode should be equals", outHeader
+                     .getJMSDeliveryMode(), inHeader.getJMSDeliveryMode());
+        assertEquals("The inMessage and outMessage JMS Header's JMSType should be equals", outHeader
+            .getJMSType(), inHeader.getJMSType());
+    }
+
+    
+    @Test
+    public void testRequestQueueResponseDynamicQueue() throws Exception {
+        setupServiceInfo("http://cxf.apache.org/jms_simple", "/wsdl/jms_spec_testsuite.wsdl",
+                         "JMSSimpleService002X", "SimplePortQueueRequest");
+        sendAndReceiveMessages();
+    }
+    
+    @Test
+    public void testRequestQueueResponseStaticQueue() throws Exception {
+        setupServiceInfo("http://cxf.apache.org/jms_simple", "/wsdl/jms_spec_testsuite.wsdl",
+                         "JMSSimpleService002X", "SimplePortQueueRequestQueueResponse");
+        sendAndReceiveMessages();
+    }
+    
+    @Test
+    public void testRequestQueueResponseTopic() throws Exception {
+        setupServiceInfo("http://cxf.apache.org/jms_simple", "/wsdl/jms_spec_testsuite.wsdl",
+                         "JMSSimpleService002X", "SimplePortQueueRequestTopicResponse");
+        sendAndReceiveMessages();
+    }
+    
+    @Test
+    public void testRequestTopicResponseDynamicQueue() throws Exception {
+        setupServiceInfo("http://cxf.apache.org/jms_simple", "/wsdl/jms_spec_testsuite.wsdl",
+                         "JMSSimpleService002X", "SimplePortTopicRequest");
+        sendAndReceiveMessages();
+    }
+    
+    @Test
+    public void testRequestTopicResponseStaticQueue() throws Exception {
+        setupServiceInfo("http://cxf.apache.org/jms_simple", "/wsdl/jms_spec_testsuite.wsdl",
+                         "JMSSimpleService002X", "SimplePortTopicRequestQueueResponse");
+        sendAndReceiveMessages();
+    }
+    
+    @Test
+    public void testRequestTopicResponseTopic() throws Exception {
+        setupServiceInfo("http://cxf.apache.org/jms_simple", "/wsdl/jms_spec_testsuite.wsdl",
+                         "JMSSimpleService002X", "SimplePortTopicRequestTopicResponse");
+        sendAndReceiveMessages();
+    }
+
+    protected void sendAndReceiveMessages() throws IOException {
+        // set up the conduit send to be true
+        JMSConduit conduit = setupJMSConduit(true, false);
+        final Message outMessage = new MessageImpl();
+        setupMessageHeader(outMessage, null);
+        final JMSDestination destination = setupJMSDestination(false);
+        
+        // set up MessageObserver for handling the conduit message
+        MessageObserver observer = new MessageObserver() {
+            public void onMessage(Message m) {
+                Exchange exchange = new ExchangeImpl();
+                exchange.setInMessage(m);
+                m.setExchange(exchange);
+                verifyReceivedMessage(m);
+                verifyHeaders(m, outMessage);
+                // setup the message for
+                Conduit backConduit;
+                try {
+                    backConduit = destination.getBackChannel(m, null, null);
+                    // wait for the message to be got from the conduit
+                    Message replyMessage = new MessageImpl();
+                    sendoutMessage(backConduit, replyMessage, true);
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+            }
+        };
+        destination.setMessageObserver(observer);
+        // set is oneway false for get response from destination
+        sendoutMessage(conduit, outMessage, false);
+        // wait for the message to be got from the destination,
+        // create the thread to handler the Destination incoming message
+
+        waitForReceiveInMessage();
+        verifyReceivedMessage(inMessage);
+        // wait for a while for the jms session recycling
+
+        inMessage = null;
+        // Send a second message to check for an issue
+        // Where the session was closed the second time
+        sendoutMessage(conduit, outMessage, false);
+        waitForReceiveInMessage();
+        verifyReceivedMessage(inMessage);
+
+        conduit.close();
+        destination.shutdown();
+    }
+
+
+}

Modified: cxf/branches/2.7.x-fixes/testutils/src/main/resources/wsdl/jms_spec_testsuite.wsdl
URL: http://svn.apache.org/viewvc/cxf/branches/2.7.x-fixes/testutils/src/main/resources/wsdl/jms_spec_testsuite.wsdl?rev=1517577&r1=1517576&r2=1517577&view=diff
==============================================================================
--- cxf/branches/2.7.x-fixes/testutils/src/main/resources/wsdl/jms_spec_testsuite.wsdl (original)
+++ cxf/branches/2.7.x-fixes/testutils/src/main/resources/wsdl/jms_spec_testsuite.wsdl Mon Aug 26 15:52:04 2013
@@ -232,6 +232,30 @@
             <soap:address location="jms:jndi:dynamicQueues/testqueue0001?jndiInitialContextFactory=org.apache.activemq.jndi.ActiveMQInitialContextFactory&amp;jndiConnectionFactoryName=ConnectionFactory&amp;jndiURL=tcp://localhost:61500"/>
         </wsdl:port>
     </wsdl:service>
+	
+	<wsdl:service name="JMSSimpleService002X">
+	    <soapjms:jndiConnectionFactoryName>ConnectionFactory</soapjms:jndiConnectionFactoryName>
+        <soapjms:jndiInitialContextFactory>org.apache.activemq.jndi.ActiveMQInitialContextFactory</soapjms:jndiInitialContextFactory>
+        <soapjms:jndiURL>tcp://localhost:9001</soapjms:jndiURL>
+        <wsdl:port binding="tns:JMSSimplePortBinding" name="SimplePortQueueRequest">
+            <soap:address location="jms:queue:my.test.queue.request21"/>
+        </wsdl:port>
+		<wsdl:port binding="tns:JMSSimplePortBinding" name="SimplePortQueueRequestQueueResponse">
+            <soap:address location="jms:queue:my.test.queue.request22?replyToName=my.test.queue.response22"/>
+        </wsdl:port>
+		<wsdl:port binding="tns:JMSSimplePortBinding" name="SimplePortQueueRequestTopicResponse">
+            <soap:address location="jms:queue:my.test.queue.request23?topicReplyToName=my.test.topic.response23"/>
+        </wsdl:port>
+		<wsdl:port binding="tns:JMSSimplePortBinding" name="SimplePortTopicRequest">
+            <soap:address location="jms:topic:my.test.topic.request24"/>
+        </wsdl:port>
+		<wsdl:port binding="tns:JMSSimplePortBinding" name="SimplePortTopicRequestQueueResponse">
+            <soap:address location="jms:topic:my.test.topic.request25?replyToName=my.test.queue.response25"/>
+        </wsdl:port>
+        <wsdl:port binding="tns:JMSSimplePortBinding" name="SimplePortTopicRequestTopicResponse">
+            <soap:address location="jms:topic:my.test.topic.request26?topicReplyToName=my.test.topic.response26"/>
+        </wsdl:port>
+    </wsdl:service>
     
     <wsdl:service name="JMSSimpleService0101">
         <wsdl:port binding="tns:JMSSimpleSOAP12PortBinding" name="SimplePort">