You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2022/06/30 21:37:34 UTC

[activemq-artemis] 01/02: ARTEMIS-3815 proper retry through IOCompletion when message not found on target queue on Mirror

This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git

commit d90179b99cc217a2611a7d439ddcc9a27d79dd12
Author: iliya <il...@gmail.com>
AuthorDate: Thu Jun 30 14:40:46 2022 -0400

    ARTEMIS-3815 proper retry through IOCompletion when message not found on target queue on Mirror
    
    co-authored Clebert Suconic
---
 .../activemq/artemis/core/io/RunnableCallback.java |  64 ++++++
 .../protocol/amqp/broker/AMQPSessionCallback.java  |   4 +
 .../connect/mirror/AMQPMirrorControllerTarget.java |  59 +++--
 .../amqp/connect/AMQPMirrorFastACKTest.java        | 239 +++++++++++++++++++++
 4 files changed, 344 insertions(+), 22 deletions(-)

diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/RunnableCallback.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/RunnableCallback.java
new file mode 100644
index 0000000000..cb4344a626
--- /dev/null
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/RunnableCallback.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.io;
+
+import org.jboss.logging.Logger;
+
+public class RunnableCallback implements IOCallback {
+   private static final Logger logger = Logger.getLogger(RunnableCallback.class);
+
+   Runnable okCallback;
+   Runnable errorCallback;
+
+   public RunnableCallback(Runnable ok, Runnable error) {
+      if (ok == null) {
+         throw new NullPointerException("ok = null");
+      }
+      if (ok == null) {
+         throw new NullPointerException("error = null");
+      }
+      okCallback = ok;
+      errorCallback = error;
+   }
+
+   public RunnableCallback(Runnable ok) {
+      if (ok == null) {
+         throw new NullPointerException("ok = null");
+      }
+      okCallback = ok;
+      errorCallback = ok;
+   }
+
+   @Override
+   public void done() {
+      try {
+         okCallback.run();
+      } catch (Throwable e) {
+         logger.warn(e.getMessage(), e);
+      }
+   }
+
+   @Override
+   public void onError(int errorCode, String errorMessage) {
+      try {
+         errorCallback.run();
+      } catch (Throwable e) {
+         logger.warn(e.getMessage(), e);
+      }
+   }
+}
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
index c78883afff..adb61c107d 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
@@ -177,6 +177,10 @@ public class AMQPSessionCallback implements SessionCallback {
       }
    }
 
+   public OperationContext getSessionContext() {
+      return serverSession.getSessionContext();
+   }
+
    @Override
    public void browserFinished(ServerConsumer consumer) {
 
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java
index 4be5968123..2c652e352b 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java
@@ -26,7 +26,9 @@ import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.QueueConfiguration;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.io.IOCallback;
+import org.apache.activemq.artemis.core.io.RunnableCallback;
 import org.apache.activemq.artemis.core.paging.cursor.PagedReference;
+import org.apache.activemq.artemis.core.persistence.OperationContext;
 import org.apache.activemq.artemis.core.persistence.impl.journal.OperationContextImpl;
 import org.apache.activemq.artemis.core.postoffice.DuplicateIDCache;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
@@ -138,7 +140,7 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement
    }
 
    // in a regular case we should not have more than amqpCredits on the pool, that's the max we would need
-   private final MpscPool<ACKMessageOperation> ackMessageMpscPool = new MpscPool<>(amqpCredits, ACKMessageOperation::reset, () -> new ACKMessageOperation());
+   private final MpscPool<ACKMessageOperation> ackMessageMpscPool = new MpscPool<>(amqpCredits, ACKMessageOperation::reset, ACKMessageOperation::new);
 
    final RoutingContextImpl routingContext = new RoutingContextImpl(null);
 
@@ -151,6 +153,8 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement
 
    private final ReferenceNodeStore referenceNodeStore;
 
+   OperationContext mirrorContext;
+
    public AMQPMirrorControllerTarget(AMQPSessionCallback sessionSPI,
                                      AMQPConnectionContext connection,
                                      AMQPSessionContext protonSession,
@@ -161,6 +165,7 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement
       this.basicController.setLink(receiver);
       this.server = server;
       this.referenceNodeStore = sessionSPI.getProtocolManager().getReferenceIDSupplier();
+      mirrorContext = protonSession.getSessionSPI().getSessionContext();
    }
 
    @Override
