You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2018/09/13 20:22:56 UTC

activemq-artemis git commit: ARTEMIS-1898 - make sure tosend credits on rejected messages

Repository: activemq-artemis
Updated Branches:
  refs/heads/2.6.x c5e2ed7ac -> 84096b1a6


ARTEMIS-1898 - make sure tosend credits on rejected messages

And also to run the credit runnables once memory is free in fail mode

https://issues.apache.org/jira/browse/ARTEMIS-1898
(cherry picked from commit 82795b7bff64da8770a0c53b9eb1373b2d63fbf6)


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/84096b1a
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/84096b1a
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/84096b1a

Branch: refs/heads/2.6.x
Commit: 84096b1a6c0ecf7e4e21a522cb3f128bdfebee85
Parents: c5e2ed7
Author: andytaylor <an...@gmail.com>
Authored: Wed Sep 12 11:16:41 2018 +0100
Committer: Clebert Suconic <cl...@apache.org>
Committed: Thu Sep 13 16:22:48 2018 -0400

----------------------------------------------------------------------
 .../amqp/broker/AMQPSessionCallback.java        | 43 ++++++---
 .../proton/ProtonServerReceiverContext.java     |  1 +
 .../core/paging/impl/PagingStoreImpl.java       |  5 +-
 .../amqp/AmqpFlowControlFailTest.java           | 94 ++++++++++++++++++++
 4 files changed, 131 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/84096b1a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
