You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@synapse.apache.org by ve...@apache.org on 2008/08/23 18:31:01 UTC

svn commit: r688360 - in /synapse/trunk/java/modules/transports/src/test/java/org/apache/synapse/transport: jms/ mail/ nhttp/ testkit/tests/misc/ vfs/

Author: veithen
Date: Sat Aug 23 09:31:00 2008
New Revision: 688360

URL: http://svn.apache.org/viewvc?rev=688360&view=rev
Log:
Added a test case that checks the concurrency level provided by a transport listener. The test succeeds for JMS and HTTP but fails for the mail transport. This provides evidence for SYNAPSE-434.

Added:
    synapse/trunk/java/modules/transports/src/test/java/org/apache/synapse/transport/testkit/tests/misc/
    synapse/trunk/java/modules/transports/src/test/java/org/apache/synapse/transport/testkit/tests/misc/MinConcurrencyTest.java
Modified:
    synapse/trunk/java/modules/transports/src/test/java/org/apache/synapse/transport/jms/JMSAsyncChannel.java
    synapse/trunk/java/modules/transports/src/test/java/org/apache/synapse/transport/jms/JMSChannel.java
    synapse/trunk/java/modules/transports/src/test/java/org/apache/synapse/transport/jms/JMSListenerTest.java
    synapse/trunk/java/modules/transports/src/test/java/org/apache/synapse/transport/jms/JMSRequestResponseChannel.java
    synapse/trunk/java/modules/transports/src/test/java/org/apache/synapse/transport/mail/MailTransportListenerTest.java
    synapse/trunk/java/modules/transports/src/test/java/org/apache/synapse/transport/nhttp/HttpCoreNIOListenerTest.java
    synapse/trunk/java/modules/transports/src/test/java/org/apache/synapse/transport/vfs/VFSTransportListenerTest.java

