You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by re...@apache.org on 2021/09/02 20:55:39 UTC
[cxf] branch 3.3.x-fixes updated (6c7bfa7 -> 4350c91)
This is an automated email from the ASF dual-hosted git repository.
reta pushed a change to branch 3.3.x-fixes
in repository https://gitbox.apache.org/repos/asf/cxf.git.
from 6c7bfa7 Recording .gitmergeinfo Changes
new 4f1ba2b Delete temporary queue when it is used (#826)
new 4350c91 CXF-8591: Temporary queues are never deleted when the are used. Added a test case
The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
Summary of changes:
.../org/apache/cxf/transport/jms/JMSConduit.java | 5 +-
.../apache/cxf/transport/jms/JMSConfiguration.java | 13 ++++
.../cxf/transport/jms/JMSDestinationTest.java | 73 ++++++++++++++++++++++
3 files changed, 89 insertions(+), 2 deletions(-)
[cxf] 01/02: Delete temporary queue when it is used (#826)
Posted by re...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
reta pushed a commit to branch 3.3.x-fixes
in repository https://gitbox.apache.org/repos/asf/cxf.git
commit 4f1ba2b78b2f53db6cb78aa93b00aa1c63281e3b
Author: valentin-matignon <87...@users.noreply.github.com>
AuthorDate: Thu Sep 2 16:38:59 2021 +0200
Delete temporary queue when it is used (#826)
* Delete temporary queue when it is used
* Delete temporary queues in JMSConfiguration
* handle JMSException directly in resetCachedReplyDestination method
* Call JMSConfiguration.resetCachedReplyDestination() after shutdownListeners()
* Call JMSConfiguration.resetCachedReplyDestination() before ResourceCloser.close(...)
* Checkstyle correction
---
.../main/java/org/apache/cxf/transport/jms/JMSConduit.java | 5 +++--
.../java/org/apache/cxf/transport/jms/JMSConfiguration.java | 13 +++++++++++++
2 files changed, 16 insertions(+), 2 deletions(-)
diff --git a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java
index abc9650..030ce3e 100644
--- a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java
+++ b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java
@@ -127,7 +127,7 @@ public class JMSConduit extends AbstractConduit implements JMSExchangeSender, Me
private void trySetExListener(Connection conn) {
try {
conn.setExceptionListener(new ExceptionListener() {
-
+
@Override
public void onException(JMSException exception) {
jmsConfig.resetCachedReplyDestination();
@@ -186,9 +186,9 @@ public class JMSConduit extends AbstractConduit implements JMSExchangeSender, Me
if (exchange.get(JMSUtil.JMS_MESSAGE_CONSUMER) != null) {
ResourceCloser.close(exchange.get(JMSUtil.JMS_MESSAGE_CONSUMER));
}
+ jmsConfig.resetCachedReplyDestination();
ResourceCloser.close(connection);
this.connection = null;
- jmsConfig.resetCachedReplyDestination();
}
this.staticReplyDestination = null;
try {
@@ -511,6 +511,7 @@ public class JMSConduit extends AbstractConduit implements JMSExchangeSender, Me
}
public synchronized void close() {
shutdownListeners();
+ jmsConfig.resetCachedReplyDestination();
ResourceCloser.close(connection);
connection = null;
LOG.log(Level.FINE, "JMSConduit closed ");
diff --git a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java
index a1034ba..44e7477 100644
--- a/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java
+++ b/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java
@@ -19,16 +19,20 @@
package org.apache.cxf.transport.jms;
import java.util.Properties;
+import java.util.logging.Level;
+import java.util.logging.Logger;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
+import javax.jms.TemporaryQueue;
import javax.naming.NamingException;
import javax.transaction.TransactionManager;
import org.apache.cxf.common.injection.NoJSR250Annotations;
+import org.apache.cxf.common.logging.LogUtils;
import org.apache.cxf.transport.jms.util.DestinationResolver;
import org.apache.cxf.transport.jms.util.JMSDestinationResolver;
import org.apache.cxf.transport.jms.util.JndiHelper;
@@ -40,6 +44,8 @@ public class JMSConfiguration {
*/
public static final int DEFAULT_VALUE = -1;
+ private static final Logger LOG = LogUtils.getL7dLogger(JMSConfiguration.class);
+
private volatile ConnectionFactory connectionFactory;
private Properties jndiEnvironment;
private String connectionFactoryName;
@@ -489,6 +495,13 @@ public class JMSConfiguration {
public void resetCachedReplyDestination() {
synchronized (this) {
+ if (replyDestinationDest instanceof TemporaryQueue) {
+ try {
+ ((TemporaryQueue) replyDestinationDest).delete();
+ } catch (JMSException exception) {
+ LOG.log(Level.WARNING, "Exception on temporary queue deletion", exception);
+ }
+ }
this.replyDestinationDest = null;
}
}
[cxf] 02/02: CXF-8591: Temporary queues are never deleted when the
are used. Added a test case
Posted by re...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
reta pushed a commit to branch 3.3.x-fixes
in repository https://gitbox.apache.org/repos/asf/cxf.git
commit 4350c910770a1fe60f329a66183265ce7f7b4acb
Author: Andriy Redko <dr...@gmail.com>
AuthorDate: Thu Sep 2 11:13:04 2021 -0400
CXF-8591: Temporary queues are never deleted when the are used. Added a test case
(cherry picked from commit d40f53ea97eea476f0c6af30f352b3e9f0bb5903)
(cherry picked from commit 88ffff4af92a0ee4cdce8c4b26830b53766957a2)
---
.../cxf/transport/jms/JMSDestinationTest.java | 73 ++++++++++++++++++++++
1 file changed, 73 insertions(+)
diff --git a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java
index ab8d156..a92a562 100644
--- a/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java
+++ b/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java
@@ -22,6 +22,7 @@ package org.apache.cxf.transport.jms;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import javax.jms.Connection;
@@ -38,6 +39,8 @@ import javax.jms.ServerSessionPool;
import javax.jms.Session;
import javax.jms.Topic;
+import org.apache.activemq.EnhancedConnection;
+import org.apache.activemq.advisory.DestinationSource;
import org.apache.activemq.util.ServiceStopper;
import org.apache.cxf.message.Exchange;
import org.apache.cxf.message.ExchangeImpl;
@@ -53,9 +56,12 @@ import org.apache.cxf.transport.jms.util.ResourceCloser;
import org.junit.Ignore;
import org.junit.Test;
+import static org.awaitility.Awaitility.await;
+import static org.hamcrest.CoreMatchers.equalTo;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
public class JMSDestinationTest extends AbstractJMSTester {
@@ -383,6 +389,73 @@ public class JMSDestinationTest extends AbstractJMSTester {
conduit.close();
destination.shutdown();
}
+
+ @Test
+ public void testTemporaryQueueDeletionUponReset() throws Exception {
+ EndpointInfo ei = setupServiceInfo("HelloWorldService", "HelloWorldPort");
+
+ // set up the conduit send to be true
+ JMSConduit conduit = setupJMSConduitWithObserver(ei);
+ assertNull(conduit.getJmsConfig().getReplyDestination());
+
+ // Store the connection so we could check temporary queues
+ final AtomicReference<DestinationSource> destinationSource = new AtomicReference<>();
+ final Message outMessage = createMessage();
+
+ // Capture the DestinationSource instance associated with the connection
+ final JMSDestination destination = setupJMSDestination(ei, c -> new ConnectionFactory() {
+ @Override
+ public Connection createConnection() throws JMSException {
+ final Connection connection = c.createConnection();
+ destinationSource.set(((EnhancedConnection)connection).getDestinationSource());
+ return connection;
+ }
+
+ @Override
+ public Connection createConnection(String userName, String password) throws JMSException {
+ final Connection connection = c.createConnection(userName, password);
+ destinationSource.set(((EnhancedConnection)connection).getDestinationSource());
+ return connection;
+ }
+ });
+
+ // set up MessageObserver for handling the conduit message
+ final MessageObserver observer = new MessageObserver() {
+ public void onMessage(Message m) {
+ final Exchange exchange = new ExchangeImpl();
+ exchange.setInMessage(m);
+ m.setExchange(exchange);
+
+ try {
+ final Conduit backConduit = destination.getBackChannel(m);
+ sendOneWayMessage(backConduit, new MessageImpl());
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ };
+
+ destination.setMessageObserver(observer);
+ sendMessageSync(conduit, outMessage);
+ // wait for the message to be got from the destination,
+ // create the thread to handler the Destination incoming message
+
+ Message inMessage = waitForReceiveInMessage();
+ verifyReceivedMessage(inMessage);
+
+ final DestinationSource ds = destinationSource.get();
+ assertThat(ds.getTemporaryQueues().size(), equalTo(1));
+
+ // Force manual temporary queue deletion by resetting the reply destination
+ conduit.getJmsConfig().resetCachedReplyDestination();
+ // The queue deletion events (as well as others) are propagated asynchronously
+ await()
+ .atMost(1, TimeUnit.SECONDS)
+ .untilAsserted(() -> assertThat(ds.getTemporaryQueues().size(), equalTo(0)));
+
+ conduit.close();
+ destination.shutdown();
+ }
@Test
public void testIsMultiplexCapable() throws Exception {