You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by dk...@apache.org on 2013/08/26 17:52:05 UTC
svn commit: r1517577 - in /cxf/branches/2.7.x-fixes:
rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/
rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/
testutils/src/main/resources/wsdl/
Author: dkulp
Date: Mon Aug 26 15:52:04 2013
New Revision: 1517577
URL: http://svn.apache.org/r1517577
Log:
Merged revisions 1517549 via git cherry-pick from
https://svn.apache.org/repos/asf/cxf/trunk
........
r1517549 | dkulp | 2013-08-26 10:44:47 -0400 (Mon, 26 Aug 2013) | 2 lines
[CXF-5233] Allow responses to TOPICs. When multiple services are listening on a TOPIC, the client may be sent multiple responses (in practice it will just get the first one delivered to it), but this should be an allowable use case, since you could have only a single service on the TOPIC
........
Added:
cxf/branches/2.7.x-fixes/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/RequestResponseTest.java
Modified:
cxf/branches/2.7.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java
cxf/branches/2.7.x-fixes/testutils/src/main/resources/wsdl/jms_spec_testsuite.wsdl
Modified: cxf/branches/2.7.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java
URL: http://svn.apache.org/viewvc/cxf/branches/2.7.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java?rev=1517577&r1=1517576&r2=1517577&view=diff
==============================================================================
--- cxf/branches/2.7.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java (original)
+++ cxf/branches/2.7.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java Mon Aug 26 15:52:04 2013
@@ -281,15 +281,7 @@ public class JMSDestination extends Abst
}
Message inMessage = exchange.getInMessage();
final Message outMessage = exchange.getOutMessage();
- if (jmsConfig.isPubSubDomain()) {
- // we will never receive a non-oneway invocation in pub-sub
- // domain from CXF client - however a mis-behaving pure JMS
- // client could conceivably make suce an invocation, in which
- // case we silently discard the reply
- getLogger().log(Level.WARNING, "discarding reply for non-oneway invocation ",
- "with 'topic' destinationStyle");
- return;
- }
+
try {
final JMSMessageHeadersType messageProperties = (JMSMessageHeadersType)outMessage
.get(JMSConstants.JMS_SERVER_RESPONSE_HEADERS);
Added: cxf/branches/2.7.x-fixes/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/RequestResponseTest.java
URL: http://svn.apache.org/viewvc/cxf/branches/2.7.x-fixes/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/RequestResponseTest.java?rev=1517577&view=auto
==============================================================================
--- cxf/branches/2.7.x-fixes/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/RequestResponseTest.java (added)
+++ cxf/branches/2.7.x-fixes/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/RequestResponseTest.java Mon Aug 26 15:52:04 2013
@@ -0,0 +1,246 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.transport.jms;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.Reader;
+import java.io.StringReader;
+
+import javax.jms.DeliveryMode;
+
+import org.apache.cxf.helpers.IOUtils;
+import org.apache.cxf.message.Exchange;
+import org.apache.cxf.message.ExchangeImpl;
+import org.apache.cxf.message.Message;
+import org.apache.cxf.message.MessageImpl;
+import org.apache.cxf.transport.Conduit;
+import org.apache.cxf.transport.MessageObserver;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class RequestResponseTest extends AbstractJMSTester {
+ private static final int MAX_RECEIVE_TIME = 10;
+
+ public RequestResponseTest() {
+ }
+
+ @BeforeClass
+ public static void createAndStartBroker() throws Exception {
+ startBroker(new JMSBrokerSetup("tcp://localhost:" + JMS_PORT));
+ }
+
+ private void waitForReceiveInMessage() {
+ int waitTime = 0;
+ while (inMessage == null && waitTime < MAX_RECEIVE_TIME) {
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ // do nothing here
+ }
+ waitTime++;
+ }
+ assertTrue("Can't receive the Conduit Message in " + MAX_RECEIVE_TIME + " seconds",
+ inMessage != null);
+ }
+
+ private JMSDestination setupJMSDestination(boolean send) throws IOException {
+
+ adjustEndpointInfoURL();
+ JMSConfiguration jmsConfig = new JMSOldConfigHolder()
+ .createJMSConfigurationFromEndpointInfo(bus, endpointInfo, null, false);
+
+ JMSDestination jmsDestination = new JMSDestination(bus, endpointInfo, jmsConfig);
+
+ if (send) {
+ // setMessageObserver
+ observer = new MessageObserver() {
+ public void onMessage(Message m) {
+ Exchange exchange = new ExchangeImpl();
+ exchange.setInMessage(m);
+ m.setExchange(exchange);
+ }
+ };
+ jmsDestination.setMessageObserver(observer);
+ }
+ return jmsDestination;
+ }
+
+ private void setupMessageHeader(Message outMessage, String correlationId, String replyTo) {
+ JMSMessageHeadersType header = new JMSMessageHeadersType();
+ header.setJMSCorrelationID(correlationId);
+ header.setJMSDeliveryMode(DeliveryMode.PERSISTENT);
+ header.setJMSPriority(1);
+ header.setTimeToLive(1000);
+ header.setJMSReplyTo(replyTo != null ? replyTo : null);
+ outMessage.put(JMSConstants.JMS_CLIENT_REQUEST_HEADERS, header);
+ outMessage.put(Message.ENCODING, "US-ASCII");
+ }
+
+ private void setupMessageHeader(Message outMessage, String correlationId) {
+ setupMessageHeader(outMessage, correlationId, null);
+ }
+
+ private void verifyReceivedMessage(Message message) {
+ ByteArrayInputStream bis = (ByteArrayInputStream)message.getContent(InputStream.class);
+ String response = "<not found>";
+ if (bis != null) {
+ byte bytes[] = new byte[bis.available()];
+ try {
+ bis.read(bytes);
+ } catch (IOException ex) {
+ assertFalse("Read the Destination recieved Message error ", false);
+ ex.printStackTrace();
+ }
+ response = IOUtils.newStringFromBytes(bytes);
+ } else {
+ StringReader reader = (StringReader)message.getContent(Reader.class);
+ char buffer[] = new char[5000];
+ try {
+ int i = reader.read(buffer);
+ response = new String(buffer, 0 , i);
+ } catch (IOException e) {
+ assertFalse("Read the Destination recieved Message error ", false);
+ e.printStackTrace();
+ }
+ }
+ assertEquals("The response content should be equal", AbstractJMSTester.MESSAGE_CONTENT, response);
+ }
+
+ private void verifyHeaders(Message msgIn, Message msgOut) {
+ JMSMessageHeadersType outHeader = (JMSMessageHeadersType)msgOut
+ .get(JMSConstants.JMS_CLIENT_REQUEST_HEADERS);
+
+ JMSMessageHeadersType inHeader = (JMSMessageHeadersType)msgIn
+ .get(JMSConstants.JMS_SERVER_REQUEST_HEADERS);
+
+ verifyJmsHeaderEquality(outHeader, inHeader);
+
+ }
+
+ private void verifyJmsHeaderEquality(JMSMessageHeadersType outHeader, JMSMessageHeadersType inHeader) {
+ /*
+ * if (outHeader.getJMSCorrelationID() != null) { // only check if the correlation id was explicitly
+ * set as // otherwise the in header will contain an automatically // generated correlation id
+ * assertEquals("The inMessage and outMessage JMS Header's CorrelationID should be equals", outHeader
+ * .getJMSCorrelationID(), inHeader.getJMSCorrelationID()); }
+ */
+ assertEquals("The inMessage and outMessage JMS Header's JMSPriority should be equals", outHeader
+ .getJMSPriority(), inHeader.getJMSPriority());
+ assertEquals("The inMessage and outMessage JMS Header's JMSDeliveryMode should be equals", outHeader
+ .getJMSDeliveryMode(), inHeader.getJMSDeliveryMode());
+ assertEquals("The inMessage and outMessage JMS Header's JMSType should be equals", outHeader
+ .getJMSType(), inHeader.getJMSType());
+ }
+
+
+ @Test
+ public void testRequestQueueResponseDynamicQueue() throws Exception {
+ setupServiceInfo("http://cxf.apache.org/jms_simple", "/wsdl/jms_spec_testsuite.wsdl",
+ "JMSSimpleService002X", "SimplePortQueueRequest");
+ sendAndReceiveMessages();
+ }
+
+ @Test
+ public void testRequestQueueResponseStaticQueue() throws Exception {
+ setupServiceInfo("http://cxf.apache.org/jms_simple", "/wsdl/jms_spec_testsuite.wsdl",
+ "JMSSimpleService002X", "SimplePortQueueRequestQueueResponse");
+ sendAndReceiveMessages();
+ }
+
+ @Test
+ public void testRequestQueueResponseTopic() throws Exception {
+ setupServiceInfo("http://cxf.apache.org/jms_simple", "/wsdl/jms_spec_testsuite.wsdl",
+ "JMSSimpleService002X", "SimplePortQueueRequestTopicResponse");
+ sendAndReceiveMessages();
+ }
+
+ @Test
+ public void testRequestTopicResponseDynamicQueue() throws Exception {
+ setupServiceInfo("http://cxf.apache.org/jms_simple", "/wsdl/jms_spec_testsuite.wsdl",
+ "JMSSimpleService002X", "SimplePortTopicRequest");
+ sendAndReceiveMessages();
+ }
+
+ @Test
+ public void testRequestTopicResponseStaticQueue() throws Exception {
+ setupServiceInfo("http://cxf.apache.org/jms_simple", "/wsdl/jms_spec_testsuite.wsdl",
+ "JMSSimpleService002X", "SimplePortTopicRequestQueueResponse");
+ sendAndReceiveMessages();
+ }
+
+ @Test
+ public void testRequestTopicResponseTopic() throws Exception {
+ setupServiceInfo("http://cxf.apache.org/jms_simple", "/wsdl/jms_spec_testsuite.wsdl",
+ "JMSSimpleService002X", "SimplePortTopicRequestTopicResponse");
+ sendAndReceiveMessages();
+ }
+
+ protected void sendAndReceiveMessages() throws IOException {
+ // set up the conduit send to be true
+ JMSConduit conduit = setupJMSConduit(true, false);
+ final Message outMessage = new MessageImpl();
+ setupMessageHeader(outMessage, null);
+ final JMSDestination destination = setupJMSDestination(false);
+
+ // set up MessageObserver for handling the conduit message
+ MessageObserver observer = new MessageObserver() {
+ public void onMessage(Message m) {
+ Exchange exchange = new ExchangeImpl();
+ exchange.setInMessage(m);
+ m.setExchange(exchange);
+ verifyReceivedMessage(m);
+ verifyHeaders(m, outMessage);
+ // setup the message for
+ Conduit backConduit;
+ try {
+ backConduit = destination.getBackChannel(m, null, null);
+ // wait for the message to be got from the conduit
+ Message replyMessage = new MessageImpl();
+ sendoutMessage(backConduit, replyMessage, true);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ };
+ destination.setMessageObserver(observer);
+ // set is oneway false for get response from destination
+ sendoutMessage(conduit, outMessage, false);
+ // wait for the message to be got from the destination,
+ // create the thread to handler the Destination incoming message
+
+ waitForReceiveInMessage();
+ verifyReceivedMessage(inMessage);
+ // wait for a while for the jms session recycling
+
+ inMessage = null;
+ // Send a second message to check for an issue
+ // Where the session was closed the second time
+ sendoutMessage(conduit, outMessage, false);
+ waitForReceiveInMessage();
+ verifyReceivedMessage(inMessage);
+
+ conduit.close();
+ destination.shutdown();
+ }
+
+
+}
Modified: cxf/branches/2.7.x-fixes/testutils/src/main/resources/wsdl/jms_spec_testsuite.wsdl
URL: http://svn.apache.org/viewvc/cxf/branches/2.7.x-fixes/testutils/src/main/resources/wsdl/jms_spec_testsuite.wsdl?rev=1517577&r1=1517576&r2=1517577&view=diff
==============================================================================
--- cxf/branches/2.7.x-fixes/testutils/src/main/resources/wsdl/jms_spec_testsuite.wsdl (original)
+++ cxf/branches/2.7.x-fixes/testutils/src/main/resources/wsdl/jms_spec_testsuite.wsdl Mon Aug 26 15:52:04 2013
@@ -232,6 +232,30 @@
<soap:address location="jms:jndi:dynamicQueues/testqueue0001?jndiInitialContextFactory=org.apache.activemq.jndi.ActiveMQInitialContextFactory&jndiConnectionFactoryName=ConnectionFactory&jndiURL=tcp://localhost:61500"/>
</wsdl:port>
</wsdl:service>
+
+ <wsdl:service name="JMSSimpleService002X">
+ <soapjms:jndiConnectionFactoryName>ConnectionFactory</soapjms:jndiConnectionFactoryName>
+ <soapjms:jndiInitialContextFactory>org.apache.activemq.jndi.ActiveMQInitialContextFactory</soapjms:jndiInitialContextFactory>
+ <soapjms:jndiURL>tcp://localhost:9001</soapjms:jndiURL>
+ <wsdl:port binding="tns:JMSSimplePortBinding" name="SimplePortQueueRequest">
+ <soap:address location="jms:queue:my.test.queue.request21"/>
+ </wsdl:port>
+ <wsdl:port binding="tns:JMSSimplePortBinding" name="SimplePortQueueRequestQueueResponse">
+ <soap:address location="jms:queue:my.test.queue.request22?replyToName=my.test.queue.response22"/>
+ </wsdl:port>
+ <wsdl:port binding="tns:JMSSimplePortBinding" name="SimplePortQueueRequestTopicResponse">
+ <soap:address location="jms:queue:my.test.queue.request23?topicReplyToName=my.test.topic.response23"/>
+ </wsdl:port>
+ <wsdl:port binding="tns:JMSSimplePortBinding" name="SimplePortTopicRequest">
+ <soap:address location="jms:topic:my.test.topic.request24"/>
+ </wsdl:port>
+ <wsdl:port binding="tns:JMSSimplePortBinding" name="SimplePortTopicRequestQueueResponse">
+ <soap:address location="jms:topic:my.test.topic.request25?replyToName=my.test.queue.response25"/>
+ </wsdl:port>
+ <wsdl:port binding="tns:JMSSimplePortBinding" name="SimplePortTopicRequestTopicResponse">
+ <soap:address location="jms:topic:my.test.topic.request26?topicReplyToName=my.test.topic.response26"/>
+ </wsdl:port>
+ </wsdl:service>
<wsdl:service name="JMSSimpleService0101">
<wsdl:port binding="tns:JMSSimpleSOAP12PortBinding" name="SimplePort">