You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by an...@apache.org on 2016/07/26 11:30:26 UTC
[1/2] activemq-artemis git commit: Add default AMQP flow behaviour and fix proton test
Repository: activemq-artemis
Updated Branches:
refs/heads/master 110158bb8 -> 6cb681555
Add default AMQP flow behaviour and fix proton test
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/b549bb24
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/b549bb24
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/b549bb24
Branch: refs/heads/master
Commit: b549bb243c2b4536f2ca9d84e777cca9bff019b9
Parents: 110158b
Author: Martyn Taylor <mt...@redhat.com>
Authored: Tue Jul 26 11:11:33 2016 +0100
Committer: Martyn Taylor <mt...@redhat.com>
Committed: Tue Jul 26 11:14:25 2016 +0100
----------------------------------------------------------------------
.../plug/ProtonSessionIntegrationCallback.java | 1 +
.../context/AbstractProtonReceiverContext.java | 11 +-
.../tests/integration/proton/ProtonTest.java | 115 +++++++++++++------
3 files changed, 92 insertions(+), 35 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b549bb24/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java
index ab57fe1..b2d029f 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java
@@ -405,6 +405,7 @@ public class ProtonSessionIntegrationCallback implements AMQPSessionCallback, Se
public void run() {
if (receiver.getRemoteCredit() < threshold) {
receiver.flow(credits);
+ connection.flush();
}
}
});
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b549bb24/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonReceiverContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonReceiverContext.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonReceiverContext.java
index 5a43029..c210950 100644
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonReceiverContext.java
+++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/AbstractProtonReceiverContext.java
@@ -58,10 +58,17 @@ public abstract class AbstractProtonReceiverContext extends ProtonInitializable
}
public void flow(int credits, int threshold) {
- synchronized (connection.getLock()) {
+ // Use the SessionSPI to allocate producer credits, or default, always allocate credit.
+ if (sessionSPI != null) {
sessionSPI.offerProducerCredit(address, credits, threshold, receiver);
}
- connection.flush();
+ else {
+ synchronized (connection.getLock()) {
+ receiver.flow(credits);
+ connection.flush();
+ }
+ }
+
}
public void drain(int credits) {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b549bb24/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java
index 8874271..2c68dde 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java
@@ -45,7 +45,9 @@ import java.util.Enumeration;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
@@ -136,9 +138,9 @@ public class ProtonTest extends ActiveMQTestBase {
server.getConfiguration().getAcceptorConfigurations().add(transportConfiguration);
+ // Default Page
AddressSettings addressSettings = new AddressSettings();
- addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK);
- addressSettings.setMaxSizeBytes(1 * 1024 * 1024);
+ addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.PAGE);
server.getConfiguration().getAddressesSettings().put("#", addressSettings);
server.start();
@@ -230,20 +232,25 @@ public class ProtonTest extends ActiveMQTestBase {
@Test
public void testResourceLimitExceptionOnAddressFull() throws Exception {
if (protocol != 0 && protocol != 3) return; // Only run this test for AMQP protocol
+ setAddressFullBlockPolicy();
+
fillAddress(address + 1);
}
@Test
public void testAddressIsBlockedForOtherProdudersWhenFull() throws Exception {
if (protocol != 0 && protocol != 3) return; // Only run this test for AMQP protocol
+ setAddressFullBlockPolicy();
+
String destinationAddress = address + 1;
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Destination d = session.createQueue(destinationAddress);
+ MessageProducer p = session.createProducer(d);
+
fillAddress(destinationAddress);
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Exception e = null;
try {
- Destination d = session.createQueue(destinationAddress);
- MessageProducer p = session.createProducer(d);
p.send(session.createBytesMessage());
}
catch (ResourceAllocationException rae) {
@@ -256,6 +263,7 @@ public class ProtonTest extends ActiveMQTestBase {
@Test
public void testCreditsAreNotAllocatedWhenAddressIsFull() throws Exception {
if (protocol != 0 && protocol != 3) return; // Only run this test for AMQP protocol
+ setAddressFullBlockPolicy();
// Only allow 1 credit to be submitted at a time.
Field maxCreditAllocation = ProtonServerReceiverContext.class.getDeclaredField("maxCreditAllocation");
@@ -269,9 +277,13 @@ public class ProtonTest extends ActiveMQTestBase {
try {
AmqpSession session = amqpConnection.createSession();
AmqpSender sender = session.createSender(destinationAddress);
- sender.setSendTimeout(1000);
- sendUntilFull(sender);
- assertTrue(sender.getSender().getCredit() <= 0);
+
+ // Use blocking send to ensure buffered messages do not interfere with credit.
+ sender.setSendTimeout(-1);
+ sendUntilFull(sender, destinationAddress);
+
+ // This should be -1. A single message is buffered in the client, and 0 credit has been allocated.
+ assertTrue(sender.getSender().getCredit() == -1);
}
finally {
amqpConnection.close();
@@ -282,13 +294,14 @@ public class ProtonTest extends ActiveMQTestBase {
@Test
public void testCreditsAreRefreshedWhenAddressIsUnblocked() throws Exception {
if (protocol != 0 && protocol != 3) return; // Only run this test for AMQP protocol
+ setAddressFullBlockPolicy();
String destinationAddress = address + 1;
int messagesSent = fillAddress(destinationAddress);
- AmqpConnection amqpConnection = null;
+ AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password);
+ AmqpConnection amqpConnection = amqpConnection = client.connect();
try {
- amqpConnection = AmqpClient.connect(new URI(tcpAmqpConnectionUri));
AmqpSession session = amqpConnection.createSession();
AmqpSender sender = session.createSender(destinationAddress);
@@ -308,7 +321,8 @@ public class ProtonTest extends ActiveMQTestBase {
// Wait for address to unblock and flow frame to arrive
Thread.sleep(500);
- assertTrue(sender.getSender().getCredit() > 0);
+
+ assertTrue(sender.getSender().getCredit() == 0);
assertNotNull(receiver.receive());
}
finally {
@@ -319,11 +333,12 @@ public class ProtonTest extends ActiveMQTestBase {
@Test
public void testNewLinkAttachAreNotAllocatedCreditsWhenAddressIsBlocked() throws Exception {
if (protocol != 0 && protocol != 3) return; // Only run this test for AMQP protocol
+ setAddressFullBlockPolicy();
fillAddress(address + 1);
- AmqpConnection amqpConnection = null;
+ AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password);
+ AmqpConnection amqpConnection = amqpConnection = client.connect();
try {
- amqpConnection = AmqpClient.connect(new URI(tcpAmqpConnectionUri));
AmqpSession session = amqpConnection.createSession();
AmqpSender sender = session.createSender(address + 1);
// Wait for a potential flow frame.
@@ -344,38 +359,62 @@ public class ProtonTest extends ActiveMQTestBase {
private int fillAddress(String address) throws Exception {
AmqpClient client = new AmqpClient(new URI(tcpAmqpConnectionUri), userName, password);
AmqpConnection amqpConnection = client.connect();
+ int messagesSent = 0;
+ Exception exception = null;
try {
AmqpSession session = amqpConnection.createSession();
AmqpSender sender = session.createSender(address);
- return sendUntilFull(sender);
+ messagesSent = sendUntilFull(sender, null);
+ }
+ catch (Exception e) {
+ exception = e;
}
finally {
amqpConnection.close();
}
+
+ // Should receive a rejected error
+ assertNotNull(exception);
+ assertTrue(exception.getMessage().contains("amqp:resource-limit-exceeded"));
+
+ return messagesSent;
}
- private int sendUntilFull(AmqpSender sender) throws IOException {
- AmqpMessage message = new AmqpMessage();
+ private int sendUntilFull(final AmqpSender sender, String expectedErrorMessage) throws Exception {
+ final AmqpMessage message = new AmqpMessage();
byte[] payload = new byte[50 * 1024];
+ message.setBytes(payload);
- int sentMessages = 0;
- int maxMessages = 50;
+ final int maxMessages = 50;
+ final AtomicInteger sentMessages = new AtomicInteger(0);
+ final Exception[] errors = new Exception[1];
+ final CountDownLatch timeout = new CountDownLatch(1);
- Exception e = null;
- try {
- for (int i = 0; i < maxMessages; i++) {
- message.setBytes(payload);
- sender.send(message);
- sentMessages++;
+ Runnable sendMessages = new Runnable() {
+ @Override
+ public void run() {
+ try {
+ for (int i = 0; i < maxMessages; i++) {
+ sender.send(message);
+ sentMessages.getAndIncrement();
+ }
+ timeout.countDown();
+ }
+ catch (IOException e) {
+ errors[0] = e;
+ }
}
- }
- catch (IOException ioe) {
- e = ioe;
- }
+ };
+
+ Thread t = new Thread(sendMessages);
+ t.start();
+
+ timeout.await(5, TimeUnit.SECONDS);
- assertNotNull(e);
- assertTrue(e.getMessage().contains("amqp:resource-limit-exceeded"));
- return sentMessages;
+ if (errors[0] != null) {
+ throw errors[0];
+ }
+ return sentMessages.get();
}
@Test
@@ -398,7 +437,6 @@ public class ProtonTest extends ActiveMQTestBase {
Destination jmsReplyTo = message.getJMSReplyTo();
Assert.assertNotNull(jmsReplyTo);
Assert.assertNotNull(message);
-
}
@Test
@@ -729,10 +767,13 @@ public class ProtonTest extends ActiveMQTestBase {
consumer.close();
connection.close();
+
+ // Wait for Acks to be processed and message removed from queue.
+ Thread.sleep(500);
+
Assert.assertEquals(0, getMessageCount(q));
long taken = (System.currentTimeMillis() - time) / 1000;
System.out.println("taken = " + taken);
-
}
@Test
@@ -1140,6 +1181,14 @@ public class ProtonTest extends ActiveMQTestBase {
return connection;
}
+ private void setAddressFullBlockPolicy() {
+ // For BLOCK tests
+ AddressSettings addressSettings = server.getAddressSettingsRepository().getMatch("#");
+ addressSettings.setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK);
+ addressSettings.setMaxSizeBytes(1 * 1024 * 1024);
+ server.getAddressSettingsRepository().addMatch("#", addressSettings);
+ }
+
public static class AnythingSerializable implements Serializable {
private int count;
[2/2] activemq-artemis git commit: This closes #663 Add default AMQP flow behaviour and fix proton test
Posted by an...@apache.org.
This closes #663 Add default AMQP flow behaviour and fix proton test
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/6cb68155
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/6cb68155
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/6cb68155
Branch: refs/heads/master
Commit: 6cb681555a35bfc78f8f606a2c1e57c9593fde75
Parents: 110158b b549bb2
Author: Andy Taylor <an...@gmail.com>
Authored: Tue Jul 26 12:29:55 2016 +0100
Committer: Andy Taylor <an...@gmail.com>
Committed: Tue Jul 26 12:29:55 2016 +0100
----------------------------------------------------------------------
.../plug/ProtonSessionIntegrationCallback.java | 1 +
.../context/AbstractProtonReceiverContext.java | 11 +-
.../tests/integration/proton/ProtonTest.java | 115 +++++++++++++------
3 files changed, 92 insertions(+), 35 deletions(-)
----------------------------------------------------------------------