You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by bu...@apache.org on 2019/01/04 17:55:00 UTC
[cxf] branch master updated: cxf-rt-transports-jms: improve test stability
This is an automated email from the ASF dual-hosted git repository.
buhhunyx pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/cxf.git
The following commit(s) were added to refs/heads/master by this push:
new 7225b3a cxf-rt-transports-jms: improve test stability
7225b3a is described below
commit 7225b3a8c2a52218d2c2efdd0daa1556ffbf321b
Author: amarkevich <am...@talend.com>
AuthorDate: Thu Dec 27 16:37:58 2018 +0300
cxf-rt-transports-jms: improve test stability
---
.../cxf/transport/jms/AbstractJMSTester.java | 164 +++++++++++++--------
.../apache/cxf/transport/jms/JMSConduitTest.java | 11 +-
.../cxf/transport/jms/JMSConfigFactoryTest.java | 4 +-
.../cxf/transport/jms/JMSDestinationTest.java | 131 ++++------------
.../MessageIdAsCorrelationIdJMSConduitTest.java | 31 ++--
.../jms/PooledConnectionTempQueueTest.java | 29 ++--
.../cxf/transport/jms/RequestResponseTest.java | 54 +------
.../jms/uri/URIConfiguredConduitTest.java | 18 +--
.../transport/jms/util/MessageListenerTest.java | 40 ++---
9 files changed, 190 insertions(+), 292 deletions(-)
diff --git a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/AbstractJMSTester.java b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/AbstractJMSTester.java
index 9340ef9..37acc66 100644
--- a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/AbstractJMSTester.java
+++ b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/AbstractJMSTester.java
@@ -18,16 +18,16 @@
*/
package org.apache.cxf.transport.jms;
-import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Reader;
-import java.io.StringReader;
import java.io.Writer;
import java.net.URL;
+import java.util.concurrent.atomic.AtomicReference;
import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
import javax.xml.namespace.QName;
import org.apache.activemq.ActiveMQConnectionFactory;
@@ -39,36 +39,35 @@ import org.apache.cxf.helpers.IOUtils;
import org.apache.cxf.message.Exchange;
import org.apache.cxf.message.ExchangeImpl;
import org.apache.cxf.message.Message;
+import org.apache.cxf.message.MessageImpl;
import org.apache.cxf.service.Service;
import org.apache.cxf.service.model.EndpointInfo;
import org.apache.cxf.testutil.common.TestUtil;
import org.apache.cxf.transport.Conduit;
import org.apache.cxf.transport.MessageObserver;
-import org.apache.cxf.ws.addressing.EndpointReferenceType;
import org.apache.cxf.wsdl11.WSDLServiceFactory;
import org.junit.AfterClass;
import org.junit.BeforeClass;
-import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
public abstract class AbstractJMSTester {
protected static final String WSDL = "/jms_test.wsdl";
protected static final String SERVICE_NS = "http://cxf.apache.org/hello_world_jms";
protected static final int MAX_RECEIVE_TIME = 10;
- protected static final String MESSAGE_CONTENT = "HelloWorld";
protected static Bus bus;
protected static ActiveMQConnectionFactory cf1;
protected static ConnectionFactory cf;
protected static BrokerService broker;
+ private static final String MESSAGE_CONTENT = "HelloWorld";
protected enum ExchangePattern { oneway, requestReply };
- protected EndpointReferenceType target;
- protected Message inMessage;
- protected Message destMessage;
+ private final AtomicReference<Message> inMessage = new AtomicReference<>();
+ private final AtomicReference<Message> destMessage = new AtomicReference<>();
@BeforeClass
public static void startSerices() throws Exception {
@@ -92,12 +91,12 @@ public abstract class AbstractJMSTester {
broker.stop();
}
- protected EndpointInfo setupServiceInfo(String serviceName, String portName) {
+ protected static EndpointInfo setupServiceInfo(String serviceName, String portName) {
return setupServiceInfo(SERVICE_NS, WSDL, serviceName, portName);
}
- protected EndpointInfo setupServiceInfo(String ns, String wsdl, String serviceName, String portName) {
- URL wsdlUrl = getClass().getResource(wsdl);
+ protected static EndpointInfo setupServiceInfo(String ns, String wsdl, String serviceName, String portName) {
+ URL wsdlUrl = AbstractJMSTester.class.getResource(wsdl);
if (wsdlUrl == null) {
throw new IllegalArgumentException("Wsdl file not found on class path " + wsdl);
}
@@ -113,64 +112,72 @@ public abstract class AbstractJMSTester {
protected MessageObserver createMessageObserver() {
return new MessageObserver() {
public void onMessage(Message m) {
- Exchange exchange = new ExchangeImpl();
- exchange.setInMessage(m);
- m.setExchange(exchange);
- destMessage = m;
+// Exchange exchange = new ExchangeImpl();
+// exchange.setInMessage(m);
+// m.setExchange(exchange);
+ destMessage.set(m);
+ synchronized (destMessage) {
+ destMessage.notifyAll();
+ }
}
};
}
- protected void sendMessageAsync(Conduit conduit, Message message) throws IOException {
+ protected static void sendMessageAsync(Conduit conduit, Message message) throws IOException {
sendoutMessage(conduit, message, false, false);
}
- protected void sendMessageSync(Conduit conduit, Message message) throws IOException {
+ protected static void sendMessageSync(Conduit conduit, Message message) throws IOException {
sendoutMessage(conduit, message, false, true);
}
- protected void sendMessage(Conduit conduit, Message message, boolean synchronous) throws IOException {
+ protected static void sendMessage(Conduit conduit, Message message, boolean synchronous) throws IOException {
sendoutMessage(conduit, message, false, synchronous);
}
- protected void sendOneWayMessage(Conduit conduit, Message message) throws IOException {
+ protected static void sendOneWayMessage(Conduit conduit, Message message) throws IOException {
sendoutMessage(conduit, message, true, true);
}
- private void sendoutMessage(Conduit conduit,
+ private static void sendoutMessage(Conduit conduit,
Message message,
boolean isOneWay,
boolean synchronous) throws IOException {
-
- Exchange exchange = new ExchangeImpl();
+ final Exchange exchange = new ExchangeImpl();
exchange.setOneWay(isOneWay);
exchange.setSynchronous(synchronous);
message.setExchange(exchange);
exchange.setOutMessage(message);
conduit.prepare(message);
- OutputStream os = message.getContent(OutputStream.class);
- Writer writer = message.getContent(Writer.class);
- assertTrue("The OutputStream and Writer should not both be null ", os != null || writer != null);
- if (os != null) {
- os.write(MESSAGE_CONTENT.getBytes()); // TODO encoding
- os.close();
- } else {
- writer.write(MESSAGE_CONTENT);
- writer.close();
+ try (OutputStream os = message.getContent(OutputStream.class)) {
+ if (os != null) {
+ os.write(MESSAGE_CONTENT.getBytes()); // TODO encoding
+ return;
+ }
+ }
+ try (Writer writer = message.getContent(Writer.class)) {
+ if (writer != null) {
+ writer.write(MESSAGE_CONTENT);
+ return;
+ }
}
+ fail("The OutputStream and Writer should not both be null");
}
- protected JMSConduit setupJMSConduit(EndpointInfo ei) throws IOException {
+ protected static JMSConduit setupJMSConduit(EndpointInfo ei) throws IOException {
JMSConfiguration jmsConfig = JMSConfigFactory.createFromEndpointInfo(bus, ei, null);
jmsConfig.setConnectionFactory(cf);
- return new JMSConduit(target, jmsConfig, bus);
+ return new JMSConduit(null, jmsConfig, bus);
}
protected JMSConduit setupJMSConduitWithObserver(EndpointInfo ei) throws IOException {
JMSConduit jmsConduit = setupJMSConduit(ei);
MessageObserver observer = new MessageObserver() {
public void onMessage(Message m) {
- inMessage = m;
+ inMessage.set(m);
+ synchronized (inMessage) {
+ inMessage.notifyAll();
+ }
}
};
jmsConduit.setMessageObserver(observer);
@@ -183,58 +190,85 @@ public abstract class AbstractJMSTester {
return new JMSDestination(bus, ei, jmsConfig);
}
- protected String getContent(Message message) {
- ByteArrayInputStream bis = (ByteArrayInputStream)message.getContent(InputStream.class);
+ protected static Message createMessage() {
+ return createMessage(null);
+ }
+
+ protected static Message createMessage(String correlationId) {
+ Message outMessage = new MessageImpl();
+ JMSMessageHeadersType header = new JMSMessageHeadersType();
+ header.setJMSDeliveryMode(DeliveryMode.PERSISTENT);
+ header.setJMSPriority(1);
+ header.setTimeToLive(1000L);
+ outMessage.put(JMSConstants.JMS_CLIENT_REQUEST_HEADERS, header);
+ outMessage.put(Message.ENCODING, "US-ASCII");
+ return outMessage;
+ }
+
+ protected static void verifyReceivedMessage(Message message) {
String response = "<not found>";
+ InputStream bis = message.getContent(InputStream.class);
if (bis != null) {
- byte[] bytes = new byte[bis.available()];
try {
+ byte[] bytes = new byte[bis.available()];
bis.read(bytes);
+ response = IOUtils.newStringFromBytes(bytes);
} catch (IOException ex) {
- assertFalse("Read the Destination recieved Message error ", false);
- ex.printStackTrace();
+ fail("Read the Destination recieved Message error: " + ex.getMessage());
}
- response = IOUtils.newStringFromBytes(bytes);
} else {
- StringReader reader = (StringReader)message.getContent(Reader.class);
+ Reader reader = message.getContent(Reader.class);
char[] buffer = new char[5000];
try {
int i = reader.read(buffer);
response = new String(buffer, 0, i);
} catch (IOException e) {
- assertFalse("Read the Destination recieved Message error ", false);
- e.printStackTrace();
+ fail("Read the Destination recieved Message error: " + e.getMessage());
}
}
- return response;
+ assertEquals("The response content should be equal", MESSAGE_CONTENT, response);
}
- protected void waitForReceiveInMessage() {
- int waitTime = 0;
- while (inMessage == null && waitTime < MAX_RECEIVE_TIME * 10) {
- try {
- Thread.sleep(100);
- } catch (InterruptedException e) {
- // do nothing here
+ protected static void verifyHeaders(Message msgIn, Message msgOut) {
+ JMSMessageHeadersType outHeader = (JMSMessageHeadersType)msgOut
+ .get(JMSConstants.JMS_CLIENT_REQUEST_HEADERS);
+
+ JMSMessageHeadersType inHeader = (JMSMessageHeadersType)msgIn
+ .get(JMSConstants.JMS_SERVER_REQUEST_HEADERS);
+
+ verifyJmsHeaderEquality(outHeader, inHeader);
+
+ }
+
+ protected static void verifyJmsHeaderEquality(JMSMessageHeadersType outHeader, JMSMessageHeadersType inHeader) {
+ assertEquals("The inMessage and outMessage JMS Header's JMSPriority should be equals", outHeader
+ .getJMSPriority(), inHeader.getJMSPriority());
+ assertEquals("The inMessage and outMessage JMS Header's JMSDeliveryMode should be equals", outHeader
+ .getJMSDeliveryMode(), inHeader.getJMSDeliveryMode());
+ assertEquals("The inMessage and outMessage JMS Header's JMSType should be equals", outHeader
+ .getJMSType(), inHeader.getJMSType());
+ }
+
+
+ protected Message waitForReceiveInMessage() throws InterruptedException {
+ if (null == inMessage.get()) {
+ synchronized (inMessage) {
+ inMessage.wait(MAX_RECEIVE_TIME * 1000L);
}
- waitTime++;
+ assertNotNull("Can't receive the Conduit Message in " + MAX_RECEIVE_TIME + " seconds", inMessage.get());
}
- assertTrue("Can't receive the Conduit Message in " + MAX_RECEIVE_TIME + " seconds",
- inMessage != null);
+ return inMessage.getAndSet(null);
}
- protected void waitForReceiveDestMessage() {
- int waitTime = 0;
- while (destMessage == null && waitTime < MAX_RECEIVE_TIME * 10) {
- try {
- Thread.sleep(100);
- } catch (InterruptedException e) {
- // do nothing here
+ protected Message waitForReceiveDestMessage() throws InterruptedException {
+ if (null == destMessage.get()) {
+ synchronized (destMessage) {
+ destMessage.wait(MAX_RECEIVE_TIME * 1000L);
}
- waitTime++;
+ assertNotNull("Can't receive the Destination message in " + MAX_RECEIVE_TIME + " seconds",
+ destMessage.get());
}
- assertNotNull("Can't receive the Destination message in " + MAX_RECEIVE_TIME
- + " seconds", destMessage);
+ return destMessage.getAndSet(null);
}
}
\ No newline at end of file
diff --git a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSConduitTest.java b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSConduitTest.java
index 6a6ee45..bf34bd3 100644
--- a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSConduitTest.java
+++ b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSConduitTest.java
@@ -36,8 +36,7 @@ public class JMSConduitTest extends AbstractJMSTester {
@Test
public void testGetConfiguration() throws Exception {
- EndpointInfo ei = setupServiceInfo("http://cxf.apache.org/hello_world_jms", WSDL,
- "HelloWorldQueueBinMsgService", "HelloWorldQueueBinMsgPort");
+ EndpointInfo ei = setupServiceInfo("HelloWorldQueueBinMsgService", "HelloWorldQueueBinMsgPort");
JMSConduit conduit = setupJMSConduit(ei);
assertEquals("Can't get the right ClientReceiveTimeout", 500L, conduit.getJmsConfig()
.getReceiveTimeout().longValue());
@@ -46,8 +45,7 @@ public class JMSConduitTest extends AbstractJMSTester {
@Test
public void testPrepareSend() throws Exception {
- EndpointInfo ei = setupServiceInfo("http://cxf.apache.org/hello_world_jms", WSDL,
- "HelloWorldService", "HelloWorldPort");
+ EndpointInfo ei = setupServiceInfo("HelloWorldService", "HelloWorldPort");
JMSConduit conduit = setupJMSConduit(ei);
Message message = new MessageImpl();
@@ -66,12 +64,11 @@ public class JMSConduitTest extends AbstractJMSTester {
*/
@Test
public void testTimeoutOnReceive() throws Exception {
- EndpointInfo ei = setupServiceInfo("http://cxf.apache.org/hello_world_jms", WSDL,
- "HelloWorldServiceLoop", "HelloWorldPortLoop");
+ EndpointInfo ei = setupServiceInfo("HelloWorldServiceLoop", "HelloWorldPortLoop");
JMSConduit conduit = setupJMSConduitWithObserver(ei);
// If the system is extremely fast. The message could still get through
- conduit.getJmsConfig().setReceiveTimeout(Long.valueOf(1));
+ conduit.getJmsConfig().setReceiveTimeout(1L);
Message message = new MessageImpl();
try {
sendMessageSync(conduit, message);
diff --git a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSConfigFactoryTest.java b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSConfigFactoryTest.java
index c15940f..f785a0f 100644
--- a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSConfigFactoryTest.java
+++ b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSConfigFactoryTest.java
@@ -39,7 +39,7 @@ public class JMSConfigFactoryTest extends AbstractJMSTester {
@Test
public void testUsernameAndPassword() throws Exception {
EndpointInfo ei = setupServiceInfo("HelloWorldService", "HelloWorldPort");
- JMSConfiguration config = JMSConfigFactory.createFromEndpointInfo(bus, ei, target);
+ JMSConfiguration config = JMSConfigFactory.createFromEndpointInfo(bus, ei, null);
Assert.assertEquals("User name does not match.", "testUser", config.getUserName());
Assert.assertEquals("Password does not match.", "testPassword", config.getPassword());
}
@@ -92,7 +92,7 @@ public class JMSConfigFactoryTest extends AbstractJMSTester {
@Test
public void testMessageSelectorIsSet() {
EndpointInfo ei = setupServiceInfo("HelloWorldSelectorService", "HelloWorldPort");
- JMSConfiguration config = JMSConfigFactory.createFromEndpointInfo(bus, ei, target);
+ JMSConfiguration config = JMSConfigFactory.createFromEndpointInfo(bus, ei, null);
Assert.assertEquals("customJMSAttribute=helloWorld", config.getMessageSelector());
}
}
diff --git a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java
index f216f6b..22a770c 100644
--- a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java
+++ b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java
@@ -19,12 +19,6 @@
package org.apache.cxf.transport.jms;
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.Reader;
-import java.io.StringReader;
-
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
@@ -33,7 +27,6 @@ import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.Topic;
-import org.apache.cxf.helpers.IOUtils;
import org.apache.cxf.message.Exchange;
import org.apache.cxf.message.ExchangeImpl;
import org.apache.cxf.message.Message;
@@ -49,7 +42,6 @@ import org.junit.Ignore;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
@@ -68,17 +60,15 @@ public class JMSDestinationTest extends AbstractJMSTester {
@Test
public void testDurableSubscriber() throws Exception {
- destMessage = null;
EndpointInfo ei = setupServiceInfo("HelloWorldPubSubService", "HelloWorldPubSubPort");
JMSConduit conduit = setupJMSConduitWithObserver(ei);
- Message outMessage = new MessageImpl();
- setupMessageHeader(outMessage);
+ Message outMessage = createMessage();
JMSDestination destination = setupJMSDestination(ei);
destination.setMessageObserver(createMessageObserver());
// The JMSBroker (ActiveMQ 5.x) need to take some time to setup the DurableSubscriber
- Thread.sleep(500);
+ Thread.sleep(500L);
sendOneWayMessage(conduit, outMessage);
- waitForReceiveDestMessage();
+ Message destMessage = waitForReceiveDestMessage();
assertNotNull("The destiantion should have got the message ", destMessage);
verifyReceivedMessage(destMessage);
@@ -86,7 +76,7 @@ public class JMSDestinationTest extends AbstractJMSTester {
conduit.close();
destination.shutdown();
}
-
+
@Test(expected = InvalidClientIDException.class)
public void testDurableInvalidClientId() throws Throwable {
Connection con = cf1.createConnection();
@@ -94,7 +84,6 @@ public class JMSDestinationTest extends AbstractJMSTester {
try {
con.setClientID("testClient");
con.start();
- destMessage = null;
EndpointInfo ei = setupServiceInfo("HelloWorldPubSubService", "HelloWorldPubSubPort");
JMSConfiguration jmsConfig = JMSConfigFactory.createFromEndpointInfo(bus, ei, null);
jmsConfig.setDurableSubscriptionClientId("testClient");
@@ -117,12 +106,11 @@ public class JMSDestinationTest extends AbstractJMSTester {
destination.setMessageObserver(createMessageObserver());
JMSConduit conduit = setupJMSConduitWithObserver(ei);
- Message outMessage = new MessageImpl();
- setupMessageHeader(outMessage);
+ Message outMessage = createMessage();
sendOneWayMessage(conduit, outMessage);
// wait for the message to be get from the destination
- waitForReceiveDestMessage();
+ Message destMessage = waitForReceiveDestMessage();
// just verify the Destination inMessage
assertNotNull("The destiantion should have got the message ", destMessage);
verifyReceivedMessage(destMessage);
@@ -131,51 +119,17 @@ public class JMSDestinationTest extends AbstractJMSTester {
destination.shutdown();
}
- private void setupMessageHeader(Message outMessage, String correlationId, String replyTo) {
+ private static void setupMessageHeader(Message outMessage, String correlationId, String replyTo) {
JMSMessageHeadersType header = new JMSMessageHeadersType();
header.setJMSCorrelationID(correlationId);
header.setJMSDeliveryMode(DeliveryMode.PERSISTENT);
header.setJMSPriority(1);
- header.setTimeToLive(1000);
- header.setJMSReplyTo(replyTo != null ? replyTo : null);
+ header.setTimeToLive(1000L);
+ header.setJMSReplyTo(replyTo);
outMessage.put(JMSConstants.JMS_CLIENT_REQUEST_HEADERS, header);
outMessage.put(Message.ENCODING, "US-ASCII");
}
- private void setupMessageHeader(Message outMessage) {
- setupMessageHeader(outMessage, "Destination test", null);
- }
-
- private void setupMessageHeader(Message outMessage, String correlationId) {
- setupMessageHeader(outMessage, correlationId, null);
- }
-
- private void verifyReceivedMessage(Message message) {
- ByteArrayInputStream bis = (ByteArrayInputStream)message.getContent(InputStream.class);
- String response = "<not found>";
- if (bis != null) {
- byte[] bytes = new byte[bis.available()];
- try {
- bis.read(bytes);
- } catch (IOException ex) {
- assertFalse("Read the Destination recieved Message error ", false);
- ex.printStackTrace();
- }
- response = IOUtils.newStringFromBytes(bytes);
- } else {
- StringReader reader = (StringReader)message.getContent(Reader.class);
- char[] buffer = new char[5000];
- try {
- int i = reader.read(buffer);
- response = new String(buffer, 0, i);
- } catch (IOException e) {
- assertFalse("Read the Destination recieved Message error ", false);
- e.printStackTrace();
- }
- }
- assertEquals("The response content should be equal", AbstractJMSTester.MESSAGE_CONTENT, response);
- }
-
private void verifyRequestResponseHeaders(Message msgIn, Message msgOut) {
JMSMessageHeadersType outHeader = (JMSMessageHeadersType)msgOut
.get(JMSConstants.JMS_CLIENT_REQUEST_HEADERS);
@@ -188,27 +142,6 @@ public class JMSDestinationTest extends AbstractJMSTester {
.get(JMSConstants.JMS_CLIENT_RESPONSE_HEADERS);
verifyJmsHeaderEquality(outHeader, inHeader);
-
- }
-
- private void verifyHeaders(Message msgIn, Message msgOut) {
- JMSMessageHeadersType outHeader = (JMSMessageHeadersType)msgOut
- .get(JMSConstants.JMS_CLIENT_REQUEST_HEADERS);
-
- JMSMessageHeadersType inHeader = (JMSMessageHeadersType)msgIn
- .get(JMSConstants.JMS_SERVER_REQUEST_HEADERS);
-
- verifyJmsHeaderEquality(outHeader, inHeader);
-
- }
-
- private void verifyJmsHeaderEquality(JMSMessageHeadersType outHeader, JMSMessageHeadersType inHeader) {
- assertEquals("The inMessage and outMessage JMS Header's JMSPriority should be equals", outHeader
- .getJMSPriority(), inHeader.getJMSPriority());
- assertEquals("The inMessage and outMessage JMS Header's JMSDeliveryMode should be equals", outHeader
- .getJMSDeliveryMode(), inHeader.getJMSDeliveryMode());
- assertEquals("The inMessage and outMessage JMS Header's JMSType should be equals", outHeader
- .getJMSType(), inHeader.getJMSType());
}
@Test
@@ -233,8 +166,7 @@ public class JMSDestinationTest extends AbstractJMSTester {
JMSConduit conduit = setupJMSConduitWithObserver(ei);
conduit.getJmsConfig().setCreateSecurityContext(createSecurityContext);
- final Message outMessage = new MessageImpl();
- setupMessageHeader(outMessage, null);
+ final Message outMessage = createMessage();
final JMSDestination destination = setupJMSDestination(ei);
@@ -263,19 +195,17 @@ public class JMSDestinationTest extends AbstractJMSTester {
// wait for the message to be got from the destination,
// create the thread to handler the Destination incoming message
- waitForReceiveInMessage();
- verifyReceivedMessage(inMessage);
+ verifyReceivedMessage(waitForReceiveInMessage());
// wait for a while for the jms session recycling
- inMessage = null;
// Send a second message to check for an issue
// Where the session was closed the second time
sendMessageSync(conduit, outMessage);
- waitForReceiveInMessage();
+ Message inMessage = waitForReceiveInMessage();
verifyReceivedMessage(inMessage);
// wait for a while for the jms session recycling
- Thread.sleep(1000);
+// Thread.sleep(1000L);
conduit.close();
destination.shutdown();
@@ -289,8 +219,7 @@ public class JMSDestinationTest extends AbstractJMSTester {
// set up the conduit send to be true
JMSConduit conduit = setupJMSConduitWithObserver(ei);
- final Message outMessage = new MessageImpl();
- setupMessageHeader(outMessage, null);
+ final Message outMessage = createMessage();
JMSMessageHeadersType headers = (JMSMessageHeadersType)outMessage
.get(JMSConstants.JMS_CLIENT_REQUEST_HEADERS);
@@ -325,7 +254,7 @@ public class JMSDestinationTest extends AbstractJMSTester {
// wait for the message to be got from the destination,
// create the thread to handler the Destination incoming message
- waitForReceiveInMessage();
+ Message inMessage = waitForReceiveInMessage();
verifyReceivedMessage(inMessage);
verifyRequestResponseHeaders(inMessage, outMessage);
@@ -336,7 +265,7 @@ public class JMSDestinationTest extends AbstractJMSTester {
// TODO we need to check the SOAP JMS transport properties here
// wait for a while for the jms session recycling
- Thread.sleep(1000);
+// Thread.sleep(1000L);
conduit.close();
destination.shutdown();
}
@@ -371,10 +300,9 @@ public class JMSDestinationTest extends AbstractJMSTester {
destination.setMessageObserver(createMessageObserver());
// set up the conduit send to be true
JMSConduit conduit = setupJMSConduitWithObserver(ei);
- final Message outMessage = new MessageImpl();
- setupMessageHeader(outMessage, null);
+ final Message outMessage = createMessage();
sendOneWayMessage(conduit, outMessage);
- waitForReceiveDestMessage();
+ Message destMessage = waitForReceiveDestMessage();
SecurityContext securityContext = destMessage.get(SecurityContext.class);
conduit.close();
@@ -390,30 +318,26 @@ public class JMSDestinationTest extends AbstractJMSTester {
/* 1. Test that replyTo destination set in WSDL is NOT used
* in spec compliant mode */
- destMessage = null;
EndpointInfo ei = setupServiceInfo(
"HWStaticReplyQBinMsgService", "HWStaticReplyQBinMsgPort");
JMSConduit conduit = setupJMSConduitWithObserver(ei);
- Message outMessage = new MessageImpl();
- setupMessageHeader(outMessage);
+ Message outMessage = createMessage();
JMSDestination destination = setupJMSDestination(ei);
destination.setMessageObserver(createMessageObserver());
sendOneWayMessage(conduit, outMessage);
- waitForReceiveDestMessage();
+ Message destMessage = waitForReceiveDestMessage();
// just verify the Destination inMessage
assertNotNull("The destination should have got the message ", destMessage);
verifyReplyToNotSet(destMessage);
- destMessage = null;
/* 2. Test that replyTo destination set in WSDL IS used
* in spec non-compliant mode */
sendOneWayMessage(conduit, outMessage);
- waitForReceiveDestMessage();
+ destMessage = waitForReceiveDestMessage();
assertNotNull("The destination should have got the message ", destMessage);
String exName = getQueueName(conduit.getJmsConfig().getReplyDestination());
verifyReplyToSet(destMessage, Queue.class, exName);
- destMessage = null;
/* 3. Test that replyTo destination provided via invocation context
* overrides the value set in WSDL and IS used in spec non-compliant mode */
@@ -422,34 +346,31 @@ public class JMSDestinationTest extends AbstractJMSTester {
exName += ".context";
setupMessageHeader(outMessage, "cidValue", contextReplyTo);
sendOneWayMessage(conduit, outMessage);
- waitForReceiveDestMessage();
+ destMessage = waitForReceiveDestMessage();
assertNotNull("The destiantion should have got the message ", destMessage);
verifyReplyToSet(destMessage, Queue.class, exName);
- destMessage = null;
/* 4. Test that replyTo destination provided via invocation context
* and the value set in WSDL are NOT used in spec non-compliant mode
* when JMSConstants.JMS_SET_REPLY_TO == false */
- setupMessageHeader(outMessage);
+ setupMessageHeader(outMessage, null, null);
outMessage.put(JMSConstants.JMS_SET_REPLY_TO, Boolean.FALSE);
sendOneWayMessage(conduit, outMessage);
- waitForReceiveDestMessage();
+ destMessage = waitForReceiveDestMessage();
assertNotNull("The destiantion should have got the message ", destMessage);
verifyReplyToNotSet(destMessage);
- destMessage = null;
/* 5. Test that replyTo destination set in WSDL IS used in spec non-compliant
* mode when JMSConstants.JMS_SET_REPLY_TO == true */
- setupMessageHeader(outMessage);
+ setupMessageHeader(outMessage, null, null);
outMessage.put(JMSConstants.JMS_SET_REPLY_TO, Boolean.TRUE);
sendOneWayMessage(conduit, outMessage);
- waitForReceiveDestMessage();
+ destMessage = waitForReceiveDestMessage();
assertNotNull("The destiantion should have got the message ", destMessage);
exName = getQueueName(conduit.getJmsConfig().getReplyDestination());
verifyReplyToSet(destMessage, Queue.class, exName);
- destMessage = null;
conduit.close();
destination.shutdown();
diff --git a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/MessageIdAsCorrelationIdJMSConduitTest.java b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/MessageIdAsCorrelationIdJMSConduitTest.java
index 9d3e37a..3df49fb 100644
--- a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/MessageIdAsCorrelationIdJMSConduitTest.java
+++ b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/MessageIdAsCorrelationIdJMSConduitTest.java
@@ -21,7 +21,6 @@ package org.apache.cxf.transport.jms;
import javax.jms.ConnectionFactory;
import org.apache.activemq.pool.PooledConnectionFactory;
-import org.apache.cxf.Bus;
import org.apache.cxf.BusFactory;
import org.apache.cxf.message.Exchange;
import org.apache.cxf.message.ExchangeImpl;
@@ -30,9 +29,11 @@ import org.apache.cxf.message.MessageImpl;
import org.apache.cxf.transport.jms.util.TestReceiver;
import org.apache.cxf.ws.addressing.EndpointReferenceType;
-import org.junit.Assert;
import org.junit.Test;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
/**
* Checks if a CXF client works correlates requests and responses correctly if the server sets the message id
* as correlation id on the response message
@@ -40,7 +41,8 @@ import org.junit.Test;
public class MessageIdAsCorrelationIdJMSConduitTest {
private static final String SERVICE_QUEUE = "test";
private static final String BROKER_URI = "vm://localhost?broker.persistent=false";
- private ConnectionFactory connectionFactory;
+
+ private ConnectionFactory connectionFactory = new PooledConnectionFactory(BROKER_URI);
@Test
public void testSendReceiveWithTempReplyQueue() throws Exception {
@@ -52,13 +54,9 @@ public class MessageIdAsCorrelationIdJMSConduitTest {
sendAndReceive(true, "testreply");
}
- public void sendAndReceive(boolean synchronous, String replyDestination) throws Exception {
- BusFactory bf = BusFactory.newInstance();
- Bus bus = bf.createBus();
- BusFactory.setDefaultBus(bus);
+ private void sendAndReceive(boolean synchronous, String replyDestination) throws InterruptedException {
EndpointReferenceType target = new EndpointReferenceType();
- connectionFactory = new PooledConnectionFactory(BROKER_URI);
TestReceiver receiver = new TestReceiver(connectionFactory, SERVICE_QUEUE, true);
receiver.runAsync();
@@ -68,7 +66,7 @@ public class MessageIdAsCorrelationIdJMSConduitTest {
jmsConfig.setUseConduitIdSelector(false);
jmsConfig.setReplyDestination(replyDestination);
- JMSConduit conduit = new JMSConduit(target, jmsConfig, bus);
+ JMSConduit conduit = new JMSConduit(target, jmsConfig, BusFactory.getDefaultBus());
Exchange exchange = new ExchangeImpl();
exchange.setSynchronous(synchronous);
Message message = new MessageImpl();
@@ -76,21 +74,16 @@ public class MessageIdAsCorrelationIdJMSConduitTest {
conduit.sendExchange(exchange, "Request");
waitForAsyncReply(exchange);
receiver.close();
- if (exchange.getInMessage() == null) {
- throw new RuntimeException("No reply received within 2 seconds");
- }
+ assertNotNull("No reply received within 2 seconds", exchange.getInMessage());
JMSMessageHeadersType inHeaders = (JMSMessageHeadersType)exchange.getInMessage()
.get(JMSConstants.JMS_CLIENT_RESPONSE_HEADERS);
- Assert.assertEquals(receiver.getRequestMessageId(), inHeaders.getJMSCorrelationID());
+ assertEquals(receiver.getRequestMessageId(), inHeaders.getJMSCorrelationID());
conduit.close();
- bus.shutdown(true);
}
- private void waitForAsyncReply(Exchange exchange) throws InterruptedException {
- int count = 0;
- while (exchange.getInMessage() == null && count <= 20) {
- Thread.sleep(100);
- count++;
+ private static void waitForAsyncReply(Exchange exchange) throws InterruptedException {
+ for (int count = 0; exchange.getInMessage() == null && count <= 20; count++) {
+ Thread.sleep(100L);
}
}
diff --git a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/PooledConnectionTempQueueTest.java b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/PooledConnectionTempQueueTest.java
index a60ef7a..8cccc26 100644
--- a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/PooledConnectionTempQueueTest.java
+++ b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/PooledConnectionTempQueueTest.java
@@ -18,8 +18,6 @@
*/
package org.apache.cxf.transport.jms;
-import java.util.concurrent.Executors;
-
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
@@ -32,16 +30,17 @@ import javax.jms.TextMessage;
import org.apache.activemq.pool.PooledConnectionFactory;
-import org.junit.Assert;
import org.junit.Test;
+import static org.junit.Assert.assertNotNull;
+
public class PooledConnectionTempQueueTest {
protected static final String SERVICE_QUEUE = "queue1";
@Test
public void testTempQueueIssue() throws JMSException, InterruptedException {
- final PooledConnectionFactory cf = new PooledConnectionFactory("vm://localhost?broker.persistent=false");
+ final ConnectionFactory cf = new PooledConnectionFactory("vm://localhost?broker.persistent=false");
Connection con1 = cf.createConnection();
con1.start();
@@ -49,20 +48,18 @@ public class PooledConnectionTempQueueTest {
// This order seems to matter to reproduce the issue
con1.close();
- Executors.newSingleThreadExecutor().execute(new Runnable() {
- public void run() {
- try {
- receiveAndRespondWithMessageIdAsCorrelationId(cf, SERVICE_QUEUE);
- } catch (JMSException e) {
- e.printStackTrace();
- }
+ new Thread(() -> {
+ try {
+ receiveAndRespondWithMessageIdAsCorrelationId(cf, SERVICE_QUEUE);
+ } catch (Exception e) {
+ e.printStackTrace();
}
- });
+ }).start();
sendWithReplyToTemp(cf, SERVICE_QUEUE);
}
- private void sendWithReplyToTemp(ConnectionFactory cf, String serviceQueue) throws JMSException,
+ private static void sendWithReplyToTemp(ConnectionFactory cf, String serviceQueue) throws JMSException,
InterruptedException {
Connection con = cf.createConnection();
con.start();
@@ -74,11 +71,11 @@ public class PooledConnectionTempQueueTest {
producer.send(msg);
// This sleep also seems to matter
- Thread.sleep(500);
+ Thread.sleep(500L);
MessageConsumer consumer = session.createConsumer(tempQueue);
Message replyMsg = consumer.receive();
- Assert.assertNotNull(replyMsg);
+ assertNotNull(replyMsg);
//System.out.println(replyMsg.getJMSCorrelationID());
consumer.close();
@@ -88,7 +85,7 @@ public class PooledConnectionTempQueueTest {
con.close();
}
- public void receiveAndRespondWithMessageIdAsCorrelationId(ConnectionFactory connectionFactory,
+ public static void receiveAndRespondWithMessageIdAsCorrelationId(ConnectionFactory connectionFactory,
String queueName) throws JMSException {
Connection con = connectionFactory.createConnection();
Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
diff --git a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/RequestResponseTest.java b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/RequestResponseTest.java
index ab77644..053e78e 100644
--- a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/RequestResponseTest.java
+++ b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/RequestResponseTest.java
@@ -21,8 +21,6 @@ package org.apache.cxf.transport.jms;
import java.io.IOException;
-import javax.jms.DeliveryMode;
-
import org.apache.cxf.message.Exchange;
import org.apache.cxf.message.ExchangeImpl;
import org.apache.cxf.message.Message;
@@ -33,36 +31,8 @@ import org.apache.cxf.transport.MessageObserver;
import org.junit.Test;
-import static org.junit.Assert.assertEquals;
-
public class RequestResponseTest extends AbstractJMSTester {
- private void verifyReceivedMessage(Message message) {
- String response = getContent(message);
- assertEquals("The response content should be equal", AbstractJMSTester.MESSAGE_CONTENT, response);
- }
-
- private void verifyHeaders(Message msgIn, Message msgOut) {
- JMSMessageHeadersType outHeader = (JMSMessageHeadersType)msgOut
- .get(JMSConstants.JMS_CLIENT_REQUEST_HEADERS);
-
- JMSMessageHeadersType inHeader = (JMSMessageHeadersType)msgIn
- .get(JMSConstants.JMS_SERVER_REQUEST_HEADERS);
-
- verifyJmsHeaderEquality(outHeader, inHeader);
-
- }
-
- private void verifyJmsHeaderEquality(JMSMessageHeadersType outHeader, JMSMessageHeadersType inHeader) {
- assertEquals("The inMessage and outMessage JMS Header's JMSPriority should be equals", outHeader
- .getJMSPriority(), inHeader.getJMSPriority());
- assertEquals("The inMessage and outMessage JMS Header's JMSDeliveryMode should be equals", outHeader
- .getJMSDeliveryMode(), inHeader.getJMSDeliveryMode());
- assertEquals("The inMessage and outMessage JMS Header's JMSType should be equals", outHeader
- .getJMSType(), inHeader.getJMSType());
- }
-
-
@Test
public void testRequestQueueResponseTempQueue() throws Exception {
EndpointInfo ei = setupServiceInfo("http://cxf.apache.org/jms_simple", "/wsdl/jms_spec_testsuite.wsdl",
@@ -94,19 +64,8 @@ public class RequestResponseTest extends AbstractJMSTester {
sendAndReceiveMessages(ei, false);
}
- private Message createMessage() {
- Message outMessage = new MessageImpl();
- JMSMessageHeadersType header = new JMSMessageHeadersType();
- header.setJMSDeliveryMode(DeliveryMode.PERSISTENT);
- header.setJMSPriority(1);
- header.setTimeToLive(1000);
- outMessage.put(JMSConstants.JMS_CLIENT_REQUEST_HEADERS, header);
- outMessage.put(Message.ENCODING, "US-ASCII");
- return outMessage;
- }
-
- protected void sendAndReceiveMessages(EndpointInfo ei, boolean synchronous) throws IOException {
- inMessage = null;
+ private void sendAndReceiveMessages(EndpointInfo ei, boolean synchronous)
+ throws IOException, InterruptedException {
// set up the conduit send to be true
JMSConduit conduit = setupJMSConduitWithObserver(ei);
final Message outMessage = createMessage();
@@ -120,9 +79,8 @@ public class RequestResponseTest extends AbstractJMSTester {
verifyReceivedMessage(m);
verifyHeaders(m, outMessage);
// setup the message for
- Conduit backConduit;
try {
- backConduit = destination.getBackChannel(m);
+ Conduit backConduit = destination.getBackChannel(m);
// wait for the message to be got from the conduit
Message replyMessage = new MessageImpl();
sendOneWayMessage(backConduit, replyMessage);
@@ -138,13 +96,11 @@ public class RequestResponseTest extends AbstractJMSTester {
// wait for the message to be got from the destination,
// create the thread to handler the Destination incoming message
- waitForReceiveInMessage();
- verifyReceivedMessage(inMessage);
+ verifyReceivedMessage(waitForReceiveInMessage());
} finally {
conduit.close();
destination.shutdown();
}
}
-
-}
\ No newline at end of file
+}
diff --git a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/uri/URIConfiguredConduitTest.java b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/uri/URIConfiguredConduitTest.java
index b97a265..d9f9e9c 100644
--- a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/uri/URIConfiguredConduitTest.java
+++ b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/uri/URIConfiguredConduitTest.java
@@ -36,10 +36,12 @@ import org.apache.cxf.transport.jms.JMSMessageHeadersType;
import org.apache.cxf.transport.jms.util.TestReceiver;
import org.apache.cxf.ws.addressing.EndpointReferenceType;
-import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
/**
* Checks if a CXF client works correlates requests and responses correctly if the server sets the message id
* as correlation id on the response message
@@ -102,20 +104,16 @@ public class URIConfiguredConduitTest {
waitForAsyncReply(exchange);
receiver.close();
- if (exchange.getInMessage() == null) {
- throw new RuntimeException("No reply received within 2 seconds");
- }
+ assertNotNull("No reply received within 2 seconds", exchange.getInMessage());
JMSMessageHeadersType inHeaders = (JMSMessageHeadersType)exchange.getInMessage()
.get(JMSConstants.JMS_CLIENT_RESPONSE_HEADERS);
- Assert.assertEquals(receiver.getRequestMessageId(), inHeaders.getJMSCorrelationID());
+ assertEquals(receiver.getRequestMessageId(), inHeaders.getJMSCorrelationID());
conduit.close();
}
- private void waitForAsyncReply(Exchange exchange) throws InterruptedException {
- int count = 0;
- while (exchange.getInMessage() == null && count <= 20) {
- Thread.sleep(100);
- count++;
+ private static void waitForAsyncReply(Exchange exchange) throws InterruptedException {
+ for (int count = 0; exchange.getInMessage() == null && count <= 20; count++) {
+ Thread.sleep(100L);
}
}
diff --git a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/util/MessageListenerTest.java b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/util/MessageListenerTest.java
index ccdb450..937f0b5 100644
--- a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/util/MessageListenerTest.java
+++ b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/util/MessageListenerTest.java
@@ -40,7 +40,6 @@ import org.apache.geronimo.transaction.manager.GeronimoTransactionManager;
import org.awaitility.Awaitility;
import org.easymock.Capture;
-import org.junit.Assert;
import org.junit.Test;
@@ -50,6 +49,7 @@ import static org.easymock.EasyMock.expectLastCall;
import static org.easymock.EasyMock.newCapture;
import static org.easymock.EasyMock.replay;
import static org.easymock.EasyMock.verify;
+import static org.junit.Assert.assertEquals;
public class MessageListenerTest {
@@ -77,7 +77,7 @@ public class MessageListenerTest {
Awaitility.await().until(() -> !container.isRunning());
verify(exListener);
JMSException ex = captured.getValue();
- Assert.assertEquals("The connection is already closed", ex.getMessage());
+ assertEquals("The connection is already closed", ex.getMessage());
}
@Test
@@ -106,7 +106,7 @@ public class MessageListenerTest {
verify(exListener);
JMSException ex = captured.getValue();
// Closing the pooled connection will result in a NPE when using it
- Assert.assertEquals("Wrapped exception. null", ex.getMessage());
+ assertEquals("Wrapped exception. null", ex.getMessage());
}
@Test
@@ -147,14 +147,14 @@ public class MessageListenerTest {
container.setAcknowledgeMode(Session.AUTO_ACKNOWLEDGE);
container.start();
- assertNumMessagesInQueue("At the start the queue should be empty", connection, dest, 0, 0);
+ assertNumMessagesInQueue("At the start the queue should be empty", connection, dest, 0, 0L);
sendMessage(connection, dest, OK);
- assertNumMessagesInQueue("This message should be committed", connection, dest, 0, 1000);
+ assertNumMessagesInQueue("This message should be committed", connection, dest, 0, 1000L);
sendMessage(connection, dest, FAIL);
assertNumMessagesInQueue("Even when an exception occurs the message should be committed", connection,
- dest, 0, 1000);
+ dest, 0, 1000L);
container.stop();
connection.close();
@@ -178,17 +178,17 @@ public class MessageListenerTest {
private void testTransactionalBehaviour(Connection connection, Queue dest) throws JMSException,
InterruptedException {
Queue dlq = JMSUtil.createQueue(connection, "ActiveMQ.DLQ");
- assertNumMessagesInQueue("At the start the queue should be empty", connection, dest, 0, 0);
- assertNumMessagesInQueue("At the start the DLQ should be empty", connection, dlq, 0, 0);
+ assertNumMessagesInQueue("At the start the queue should be empty", connection, dest, 0, 0L);
+ assertNumMessagesInQueue("At the start the DLQ should be empty", connection, dlq, 0, 0L);
sendMessage(connection, dest, OK);
- assertNumMessagesInQueue("This message should be committed", connection, dest, 0, 1000);
+ assertNumMessagesInQueue("This message should be committed", connection, dest, 0, 1000L);
sendMessage(connection, dest, FAILFIRST);
- assertNumMessagesInQueue("Should succeed on second try", connection, dest, 0, 2000);
+ assertNumMessagesInQueue("Should succeed on second try", connection, dest, 0, 2000L);
sendMessage(connection, dest, FAIL);
- assertNumMessagesInQueue("Should be rolled back", connection, dlq, 1, 2500);
+ assertNumMessagesInQueue("Should be rolled back", connection, dlq, 1, 2500L);
}
private Connection createConnection(String name) throws JMSException {
@@ -227,25 +227,27 @@ public class MessageListenerTest {
}
consumer.close();
session.close();
- assertNumMessagesInQueue("", connection, dest, 0, 0);
+ assertNumMessagesInQueue("", connection, dest, 0, 0L);
}
- private void assertNumMessagesInQueue(String message, Connection connection, Queue queue,
- int expectedNum, int timeout) throws JMSException,
+ private static void assertNumMessagesInQueue(String message, Connection connection, Queue queue,
+ int expectedNum, long timeout) throws JMSException,
InterruptedException {
long startTime = System.currentTimeMillis();
int actualNum;
do {
actualNum = JMSUtil.getNumMessages(connection, queue);
-
+ if (actualNum == expectedNum) {
+ break;
+ }
//System.out.println("Messages in queue " + queue.getQueueName() + ": " + actualNum
// + ", expecting: " + expectedNum);
- Thread.sleep(100);
+ Thread.sleep(100L);
} while ((System.currentTimeMillis() - startTime < timeout) && expectedNum != actualNum);
- Assert.assertEquals(message + " -> number of messages on queue", expectedNum, actualNum);
+ assertEquals(message + " -> number of messages on queue", expectedNum, actualNum);
}
- private void sendMessage(Connection connection, Destination dest, String content) throws JMSException,
+ private static void sendMessage(Connection connection, Destination dest, String content) throws JMSException,
InterruptedException {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer prod = session.createProducer(dest);
@@ -253,7 +255,7 @@ public class MessageListenerTest {
prod.send(message);
prod.close();
session.close();
- Thread.sleep(500); // Give receiver some time to process
+// Thread.sleep(500L); // Give receiver some time to process
}
private static final class TestMessageListener implements MessageListener {