You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ro...@apache.org on 2015/04/22 11:28:28 UTC

activemq git commit: AMQ-5738: Ensure the sender links for non-durable consumers also get unregistered, remove duplicate closed field, fix method names.

Repository: activemq
Updated Branches:
  refs/heads/master b313209aa -> 3a5f127d5


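AMQ-5738: Ensure the sender links for non-durable consumers are also unregistered on close, remove the duplicate 'closed' field from AmqpSender, and fix the misspelled method names (regosterSender/unregosterSender -> registerSender/unregisterSender).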


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/3a5f127d
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/3a5f127d
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/3a5f127d

Branch: refs/heads/master
Commit: 3a5f127d52f96466bb9e2c660ac5dbbed03ecf1d
Parents: b313209
Author: Robert Gemmell <ro...@apache.org>
Authored: Wed Apr 22 10:17:38 2015 +0100
Committer: Robert Gemmell <ro...@apache.org>
Committed: Wed Apr 22 10:17:38 2015 +0100

----------------------------------------------------------------------
 .../activemq/transport/amqp/protocol/AmqpConnection.java |  4 ++--
 .../activemq/transport/amqp/protocol/AmqpSender.java     | 11 +++++------
 .../activemq/transport/amqp/protocol/AmqpSession.java    |  6 +++---
 3 files changed, 10 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/3a5f127d/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java
index bab16c9..c0ea6ad 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java
@@ -563,11 +563,11 @@ public class AmqpConnection implements AmqpProtocolConverter {
 
     //----- Utility methods for connection resources to use ------------------//
 
-    void regosterSender(ConsumerId consumerId, AmqpSender sender) {
+    void registerSender(ConsumerId consumerId, AmqpSender sender) {
         subscriptionsByConsumerId.put(consumerId, sender);
     }
 
-    void unregosterSender(ConsumerId consumerId) {
+    void unregisterSender(ConsumerId consumerId) {
         subscriptionsByConsumerId.remove(consumerId);
     }
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/3a5f127d/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java
index eefcbe3..13826b3 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java
@@ -80,7 +80,6 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
     private final ConsumerInfo consumerInfo;
     private final boolean presettle;
 
-    private boolean closed;
     private int currentCredit;
     private boolean draining;
     private long lastDeliveredSequenceId;
@@ -108,8 +107,8 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
 
     @Override
     public void open() {
-        if (!closed) {
-            session.regosterSender(getConsumerId(), this);
+        if (!isClosed()) {
+            session.registerSender(getConsumerId(), this);
         }
 
         super.open();
@@ -142,9 +141,9 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
                 rsi.setClientId(session.getConnection().getClientId());
 
                 sendToActiveMQ(rsi, null);
-
-                session.unregisterSender(getConsumerId());
             }
+
+            session.unregisterSender(getConsumerId());
         }
 
         super.close();
@@ -350,7 +349,7 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
     //----- Internal Implementation ------------------------------------------//
 
     public void pumpOutbound() throws Exception {
-        while (!closed) {
+        while (!isClosed()) {
             while (currentBuffer != null) {
                 int sent = getEndpoint().send(currentBuffer.data, currentBuffer.offset, currentBuffer.length);
                 if (sent > 0) {

http://git-wip-us.apache.org/repos/asf/activemq/blob/3a5f127d/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSession.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSession.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSession.java
index d9f0c0f..d2901ba 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSession.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSession.java
@@ -345,14 +345,14 @@ public class AmqpSession implements AmqpResource {
         connection.pumpProtonToSocket();
     }
 
-    public void regosterSender(ConsumerId consumerId, AmqpSender sender) {
+    public void registerSender(ConsumerId consumerId, AmqpSender sender) {
         consumers.put(consumerId, sender);
-        connection.regosterSender(consumerId, sender);
+        connection.registerSender(consumerId, sender);
     }
 
     public void unregisterSender(ConsumerId consumerId) {
         consumers.remove(consumerId);
-        connection.unregosterSender(consumerId);
+        connection.unregisterSender(consumerId);
     }
 
     //----- Configuration accessors ------------------------------------------//