You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2019/06/12 21:22:39 UTC

[activemq-artemis] branch master updated: ARTEMIS-1982 queue metrics can go negative

This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/master by this push:
     new 3ff75c3  ARTEMIS-1982 queue metrics can go negative
     new e506180  This closes #2700
3ff75c3 is described below

commit 3ff75c33537be3e890b0ccb8cde00969ad7dc558
Author: Justin Bertram <jb...@apache.org>
AuthorDate: Mon Jun 10 15:33:10 2019 +0100

    ARTEMIS-1982 queue metrics can go negative
    
    When redelivery is exhausted and messages are sent to a DLA with bindings
    then some queue metrics can go negative.
---
 .../apache/activemq/artemis/core/server/Queue.java | 21 ++++++-
 .../artemis/core/server/impl/QueueImpl.java        | 26 +++++----
 .../artemis/core/server/impl/RefsOperation.java    |  2 +-
 .../server/impl/ScheduledDeliveryHandlerTest.java  | 12 ++--
 .../integration/amqp/AmqpDLQReceiverTest.java      | 68 ++++++++++++++++++++++
 .../tests/unit/core/postoffice/impl/FakeQueue.java |  9 +--
 6 files changed, 114 insertions(+), 24 deletions(-)

diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
index bf0a22d..2d7f373 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
@@ -23,6 +23,7 @@ import java.util.concurrent.Executor;
 
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.Pair;
 import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.filter.Filter;
