You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2014/10/14 23:32:46 UTC
git commit: https://issues.apache.org/jira/browse/AMQ-5391
Repository: activemq
Updated Branches:
refs/heads/trunk 27833d025 -> 78cb1120b
https://issues.apache.org/jira/browse/AMQ-5391
Allow for an anonymous relay using a configurable node name when
creating the new link, default is $relay. Message's that arrive without
a to field set are rejected as this is required for a relay.
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/78cb1120
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/78cb1120
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/78cb1120
Branch: refs/heads/trunk
Commit: 78cb1120b7b93957c36e0abc12e1d22f0f0d7390
Parents: 27833d0
Author: Timothy Bish <ta...@gmail.com>
Authored: Tue Oct 14 17:32:23 2014 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Tue Oct 14 17:32:23 2014 -0400
----------------------------------------------------------------------
.../transport/amqp/AmqpProtocolConverter.java | 57 ++++++++++++++++++--
.../activemq/transport/amqp/AmqpWireFormat.java | 9 ++++
2 files changed, 62 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/78cb1120/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
index fb275b8..472aeb9 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
@@ -26,6 +26,7 @@ import java.util.LinkedList;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import javax.jms.Destination;
import javax.jms.InvalidClientIDException;
import javax.jms.InvalidSelectorException;
@@ -111,10 +112,12 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
private static final Logger TRACE_FRAMES = AmqpTransportFilter.TRACE_FRAMES;
private static final Logger LOG = LoggerFactory.getLogger(AmqpProtocolConverter.class);
private static final byte[] EMPTY_BYTE_ARRAY = new byte[] {};
+ private static final int CHANNEL_MAX = 32767;
private final AmqpTransport amqpTransport;
private static final Symbol COPY = Symbol.getSymbol("copy");
private static final Symbol JMS_SELECTOR = Symbol.valueOf("jms-selector");
private static final Symbol NO_LOCAL = Symbol.valueOf("no-local");
+ private static final Symbol ANONYMOUS_RELAY = Symbol.valueOf("x-opt-anonymous-relay");
private static final Symbol DURABLE_SUBSCRIPTION_ENDED = Symbol.getSymbol("DURABLE_SUBSCRIPTION_ENDED");
protected int prefetch;
@@ -132,10 +135,33 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
}
this.protonTransport.bind(this.protonConnection);
+
+ // NOTE: QPid JMS client has a bug where the channel max is stored as a
+ // short value in the Connection class which means that if we allow
+ // the default channel max of 65535 to be sent then no new sessions
+ // can be created because the value would be -1 when checked.
+ this.protonTransport.setChannelMax(CHANNEL_MAX);
+
this.protonConnection.collect(eventCollector);
+ this.protonConnection.setProperties(getConnectionProperties());
+
updateTracer();
}
+ /**
+ * Load and return a <code>Map<Symbol, Object></code> that contains the connection
+ * properties which will allow the client to better communicate with this broker.
+ *
+ * @return the properties that are sent to new clients on connect.
+ */
+ protected Map<Symbol, Object> getConnectionProperties() {
+ Map<Symbol, Object> properties = new HashMap<Symbol, Object>();
+
+ properties.put(ANONYMOUS_RELAY, amqpTransport.getWireFormat().getAnonymousNodeName());
+
+ return properties;
+ }
+
@Override
public void updateTracer() {
if (amqpTransport.isTrace()) {
@@ -559,10 +585,12 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator();
private final ActiveMQDestination destination;
private boolean closed;
+ private final boolean anonymous;
- public ProducerContext(ProducerId producerId, ActiveMQDestination destination) {
+ public ProducerContext(ProducerId producerId, ActiveMQDestination destination, boolean anonymous) {
this.producerId = producerId;
this.destination = destination;
+ this.anonymous = anonymous;
}
@Override
@@ -581,6 +609,17 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
if (destination != null) {
message.setJMSDestination(destination);
+ } else if (isAnonymous()) {
+ Destination toDestination = message.getJMSDestination();
+ if (toDestination == null || !(toDestination instanceof ActiveMQDestination)) {
+ Rejected rejected = new Rejected();
+ ErrorCondition condition = new ErrorCondition();
+ condition.setCondition(Symbol.valueOf("failed"));
+ condition.setDescription("Missing to field for message sent to an anonymous producer");
+ rejected.setError(condition);
+ delivery.disposition(rejected);
+ return;
+ }
}
message.setProducerId(producerId);
@@ -673,6 +712,10 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
sendToActiveMQ(new RemoveInfo(producerId), null);
}
}
+
+ public boolean isAnonymous() {
+ return anonymous;
+ }
}
long nextTransactionId = 1;
@@ -795,8 +838,12 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
} else {
Target target = (Target) remoteTarget;
ProducerId producerId = new ProducerId(sessionContext.sessionId, sessionContext.nextProducerId++);
- ActiveMQDestination dest;
- if (target.getDynamic()) {
+ ActiveMQDestination dest = null;
+ boolean anonymous = false;
+
+ if (target.getAddress().equals(amqpTransport.getWireFormat().getAnonymousNodeName())) {
+ anonymous = true;
+ } else if (target.getDynamic()) {
dest = createTempQueue();
Target actualTarget = new Target();
actualTarget.setAddress(dest.getQualifiedName());
@@ -806,10 +853,11 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
dest = createDestination(remoteTarget);
}
- ProducerContext producerContext = new ProducerContext(producerId, dest);
+ ProducerContext producerContext = new ProducerContext(producerId, dest, anonymous);
receiver.setContext(producerContext);
receiver.flow(flow);
+
ProducerInfo producerInfo = new ProducerInfo(producerId);
producerInfo.setDestination(dest);
sendToActiveMQ(producerInfo, new ResponseHandler() {
@@ -1383,6 +1431,7 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
return condition;
}
+ @Override
public void setPrefetch(int prefetch) {
this.prefetch = prefetch;
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/78cb1120/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormat.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormat.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormat.java
index 779cb65..f6c2880 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormat.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormat.java
@@ -40,6 +40,7 @@ public class AmqpWireFormat implements WireFormat {
private int version = 1;
private long maxFrameSize = DEFAULT_MAX_FRAME_SIZE;
private int maxAmqpFrameSize = NO_AMQP_MAX_FRAME_SIZE;
+ private String anonymousNodeName = "$relay";
@Override
public ByteSequence marshal(Object command) throws IOException {
@@ -126,4 +127,12 @@ public class AmqpWireFormat implements WireFormat {
public void setMaxAmqpFrameSize(int maxAmqpFrameSize) {
this.maxAmqpFrameSize = maxAmqpFrameSize;
}
+
+ public String getAnonymousNodeName() {
+ return anonymousNodeName;
+ }
+
+ public void setAnonymousNodeName(String anonymousNodeName) {
+ this.anonymousNodeName = anonymousNodeName;
+ }
}