You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2015/03/10 19:55:24 UTC

activemq git commit: https://issues.apache.org/jira/browse/AMQ-5655

Repository: activemq
Updated Branches:
  refs/heads/master 55f040e61 -> 185213b44


https://issues.apache.org/jira/browse/AMQ-5655

Provide the open-failed property (amqp:connection-establishment-failed) when the
broker sends an error response; otherwise, send the normal connection properties on the open.

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/185213b4
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/185213b4
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/185213b4

Branch: refs/heads/master
Commit: 185213b44a00a5f5f89ba5fa733f229a48ac9868
Parents: 55f040e
Author: Timothy Bish <ta...@gmail.com>
Authored: Tue Mar 10 14:54:58 2015 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Tue Mar 10 14:55:11 2015 -0400

----------------------------------------------------------------------
 .../transport/amqp/AmqpProtocolConverter.java   | 28 +++++++++++++++++---
 1 file changed, 24 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/185213b4/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
index d0baa8e..b9b2ff2 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
@@ -148,6 +148,7 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
     private static final Object[] NO_LOCAL_FILTER_IDS = new Object[] { NO_LOCAL_CODE, NO_LOCAL_NAME };
     private static final Symbol TEMP_QUEUE_CAPABILITY = Symbol.valueOf("temporary-queue");
     private static final Symbol TEMP_TOPIC_CAPABILITY = Symbol.valueOf("temporary-topic");
+    private static final Symbol CONNECTION_OPEN_FAILED = Symbol.valueOf("amqp:connection-establishment-failed");
 
     private final AmqpTransport amqpTransport;
     private final AmqpWireFormat amqpWireFormat;
@@ -184,8 +185,6 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
         this.protonTransport.setChannelMax(CHANNEL_MAX);
 
         this.protonConnection.collect(eventCollector);
-        this.protonConnection.setOfferedCapabilities(getConnectionCapabilitiesOffered());
-        this.protonConnection.setProperties(getConnetionProperties());
 
         updateTracer();
     }
@@ -215,6 +214,21 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
         return properties;
     }
 
+    /**
+     * Load and return a {@code Map<Symbol, Object>} that contains the properties
+     * that this connection supplies to incoming connections when the open has failed
+     * and the remote should expect a close to follow.
+     *
+     * @return the properties that are offered to the incoming connection.
+     */
+    protected Map<Symbol, Object> getFailedConnetionProperties() {
+        Map<Symbol, Object> properties = new HashMap<Symbol, Object>();
+
+        properties.put(CONNECTION_OPEN_FAILED, true);
+
+        return properties;
+    }
+
     @Override
     public void updateTracer() {
         if (amqpTransport.isTrace()) {
@@ -597,9 +611,10 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
             public void onResponse(IAmqpProtocolConverter converter, Response response) throws IOException {
                 Throwable exception = null;
                 try {
-                    protonConnection.open();
-
                     if (response.isException()) {
+                        protonConnection.setProperties(getFailedConnetionProperties());
+                        protonConnection.open();
+
                         exception = ((ExceptionResponse) response).getException();
                         if (exception instanceof SecurityException) {
                             protonConnection.setCondition(new ErrorCondition(AmqpError.UNAUTHORIZED_ACCESS, exception.getMessage()));
@@ -608,7 +623,12 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
                         } else {
                             protonConnection.setCondition(new ErrorCondition(AmqpError.ILLEGAL_STATE, exception.getMessage()));
                         }
+
                         protonConnection.close();
+                    } else {
+                        protonConnection.setOfferedCapabilities(getConnectionCapabilitiesOffered());
+                        protonConnection.setProperties(getConnetionProperties());
+                        protonConnection.open();
                     }
                 } finally {
                     pumpProtonToSocket();