@@ -281,7 +282,14 @@ public interface Queue extends Bindable,CriticalComponent {
 
    int sendMessagesToDeadLetterAddress(Filter filter) throws Exception;
 
-   void sendToDeadLetterAddress(Transaction tx, MessageReference ref) throws Exception;
+   /**
+    *
+    * @param tx
+    * @param ref
+    * @return whether or not the message was actually sent to a DLA with bindings
+    * @throws Exception
+    */
+   boolean sendToDeadLetterAddress(Transaction tx, MessageReference ref) throws Exception;
 
    boolean changeReferencePriority(long messageID, byte newPriority) throws Exception;
 
@@ -315,7 +323,16 @@ public interface Queue extends Bindable,CriticalComponent {
 
    int getGroupCount();
 
-   boolean checkRedelivery(MessageReference ref, long timeBase, boolean ignoreRedeliveryDelay) throws Exception;
+   /**
+    *
+    * @param ref
+    * @param timeBase
+    * @param ignoreRedeliveryDelay
+    * @return a Pair of Booleans: the first indicates whether or not redelivery happened; the second indicates whether
+    *         or not the message was actually sent to a DLA with bindings
+    * @throws Exception
+    */
+   Pair<Boolean, Boolean> checkRedelivery(MessageReference ref, long timeBase, boolean ignoreRedeliveryDelay) throws Exception;
 
    /**
     * It will iterate through memory only (not paging)
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index a772529..44b55e0 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -1635,13 +1635,14 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
 
    @Override
    public synchronized void cancel(final MessageReference reference, final long timeBase) throws Exception {
-      if (checkRedelivery(reference, timeBase, false)) {
+      Pair<Boolean, Boolean> redeliveryResult = checkRedelivery(reference, timeBase, false);
+      if (redeliveryResult.getA()) {
          if (!scheduledDeliveryHandler.checkAndSchedule(reference, false)) {
             internalAddHead(reference);
          }
 
          resetAllIterators();
-      } else {
+      } else if (!redeliveryResult.getB()) {
          decDelivering(reference);
       }
    }
@@ -2815,9 +2816,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
          deliverAsync();
       }
    }
-
    @Override
-   public boolean checkRedelivery(final MessageReference reference,
+   public Pair<Boolean, Boolean> checkRedelivery(final MessageReference reference,
                                   final long timeBase,
                                   final boolean ignoreRedeliveryDelay) throws Exception {
       Message message = reference.getMessage();
@@ -2827,7 +2827,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
             logger.trace("Queue " + this.getName() + " is an internal queue, no checkRedelivery");
          }
          // no DLQ check on internal queues
-         return true;
+         return new Pair<>(true, false);
       }
 
       if (!internalQueue && reference.isDurable() && isDurable() && !reference.isPaged()) {
@@ -2845,9 +2845,9 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
          if (logger.isTraceEnabled()) {
             logger.trace("Sending reference " + reference + " to DLA = " + addressSettings.getDeadLetterAddress() + " since ref.getDeliveryCount=" + reference.getDeliveryCount() + "and maxDeliveries=" + maxDeliveries + " from queue=" + this.getName());
          }
-         sendToDeadLetterAddress(null, reference, addressSettings.getDeadLetterAddress());
+         boolean dlaResult = sendToDeadLetterAddress(null, reference, addressSettings.getDeadLetterAddress());
 
-         return false;
+         return new Pair<>(false, dlaResult);
       } else {
          // Second check Redelivery Delay
          if (!ignoreRedeliveryDelay && redeliveryDelay > 0) {
@@ -2866,7 +2866,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
 
          decDelivering(reference);
 
-         return true;
+         return new Pair<>(true, false);
       }
    }
 
@@ -3114,13 +3114,12 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
       }
    }
 
-
    @Override
-   public void sendToDeadLetterAddress(final Transaction tx, final MessageReference ref) throws Exception {
-      sendToDeadLetterAddress(tx, ref, addressSettingsRepository.getMatch(address.toString()).getDeadLetterAddress());
+   public boolean sendToDeadLetterAddress(final Transaction tx, final MessageReference ref) throws Exception {
+      return sendToDeadLetterAddress(tx, ref, addressSettingsRepository.getMatch(address.toString()).getDeadLetterAddress());
    }
 
-   private void sendToDeadLetterAddress(final Transaction tx,
+   private boolean sendToDeadLetterAddress(final Transaction tx,
                                         final MessageReference ref,
                                         final SimpleString deadLetterAddress) throws Exception {
       if (deadLetterAddress != null) {
@@ -3132,12 +3131,15 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
          } else {
             ActiveMQServerLogger.LOGGER.messageExceededMaxDeliverySendtoDLA(ref, deadLetterAddress, name);
             move(tx, deadLetterAddress, null, ref, false, AckReason.KILLED, null);
+            return true;
          }
       } else {
          ActiveMQServerLogger.LOGGER.messageExceededMaxDeliveryNoDLA(ref, name);
 
          ref.acknowledge(tx, AckReason.KILLED, null);
       }
+
+      return false;
    }
 
    private void move(final Transaction originalTX,
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java
index de52cc4..c8d9297 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java
@@ -150,7 +150,7 @@ public class RefsOperation extends TransactionOperationAbstract {
 
    protected void rollbackRedelivery(Transaction tx, MessageReference ref, long timeBase, Map<QueueImpl, LinkedList<MessageReference>> queueMap) throws Exception {
       // if ignore redelivery check, we just perform redelivery straight
-      if (ref.getQueue().checkRedelivery(ref, timeBase, ignoreRedeliveryCheck)) {
+      if (ref.getQueue().checkRedelivery(ref, timeBase, ignoreRedeliveryCheck).getA()) {
          LinkedList<MessageReference> toCancel = queueMap.get(ref.getQueue());
 
          if (toCancel == null) {
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
index 6a03c02..ad4cbb3 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
@@ -35,6 +35,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException;
 import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.Pair;
 import org.apache.activemq.artemis.api.core.RefCountMessage;
 import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
@@ -1292,7 +1293,8 @@ public class ScheduledDeliveryHandlerTest extends Assert {
       }
 
       @Override
-      public void sendToDeadLetterAddress(Transaction tx, MessageReference ref) throws Exception {
+      public boolean sendToDeadLetterAddress(Transaction tx, MessageReference ref) throws Exception {
+         return false;
       }
 
       @Override
@@ -1375,10 +1377,10 @@ public class ScheduledDeliveryHandlerTest extends Assert {
       }
 
       @Override
-      public boolean checkRedelivery(MessageReference ref,
-                                     long timeBase,
-                                     boolean ignoreRedeliveryDelay) throws Exception {
-         return false;
+      public Pair<Boolean, Boolean> checkRedelivery(MessageReference ref,
+                                                    long timeBase,
+                                                    boolean ignoreRedeliveryDelay) throws Exception {
+         return new Pair<>(false, false);
       }
 
       @Override
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpDLQReceiverTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpDLQReceiverTest.java
new file mode 100644
index 0000000..a95c908
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpDLQReceiverTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.transport.amqp.client.AmqpClient;
+import org.apache.activemq.transport.amqp.client.AmqpConnection;
+import org.apache.activemq.transport.amqp.client.AmqpMessage;
+import org.apache.activemq.transport.amqp.client.AmqpReceiver;
+import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests that queue metrics stay consistent when an AMQP receiver repeatedly settles a message as modified (ARTEMIS-1982).
+ */
+public class AmqpDLQReceiverTest extends AmqpClientTestSupport {
+
+   @Test(timeout = 60000)
+   public void testCreateDurableReceiver() throws Exception {
+
+      AmqpClient client = createAmqpClient();
+      AmqpConnection connection = addConnection(client.connect());
+      AmqpSession session = connection.createSession();
+
+      AmqpReceiver receiver = session.createReceiver(getQueueName());
+      sendMessages(getQueueName(), 1);
+      Queue queue = getProxyToQueue(getQueueName());
+      assertNotNull(queue);
+      receiver.flow(100);
+      for (int i = 0; i < 10; i++) {
+         System.out.println("i = " + i);
+         AmqpMessage receive = receiver.receive(5000, TimeUnit.MILLISECONDS);
+         receive.modified(true, false);
+         Queue queueView = getProxyToQueue(getQueueName());
+         System.out.println("receive = " + receive.getWrappedMessage().getDeliveryCount());
+         System.out.println("queueView.getMessageCount() = " + queueView.getMessageCount());
+         System.out.println("queueView.getDeliveringCount() = " + queueView.getDeliveringCount());
+         System.out.println("queueView.getPersistentSize() = " + queueView.getPersistentSize());
+      }
+
+      receiver.close();
+      connection.close();
+      Thread.sleep(5000);
+      Queue queueView = getProxyToQueue(getQueueName());
+      System.out.println("queueView.getMessageCount() = " + queueView.getMessageCount());
+      System.out.println("queueView.getDeliveringCount() = " + queueView.getDeliveringCount());
+      System.out.println("queueView.getPersistentSize() = " + queueView.getPersistentSize());
+      Assert.assertEquals(0, queueView.getMessageCount());
+   }
+
+}
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
index 2944fd4..290aa15 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
@@ -23,6 +23,7 @@ import java.util.concurrent.Executor;
 
 import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
 import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.Pair;
 import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.filter.Filter;
@@ -172,8 +173,8 @@ public class FakeQueue extends CriticalComponentImpl implements Queue {
    }
 
    @Override
-   public void sendToDeadLetterAddress(Transaction tx, MessageReference ref) throws Exception {
-
+   public boolean sendToDeadLetterAddress(Transaction tx, MessageReference ref) throws Exception {
+      return false;
    }
 
    @Override
@@ -380,11 +381,11 @@ public class FakeQueue extends CriticalComponentImpl implements Queue {
    }
 
    @Override
-   public boolean checkRedelivery(final MessageReference ref,
+   public Pair<Boolean, Boolean> checkRedelivery(final MessageReference ref,
                                   final long timeBase,
                                   final boolean check) throws Exception {
       // no-op
-      return false;
+      return new Pair<>(false, false);
    }
 
    @Override