You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@synapse.apache.org by ve...@apache.org on 2008/08/23 18:31:01 UTC
svn commit: r688360 - in
/synapse/trunk/java/modules/transports/src/test/java/org/apache/synapse/transport:
jms/ mail/ nhttp/ testkit/tests/misc/ vfs/
Author: veithen
Date: Sat Aug 23 09:31:00 2008
New Revision: 688360
URL: http://svn.apache.org/viewvc?rev=688360&view=rev
Log:
Added a test case that checks the concurrency level provided by a transport listener. The test succeeds for JMS and HTTP but fails for the mail transport. This provides evidence for SYNAPSE-434.
Added:
synapse/trunk/java/modules/transports/src/test/java/org/apache/synapse/transport/testkit/tests/misc/
synapse/trunk/java/modules/transports/src/test/java/org/apache/synapse/transport/testkit/tests/misc/MinConcurrencyTest.java
Modified:
synapse/trunk/java/modules/transports/src/test/java/org/apache/synapse/transport/jms/JMSAsyncChannel.java
synapse/trunk/java/modules/transports/src/test/java/org/apache/synapse/transport/jms/JMSChannel.java
synapse/trunk/java/modules/transports/src/test/java/org/apache/synapse/transport/jms/JMSListenerTest.java
synapse/trunk/java/modules/transports/src/test/java/org/apache/synapse/transport/jms/JMSRequestResponseChannel.java
synapse/trunk/java/modules/transports/src/test/java/org/apache/synapse/transport/mail/MailTransportListenerTest.java
synapse/trunk/java/modules/transports/src/test/java/org/apache/synapse/transport/nhttp/HttpCoreNIOListenerTest.java
synapse/trunk/java/modules/transports/src/test/java/org/apache/synapse/transport/vfs/VFSTransportListenerTest.java
Modified: synapse/trunk/java/modules/transports/src/test/java/org/apache/synapse/transport/jms/JMSAsyncChannel.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/src/test/java/org/apache/synapse/transport/jms/JMSAsyncChannel.java?rev=688360&r1=688359&r2=688360&view=diff
==============================================================================
--- synapse/trunk/java/modules/transports/src/test/java/org/apache/synapse/transport/jms/JMSAsyncChannel.java (original)
+++ synapse/trunk/java/modules/transports/src/test/java/org/apache/synapse/transport/jms/JMSAsyncChannel.java Sat Aug 23 09:31:00 2008
@@ -22,6 +22,9 @@
import org.apache.synapse.transport.testkit.listener.AsyncChannel;
public class JMSAsyncChannel extends JMSChannel implements AsyncChannel {
+ public JMSAsyncChannel(String name, String destinationType) {
+ super(name, destinationType);
+ }
public JMSAsyncChannel(String destinationType) {
super(destinationType);
}
Modified: synapse/trunk/java/modules/transports/src/test/java/org/apache/synapse/transport/jms/JMSChannel.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/src/test/java/org/apache/synapse/transport/jms/JMSChannel.java?rev=688360&r1=688359&r2=688360&view=diff
==============================================================================
--- synapse/trunk/java/modules/transports/src/test/java/org/apache/synapse/transport/jms/JMSChannel.java (original)
+++ synapse/trunk/java/modules/transports/src/test/java/org/apache/synapse/transport/jms/JMSChannel.java Sat Aug 23 09:31:00 2008
@@ -34,19 +34,38 @@
import org.apache.synapse.transport.testkit.name.Key;
public abstract class JMSChannel extends AbstractChannel {
+ private final String name;
private final String destinationType;
protected JMSTestEnvironment env;
private String destinationName;
private Destination destination;
- public JMSChannel(String destinationType) {
+ public JMSChannel(String name, String destinationType) {
+ this.name = name;
this.destinationType = destinationType;
}
+ public JMSChannel(String destinationType) {
+ this(null, destinationType);
+ }
+
+    /** Returns [name +] direction + 'Q' for queue destinations or 'T' otherwise, e.g. "endpoint1RequestQ". */
+    protected String buildDestinationName(String direction, String destinationType) {
+        StringBuilder destinationName = new StringBuilder();
+        if (name != null) {
+            destinationName.append(name).append(Character.toUpperCase(direction.charAt(0))).append(direction.substring(1));
+        } else {
+            destinationName.append(direction);
+        }
+        // Compare by value: == only matches when the caller passes the very same String instance.
+        destinationName.append(JMSConstants.DESTINATION_TYPE_QUEUE.equals(destinationType) ? 'Q' : 'T');
+        return destinationName.toString();
+    }
+
@SuppressWarnings("unused")
private void setUp(JMSTestEnvironment env) throws Exception {
this.env = env;
- destinationName = "request" + destinationType;
+ destinationName = buildDestinationName("request", destinationType);
destination = env.createDestination(destinationType, destinationName);
env.getContext().bind(destinationName, destination);
}
Modified: synapse/trunk/java/modules/transports/src/test/java/org/apache/synapse/transport/jms/JMSListenerTest.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/src/test/java/org/apache/synapse/transport/jms/JMSListenerTest.java?rev=688360&r1=688359&r2=688360&view=diff
==============================================================================
--- synapse/trunk/java/modules/transports/src/test/java/org/apache/synapse/transport/jms/JMSListenerTest.java (original)
+++ synapse/trunk/java/modules/transports/src/test/java/org/apache/synapse/transport/jms/JMSListenerTest.java Sat Aug 23 09:31:00 2008
@@ -33,6 +33,7 @@
import org.apache.synapse.transport.testkit.client.AsyncTestClient;
import org.apache.synapse.transport.testkit.client.axis2.AxisAsyncTestClient;
import org.apache.synapse.transport.testkit.client.axis2.AxisRequestResponseTestClient;
+import org.apache.synapse.transport.testkit.listener.AsyncChannel;
import org.apache.synapse.transport.testkit.listener.ContentTypeMode;
import org.apache.synapse.transport.testkit.listener.MessageTestData;
import org.apache.synapse.transport.testkit.message.MessageConverter;
@@ -40,6 +41,7 @@
import org.apache.synapse.transport.testkit.server.axis2.AxisAsyncEndpointFactory;
import org.apache.synapse.transport.testkit.server.axis2.AxisEchoEndpointFactory;
import org.apache.synapse.transport.testkit.server.axis2.AxisServer;
+import org.apache.synapse.transport.testkit.tests.misc.MinConcurrencyTest;
public class JMSListenerTest extends TestCase {
public static TestSuite suite() {
@@ -85,6 +87,10 @@
suite.addBinaryTest(channel, bytesMessageClient, adapt(asyncEndpointFactory, MessageConverter.AXIS_TO_BYTE), contentTypeMode, env, server, tdf);
}
}
+ suite.addTest(new MinConcurrencyTest(server, new AsyncChannel[] {
+ new JMSAsyncChannel("endpoint1", JMSConstants.DESTINATION_TYPE_QUEUE),
+ new JMSAsyncChannel("endpoint2", JMSConstants.DESTINATION_TYPE_QUEUE) },
+ 2, false, env, tdf));
return suite;
}
}
Modified: synapse/trunk/java/modules/transports/src/test/java/org/apache/synapse/transport/jms/JMSRequestResponseChannel.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/src/test/java/org/apache/synapse/transport/jms/JMSRequestResponseChannel.java?rev=688360&r1=688359&r2=688360&view=diff
==============================================================================
--- synapse/trunk/java/modules/transports/src/test/java/org/apache/synapse/transport/jms/JMSRequestResponseChannel.java (original)
+++ synapse/trunk/java/modules/transports/src/test/java/org/apache/synapse/transport/jms/JMSRequestResponseChannel.java Sat Aug 23 09:31:00 2008
@@ -32,14 +32,18 @@
private String replyDestinationName;
private Destination replyDestination;
- public JMSRequestResponseChannel(String destinationType, String replyDestinationType) {
- super(destinationType);
+ public JMSRequestResponseChannel(String name, String destinationType, String replyDestinationType) {
+ super(name, destinationType);
this.replyDestinationType = replyDestinationType;
}
+ public JMSRequestResponseChannel(String destinationType, String replyDestinationType) {
+ this(null, destinationType, replyDestinationType);
+ }
+
@SuppressWarnings("unused")
private void setUp(JMSTestEnvironment env) throws Exception {
- replyDestinationName = "response" + replyDestinationType;
+ replyDestinationName = buildDestinationName("response", replyDestinationType);
replyDestination = env.createDestination(replyDestinationType, replyDestinationName);
env.getContext().bind(replyDestinationName, replyDestination);
}
Modified: synapse/trunk/java/modules/transports/src/test/java/org/apache/synapse/transport/mail/MailTransportListenerTest.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/src/test/java/org/apache/synapse/transport/mail/MailTransportListenerTest.java?rev=688360&r1=688359&r2=688360&view=diff
==============================================================================
--- synapse/trunk/java/modules/transports/src/test/java/org/apache/synapse/transport/mail/MailTransportListenerTest.java (original)
+++ synapse/trunk/java/modules/transports/src/test/java/org/apache/synapse/transport/mail/MailTransportListenerTest.java Sat Aug 23 09:31:00 2008
@@ -37,6 +37,7 @@
import org.apache.synapse.transport.testkit.server.axis2.AxisAsyncEndpointFactory;
import org.apache.synapse.transport.testkit.server.axis2.AxisEchoEndpointFactory;
import org.apache.synapse.transport.testkit.server.axis2.AxisServer;
+import org.apache.synapse.transport.testkit.tests.misc.MinConcurrencyTest;
public class MailTransportListenerTest extends TestCase {
public static TestSuite suite() throws Exception {
@@ -50,6 +51,8 @@
suite.addExclude("(test=AsyncSwA)");
suite.addExclude("(test=AsyncBinary)");
suite.addExclude("(&(test=AsyncTextPlain)(!(data=ASCII)))");
+ // SYNAPSE-434
+ suite.addExclude("(test=MinConcurrency)");
MailTestEnvironment env = new GreenMailTestEnvironment();
@@ -73,7 +76,7 @@
suite.addPOXTests(channel, adapt(axisClient, MessageConverter.XML_TO_AXIS), asyncEndpointFactory, ContentTypeMode.TRANSPORT, env, axisServer);
suite.addTextPlainTests(channel, adapt(axisClient, MessageConverter.TEXT_WRAPPER), AdapterUtils.adapt(asyncEndpointFactory, MessageConverter.AXIS_TO_STRING), ContentTypeMode.TRANSPORT, env, axisServer);
suite.addBinaryTest(channel, adapt(axisClient, MessageConverter.BINARY_WRAPPER), adapt(asyncEndpointFactory, MessageConverter.AXIS_TO_BYTE), ContentTypeMode.TRANSPORT, env, axisServer);
-// suite.addTest(new MinConcurrencyTest(axisServer, new MailChannel[] { new MailChannel(), new MailChannel() }, 2, env));
+ suite.addTest(new MinConcurrencyTest(axisServer, new MailChannel[] { new MailChannel(), new MailChannel() }, 2, true, env));
return suite;
}
}
Modified: synapse/trunk/java/modules/transports/src/test/java/org/apache/synapse/transport/nhttp/HttpCoreNIOListenerTest.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/src/test/java/org/apache/synapse/transport/nhttp/HttpCoreNIOListenerTest.java?rev=688360&r1=688359&r2=688360&view=diff
==============================================================================
--- synapse/trunk/java/modules/transports/src/test/java/org/apache/synapse/transport/nhttp/HttpCoreNIOListenerTest.java (original)
+++ synapse/trunk/java/modules/transports/src/test/java/org/apache/synapse/transport/nhttp/HttpCoreNIOListenerTest.java Sat Aug 23 09:31:00 2008
@@ -37,11 +37,13 @@
import org.apache.synapse.transport.testkit.TransportTestSuite;
import org.apache.synapse.transport.testkit.client.AsyncTestClient;
import org.apache.synapse.transport.testkit.client.axis2.AxisAsyncTestClient;
+import org.apache.synapse.transport.testkit.listener.AsyncChannel;
import org.apache.synapse.transport.testkit.listener.ContentTypeMode;
import org.apache.synapse.transport.testkit.message.MessageConverter;
import org.apache.synapse.transport.testkit.message.XMLMessage;
import org.apache.synapse.transport.testkit.server.axis2.AxisAsyncEndpointFactory;
import org.apache.synapse.transport.testkit.server.axis2.AxisServer;
+import org.apache.synapse.transport.testkit.tests.misc.MinConcurrencyTest;
public class HttpCoreNIOListenerTest extends TestCase {
public static TestSuite suite() {
@@ -84,6 +86,7 @@
suite.addTextPlainTests(channel, adapt(javaNetClient, MessageConverter.STRING_TO_BYTE), adapt(asyncEndpointFactory, MessageConverter.AXIS_TO_STRING), ContentTypeMode.TRANSPORT, env, axisServer, tdf);
suite.addBinaryTest(channel, javaNetClient, adapt(asyncEndpointFactory, MessageConverter.AXIS_TO_BYTE), ContentTypeMode.TRANSPORT, env, axisServer, tdf);
suite.addRESTTests(channel, new JavaNetRESTClient(), asyncEndpointFactory, env, axisServer, tdf);
+ suite.addTest(new MinConcurrencyTest(axisServer, new AsyncChannel[] { new HttpChannel(), new HttpChannel() }, 2, false, env, tdf));
return suite;
}
}
Added: synapse/trunk/java/modules/transports/src/test/java/org/apache/synapse/transport/testkit/tests/misc/MinConcurrencyTest.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/src/test/java/org/apache/synapse/transport/testkit/tests/misc/MinConcurrencyTest.java?rev=688360&view=auto
==============================================================================
--- synapse/trunk/java/modules/transports/src/test/java/org/apache/synapse/transport/testkit/tests/misc/MinConcurrencyTest.java (added)
+++ synapse/trunk/java/modules/transports/src/test/java/org/apache/synapse/transport/testkit/tests/misc/MinConcurrencyTest.java Sat Aug 23 09:31:00 2008
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.synapse.transport.testkit.tests.misc;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.axiom.om.OMAbstractFactory;
+import org.apache.axiom.soap.SOAP11Constants;
+import org.apache.axiom.soap.SOAPEnvelope;
+import org.apache.axiom.soap.SOAPFactory;
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.context.MessageContext;
+import org.apache.axis2.engine.MessageReceiver;
+import org.apache.synapse.transport.testkit.client.ClientOptions;
+import org.apache.synapse.transport.testkit.client.axis2.AxisAsyncTestClient;
+import org.apache.synapse.transport.testkit.listener.AsyncChannel;
+import org.apache.synapse.transport.testkit.message.AxisMessage;
+import org.apache.synapse.transport.testkit.name.Name;
+import org.apache.synapse.transport.testkit.server.Endpoint;
+import org.apache.synapse.transport.testkit.server.axis2.AxisServer;
+import org.apache.synapse.transport.testkit.tests.TestResourceSet;
+import org.apache.synapse.transport.testkit.tests.TransportTestCase;
+
+/**
+ * Generic test case to check whether a transport listener processes messages with the expected
+ * level of concurrency. This test case is used to verify that the listener is able to
+ * process messages simultaneously.
+ * <p>
+ * The test case deploys a given number of services and sends a configurable number of messages
+ * to each of these services. The services are configured with a custom message receiver that
+ * blocks until the expected level of concurrency (given by the number of endpoints times the
+ * number of messages) is reached. If after some timeout the concurrency level is not reached,
+ * the test fails.
+ */
+@Name("MinConcurrency")
+public class MinConcurrencyTest extends TransportTestCase {
+    private final AxisServer server;
+    private final AsyncChannel[] channels; // one endpoint is deployed per channel
+    private final int messages; // number of messages sent over each channel
+    private final boolean preloadMessages; // true: send the messages before deploying the endpoint
+
+    public MinConcurrencyTest(AxisServer server, AsyncChannel[] channels, int messages,
+            boolean preloadMessages, Object... resources) {
+        super(resources);
+        this.server = server;
+        addResource(server);
+        this.channels = channels;
+        this.messages = messages;
+        this.preloadMessages = preloadMessages;
+    }
+
+    @Override
+    protected void runTest() throws Throwable {
+        int endpointCount = channels.length;
+        int expectedConcurrency = endpointCount * messages;
+
+        final CountDownLatch shutdownLatch = new CountDownLatch(1);
+        final CountDownLatch concurrencyReachedLatch = new CountDownLatch(expectedConcurrency);
+
+        MessageReceiver messageReceiver = new MessageReceiver() {
+            public void receive(MessageContext msgContext) throws AxisFault {
+                concurrencyReachedLatch.countDown();
+                try {
+                    shutdownLatch.await(); // stay "in flight" until the test releases all receivers
+                } catch (InterruptedException ex) {
+                    Thread.currentThread().interrupt(); // do not swallow: restore the interrupt status
+                }
+            }
+        };
+
+        TestResourceSet[] resourceSets = new TestResourceSet[endpointCount];
+        Endpoint[] endpoints = new Endpoint[endpointCount];
+        try {
+            for (int i=0; i<endpointCount; i++) {
+                TestResourceSet resources = new TestResourceSet(getResourceSet());
+                AsyncChannel channel = channels[i];
+                resources.addResource(channel);
+                AxisAsyncTestClient client = new AxisAsyncTestClient();
+                resources.addResource(client);
+                resources.setUp();
+                resourceSets[i] = resources;
+                if (!preloadMessages) {
+                    // TODO: we need to support transports that use static Content-Type
+                    endpoints[i] = server.createAsyncEndpoint(channel, messageReceiver, null);
+                }
+                for (int j=0; j<messages; j++) {
+                    ClientOptions options = new ClientOptions("UTF-8");
+                    AxisMessage message = new AxisMessage();
+                    message.setMessageType(SOAP11Constants.SOAP_11_CONTENT_TYPE);
+                    SOAPFactory factory = OMAbstractFactory.getSOAP11Factory();
+                    SOAPEnvelope envelope = factory.getDefaultEnvelope();
+                    message.setEnvelope(envelope);
+                    client.sendMessage(options, message);
+                }
+                if (preloadMessages) {
+                    endpoints[i] = server.createAsyncEndpoint(channel, messageReceiver, null);
+                }
+            }
+
+            if (!concurrencyReachedLatch.await(5, TimeUnit.SECONDS)) {
+                long reached = expectedConcurrency - concurrencyReachedLatch.getCount();
+                fail("Concurrency reached is " + reached + ", but expected " + expectedConcurrency);
+            }
+        } finally {
+            shutdownLatch.countDown();
+            for (int i=0; i<endpointCount; i++) {
+                if (endpoints[i] != null) {
+                    endpoints[i].remove();
+                }
+                if (resourceSets[i] != null) {
+                    resourceSets[i].tearDown();
+                }
+            }
+        }
+    }
+}
Modified: synapse/trunk/java/modules/transports/src/test/java/org/apache/synapse/transport/vfs/VFSTransportListenerTest.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/src/test/java/org/apache/synapse/transport/vfs/VFSTransportListenerTest.java?rev=688360&r1=688359&r2=688360&view=diff
==============================================================================
--- synapse/trunk/java/modules/transports/src/test/java/org/apache/synapse/transport/vfs/VFSTransportListenerTest.java (original)
+++ synapse/trunk/java/modules/transports/src/test/java/org/apache/synapse/transport/vfs/VFSTransportListenerTest.java Sat Aug 23 09:31:00 2008
@@ -64,6 +64,7 @@
}
suite.addTextPlainTests(channel, adapt(vfsClient, MessageConverter.STRING_TO_BYTE), adapt(asyncEndpointFactory, MessageConverter.AXIS_TO_STRING), ContentTypeMode.SERVICE, env, server, tdf);
suite.addBinaryTest(channel, vfsClient, adapt(asyncEndpointFactory, MessageConverter.AXIS_TO_BYTE), ContentTypeMode.SERVICE, env, server, tdf);
+// suite.addTest(new MinConcurrencyTest(server, new AsyncChannel[] { new VFSFileChannel("req/in1"), new VFSFileChannel("req/in2") }, 1, true, env, tdf));
return suite;
}
}