You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2015/04/14 17:18:38 UTC
activemq git commit: https://issues.apache.org/jira/browse/AMQ-5723
Repository: activemq
Updated Branches:
refs/heads/master 47f5c0857 -> b3bf8e74f
https://issues.apache.org/jira/browse/AMQ-5723
Ensure that we settle the delivery state of incoming deliveries that are
already remotely settled so that the associated resources are freed.
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/b3bf8e74
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/b3bf8e74
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/b3bf8e74
Branch: refs/heads/master
Commit: b3bf8e74f29bf30930da20c503ce11b3b780deaf
Parents: 47f5c08
Author: Timothy Bish <ta...@gmail.com>
Authored: Tue Apr 14 11:18:32 2015 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Tue Apr 14 11:18:32 2015 -0400
----------------------------------------------------------------------
.../transport/amqp/protocol/AmqpReceiver.java | 6 ++--
.../transport/amqp/client/AmqpSender.java | 9 +++---
.../transport/amqp/client/AmqpSession.java | 19 +++++++++++-
.../transport/amqp/interop/AmqpSenderTest.java | 32 ++++++++++++++++++++
4 files changed, 58 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/b3bf8e74/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpReceiver.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpReceiver.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpReceiver.java
index 9ab7ebe..6051cde 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpReceiver.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpReceiver.java
@@ -219,7 +219,6 @@ public class AmqpReceiver extends AmqpAbstractReceiver {
rejected.setError(condition);
delivery.disposition(rejected);
} else {
-
if (getEndpoint().getCredit() <= (getConfiguredReceiverCredit() * .2)) {
LOG.trace("Sending more credit ({}) to producer: {}", getConfiguredReceiverCredit() - getEndpoint().getCredit(), getProducerId());
getEndpoint().flow(getConfiguredReceiverCredit() - getEndpoint().getCredit());
@@ -234,10 +233,9 @@ public class AmqpReceiver extends AmqpAbstractReceiver {
} else {
delivery.disposition(Accepted.getInstance());
}
-
- delivery.settle();
}
+ delivery.settle();
session.pumpProtonToSocket();
}
});
@@ -247,6 +245,8 @@ public class AmqpReceiver extends AmqpAbstractReceiver {
getEndpoint().flow(getConfiguredReceiverCredit() - getEndpoint().getCredit());
session.pumpProtonToSocket();
}
+
+ delivery.settle();
sendToActiveMQ(message);
}
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/b3bf8e74/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSender.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSender.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSender.java
index c8829e0..e8e5c8b 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSender.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSender.java
@@ -377,13 +377,12 @@ public class AmqpSender extends AmqpAbstractResource<Sender> {
outcome = (Outcome) state;
} else {
LOG.warn("Message send updated with unsupported state: {}", state);
- continue;
+ outcome = null;
}
AsyncResult request = (AsyncResult) delivery.getContext();
if (outcome instanceof Accepted) {
- toRemove.add(delivery);
LOG.trace("Outcome of delivery was accepted: {}", delivery);
tagGenerator.returnTag(delivery.getTag());
if (request != null && !request.isComplete()) {
@@ -391,7 +390,6 @@ public class AmqpSender extends AmqpAbstractResource<Sender> {
}
} else if (outcome instanceof Rejected) {
Exception remoteError = getRemoteError();
- toRemove.add(delivery);
LOG.trace("Outcome of delivery was rejected: {}", delivery);
tagGenerator.returnTag(delivery.getTag());
if (request != null && !request.isComplete()) {
@@ -399,9 +397,12 @@ public class AmqpSender extends AmqpAbstractResource<Sender> {
} else {
connection.fireClientException(getRemoteError());
}
- } else {
+ } else if (outcome != null) {
LOG.warn("Message send updated with unsupported outcome: {}", outcome);
}
+
+ delivery.settle();
+ toRemove.add(delivery);
}
pending.removeAll(toRemove);
http://git-wip-us.apache.org/repos/asf/activemq/blob/b3bf8e74/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
index 8af362b..b7ebeec 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
@@ -54,16 +54,33 @@ public class AmqpSession extends AmqpAbstractResource<Session> {
* Create a sender instance using the given address
*
* @param address
- * the address to which the sender will produce its messages.
+ * the address to which the sender will produce its messages.
*
* @return a newly created sender that is ready for use.
*
* @throws Exception if an error occurs while creating the sender.
*/
public AmqpSender createSender(final String address) throws Exception {
+ return createSender(address, false);
+ }
+
+ /**
+ * Create a sender instance using the given address
+ *
+ * @param address
+ * the address to which the sender will produce its messages.
+ * @param presettle
+ * controls if the created sender produces messages that have already been marked settled.
+ *
+ * @return a newly created sender that is ready for use.
+ *
+ * @throws Exception if an error occurs while creating the sender.
+ */
+ public AmqpSender createSender(final String address, boolean presettle) throws Exception {
checkClosed();
final AmqpSender sender = new AmqpSender(AmqpSession.this, address, getNextSenderId());
+ sender.setPresettle(presettle);
final ClientFuture request = new ClientFuture();
connection.getScheduler().execute(new Runnable() {
http://git-wip-us.apache.org/repos/asf/activemq/blob/b3bf8e74/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSenderTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSenderTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSenderTest.java
index 3f6a454..886a42e 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSenderTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSenderTest.java
@@ -18,14 +18,17 @@ package org.apache.activemq.transport.amqp.interop;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
import org.apache.activemq.broker.jmx.QueueViewMBean;
+import org.apache.activemq.broker.jmx.TopicViewMBean;
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpClientTestSupport;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpMessage;
import org.apache.activemq.transport.amqp.client.AmqpSender;
import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.apache.activemq.util.Wait;
import org.junit.Test;
/**
@@ -91,4 +94,33 @@ public class AmqpSenderTest extends AmqpClientTestSupport {
sender.close();
connection.close();
}
+
+ @Test(timeout = 60000)
+ public void testPresettledSender() throws Exception {
+ final int MSG_COUNT = 1000;
+
+ AmqpClient client = createAmqpClient();
+ AmqpConnection connection = client.connect();
+ AmqpSession session = connection.createSession();
+
+ AmqpSender sender = session.createSender("topic://" + getTestName(), true);
+
+ for (int i = 0; i < MSG_COUNT; ++i) {
+ AmqpMessage message = new AmqpMessage();
+ message.setText("Test-Message: " + i);
+ sender.send(message);
+ }
+
+ final TopicViewMBean topic = getProxyToTopic(getTestName());
+ assertTrue("All messages should arrive", Wait.waitFor(new Wait.Condition() {
+
+ @Override
+ public boolean isSatisified() throws Exception {
+ return topic.getEnqueueCount() == MSG_COUNT;
+ }
+ }));
+
+ sender.close();
+ connection.close();
+ }
}
\ No newline at end of file