index 6b163ae..7fef3db 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
@@ -109,6 +109,8 @@ public class AMQPSessionCallback implements SessionCallback {
 
    private final AddressQueryCache<AddressQueryResult> addressQueryCache = new AddressQueryCache<>();
 
+   private CreditRunnable creditRunnable;
+
    public AMQPSessionCallback(AMQPConnectionCallback protonSPI,
                               ProtonProtocolManager manager,
                               AMQPConnectionContext connection,
@@ -580,20 +582,37 @@ public class AMQPSessionCallback implements SessionCallback {
                                    final int threshold,
                                    final Receiver receiver) {
       try {
+         /*
+         * The credit runnable will always be run in this thread unless the address or disc is full. If this is the case the
+         * runnable is run once the memory or disc is free, if this happens we don't want to keep adding runnables as this
+         * may cause a memory leak, one is enough.
+         * */
+         if (creditRunnable != null && !creditRunnable.isRun())
+            return;
          PagingManager pagingManager = manager.getServer().getPagingManager();
-         Runnable creditRunnable = () -> {
-            connection.lock();
-            try {
-               if (receiver.getCredit() <= threshold) {
-                  int topUp = credits - receiver.getCredit();
-                  if (topUp > 0) {
-                     receiver.flow(topUp);
+         creditRunnable = new CreditRunnable() {
+            boolean isRun = false;
+            @Override
+            public boolean isRun() {
+               return isRun;
+            }
+
+            @Override
+            public void run() {
+               connection.lock();
+               try {
+                  if (receiver.getCredit() <= threshold) {
+                     int topUp = credits - receiver.getCredit();
+                     if (topUp > 0) {
+                        receiver.flow(topUp);
+                     }
                   }
+               } finally {
+                  isRun = true;
+                  connection.unlock();
                }
-            } finally {
-               connection.unlock();
+               connection.flush();
             }
-            connection.flush();
          };
 
          if (address == null) {
@@ -772,5 +791,7 @@ public class AMQPSessionCallback implements SessionCallback {
       }
 
    }
-
+   interface CreditRunnable extends Runnable {
+      boolean isRun();
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/84096b1a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
index cdd1362..0f0e9d5 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
@@ -262,6 +262,7 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
 
          delivery.disposition(rejected);
          delivery.settle();
+         flow(amqpCredits, minCreditRefresh);
       }
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/84096b1a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
index 5f0d3c8..1392cfc 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
@@ -658,6 +658,9 @@ public class PagingStoreImpl implements PagingStore {
 
       if (addressFullMessagePolicy == AddressFullMessagePolicy.FAIL && (maxSize != -1 || usingGlobalMaxSize || pagingManager.isDiskFull())) {
          if (isFull()) {
+            if (runWhenAvailable != null) {
+               onMemoryFreedRunnables.add(AtomicRunnable.checkAtomic(runWhenAvailable));
+            }
             return false;
          }
       } else if (pagingManager.isDiskFull() || addressFullMessagePolicy == AddressFullMessagePolicy.BLOCK && (maxSize != -1 || usingGlobalMaxSize)) {
@@ -704,7 +707,7 @@ public class PagingStoreImpl implements PagingStore {
          ActiveMQServerLogger.LOGGER.negativeAddressSize(newSize, address.toString());
       }
 
-      if (addressFullMessagePolicy == AddressFullMessagePolicy.BLOCK) {
+      if (addressFullMessagePolicy == AddressFullMessagePolicy.BLOCK || addressFullMessagePolicy == AddressFullMessagePolicy.FAIL) {
          if (usingGlobalMaxSize && !globalFull || maxSize != -1) {
             checkReleaseMemory(globalFull, newSize);
          }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/84096b1a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpFlowControlFailTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpFlowControlFailTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpFlowControlFailTest.java
new file mode 100644
index 0000000..2f65dfb
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpFlowControlFailTest.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp;
+
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.transport.amqp.client.AmqpClient;
+import org.apache.activemq.transport.amqp.client.AmqpConnection;
+import org.apache.activemq.transport.amqp.client.AmqpMessage;
+import org.apache.activemq.transport.amqp.client.AmqpReceiver;
+import org.apache.activemq.transport.amqp.client.AmqpSender;
+import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.junit.Test;
+
+import java.io.IOException;
+
+public class AmqpFlowControlFailTest extends JMSClientTestSupport {
+
+   @Override
+   protected void configureAddressPolicy(ActiveMQServer server) {
+      // For BLOCK tests
+      AddressSettings addressSettings = server.getAddressSettingsRepository().getMatch("#");
+      addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.FAIL);
+      addressSettings.setMaxSizeBytes(1000);
+     // addressSettings.setMaxSizeBytesRejectThreshold(MAX_SIZE_BYTES_REJECT_THRESHOLD);
+      server.getAddressSettingsRepository().addMatch("#", addressSettings);
+   }
+
+   @Test(timeout = 60000)
+   public void testMesagesNotSent() throws Exception {
+      AmqpClient client = createAmqpClient(getBrokerAmqpConnectionURI());
+      AmqpConnection connection = addConnection(client.connect());
+      int messagesSent = 0;
+      try {
+         AmqpSession session = connection.createSession();
+         AmqpSender sender = session.createSender(getQueueName());
+         boolean rejected = false;
+         for (int i = 0; i < 1000; i++) {
+            final AmqpMessage message = new AmqpMessage();
+            byte[] payload = new byte[10];
+            message.setBytes(payload);
+            try {
+               sender.send(message);
+               messagesSent++;
+               System.out.println("message = " + message);
+            } catch (IOException e) {
+               rejected = true;
+            }
+         }
+         assertTrue(rejected);
+         rejected = false;
+         assertEquals(0, sender.getSender().getCredit());
+         AmqpSession session2 = connection.createSession();
+         AmqpReceiver receiver = session2.createReceiver(getQueueName());
+         receiver.flow(messagesSent);
+         for (int i = 0; i < messagesSent; i++) {
+            AmqpMessage receive = receiver.receive();
+            receive.accept();
+         }
+         receiver.close();
+         session2.close();
+         assertEquals(1000, sender.getSender().getCredit());
+         for (int i = 0; i < 1000; i++) {
+            final AmqpMessage message = new AmqpMessage();
+            byte[] payload = new byte[100];
+            message.setBytes(payload);
+            try {
+               sender.send(message);
+            } catch (IOException e) {
+               rejected = true;
+            }
+         }
+         assertTrue(rejected);
+         assertEquals(0, sender.getSender().getCredit());
+      } finally {
+         connection.close();
+      }
+   }
+}