You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2021/07/16 13:19:15 UTC

[activemq-artemis] 02/03: ARTEMIS-3384 Adding tests around duplicate detection

This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git

commit c479cb558a85b2c162183c78d6f3121de92a99e9
Author: Clebert Suconic <cl...@apache.org>
AuthorDate: Thu Jul 15 22:51:43 2021 -0400

    ARTEMIS-3384 Adding tests around duplicate detection
---
 .../tests/integration/DuplicateDetectionTest.java  | 25 ++++++++-
 .../tests/integration/amqp/AmqpSenderTest.java     | 62 ++++++++++++++++++++++
 .../cluster/bridge/BridgeReconnectTest.java        | 43 ++++++++++++++-
 3 files changed, 128 insertions(+), 2 deletions(-)

diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/DuplicateDetectionTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/DuplicateDetectionTest.java
index 75bfc79..045fde9 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/DuplicateDetectionTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/DuplicateDetectionTest.java
@@ -20,6 +20,9 @@ import javax.transaction.xa.XAException;
 import javax.transaction.xa.XAResource;
 import javax.transaction.xa.Xid;
 
+import java.util.Arrays;
+import java.util.Collection;
+
 import org.apache.activemq.artemis.api.core.ActiveMQDuplicateIdException;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.Message;
@@ -42,11 +45,27 @@ import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
 import org.apache.activemq.artemis.utils.UUIDGenerator;
 import org.jboss.logging.Logger;
 import org.junit.Assert;
+import org.junit.Assume;
 import org.junit.Before;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
