You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ro...@apache.org on 2015/04/22 11:28:28 UTC
activemq git commit: AMQ-5738: Ensure the sender links for non-durable consumers also get unregistered, remove duplicate closed field, fix method names.
Repository: activemq
Updated Branches:
refs/heads/master b313209aa -> 3a5f127d5
AMQ-5738: Ensure the sender links for non-durable consumers also get unregistered, remove duplicate closed field, fix method names.
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/3a5f127d
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/3a5f127d
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/3a5f127d
Branch: refs/heads/master
Commit: 3a5f127d52f96466bb9e2c660ac5dbbed03ecf1d
Parents: b313209
Author: Robert Gemmell <ro...@apache.org>
Authored: Wed Apr 22 10:17:38 2015 +0100
Committer: Robert Gemmell <ro...@apache.org>
Committed: Wed Apr 22 10:17:38 2015 +0100
----------------------------------------------------------------------
.../activemq/transport/amqp/protocol/AmqpConnection.java | 4 ++--
.../activemq/transport/amqp/protocol/AmqpSender.java | 11 +++++------
.../activemq/transport/amqp/protocol/AmqpSession.java | 6 +++---
3 files changed, 10 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/3a5f127d/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java
index bab16c9..c0ea6ad 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java
@@ -563,11 +563,11 @@ public class AmqpConnection implements AmqpProtocolConverter {
//----- Utility methods for connection resources to use ------------------//
- void regosterSender(ConsumerId consumerId, AmqpSender sender) {
+ void registerSender(ConsumerId consumerId, AmqpSender sender) {
subscriptionsByConsumerId.put(consumerId, sender);
}
- void unregosterSender(ConsumerId consumerId) {
+ void unregisterSender(ConsumerId consumerId) {
subscriptionsByConsumerId.remove(consumerId);
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/3a5f127d/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java
index eefcbe3..13826b3 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java
@@ -80,7 +80,6 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
private final ConsumerInfo consumerInfo;
private final boolean presettle;
- private boolean closed;
private int currentCredit;
private boolean draining;
private long lastDeliveredSequenceId;
@@ -108,8 +107,8 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
@Override
public void open() {
- if (!closed) {
- session.regosterSender(getConsumerId(), this);
+ if (!isClosed()) {
+ session.registerSender(getConsumerId(), this);
}
super.open();
@@ -142,9 +141,9 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
rsi.setClientId(session.getConnection().getClientId());
sendToActiveMQ(rsi, null);
-
- session.unregisterSender(getConsumerId());
}
+
+ session.unregisterSender(getConsumerId());
}
super.close();
@@ -350,7 +349,7 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
//----- Internal Implementation ------------------------------------------//
public void pumpOutbound() throws Exception {
- while (!closed) {
+ while (!isClosed()) {
while (currentBuffer != null) {
int sent = getEndpoint().send(currentBuffer.data, currentBuffer.offset, currentBuffer.length);
if (sent > 0) {
http://git-wip-us.apache.org/repos/asf/activemq/blob/3a5f127d/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSession.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSession.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSession.java
index d9f0c0f..d2901ba 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSession.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSession.java
@@ -345,14 +345,14 @@ public class AmqpSession implements AmqpResource {
connection.pumpProtonToSocket();
}
- public void regosterSender(ConsumerId consumerId, AmqpSender sender) {
+ public void registerSender(ConsumerId consumerId, AmqpSender sender) {
consumers.put(consumerId, sender);
- connection.regosterSender(consumerId, sender);
+ connection.registerSender(consumerId, sender);
}
public void unregisterSender(ConsumerId consumerId) {
consumers.remove(consumerId);
- connection.unregosterSender(consumerId);
+ connection.unregisterSender(consumerId);
}
//----- Configuration accessors ------------------------------------------//