You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2021/06/02 15:23:55 UTC
[activemq-artemis] branch main updated: ARTEMIS-3234 - revisit fix
to deal with credit on unmatched acks,
thanks to brusdev for the interceptor feature and test from ARTEMIS-2650
This is an automated email from the ASF dual-hosted git repository.
gtully pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new 815f383 ARTEMIS-3234 - revisit fix to deal with credit on unmatched acks, thanks to brusdev for the interceptor feature and test from ARTEMIS-2650
815f383 is described below
commit 815f383f9c19abacd0c56543ebc0e93e06dffbf9
Author: gtully <ga...@gmail.com>
AuthorDate: Wed Jun 2 00:09:02 2021 +0100
ARTEMIS-3234 - revisit fix to deal with credit on unmatched acks, thanks to brusdev for the interceptor feature and test from ARTEMIS-2650
---
.../core/protocol/openwire/OpenWireConnection.java | 25 ++--
.../protocol/openwire/OpenWireInterceptor.java | 27 ++++
.../protocol/openwire/OpenWireProtocolManager.java | 42 ++++--
.../openwire/OpenWireProtocolManagerFactory.java | 10 +-
.../core/protocol/openwire/amq/AMQConsumer.java | 39 +++---
.../openwire/OpenWireProtocolManagerTest.java | 2 +-
.../openwire/interop/GeneralInteropTest.java | 145 ++++++++++++++++++++-
7 files changed, 233 insertions(+), 57 deletions(-)
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
index 8af0731..1216a52 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
@@ -131,7 +131,6 @@ import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.state.CommandVisitor;
import org.apache.activemq.state.ConnectionState;
import org.apache.activemq.state.ConsumerState;
-import org.apache.activemq.state.ProducerState;
import org.apache.activemq.state.SessionState;
import org.apache.activemq.transport.TransmitCallback;
import org.apache.activemq.util.ByteSequence;
@@ -257,11 +256,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
if (state == null) {
return null;
}
- ConnectionInfo info = state.getInfo();
- if (info == null) {
- return null;
- }
- return info;
+ return state.getInfo();
}
//tells the connection that
@@ -311,6 +306,12 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
AuditLogger.setRemoteAddress(getRemoteAddress());
}
+ if (this.protocolManager.invokeIncoming(command, this) != null) {
+ logger.debugf("Interceptor rejected OpenWire command: %s", command);
+ disconnect(true);
+ return;
+ }
+
boolean responseRequired = command.isResponseRequired();
int commandId = command.getCommandId();
@@ -496,6 +497,9 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
}
public void physicalSend(Command command) throws IOException {
+ if (this.protocolManager.invokeOutgoing(command, this) != null) {
+ return;
+ }
if (logger.isTraceEnabled()) {
tracePhysicalSend(transportConnection, command);
@@ -595,10 +599,6 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
SessionState ss = state.getSessionState(id.getParentId());
if (ss != null) {
result.setProducerState(ss.getProducerState(id));
- ProducerState producerState = ss.getProducerState(id);
- if (producerState != null && producerState.getInfo() != null) {
- ProducerInfo info = producerState.getInfo();
- }
}
producerExchanges.put(id, result);
}
@@ -672,6 +672,9 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
}
}
+ if (fail) {
+ shutdown(fail);
+ }
}
@Override
@@ -808,8 +811,6 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
try {
physicalSend(command);
- } catch (Exception e) {
- return false;
} catch (Throwable t) {
return false;
}
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireInterceptor.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireInterceptor.java
new file mode 100644
index 0000000..7acc302
--- /dev/null
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireInterceptor.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.activemq.artemis.core.protocol.openwire;
+
+import org.apache.activemq.artemis.api.core.BaseInterceptor;
+import org.apache.activemq.command.Command;
+
+public interface OpenWireInterceptor extends BaseInterceptor<Command> {
+
+}
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
index 3aa5868..840a9a7 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
@@ -35,7 +35,6 @@ import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQSecurityException;
import org.apache.activemq.artemis.api.core.BaseInterceptor;
-import org.apache.activemq.artemis.api.core.Interceptor;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClusterTopologyListener;
@@ -49,8 +48,8 @@ import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
import org.apache.activemq.artemis.core.server.cluster.ClusterManager;
import org.apache.activemq.artemis.reader.MessageUtil;
import org.apache.activemq.artemis.selector.impl.LRUCache;
+import org.apache.activemq.artemis.spi.core.protocol.AbstractProtocolManager;
import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry;
-import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager;
import org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
@@ -82,7 +81,7 @@ import org.apache.activemq.util.LongSequenceGenerator;
import static org.apache.activemq.artemis.core.protocol.openwire.util.OpenWireUtil.SELECTOR_AWARE_OPTION;
-public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, ClusterTopologyListener {
+public class OpenWireProtocolManager extends AbstractProtocolManager<Command, OpenWireInterceptor, OpenWireConnection> implements ClusterTopologyListener {
private static final List<String> websocketRegistryNames = Collections.EMPTY_LIST;
@@ -94,7 +93,7 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, Cl
private final OpenWireProtocolManagerFactory factory;
- private OpenWireFormatFactory wireFactory;
+ private final OpenWireFormatFactory wireFactory;
private boolean prefixPacketSize = true;
@@ -135,7 +134,11 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, Cl
private final Map<SimpleString, RoutingType> prefixes = new HashMap<>();
- protected class VirtualTopicConfig {
+ private final List<OpenWireInterceptor> incomingInterceptors = new ArrayList<>();
+ private final List<OpenWireInterceptor> outgoingInterceptors = new ArrayList<>();
+
+
+ protected static class VirtualTopicConfig {
public int filterPathTerminus;
public boolean selectorAware;
@@ -160,7 +163,9 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, Cl
private final Map<DestinationFilter, VirtualTopicConfig> vtConsumerDestinationMatchers = new HashMap<>();
protected final LRUCache<ActiveMQDestination, ActiveMQDestination> vtDestMapCache = new LRUCache();
- public OpenWireProtocolManager(OpenWireProtocolManagerFactory factory, ActiveMQServer server) {
+ public OpenWireProtocolManager(OpenWireProtocolManagerFactory factory, ActiveMQServer server,
+ List<BaseInterceptor> incomingInterceptors,
+ List<BaseInterceptor> outgoingInterceptors) {
this.factory = factory;
this.server = server;
this.wireFactory = new OpenWireFormatFactory();
@@ -170,6 +175,8 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, Cl
scheduledPool = server.getScheduledPool();
this.wireFormat = (OpenWireFormat) wireFactory.createWireFormat();
+ updateInterceptors(incomingInterceptors, outgoingInterceptors);
+
final ClusterManager clusterManager = this.server.getClusterManager();
ClusterConnection cc = clusterManager.getDefaultConnection(null);
@@ -245,14 +252,29 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, Cl
}
@Override
- public ProtocolManagerFactory<Interceptor> getFactory() {
+ public ProtocolManagerFactory getFactory() {
return factory;
}
@Override
- public void updateInterceptors(List<BaseInterceptor> incomingInterceptors,
- List<BaseInterceptor> outgoingInterceptors) {
- // NO-OP
+ public void updateInterceptors(List incoming, List outgoing) {
+ this.incomingInterceptors.clear();
+ if (incoming != null) {
+ this.incomingInterceptors.addAll(getFactory().filterInterceptors(incoming));
+ }
+
+ this.outgoingInterceptors.clear();
+ if (outgoing != null) {
+ this.outgoingInterceptors.addAll(getFactory().filterInterceptors(outgoing));
+ }
+ }
+
+ public String invokeIncoming(Command command, OpenWireConnection connection) {
+ return super.invokeInterceptors(this.incomingInterceptors, command, connection);
+ }
+
+ public String invokeOutgoing(Command command, OpenWireConnection connection) {
+ return super.invokeInterceptors(this.outgoingInterceptors, command, connection);
}
@Override
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManagerFactory.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManagerFactory.java
index d40e2ef..7368737 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManagerFactory.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManagerFactory.java
@@ -16,12 +16,10 @@
*/
package org.apache.activemq.artemis.core.protocol.openwire;
-import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.activemq.artemis.api.core.BaseInterceptor;
-import org.apache.activemq.artemis.api.core.Interceptor;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.spi.core.protocol.AbstractProtocolManagerFactory;
import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager;
@@ -30,7 +28,7 @@ import org.apache.activemq.artemis.utils.uri.BeanSupport;
import org.osgi.service.component.annotations.Component;
@Component(service = ProtocolManagerFactory.class)
-public class OpenWireProtocolManagerFactory extends AbstractProtocolManagerFactory<Interceptor> {
+public class OpenWireProtocolManagerFactory extends AbstractProtocolManagerFactory<OpenWireInterceptor> {
public static final String OPENWIRE_PROTOCOL_NAME = "OPENWIRE";
@@ -44,12 +42,12 @@ public class OpenWireProtocolManagerFactory extends AbstractProtocolManagerFacto
final List<BaseInterceptor> incomingInterceptors,
List<BaseInterceptor> outgoingInterceptors) throws Exception {
BeanSupport.stripPasswords(parameters);
- return BeanSupport.setData(new OpenWireProtocolManager(this, server), parameters);
+ return BeanSupport.setData(new OpenWireProtocolManager(this, server, incomingInterceptors, outgoingInterceptors), parameters);
}
@Override
- public List<Interceptor> filterInterceptors(List<BaseInterceptor> interceptors) {
- return Collections.emptyList();
+ public List<OpenWireInterceptor> filterInterceptors(List<BaseInterceptor> interceptors) {
+ return internalFilterInterceptors(OpenWireInterceptor.class, interceptors);
}
@Override
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
index d68fa91..c06227e 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
@@ -293,40 +293,33 @@ public class AMQConsumer {
}
final int ackMessageCount = ack.getMessageCount();
- acquireCredit(ackMessageCount);
-
if (ack.isDeliveredAck()) {
+ acquireCredit(ackMessageCount);
deliveredAcksCreditExtension += ackMessageCount;
// our work is done
return;
}
- // some sort of real ack, rebalance deliveredAcksCreditExtension
- if (deliveredAcksCreditExtension > 0) {
- deliveredAcksCreditExtension -= ackMessageCount;
- if (deliveredAcksCreditExtension >= 0) {
- currentWindow.addAndGet(-ackMessageCount);
- }
- }
+ final MessageId lastID = ack.getLastMessageId();
+ final MessageId startID = ack.getFirstMessageId() == null ? lastID : ack.getFirstMessageId();
- final MessageId startID, lastID;
+ // if it's browse only, nothing to be acked
+ final boolean removeReferences = !serverConsumer.isBrowseOnly() && !serverConsumer.getQueue().isNonDestructive();
+ final List<MessageReference> ackList = serverConsumer.scanDeliveringReferences(removeReferences, reference -> startID.equals(reference.getProtocolData()), reference -> lastID.equals(reference.getProtocolData()));
- if (ack.getFirstMessageId() == null) {
- startID = ack.getLastMessageId();
- lastID = ack.getLastMessageId();
- } else {
- startID = ack.getFirstMessageId();
- lastID = ack.getLastMessageId();
- }
+ if (!ackList.isEmpty() || !removeReferences || serverConsumer.getQueue().isTemporary()) {
- boolean removeReferences = !serverConsumer.isBrowseOnly(); // if it's browse only, nothing to be acked, we just remove the lists
- if (serverConsumer.getQueue().isNonDestructive()) {
- removeReferences = false;
- }
+ // valid match in delivered or browsing or temp - deal with credit
+ acquireCredit(ackMessageCount);
- final List<MessageReference> ackList = serverConsumer.scanDeliveringReferences(removeReferences, reference -> startID.equals(reference.getProtocolData()), reference -> lastID.equals(reference.getProtocolData()));
+ // some sort of real ack, rebalance deliveredAcksCreditExtension
+ if (deliveredAcksCreditExtension > 0) {
+ deliveredAcksCreditExtension -= ackMessageCount;
+ if (deliveredAcksCreditExtension >= 0) {
+ currentWindow.addAndGet(-ackMessageCount);
+ }
+ }
- if (!ackList.isEmpty()) {
if (ack.isExpiredAck()) {
for (MessageReference ref : ackList) {
ref.getQueue().expire(ref, serverConsumer);
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireProtocolManagerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireProtocolManagerTest.java
index e060cb3..4ee2a6c 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireProtocolManagerTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireProtocolManagerTest.java
@@ -36,7 +36,7 @@ public class OpenWireProtocolManagerTest extends ActiveMQTestBase {
@Test
public void testVtAutoConversion() {
- underTest = new OpenWireProtocolManager(null, new DummyServer()) {
+ underTest = new OpenWireProtocolManager(null, new DummyServer(), null, null) {
@Override
public ActiveMQDestination virtualTopicConsumerToFQQN(ActiveMQDestination destination) {
if (lruCacheRef == null) {
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/interop/GeneralInteropTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/interop/GeneralInteropTest.java
index 28715b1..42ca410 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/interop/GeneralInteropTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/interop/GeneralInteropTest.java
@@ -27,20 +27,31 @@ import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.StreamMessage;
import javax.jms.TextMessage;
+import java.io.IOException;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQMessageConsumer;
import org.apache.activemq.ActiveMQMessageProducer;
import org.apache.activemq.artemis.api.core.ActiveMQDisconnectedException;
-import org.apache.activemq.artemis.api.core.client.ServerLocator;
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.management.QueueControl;
import org.apache.activemq.artemis.api.core.management.ResourceNames;
+import org.apache.activemq.artemis.core.protocol.openwire.OpenWireInterceptor;
import org.apache.activemq.artemis.core.server.ServerSession;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.tests.integration.openwire.BasicOpenWireTest;
import org.apache.activemq.artemis.utils.Wait;
import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.Command;
+import org.apache.activemq.transport.TransportListener;
import org.junit.Before;
import org.junit.Test;
@@ -51,13 +62,10 @@ import org.junit.Test;
*/
public class GeneralInteropTest extends BasicOpenWireTest {
- private ServerLocator locator;
-
@Before
@Override
public void setUp() throws Exception {
super.setUp();
- locator = this.createInVMNonHALocator();
}
@Test
@@ -194,6 +202,103 @@ public class GeneralInteropTest extends BasicOpenWireTest {
@Test
public void testFailoverReceivingFromCore() throws Exception {
+
+ /**
+ * to get logging to stdout from failover client
+ * org.slf4j.impl.SimpleLoggerFactory simpleLoggerFactory = new SimpleLoggerFactory();
+ * ((SimpleLogger)simpleLoggerFactory.getLogger(FailoverTransport.class.getName())).setLevel(SimpleLogger.TRACE);
+ */
+
+ final String text = "HelloWorld";
+ final int prefetchSize = 10;
+
+ SimpleString dla = new SimpleString("DLA");
+ SimpleString dlq = new SimpleString("DLQ1");
+ server.createQueue(new QueueConfiguration(dlq).setAddress(dla).setDurable(false));
+ server.getAddressSettingsRepository().addMatch(queueName, new AddressSettings().setDeadLetterAddress(dla));
+
+ sendMultipleTextMessagesUsingCoreJms(queueName, text, 100);
+
+ String urlString = "failover:(tcp://" + OWHOST + ":" + OWPORT
+ + ")?randomize=false&timeout=400&reconnectDelay=500" +
+ "&useExponentialBackOff=false&initialReconnectDelay=500&nested.wireFormat.maxInactivityDuration=500" +
+ "&nested.wireFormat.maxInactivityDurationInitalDelay=500" +
+ "&nested.soTimeout=500&nested.connectionTimeout=400&jms.connectResponseTimeout=400&jms.sendTimeout=400&jms.closeTimeout=400";
+
+ ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(urlString);
+ connectionFactory.setSendAcksAsync(false);
+ connectionFactory.setOptimizeAcknowledge(false);
+ connectionFactory.getPrefetchPolicy().setAll(prefetchSize);
+
+ Connection connection = connectionFactory.createConnection();
+ try {
+ connection.setClientID("test.consumer.queue." + queueName);
+ connection.start();
+
+ Message message = null;
+ Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ Queue queue = session.createQueue(queueName);
+ QueueControl queueControl = (QueueControl)server.getManagementService().
+ getResource(ResourceNames.QUEUE + queueName);
+
+ QueueControl dlqControl = (QueueControl)server.getManagementService().
+ getResource(ResourceNames.QUEUE + dlq.toString());
+
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ message = consumer.receive(5000);
+ assertNotNull(message);
+ assertTrue(message instanceof TextMessage);
+ assertEquals(text + 0, ((TextMessage)message).getText());
+ message.acknowledge();
+
+ Wait.assertEquals(1L, () -> queueControl.getMessagesAcknowledged(), 3000, 100);
+ Wait.assertEquals(prefetchSize, () -> queueControl.getDeliveringCount(), 3000, 100);
+
+ message = consumer.receive(5000);
+ assertNotNull(message);
+ assertTrue(message instanceof TextMessage);
+ assertEquals(text + 1, ((TextMessage)message).getText());
+
+ // client won't get a reply to the ack command, just a disconnect, and it will replay the ack on reconnect
+ server.getRemotingService().addIncomingInterceptor(new OpenWireInterceptor() {
+ @Override
+ public boolean intercept(Command packet, RemotingConnection connection) throws ActiveMQException {
+ if (packet.isMessageAck()) {
+ server.getRemotingService().removeIncomingInterceptor(this);
+ return false;
+ }
+ return true;
+ }
+ });
+
+ message.acknowledge();
+
+ // after a response to the replay....
+ // the message should be redelivered and pending for the replayed ack... hence it gets acked ok.
+ // the real delivery gets suppressed as a duplicate by the message audit and poison acked
+ // but there is a race between client failover reconnect and server dispatch to a new consumer
+ // if redispatch has not happened, the replayed ack is dropped and the poison ack will match and try to dlq
+ Wait.waitFor(() -> dlqControl.getMessageCount() == 1 && queueControl.getMessagesAcknowledged() == 1
+ || dlqControl.getMessageCount() == 0 && queueControl.getMessagesAcknowledged() == 2, 3000, 100);
+ Wait.assertEquals(prefetchSize, () -> queueControl.getDeliveringCount(), 3000, 100);
+
+ message = consumer.receive(5000);
+ assertNotNull(message);
+ assertTrue(message instanceof TextMessage);
+ assertEquals(text + 2, ((TextMessage)message).getText());
+ message.acknowledge();
+
+ Wait.waitFor(() -> dlqControl.getMessageCount() == 1 && queueControl.getMessagesAcknowledged() == 2
+ || dlqControl.getMessageCount() == 0 && queueControl.getMessagesAcknowledged() == 3, 3000, 100);
+ Wait.assertEquals(prefetchSize, () -> queueControl.getDeliveringCount(), 30000, 100);
+ } finally {
+ connection.close();
+ }
+ }
+
+ @Test
+ public void testFailoverReceivingFromCoreWithAckAfterInterrupt() throws Exception {
final int prefetchSize = 10;
final String text = "HelloWorld";
@@ -227,13 +332,43 @@ public class GeneralInteropTest extends BasicOpenWireTest {
Wait.assertEquals(1L, () -> queueControl.getMessagesAcknowledged(), 3000, 100);
Wait.assertEquals(prefetchSize, () -> queueControl.getDeliveringCount(), 3000, 100);
- //Force a disconnection.
+ message = consumer.receive(5000);
+ assertNotNull(message);
+ assertTrue(message instanceof TextMessage);
+ assertEquals(text + 1, ((TextMessage)message).getText());
+
+ CountDownLatch interrupted = new CountDownLatch(1);
+ ((ActiveMQConnection)connection).addTransportListener(new TransportListener() {
+ @Override
+ public void onCommand(Object command) {
+ }
+
+ @Override
+ public void onException(IOException error) {
+ }
+
+ @Override
+ public void transportInterupted() {
+ interrupted.countDown();
+ }
+
+ @Override
+ public void transportResumed() {
+ }
+ });
+
+ //Force a disconnection that will result in duplicate ack
for (ServerSession serverSession : server.getSessions()) {
if (session.toString().contains(serverSession.getName())) {
serverSession.getRemotingConnection().fail(new ActiveMQDisconnectedException());
}
}
+ assertTrue(interrupted.await(10, TimeUnit.SECONDS));
+
+ // ack will be dropped
+ message.acknowledge();
+
Wait.assertEquals(1L, () -> queueControl.getMessagesAcknowledged(), 3000, 100);
Wait.assertEquals(prefetchSize, () -> queueControl.getDeliveringCount(), 3000, 100);