You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2015/04/14 17:18:38 UTC

activemq git commit: https://issues.apache.org/jira/browse/AMQ-5723

Repository: activemq
Updated Branches:
  refs/heads/master 47f5c0857 -> b3bf8e74f


https://issues.apache.org/jira/browse/AMQ-5723

Ensure that we settle the delivery state of incoming deliveries that are
already remotely settled so that the resources associated are freed.

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/b3bf8e74
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/b3bf8e74
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/b3bf8e74

Branch: refs/heads/master
Commit: b3bf8e74f29bf30930da20c503ce11b3b780deaf
Parents: 47f5c08
Author: Timothy Bish <ta...@gmail.com>
Authored: Tue Apr 14 11:18:32 2015 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Tue Apr 14 11:18:32 2015 -0400

----------------------------------------------------------------------
 .../transport/amqp/protocol/AmqpReceiver.java   |  6 ++--
 .../transport/amqp/client/AmqpSender.java       |  9 +++---
 .../transport/amqp/client/AmqpSession.java      | 19 +++++++++++-
 .../transport/amqp/interop/AmqpSenderTest.java  | 32 ++++++++++++++++++++
 4 files changed, 58 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/b3bf8e74/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpReceiver.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpReceiver.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpReceiver.java
index 9ab7ebe..6051cde 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpReceiver.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpReceiver.java
@@ -219,7 +219,6 @@ public class AmqpReceiver extends AmqpAbstractReceiver {
                             rejected.setError(condition);
                             delivery.disposition(rejected);
                         } else {
-
                             if (getEndpoint().getCredit() <= (getConfiguredReceiverCredit() * .2)) {
                                 LOG.trace("Sending more credit ({}) to producer: {}", getConfiguredReceiverCredit() - getEndpoint().getCredit(), getProducerId());
                                 getEndpoint().flow(getConfiguredReceiverCredit() - getEndpoint().getCredit());
@@ -234,10 +233,9 @@ public class AmqpReceiver extends AmqpAbstractReceiver {
                             } else {
                                 delivery.disposition(Accepted.getInstance());
                             }
-
-                            delivery.settle();
                         }
 
+                        delivery.settle();
                         session.pumpProtonToSocket();
                     }
                 });
@@ -247,6 +245,8 @@ public class AmqpReceiver extends AmqpAbstractReceiver {
                     getEndpoint().flow(getConfiguredReceiverCredit() - getEndpoint().getCredit());
                     session.pumpProtonToSocket();
                 }
+
+                delivery.settle();
                 sendToActiveMQ(message);
             }
         }

http://git-wip-us.apache.org/repos/asf/activemq/blob/b3bf8e74/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSender.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSender.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSender.java
index c8829e0..e8e5c8b 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSender.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSender.java
@@ -377,13 +377,12 @@ public class AmqpSender extends AmqpAbstractResource<Sender> {
                 outcome = (Outcome) state;
             } else {
                 LOG.warn("Message send updated with unsupported state: {}", state);
-                continue;
+                outcome = null;
             }
 
             AsyncResult request = (AsyncResult) delivery.getContext();
 
             if (outcome instanceof Accepted) {
-                toRemove.add(delivery);
                 LOG.trace("Outcome of delivery was accepted: {}", delivery);
                 tagGenerator.returnTag(delivery.getTag());
                 if (request != null && !request.isComplete()) {
@@ -391,7 +390,6 @@ public class AmqpSender extends AmqpAbstractResource<Sender> {
                 }
             } else if (outcome instanceof Rejected) {
                 Exception remoteError = getRemoteError();
-                toRemove.add(delivery);
                 LOG.trace("Outcome of delivery was rejected: {}", delivery);
                 tagGenerator.returnTag(delivery.getTag());
                 if (request != null && !request.isComplete()) {
@@ -399,9 +397,12 @@ public class AmqpSender extends AmqpAbstractResource<Sender> {
                 } else {
                     connection.fireClientException(getRemoteError());
                 }
-            } else {
+            } else if (outcome != null) {
                 LOG.warn("Message send updated with unsupported outcome: {}", outcome);
             }
+
+            delivery.settle();
+            toRemove.add(delivery);
         }
 
         pending.removeAll(toRemove);

http://git-wip-us.apache.org/repos/asf/activemq/blob/b3bf8e74/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
index 8af362b..b7ebeec 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
@@ -54,16 +54,33 @@ public class AmqpSession extends AmqpAbstractResource<Session> {
      * Create a sender instance using the given address
      *
      * @param address
-     * 	      the address to which the sender will produce its messages.
+     *        the address to which the sender will produce its messages.
      *
      * @return a newly created sender that is ready for use.
      *
      * @throws Exception if an error occurs while creating the sender.
      */
     public AmqpSender createSender(final String address) throws Exception {
+        return createSender(address, false);
+    }
+
+    /**
+     * Create a sender instance using the given address
+     *
+     * @param address
+     * 	      the address to which the sender will produce its messages.
+     * @param presettle
+     *        controls if the created sender produces message that have already been marked settled.
+     *
+     * @return a newly created sender that is ready for use.
+     *
+     * @throws Exception if an error occurs while creating the sender.
+     */
+    public AmqpSender createSender(final String address, boolean presettle) throws Exception {
         checkClosed();
 
         final AmqpSender sender = new AmqpSender(AmqpSession.this, address, getNextSenderId());
+        sender.setPresettle(presettle);
         final ClientFuture request = new ClientFuture();
 
         connection.getScheduler().execute(new Runnable() {

http://git-wip-us.apache.org/repos/asf/activemq/blob/b3bf8e74/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSenderTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSenderTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSenderTest.java
index 3f6a454..886a42e 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSenderTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSenderTest.java
@@ -18,14 +18,17 @@ package org.apache.activemq.transport.amqp.interop;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 
 import org.apache.activemq.broker.jmx.QueueViewMBean;
+import org.apache.activemq.broker.jmx.TopicViewMBean;
 import org.apache.activemq.transport.amqp.client.AmqpClient;
 import org.apache.activemq.transport.amqp.client.AmqpClientTestSupport;
 import org.apache.activemq.transport.amqp.client.AmqpConnection;
 import org.apache.activemq.transport.amqp.client.AmqpMessage;
 import org.apache.activemq.transport.amqp.client.AmqpSender;
 import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.apache.activemq.util.Wait;
 import org.junit.Test;
 
 /**
@@ -91,4 +94,33 @@ public class AmqpSenderTest extends AmqpClientTestSupport {
         sender.close();
         connection.close();
     }
+
+    @Test(timeout = 60000)
+    public void testPresettledSender() throws Exception {
+        final int MSG_COUNT = 1000;
+
+        AmqpClient client = createAmqpClient();
+        AmqpConnection connection = client.connect();
+        AmqpSession session = connection.createSession();
+
+        AmqpSender sender = session.createSender("topic://" + getTestName(), true);
+
+        for (int i = 0; i < MSG_COUNT; ++i) {
+            AmqpMessage message = new AmqpMessage();
+            message.setText("Test-Message: " + i);
+            sender.send(message);
+        }
+
+        final TopicViewMBean topic = getProxyToTopic(getTestName());
+        assertTrue("All messages should arrive", Wait.waitFor(new Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return topic.getEnqueueCount() == MSG_COUNT;
+            }
+        }));
+
+        sender.close();
+        connection.close();
+    }
 }
\ No newline at end of file