You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by re...@apache.org on 2021/09/02 16:09:46 UTC

[cxf] branch master updated: CXF-8591: Temporary queues are never deleted when they are used. Added a test case

This is an automated email from the ASF dual-hosted git repository.

reta pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/cxf.git


The following commit(s) were added to refs/heads/master by this push:
     new d40f53e  CXF-8591: Temporary queues are never deleted when they are used. Added a test case
d40f53e is described below

commit d40f53ea97eea476f0c6af30f352b3e9f0bb5903
Author: Andriy Redko <dr...@gmail.com>
AuthorDate: Thu Sep 2 11:13:04 2021 -0400

    CXF-8591: Temporary queues are never deleted when they are used. Added a test case
---
 .../cxf/transport/jms/JMSDestinationTest.java      | 73 ++++++++++++++++++++++
 1 file changed, 73 insertions(+)

diff --git a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java
index 597d1bd..e92a4a4 100644
--- a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java
+++ b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java
@@ -22,6 +22,7 @@ package org.apache.cxf.transport.jms;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
 
 import javax.jms.Connection;
@@ -38,6 +39,8 @@ import javax.jms.ServerSessionPool;
 import javax.jms.Session;
 import javax.jms.Topic;
 
+import org.apache.activemq.EnhancedConnection;
+import org.apache.activemq.advisory.DestinationSource;
 import org.apache.activemq.util.ServiceStopper;
 import org.apache.cxf.message.Exchange;
 import org.apache.cxf.message.ExchangeImpl;
@@ -53,9 +56,12 @@ import org.apache.cxf.transport.jms.util.ResourceCloser;
 import org.junit.Ignore;
 import org.junit.Test;
 
+import static org.awaitility.Awaitility.await;
+import static org.hamcrest.CoreMatchers.equalTo;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 
 public class JMSDestinationTest extends AbstractJMSTester {
@@ -383,6 +389,73 @@ public class JMSDestinationTest extends AbstractJMSTester {
         conduit.close();
         destination.shutdown();
     }
+    
+    @Test
+    public void testTemporaryQueueDeletionUponReset() throws Exception {
+        EndpointInfo ei = setupServiceInfo("HelloWorldService", "HelloWorldPort");
+        // Scenario: after one exchange a single temporary queue must be visible, and it must be
+        // gone once resetCachedReplyDestination() is called. No reply destination is preconfigured.
+        JMSConduit conduit = setupJMSConduitWithObserver(ei);
+        assertNull(conduit.getJmsConfig().getReplyDestination());
+
+        // Holds the DestinationSource taken from the connection, used to inspect temporary queues
+        final AtomicReference<DestinationSource> destinationSource = new AtomicReference<>();
+        final Message outMessage = createMessage();
+        
+        // Wrap the factory so each new connection exposes its DestinationSource (cast needs ActiveMQ)
+        final JMSDestination destination = setupJMSDestination(ei, c -> new ConnectionFactory() {
+            @Override
+            public Connection createConnection() throws JMSException {
+                final Connection connection = c.createConnection();
+                destinationSource.set(((EnhancedConnection)connection).getDestinationSource());
+                return connection;
+            }
+
+            @Override
+            public Connection createConnection(String userName, String password) throws JMSException {
+                final Connection connection = c.createConnection(userName, password);
+                destinationSource.set(((EnhancedConnection)connection).getDestinationSource());
+                return connection;
+            }
+        });
+
+        // Observer answers each incoming message with a new MessageImpl sent over the back channel
+        final MessageObserver observer = new MessageObserver() {
+            public void onMessage(Message m) {
+                final Exchange exchange = new ExchangeImpl();
+                exchange.setInMessage(m);
+                m.setExchange(exchange);
+
+                try {
+                    final Conduit backConduit = destination.getBackChannel(m);
+                    sendOneWayMessage(backConduit, new MessageImpl());
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+            }
+        };
+        
+        destination.setMessageObserver(observer);
+        sendMessageSync(conduit, outMessage);
+        // Wait for the in-message to arrive and verify it (presumably the reply delivered to the
+        // conduit's observer; the helpers are defined outside this hunk -- TODO confirm)
+
+        Message inMessage = waitForReceiveInMessage();
+        verifyReceivedMessage(inMessage);
+
+        final DestinationSource ds = destinationSource.get();
+        assertThat(ds.getTemporaryQueues().size(), equalTo(1));
+        
+        // Force manual temporary queue deletion by resetting the cached reply destination
+        conduit.getJmsConfig().resetCachedReplyDestination();
+        // Deletion events (as well as others) are propagated asynchronously, so poll for up to 1s
+        await()
+            .atMost(1, TimeUnit.SECONDS)
+            .untilAsserted(() -> assertThat(ds.getTemporaryQueues().size(), equalTo(0)));
+        
+        conduit.close();
+        destination.shutdown();
+    }
 
     @Test
     public void testIsMultiplexCapable() throws Exception {