You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by cs...@apache.org on 2014/02/05 22:58:24 UTC

svn commit: r1564952 [4/4] - in /cxf/trunk: parent/ rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/ rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/ rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/ rt/transp...

Added: cxf/trunk/systests/transport-jms/src/test/java/org/apache/cxf/systest/jms/JMSSharedQueueTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/systests/transport-jms/src/test/java/org/apache/cxf/systest/jms/JMSSharedQueueTest.java?rev=1564952&view=auto
==============================================================================
--- cxf/trunk/systests/transport-jms/src/test/java/org/apache/cxf/systest/jms/JMSSharedQueueTest.java (added)
+++ cxf/trunk/systests/transport-jms/src/test/java/org/apache/cxf/systest/jms/JMSSharedQueueTest.java Wed Feb  5 21:58:23 2014
@@ -0,0 +1,306 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.systest.jms;
+
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import javax.xml.namespace.QName;
+import javax.xml.ws.BindingProvider;
+
+import org.apache.cxf.hello_world_jms.HelloWorldPortType;
+import org.apache.cxf.hello_world_jms.HelloWorldServiceAppCorrelationID;
+import org.apache.cxf.hello_world_jms.HelloWorldServiceAppCorrelationIDNoPrefix;
+import org.apache.cxf.hello_world_jms.HelloWorldServiceAppCorrelationIDStaticPrefix;
+import org.apache.cxf.hello_world_jms.HelloWorldServiceRuntimeCorrelationIDDynamicPrefix;
+import org.apache.cxf.hello_world_jms.HelloWorldServiceRuntimeCorrelationIDStaticPrefix;
+import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase;
+import org.apache.cxf.testutil.common.EmbeddedJMSBrokerLauncher;
+import org.apache.cxf.transport.jms.JMSConstants;
+import org.apache.cxf.transport.jms.JMSMessageHeadersType;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class JMSSharedQueueTest extends AbstractBusClientServerTestBase {
+    private static final String BROKER_URI = "vm://SharedQueueTest" 
+        + "?jms.watchTopicAdvisories=false&broker.persistent=false";
+ 
+    private static EmbeddedJMSBrokerLauncher broker;
+    private String wsdlString;
+    
+    @BeforeClass
+    public static void startServers() throws Exception {
+        broker = new EmbeddedJMSBrokerLauncher(BROKER_URI);
+        launchServer(broker);
+        launchServer(new Server(broker));
+        createStaticBus();
+    }
+    
+    /**
+     * Looks up a WSDL on the classpath, remembers its location and passes that location
+     * to the broker launcher's updateWsdl together with the current bus.
+     *
+     * @param s classpath location of the WSDL resource
+     * @return the URL of the resource
+     * @throws IllegalArgumentException if no such classpath resource exists
+     */
+    public URL getWSDLURL(String s) throws Exception {
+        URL resource = getClass().getResource(s);
+        if (resource != null) {
+            wsdlString = resource.toString().intern();
+            broker.updateWsdl(getBus(), wsdlString);
+            return resource;
+        }
+        throw new IllegalArgumentException("WSDL classpath resource not found " + s);
+    }
+
+    /**
+     * Supplies the JMS correlation ID that a client sets on the headers of each request.
+     */
+    private interface CorrelationIDFactory {
+        // NOTE(review): the method name is misspelled ("Correaltion"); it is left unchanged
+        // because the implementations in this file override it under this name.
+        String createCorrealtionID();
+    }
+    
+    /**
+     * Client task that invokes greetMe on a port five times in sequence. The first
+     * Throwable raised (including assertion failures) stops the loop and is kept so
+     * that the launching thread can retrieve it through {@link #getException()}.
+     */
+    private static class ClientRunnable implements Runnable {
+        private static final int CALL_COUNT = 5;
+
+        private final HelloWorldPortType port;
+        private final CorrelationIDFactory corrFactory;
+        private final String prefix;
+        // Written by the worker thread and read by the thread that started it,
+        // so it must be volatile to be reliably visible.
+        private volatile Throwable ex;
+
+        /** Client that sends plain requests without prefix or explicit correlation ID. */
+        public ClientRunnable(HelloWorldPortType port) {
+            this(port, null, null);
+        }
+
+        /** Client whose request text is extended by the given prefix. */
+        public ClientRunnable(HelloWorldPortType port, String prefix) {
+            this(port, prefix, null);
+        }
+
+        /** Client that sets a correlation ID obtained from the factory on every request. */
+        public ClientRunnable(HelloWorldPortType port, CorrelationIDFactory factory) {
+            this(port, null, factory);
+        }
+
+        private ClientRunnable(HelloWorldPortType port, String prefix, CorrelationIDFactory factory) {
+            this.port = port;
+            this.prefix = prefix;
+            this.corrFactory = factory;
+        }
+
+        /**
+         * @return the Throwable that ended {@link #run()}, or null if none was raised
+         */
+        public Throwable getException() {
+            return ex;
+        }
+
+        public void run() {
+            try {
+                for (int idx = 0; idx < CALL_COUNT; idx++) {
+                    callGreetMe();
+                }
+            } catch (Throwable e) {
+                // Catch everything, including AssertionError, so the failure can be
+                // reported by the test thread instead of dying silently in the pool.
+                ex = e;
+            }
+        }
+
+        /**
+         * Sends one greetMe request and verifies the reply text; when a correlation ID
+         * factory is present also verifies that the reply carries the same correlation ID.
+         */
+        private void callGreetMe() {
+            BindingProvider bp = (BindingProvider)port;
+            Map<String, Object> requestContext = bp.getRequestContext();
+            JMSMessageHeadersType requestHeader = new JMSMessageHeadersType();
+            requestContext.put(JMSConstants.JMS_CLIENT_REQUEST_HEADERS, requestHeader);
+            String request = "World" + ((prefix != null) ? ":" + prefix : "");
+            String correlationID = null;
+            if (corrFactory != null) {
+                correlationID = corrFactory.createCorrealtionID();
+                requestHeader.setJMSCorrelationID(correlationID);
+                request +=  ":" + correlationID;
+            }
+            String expected = "Hello " + request;
+            String response = port.greetMe(request);
+            assertEquals("Response didn't match expected request", expected, response);
+            if (corrFactory != null) {
+                Map<String, Object> responseContext = bp.getResponseContext();
+                JMSMessageHeadersType responseHeader = 
+                    (JMSMessageHeadersType)responseContext.get(
+                            JMSConstants.JMS_CLIENT_RESPONSE_HEADERS);
+                // Fail with a readable message rather than a NullPointerException
+                // when the response context holds no JMS headers.
+                assertNotNull("Response JMS headers are missing", responseHeader);
+                assertEquals("Request and Response CorrelationID didn't match", 
+                              correlationID, responseHeader.getJMSCorrelationID());
+            }
+        }
+    }
+    
+    /**
+     * Varargs convenience form: wraps the clients in a list and hands them to the
+     * collection based overload.
+     */
+    private void executeAsync(ClientRunnable... clients) throws Throwable {
+        Collection<ClientRunnable> clientList = Arrays.asList(clients);
+        executeAsync(clientList);
+    }
+    
+    /**
+     * Runs all clients concurrently on a cached thread pool, waits for them to finish
+     * and rethrows the first failure recorded by any client.
+     *
+     * @param clients the client tasks to run
+     * @throws Throwable the first exception or assertion failure captured by a client;
+     *         IllegalStateException if the clients do not finish within the wait limit;
+     *         InterruptedException if the waiting thread is interrupted
+     */
+    private void executeAsync(Collection<ClientRunnable> clients) throws Throwable {
+        ExecutorService executor = Executors.newCachedThreadPool();
+        for (ClientRunnable client : clients) {
+            executor.execute(client);
+        }
+        // shutdown() only rejects new tasks and returns immediately; without waiting
+        // here the failure check below would run before any client has finished.
+        executor.shutdown();
+        if (!executor.awaitTermination(5, TimeUnit.MINUTES)) {
+            executor.shutdownNow();
+            throw new IllegalStateException("JMS clients did not finish within 5 minutes");
+        }
+        for (ClientRunnable client : clients) {
+            Throwable failure = client.getException();
+            if (failure != null) {
+                throw failure;
+            }
+        }
+    }
+    
+    /**
+     * Runs an Eng and a Sales client concurrently against the AppCorrelationID service.
+     * Each client supplies its own correlation IDs, built from a client specific prefix
+     * and a running counter.
+     */
+    @Test
+    public void testTwoWayQueueAppCorrelationID() throws Throwable {
+        final String ns = "http://cxf.apache.org/hello_world_jms";
+
+        URL wsdl = getWSDLURL("/wsdl/jms_test.wsdl");
+        HelloWorldServiceAppCorrelationID service = new HelloWorldServiceAppCorrelationID(
+            wsdl, new QName(ns, "HelloWorldServiceAppCorrelationID"));
+        assertNotNull(service);
+
+        // Correlation IDs of the form "<prefix>:<n>", counted separately per client.
+        CorrelationIDFactory engIds = new CorrelationIDFactory() {
+            private int counter;
+            public String createCorrealtionID() {
+                return "com.mycompany.eng:" + counter++;
+            }
+        };
+        CorrelationIDFactory salesIds = new CorrelationIDFactory() {
+            private int counter;
+            public String createCorrealtionID() {
+                return "com.mycompany.sales:" + counter++;
+            }
+        };
+
+        HelloWorldPortType engPort = service.getPort(
+            new QName(ns, "HelloWorldPortAppCorrelationIDEng"), HelloWorldPortType.class);
+        HelloWorldPortType salesPort = service.getPort(
+            new QName(ns, "HelloWorldPortAppCorrelationIDSales"), HelloWorldPortType.class);
+
+        executeAsync(new ClientRunnable(engPort, engIds), new ClientRunnable(salesPort, salesIds));
+    }
+    
+    /**
+     * Runs an Eng and a Sales client concurrently against the
+     * AppCorrelationIDStaticPrefix service; the clients set no correlation ID themselves.
+     */
+    @Test
+    public void testTwoWayQueueAppCorrelationIDStaticPrefix() throws Throwable {
+        final String ns = "http://cxf.apache.org/hello_world_jms";
+
+        URL wsdl = getWSDLURL("/wsdl/jms_test.wsdl");
+        HelloWorldServiceAppCorrelationIDStaticPrefix service =
+            new HelloWorldServiceAppCorrelationIDStaticPrefix(
+                wsdl, new QName(ns, "HelloWorldServiceAppCorrelationIDStaticPrefix"));
+
+        HelloWorldPortType engPort = service.getPort(
+            new QName(ns, "HelloWorldPortAppCorrelationIDStaticPrefixEng"), HelloWorldPortType.class);
+        HelloWorldPortType salesPort = service.getPort(
+            new QName(ns, "HelloWorldPortAppCorrelationIDStaticPrefixSales"), HelloWorldPortType.class);
+
+        executeAsync(new ClientRunnable(engPort), new ClientRunnable(salesPort));
+    }
+
+    /* TO DO:
+     * This test shows a missing QoS. When CXF clients share a named (persistent) reply queue
+     * with an application-provided correlationID, response message loss is guaranteed.
+     *
+     * A large number of threads is used to ensure message loss and avoid a false
+     * positive assertion.
+     */
+    @Test
+    public void testTwoWayQueueAppCorrelationIDNoPrefix() throws Throwable {
+        final String ns = "http://cxf.apache.org/hello_world_jms";
+
+        URL wsdl = getWSDLURL("/wsdl/jms_test.wsdl");
+        HelloWorldServiceAppCorrelationIDNoPrefix service =
+            new HelloWorldServiceAppCorrelationIDNoPrefix(
+                wsdl, new QName(ns, "HelloWorldServiceAppCorrelationIDNoPrefix"));
+        HelloWorldPortType port = service.getPort(
+            new QName(ns, "HelloWorldPortAppCorrelationIDNoPrefix"), HelloWorldPortType.class);
+
+        // NOTE(review): only one client is started here although the comment on this
+        // test mentions a large number of threads -- confirm which one is intended.
+        final int clientCount = 1;
+        Collection<ClientRunnable> clients = new ArrayList<ClientRunnable>();
+        while (clients.size() < clientCount) {
+            clients.add(new ClientRunnable(port));
+        }
+        executeAsync(clients);
+    }
+
+    /*
+     * This tests a use case where the request and reply queues are shared between
+     * two servers (Eng and Sales). Each server has a design-time provided selector,
+     * which allows them to share the same queue without consuming each other's
+     * messages.
+     *
+     * The clients of these two servers use the same request and reply queues.
+     * An Eng client uses a design-time selector prefix to form the request message
+     * correlationID and to form a reply consumer that filters only reply
+     * messages originating from the Eng server. To differentiate one Eng client
+     * instance from another, this prefix is supplemented by a runtime value of
+     * ConduitId, which has a 1-1 relation to a client instance.
+     * This guarantees that an Eng client instance will only consume its own reply
+     * messages.
+     *
+     * In case of a single client instance being shared among multiple threads,
+     * the third portion of the request message correlationID,
+     * an atomic rolling message counter, ensures that each message gets a unique ID.
+     *
+     * So the model is:
+     *
+     * Many concurrent Sales clients to a single pair of request and reply queues (Q1, Q2)
+     * to a single Sales server.
+     * Many concurrent Eng clients to a single pair of request and reply queues (Q1, Q2)
+     * to a single Eng server.
+     */
+    @Test
+    public void testTwoWayQueueRuntimeCorrelationIDStaticPrefix() throws Throwable {
+        final String ns = "http://cxf.apache.org/hello_world_jms";
+
+        URL wsdl = getWSDLURL("/wsdl/jms_test.wsdl");
+        HelloWorldServiceRuntimeCorrelationIDStaticPrefix service =
+            new HelloWorldServiceRuntimeCorrelationIDStaticPrefix(
+                wsdl, new QName(ns, "HelloWorldServiceRuntimeCorrelationIDStaticPrefix"));
+
+        HelloWorldPortType portEng = service.getPort(
+            new QName(ns, "HelloWorldPortRuntimeCorrelationIDStaticPrefixEng"),
+            HelloWorldPortType.class);
+        HelloWorldPortType portSales = service.getPort(
+            new QName(ns, "HelloWorldPortRuntimeCorrelationIDStaticPrefixSales"),
+            HelloWorldPortType.class);
+
+        // Ten Eng and ten Sales clients, added alternately, all sharing the two ports.
+        Collection<ClientRunnable> clients = new ArrayList<ClientRunnable>();
+        for (int pairsLeft = 10; pairsLeft > 0; pairsLeft--) {
+            clients.add(new ClientRunnable(portEng, "com.mycompany.eng:"));
+            clients.add(new ClientRunnable(portSales, "com.mycompany.sales:"));
+        }
+        executeAsync(clients);
+    }
+
+
+
+    /**
+     * Runs ten clients concurrently on one shared port of the
+     * RuntimeCorrelationIDDynamicPrefix service.
+     */
+    @Test
+    public void testTwoWayQueueRuntimeCorrelationDynamicPrefix() throws Throwable {
+        final String ns = "http://cxf.apache.org/hello_world_jms";
+
+        URL wsdl = getWSDLURL("/wsdl/jms_test.wsdl");
+        HelloWorldServiceRuntimeCorrelationIDDynamicPrefix service =
+            new HelloWorldServiceRuntimeCorrelationIDDynamicPrefix(
+                wsdl, new QName(ns, "HelloWorldServiceRuntimeCorrelationIDDynamicPrefix"));
+        HelloWorldPortType port = service.getPort(
+            new QName(ns, "HelloWorldPortRuntimeCorrelationIDDynamicPrefix"),
+            HelloWorldPortType.class);
+
+        final int clientCount = 10;
+        Collection<ClientRunnable> clients = new ArrayList<ClientRunnable>();
+        while (clients.size() < clientCount) {
+            clients.add(new ClientRunnable(port));
+        }
+        executeAsync(clients);
+    }
+    
+}

