You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by re...@apache.org on 2021/09/02 19:56:10 UTC
[cxf] 02/02: CXF-8591: Temporary queues are never deleted when the
are used. Added a test case
This is an automated email from the ASF dual-hosted git repository.
reta pushed a commit to branch 3.4.x-fixes
in repository https://gitbox.apache.org/repos/asf/cxf.git
commit 88ffff4af92a0ee4cdce8c4b26830b53766957a2
Author: Andriy Redko <dr...@gmail.com>
AuthorDate: Thu Sep 2 11:13:04 2021 -0400
CXF-8591: Temporary queues are never deleted when the are used. Added a test case
(cherry picked from commit d40f53ea97eea476f0c6af30f352b3e9f0bb5903)
---
.../cxf/transport/jms/JMSDestinationTest.java | 73 ++++++++++++++++++++++
1 file changed, 73 insertions(+)
diff --git a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java
index 597d1bd..e92a4a4 100644
--- a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java
+++ b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java
@@ -22,6 +22,7 @@ package org.apache.cxf.transport.jms;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import javax.jms.Connection;
@@ -38,6 +39,8 @@ import javax.jms.ServerSessionPool;
import javax.jms.Session;
import javax.jms.Topic;
+import org.apache.activemq.EnhancedConnection;
+import org.apache.activemq.advisory.DestinationSource;
import org.apache.activemq.util.ServiceStopper;
import org.apache.cxf.message.Exchange;
import org.apache.cxf.message.ExchangeImpl;
@@ -53,9 +56,12 @@ import org.apache.cxf.transport.jms.util.ResourceCloser;
import org.junit.Ignore;
import org.junit.Test;
+import static org.awaitility.Awaitility.await;
+import static org.hamcrest.CoreMatchers.equalTo;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
public class JMSDestinationTest extends AbstractJMSTester {
@@ -383,6 +389,73 @@ public class JMSDestinationTest extends AbstractJMSTester {
conduit.close();
destination.shutdown();
}
+
+ @Test
+ public void testTemporaryQueueDeletionUponReset() throws Exception {
+ EndpointInfo ei = setupServiceInfo("HelloWorldService", "HelloWorldPort");
+
+ // set up the conduit send to be true
+ JMSConduit conduit = setupJMSConduitWithObserver(ei);
+ assertNull(conduit.getJmsConfig().getReplyDestination());
+
+ // Store the connection so we could check temporary queues
+ final AtomicReference<DestinationSource> destinationSource = new AtomicReference<>();
+ final Message outMessage = createMessage();
+
+ // Capture the DestinationSource instance associated with the connection
+ final JMSDestination destination = setupJMSDestination(ei, c -> new ConnectionFactory() {
+ @Override
+ public Connection createConnection() throws JMSException {
+ final Connection connection = c.createConnection();
+ destinationSource.set(((EnhancedConnection)connection).getDestinationSource());
+ return connection;
+ }
+
+ @Override
+ public Connection createConnection(String userName, String password) throws JMSException {
+ final Connection connection = c.createConnection(userName, password);
+ destinationSource.set(((EnhancedConnection)connection).getDestinationSource());
+ return connection;
+ }
+ });
+
+ // set up MessageObserver for handling the conduit message
+ final MessageObserver observer = new MessageObserver() {
+ public void onMessage(Message m) {
+ final Exchange exchange = new ExchangeImpl();
+ exchange.setInMessage(m);
+ m.setExchange(exchange);
+
+ try {
+ final Conduit backConduit = destination.getBackChannel(m);
+ sendOneWayMessage(backConduit, new MessageImpl());
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ };
+
+ destination.setMessageObserver(observer);
+ sendMessageSync(conduit, outMessage);
+ // wait for the message to be got from the destination,
+ // create the thread to handler the Destination incoming message
+
+ Message inMessage = waitForReceiveInMessage();
+ verifyReceivedMessage(inMessage);
+
+ final DestinationSource ds = destinationSource.get();
+ assertThat(ds.getTemporaryQueues().size(), equalTo(1));
+
+ // Force manual temporary queue deletion by resetting the reply destination
+ conduit.getJmsConfig().resetCachedReplyDestination();
+ // The queue deletion events (as well as others) are propagated asynchronously
+ await()
+ .atMost(1, TimeUnit.SECONDS)
+ .untilAsserted(() -> assertThat(ds.getTemporaryQueues().size(), equalTo(0)));
+
+ conduit.close();
+ destination.shutdown();
+ }
@Test
public void testIsMultiplexCapable() throws Exception {