You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2021/07/15 18:20:58 UTC

[activemq-artemis] branch main updated: ARTEMIS-3384 Fix bridge duplicate messages after reconnection

This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new 2d07d0d  ARTEMIS-3384 Fix bridge duplicate messages after reconnection
     new 595af7f  This closes #3652
2d07d0d is described below

commit 2d07d0d84419698c0bc6e5c87a228e6b3ac9f26f
Author: Domenico Francesco Bruscino <br...@apache.org>
AuthorDate: Thu Jul 15 10:58:28 2021 +0200

    ARTEMIS-3384 Fix bridge duplicate messages after reconnection
---
 .../postoffice/impl/InMemoryDuplicateIDCache.java  |  19 +--
 .../impl/PersistentDuplicateIDCache.java           |  19 +--
 .../cluster/bridge/BridgeReconnectTest.java        | 133 +++++++++++++++++++++
 3 files changed, 155 insertions(+), 16 deletions(-)

diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/InMemoryDuplicateIDCache.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/InMemoryDuplicateIDCache.java
index f1f8614..0105186 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/InMemoryDuplicateIDCache.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/InMemoryDuplicateIDCache.java
@@ -75,13 +75,15 @@ final class InMemoryDuplicateIDCache implements DuplicateIDCache {
 
    @Override
    public void deleteFromCache(byte[] duplicateID) {
+      deleteFromCache(new ByteArray(duplicateID));
+   }
+
+   private void deleteFromCache(final ByteArray duplicateID) {
       if (LOGGER.isTraceEnabled()) {
-         LOGGER.tracef("deleting id = %s", describeID(duplicateID));
+         LOGGER.tracef("deleting id = %s", describeID(duplicateID.bytes));
       }
 
-      ByteArray bah = new ByteArray(duplicateID);
-
-      Integer posUsed = cache.remove(bah);
+      Integer posUsed = cache.remove(duplicateID);
 
       if (posUsed != null) {
          ByteArray id;
@@ -90,10 +92,10 @@ final class InMemoryDuplicateIDCache implements DuplicateIDCache {
             final int index = posUsed.intValue();
             id = ids.get(index);
 
-            if (id.equals(bah)) {
+            if (id.equals(duplicateID)) {
                ids.set(index, null);
                if (LOGGER.isTraceEnabled()) {
-                  LOGGER.tracef("address = %s deleting id=", address, describeID(duplicateID));
+                  LOGGER.tracef("address = %s deleting id=", address, describeID(duplicateID.bytes));
                }
             }
          }
@@ -158,6 +160,7 @@ final class InMemoryDuplicateIDCache implements DuplicateIDCache {
          }
 
          if (instantAdd) {
+            addToCacheInMemory(holder);
             tx.addOperation(new AddDuplicateIDOperation(holder, false));
          } else {
             // For a tx, it's important that the entry is not added to the cache until commit
@@ -262,9 +265,9 @@ final class InMemoryDuplicateIDCache implements DuplicateIDCache {
       }
 
       @Override
-      public void beforeCommit(Transaction tx) throws Exception {
+      public void beforeRollback(Transaction tx) throws Exception {
          if (!afterCommit) {
-            process();
+            deleteFromCache(id);
          }
       }
 
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PersistentDuplicateIDCache.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PersistentDuplicateIDCache.java
index 3e3758d..cfdb972 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PersistentDuplicateIDCache.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PersistentDuplicateIDCache.java
@@ -143,24 +143,26 @@ final class PersistentDuplicateIDCache implements DuplicateIDCache {
 
    @Override
    public void deleteFromCache(byte[] duplicateID) throws Exception {
+      deleteFromCache(new ByteArray(duplicateID));
+   }
+
+   private void deleteFromCache(final ByteArray duplicateID) throws Exception {
       if (LOGGER.isTraceEnabled()) {
-         LOGGER.tracef("deleting id = %s", describeID(duplicateID));
+         LOGGER.tracef("deleting id = %s", describeID(duplicateID.bytes));
       }
 
-      final ByteArray bah = new ByteArray(duplicateID);
-
-      final Integer posUsed = cache.remove(bah);
+      final Integer posUsed = cache.remove(duplicateID);
 
       if (posUsed != null) {
          synchronized (this) {
             final ObjLongPair<ByteArray> id = ids.get(posUsed.intValue());
 
-            if (id.getA().equals(bah)) {
+            if (id.getA().equals(duplicateID)) {
                final long recordID = id.getB();
                id.setA(null);
                id.setB(NIL);
                if (LOGGER.isTraceEnabled()) {
-                  LOGGER.tracef("address = %s deleting id = %s", address, describeID(duplicateID, id.getB()));
+                  LOGGER.tracef("address = %s deleting id = %s", address, describeID(duplicateID.bytes, id.getB()));
                }
                storageManager.deleteDuplicateID(recordID);
             }
@@ -240,6 +242,7 @@ final class PersistentDuplicateIDCache implements DuplicateIDCache {
          }
 
          if (instantAdd) {
+            addToCacheInMemory(holder, recordID);
             tx.addOperation(new AddDuplicateIDOperation(holder, recordID, false));
          } else {
             // For a tx, it's important that the entry is not added to the cache until commit
@@ -379,9 +382,9 @@ final class PersistentDuplicateIDCache implements DuplicateIDCache {
       }
 
       @Override
-      public void beforeCommit(Transaction tx) throws Exception {
+      public void beforeRollback(Transaction tx) throws Exception {
          if (!afterCommit) {
-            process();
+            deleteFromCache(holder);
          }
       }
 
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeReconnectTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeReconnectTest.java
index 49680f2f3..1d9c566 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeReconnectTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeReconnectTest.java
@@ -17,13 +17,19 @@
 package org.apache.activemq.artemis.tests.integration.cluster.bridge;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
 
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
 import org.apache.activemq.artemis.api.core.ActiveMQNotConnectedException;
+import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.QueueConfiguration;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.TransportConfiguration;
@@ -39,12 +45,18 @@ import org.apache.activemq.artemis.api.core.management.ResourceNames;
 import org.apache.activemq.artemis.core.config.BridgeConfiguration;
 import org.apache.activemq.artemis.core.remoting.impl.invm.InVMConnector;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.HandleStatus;
+import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.NodeManager;
 import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.RoutingContext;
+import org.apache.activemq.artemis.core.server.ServerSession;
 import org.apache.activemq.artemis.core.server.cluster.Bridge;
 import org.apache.activemq.artemis.core.server.cluster.impl.BridgeImpl;
 import org.apache.activemq.artemis.core.server.impl.InVMNodeManager;
 import org.apache.activemq.artemis.core.server.management.ManagementService;
+import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerPlugin;
+import org.apache.activemq.artemis.core.transaction.Transaction;
 import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
 import org.apache.activemq.artemis.tests.util.Wait;
 import org.jboss.logging.Logger;
@@ -399,6 +411,127 @@ public class BridgeReconnectTest extends BridgeTestBase {
       assertNoMoreConnections();
    }
 
+   // Fail bridge and reconnect same node, no backup specified
+   @Test
+   public void testReconnectSameNodeAfterDelivery() throws Exception {
+      server0 = createActiveMQServer(0, isNetty(), server0Params);
+
+      TransportConfiguration server0tc = new TransportConfiguration(getConnector(), server0Params, "server0tc");
+
+      server0.getConfiguration().setConnectorConfigurations(connectors);
+      server1.getConfiguration().setConnectorConfigurations(connectors);
+
+      BridgeConfiguration bridgeConfiguration = createBridgeConfig();
+
+      List<BridgeConfiguration> bridgeConfigs = new ArrayList<>();
+      bridgeConfigs.add(bridgeConfiguration);
+      server0.getConfiguration().setBridgeConfigurations(bridgeConfigs);
+
+      QueueConfiguration queueConfig0 = new QueueConfiguration(queueName).setAddress(testAddress);
+      List<QueueConfiguration> queueConfigs0 = new ArrayList<>();
+      queueConfigs0.add(queueConfig0);
+      server0.getConfiguration().setQueueConfigs(queueConfigs0);
+
+      QueueConfiguration queueConfig1 = new QueueConfiguration(queueName).setAddress(forwardAddress);
+      List<QueueConfiguration> queueConfigs1 = new ArrayList<>();
+      queueConfigs1.add(queueConfig1);
+      server1.getConfiguration().setQueueConfigs(queueConfigs1);
+
+      startServers();
+
+      locator = addServerLocator(ActiveMQClient.createServerLocatorWithHA(server0tc, server1tc));
+      ClientSessionFactory csf0 = locator.createSessionFactory(server0tc);
+      session0 = csf0.createSession(false, true, true);
+
+      ClientSessionFactory csf1 = locator.createSessionFactory(server1tc);
+      session1 = csf1.createSession(false, true, true);
+
+      ClientProducer prod0 = session0.createProducer(testAddress);
+
+      ClientConsumer cons1 = session1.createConsumer(queueName);
+
+      session1.start();
+
+      final ManagementService managementService = server0.getManagementService();
+      QueueControl coreQueueControl = (QueueControl) managementService.getResource(ResourceNames.QUEUE + queueName);
+      assertEquals(0, coreQueueControl.getDeliveringCount());
+
+      final int numMessages = NUM_MESSAGES;
+
+      SimpleString propKey = new SimpleString("propkey");
+
+      CyclicBarrier routingBarrier = new CyclicBarrier(2);
+      CountDownLatch deliveryBeforeFailureLatch = new CountDownLatch(numMessages);
+      CountDownLatch deliveryAfterFailureLatch = new CountDownLatch(2 * numMessages);
+      List<Message> sendingMessages = Collections.synchronizedList(new ArrayList<>());
+      Map<Integer, ClientMessage> clientMessages = new ConcurrentHashMap<>();
+
+      server0.getConfiguration().registerBrokerPlugin(new ActiveMQServerPlugin() {
+         @Override
+         public void afterDeliverBridge(Bridge bridge, MessageReference ref, HandleStatus status) throws ActiveMQException {
+            ActiveMQServerPlugin.super.afterDeliverBridge(bridge, ref, status);
+
+            deliveryBeforeFailureLatch.countDown();
+            deliveryAfterFailureLatch.countDown();
+         }
+      });
+
+
+      server1.getConfiguration().registerBrokerPlugin(new ActiveMQServerPlugin() {
+         @Override
+         public void beforeSend(ServerSession session, Transaction tx, Message message, boolean direct, boolean noAutoCreateQueue) throws ActiveMQException {
+            sendingMessages.add(message);
+            try {
+               // Simulate CPU load until bridge delivery after failure
+               deliveryAfterFailureLatch.await();
+            } catch (InterruptedException e) {
+               log.debug(e);
+            }
+         }
+
+         @Override
+         public void beforeMessageRoute(Message message, RoutingContext context, boolean direct, boolean rejectDuplicates) throws ActiveMQException {
+            if (sendingMessages.contains(message)) {
+               try {
+                  // Force duplicateID atomicVerify of messages delivered again by the bridge after failure
+                  // before routing messages delivered by bridge before failure
+                  routingBarrier.await();
+               } catch (InterruptedException e) {
+                  log.debug(e);
+               } catch (BrokenBarrierException e) {
+                  log.debug(e);
+               }
+            }
+         }
+      });
+
+      for (int i = 0; i < numMessages; i++) {
+         ClientMessage message = session0.createMessage(false);
+         message.putIntProperty(propKey, i);
+
+         prod0.send(message);
+      }
+
+      deliveryBeforeFailureLatch.await();
+
+      assertEquals(numMessages, coreQueueControl.getDeliveringCount());
+
+      // Now we will simulate a failure of the bridge connection between server0 and server1
+      Bridge bridge = server0.getClusterManager().getBridges().get(bridgeName);
+      assertNotNull(bridge);
+      RemotingConnection forwardingConnection = getForwardingConnection(bridge);
+      forwardingConnection.fail(new ActiveMQNotConnectedException());
+
+      for (int i = 0; i < numMessages; i++) {
+         ClientMessage r1 = cons1.receive(1500);
+         assertNotNull(r1);
+         assertNull(clientMessages.putIfAbsent(r1.getIntProperty(propKey), r1));
+      }
+      closeServers();
+
+      assertNoMoreConnections();
+   }
+
    // We test that we can pause more than client failure check period (to prompt the pinger to failing)
    // before reconnecting
    @Test