Propchange: cxf/trunk/systests/transport-jms/src/test/java/org/apache/cxf/systest/jms/JMSSharedQueueTest.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: cxf/trunk/systests/transport-jms/src/test/java/org/apache/cxf/systest/jms/swa/ClientServerSwaTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/systests/transport-jms/src/test/java/org/apache/cxf/systest/jms/swa/ClientServerSwaTest.java?rev=1564952&r1=1564951&r2=1564952&view=diff
==============================================================================
--- cxf/trunk/systests/transport-jms/src/test/java/org/apache/cxf/systest/jms/swa/ClientServerSwaTest.java (original)
+++ cxf/trunk/systests/transport-jms/src/test/java/org/apache/cxf/systest/jms/swa/ClientServerSwaTest.java Wed Feb  5 21:58:23 2014
@@ -67,7 +67,8 @@ public class ClientServerSwaTest extends
 
     @BeforeClass
     public static void startServers() throws Exception {
-        broker = new EmbeddedJMSBrokerLauncher("vm://ClientServerSwaTest?jms.watchTopicAdvisories=false");
+        broker = new EmbeddedJMSBrokerLauncher("vm://ClientServerSwaTest" 
+            + "?jms.watchTopicAdvisories=false&broker.persistent=false");
         System.setProperty("EmbeddedBrokerURL", broker.getBrokerURL());
         launchServer(broker);
         launchServer(new Server());

Modified: cxf/trunk/systests/transport-jms/src/test/java/org/apache/cxf/systest/jms/tx/JMSTransactionClientServerTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/systests/transport-jms/src/test/java/org/apache/cxf/systest/jms/tx/JMSTransactionClientServerTest.java?rev=1564952&r1=1564951&r2=1564952&view=diff
==============================================================================
--- cxf/trunk/systests/transport-jms/src/test/java/org/apache/cxf/systest/jms/tx/JMSTransactionClientServerTest.java (original)
+++ cxf/trunk/systests/transport-jms/src/test/java/org/apache/cxf/systest/jms/tx/JMSTransactionClientServerTest.java Wed Feb  5 21:58:23 2014
@@ -37,13 +37,18 @@ import org.apache.hello_world_doc_lit.Pi
 import org.apache.hello_world_doc_lit.SOAPService2;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
+import org.junit.Ignore;
 import org.junit.Test;
 import org.springframework.context.support.ClassPathXmlApplicationContext;
 import org.springframework.jms.connection.JmsTransactionManager;
 
-
+/**
+ * Test transactions based on spring transactions.
+ * These will not be supported anymore in cxf >= 3
+ */
 public class JMSTransactionClientServerTest extends AbstractBusClientServerTestBase {
-    static EmbeddedJMSBrokerLauncher broker;
+    private static final String BROKER_URI = "vm://JMSTransactionClientServerTest?broker.persistent=false";
+    private static EmbeddedJMSBrokerLauncher broker;
 
     public static class Server extends AbstractBusTestServerBase {
         ClassPathXmlApplicationContext context;
@@ -80,7 +85,7 @@ public class JMSTransactionClientServerT
 
     @BeforeClass
     public static void startServers() throws Exception {
-        broker = new EmbeddedJMSBrokerLauncher("vm://JMSTransactionClientServerTest");
+        broker = new EmbeddedJMSBrokerLauncher(BROKER_URI);
         System.setProperty("EmbeddedBrokerURL", broker.getBrokerURL());
         launchServer(broker);
         launchServer(new Server());
@@ -100,6 +105,7 @@ public class JMSTransactionClientServerT
         return q;
     }
     
+    @Ignore
     @Test
     public void testDocBasicConnection() throws Exception {
         QName serviceName = getServiceName(new QName("http://apache.org/hello_world_doc_lit", 
@@ -115,6 +121,8 @@ public class JMSTransactionClientServerT
         Greeter greeter = service.getPort(portName, Greeter.class);
         doService(greeter, true);
     }
+    
+    @Ignore
     @Test
     public void testNonAopTransaction() throws Exception {
         JaxWsProxyFactoryBean factory = new JaxWsProxyFactoryBean();

Modified: cxf/trunk/testutils/src/main/java/org/apache/cxf/testutil/common/EmbeddedJMSBrokerLauncher.java
URL: http://svn.apache.org/viewvc/cxf/trunk/testutils/src/main/java/org/apache/cxf/testutil/common/EmbeddedJMSBrokerLauncher.java?rev=1564952&r1=1564951&r2=1564952&view=diff
==============================================================================
--- cxf/trunk/testutils/src/main/java/org/apache/cxf/testutil/common/EmbeddedJMSBrokerLauncher.java (original)
+++ cxf/trunk/testutils/src/main/java/org/apache/cxf/testutil/common/EmbeddedJMSBrokerLauncher.java Wed Feb  5 21:58:23 2014
@@ -20,6 +20,7 @@ package org.apache.cxf.testutil.common;
 
 import java.io.File;
 import java.lang.reflect.Field;
+import java.net.URL;
 import java.util.List;
 import java.util.Map;
 
@@ -69,6 +70,11 @@ public class EmbeddedJMSBrokerLauncher e
         }
         return b.toString();
     }
+    
+    public void updateWsdl(Bus b, URL wsdlLocation) { 
+        updateWsdl(b, wsdlLocation.toString());
+    }
+    
     /**
      * Delegates to updateWsdlExtensors, passing the bus, the WSDL location, this
      * launcher's brokerUrl1 and the value of getEncodedBrokerURL().
      */
     public void updateWsdl(Bus b, String wsdlLocation) { 
         updateWsdlExtensors(b, wsdlLocation, brokerUrl1, getEncodedBrokerURL());
     }