You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2014/10/14 00:50:18 UTC
git commit: Add some initial support for gathering connection
capabilities and properties into a config object. Start on real anonymous
producer support.
Repository: qpid-jms
Updated Branches:
refs/heads/master 95941245a -> e3039f140
Add some initial support for gathering connection capabilities and
properties into a config object. Start on real anonymous producer
support.
Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/e3039f14
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/e3039f14
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/e3039f14
Branch: refs/heads/master
Commit: e3039f140e10d5f427e25fb4d0de92133667468b
Parents: 9594124
Author: Timothy Bish <ta...@gmail.com>
Authored: Mon Oct 13 18:50:11 2014 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Mon Oct 13 18:50:11 2014 -0400
----------------------------------------------------------------------
.../qpid/jms/provider/amqp/AmqpConnection.java | 16 +++++
.../provider/amqp/AmqpConnectionProperties.java | 69 ++++++++++++++++++++
.../jms/provider/amqp/AmqpFixedProducer.java | 16 +++--
.../qpid/jms/provider/amqp/AmqpSession.java | 5 +-
4 files changed, 97 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e3039f14/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java
index e53d5da..97118d7 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java
@@ -49,6 +49,7 @@ public class AmqpConnection extends AbstractAmqpResource<JmsConnectionInfo, Conn
private boolean connected;
private AmqpSaslAuthenticator authenticator;
private final AmqpSession connectionSession;
+ private AmqpConnectionProperties properties;
private String queuePrefix;
private String topicPrefix;
@@ -114,6 +115,10 @@ public class AmqpConnection extends AbstractAmqpResource<JmsConnectionInfo, Conn
if (!connected && isOpen()) {
connected = true;
+
+ this.properties = new AmqpConnectionProperties(
+ endpoint.getRemoteOfferedCapabilities(), endpoint.getRemoteProperties());
+
connectionSession.open(new AsyncResult() {
@Override
@@ -329,6 +334,17 @@ public class AmqpConnection extends AbstractAmqpResource<JmsConnectionInfo, Conn
return this.amqpMessageFactory;
}
+    /**
+     * Gets the properties object built from the remote peer's offered capabilities and
+     * connection properties. The object is only created once the connection has been
+     * established, so callers must be prepared for a null result before that point.
+     *
+     * @return the properties gathered for this connection, or null if not yet connected.
+     */
+    public AmqpConnectionProperties getProperties() {
+        return this.properties;
+    }
+
@Override
public String toString() {
return "AmqpConnection { " + getConnectionInfo().getConnectionId() + " }";
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e3039f14/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionProperties.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionProperties.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionProperties.java
new file mode 100644
index 0000000..add4d72
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionProperties.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.provider.amqp;
+
+import java.util.Map;
+
+import org.apache.qpid.proton.amqp.Symbol;
+
+/**
+ * Class used to examine the capabilities and connection properties of the
+ * remote connection and provide that information to the client code in a
+ * simpler and more easy to digest manner.
+ */
+public class AmqpConnectionProperties {
+
+ private static final Symbol ANONYMOUS_RELAY = Symbol.valueOf("x-opt-anonymous-relay");
+
+ private String anonymousRelayName;
+
+ /**
+ * Creates a new instance of this class from the given remote capabilities and properties.
+ *
+ * @param capabilities
+ * the capabilities offered by the remote connection.
+ * @param properties
+ * the properties offered by the remote connection.
+ */
+ public AmqpConnectionProperties(Symbol[] capabilities, Map<Symbol, Object> properties) {
+ if (capabilities != null) {
+ processCapabilities(capabilities);
+ }
+
+ if (properties != null) {
+ processProperties(properties);
+ }
+ }
+
+ public boolean isAnonymousRelaySupported() {
+ return anonymousRelayName != null;
+ }
+
+ public String getAnonymousRelayName() {
+ return anonymousRelayName;
+ }
+
+ protected void processCapabilities(Symbol[] capabilities) {
+ // TODO - Inspect capabilities for configuration options
+ }
+
+ protected void processProperties(Map<Symbol, Object> properties) {
+ if (properties.containsKey(ANONYMOUS_RELAY)) {
+ anonymousRelayName = (String) properties.get(ANONYMOUS_RELAY);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e3039f14/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
index 6a37da3..3afc492 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
@@ -233,16 +233,22 @@ public class AmqpFixedProducer extends AmqpProducer {
@Override
protected void doOpen() {
- JmsDestination destination = info.getDestination();
+ String targetAddress;
+
+ if (info.getDestination() != null) {
+ JmsDestination destination = info.getDestination();
+ targetAddress = session.getQualifiedName(destination);
+ } else {
+ targetAddress = connection.getProperties().getAnonymousRelayName();
+ }
- String destnationName = session.getQualifiedName(destination);
String sourceAddress = getProducerId().toString();
Source source = new Source();
source.setAddress(sourceAddress);
Target target = new Target();
- target.setAddress(destnationName);
+ target.setAddress(targetAddress);
- String senderName = sourceAddress + ":" + destnationName;
+ String senderName = sourceAddress + ":" + targetAddress;
endpoint = session.getProtonSession().sender(senderName);
endpoint.setSource(source);
endpoint.setTarget(target);
@@ -268,7 +274,7 @@ public class AmqpFixedProducer extends AmqpProducer {
@Override
public boolean isAnonymous() {
- return false;
+ return this.info.getDestination() == null;
}
@Override
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e3039f14/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java
index 44e864a..10519f3 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java
@@ -101,10 +101,7 @@ public class AmqpSession extends AbstractAmqpResource<JmsSessionInfo, Session> {
public AmqpProducer createProducer(JmsProducerInfo producerInfo) {
AmqpProducer producer = null;
- // TODO - There seems to be an issue with Proton not allowing links with a Target
- // that has no address. Otherwise we could just ensure that messages sent
- // to these anonymous targets have their 'to' value set to the destination.
- if (producerInfo.getDestination() != null) {
+ if (producerInfo.getDestination() != null || connection.getProperties().isAnonymousRelaySupported()) {
LOG.debug("Creating fixed Producer for: {}", producerInfo.getDestination());
producer = new AmqpFixedProducer(this, producerInfo);
} else {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org