You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2022/06/30 21:37:34 UTC
[activemq-artemis] 01/02: ARTEMIS-3815 proper retry through IOCompletion when message not found on target queue on Mirror
This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
commit d90179b99cc217a2611a7d439ddcc9a27d79dd12
Author: iliya <il...@gmail.com>
AuthorDate: Thu Jun 30 14:40:46 2022 -0400
ARTEMIS-3815 proper retry through IOCompletion when message not found on target queue on Mirror
co-authored Clebert Suconic
---
.../activemq/artemis/core/io/RunnableCallback.java | 64 ++++++
.../protocol/amqp/broker/AMQPSessionCallback.java | 4 +
.../connect/mirror/AMQPMirrorControllerTarget.java | 59 +++--
.../amqp/connect/AMQPMirrorFastACKTest.java | 239 +++++++++++++++++++++
4 files changed, 344 insertions(+), 22 deletions(-)
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/RunnableCallback.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/RunnableCallback.java
new file mode 100644
index 0000000000..cb4344a626
--- /dev/null
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/io/RunnableCallback.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.io;
+
+import org.jboss.logging.Logger;
+
+public class RunnableCallback implements IOCallback {
+ private static final Logger logger = Logger.getLogger(RunnableCallback.class);
+
+ Runnable okCallback;
+ Runnable errorCallback;
+
+ public RunnableCallback(Runnable ok, Runnable error) {
+ if (ok == null) {
+ throw new NullPointerException("ok = null");
+ }
+ if (ok == null) {
+ throw new NullPointerException("error = null");
+ }
+ okCallback = ok;
+ errorCallback = error;
+ }
+
+ public RunnableCallback(Runnable ok) {
+ if (ok == null) {
+ throw new NullPointerException("ok = null");
+ }
+ okCallback = ok;
+ errorCallback = ok;
+ }
+
+ @Override
+ public void done() {
+ try {
+ okCallback.run();
+ } catch (Throwable e) {
+ logger.warn(e.getMessage(), e);
+ }
+ }
+
+ @Override
+ public void onError(int errorCode, String errorMessage) {
+ try {
+ errorCallback.run();
+ } catch (Throwable e) {
+ logger.warn(e.getMessage(), e);
+ }
+ }
+}
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
index c78883afff..adb61c107d 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
@@ -177,6 +177,10 @@ public class AMQPSessionCallback implements SessionCallback {
}
}
+ public OperationContext getSessionContext() {
+ return serverSession.getSessionContext();
+ }
+
@Override
public void browserFinished(ServerConsumer consumer) {
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java
index 4be5968123..2c652e352b 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java
@@ -26,7 +26,9 @@ import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.io.IOCallback;
+import org.apache.activemq.artemis.core.io.RunnableCallback;
import org.apache.activemq.artemis.core.paging.cursor.PagedReference;
+import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.persistence.impl.journal.OperationContextImpl;
import org.apache.activemq.artemis.core.postoffice.DuplicateIDCache;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
@@ -138,7 +140,7 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement
}
// in a regular case we should not have more than amqpCredits on the pool, that's the max we would need
- private final MpscPool<ACKMessageOperation> ackMessageMpscPool = new MpscPool<>(amqpCredits, ACKMessageOperation::reset, () -> new ACKMessageOperation());
+ private final MpscPool<ACKMessageOperation> ackMessageMpscPool = new MpscPool<>(amqpCredits, ACKMessageOperation::reset, ACKMessageOperation::new);
final RoutingContextImpl routingContext = new RoutingContextImpl(null);
@@ -151,6 +153,8 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement
private final ReferenceNodeStore referenceNodeStore;
+ OperationContext mirrorContext;
+
public AMQPMirrorControllerTarget(AMQPSessionCallback sessionSPI,
AMQPConnectionContext connection,
AMQPSessionContext protonSession,
@@ -161,6 +165,7 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement
this.basicController.setLink(receiver);
this.server = server;
this.referenceNodeStore = sessionSPI.getProtocolManager().getReferenceIDSupplier();
+ mirrorContext = protonSession.getSessionSPI().getSessionContext();
}
@Override
@@ -224,7 +229,6 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement
}
deleteQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName));
} else if (eventType.equals(POST_ACK)) {
- String address = (String) AMQPMessageBrokerAccessor.getMessageAnnotationProperty(message, ADDRESS);
String nodeID = (String) AMQPMessageBrokerAccessor.getMessageAnnotationProperty(message, BROKER_ID);
AckReason ackReason = AMQPMessageBrokerAccessor.getMessageAnnotationAckReason(message);
@@ -236,9 +240,9 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement
AmqpValue value = (AmqpValue) message.getBody();
Long messageID = (Long) value.getValue();
if (logger.isDebugEnabled()) {
- logger.debug(server + " Post ack address=" + address + " queueName = " + queueName + " messageID=" + messageID + ", nodeID=" + nodeID);
+ logger.debug(server + " Post ack queueName = " + queueName + " messageID=" + messageID + ", nodeID=" + nodeID);
}
- if (postAcknowledge(address, queueName, nodeID, messageID, messageAckOperation, ackReason)) {
+ if (postAcknowledge(queueName, nodeID, messageID, messageAckOperation, ackReason)) {
messageAckOperation = null;
}
}
@@ -336,7 +340,7 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement
}
}
- public boolean postAcknowledge(String address, String queue, String nodeID, long messageID, ACKMessageOperation ackMessage, AckReason reason) throws Exception {
+ public boolean postAcknowledge(String queue, String nodeID, long messageID, ACKMessageOperation ackMessage, AckReason reason) throws Exception {
final Queue targetQueue = server.locateQueue(queue);
if (targetQueue == null) {
@@ -355,32 +359,50 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement
logger.trace("Server " + server.getIdentity() + " with queue = " + queue + " being acked for " + messageID + " coming from " + messageID + " targetQueue = " + targetQueue);
}
- performAck(nodeID, messageID, targetQueue, ackMessage, reason, true);
+ performAck(nodeID, messageID, targetQueue, ackMessage, reason, (short)0);
return true;
}
-
public void performAckOnPage(String nodeID, long messageID, Queue targetQueue, IOCallback ackMessageOperation) {
PageAck pageAck = new PageAck(targetQueue, nodeID, messageID, ackMessageOperation);
targetQueue.getPageSubscription().scanAck(pageAck, pageAck, pageAck, pageAck);
}
- private void performAck(String nodeID, long messageID, Queue targetQueue, ACKMessageOperation ackMessageOperation, AckReason reason, boolean retry) {
+ private void performAck(String nodeID, long messageID, Queue targetQueue, ACKMessageOperation ackMessageOperation, AckReason reason, final short retry) {
if (logger.isTraceEnabled()) {
logger.trace("performAck (nodeID=" + nodeID + ", messageID=" + messageID + ")" + ", targetQueue=" + targetQueue.getName());
}
MessageReference reference = targetQueue.removeWithSuppliedID(nodeID, messageID, referenceNodeStore);
- if (reference == null && retry) {
+ if (reference == null) {
if (logger.isDebugEnabled()) {
- logger.debug("Retrying Reference not found on messageID=" + messageID + " nodeID=" + nodeID);
+ logger.debug("Retrying Reference not found on messageID=" + messageID + " nodeID=" + nodeID + ", currentRetry=" + retry);
+ }
+ switch (retry) {
+ case 0:
+ // first retry, after IO Operations
+ sessionSPI.getSessionContext().executeOnCompletion(new RunnableCallback(() -> performAck(nodeID, messageID, targetQueue, ackMessageOperation, reason, (short) 1)));
+ return;
+ case 1:
+ // second retry after the queue is flushed the temporary adds
+ targetQueue.flushOnIntermediate(() -> {
+ recoverContext();
+ performAck(nodeID, messageID, targetQueue, ackMessageOperation, reason, (short)2);
+ });
+ return;
+ case 2:
+ // third retry, on paging
+ if (reason != AckReason.EXPIRED) {
+ // if expired, we don't need to check on paging
+ // as the message will expire again when depaged (if on paging)
+ performAckOnPage(nodeID, messageID, targetQueue, ackMessageOperation);
+ return;
+ } else {
+ ackMessageOperation.run();
+ }
}
- targetQueue.flushOnIntermediate(() -> {
- recoverContext();
- performAck(nodeID, messageID, targetQueue, ackMessageOperation, reason, false);
- });
- return;
}
+
if (reference != null) {
if (logger.isTraceEnabled()) {
logger.trace("Post ack Server " + server + " worked well for messageID=" + messageID + " nodeID=" + nodeID);
@@ -398,14 +420,7 @@ public class AMQPMirrorControllerTarget extends ProtonAbstractReceiver implement
} catch (Exception e) {
logger.warn(e.getMessage(), e);
}
- } else {
- if (reason != AckReason.EXPIRED) {
- // if expired, we don't need to check on paging
- // as the message will expire again when depaged (if on paging)
- performAckOnPage(nodeID, messageID, targetQueue, ackMessageOperation);
- }
}
-
}
/**
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPMirrorFastACKTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPMirrorFastACKTest.java
new file mode 100644
index 0000000000..3fda73e495
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPMirrorFastACKTest.java
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp.connect;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration;
+import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPMirrorBrokerConnectionElement;
+import org.apache.activemq.artemis.core.config.impl.SecurityConfiguration;
+import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
+import org.apache.activemq.artemis.core.journal.Journal;
+import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
+import org.apache.activemq.artemis.core.persistence.CoreMessageObjectPools;
+import org.apache.activemq.artemis.core.persistence.Persister;
+import org.apache.activemq.artemis.core.persistence.StorageManager;
+import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageManager;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
+import org.apache.activemq.artemis.protocol.amqp.broker.AMQPStandardMessage;
+import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager;
+import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManager;
+import org.apache.activemq.artemis.spi.core.security.jaas.InVMLoginModule;
+import org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.utils.ExecutorFactory;
+import org.apache.activemq.artemis.utils.Wait;
+import org.apache.activemq.artemis.utils.critical.CriticalAnalyzer;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class AMQPMirrorFastACKTest extends AmqpClientTestSupport {
+
+ private static final String SLOW_SERVER_NAME = "slow";
+ private static final int SLOW_SERVER_PORT = AMQP_PORT + 1;
+
+ private static final int ENCODE_DELAY = 10;
+
+ private ActiveMQServer slowServer;
+
+ @Override
+ @Before
+ public void setUp() throws Exception {
+ super.setUp();
+ slowServer = createSlowServer();
+ }
+
+ @Override
+ @After
+ public void tearDown() throws Exception {
+ try {
+ if (slowServer != null) {
+ slowServer.stop();
+ }
+ } finally {
+ super.tearDown();
+ }
+ }
+
+ @Test
+ public void testMirrorTargetFastACK() throws Exception {
+ final int NUMBER_OF_MESSAGES = 10;
+ CountDownLatch done = new CountDownLatch(NUMBER_OF_MESSAGES);
+
+ AMQPMirrorBrokerConnectionElement replication = configureMirrorTowardsSlow(server);
+
+ slowServer.start();
+ server.start();
+
+ waitForServerToStart(slowServer);
+ waitForServerToStart(server);
+
+ server.addAddressInfo(new AddressInfo(getQueueName()).addRoutingType(RoutingType.ANYCAST).setAutoCreated(false));
+ server.createQueue(new QueueConfiguration(getQueueName()).setRoutingType(RoutingType.ANYCAST).setAddress(getQueueName()).setAutoCreated(false));
+
+ ConnectionFactory factory = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT);
+
+ try (Connection connection = factory.createConnection()) {
+ Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ MessageConsumer consumer = session.createConsumer(session.createQueue(getQueueName()));
+ MessageProducer producer = session.createProducer(session.createQueue(getQueueName()));
+
+ connection.start();
+
+ consumer.setMessageListener(new MessageListener() {
+ @Override
+ public void onMessage(Message message) {
+ try {
+ message.acknowledge();
+ done.countDown();
+ } catch (Exception ignore) {
+ // Ignore
+ }
+ }
+ });
+
+ producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+ for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
+ producer.send(session.createTextMessage("i=" + i));
+ }
+
+ Assert.assertTrue(done.await(5000, TimeUnit.MILLISECONDS));
+ }
+
+ Queue snf = server.locateQueue(replication.getMirrorSNF());
+ Queue queue = slowServer.locateQueue(getQueueName());
+
+ Wait.waitFor(() -> snf.getMessageCount() == 0 && snf.getMessagesAdded() > NUMBER_OF_MESSAGES);
+ Wait.assertTrue("Expected mirrored target queue " + getQueueName() + " to be empty", () -> queue.getMessageCount() == 0 && queue.getMessagesAdded() == NUMBER_OF_MESSAGES);
+ }
+
+ @Override
+ protected ActiveMQServer createServer() throws Exception {
+ return createServer(AMQP_PORT, false);
+ }
+
+ private AMQPMirrorBrokerConnectionElement configureMirrorTowardsSlow(ActiveMQServer source) {
+ AMQPBrokerConnectConfiguration connection = new AMQPBrokerConnectConfiguration("mirror", "tcp://localhost:" + SLOW_SERVER_PORT).setReconnectAttempts(-1).setRetryInterval(100);
+ AMQPMirrorBrokerConnectionElement replication = new AMQPMirrorBrokerConnectionElement().setDurable(true);
+ connection.addElement(replication);
+
+ source.getConfiguration().addAMQPConnection(connection);
+ return replication;
+ }
+
+ private ActiveMQServer createSlowServer() throws Exception {
+ ActiveMQSecurityManager securityManager = new ActiveMQJAASSecurityManager(InVMLoginModule.class.getName(), new SecurityConfiguration());
+ ActiveMQServer server = new ActiveMQServerImpl(createBasicConfig(SLOW_SERVER_PORT), mBeanServer, securityManager) {
+ @Override
+ protected StorageManager createStorageManager() {
+ return AMQPMirrorFastACKTest.this.createStorageManager(getConfiguration(), getCriticalAnalyzer(), executorFactory, scheduledPool, ioExecutorFactory, ioCriticalErrorListener);
+ }
+ };
+
+ server.getConfiguration().setName(SLOW_SERVER_NAME);
+ server.getConfiguration().getAcceptorConfigurations().clear();
+ server.getConfiguration().getAcceptorConfigurations().add(addAcceptorConfiguration(slowServer, SLOW_SERVER_PORT));
+
+ server.getConfiguration().setJMXManagementEnabled(true);
+ server.getConfiguration().setMessageExpiryScanPeriod(100);
+
+ configureAddressPolicy(server);
+ configureBrokerSecurity(server);
+
+ return server;
+ }
+
+ private StorageManager createStorageManager(Configuration configuration,
+ CriticalAnalyzer criticalAnalyzer,
+ ExecutorFactory executorFactory,
+ ScheduledExecutorService scheduledPool,
+ ExecutorFactory ioExecutorFactory,
+ IOCriticalErrorListener ioCriticalErrorListener) {
+ return new JournalStorageManager(configuration, criticalAnalyzer, executorFactory, scheduledPool, ioExecutorFactory, ioCriticalErrorListener) {
+ @Override
+ protected Journal createMessageJournal(Configuration config,
+ IOCriticalErrorListener criticalErrorListener,
+ int fileSize) {
+ return new JournalImpl(ioExecutorFactory, fileSize, config.getJournalMinFiles(), config.getJournalPoolFiles(), config.getJournalCompactMinFiles(), config.getJournalCompactPercentage(), config.getJournalFileOpenTimeout(), journalFF, "activemq-data", "amq", journalFF.getMaxIO(), 0, criticalErrorListener, config.getJournalMaxAtticFiles()) {
+ @Override
+ public void appendAddRecordTransactional(long txID,
+ long id,
+ byte recordType,
+ Persister persister,
+ Object record) throws Exception {
+ super.appendAddRecordTransactional(txID, id, recordType, record instanceof AMQPStandardMessage ? new SlowMessagePersister<>(persister) : persister, record);
+ }
+ };
+ }
+ };
+ }
+
+ static class SlowMessagePersister<T> implements Persister<T> {
+
+ private final Persister<T> delegate;
+
+ SlowMessagePersister(Persister<T> delegate) {
+ this.delegate = delegate;
+ }
+
+ @Override
+ public byte getID() {
+ return delegate.getID();
+ }
+
+ @Override
+ public int getEncodeSize(T record) {
+ return delegate.getEncodeSize(record);
+ }
+
+ @Override
+ public void encode(ActiveMQBuffer buffer, T record) {
+ try {
+ // This will slow down IO completion for transactional message write
+ Thread.sleep(ENCODE_DELAY);
+ } catch (Exception ignore) {
+ // ignore
+ }
+ delegate.encode(buffer, record);
+ }
+
+ @Override
+ public T decode(ActiveMQBuffer buffer, T record, CoreMessageObjectPools pool) {
+ return delegate.decode(buffer, record, pool);
+ }
+ }
+}