You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2018/09/13 20:22:38 UTC
[1/2] activemq-artemis git commit: ARTEMIS-1898 - make sure to send
credits on rejected messages
Repository: activemq-artemis
Updated Branches:
refs/heads/master 51f4bf0f9 -> 86c4d1b85
ARTEMIS-1898 - make sure to send credits on rejected messages
Also run the credit runnables once memory is freed when the address-full policy is FAIL
https://issues.apache.org/jira/browse/ARTEMIS-1898
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/82795b7b
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/82795b7b
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/82795b7b
Branch: refs/heads/master
Commit: 82795b7bff64da8770a0c53b9eb1373b2d63fbf6
Parents: 51f4bf0
Author: andytaylor <an...@gmail.com>
Authored: Wed Sep 12 11:16:41 2018 +0100
Committer: Clebert Suconic <cl...@apache.org>
Committed: Thu Sep 13 16:22:28 2018 -0400
----------------------------------------------------------------------
.../amqp/broker/AMQPSessionCallback.java | 43 ++++++---
.../proton/ProtonServerReceiverContext.java | 1 +
.../core/paging/impl/PagingStoreImpl.java | 5 +-
.../amqp/AmqpFlowControlFailTest.java | 94 ++++++++++++++++++++
4 files changed, 131 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/82795b7b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
index 6b163ae..7fef3db 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
@@ -109,6 +109,8 @@ public class AMQPSessionCallback implements SessionCallback {
private final AddressQueryCache<AddressQueryResult> addressQueryCache = new AddressQueryCache<>();
+ private CreditRunnable creditRunnable;
+
public AMQPSessionCallback(AMQPConnectionCallback protonSPI,
ProtonProtocolManager manager,
AMQPConnectionContext connection,
@@ -580,20 +582,37 @@ public class AMQPSessionCallback implements SessionCallback {
final int threshold,
final Receiver receiver) {
try {
+ /*
+ * The credit runnable will always be run in this thread unless the address or disc is full. If this is the case the
+ * runnable is run once the memory or disc is free, if this happens we don't want to keep adding runnables as this
+ * may cause a memory leak, one is enough.
+ * */
+ if (creditRunnable != null && !creditRunnable.isRun())
+ return;
PagingManager pagingManager = manager.getServer().getPagingManager();
- Runnable creditRunnable = () -> {
- connection.lock();
- try {
- if (receiver.getCredit() <= threshold) {
- int topUp = credits - receiver.getCredit();
- if (topUp > 0) {
- receiver.flow(topUp);
+ creditRunnable = new CreditRunnable() {
+ boolean isRun = false;
+ @Override
+ public boolean isRun() {
+ return isRun;
+ }
+
+ @Override
+ public void run() {
+ connection.lock();
+ try {
+ if (receiver.getCredit() <= threshold) {
+ int topUp = credits - receiver.getCredit();
+ if (topUp > 0) {
+ receiver.flow(topUp);
+ }
}
+ } finally {
+ isRun = true;
+ connection.unlock();
}
- } finally {
- connection.unlock();
+ connection.flush();
}
- connection.flush();
};
if (address == null) {
@@ -772,5 +791,7 @@ public class AMQPSessionCallback implements SessionCallback {
}
}
-
+ interface CreditRunnable extends Runnable {
+ boolean isRun();
+ }
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/82795b7b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
index cdd1362..0f0e9d5 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java
@@ -262,6 +262,7 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements
delivery.disposition(rejected);
delivery.settle();
+ flow(amqpCredits, minCreditRefresh);
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/82795b7b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
index 5f0d3c8..1392cfc 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java
@@ -658,6 +658,9 @@ public class PagingStoreImpl implements PagingStore {
if (addressFullMessagePolicy == AddressFullMessagePolicy.FAIL && (maxSize != -1 || usingGlobalMaxSize || pagingManager.isDiskFull())) {
if (isFull()) {
+ if (runWhenAvailable != null) {
+ onMemoryFreedRunnables.add(AtomicRunnable.checkAtomic(runWhenAvailable));
+ }
return false;
}
} else if (pagingManager.isDiskFull() || addressFullMessagePolicy == AddressFullMessagePolicy.BLOCK && (maxSize != -1 || usingGlobalMaxSize)) {
@@ -704,7 +707,7 @@ public class PagingStoreImpl implements PagingStore {
ActiveMQServerLogger.LOGGER.negativeAddressSize(newSize, address.toString());
}
- if (addressFullMessagePolicy == AddressFullMessagePolicy.BLOCK) {
+ if (addressFullMessagePolicy == AddressFullMessagePolicy.BLOCK || addressFullMessagePolicy == AddressFullMessagePolicy.FAIL) {
if (usingGlobalMaxSize && !globalFull || maxSize != -1) {
checkReleaseMemory(globalFull, newSize);
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/82795b7b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpFlowControlFailTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpFlowControlFailTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpFlowControlFailTest.java
new file mode 100644
index 0000000..2f65dfb
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpFlowControlFailTest.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp;
+
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.transport.amqp.client.AmqpClient;
+import org.apache.activemq.transport.amqp.client.AmqpConnection;
+import org.apache.activemq.transport.amqp.client.AmqpMessage;
+import org.apache.activemq.transport.amqp.client.AmqpReceiver;
+import org.apache.activemq.transport.amqp.client.AmqpSender;
+import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.junit.Test;
+
+import java.io.IOException;
+
+public class AmqpFlowControlFailTest extends JMSClientTestSupport {
+
+ @Override
+ protected void configureAddressPolicy(ActiveMQServer server) {
+ // For BLOCK tests
+ AddressSettings addressSettings = server.getAddressSettingsRepository().getMatch("#");
+ addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.FAIL);
+ addressSettings.setMaxSizeBytes(1000);
+ // addressSettings.setMaxSizeBytesRejectThreshold(MAX_SIZE_BYTES_REJECT_THRESHOLD);
+ server.getAddressSettingsRepository().addMatch("#", addressSettings);
+ }
+
+ @Test(timeout = 60000)
+ public void testMesagesNotSent() throws Exception {
+ AmqpClient client = createAmqpClient(getBrokerAmqpConnectionURI());
+ AmqpConnection connection = addConnection(client.connect());
+ int messagesSent = 0;
+ try {
+ AmqpSession session = connection.createSession();
+ AmqpSender sender = session.createSender(getQueueName());
+ boolean rejected = false;
+ for (int i = 0; i < 1000; i++) {
+ final AmqpMessage message = new AmqpMessage();
+ byte[] payload = new byte[10];
+ message.setBytes(payload);
+ try {
+ sender.send(message);
+ messagesSent++;
+ System.out.println("message = " + message);
+ } catch (IOException e) {
+ rejected = true;
+ }
+ }
+ assertTrue(rejected);
+ rejected = false;
+ assertEquals(0, sender.getSender().getCredit());
+ AmqpSession session2 = connection.createSession();
+ AmqpReceiver receiver = session2.createReceiver(getQueueName());
+ receiver.flow(messagesSent);
+ for (int i = 0; i < messagesSent; i++) {
+ AmqpMessage receive = receiver.receive();
+ receive.accept();
+ }
+ receiver.close();
+ session2.close();
+ assertEquals(1000, sender.getSender().getCredit());
+ for (int i = 0; i < 1000; i++) {
+ final AmqpMessage message = new AmqpMessage();
+ byte[] payload = new byte[100];
+ message.setBytes(payload);
+ try {
+ sender.send(message);
+ } catch (IOException e) {
+ rejected = true;
+ }
+ }
+ assertTrue(rejected);
+ assertEquals(0, sender.getSender().getCredit());
+ } finally {
+ connection.close();
+ }
+ }
+}
[2/2] activemq-artemis git commit: This closes #2306
Posted by cl...@apache.org.
This closes #2306
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/86c4d1b8
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/86c4d1b8
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/86c4d1b8
Branch: refs/heads/master
Commit: 86c4d1b85b86e80cf19018765ec6b807c9d91ebd
Parents: 51f4bf0 82795b7
Author: Clebert Suconic <cl...@apache.org>
Authored: Thu Sep 13 16:22:29 2018 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Thu Sep 13 16:22:29 2018 -0400
----------------------------------------------------------------------
.../amqp/broker/AMQPSessionCallback.java | 43 ++++++---
.../proton/ProtonServerReceiverContext.java | 1 +
.../core/paging/impl/PagingStoreImpl.java | 5 +-
.../amqp/AmqpFlowControlFailTest.java | 94 ++++++++++++++++++++
4 files changed, 131 insertions(+), 12 deletions(-)
----------------------------------------------------------------------