You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2016/06/30 00:13:44 UTC

activemq git commit: https://issues.apache.org/jira/browse/AMQ-6341

Repository: activemq
Updated Branches:
  refs/heads/master 4e23adfcc -> 83827f277


https://issues.apache.org/jira/browse/AMQ-6341

Wait on broker response for async broker commands.

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/83827f27
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/83827f27
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/83827f27

Branch: refs/heads/master
Commit: 83827f27709a2b8b1f8d080a4913100b80fe3429
Parents: 4e23adf
Author: Timothy Bish <ta...@gmail.com>
Authored: Wed Jun 29 20:13:34 2016 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Wed Jun 29 20:13:34 2016 -0400

----------------------------------------------------------------------
 .../transport/amqp/protocol/AmqpReceiver.java   | 12 ++++--
 .../transport/amqp/protocol/AmqpSender.java     | 42 +++++++++++++-------
 .../transport/amqp/protocol/AmqpSession.java    | 12 ++++--
 3 files changed, 44 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/83827f27/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpReceiver.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpReceiver.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpReceiver.java
index 07abb42..3ae018e 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpReceiver.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpReceiver.java
@@ -90,10 +90,16 @@ public class AmqpReceiver extends AmqpAbstractReceiver {
     @Override
     public void close() {
         if (!isClosed() && isOpened()) {
-            sendToActiveMQ(new RemoveInfo(getProducerId()));
-        }
+            sendToActiveMQ(new RemoveInfo(getProducerId()), new ResponseHandler() {
 
-        super.close();
+                @Override
+                public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
+                    AmqpReceiver.super.close();
+                }
+            });
+        } else {
+            super.close();
+        }
     }
 
     //----- Configuration accessors ------------------------------------------//

http://git-wip-us.apache.org/repos/asf/activemq/blob/83827f27/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java
index 9fb85a3..12bd627 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java
@@ -118,12 +118,18 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
         if (!isClosed() && isOpened()) {
             RemoveInfo removeCommand = new RemoveInfo(getConsumerId());
             removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId);
-            sendToActiveMQ(removeCommand);
 
-            session.unregisterSender(getConsumerId());
-        }
+            sendToActiveMQ(removeCommand, new ResponseHandler() {
 
-        super.detach();
+                @Override
+                public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
+                    session.unregisterSender(getConsumerId());
+                    AmqpSender.super.detach();
+                }
+            });
+        } else {
+            super.detach();
+        }
     }
 
     @Override
@@ -131,21 +137,27 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
         if (!isClosed() && isOpened()) {
             RemoveInfo removeCommand = new RemoveInfo(getConsumerId());
             removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId);
-            sendToActiveMQ(removeCommand);
 
-            if (consumerInfo.isDurable()) {
-                RemoveSubscriptionInfo rsi = new RemoveSubscriptionInfo();
-                rsi.setConnectionId(session.getConnection().getConnectionId());
-                rsi.setSubscriptionName(getEndpoint().getName());
-                rsi.setClientId(session.getConnection().getClientId());
+            sendToActiveMQ(removeCommand, new ResponseHandler() {
 
-                sendToActiveMQ(rsi);
-            }
+                @Override
+                public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
+                    if (consumerInfo.isDurable()) {
+                        RemoveSubscriptionInfo rsi = new RemoveSubscriptionInfo();
+                        rsi.setConnectionId(session.getConnection().getConnectionId());
+                        rsi.setSubscriptionName(getEndpoint().getName());
+                        rsi.setClientId(session.getConnection().getClientId());
 
-            session.unregisterSender(getConsumerId());
-        }
+                        sendToActiveMQ(rsi);
+                    }
 
-        super.close();
+                    session.unregisterSender(getConsumerId());
+                    AmqpSender.super.close();
+                }
+            });
+        } else {
+            super.close();
+        }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/activemq/blob/83827f27/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSession.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSession.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSession.java
index 20a8b9f..c390b8c 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSession.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSession.java
@@ -108,11 +108,15 @@ public class AmqpSession implements AmqpResource {
     public void close() {
         LOG.debug("Session {} closed", getSessionId());
 
-        getEndpoint().setContext(null);
-        getEndpoint().close();
-        getEndpoint().free();
+        connection.sendToActiveMQ(new RemoveInfo(getSessionId()), new ResponseHandler() {
 
-        connection.sendToActiveMQ(new RemoveInfo(getSessionId()));
+            @Override
+            public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
+                getEndpoint().setContext(null);
+                getEndpoint().close();
+                getEndpoint().free();
+            }
+        });
     }
 
     /**