Modified: synapse/trunk/java/modules/transports/src/test/java/org/apache/synapse/transport/jms/JMSAsyncChannel.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/src/test/java/org/apache/synapse/transport/jms/JMSAsyncChannel.java?rev=688360&r1=688359&r2=688360&view=diff
==============================================================================
--- synapse/trunk/java/modules/transports/src/test/java/org/apache/synapse/transport/jms/JMSAsyncChannel.java (original)
+++ synapse/trunk/java/modules/transports/src/test/java/org/apache/synapse/transport/jms/JMSAsyncChannel.java Sat Aug 23 09:31:00 2008
@@ -22,6 +22,9 @@
 import org.apache.synapse.transport.testkit.listener.AsyncChannel;
 
 public class JMSAsyncChannel extends JMSChannel implements AsyncChannel {
+    public JMSAsyncChannel(String name, String destinationType) {
+        super(name, destinationType);
+    }
     public JMSAsyncChannel(String destinationType) {
         super(destinationType);
     }

Modified: synapse/trunk/java/modules/transports/src/test/java/org/apache/synapse/transport/jms/JMSChannel.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/src/test/java/org/apache/synapse/transport/jms/JMSChannel.java?rev=688360&r1=688359&r2=688360&view=diff
==============================================================================
--- synapse/trunk/java/modules/transports/src/test/java/org/apache/synapse/transport/jms/JMSChannel.java (original)
+++ synapse/trunk/java/modules/transports/src/test/java/org/apache/synapse/transport/jms/JMSChannel.java Sat Aug 23 09:31:00 2008
@@ -34,19 +34,38 @@
 import org.apache.synapse.transport.testkit.name.Key;
 
 public abstract class JMSChannel extends AbstractChannel {
+    private final String name;
     private final String destinationType;
     protected JMSTestEnvironment env;
     private String destinationName;
     private Destination destination;
     
-    public JMSChannel(String destinationType) {
+    public JMSChannel(String name, String destinationType) {
+        this.name = name;
         this.destinationType = destinationType;
     }
     
+    public JMSChannel(String destinationType) {
+        this(null, destinationType);
+    }
+    
+    protected String buildDestinationName(String direction, String destinationType) {
+        StringBuilder destinationName = new StringBuilder();
+        if (name != null) {
+            destinationName.append(name);
+            destinationName.append(Character.toUpperCase(direction.charAt(0)));
+            destinationName.append(direction.substring(1));
+        } else {
+            destinationName.append(direction);
+        }
+        destinationName.append(destinationType == JMSConstants.DESTINATION_TYPE_QUEUE ? 'Q' : 'T');
+        return destinationName.toString();
+    }
+    
     @SuppressWarnings("unused")
     private void setUp(JMSTestEnvironment env) throws Exception {
         this.env = env;
-        destinationName = "request" + destinationType;
+        destinationName = buildDestinationName("request", destinationType);
         destination = env.createDestination(destinationType, destinationName);
         env.getContext().bind(destinationName, destination);
     }

Modified: synapse/trunk/java/modules/transports/src/test/java/org/apache/synapse/transport/jms/JMSListenerTest.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/src/test/java/org/apache/synapse/transport/jms/JMSListenerTest.java?rev=688360&r1=688359&r2=688360&view=diff
==============================================================================
--- synapse/trunk/java/modules/transports/src/test/java/org/apache/synapse/transport/jms/JMSListenerTest.java (original)
+++ synapse/trunk/java/modules/transports/src/test/java/org/apache/synapse/transport/jms/JMSListenerTest.java Sat Aug 23 09:31:00 2008
@@ -33,6 +33,7 @@
 import org.apache.synapse.transport.testkit.client.AsyncTestClient;
 import org.apache.synapse.transport.testkit.client.axis2.AxisAsyncTestClient;
 import org.apache.synapse.transport.testkit.client.axis2.AxisRequestResponseTestClient;
+import org.apache.synapse.transport.testkit.listener.AsyncChannel;
 import org.apache.synapse.transport.testkit.listener.ContentTypeMode;
 import org.apache.synapse.transport.testkit.listener.MessageTestData;
 import org.apache.synapse.transport.testkit.message.MessageConverter;
@@ -40,6 +41,7 @@
 import org.apache.synapse.transport.testkit.server.axis2.AxisAsyncEndpointFactory;
 import org.apache.synapse.transport.testkit.server.axis2.AxisEchoEndpointFactory;
 import org.apache.synapse.transport.testkit.server.axis2.AxisServer;
+import org.apache.synapse.transport.testkit.tests.misc.MinConcurrencyTest;
 
 public class JMSListenerTest extends TestCase {
     public static TestSuite suite() {
@@ -85,6 +87,10 @@
                 suite.addBinaryTest(channel, bytesMessageClient, adapt(asyncEndpointFactory, MessageConverter.AXIS_TO_BYTE), contentTypeMode, env, server, tdf);
             }
         }
+        suite.addTest(new MinConcurrencyTest(server, new AsyncChannel[] {
+                new JMSAsyncChannel("endpoint1", JMSConstants.DESTINATION_TYPE_QUEUE),
+                new JMSAsyncChannel("endpoint2", JMSConstants.DESTINATION_TYPE_QUEUE) },
+                2, false, env, tdf));
         return suite;
     }
 }

Modified: synapse/trunk/java/modules/transports/src/test/java/org/apache/synapse/transport/jms/JMSRequestResponseChannel.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/src/test/java/org/apache/synapse/transport/jms/JMSRequestResponseChannel.java?rev=688360&r1=688359&r2=688360&view=diff
==============================================================================
--- synapse/trunk/java/modules/transports/src/test/java/org/apache/synapse/transport/jms/JMSRequestResponseChannel.java (original)
+++ synapse/trunk/java/modules/transports/src/test/java/org/apache/synapse/transport/jms/JMSRequestResponseChannel.java Sat Aug 23 09:31:00 2008
@@ -32,14 +32,18 @@
     private String replyDestinationName;
     private Destination replyDestination;
     
-    public JMSRequestResponseChannel(String destinationType, String replyDestinationType) {
-        super(destinationType);
+    public JMSRequestResponseChannel(String name, String destinationType, String replyDestinationType) {
+        super(name, destinationType);
         this.replyDestinationType = replyDestinationType;
     }
     
+    public JMSRequestResponseChannel(String destinationType, String replyDestinationType) {
+        this(null, destinationType, replyDestinationType);
+    }
+    
     @SuppressWarnings("unused")
     private void setUp(JMSTestEnvironment env) throws Exception {
-        replyDestinationName = "response" + replyDestinationType;
+        replyDestinationName = buildDestinationName("response", replyDestinationType);
         replyDestination = env.createDestination(replyDestinationType, replyDestinationName);
         env.getContext().bind(replyDestinationName, replyDestination);
     }

Modified: synapse/trunk/java/modules/transports/src/test/java/org/apache/synapse/transport/mail/MailTransportListenerTest.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/src/test/java/org/apache/synapse/transport/mail/MailTransportListenerTest.java?rev=688360&r1=688359&r2=688360&view=diff
==============================================================================
--- synapse/trunk/java/modules/transports/src/test/java/org/apache/synapse/transport/mail/MailTransportListenerTest.java (original)
+++ synapse/trunk/java/modules/transports/src/test/java/org/apache/synapse/transport/mail/MailTransportListenerTest.java Sat Aug 23 09:31:00 2008
@@ -37,6 +37,7 @@
 import org.apache.synapse.transport.testkit.server.axis2.AxisAsyncEndpointFactory;
 import org.apache.synapse.transport.testkit.server.axis2.AxisEchoEndpointFactory;
 import org.apache.synapse.transport.testkit.server.axis2.AxisServer;
+import org.apache.synapse.transport.testkit.tests.misc.MinConcurrencyTest;
 
 public class MailTransportListenerTest extends TestCase {
     public static TestSuite suite() throws Exception {
@@ -50,6 +51,8 @@
         suite.addExclude("(test=AsyncSwA)");
         suite.addExclude("(test=AsyncBinary)");
         suite.addExclude("(&(test=AsyncTextPlain)(!(data=ASCII)))");
+        // SYNAPSE-434
+        suite.addExclude("(test=MinConcurrency)");
         
         MailTestEnvironment env = new GreenMailTestEnvironment();
         
@@ -73,7 +76,7 @@
         suite.addPOXTests(channel, adapt(axisClient, MessageConverter.XML_TO_AXIS), asyncEndpointFactory, ContentTypeMode.TRANSPORT, env, axisServer);
         suite.addTextPlainTests(channel, adapt(axisClient, MessageConverter.TEXT_WRAPPER), AdapterUtils.adapt(asyncEndpointFactory, MessageConverter.AXIS_TO_STRING), ContentTypeMode.TRANSPORT, env, axisServer);
         suite.addBinaryTest(channel, adapt(axisClient, MessageConverter.BINARY_WRAPPER), adapt(asyncEndpointFactory, MessageConverter.AXIS_TO_BYTE), ContentTypeMode.TRANSPORT, env, axisServer);
-//        suite.addTest(new MinConcurrencyTest(axisServer, new MailChannel[] { new MailChannel(), new MailChannel() }, 2, env));
+        suite.addTest(new MinConcurrencyTest(axisServer, new MailChannel[] { new MailChannel(), new MailChannel() }, 2, true, env));
         return suite;
     }
 }

Modified: synapse/trunk/java/modules/transports/src/test/java/org/apache/synapse/transport/nhttp/HttpCoreNIOListenerTest.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/src/test/java/org/apache/synapse/transport/nhttp/HttpCoreNIOListenerTest.java?rev=688360&r1=688359&r2=688360&view=diff
==============================================================================
--- synapse/trunk/java/modules/transports/src/test/java/org/apache/synapse/transport/nhttp/HttpCoreNIOListenerTest.java (original)
+++ synapse/trunk/java/modules/transports/src/test/java/org/apache/synapse/transport/nhttp/HttpCoreNIOListenerTest.java Sat Aug 23 09:31:00 2008
@@ -37,11 +37,13 @@
 import org.apache.synapse.transport.testkit.TransportTestSuite;
 import org.apache.synapse.transport.testkit.client.AsyncTestClient;
 import org.apache.synapse.transport.testkit.client.axis2.AxisAsyncTestClient;
+import org.apache.synapse.transport.testkit.listener.AsyncChannel;
 import org.apache.synapse.transport.testkit.listener.ContentTypeMode;
 import org.apache.synapse.transport.testkit.message.MessageConverter;
 import org.apache.synapse.transport.testkit.message.XMLMessage;
 import org.apache.synapse.transport.testkit.server.axis2.AxisAsyncEndpointFactory;
 import org.apache.synapse.transport.testkit.server.axis2.AxisServer;
+import org.apache.synapse.transport.testkit.tests.misc.MinConcurrencyTest;
 
 public class HttpCoreNIOListenerTest extends TestCase {
     public static TestSuite suite() {
@@ -84,6 +86,7 @@
         suite.addTextPlainTests(channel, adapt(javaNetClient, MessageConverter.STRING_TO_BYTE), adapt(asyncEndpointFactory, MessageConverter.AXIS_TO_STRING), ContentTypeMode.TRANSPORT, env, axisServer, tdf);
         suite.addBinaryTest(channel, javaNetClient, adapt(asyncEndpointFactory, MessageConverter.AXIS_TO_BYTE), ContentTypeMode.TRANSPORT, env, axisServer, tdf);
         suite.addRESTTests(channel, new JavaNetRESTClient(), asyncEndpointFactory, env, axisServer, tdf);
+        suite.addTest(new MinConcurrencyTest(axisServer, new AsyncChannel[] { new HttpChannel(), new HttpChannel() }, 2, false, env, tdf));
         return suite;
     }
 }

Added: synapse/trunk/java/modules/transports/src/test/java/org/apache/synapse/transport/testkit/tests/misc/MinConcurrencyTest.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/src/test/java/org/apache/synapse/transport/testkit/tests/misc/MinConcurrencyTest.java?rev=688360&view=auto
==============================================================================
--- synapse/trunk/java/modules/transports/src/test/java/org/apache/synapse/transport/testkit/tests/misc/MinConcurrencyTest.java (added)
+++ synapse/trunk/java/modules/transports/src/test/java/org/apache/synapse/transport/testkit/tests/misc/MinConcurrencyTest.java Sat Aug 23 09:31:00 2008
@@ -0,0 +1,134 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.synapse.transport.testkit.tests.misc;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.axiom.om.OMAbstractFactory;
+import org.apache.axiom.soap.SOAP11Constants;
+import org.apache.axiom.soap.SOAPEnvelope;
+import org.apache.axiom.soap.SOAPFactory;
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.context.MessageContext;
+import org.apache.axis2.engine.MessageReceiver;
+import org.apache.synapse.transport.testkit.client.ClientOptions;
+import org.apache.synapse.transport.testkit.client.axis2.AxisAsyncTestClient;
+import org.apache.synapse.transport.testkit.listener.AsyncChannel;
+import org.apache.synapse.transport.testkit.message.AxisMessage;
+import org.apache.synapse.transport.testkit.name.Name;
+import org.apache.synapse.transport.testkit.server.Endpoint;
+import org.apache.synapse.transport.testkit.server.axis2.AxisServer;
+import org.apache.synapse.transport.testkit.tests.TestResourceSet;
+import org.apache.synapse.transport.testkit.tests.TransportTestCase;
+
+/**
+ * Generic test case to check whether a transport listener processes messages with the expected
+ * level of concurrency. This test case is used to verify that the listener is able to
+ * process messages simultaneously.
+ * <p>
+ * The test case deploys one service per channel and sends a configurable number of messages
+ * to each of these services. The services are configured with a custom message receiver that
+ * records each invocation and then blocks until the test ends. The test waits for the expected
+ * level of concurrency (given by the number of endpoints times the number of messages) to be
+ * reached. If it is not reached within 5 seconds, the test fails.
+ */
+@Name("MinConcurrency")
+public class MinConcurrencyTest extends TransportTestCase {
+    private final AxisServer server;
+    private final AsyncChannel[] channels;
+    private final int messages;
+    private final boolean preloadMessages;
+    
+    public MinConcurrencyTest(AxisServer server, AsyncChannel[] channels, int messages,
+            boolean preloadMessages, Object... resources) {
+        super(resources);
+        this.server = server;
+        addResource(server);
+        this.channels = channels;
+        this.messages = messages;
+        this.preloadMessages = preloadMessages;
+    }
+    
+    @Override
+    protected void runTest() throws Throwable {
+        int endpointCount = channels.length;
+        int expectedConcurrency = endpointCount * messages;
+        
+        final CountDownLatch shutdownLatch = new CountDownLatch(1);
+        final CountDownLatch concurrencyReachedLatch = new CountDownLatch(expectedConcurrency);
+        
+        MessageReceiver messageReceiver = new MessageReceiver() {
+            public void receive(MessageContext msgContext) throws AxisFault {
+                concurrencyReachedLatch.countDown();
+                try {
+                    shutdownLatch.await();
+                } catch (InterruptedException ex) {
+                }
+            }
+        };
+        
+        TestResourceSet[] resourceSets = new TestResourceSet[endpointCount];
+        Endpoint[] endpoints = new Endpoint[endpointCount];
+        try {
+            for (int i=0; i<endpointCount; i++) {
+                TestResourceSet resources = new TestResourceSet(getResourceSet());
+                AsyncChannel channel = channels[i];
+                resources.addResource(channel);
+                AxisAsyncTestClient client = new AxisAsyncTestClient();
+                resources.addResource(client);
+                resources.setUp();
+                resourceSets[i] = resources;
+                if (!preloadMessages) {
+                    // TODO: we need to support transports that use static Content-Type
+                    endpoints[i] = server.createAsyncEndpoint(channel, messageReceiver, null);
+                }
+                for (int j=0; j<messages; j++) {
+                    ClientOptions options = new ClientOptions("UTF-8");
+                    AxisMessage message = new AxisMessage();
+                    message.setMessageType(SOAP11Constants.SOAP_11_CONTENT_TYPE);
+                    SOAPFactory factory = OMAbstractFactory.getSOAP11Factory();
+                    SOAPEnvelope envelope = factory.getDefaultEnvelope();
+                    message.setEnvelope(envelope);
+                    client.sendMessage(options, message);
+                }
+                if (preloadMessages) {
+                    endpoints[i] = server.createAsyncEndpoint(channel, messageReceiver, null);
+                }
+            }
+        
+            if (!concurrencyReachedLatch.await(5, TimeUnit.SECONDS)) {
+                fail("Concurrency reached is " + (expectedConcurrency -
+                        concurrencyReachedLatch.getCount()) + ", but expected " +
+                        expectedConcurrency);
+            }
+        } finally {
+            shutdownLatch.countDown();
+            for (int i=0; i<endpointCount; i++) {
+                if (endpoints[i] != null) {
+                    endpoints[i].remove();
+                }
+                if (resourceSets[i] != null) {
+                    resourceSets[i].tearDown();
+                }
+            }
+        }
+    }
+}

Modified: synapse/trunk/java/modules/transports/src/test/java/org/apache/synapse/transport/vfs/VFSTransportListenerTest.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/src/test/java/org/apache/synapse/transport/vfs/VFSTransportListenerTest.java?rev=688360&r1=688359&r2=688360&view=diff
==============================================================================
--- synapse/trunk/java/modules/transports/src/test/java/org/apache/synapse/transport/vfs/VFSTransportListenerTest.java (original)
+++ synapse/trunk/java/modules/transports/src/test/java/org/apache/synapse/transport/vfs/VFSTransportListenerTest.java Sat Aug 23 09:31:00 2008
@@ -64,6 +64,7 @@
         }
         suite.addTextPlainTests(channel, adapt(vfsClient, MessageConverter.STRING_TO_BYTE), adapt(asyncEndpointFactory, MessageConverter.AXIS_TO_STRING), ContentTypeMode.SERVICE, env, server, tdf);
         suite.addBinaryTest(channel, vfsClient, adapt(asyncEndpointFactory, MessageConverter.AXIS_TO_BYTE), ContentTypeMode.SERVICE, env, server, tdf);
+//        suite.addTest(new MinConcurrencyTest(server, new AsyncChannel[] { new VFSFileChannel("req/in1"), new VFSFileChannel("req/in2") }, 1, true, env, tdf));
         return suite;
     }
 }