You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2021/07/16 13:19:15 UTC
[activemq-artemis] 02/03: ARTEMIS-3384 Adding tests around duplicate detection
This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
commit c479cb558a85b2c162183c78d6f3121de92a99e9
Author: Clebert Suconic <cl...@apache.org>
AuthorDate: Thu Jul 15 22:51:43 2021 -0400
ARTEMIS-3384 Adding tests around duplicate detection
---
.../tests/integration/DuplicateDetectionTest.java | 25 ++++++++-
.../tests/integration/amqp/AmqpSenderTest.java | 62 ++++++++++++++++++++++
.../cluster/bridge/BridgeReconnectTest.java | 43 ++++++++++++++-
3 files changed, 128 insertions(+), 2 deletions(-)
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/DuplicateDetectionTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/DuplicateDetectionTest.java
index 75bfc79..045fde9 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/DuplicateDetectionTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/DuplicateDetectionTest.java
@@ -20,6 +20,9 @@ import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
+import java.util.Arrays;
+import java.util.Collection;
+
import org.apache.activemq.artemis.api.core.ActiveMQDuplicateIdException;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.Message;
@@ -42,11 +45,27 @@ import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.utils.UUIDGenerator;
import org.jboss.logging.Logger;
import org.junit.Assert;
+import org.junit.Assume;
import org.junit.Before;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+@RunWith(Parameterized.class)
public class DuplicateDetectionTest extends ActiveMQTestBase {
+ @Parameterized.Parameters(name = "persistentCache={0}") // {0} in the run name is the single boolean of each row
+ public static Collection<Object[]> parameters() {
+ return Arrays.asList(new Object[][] {
+ {true}, {false} // two runs: first true, then false; the value is bound to the persistCache field
+ });
+ }
+
+ @Parameterized.Parameter(0)
+ public boolean persistCache;
+
+
+
private final Logger log = Logger.getLogger(this.getClass());
private ActiveMQServer server;
@@ -217,6 +236,7 @@ public class DuplicateDetectionTest extends ActiveMQTestBase {
// we would eventually have a higher number of caches while we couldn't have time to clear previous ones
@Test
public void testShrinkCache() throws Exception {
+ Assume.assumeTrue("This test would restart the server", persistCache);
server.stop();
server.getConfiguration().setIDCacheSize(150);
server.start();
@@ -1454,6 +1474,7 @@ public class DuplicateDetectionTest extends ActiveMQTestBase {
@Test
public void testPersistTransactional() throws Exception {
+ Assume.assumeTrue("This test would restart the server", persistCache);
ClientSession session = sf.createSession(false, false, false);
session.start();
@@ -1709,6 +1730,8 @@ public class DuplicateDetectionTest extends ActiveMQTestBase {
@Test
public void testPersistXA1() throws Exception {
+ Assume.assumeTrue("This test would restart the server", persistCache);
+
ClientSession session = addClientSession(sf.createSession(true, false, false));
Xid xid = new XidImpl("xa1".getBytes(), 1, UUIDGenerator.getInstance().generateStringUUID().getBytes());
@@ -1802,7 +1825,7 @@ public class DuplicateDetectionTest extends ActiveMQTestBase {
public void setUp() throws Exception {
super.setUp();
- config = createDefaultInVMConfig().setIDCacheSize(cacheSize);
+ config = createDefaultInVMConfig().setIDCacheSize(cacheSize).setPersistIDCache(persistCache);
server = createServer(true, config);
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSenderTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSenderTest.java
index f556bdf..62b9d4d 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSenderTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSenderTest.java
@@ -22,6 +22,8 @@ import javax.jms.DeliveryMode;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
+import java.util.Arrays;
+import java.util.Collection;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -45,12 +47,30 @@ import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.Sender;
import org.junit.Assert;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
/**
* Test broker behavior when creating AMQP senders
*/
+@RunWith(Parameterized.class)
public class AmqpSenderTest extends AmqpClientTestSupport {
+ @Parameterized.Parameters(name = "persistentCache={0}") // run label shows the boolean supplied for that run
+ public static Collection<Object[]> parameters() {
+ return Arrays.asList(new Object[][] {
+ {true}, {false} // each one-element row becomes the persistCache value for one run of the class
+ });
+ }
+
+ @Override
+ protected void addConfiguration(ActiveMQServer server) { // overrides a superclass hook; NOTE(review): when it is invoked is not visible here
+ server.getConfiguration().setPersistIDCache(persistCache); // copy the current run's parameter onto the server configuration
+ }
+
+ @Parameterized.Parameter(0)
+ public boolean persistCache;
+
@Override
protected void addAdditionalAcceptors(ActiveMQServer server) throws Exception {
}
@@ -253,6 +273,48 @@ public class AmqpSenderTest extends AmqpClientTestSupport {
}
@Test(timeout = 60000)
+ public void testDuplicateDetectionRollback() throws Exception {
+ // Checks that a duplicate-detection ID used in a rolled-back send can be reused, and that reusing it after a commit makes the next commit fail.
+ ConnectionFactory factory = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:5672");
+ try (Connection connection = factory.createConnection();
+ Session session = connection.createSession(true, Session.SESSION_TRANSACTED)) {
+ javax.jms.Queue producerQueue = session.createQueue(getQueueName());
+ // Step 1: send with duplicate ID "123" and roll the transaction back.
+ MessageProducer producer = session.createProducer(producerQueue);
+ javax.jms.Message message = session.createTextMessage("test");
+ message.setStringProperty(Message.HDR_DUPLICATE_DETECTION_ID.toString(), "123");
+ producer.send(message);
+ session.rollback();
+ // Step 2: resend the same message; this commit is not wrapped in a try, so any exception fails the test.
+ producer.send(message);
+ session.commit();
+ // Step 3: exactly one copy must be consumable.
+ connection.start();
+
+ MessageConsumer consumer = session.createConsumer(producerQueue);
+ Assert.assertNotNull(consumer.receive(5000));
+ Assert.assertNull(consumer.receiveNoWait());
+ session.commit();
+ // The broker-side queue must report zero messages once the consumer transaction is committed.
+ Queue serverQueue = server.locateQueue(getQueueName());
+ Wait.assertEquals(0, serverQueue::getMessageCount);
+ // Step 4: a brand-new message reusing ID "123" after the earlier commit.
+ message = session.createTextMessage("test");
+ message.setStringProperty(Message.HDR_DUPLICATE_DETECTION_ID.toString(), "123");
+ producer.send(message);
+ boolean error = false;
+ try {
+ session.commit();
+ } catch (Exception e) { // any exception type counts as the expected rejection
+ error = true;
+ }
+ Assert.assertTrue(error); // the test fails if the commit went through without throwing
+
+
+ }
+ }
+
+ @Test(timeout = 60000)
public void testSenderCreditReplenishment() throws Exception {
AtomicInteger counter = new AtomicInteger();
CountDownLatch initialCredit = new CountDownLatch(1);
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeReconnectTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeReconnectTest.java
index 1d9c566..f733356 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeReconnectTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeReconnectTest.java
@@ -17,6 +17,8 @@
package org.apache.activemq.artemis.tests.integration.cluster.bridge;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -25,6 +27,7 @@ import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
@@ -43,6 +46,7 @@ import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.api.core.management.QueueControl;
import org.apache.activemq.artemis.api.core.management.ResourceNames;
import org.apache.activemq.artemis.core.config.BridgeConfiguration;
+import org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds;
import org.apache.activemq.artemis.core.remoting.impl.invm.InVMConnector;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.HandleStatus;
@@ -60,11 +64,26 @@ import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.tests.util.Wait;
import org.jboss.logging.Logger;
+import org.junit.Assert;
+import org.junit.Assume;
import org.junit.Before;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+@RunWith(Parameterized.class)
public class BridgeReconnectTest extends BridgeTestBase {
+ @Parameterized.Parameters(name = "persistentCache={0}") // the boolean of each row is substituted for {0} in the run name
+ public static Collection<Object[]> parameters() {
+ return Arrays.asList(new Object[][] {
+ {true}, {false} // the class runs once per row; the row's value lands in the persistCache field
+ });
+ }
+
+ @Parameterized.Parameter(0)
+ public boolean persistCache;
+
private static final Logger log = Logger.getLogger(BridgeReconnectTest.class);
private static final int NUM_MESSAGES = 100;
@@ -412,8 +431,9 @@ public class BridgeReconnectTest extends BridgeTestBase {
}
// Fail bridge and reconnect same node, no backup specified
+ // It will keep a send blocking as if CPU was making it creep
@Test
- public void testReconnectSameNodeAfterDelivery() throws Exception {
+ public void testReconnectSameNodeAfterDeliveryWithBlocking() throws Exception {
server0 = createActiveMQServer(0, isNetty(), server0Params);
TransportConfiguration server0tc = new TransportConfiguration(getConnector(), server0Params, "server0tc");
@@ -530,6 +550,15 @@ public class BridgeReconnectTest extends BridgeTestBase {
closeServers();
assertNoMoreConnections();
+
+ HashMap<Integer, AtomicInteger> counts = countJournal(server1.getConfiguration());
+ if (persistCache) {
+ // There should be one record per message
+ Assert.assertEquals(numMessages, counts.get(new Integer(JournalRecordIds.DUPLICATE_ID)).intValue());
+ } else {
+ // no cache means there shouldn't be an id anywhere
+ Assert.assertNull(counts.get(new Integer(JournalRecordIds.DUPLICATE_ID)));
+ }
}
// We test that we can pause more than client failure check period (to prompt the pinger to failing)
@@ -545,6 +574,7 @@ public class BridgeReconnectTest extends BridgeTestBase {
}
private void testShutdownServerCleanlyAndReconnectSameNode(final boolean sleep) throws Exception {
+ Assume.assumeTrue(persistCache);
server0 = createActiveMQServer(0, isNetty(), server0Params);
TransportConfiguration server0tc = new TransportConfiguration(getConnector(), server0Params, "server0tc");
@@ -842,4 +872,15 @@ public class BridgeReconnectTest extends BridgeTestBase {
throw new IllegalStateException("Failed to get forwarding connection");
}
+
+ @Override // NOTE(review): the visible test calls a (int, boolean, Map) overload; presumably it delegates here - confirm in the base class
+ protected ActiveMQServer createActiveMQServer(final int id,
+ final Map<String, Object> params,
+ final boolean netty,
+ final NodeManager nodeManager) throws Exception {
+ ActiveMQServer server = super.createActiveMQServer(id, params, netty, nodeManager); // build the server exactly as the superclass does
+ server.getConfiguration().setPersistIDCache(persistCache); // then apply the current run's parameter before handing it back
+ return server;
+ }
+
}