You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2019/07/27 20:26:32 UTC
[activemq-artemis] 01/03: Revert "ARTEMIS-2423 Improving
Consumer/Queue Delivery lock"
This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
commit 8a1f267bd567b8b5cfa7a6bc5ba1ae192c170a1d
Author: Clebert Suconic <cl...@apache.org>
AuthorDate: Sat Jul 27 10:45:08 2019 -0400
Revert "ARTEMIS-2423 Improving Consumer/Queue Delivery lock"
This reverts commit 7507a9fd4b282523c2b2f3517ed788153a35df4c.
---
.../protocol/amqp/broker/AMQPSessionCallback.java | 10 +-
.../core/protocol/mqtt/MQTTPublishManager.java | 24 +-
.../core/protocol/mqtt/MQTTSessionCallback.java | 14 +-
.../core/protocol/openwire/amq/AMQSession.java | 14 +-
.../artemis/core/protocol/stomp/StompSession.java | 3 +-
.../impl/ManagementRemotingConnection.java | 10 +-
.../protocol/core/impl/CoreSessionCallback.java | 10 +-
.../activemq/artemis/core/server/Consumer.java | 32 +--
.../core/server/cluster/impl/BridgeImpl.java | 2 +-
.../core/server/cluster/impl/Redistributor.java | 2 +-
.../artemis/core/server/impl/QueueImpl.java | 110 ++++----
.../core/server/impl/ServerConsumerImpl.java | 277 ++++++++++++---------
.../artemis/spi/core/protocol/SessionCallback.java | 10 +-
.../tests/integration/cli/DummyServerConsumer.java | 2 +-
.../tests/integration/client/HangConsumerTest.java | 10 +-
.../tests/integration/jms/client/GroupingTest.java | 1 +
.../tests/unit/core/server/impl/QueueImplTest.java | 17 +-
.../unit/core/server/impl/fakes/FakeConsumer.java | 2 +-
18 files changed, 282 insertions(+), 268 deletions(-)
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
index 616da5b..f850cc1 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
@@ -186,6 +186,11 @@ public class AMQPSessionCallback implements SessionCallback {
(String) null, this, true, operationContext, manager.getPrefixes());
}
+ @Override
+ public void afterDelivery() throws Exception {
+
+ }
+
public void start() {
}
@@ -600,11 +605,6 @@ public class AMQPSessionCallback implements SessionCallback {
}
@Override
- public void afterDeliver(MessageReference ref, Message message, ServerConsumer consumerID, int deliveryCount) {
-
- }
-
- @Override
public int sendLargeMessage(MessageReference ref,
Message message,
ServerConsumer consumer,
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
index bb38539..abcfe3f 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
@@ -59,12 +59,6 @@ public class MQTTPublishManager {
private MQTTSessionState.OutboundStore outboundStore;
- /** this is the last qos that happened during delivery.
- * since afterDelivery will happen in the same thread, no other threads should be calling delivery and afterDelivery
- * so it is safe to keep this value here.
- */
- private Integer currentQos;
-
public MQTTPublishManager(MQTTSession session) {
this.session = session;
}
@@ -114,6 +108,7 @@ public class MQTTPublishManager {
boolean isManagementConsumer(ServerConsumer consumer) {
return consumer == managementConsumer;
}
+
/**
* Since MQTT Subscriptions can over lap; a client may receive the same message twice. When this happens the client
* returns a PubRec or PubAck with ID. But we need to know which consumer to ack, since we only have the ID to go on we
@@ -124,35 +119,20 @@ public class MQTTPublishManager {
protected void sendMessage(ICoreMessage message, ServerConsumer consumer, int deliveryCount) throws Exception {
// This is to allow retries of PubRel.
if (isManagementConsumer(consumer)) {
- currentQos = null;
sendPubRelMessage(message);
} else {
int qos = decideQoS(message, consumer);
- currentQos = qos;
if (qos == 0) {
sendServerMessage((int) message.getMessageID(), message, deliveryCount, qos);
+ session.getServerSession().individualAcknowledge(consumer.getID(), message.getMessageID());
} else if (qos == 1 || qos == 2) {
int mqttid = outboundStore.generateMqttId(message.getMessageID(), consumer.getID());
outboundStore.publish(mqttid, message.getMessageID(), consumer.getID());
sendServerMessage(mqttid, message, deliveryCount, qos);
} else {
- // this will happen during afterDeliver
- }
- }
- }
-
- protected void confirmMessage(ICoreMessage message, ServerConsumer consumer, int deliveryCount) throws Exception {
- if (currentQos != null) {
- int qos = currentQos.intValue();
- if (qos == 0) {
- session.getServerSession().individualAcknowledge(consumer.getID(), message.getMessageID());
- } else if (qos == 1 || qos == 2) {
- // everything happened in delivery
- } else {
// Client must have disconnected and it's Subscription QoS cleared
consumer.individualCancel(message.getMessageID(), false);
}
-
}
}
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java
index 168b7fa..50d5732 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java
@@ -60,15 +60,6 @@ public class MQTTSessionCallback implements SessionCallback {
}
@Override
- public void afterDeliver(MessageReference ref, Message message, ServerConsumer consumer, int deliveryCount) {
- try {
- session.getMqttPublishManager().confirmMessage(message.toCore(), consumer, deliveryCount);
- } catch (Exception e) {
- log.warn("Unable to send message: " + message.getMessageID() + " Cause: " + e.getMessage(), e);
- }
- }
-
- @Override
public boolean updateDeliveryCountAfterCancel(ServerConsumer consumer, MessageReference ref, boolean failed) {
return false;
}
@@ -100,6 +91,11 @@ public class MQTTSessionCallback implements SessionCallback {
}
@Override
+ public void afterDelivery() throws Exception {
+
+ }
+
+ @Override
public void browserFinished(ServerConsumer consumer) {
}
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
index e4ecd48..b780563 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
@@ -270,6 +270,12 @@ public class AMQSession implements SessionCallback {
}
+ // rename actualDest to destination
+ @Override
+ public void afterDelivery() throws Exception {
+
+ }
+
@Override
public void browserFinished(ServerConsumer consumer) {
AMQConsumer theConsumer = (AMQConsumer) consumer.getProtocolData();
@@ -307,14 +313,6 @@ public class AMQSession implements SessionCallback {
}
@Override
- public void afterDeliver(MessageReference ref,
- org.apache.activemq.artemis.api.core.Message message,
- ServerConsumer consumerID,
- int deliveryCount) {
-
- }
-
- @Override
public int sendLargeMessage(MessageReference reference,
org.apache.activemq.artemis.api.core.Message message,
ServerConsumer consumerID,
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
index 03f9b44..80bbbe8 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
@@ -110,9 +110,8 @@ public class StompSession implements SessionCallback {
public void sendProducerCreditsFailMessage(int credits, SimpleString address) {
}
-
@Override
- public void afterDeliver(MessageReference ref, Message message, ServerConsumer consumerID, int deliveryCount) throws Exception {
+ public void afterDelivery() throws Exception {
PendingTask task;
while ((task = afterDeliveryTasks.poll()) != null) {
task.run();
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ManagementRemotingConnection.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ManagementRemotingConnection.java
index 35eab8a..7e760c1 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ManagementRemotingConnection.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ManagementRemotingConnection.java
@@ -212,6 +212,11 @@ public class ManagementRemotingConnection implements RemotingConnection {
}
@Override
+ public void afterDelivery() throws Exception {
+
+ }
+
+ @Override
public boolean updateDeliveryCountAfterCancel(ServerConsumer consumer, MessageReference ref, boolean failed) {
return false;
}
@@ -232,11 +237,6 @@ public class ManagementRemotingConnection implements RemotingConnection {
}
@Override
- public void afterDeliver(MessageReference ref, Message message, ServerConsumer consumerID, int deliveryCount) {
-
- }
-
- @Override
public int sendLargeMessage(MessageReference reference,
Message message,
ServerConsumer consumerID,
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java
index fce0dd5..f53d028 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreSessionCallback.java
@@ -133,11 +133,6 @@ public final class CoreSessionCallback implements SessionCallback {
}
@Override
- public void afterDeliver(MessageReference ref, Message message, ServerConsumer consumerID, int deliveryCount) {
- // no op
- }
-
- @Override
public void sendProducerCreditsMessage(int credits, SimpleString address) {
Packet packet = new SessionProducerCreditsMessage(credits, address);
@@ -150,6 +145,11 @@ public final class CoreSessionCallback implements SessionCallback {
}
@Override
+ public void afterDelivery() throws Exception {
+
+ }
+
+ @Override
public void sendProducerCreditsFailMessage(int credits, SimpleString address) {
Packet packet = new SessionProducerCreditsFailMessage(credits, address);
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Consumer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Consumer.java
index 269c74f..babddc2 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Consumer.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Consumer.java
@@ -25,10 +25,6 @@ import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
public interface Consumer extends PriorityAware {
- interface GroupHandler {
- MessageReference handleMessageGroup(MessageReference ref, Consumer consumer, boolean newGroup);
- }
-
/**
*
* @see SessionCallback#supportsDirectDelivery()
@@ -38,7 +34,13 @@ public interface Consumer extends PriorityAware {
}
/**
-
+ * There was a change on semantic during 2.3 here.<br>
+ * We now first accept the message, and the actual deliver is done as part of
+ * {@link #proceedDeliver(MessageReference)}. This is to avoid holding a lock on the queues while
+ * the delivery is being accomplished To avoid a lock on the queue in case of misbehaving
+ * consumers.
+ * <p>
+ * This should return busy if handle is called before proceed deliver is called
*
* @param reference
* @return
@@ -46,29 +48,19 @@ public interface Consumer extends PriorityAware {
*/
HandleStatus handle(MessageReference reference) throws Exception;
- /**
- * This will return {@link HandleStatus#BUSY} if busy, {@link HandleStatus#NO_MATCH} if no match, or the MessageReference is handled
- * This should return busy if handle is called before proceed deliver is called
- * @param groupHandler
- * @param reference
- * @return
- * @throws Exception
- */
- default Object handleWithGroup(GroupHandler groupHandler, boolean newGroup, MessageReference reference) throws Exception {
- return handle(reference);
- }
-
/** wakes up internal threads to deliver more messages */
default void promptDelivery() {
}
/**
- * This will called after delivery
- * Giving protocols a chance to complete their deliveries doing things such as individualACK outside of main locks
+ * This will proceed with the actual delivery.
+ * Notice that handle should hold a readLock and proceedDelivery should release the readLock
+ * any lock operation on Consumer should also get a writeLock on the readWriteLock
+ * to guarantee there are no pending deliveries
*
* @throws Exception
*/
- void afterDeliver(MessageReference reference) throws Exception;
+ void proceedDeliver(MessageReference reference) throws Exception;
Filter getFilter();
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
index 4c73261..7d5bbe8 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java
@@ -689,7 +689,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
// FailureListener implementation --------------------------------
@Override
- public void afterDeliver(MessageReference ref) {
+ public void proceedDeliver(MessageReference ref) {
// no op
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java
index 44a5e0b..7982018 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/Redistributor.java
@@ -193,7 +193,7 @@ public class Redistributor implements Consumer {
}
@Override
- public void afterDeliver(MessageReference ref) {
+ public void proceedDeliver(MessageReference ref) {
// no op
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 292d15f..998ef8c 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -98,6 +98,7 @@ import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.utils.BooleanUtil;
import org.apache.activemq.artemis.utils.Env;
import org.apache.activemq.artemis.utils.ReferenceCounter;
+import org.apache.activemq.artemis.utils.ReusableLatch;
import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
import org.apache.activemq.artemis.utils.collections.PriorityLinkedList;
@@ -113,7 +114,7 @@ import org.jboss.logging.Logger;
* <p>
* Completely non blocking between adding to queue and delivering to consumers.
*/
-public class QueueImpl extends CriticalComponentImpl implements Queue, Consumer.GroupHandler {
+public class QueueImpl extends CriticalComponentImpl implements Queue {
protected static final int CRITICAL_PATHS = 5;
protected static final int CRITICAL_PATH_ADD_TAIL = 0;
@@ -267,6 +268,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue, Consumer.
private final ExpiryScanner expiryScanner = new ExpiryScanner();
+ private final ReusableLatch deliveriesInTransit = new ReusableLatch(0);
+
private final AtomicLong queueRateCheckTime = new AtomicLong(System.currentTimeMillis());
private final AtomicLong messagesAddedSnapshot = new AtomicLong(0);
@@ -952,7 +955,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue, Consumer.
// directDeliver flag to be re-computed resulting in direct delivery if the queue is empty
// We don't recompute it on every delivery since executing isEmpty is expensive for a ConcurrentQueue
- if (getExecutor().isFlushed() &&
+ if (deliveriesInTransit.getCount() == 0 && getExecutor().isFlushed() &&
intermediateMessageReferences.isEmpty() && messageReferences.isEmpty() &&
pageIterator != null && !pageIterator.hasNext() &&
pageSubscription != null && !pageSubscription.isPaging()) {
@@ -971,7 +974,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue, Consumer.
}
}
- if (direct && supportsDirectDeliver && directDeliver && deliverDirect(ref)) {
+ if (direct && supportsDirectDeliver && directDeliver && deliveriesInTransit.getCount() == 0 && deliverDirect(ref)) {
return;
}
@@ -1002,6 +1005,23 @@ public class QueueImpl extends CriticalComponentImpl implements Queue, Consumer.
return false;
}
+ /**
+ * This will wait for any pending deliveries to finish
+ */
+ private boolean flushDeliveriesInTransit() {
+ try {
+ if (deliveriesInTransit.await(DELIVERY_TIMEOUT)) {
+ return true;
+ } else {
+ ActiveMQServerLogger.LOGGER.timeoutFlushInTransit(getName().toString(), getAddress().toString());
+ return false;
+ }
+ } catch (Exception e) {
+ ActiveMQServerLogger.LOGGER.unableToFlushDeliveries(e);
+ return false;
+ }
+ }
+
@Override
public void forceDelivery() {
if (pageSubscription != null && pageSubscription.isPaging()) {
@@ -2346,6 +2366,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue, Consumer.
@Override
public synchronized void pause(boolean persist) {
try {
+ this.flushDeliveriesInTransit();
if (persist && isDurable()) {
if (pauseStatusRecord >= 0) {
storageManager.deleteQueueStatus(pauseStatusRecord);
@@ -2586,16 +2607,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue, Consumer.
consumer = groupConsumer;
}
- Object handleValue = handle(ref, consumer, groupConsumer == null);
-
- HandleStatus status;
-
- if (handleValue instanceof MessageReference) {
- ref = (MessageReference) handleValue;
- status = HandleStatus.HANDLED;
- } else {
- status = (HandleStatus) handleValue;
- }
+ HandleStatus status = handle(ref, consumer);
if (status == HandleStatus.HANDLED) {
@@ -2603,6 +2615,13 @@ public class QueueImpl extends CriticalComponentImpl implements Queue, Consumer.
// this is to avoid breaks on the loop when checking for any other factors.
noDelivery = 0;
+ if (redistributor == null) {
+ ref = handleMessageGroup(ref, consumer, groupConsumer, groupID);
+ }
+
+ deliveriesInTransit.countUp();
+
+
removeMessageReference(holder, ref);
handledconsumer = consumer;
handled++;
@@ -2634,10 +2653,16 @@ public class QueueImpl extends CriticalComponentImpl implements Queue, Consumer.
// Round robin'd all
if (noDelivery == this.consumers.size()) {
- if (logger.isDebugEnabled()) {
- logger.debug(this + "::All the consumers were busy, giving up now");
+ if (handledconsumer != null) {
+ // this shouldn't really happen,
+ // however I'm keeping this as an assertion case future developers ever change the logic here on this class
+ ActiveMQServerLogger.LOGGER.nonDeliveryHandled();
+ } else {
+ if (logger.isDebugEnabled()) {
+ logger.debug(this + "::All the consumers were busy, giving up now");
+ }
+ break;
}
- break;
}
noDelivery = 0;
@@ -2645,7 +2670,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue, Consumer.
}
if (handledconsumer != null) {
- afterDeliver(handledconsumer, ref);
+ proceedDeliver(handledconsumer, ref);
}
}
@@ -3173,7 +3198,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue, Consumer.
}
}
- private boolean deliver(MessageReference ref) {
+ private boolean deliver(final MessageReference ref) {
synchronized (this) {
if (!supportsDirectDeliver) {
return false;
@@ -3200,24 +3225,20 @@ public class QueueImpl extends CriticalComponentImpl implements Queue, Consumer.
consumer = groupConsumer;
}
- Object handleValue = handle(ref, consumer, groupConsumer == null);
-
-
- HandleStatus status;
-
- final MessageReference reference;
- if (handleValue instanceof MessageReference) {
- reference = (MessageReference) handleValue;
- status = HandleStatus.HANDLED;
- } else {
- reference = ref;
- status = (HandleStatus) handleValue;
- }
-
+ HandleStatus status = handle(ref, consumer);
if (status == HandleStatus.HANDLED) {
+ final MessageReference reference;
+ if (redistributor == null) {
+ reference = handleMessageGroup(ref, consumer, groupConsumer, groupID);
+ } else {
+ reference = ref;
+ }
+
messagesAdded.incrementAndGet();
+ deliveriesInTransit.countUp();
+ proceedDeliver(consumer, reference);
consumers.reset();
return true;
}
@@ -3248,16 +3269,9 @@ public class QueueImpl extends CriticalComponentImpl implements Queue, Consumer.
return groupConsumer;
}
- /** This is {@link Consumer.GroupHandler#handleMessageGroup(MessageReference, Consumer, boolean)} */
- @Override
- public MessageReference handleMessageGroup(MessageReference ref, Consumer consumer, boolean newGroup) {
- if (redistributor != null) {
- // no grouping work on this case
- return ref;
- }
- SimpleString groupID = extractGroupID(ref);
+ private MessageReference handleMessageGroup(MessageReference ref, Consumer consumer, Consumer groupConsumer, SimpleString groupID) {
if (exclusive) {
- if (newGroup) {
+ if (groupConsumer == null) {
exclusiveConsumer = consumer;
if (groupFirstKey != null) {
return new GroupFirstMessageReference(groupFirstKey, ref);
@@ -3268,7 +3282,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue, Consumer.
if (extractGroupSequence(ref) == -1) {
groups.remove(groupID);
consumers.repeat();
- } else if (newGroup) {
+ } else if (groupConsumer == null) {
groups.put(groupID, consumer);
if (groupFirstKey != null) {
return new GroupFirstMessageReference(groupFirstKey, ref);
@@ -3280,11 +3294,13 @@ public class QueueImpl extends CriticalComponentImpl implements Queue, Consumer.
return ref;
}
- private void afterDeliver(Consumer consumer, MessageReference reference) {
+ private void proceedDeliver(Consumer consumer, MessageReference reference) {
try {
- consumer.afterDeliver(reference);
+ consumer.proceedDeliver(reference);
} catch (Throwable t) {
errorProcessing(consumer, t, reference);
+ } finally {
+ deliveriesInTransit.countDown();
}
}
@@ -3329,10 +3345,10 @@ public class QueueImpl extends CriticalComponentImpl implements Queue, Consumer.
}
}
- private synchronized Object handle(final MessageReference reference, final Consumer consumer, boolean newGroup) {
- Object status;
+ private synchronized HandleStatus handle(final MessageReference reference, final Consumer consumer) {
+ HandleStatus status;
try {
- status = consumer.handleWithGroup(this, newGroup, reference);
+ status = consumer.handle(reference);
} catch (Throwable t) {
ActiveMQServerLogger.LOGGER.removingBadConsumer(t, consumer, reference);
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
index dde8d87..c709d4e 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
@@ -24,8 +24,11 @@ import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.ActiveMQException;
@@ -104,6 +107,13 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
private SlowConsumerDetectionListener slowConsumerListener;
+ /**
+ * We get a readLock when a message is handled, and return the readLock when the message is finally delivered
+ * When stopping the consumer we need to get a writeLock to make sure we had all delivery finished
+ * otherwise a rollback may get message sneaking in
+ */
+ private final ReadWriteLock lockDelivery = new ReentrantReadWriteLock();
+
private volatile AtomicInteger availableCredits = new AtomicInteger(0);
private boolean started;
@@ -382,20 +392,8 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
messageQueue.errorProcessing(this, e, deliveryObject);
}
- /** This is in case someone is using direct old API */
- @Override
- public HandleStatus handle(MessageReference ref) throws Exception {
- Object refReturn = handleWithGroup(null, false, ref);
-
- if (refReturn instanceof MessageReference) {
- return HandleStatus.HANDLED;
- } else {
- return (HandleStatus) refReturn;
- }
-
- }
@Override
- public Object handleWithGroup(GroupHandler handler, boolean newGroup, final MessageReference ref) throws Exception {
+ public HandleStatus handle(final MessageReference ref) throws Exception {
// available credits can be set back to null with a flow control option.
AtomicInteger checkInteger = availableCredits;
if (callback != null && !callback.hasCredits(this) || checkInteger != null && checkInteger.get() <= 0) {
@@ -483,46 +481,42 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
}
- MessageReference deliveryReference = ref;
-
- if (handler != null) {
- deliveryReference = handler.handleMessageGroup(ref, this, newGroup);
- }
-
- proceedDeliver(deliveryReference);
+ lockDelivery.readLock().lock();
return HandleStatus.HANDLED;
}
}
- private void proceedDeliver(MessageReference reference) throws Exception {
- Message message = reference.getMessage();
+ @Override
+ public void proceedDeliver(MessageReference reference) throws Exception {
+ try {
+ Message message = reference.getMessage();
- if (server.hasBrokerMessagePlugins()) {
- server.callBrokerMessagePlugins(plugin -> plugin.beforeDeliver(this, reference));
- }
+ if (server.hasBrokerMessagePlugins()) {
+ server.callBrokerMessagePlugins(plugin -> plugin.beforeDeliver(this, reference));
+ }
- if (message.isLargeMessage() && supportLargeMessage) {
- if (largeMessageDeliverer == null) {
- // This can't really happen as handle had already crated the deliverer
- // instead of throwing an exception in weird cases there is no problem on just go ahead and create it
- // again here
- largeMessageDeliverer = new LargeMessageDeliverer((LargeServerMessage) message, reference);
+ if (message.isLargeMessage() && supportLargeMessage) {
+ if (largeMessageDeliverer == null) {
+ // This can't really happen as handle had already crated the deliverer
+ // instead of throwing an exception in weird cases there is no problem on just go ahead and create it
+ // again here
+ largeMessageDeliverer = new LargeMessageDeliverer((LargeServerMessage) message, reference);
+ }
+ // The deliverer was prepared during handle, as we can't have more than one pending large message
+ // as it would return busy if there is anything pending
+ largeMessageDeliverer.deliver();
+ } else {
+ deliverStandardMessage(reference, message);
+ }
+ } finally {
+ lockDelivery.readLock().unlock();
+ callback.afterDelivery();
+ if (server.hasBrokerMessagePlugins()) {
+ server.callBrokerMessagePlugins(plugin -> plugin.afterDeliver(this, reference));
}
- // The deliverer was prepared during handle, as we can't have more than one pending large message
- // as it would return busy if there is anything pending
- largeMessageDeliverer.deliver();
- } else {
- deliverStandardMessage(reference, message);
}
- }
- @Override
- public void afterDeliver(MessageReference reference) throws Exception {
- callback.afterDeliver(reference, reference.getMessage(), ServerConsumerImpl.this, reference.getDeliveryCount());
- if (server.hasBrokerMessagePlugins()) {
- server.callBrokerMessagePlugins(plugin -> plugin.afterDeliver(this, reference));
- }
}
@Override
@@ -632,7 +626,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
* there are no other messages to be delivered.
*/
@Override
- public void forceDelivery(final long sequence) {
+ public void forceDelivery(final long sequence) {
forceDelivery(sequence, () -> {
Message forcedDeliveryMessage = new CoreMessage(storageManager.generateID(), 50);
MessageReference reference = MessageReference.Factory.createReference(forcedDeliveryMessage, messageQueue);
@@ -736,7 +730,19 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
@Override
public void setStarted(final boolean started) {
synchronized (lock) {
- this.started = browseOnly || started;
+ boolean locked = lockDelivery();
+
+ // This is to make sure nothing would sneak to the client while started = false
+ // the client will stop the session and perform a rollback in certain cases.
+ // in case something sneaks to the client you could get to messaging delivering forever until
+ // you restart the server
+ try {
+ this.started = browseOnly || started;
+ } finally {
+ if (locked) {
+ lockDelivery.writeLock().unlock();
+ }
+ }
}
// Outside the lock
@@ -745,10 +751,35 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
}
}
+ private boolean lockDelivery() {
+ try {
+ if (!lockDelivery.writeLock().tryLock(30, TimeUnit.SECONDS)) {
+ ActiveMQServerLogger.LOGGER.timeoutLockingConsumer();
+ if (server != null) {
+ server.threadDump();
+ }
+ return false;
+ }
+ return true;
+ } catch (Exception e) {
+ ActiveMQServerLogger.LOGGER.failedToFinishDelivery(e);
+ return false;
+ }
+ }
+
@Override
public void setTransferring(final boolean transferring) {
synchronized (lock) {
- this.transferring = transferring;
+ // This is to make sure that the delivery process has finished any pending delivery
+ // otherwise a message may sneak in on the client while we are trying to stop the consumer
+ boolean locked = lockDelivery();
+ try {
+ this.transferring = transferring;
+ } finally {
+ if (locked) {
+ lockDelivery.writeLock().unlock();
+ }
+ }
}
// Outside the lock
@@ -1244,111 +1275,125 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
}
public boolean deliver() throws Exception {
- if (!started) {
- return false;
- }
+ lockDelivery.readLock().lock();
+ try {
+ if (!started) {
+ return false;
+ }
- LargeServerMessage currentLargeMessage = largeMessage;
- if (currentLargeMessage == null) {
- return true;
- }
+ LargeServerMessage currentLargeMessage = largeMessage;
+ if (currentLargeMessage == null) {
+ return true;
+ }
- if (availableCredits != null && availableCredits.get() <= 0) {
- if (logger.isTraceEnabled()) {
- logger.trace(this + "::FlowControl::delivery largeMessage interrupting as there are no more credits, available=" + availableCredits);
+ if (availableCredits != null && availableCredits.get() <= 0) {
+ if (logger.isTraceEnabled()) {
+ logger.trace(this + "::FlowControl::delivery largeMessage interrupting as there are no more credits, available=" +
+ availableCredits);
+ }
+ releaseHeapBodyBuffer();
+ return false;
}
- releaseHeapBodyBuffer();
- return false;
- }
- if (!sentInitialPacket) {
- context = currentLargeMessage.getBodyEncoder();
+ if (!sentInitialPacket) {
+ context = currentLargeMessage.getBodyEncoder();
- sizePendingLargeMessage = context.getLargeBodySize();
+ sizePendingLargeMessage = context.getLargeBodySize();
- context.open();
+ context.open();
- sentInitialPacket = true;
+ sentInitialPacket = true;
- int packetSize = callback.sendLargeMessage(ref, currentLargeMessage, ServerConsumerImpl.this, context.getLargeBodySize(), ref.getDeliveryCount());
+ int packetSize = callback.sendLargeMessage(ref, currentLargeMessage, ServerConsumerImpl.this, context.getLargeBodySize(), ref.getDeliveryCount());
- if (availableCredits != null) {
- final int credits = availableCredits.addAndGet(-packetSize);
+ if (availableCredits != null) {
+ final int credits = availableCredits.addAndGet(-packetSize);
- if (credits <= 0) {
- releaseHeapBodyBuffer();
- }
+ if (credits <= 0) {
+ releaseHeapBodyBuffer();
+ }
- if (logger.isTraceEnabled()) {
- logger.trace(this + "::FlowControl::" + " deliver initialpackage with " + packetSize + " delivered, available now = " + availableCredits);
+ if (logger.isTraceEnabled()) {
+ logger.trace(this + "::FlowControl::" +
+ " deliver initialpackage with " +
+ packetSize +
+ " delivered, available now = " +
+ availableCredits);
+ }
}
- }
- // Execute the rest of the large message on a different thread so as not to tie up the delivery thread
- // for too long
+ // Execute the rest of the large message on a different thread so as not to tie up the delivery thread
+ // for too long
- resumeLargeMessage();
+ resumeLargeMessage();
- return false;
- } else {
- if (availableCredits != null && availableCredits.get() <= 0) {
- if (logger.isTraceEnabled()) {
- logger.trace(this + "::FlowControl::deliverLargeMessage Leaving loop of send LargeMessage because of credits, available=" + availableCredits);
- }
- releaseHeapBodyBuffer();
return false;
- }
+ } else {
+ if (availableCredits != null && availableCredits.get() <= 0) {
+ if (logger.isTraceEnabled()) {
+ logger.trace(this + "::FlowControl::deliverLargeMessage Leaving loop of send LargeMessage because of credits, available=" +
+ availableCredits);
+ }
+ releaseHeapBodyBuffer();
+ return false;
+ }
- final int localChunkLen = (int) Math.min(sizePendingLargeMessage - positionPendingLargeMessage, minLargeMessageSize);
+ final int localChunkLen = (int) Math.min(sizePendingLargeMessage - positionPendingLargeMessage, minLargeMessageSize);
- final ByteBuffer bodyBuffer = acquireHeapBodyBuffer(localChunkLen);
+ final ByteBuffer bodyBuffer = acquireHeapBodyBuffer(localChunkLen);
- assert bodyBuffer.remaining() == localChunkLen;
+ assert bodyBuffer.remaining() == localChunkLen;
- final int readBytes = context.encode(bodyBuffer);
+ final int readBytes = context.encode(bodyBuffer);
- assert readBytes == localChunkLen;
+ assert readBytes == localChunkLen;
- final byte[] body = bodyBuffer.array();
+ final byte[] body = bodyBuffer.array();
- assert body.length == readBytes;
+ assert body.length == readBytes;
- //It is possible to recycle the same heap body buffer because it won't be cached by sendLargeMessageContinuation
- //given that requiresResponse is false: ChannelImpl::send will use the resend cache only if
- //resendCache != null && packet.isRequiresConfirmations()
+ //It is possible to recycle the same heap body buffer because it won't be cached by sendLargeMessageContinuation
+ //given that requiresResponse is false: ChannelImpl::send will use the resend cache only if
+ //resendCache != null && packet.isRequiresConfirmations()
- int packetSize = callback.sendLargeMessageContinuation(ServerConsumerImpl.this, body, positionPendingLargeMessage + localChunkLen < sizePendingLargeMessage, false);
+ int packetSize = callback.sendLargeMessageContinuation(ServerConsumerImpl.this, body, positionPendingLargeMessage + localChunkLen < sizePendingLargeMessage, false);
- int chunkLen = body.length;
+ int chunkLen = body.length;
- if (availableCredits != null) {
- final int credits = availableCredits.addAndGet(-packetSize);
+ if (availableCredits != null) {
+ final int credits = availableCredits.addAndGet(-packetSize);
- if (credits <= 0) {
- releaseHeapBodyBuffer();
- }
+ if (credits <= 0) {
+ releaseHeapBodyBuffer();
+ }
- if (logger.isTraceEnabled()) {
- logger.trace(this + "::FlowControl::largeMessage deliver continuation, packetSize=" + packetSize + " available now=" + availableCredits);
+ if (logger.isTraceEnabled()) {
+ logger.trace(this + "::FlowControl::largeMessage deliver continuation, packetSize=" +
+ packetSize +
+ " available now=" +
+ availableCredits);
+ }
}
- }
- positionPendingLargeMessage += chunkLen;
+ positionPendingLargeMessage += chunkLen;
- if (positionPendingLargeMessage < sizePendingLargeMessage) {
- resumeLargeMessage();
+ if (positionPendingLargeMessage < sizePendingLargeMessage) {
+ resumeLargeMessage();
- return false;
+ return false;
+ }
}
- }
- if (logger.isTraceEnabled()) {
- logger.trace("Finished deliverLargeMessage");
- }
+ if (logger.isTraceEnabled()) {
+ logger.trace("Finished deliverLargeMessage");
+ }
- finish();
+ finish();
- return true;
+ return true;
+ } finally {
+ lockDelivery.readLock().unlock();
+ }
}
public void finish() throws Exception {
@@ -1408,7 +1453,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
}
if (status == HandleStatus.HANDLED) {
- afterDeliver(current);
+ proceedDeliver(current);
}
current = null;
@@ -1436,7 +1481,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
}
if (status == HandleStatus.HANDLED) {
- afterDeliver(ref);
+ proceedDeliver(ref);
} else if (status == HandleStatus.BUSY) {
// keep a reference on the current message reference
// to handle it next time the browser deliverer is executed
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java
index 18ef253..5577522 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/SessionCallback.java
@@ -41,10 +41,11 @@ public interface SessionCallback {
*/
boolean hasCredits(ServerConsumer consumerID);
- // Certain protocols (MQTT) will need to confirm messages doing things such as individualACKS
- // and these need to be done outside of the main lock.
- // otherwise we could dead-lock during delivery
- void afterDeliver(MessageReference ref, Message message, ServerConsumer consumerID, int deliveryCount) throws Exception;
+ /**
+ * This can be used to complete certain operations outside of the lock,
+ * like acks or other operations.
+ */
+ void afterDelivery() throws Exception;
/**
* Use this to updates specifics on the message after a redelivery happened.
@@ -68,7 +69,6 @@ public interface SessionCallback {
// Future developments may change this, but beware why I have chosen to keep the parameter separated here
int sendMessage(MessageReference ref, Message message, ServerConsumer consumerID, int deliveryCount);
-
int sendLargeMessage(MessageReference reference,
Message message,
ServerConsumer consumerID,
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/DummyServerConsumer.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/DummyServerConsumer.java
index 9bc0ea2..58bf2d3 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/DummyServerConsumer.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cli/DummyServerConsumer.java
@@ -190,7 +190,7 @@ public class DummyServerConsumer implements ServerConsumer {
}
@Override
- public void afterDeliver(MessageReference reference) throws Exception {
+ public void proceedDeliver(MessageReference reference) throws Exception {
}
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
index 0006752..3e64ac5 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
@@ -528,6 +528,11 @@ public class HangConsumerTest extends ActiveMQTestBase {
}
@Override
+ public void afterDelivery() throws Exception {
+
+ }
+
+ @Override
public void sendProducerCreditsFailMessage(int credits, SimpleString address) {
targetCallback.sendProducerCreditsFailMessage(credits, address);
}
@@ -553,11 +558,6 @@ public class HangConsumerTest extends ActiveMQTestBase {
}
}
- @Override
- public void afterDeliver(MessageReference ref, Message message, ServerConsumer consumerID, int deliveryCount) {
-
- }
-
/* (non-Javadoc)
* @see SessionCallback#sendLargeMessage(org.apache.activemq.artemis.core.server.ServerMessage, long, long, int)
*/
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/GroupingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/GroupingTest.java
index fc32c44..ba8cd95 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/GroupingTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/GroupingTest.java
@@ -271,6 +271,7 @@ public class GroupingTest extends JMSTestBase {
assertEquals(tm.getStringProperty("JMSXGroupID"), jmsxgroupID);
}
+ Thread.sleep(2000);
//session.rollback();
//session.close();
//consume all msgs from 2nd first consumer
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/QueueImplTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/QueueImplTest.java
index 076e164..f20cce3 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/QueueImplTest.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/QueueImplTest.java
@@ -1312,21 +1312,12 @@ public class QueueImplTest extends ActiveMQTestBase {
@Override
public synchronized HandleStatus handle(MessageReference reference) {
- return HandleStatus.HANDLED;
- }
-
- @Override
- public Object handleWithGroup(GroupHandler groupHandler, boolean newGroup, MessageReference reference) throws Exception {
if (count == 0) {
//the first message is handled and will be used to determine this consumer
//to be the group consumer
count++;
firstMessageHandled.countDown();
- if (groupHandler != null) {
- return groupHandler.handleMessageGroup(reference, this, newGroup);
- } else {
- return HandleStatus.HANDLED;
- }
+ return HandleStatus.HANDLED;
} else if (count <= 2) {
//the next two attempts to send the second message will be done
//attempting a direct delivery and an async one after that
@@ -1338,11 +1329,7 @@ public class QueueImplTest extends ActiveMQTestBase {
//the second message should have stop the delivery loop:
//it will succeed just to let the message being handled and
//reduce the message count to 0
- if (groupHandler != null) {
- return groupHandler.handleMessageGroup(reference, this, newGroup);
- } else {
- return HandleStatus.HANDLED;
- }
+ return HandleStatus.HANDLED;
}
}
};
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeConsumer.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeConsumer.java
index 47b042b..2a5a330 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeConsumer.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakeConsumer.java
@@ -138,7 +138,7 @@ public class FakeConsumer implements Consumer {
}
@Override
- public void afterDeliver(MessageReference ref) throws Exception {
+ public void proceedDeliver(MessageReference ref) throws Exception {
// no op
}