You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2015/03/10 19:55:24 UTC
activemq git commit: https://issues.apache.org/jira/browse/AMQ-5655
Repository: activemq
Updated Branches:
refs/heads/master 55f040e61 -> 185213b44
https://issues.apache.org/jira/browse/AMQ-5655
Provide the open failed property when the broker sends an error
response, otherwise send the normal connection properties on the open.
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/185213b4
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/185213b4
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/185213b4
Branch: refs/heads/master
Commit: 185213b44a00a5f5f89ba5fa733f229a48ac9868
Parents: 55f040e
Author: Timothy Bish <ta...@gmail.com>
Authored: Tue Mar 10 14:54:58 2015 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Tue Mar 10 14:55:11 2015 -0400
----------------------------------------------------------------------
.../transport/amqp/AmqpProtocolConverter.java | 28 +++++++++++++++++---
1 file changed, 24 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/185213b4/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
index d0baa8e..b9b2ff2 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
@@ -148,6 +148,7 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
private static final Object[] NO_LOCAL_FILTER_IDS = new Object[] { NO_LOCAL_CODE, NO_LOCAL_NAME };
private static final Symbol TEMP_QUEUE_CAPABILITY = Symbol.valueOf("temporary-queue");
private static final Symbol TEMP_TOPIC_CAPABILITY = Symbol.valueOf("temporary-topic");
+ private static final Symbol CONNECTION_OPEN_FAILED = Symbol.valueOf("amqp:connection-establishment-failed");
private final AmqpTransport amqpTransport;
private final AmqpWireFormat amqpWireFormat;
@@ -184,8 +185,6 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
this.protonTransport.setChannelMax(CHANNEL_MAX);
this.protonConnection.collect(eventCollector);
- this.protonConnection.setOfferedCapabilities(getConnectionCapabilitiesOffered());
- this.protonConnection.setProperties(getConnetionProperties());
updateTracer();
}
@@ -215,6 +214,21 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
return properties;
}
+ /**
+ * Load and return a <code>Map<Symbol, Object></code> that contains the properties
+ * that this connection supplies to incoming connections when the open has failed
+ * and the remote should expect a close to follow.
+ *
+ * @return the properties that are offered to the incoming connection.
+ */
+ protected Map<Symbol, Object> getFailedConnetionProperties() {
+ Map<Symbol, Object> properties = new HashMap<Symbol, Object>();
+
+ properties.put(CONNECTION_OPEN_FAILED, true);
+
+ return properties;
+ }
+
@Override
public void updateTracer() {
if (amqpTransport.isTrace()) {
@@ -597,9 +611,10 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
public void onResponse(IAmqpProtocolConverter converter, Response response) throws IOException {
Throwable exception = null;
try {
- protonConnection.open();
-
if (response.isException()) {
+ protonConnection.setProperties(getFailedConnetionProperties());
+ protonConnection.open();
+
exception = ((ExceptionResponse) response).getException();
if (exception instanceof SecurityException) {
protonConnection.setCondition(new ErrorCondition(AmqpError.UNAUTHORIZED_ACCESS, exception.getMessage()));
@@ -608,7 +623,12 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
} else {
protonConnection.setCondition(new ErrorCondition(AmqpError.ILLEGAL_STATE, exception.getMessage()));
}
+
protonConnection.close();
+ } else {
+ protonConnection.setOfferedCapabilities(getConnectionCapabilitiesOffered());
+ protonConnection.setProperties(getConnetionProperties());
+ protonConnection.open();
}
} finally {
pumpProtonToSocket();