You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2016/06/30 00:13:44 UTC
activemq git commit: https://issues.apache.org/jira/browse/AMQ-6341
Repository: activemq
Updated Branches:
refs/heads/master 4e23adfcc -> 83827f277
https://issues.apache.org/jira/browse/AMQ-6341
Wait on broker response for async broker commands.
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/83827f27
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/83827f27
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/83827f27
Branch: refs/heads/master
Commit: 83827f27709a2b8b1f8d080a4913100b80fe3429
Parents: 4e23adf
Author: Timothy Bish <ta...@gmail.com>
Authored: Wed Jun 29 20:13:34 2016 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Wed Jun 29 20:13:34 2016 -0400
----------------------------------------------------------------------
.../transport/amqp/protocol/AmqpReceiver.java | 12 ++++--
.../transport/amqp/protocol/AmqpSender.java | 42 +++++++++++++-------
.../transport/amqp/protocol/AmqpSession.java | 12 ++++--
3 files changed, 44 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/83827f27/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpReceiver.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpReceiver.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpReceiver.java
index 07abb42..3ae018e 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpReceiver.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpReceiver.java
@@ -90,10 +90,16 @@ public class AmqpReceiver extends AmqpAbstractReceiver {
@Override
public void close() {
if (!isClosed() && isOpened()) {
- sendToActiveMQ(new RemoveInfo(getProducerId()));
- }
+ sendToActiveMQ(new RemoveInfo(getProducerId()), new ResponseHandler() {
- super.close();
+ @Override
+ public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
+ AmqpReceiver.super.close();
+ }
+ });
+ } else {
+ super.close();
+ }
}
//----- Configuration accessors ------------------------------------------//
http://git-wip-us.apache.org/repos/asf/activemq/blob/83827f27/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java
index 9fb85a3..12bd627 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java
@@ -118,12 +118,18 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
if (!isClosed() && isOpened()) {
RemoveInfo removeCommand = new RemoveInfo(getConsumerId());
removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId);
- sendToActiveMQ(removeCommand);
- session.unregisterSender(getConsumerId());
- }
+ sendToActiveMQ(removeCommand, new ResponseHandler() {
- super.detach();
+ @Override
+ public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
+ session.unregisterSender(getConsumerId());
+ AmqpSender.super.detach();
+ }
+ });
+ } else {
+ super.detach();
+ }
}
@Override
@@ -131,21 +137,27 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
if (!isClosed() && isOpened()) {
RemoveInfo removeCommand = new RemoveInfo(getConsumerId());
removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId);
- sendToActiveMQ(removeCommand);
- if (consumerInfo.isDurable()) {
- RemoveSubscriptionInfo rsi = new RemoveSubscriptionInfo();
- rsi.setConnectionId(session.getConnection().getConnectionId());
- rsi.setSubscriptionName(getEndpoint().getName());
- rsi.setClientId(session.getConnection().getClientId());
+ sendToActiveMQ(removeCommand, new ResponseHandler() {
- sendToActiveMQ(rsi);
- }
+ @Override
+ public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
+ if (consumerInfo.isDurable()) {
+ RemoveSubscriptionInfo rsi = new RemoveSubscriptionInfo();
+ rsi.setConnectionId(session.getConnection().getConnectionId());
+ rsi.setSubscriptionName(getEndpoint().getName());
+ rsi.setClientId(session.getConnection().getClientId());
- session.unregisterSender(getConsumerId());
- }
+ sendToActiveMQ(rsi);
+ }
- super.close();
+ session.unregisterSender(getConsumerId());
+ AmqpSender.super.close();
+ }
+ });
+ } else {
+ super.close();
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/activemq/blob/83827f27/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSession.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSession.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSession.java
index 20a8b9f..c390b8c 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSession.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSession.java
@@ -108,11 +108,15 @@ public class AmqpSession implements AmqpResource {
public void close() {
LOG.debug("Session {} closed", getSessionId());
- getEndpoint().setContext(null);
- getEndpoint().close();
- getEndpoint().free();
+ connection.sendToActiveMQ(new RemoveInfo(getSessionId()), new ResponseHandler() {
- connection.sendToActiveMQ(new RemoveInfo(getSessionId()));
+ @Override
+ public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
+ getEndpoint().setContext(null);
+ getEndpoint().close();
+ getEndpoint().free();
+ }
+ });
}
/**