@@ -224,7 +229,6 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement
                }
                deleteQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName));
             } else if (eventType.equals(POST_ACK)) {
-               String address = (String) AMQPMessageBrokerAccessor.getMessageAnnotationProperty(message, ADDRESS);
                String nodeID = (String) AMQPMessageBrokerAccessor.getMessageAnnotationProperty(message, BROKER_ID);
 
                AckReason ackReason = AMQPMessageBrokerAccessor.getMessageAnnotationAckReason(message);
@@ -236,9 +240,9 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement
                AmqpValue value = (AmqpValue) message.getBody();
                Long messageID = (Long) value.getValue();
                if (logger.isDebugEnabled()) {
-                  logger.debug(server + " Post ack address=" + address + " queueName = " + queueName + " messageID=" + messageID + ", nodeID=" + nodeID);
+                  logger.debug(server + " Post ack queueName = " + queueName + " messageID=" + messageID + ", nodeID=" + nodeID);
                }
-               if (postAcknowledge(address, queueName, nodeID, messageID, messageAckOperation, ackReason)) {
+               if (postAcknowledge(queueName, nodeID, messageID, messageAckOperation, ackReason)) {
                   messageAckOperation = null;
                }
             }
@@ -336,7 +340,7 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement
       }
    }
 
