You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by wy...@apache.org on 2019/09/04 03:06:23 UTC
[activemq-artemis] branch master updated: ARTEMIS-2380 Fix delivering message in the case of consume close
This is an automated email from the ASF dual-hosted git repository.
wy96f pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/master by this push:
new 4a61d2d ARTEMIS-2380 Fix delivering message in the case of consume close
new 9c1cbf3 This closes #2703
4a61d2d is described below
commit 4a61d2dc769ebc76b5cb18a240931ad169c0123e
Author: Wei Yang <wy...@gmail.com>
AuthorDate: Fri Aug 30 17:38:04 2019 +0800
ARTEMIS-2380 Fix delivering message in the case of consume close
---
.../apache/activemq/artemis/core/server/Queue.java | 4 +++
.../artemis/core/server/ServerSession.java | 6 ++++
.../artemis/core/server/impl/QueueImpl.java | 23 +++++++++++++
.../artemis/core/server/impl/RefsOperation.java | 28 ++++++++++++++-
.../core/server/impl/ServerConsumerImpl.java | 13 ++++++-
.../core/server/impl/ServerSessionImpl.java | 40 ++++++++++++++++++++++
.../server/impl/ScheduledDeliveryHandlerTest.java | 10 ++++++
.../tests/integration/client/ReceiveTest.java | 39 +++++++++++++++++++++
.../integration/management/QueueControlTest.java | 37 ++++++++++++++++++++
.../tests/unit/core/postoffice/impl/FakeQueue.java | 10 ++++++
10 files changed, 208 insertions(+), 2 deletions(-)
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
index 3ff1b84..adcb72e 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
@@ -129,6 +129,10 @@ public interface Queue extends Bindable,CriticalComponent {
void addConsumer(Consumer consumer) throws Exception;
+ void addLingerSession(String sessionId);
+
+ void removeLingerSession(String sessionId);
+
void removeConsumer(Consumer consumer);
int getConsumerCount();
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
index 210fe89..f940ec2 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
@@ -429,6 +429,10 @@ public interface ServerSession extends SecurityAuth {
List<MessageReference> getInTXMessagesForConsumer(long consumerId);
+ List<MessageReference> getInTxLingerMessages();
+
+ void addLingerConsumer(ServerConsumer consumer);
+
String getValidatedUser();
SimpleString getMatchingQueue(SimpleString address, RoutingType routingType) throws Exception;
@@ -490,4 +494,6 @@ public interface ServerSession extends SecurityAuth {
int getProducerCount();
int getDefaultConsumerWindowSize(SimpleString address);
+
+ String toManagementString();
}
\ No newline at end of file
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 04ca5d4..a9eab4f 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -80,6 +80,7 @@ import org.apache.activemq.artemis.core.server.QueueFactory;
import org.apache.activemq.artemis.core.server.RoutingContext;
import org.apache.activemq.artemis.core.server.ScheduledDeliveryHandler;
import org.apache.activemq.artemis.core.server.ServerConsumer;
+import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.core.server.cluster.RemoteQueueBinding;
import org.apache.activemq.artemis.core.server.cluster.impl.Redistributor;
import org.apache.activemq.artemis.core.server.management.ManagementService;
@@ -101,6 +102,7 @@ import org.apache.activemq.artemis.utils.Env;
import org.apache.activemq.artemis.utils.ReferenceCounter;
import org.apache.activemq.artemis.utils.ReusableLatch;
import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
+import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
import org.apache.activemq.artemis.utils.collections.PriorityLinkedList;
import org.apache.activemq.artemis.utils.collections.PriorityLinkedListImpl;
@@ -321,6 +323,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
*/
private final Object directDeliveryGuard = new Object();
+ private final ConcurrentHashSet<String> lingerSessionIds = new ConcurrentHashSet<>();
+
public String debug() {
StringWriter str = new StringWriter();
PrintWriter out = new PrintWriter(str);
@@ -1261,6 +1265,16 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
}
@Override
+ public void addLingerSession(String sessionId) {
+ lingerSessionIds.add(sessionId);
+ }
+
+ @Override
+ public void removeLingerSession(String sessionId) {
+ lingerSessionIds.remove(sessionId);
+ }
+
+ @Override
public void removeConsumer(final Consumer consumer) {
enterCritical(CRITICAL_CONSUMER);
@@ -1585,6 +1599,15 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
mapReturn.put(holder.consumer.toManagementString(), msgs);
}
}
+
+ for (String lingerSessionId : lingerSessionIds) {
+ ServerSession serverSession = server.getSessionByID(lingerSessionId);
+ List<MessageReference> refs = serverSession == null ? null : serverSession.getInTxLingerMessages();
+ if (refs != null && !refs.isEmpty()) {
+ mapReturn.put(serverSession.toManagementString(), refs);
+ }
+ }
+
return mapReturn;
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java
index f0b6d34..d3cd425 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java
@@ -52,6 +52,8 @@ public class RefsOperation extends TransactionOperationAbstract {
*/
protected boolean ignoreRedeliveryCheck = false;
+ private String lingerSessionId = null;
+
public RefsOperation(Queue queue, AckReason reason, StorageManager storageManager) {
this.queue = queue;
this.reason = reason;
@@ -97,6 +99,8 @@ public class RefsOperation extends TransactionOperationAbstract {
List<MessageReference> ackedRefs = new ArrayList<>();
for (MessageReference ref : refsToAck) {
+ clearLingerRef(ref);
+
ref.emptyConsumerID();
if (logger.isTraceEnabled()) {
@@ -175,8 +179,10 @@ public class RefsOperation extends TransactionOperationAbstract {
@Override
public void afterCommit(final Transaction tx) {
for (MessageReference ref : refsToAck) {
+ clearLingerRef(ref);
+
synchronized (ref.getQueue()) {
- queue.postAcknowledge(ref, reason);
+ ref.getQueue().postAcknowledge(ref, reason);
}
}
@@ -190,6 +196,12 @@ public class RefsOperation extends TransactionOperationAbstract {
}
}
+ private void clearLingerRef(MessageReference ref) {
+ if (!ref.hasConsumerId() && lingerSessionId != null) {
+ ref.getQueue().removeLingerSession(lingerSessionId);
+ }
+ }
+
private void decrementRefCount(MessageReference refmsg) {
try {
refmsg.getMessage().decrementRefCount();
@@ -228,4 +240,18 @@ public class RefsOperation extends TransactionOperationAbstract {
return refsToAck;
}
+ public synchronized List<MessageReference> getLingerMessages() {
+ List<MessageReference> list = new LinkedList<>();
+ for (MessageReference ref : refsToAck) {
+ if (!ref.hasConsumerId() && lingerSessionId != null) {
+ list.add(ref);
+ }
+ }
+
+ return list;
+ }
+
+ public void setLingerSession(String lingerSessionId) {
+ this.lingerSessionId = lingerSessionId;
+ }
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
index ddba797..c668f6e 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
@@ -18,8 +18,8 @@ package org.apache.activemq.artemis.core.server.impl;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
import java.util.ArrayDeque;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
@@ -571,6 +571,8 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
tx.rollback();
+ addLingerRefs();
+
if (!browseOnly) {
TypedProperties props = new TypedProperties();
@@ -607,6 +609,15 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
}
}
+ private void addLingerRefs() throws Exception {
+ if (!browseOnly) {
+ List<MessageReference> lingerRefs = session.getInTXMessagesForConsumer(this.id);
+ if (lingerRefs != null && !lingerRefs.isEmpty()) {
+ session.addLingerConsumer(this);
+ }
+ }
+ }
+
@Override
public void removeItself() throws Exception {
if (browseOnly) {
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
index 7c26028..08f2d4e 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
@@ -2114,6 +2114,41 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
}
@Override
+ public List<MessageReference> getInTxLingerMessages() {
+ Transaction transaction = tx;
+ if (transaction == null && callback != null) {
+ transaction = callback.getCurrentTransaction();
+ }
+ RefsOperation operation = transaction == null ? null : (RefsOperation) transaction.getProperty(TransactionPropertyIndexes.REFS_OPERATION);
+
+ return operation == null ? null : operation.getLingerMessages();
+ }
+
+ @Override
+ public void addLingerConsumer(ServerConsumer consumer) {
+ Transaction transaction = tx;
+ if (transaction == null && callback != null) {
+ transaction = callback.getCurrentTransaction();
+ }
+ if (transaction != null) {
+ synchronized (transaction) {
+ // The transaction may already be committed or rolled back, so synchronize on it and check its state first
+ if (transaction.getState() != State.COMMITTED && transaction.getState() != State.ROLLEDBACK) {
+ RefsOperation operation = (RefsOperation) transaction.getProperty(TransactionPropertyIndexes.REFS_OPERATION);
+ List<MessageReference> refs = operation == null ? null : operation.getListOnConsumer(consumer.getID());
+ if (refs != null && !refs.isEmpty()) {
+ for (MessageReference ref : refs) {
+ ref.emptyConsumerID();
+ }
+ operation.setLingerSession(name);
+ consumer.getQueue().addLingerSession(name);
+ }
+ }
+ }
+ }
+ }
+
+ @Override
public SimpleString removePrefix(SimpleString address) {
if (prefixEnabled && address != null) {
return PrefixUtil.getAddress(address, prefixes);
@@ -2183,4 +2218,9 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
AddressSettings as = server.getAddressSettingsRepository().getMatch(address.toString());
return as.getDefaultConsumerWindowSize();
}
+
+ @Override
+ public String toManagementString() {
+ return "ServerSession [id=" + getConnectionID() + ":" + getName() + "]";
+ }
}
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
index 5646905..e266ef5 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
@@ -1011,6 +1011,16 @@ public class ScheduledDeliveryHandlerTest extends Assert {
}
@Override
+ public void addLingerSession(String sessionId) {
+
+ }
+
+ @Override
+ public void removeLingerSession(String sessionId) {
+
+ }
+
+ @Override
public void removeConsumer(Consumer consumer) {
}
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ReceiveTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ReceiveTest.java
index 40b8333..cca8b10 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ReceiveTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ReceiveTest.java
@@ -41,8 +41,12 @@ public class ReceiveTest extends ActiveMQTestBase {
SimpleString addressA;
+ SimpleString addressB;
+
SimpleString queueA;
+ SimpleString queueB;
+
private ServerLocator locator;
private ActiveMQServer server;
@@ -54,6 +58,8 @@ public class ReceiveTest extends ActiveMQTestBase {
addressA = RandomUtil.randomSimpleString();
queueA = RandomUtil.randomSimpleString();
+ addressB = RandomUtil.randomSimpleString();
+ queueB = RandomUtil.randomSimpleString();
locator = createInVMNonHALocator();
server = createServer(false);
@@ -162,4 +168,37 @@ public class ReceiveTest extends ActiveMQTestBase {
session.close();
sendSession.close();
}
+
+ @Test
+ public void testMultiConsumersOnSession() throws Exception {
+ ClientSessionFactory cf = createSessionFactory(locator.setCallTimeout(10000000));
+ ClientSession sendSession = cf.createSession(false, true, true);
+ ClientProducer cp1 = sendSession.createProducer(addressA);
+ ClientProducer cp2 = sendSession.createProducer(addressB);
+
+ ClientSession session = cf.createSession(false, true, false);
+ session.createQueue(addressA, queueA, false);
+ session.createQueue(addressB, queueB, false);
+
+ ClientConsumer cc1 = session.createConsumer(queueA);
+ ClientConsumer cc2 = session.createConsumer(queueB);
+ session.start();
+
+ cp1.send(sendSession.createMessage(false));
+ cp2.send(sendSession.createMessage(false));
+ Assert.assertNotNull(cc1.receive().acknowledge());
+ Assert.assertNotNull(cc2.receive().acknowledge());
+ session.commit();
+
+ final Queue queue1 = server.locateQueue(queueA);
+ final Queue queue2 = server.locateQueue(queueB);
+
+ Wait.assertTrue(() -> queue1.getMessageCount() == 0, 500, 100);
+ Wait.assertTrue(() -> queue1.getMessagesAcknowledged() == 1, 500, 100);
+ Wait.assertTrue(() -> queue2.getMessageCount() == 0, 500, 100);
+ Wait.assertTrue(() -> queue2.getMessagesAcknowledged() == 1, 500, 100);
+
+ session.close();
+ sendSession.close();
+ }
}
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
index 15b01b8..32e52cc 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
@@ -639,6 +639,43 @@ public class QueueControlTest extends ManagementTestBase {
}
@Test
+ public void testListDeliveringMessagesOnClosedConsumer() throws Exception {
+ SimpleString address = RandomUtil.randomSimpleString();
+ SimpleString queue = RandomUtil.randomSimpleString();
+ int intValue = RandomUtil.randomInt();
+ session.createQueue(address, RoutingType.MULTICAST, queue, null, durable);
+
+ Queue srvqueue = server.locateQueue(queue);
+
+ QueueControl queueControl = createManagementControl(address, queue);
+
+ ClientProducer producer = session.createProducer(address);
+ ClientMessage message = session.createMessage(durable);
+ message.putIntProperty(new SimpleString("key"), intValue);
+ producer.send(message);
+ producer.send(session.createMessage(durable));
+
+ ClientConsumer consumer = session.createConsumer(queue);
+ session.start();
+ ClientMessage msgRec = consumer.receive(5000);
+ assertNotNull(msgRec);
+ assertEquals(msgRec.getIntProperty("key").intValue(), intValue);
+ assertEquals(1, srvqueue.getDeliveringCount());
+ assertEquals(1, queueControl.listDeliveringMessages().size());
+
+ msgRec.acknowledge();
+ consumer.close();
+ assertEquals(1, srvqueue.getDeliveringCount());
+
+ System.out.println(queueControl.listDeliveringMessagesAsJSON());
+
+ Map<String, Map<String, Object>[]> deliveringMap = queueControl.listDeliveringMessages();
+ assertEquals(1, deliveringMap.size());
+
+ session.deleteQueue(queue);
+ }
+
+ @Test
public void testListScheduledMessages() throws Exception {
long delay = 2000;
SimpleString address = RandomUtil.randomSimpleString();
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
index 4cf5346..7b7890f 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
@@ -355,6 +355,16 @@ public class FakeQueue extends CriticalComponentImpl implements Queue {
}
@Override
+ public void addLingerSession(String sessionId) {
+
+ }
+
+ @Override
+ public void removeLingerSession(String sessionId) {
+
+ }
+
+ @Override
public void addRedistributor(final long delay) {
// no-op