You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by re...@apache.org on 2021/09/02 20:55:39 UTC

[cxf] branch 3.3.x-fixes updated (6c7bfa7 -> 4350c91)

This is an automated email from the ASF dual-hosted git repository.

reta pushed a change to branch 3.3.x-fixes
in repository https://gitbox.apache.org/repos/asf/cxf.git.


    from 6c7bfa7  Recording .gitmergeinfo Changes
     new 4f1ba2b  Delete temporary queue when it is used (#826)
     new 4350c91  CXF-8591: Temporary queues are never deleted when the are used. Added a test case

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../org/apache/cxf/transport/jms/JMSConduit.java   |  5 +-
 .../apache/cxf/transport/jms/JMSConfiguration.java | 13 ++++
 .../cxf/transport/jms/JMSDestinationTest.java      | 73 ++++++++++++++++++++++
 3 files changed, 89 insertions(+), 2 deletions(-)

[cxf] 01/02: Delete temporary queue when it is used (#826)

Posted by re...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

reta pushed a commit to branch 3.3.x-fixes
in repository https://gitbox.apache.org/repos/asf/cxf.git

commit 4f1ba2b78b2f53db6cb78aa93b00aa1c63281e3b
Author: valentin-matignon <87...@users.noreply.github.com>
AuthorDate: Thu Sep 2 16:38:59 2021 +0200

    Delete temporary queue when it is used (#826)
    
    * Delete temporary queue when it is used
    
    * Delete temporary queues in JMSConfiguration
    
    * handle JMSException directly in resetCachedReplyDestination method
    
    * Call JMSConfiguration.resetCachedReplyDestination() after shutdownListeners()
    
    * Call JMSConfiguration.resetCachedReplyDestination() before ResourceCloser.close(...)
    
    * Checkstyle correction
---
 .../main/java/org/apache/cxf/transport/jms/JMSConduit.java  |  5 +++--
 .../java/org/apache/cxf/transport/jms/JMSConfiguration.java | 13 +++++++++++++
 2 files changed, 16 insertions(+), 2 deletions(-)

diff --git a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java
index abc9650..030ce3e 100644
--- a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java
+++ b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java
@@ -127,7 +127,7 @@ public class JMSConduit extends AbstractConduit implements JMSExchangeSender, Me
     private void trySetExListener(Connection conn) {
         try {
             conn.setExceptionListener(new ExceptionListener() {
-                
+
                 @Override
                 public void onException(JMSException exception) {
                     jmsConfig.resetCachedReplyDestination();
@@ -186,9 +186,9 @@ public class JMSConduit extends AbstractConduit implements JMSExchangeSender, Me
                 if (exchange.get(JMSUtil.JMS_MESSAGE_CONSUMER) != null) {
                     ResourceCloser.close(exchange.get(JMSUtil.JMS_MESSAGE_CONSUMER));
                 }
+                jmsConfig.resetCachedReplyDestination();
                 ResourceCloser.close(connection);
                 this.connection = null;
-                jmsConfig.resetCachedReplyDestination();
             }
             this.staticReplyDestination = null;
             try {
@@ -511,6 +511,7 @@ public class JMSConduit extends AbstractConduit implements JMSExchangeSender, Me
     }
     public synchronized void close() {
         shutdownListeners();
+        jmsConfig.resetCachedReplyDestination();
         ResourceCloser.close(connection);
         connection = null;
         LOG.log(Level.FINE, "JMSConduit closed ");
diff --git a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java
index a1034ba..44e7477 100644
--- a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java
+++ b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java
@@ -19,16 +19,20 @@
 package org.apache.cxf.transport.jms;
 
 import java.util.Properties;
+import java.util.logging.Level;
+import java.util.logging.Logger;
 
 import javax.jms.ConnectionFactory;
 import javax.jms.Destination;
 import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.Session;
+import javax.jms.TemporaryQueue;
 import javax.naming.NamingException;
 import javax.transaction.TransactionManager;
 
 import org.apache.cxf.common.injection.NoJSR250Annotations;
+import org.apache.cxf.common.logging.LogUtils;
 import org.apache.cxf.transport.jms.util.DestinationResolver;
 import org.apache.cxf.transport.jms.util.JMSDestinationResolver;
 import org.apache.cxf.transport.jms.util.JndiHelper;
@@ -40,6 +44,8 @@ public class JMSConfiguration {
      */
     public static final int DEFAULT_VALUE = -1;
 
+    private static final Logger LOG = LogUtils.getL7dLogger(JMSConfiguration.class);
+
     private volatile ConnectionFactory connectionFactory;
     private Properties jndiEnvironment;
     private String connectionFactoryName;
@@ -489,6 +495,13 @@ public class JMSConfiguration {
 
     public void resetCachedReplyDestination() {
         synchronized (this) {
+            if (replyDestinationDest instanceof TemporaryQueue) {
+                try {
+                    ((TemporaryQueue) replyDestinationDest).delete();
+                } catch (JMSException exception) {
+                    LOG.log(Level.WARNING, "Exception on temporary queue deletion", exception);
+                }
+            }
             this.replyDestinationDest = null;
         }
     }

[cxf] 02/02: CXF-8591: Temporary queues are never deleted when the are used. Added a test case

Posted by re...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

reta pushed a commit to branch 3.3.x-fixes
in repository https://gitbox.apache.org/repos/asf/cxf.git

commit 4350c910770a1fe60f329a66183265ce7f7b4acb
Author: Andriy Redko <dr...@gmail.com>
AuthorDate: Thu Sep 2 11:13:04 2021 -0400

    CXF-8591: Temporary queues are never deleted when the are used. Added a test case
    
    (cherry picked from commit d40f53ea97eea476f0c6af30f352b3e9f0bb5903)
    (cherry picked from commit 88ffff4af92a0ee4cdce8c4b26830b53766957a2)
---
 .../cxf/transport/jms/JMSDestinationTest.java      | 73 ++++++++++++++++++++++
 1 file changed, 73 insertions(+)

diff --git a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java
index ab8d156..a92a562 100644
--- a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java
+++ b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java
@@ -22,6 +22,7 @@ package org.apache.cxf.transport.jms;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
 
 import javax.jms.Connection;
@@ -38,6 +39,8 @@ import javax.jms.ServerSessionPool;
 import javax.jms.Session;
 import javax.jms.Topic;
 
+import org.apache.activemq.EnhancedConnection;
+import org.apache.activemq.advisory.DestinationSource;
 import org.apache.activemq.util.ServiceStopper;
 import org.apache.cxf.message.Exchange;
 import org.apache.cxf.message.ExchangeImpl;
@@ -53,9 +56,12 @@ import org.apache.cxf.transport.jms.util.ResourceCloser;
 import org.junit.Ignore;
 import org.junit.Test;
 
+import static org.awaitility.Awaitility.await;
+import static org.hamcrest.CoreMatchers.equalTo;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 
 public class JMSDestinationTest extends AbstractJMSTester {
@@ -383,6 +389,73 @@ public class JMSDestinationTest extends AbstractJMSTester {
         conduit.close();
         destination.shutdown();
     }
+    
+    @Test
+    public void testTemporaryQueueDeletionUponReset() throws Exception {
+        EndpointInfo ei = setupServiceInfo("HelloWorldService", "HelloWorldPort");
+
+        // set up the conduit send to be true
+        JMSConduit conduit = setupJMSConduitWithObserver(ei);
+        assertNull(conduit.getJmsConfig().getReplyDestination());
+
+        // Store the connection so we could check temporary queues
+        final AtomicReference<DestinationSource> destinationSource = new AtomicReference<>();
+        final Message outMessage = createMessage();
+        
+        // Capture the DestinationSource instance associated with the connection
+        final JMSDestination destination = setupJMSDestination(ei, c -> new ConnectionFactory() {
+            @Override
+            public Connection createConnection() throws JMSException {
+                final Connection connection = c.createConnection();
+                destinationSource.set(((EnhancedConnection)connection).getDestinationSource());
+                return connection;
+            }
+
+            @Override
+            public Connection createConnection(String userName, String password) throws JMSException {
+                final Connection connection = c.createConnection(userName, password);
+                destinationSource.set(((EnhancedConnection)connection).getDestinationSource());
+                return connection;
+            }
+        });
+
+        // set up MessageObserver for handling the conduit message
+        final MessageObserver observer = new MessageObserver() {
+            public void onMessage(Message m) {
+                final Exchange exchange = new ExchangeImpl();
+                exchange.setInMessage(m);
+                m.setExchange(exchange);
+
+                try {
+                    final Conduit backConduit = destination.getBackChannel(m);
+                    sendOneWayMessage(backConduit, new MessageImpl());
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+            }
+        };
+        
+        destination.setMessageObserver(observer);
+        sendMessageSync(conduit, outMessage);
+        // wait for the message to be got from the destination,
+        // create the thread to handler the Destination incoming message
+
+        Message inMessage = waitForReceiveInMessage();
+        verifyReceivedMessage(inMessage);
+
+        final DestinationSource ds = destinationSource.get();
+        assertThat(ds.getTemporaryQueues().size(), equalTo(1));
+        
+        // Force manual temporary queue deletion by resetting the reply destination
+        conduit.getJmsConfig().resetCachedReplyDestination();
+        // The queue deletion events (as well as others) are propagated asynchronously
+        await()
+            .atMost(1, TimeUnit.SECONDS)
+            .untilAsserted(() -> assertThat(ds.getTemporaryQueues().size(), equalTo(0)));
+        
+        conduit.close();
+        destination.shutdown();
+    }
 
     @Test
     public void testIsMultiplexCapable() throws Exception {