-   public boolean postAcknowledge(String address, String queue, String nodeID, long messageID, ACKMessageOperation ackMessage, AckReason reason) throws Exception {
+   public boolean postAcknowledge(String queue, String nodeID, long messageID, ACKMessageOperation ackMessage, AckReason reason) throws Exception {
       final Queue targetQueue = server.locateQueue(queue);
 
       if (targetQueue == null) {
@@ -355,32 +359,50 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement
          logger.trace("Server " + server.getIdentity() + " with queue = " + queue + " being acked for " + messageID + " coming from " + messageID + " targetQueue = " + targetQueue);
       }
 
-      performAck(nodeID, messageID, targetQueue, ackMessage, reason, true);
+      performAck(nodeID, messageID, targetQueue, ackMessage, reason, (short)0);
       return true;
    }
 
-
    public void performAckOnPage(String nodeID, long messageID, Queue targetQueue, IOCallback ackMessageOperation) {
       PageAck pageAck = new PageAck(targetQueue, nodeID, messageID, ackMessageOperation);
       targetQueue.getPageSubscription().scanAck(pageAck, pageAck, pageAck, pageAck);
    }
 
-   private void performAck(String nodeID, long messageID, Queue targetQueue, ACKMessageOperation ackMessageOperation, AckReason reason, boolean retry) {
+   private void performAck(String nodeID, long messageID, Queue targetQueue, ACKMessageOperation ackMessageOperation, AckReason reason, final short retry) {
       if (logger.isTraceEnabled()) {
          logger.trace("performAck (nodeID=" + nodeID + ", messageID=" + messageID + ")" + ", targetQueue=" + targetQueue.getName());
       }
       MessageReference reference = targetQueue.removeWithSuppliedID(nodeID, messageID, referenceNodeStore);
 
-      if (reference == null && retry) {
+      if (reference == null) {
          if (logger.isDebugEnabled()) {
-            logger.debug("Retrying Reference not found on messageID=" + messageID + " nodeID=" + nodeID);
+            logger.debug("Retrying Reference not found on messageID=" + messageID + " nodeID=" + nodeID + ", currentRetry=" + retry);
+         }
+         switch (retry) {
+            case 0:
+               // first retry, after IO Operations
+               sessionSPI.getSessionContext().executeOnCompletion(new RunnableCallback(() -> performAck(nodeID, messageID, targetQueue, ackMessageOperation, reason, (short) 1)));
+               return;
+            case 1:
+               // second retry after the queue is flushed the temporary adds
+               targetQueue.flushOnIntermediate(() -> {
+                  recoverContext();
+                  performAck(nodeID, messageID, targetQueue, ackMessageOperation, reason, (short)2);
+               });
+               return;
+            case 2:
+               // third retry, on paging
+               if (reason != AckReason.EXPIRED) {
+                  // if expired, we don't need to check on paging
+                  // as the message will expire again when depaged (if on paging)
+                  performAckOnPage(nodeID, messageID, targetQueue, ackMessageOperation);
+                  return;
+               } else {
+                  ackMessageOperation.run();
+               }
          }
-         targetQueue.flushOnIntermediate(() -> {
-            recoverContext();
-            performAck(nodeID, messageID, targetQueue, ackMessageOperation, reason, false);
-         });
-         return;
       }
+
       if (reference != null) {
          if (logger.isTraceEnabled()) {
             logger.trace("Post ack Server " + server + " worked well for messageID=" + messageID + " nodeID=" + nodeID);
@@ -398,14 +420,7 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement
          } catch (Exception e) {
             logger.warn(e.getMessage(), e);
          }
-      } else {
-         if (reason != AckReason.EXPIRED) {
-            // if expired, we don't need to check on paging
-            // as the message will expire again when depaged (if on paging)
-            performAckOnPage(nodeID, messageID, targetQueue, ackMessageOperation);
-         }
       }
-
    }
 
    /**
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPMirrorFastACKTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPMirrorFastACKTest.java
new file mode 100644
index 0000000000..3fda73e495
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPMirrorFastACKTest.java
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp.connect;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration;
+import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPMirrorBrokerConnectionElement;
+import org.apache.activemq.artemis.core.config.impl.SecurityConfiguration;
+import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
+import org.apache.activemq.artemis.core.journal.Journal;
+import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
+import org.apache.activemq.artemis.core.persistence.CoreMessageObjectPools;
+import org.apache.activemq.artemis.core.persistence.Persister;
+import org.apache.activemq.artemis.core.persistence.StorageManager;
+import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
+import org.apache.activemq.artemis.protocol.amqp.broker.AMQPStandardMessage;
+import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager;
+import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManager;
+import org.apache.activemq.artemis.spi.core.security.jaas.InVMLoginModule;
+import org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.utils.ExecutorFactory;
+import org.apache.activemq.artemis.utils.Wait;
+import org.apache.activemq.artemis.utils.critical.CriticalAnalyzer;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class AMQPMirrorFastACKTest extends AmqpClientTestSupport {
+
+   private static final String SLOW_SERVER_NAME = "slow";
+   private static final int SLOW_SERVER_PORT = AMQP_PORT + 1;
+
+   private static final int ENCODE_DELAY = 10;
+
+   private ActiveMQServer slowServer;
+
+   @Override
+   @Before
+   public void setUp() throws Exception {
+      super.setUp();
+      slowServer = createSlowServer();
+   }
+
+   @Override
+   @After
+   public void tearDown() throws Exception {
+      try {
+         if (slowServer != null) {
+            slowServer.stop();
+         }
+      } finally {
+         super.tearDown();
+      }
+   }
+
+   @Test
+   public void testMirrorTargetFastACK() throws Exception {
+      final int NUMBER_OF_MESSAGES = 10;
+      CountDownLatch done = new CountDownLatch(NUMBER_OF_MESSAGES);
+
+      AMQPMirrorBrokerConnectionElement replication = configureMirrorTowardsSlow(server);
+
+      slowServer.start();
+      server.start();
+
+      waitForServerToStart(slowServer);
+      waitForServerToStart(server);
+
+      server.addAddressInfo(new AddressInfo(getQueueName()).addRoutingType(RoutingType.ANYCAST).setAutoCreated(false));
+      server.createQueue(new QueueConfiguration(getQueueName()).setRoutingType(RoutingType.ANYCAST).setAddress(getQueueName()).setAutoCreated(false));
+
+      ConnectionFactory factory = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT);
+
+      try (Connection connection = factory.createConnection()) {
+         Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+         MessageConsumer consumer = session.createConsumer(session.createQueue(getQueueName()));
+         MessageProducer producer = session.createProducer(session.createQueue(getQueueName()));
+
+         connection.start();
+
+         consumer.setMessageListener(new MessageListener() {
+            @Override
+            public void onMessage(Message message) {
+               try {
+                  message.acknowledge();
+                  done.countDown();
+               } catch (Exception ignore) {
+                  // Ignore
+               }
+            }
+         });
+
+         producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+         for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
+            producer.send(session.createTextMessage("i=" + i));
+         }
+
+         Assert.assertTrue(done.await(5000, TimeUnit.MILLISECONDS));
+      }
+
+      Queue snf = server.locateQueue(replication.getMirrorSNF());
+      Queue queue = slowServer.locateQueue(getQueueName());
+
+      Wait.waitFor(() -> snf.getMessageCount() == 0 && snf.getMessagesAdded() > NUMBER_OF_MESSAGES);
+      Wait.assertTrue("Expected mirrored target queue " + getQueueName() + " to be empty", () -> queue.getMessageCount() == 0 && queue.getMessagesAdded() == NUMBER_OF_MESSAGES);
+   }
+
+   @Override
+   protected ActiveMQServer createServer() throws Exception {
+      return createServer(AMQP_PORT, false);
+   }
+
+   private AMQPMirrorBrokerConnectionElement configureMirrorTowardsSlow(ActiveMQServer source) {
+      AMQPBrokerConnectConfiguration connection = new AMQPBrokerConnectConfiguration("mirror", "tcp://localhost:" + SLOW_SERVER_PORT).setReconnectAttempts(-1).setRetryInterval(100);
+      AMQPMirrorBrokerConnectionElement replication = new AMQPMirrorBrokerConnectionElement().setDurable(true);
+      connection.addElement(replication);
+
+      source.getConfiguration().addAMQPConnection(connection);
+      return replication;
+   }
+
+   private ActiveMQServer createSlowServer() throws Exception {
+      ActiveMQSecurityManager securityManager = new ActiveMQJAASSecurityManager(InVMLoginModule.class.getName(), new SecurityConfiguration());
+      ActiveMQServer server = new ActiveMQServerImpl(createBasicConfig(SLOW_SERVER_PORT), mBeanServer, securityManager) {
+         @Override
+         protected StorageManager createStorageManager() {
+            return AMQPMirrorFastACKTest.this.createStorageManager(getConfiguration(), getCriticalAnalyzer(), executorFactory, scheduledPool, ioExecutorFactory, ioCriticalErrorListener);
+         }
+      };
+
+      server.getConfiguration().setName(SLOW_SERVER_NAME);
+      server.getConfiguration().getAcceptorConfigurations().clear();
+      server.getConfiguration().getAcceptorConfigurations().add(addAcceptorConfiguration(slowServer, SLOW_SERVER_PORT));
+
+      server.getConfiguration().setJMXManagementEnabled(true);
+      server.getConfiguration().setMessageExpiryScanPeriod(100);
+
+      configureAddressPolicy(server);
+      configureBrokerSecurity(server);
+
+      return server;
+   }
+
+   private StorageManager createStorageManager(Configuration configuration,
+                                               CriticalAnalyzer criticalAnalyzer,
+                                               ExecutorFactory executorFactory,
+                                               ScheduledExecutorService scheduledPool,
+                                               ExecutorFactory ioExecutorFactory,
+                                               IOCriticalErrorListener ioCriticalErrorListener) {
+      return new JournalStorageManager(configuration, criticalAnalyzer, executorFactory, scheduledPool, ioExecutorFactory, ioCriticalErrorListener) {
+         @Override
+         protected Journal createMessageJournal(Configuration config,
+                                                IOCriticalErrorListener criticalErrorListener,
+                                                int fileSize) {
+            return new JournalImpl(ioExecutorFactory, fileSize, config.getJournalMinFiles(), config.getJournalPoolFiles(), config.getJournalCompactMinFiles(), config.getJournalCompactPercentage(), config.getJournalFileOpenTimeout(), journalFF, "activemq-data", "amq", journalFF.getMaxIO(), 0, criticalErrorListener, config.getJournalMaxAtticFiles()) {
+               @Override
+               public void appendAddRecordTransactional(long txID,
+                                                        long id,
+                                                        byte recordType,
+                                                        Persister persister,
+                                                        Object record) throws Exception {
+                  super.appendAddRecordTransactional(txID, id, recordType, record instanceof AMQPStandardMessage ? new SlowMessagePersister<>(persister) : persister, record);
+               }
+            };
+         }
+      };
+   }
+
+   static class SlowMessagePersister<T> implements Persister<T> {
+
+      private final Persister<T> delegate;
+
+      SlowMessagePersister(Persister<T> delegate) {
+         this.delegate = delegate;
+      }
+
+      @Override
+      public byte getID() {
+         return delegate.getID();
+      }
+
+      @Override
+      public int getEncodeSize(T record) {
+         return delegate.getEncodeSize(record);
+      }
+
+      @Override
+      public void encode(ActiveMQBuffer buffer, T record) {
+         try {
+            // This will slow down IO completion for transactional message write
+            Thread.sleep(ENCODE_DELAY);
+         } catch (Exception ignore) {
+            // ignore
+         }
+         delegate.encode(buffer, record);
+      }
+
+      @Override
+      public T decode(ActiveMQBuffer buffer, T record, CoreMessageObjectPools pool) {
+         return delegate.decode(buffer, record, pool);
+      }
+   }
+}