You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2019/10/08 21:21:36 UTC
[activemq-artemis] branch master updated: ARTEMIS-2514 dupl cache
leak w/clustered temp q
This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/master by this push:
new 2992daa ARTEMIS-2514 dupl cache leak w/clustered temp q
new 49c96c6 This closes #2858
2992daa is described below
commit 2992daaeb13f5fe33ad5db28f7210692c8283c01
Author: Justin Bertram <jb...@apache.org>
AuthorDate: Mon Oct 7 11:09:42 2019 -0500
ARTEMIS-2514 dupl cache leak w/clustered temp q
---
.../server/cluster/impl/ClusterConnectionImpl.java | 2 +-
.../distribution/TemporaryJMSQueueClusterTest.java | 136 +++++++++++++++++++++
2 files changed, 137 insertions(+), 1 deletion(-)
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java
index d6f34c9..7680780 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java
@@ -1292,7 +1292,7 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn
return;
}
- postOffice.removeBinding(binding.getUniqueName(), null, false);
+ postOffice.removeBinding(binding.getUniqueName(), null, true);
}
private synchronized void resetBinding(final SimpleString clusterName) throws Exception {
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/TemporaryJMSQueueClusterTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/TemporaryJMSQueueClusterTest.java
new file mode 100644
index 0000000..a69018c
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/TemporaryJMSQueueClusterTest.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <br>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <br>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.integration.cluster.distribution;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TemporaryQueue;
+import javax.jms.TextMessage;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
+import org.apache.activemq.artemis.core.postoffice.impl.PostOfficeImpl;
+import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
+import org.junit.Test;
+
+public class TemporaryJMSQueueClusterTest extends ClusterTestBase {
+
+ @Test
+ public void testDuplicateCacheCleanupForTempQueues() throws Exception {
+ setupServer(0, isFileStorage(), isNetty());
+ setupServer(1, isFileStorage(), isNetty());
+
+ setupClusterConnection("cluster0", "", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 0, 1);
+ servers[0].getConfiguration().getClusterConfigurations().get(0).setDuplicateDetection(true);
+ servers[0].getAddressSettingsRepository().addMatch("#", new AddressSettings().setRedistributionDelay(0));
+
+ setupClusterConnection("cluster1", "", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 1, 0);
+ servers[1].getConfiguration().getClusterConfigurations().get(0).setDuplicateDetection(true);
+ servers[1].getAddressSettingsRepository().addMatch("#", new AddressSettings().setRedistributionDelay(0));
+
+ startServers(0, 1);
+
+ final Map<String, TextMessage> requestMap = new HashMap<>();
+ ConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61616");
+
+ for (int j = 0; j < 10; j++) {
+ try (Connection connection = cf.createConnection()) {
+ SimpleMessageListener server = new SimpleMessageListener().start();
+ Queue requestQueue = ActiveMQJMSClient.createQueue("exampleQueue");
+ connection.start();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer = session.createProducer(requestQueue);
+ TemporaryQueue replyQueue = session.createTemporaryQueue();
+ MessageConsumer replyConsumer = session.createConsumer(replyQueue);
+
+ int numMessages = 10;
+ for (int i = 0; i < numMessages; i++) {
+
+ TextMessage requestMsg = session.createTextMessage("A request message");
+ requestMsg.setJMSReplyTo(replyQueue);
+ producer.send(requestMsg);
+ requestMap.put(requestMsg.getJMSMessageID(), requestMsg);
+ }
+
+ for (int i = 0; i < numMessages; i++) {
+ TextMessage replyMessageReceived = (TextMessage) replyConsumer.receive();
+ assertNotNull(requestMap.get(replyMessageReceived.getJMSCorrelationID()));
+ }
+
+ replyConsumer.close();
+ replyQueue.delete();
+ server.shutdown();
+ }
+
+ }
+
+ assertTrue(((PostOfficeImpl) servers[0].getPostOffice()).getDuplicateIDCaches().size() <= 1);
+ assertTrue(((PostOfficeImpl) servers[1].getPostOffice()).getDuplicateIDCaches().size() <= 1);
+
+ }
+
+ public boolean isNetty() {
+ return true;
+ }
+}
+
+class SimpleMessageListener implements MessageListener {
+
+ private Session session;
+ MessageProducer replyProducer;
+ MessageConsumer requestConsumer;
+ Connection connection = null;
+
+ public SimpleMessageListener start() throws Exception {
+ Queue requestQueue = ActiveMQJMSClient.createQueue("exampleQueue");
+ ConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61617");
+ connection = cf.createConnection("guest", "guest");
+ session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ connection.start();
+ replyProducer = session.createProducer(null);
+ requestConsumer = session.createConsumer(requestQueue);
+ requestConsumer.setMessageListener(this);
+ return this;
+ }
+
+ @Override
+ public void onMessage(final javax.jms.Message request) {
+ try {
+ Destination replyDestination = request.getJMSReplyTo();
+ TextMessage replyMessage = session.createTextMessage("A reply message");
+ replyMessage.setJMSCorrelationID(request.getJMSMessageID());
+ replyProducer.send(replyDestination, replyMessage);
+ } catch (JMSException e) {
+ e.printStackTrace();
+ }
+ }
+
+ public void shutdown() throws JMSException {
+ connection.close();
+ }
+}