You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2019/06/12 21:22:39 UTC
[activemq-artemis] branch master updated: ARTEMIS-1982 queue
metrics can go negative
This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/master by this push:
new 3ff75c3 ARTEMIS-1982 queue metrics can go negative
new e506180 This closes #2700
3ff75c3 is described below
commit 3ff75c33537be3e890b0ccb8cde00969ad7dc558
Author: Justin Bertram <jb...@apache.org>
AuthorDate: Mon Jun 10 15:33:10 2019 +0100
ARTEMIS-1982 queue metrics can go negative
When redelivery is exhausted and messages are sent to a DLA with bindings
then some queue metrics can go negative.
---
.../apache/activemq/artemis/core/server/Queue.java | 21 ++++++-
.../artemis/core/server/impl/QueueImpl.java | 26 +++++----
.../artemis/core/server/impl/RefsOperation.java | 2 +-
.../server/impl/ScheduledDeliveryHandlerTest.java | 12 ++--
.../integration/amqp/AmqpDLQReceiverTest.java | 68 ++++++++++++++++++++++
.../tests/unit/core/postoffice/impl/FakeQueue.java | 9 +--
6 files changed, 114 insertions(+), 24 deletions(-)
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
index bf0a22d..2d7f373 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
@@ -23,6 +23,7 @@ import java.util.concurrent.Executor;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.filter.Filter;
@@ -281,7 +282,14 @@ public interface Queue extends Bindable,CriticalComponent {
int sendMessagesToDeadLetterAddress(Filter filter) throws Exception;
- void sendToDeadLetterAddress(Transaction tx, MessageReference ref) throws Exception;
+ /**
+ *
+ * @param tx
+ * @param ref
+ * @return whether or not the message was actually sent to a DLA with bindings
+ * @throws Exception
+ */
+ boolean sendToDeadLetterAddress(Transaction tx, MessageReference ref) throws Exception;
boolean changeReferencePriority(long messageID, byte newPriority) throws Exception;
@@ -315,7 +323,16 @@ public interface Queue extends Bindable,CriticalComponent {
int getGroupCount();
- boolean checkRedelivery(MessageReference ref, long timeBase, boolean ignoreRedeliveryDelay) throws Exception;
+ /**
+ *
+ * @param ref
+ * @param timeBase
+ * @param ignoreRedeliveryDelay
+ * @return a Pair of Booleans: the first indicates whether or not redelivery happened; the second indicates whether
+ * or not the message was actually sent to a DLA with bindings
+ * @throws Exception
+ */
+ Pair<Boolean, Boolean> checkRedelivery(MessageReference ref, long timeBase, boolean ignoreRedeliveryDelay) throws Exception;
/**
* It will iterate thorugh memory only (not paging)
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index a772529..44b55e0 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -1635,13 +1635,14 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
@Override
public synchronized void cancel(final MessageReference reference, final long timeBase) throws Exception {
- if (checkRedelivery(reference, timeBase, false)) {
+ Pair<Boolean, Boolean> redeliveryResult = checkRedelivery(reference, timeBase, false);
+ if (redeliveryResult.getA()) {
if (!scheduledDeliveryHandler.checkAndSchedule(reference, false)) {
internalAddHead(reference);
}
resetAllIterators();
- } else {
+ } else if (!redeliveryResult.getB()) {
decDelivering(reference);
}
}
@@ -2815,9 +2816,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
deliverAsync();
}
}
-
@Override
- public boolean checkRedelivery(final MessageReference reference,
+ public Pair<Boolean, Boolean> checkRedelivery(final MessageReference reference,
final long timeBase,
final boolean ignoreRedeliveryDelay) throws Exception {
Message message = reference.getMessage();
@@ -2827,7 +2827,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
logger.trace("Queue " + this.getName() + " is an internal queue, no checkRedelivery");
}
// no DLQ check on internal queues
- return true;
+ return new Pair<>(true, false);
}
if (!internalQueue && reference.isDurable() && isDurable() && !reference.isPaged()) {
@@ -2845,9 +2845,9 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
if (logger.isTraceEnabled()) {
logger.trace("Sending reference " + reference + " to DLA = " + addressSettings.getDeadLetterAddress() + " since ref.getDeliveryCount=" + reference.getDeliveryCount() + "and maxDeliveries=" + maxDeliveries + " from queue=" + this.getName());
}
- sendToDeadLetterAddress(null, reference, addressSettings.getDeadLetterAddress());
+ boolean dlaResult = sendToDeadLetterAddress(null, reference, addressSettings.getDeadLetterAddress());
- return false;
+ return new Pair<>(false, dlaResult);
} else {
// Second check Redelivery Delay
if (!ignoreRedeliveryDelay && redeliveryDelay > 0) {
@@ -2866,7 +2866,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
decDelivering(reference);
- return true;
+ return new Pair<>(true, false);
}
}
@@ -3114,13 +3114,12 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
}
}
-
@Override
- public void sendToDeadLetterAddress(final Transaction tx, final MessageReference ref) throws Exception {
- sendToDeadLetterAddress(tx, ref, addressSettingsRepository.getMatch(address.toString()).getDeadLetterAddress());
+ public boolean sendToDeadLetterAddress(final Transaction tx, final MessageReference ref) throws Exception {
+ return sendToDeadLetterAddress(tx, ref, addressSettingsRepository.getMatch(address.toString()).getDeadLetterAddress());
}
- private void sendToDeadLetterAddress(final Transaction tx,
+ private boolean sendToDeadLetterAddress(final Transaction tx,
final MessageReference ref,
final SimpleString deadLetterAddress) throws Exception {
if (deadLetterAddress != null) {
@@ -3132,12 +3131,15 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
} else {
ActiveMQServerLogger.LOGGER.messageExceededMaxDeliverySendtoDLA(ref, deadLetterAddress, name);
move(tx, deadLetterAddress, null, ref, false, AckReason.KILLED, null);
+ return true;
}
} else {
ActiveMQServerLogger.LOGGER.messageExceededMaxDeliveryNoDLA(ref, name);
ref.acknowledge(tx, AckReason.KILLED, null);
}
+
+ return false;
}
private void move(final Transaction originalTX,
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java
index de52cc4..c8d9297 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java
@@ -150,7 +150,7 @@ public class RefsOperation extends TransactionOperationAbstract {
protected void rollbackRedelivery(Transaction tx, MessageReference ref, long timeBase, Map<QueueImpl, LinkedList<MessageReference>> queueMap) throws Exception {
// if ignore redelivery check, we just perform redelivery straight
- if (ref.getQueue().checkRedelivery(ref, timeBase, ignoreRedeliveryCheck)) {
+ if (ref.getQueue().checkRedelivery(ref, timeBase, ignoreRedeliveryCheck).getA()) {
LinkedList<MessageReference> toCancel = queueMap.get(ref.getQueue());
if (toCancel == null) {
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
index 6a03c02..ad4cbb3 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
@@ -35,6 +35,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException;
import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.RefCountMessage;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
@@ -1292,7 +1293,8 @@ public class ScheduledDeliveryHandlerTest extends Assert {
}
@Override
- public void sendToDeadLetterAddress(Transaction tx, MessageReference ref) throws Exception {
+ public boolean sendToDeadLetterAddress(Transaction tx, MessageReference ref) throws Exception {
+ return false;
}
@Override
@@ -1375,10 +1377,10 @@ public class ScheduledDeliveryHandlerTest extends Assert {
}
@Override
- public boolean checkRedelivery(MessageReference ref,
- long timeBase,
- boolean ignoreRedeliveryDelay) throws Exception {
- return false;
+ public Pair<Boolean, Boolean> checkRedelivery(MessageReference ref,
+ long timeBase,
+ boolean ignoreRedeliveryDelay) throws Exception {
+ return new Pair<>(false, false);
}
@Override
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpDLQReceiverTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpDLQReceiverTest.java
new file mode 100644
index 0000000..a95c908
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpDLQReceiverTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.transport.amqp.client.AmqpClient;
+import org.apache.activemq.transport.amqp.client.AmqpConnection;
+import org.apache.activemq.transport.amqp.client.AmqpMessage;
+import org.apache.activemq.transport.amqp.client.AmqpReceiver;
+import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests for broker side support of the Durable Subscription mapping for JMS.
+ */
+public class AmqpDLQReceiverTest extends AmqpClientTestSupport {
+
+ @Test(timeout = 60000)
+ public void testCreateDurableReceiver() throws Exception {
+
+ AmqpClient client = createAmqpClient();
+ AmqpConnection connection = addConnection(client.connect());
+ AmqpSession session = connection.createSession();
+
+ AmqpReceiver receiver = session.createReceiver(getQueueName());
+ sendMessages(getQueueName(), 1);
+ Queue queue = getProxyToQueue(getQueueName());
+ assertNotNull(queue);
+ receiver.flow(100);
+ for (int i = 0; i < 10; i++) {
+ System.out.println("i = " + i);
+ AmqpMessage receive = receiver.receive(5000, TimeUnit.MILLISECONDS);
+ receive.modified(true, false);
+ Queue queueView = getProxyToQueue(getQueueName());
+ System.out.println("receive = " + receive.getWrappedMessage().getDeliveryCount());
+ System.out.println("queueView.getMessageCount() = " + queueView.getMessageCount());
+ System.out.println("queueView.getDeliveringCount() = " + queueView.getDeliveringCount());
+ System.out.println("queueView.getPersistentSize() = " + queueView.getPersistentSize());
+ }
+
+ receiver.close();
+ connection.close();
+ Thread.sleep(5000);
+ Queue queueView = getProxyToQueue(getQueueName());
+ System.out.println("queueView.getMessageCount() = " + queueView.getMessageCount());
+ System.out.println("queueView.getDeliveringCount() = " + queueView.getDeliveringCount());
+ System.out.println("queueView.getPersistentSize() = " + queueView.getPersistentSize());
+ Assert.assertEquals(0, queueView.getMessageCount());
+ }
+
+}
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
index 2944fd4..290aa15 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
@@ -23,6 +23,7 @@ import java.util.concurrent.Executor;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.filter.Filter;
@@ -172,8 +173,8 @@ public class FakeQueue extends CriticalComponentImpl implements Queue {
}
@Override
- public void sendToDeadLetterAddress(Transaction tx, MessageReference ref) throws Exception {
-
+ public boolean sendToDeadLetterAddress(Transaction tx, MessageReference ref) throws Exception {
+ return false;
}
@Override
@@ -380,11 +381,11 @@ public class FakeQueue extends CriticalComponentImpl implements Queue {
}
@Override
- public boolean checkRedelivery(final MessageReference ref,
+ public Pair<Boolean, Boolean> checkRedelivery(final MessageReference ref,
final long timeBase,
final boolean check) throws Exception {
// no-op
- return false;
+ return new Pair<>(false, false);
}
@Override