+@RunWith(Parameterized.class)
 public class DuplicateDetectionTest extends ActiveMQTestBase {
 
+   @Parameterized.Parameters(name = "persistentCache={0}")
+   public static Collection<Object[]> parameters() {
+      return Arrays.asList(new Object[][] {
+         {true}, {false}
+      });
+   }
+
+   @Parameterized.Parameter(0)
+   public boolean persistCache;
+
+
+
    private final Logger log = Logger.getLogger(this.getClass());
 
    private ActiveMQServer server;
@@ -217,6 +236,7 @@ public class DuplicateDetectionTest extends ActiveMQTestBase {
    // we would eventually have a higher number of caches while we couldn't have time to clear previous ones
    @Test
    public void testShrinkCache() throws Exception {
+      Assume.assumeTrue("This test would restart the server", persistCache);
       server.stop();
       server.getConfiguration().setIDCacheSize(150);
       server.start();
@@ -1454,6 +1474,7 @@ public class DuplicateDetectionTest extends ActiveMQTestBase {
 
    @Test
    public void testPersistTransactional() throws Exception {
+      Assume.assumeTrue("This test would restart the server", persistCache);
       ClientSession session = sf.createSession(false, false, false);
 
       session.start();
@@ -1709,6 +1730,8 @@ public class DuplicateDetectionTest extends ActiveMQTestBase {
 
    @Test
    public void testPersistXA1() throws Exception {
+      Assume.assumeTrue("This test would restart the server", persistCache);
+
       ClientSession session = addClientSession(sf.createSession(true, false, false));
 
       Xid xid = new XidImpl("xa1".getBytes(), 1, UUIDGenerator.getInstance().generateStringUUID().getBytes());
@@ -1802,7 +1825,7 @@ public class DuplicateDetectionTest extends ActiveMQTestBase {
    public void setUp() throws Exception {
       super.setUp();
 
-      config = createDefaultInVMConfig().setIDCacheSize(cacheSize);
+      config = createDefaultInVMConfig().setIDCacheSize(cacheSize).setPersistIDCache(persistCache);
 
       server = createServer(true, config);
 
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSenderTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSenderTest.java
index f556bdf..62b9d4d 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSenderTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSenderTest.java
@@ -22,6 +22,8 @@ import javax.jms.DeliveryMode;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
 import javax.jms.Session;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -45,12 +47,30 @@ import org.apache.qpid.proton.engine.Delivery;
 import org.apache.qpid.proton.engine.Sender;
 import org.junit.Assert;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 /**
  * Test broker behavior when creating AMQP senders
  */
+@RunWith(Parameterized.class)
 public class AmqpSenderTest extends AmqpClientTestSupport {
 
+   @Parameterized.Parameters(name = "persistentCache={0}")
+   public static Collection<Object[]> parameters() {
+      return Arrays.asList(new Object[][] {
+         {true}, {false}
+      });
+   }
+
+   @Override
+   protected void addConfiguration(ActiveMQServer server) {
+      server.getConfiguration().setPersistIDCache(persistCache);
+   }
+
+   @Parameterized.Parameter(0)
+   public boolean persistCache;
+
    @Override
    protected void addAdditionalAcceptors(ActiveMQServer server) throws Exception {
    }
@@ -253,6 +273,48 @@ public class AmqpSenderTest extends AmqpClientTestSupport {
    }
 
    @Test(timeout = 60000)
+   public void testDuplicateDetectionRollback() throws Exception {
+
+      ConnectionFactory factory = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:5672");
+      try (Connection connection = factory.createConnection();
+           Session session = connection.createSession(true, Session.SESSION_TRANSACTED)) {
+         javax.jms.Queue producerQueue = session.createQueue(getQueueName());
+
+         MessageProducer producer = session.createProducer(producerQueue);
+         javax.jms.Message message = session.createTextMessage("test");
+         message.setStringProperty(Message.HDR_DUPLICATE_DETECTION_ID.toString(), "123");
+         producer.send(message);
+         session.rollback();
+
+         producer.send(message);
+         session.commit();
+
+         connection.start();
+
+         MessageConsumer consumer = session.createConsumer(producerQueue);
+         Assert.assertNotNull(consumer.receive(5000));
+         Assert.assertNull(consumer.receiveNoWait());
+         session.commit();
+
+         Queue serverQueue = server.locateQueue(getQueueName());
+         Wait.assertEquals(0, serverQueue::getMessageCount);
+
+         message = session.createTextMessage("test");
+         message.setStringProperty(Message.HDR_DUPLICATE_DETECTION_ID.toString(), "123");
+         producer.send(message);
+         boolean error = false;
+         try {
+            session.commit();
+         } catch (Exception e) {
+            error = true;
+         }
+         Assert.assertTrue(error);
+
+
+      }
+   }
+
+   @Test(timeout = 60000)
    public void testSenderCreditReplenishment() throws Exception {
       AtomicInteger counter = new AtomicInteger();
       CountDownLatch initialCredit = new CountDownLatch(1);
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeReconnectTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeReconnectTest.java
index 1d9c566..f733356 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeReconnectTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeReconnectTest.java
@@ -17,6 +17,8 @@
 package org.apache.activemq.artemis.tests.integration.cluster.bridge;
 
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -25,6 +27,7 @@ import java.util.concurrent.BrokenBarrierException;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
@@ -43,6 +46,7 @@ import org.apache.activemq.artemis.api.core.client.ServerLocator;
 import org.apache.activemq.artemis.api.core.management.QueueControl;
 import org.apache.activemq.artemis.api.core.management.ResourceNames;
 import org.apache.activemq.artemis.core.config.BridgeConfiguration;
+import org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds;
 import org.apache.activemq.artemis.core.remoting.impl.invm.InVMConnector;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.HandleStatus;
@@ -60,11 +64,26 @@ import org.apache.activemq.artemis.core.transaction.Transaction;
 import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
 import org.apache.activemq.artemis.tests.util.Wait;
 import org.jboss.logging.Logger;
+import org.junit.Assert;
+import org.junit.Assume;
 import org.junit.Before;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
+@RunWith(Parameterized.class)
 public class BridgeReconnectTest extends BridgeTestBase {
 
+   @Parameterized.Parameters(name = "persistentCache={0}")
+   public static Collection<Object[]> parameters() {
+      return Arrays.asList(new Object[][] {
+         {true}, {false}
+      });
+   }
+
+   @Parameterized.Parameter(0)
+   public boolean persistCache;
+
    private static final Logger log = Logger.getLogger(BridgeReconnectTest.class);
 
    private static final int NUM_MESSAGES = 100;
@@ -412,8 +431,9 @@ public class BridgeReconnectTest extends BridgeTestBase {
    }
 
    // Fail bridge and reconnect same node, no backup specified
+   // It will keep a send blocking as if CPU was making it creep
    @Test
-   public void testReconnectSameNodeAfterDelivery() throws Exception {
+   public void testReconnectSameNodeAfterDeliveryWithBlocking() throws Exception {
       server0 = createActiveMQServer(0, isNetty(), server0Params);
 
       TransportConfiguration server0tc = new TransportConfiguration(getConnector(), server0Params, "server0tc");
@@ -530,6 +550,15 @@ public class BridgeReconnectTest extends BridgeTestBase {
       closeServers();
 
       assertNoMoreConnections();
+
+      HashMap<Integer, AtomicInteger> counts = countJournal(server1.getConfiguration());
+      if (persistCache) {
+         // There should be one record per message
+         Assert.assertEquals(numMessages, counts.get(new Integer(JournalRecordIds.DUPLICATE_ID)).intValue());
+      } else {
+         // no cache means there shouldn't be an id anywhere
+         Assert.assertNull(counts.get(new Integer(JournalRecordIds.DUPLICATE_ID)));
+      }
    }
 
    // We test that we can pause more than client failure check period (to prompt the pinger to failing)
@@ -545,6 +574,7 @@ public class BridgeReconnectTest extends BridgeTestBase {
    }
 
    private void testShutdownServerCleanlyAndReconnectSameNode(final boolean sleep) throws Exception {
+      Assume.assumeTrue(persistCache);
       server0 = createActiveMQServer(0, isNetty(), server0Params);
       TransportConfiguration server0tc = new TransportConfiguration(getConnector(), server0Params, "server0tc");
 
@@ -842,4 +872,15 @@ public class BridgeReconnectTest extends BridgeTestBase {
       throw new IllegalStateException("Failed to get forwarding connection");
    }
 
+
+   @Override
+   protected ActiveMQServer createActiveMQServer(final int id,
+                        final Map<String, Object> params,
+                        final boolean netty,
+                        final NodeManager nodeManager) throws Exception {
+      ActiveMQServer server = super.createActiveMQServer(id, params, netty, nodeManager);
+      server.getConfiguration().setPersistIDCache(persistCache);
+      return server;
+   }
+
 }