You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2014/10/14 23:32:46 UTC

git commit: https://issues.apache.org/jira/browse/AMQ-5391

Repository: activemq
Updated Branches:
  refs/heads/trunk 27833d025 -> 78cb1120b


https://issues.apache.org/jira/browse/AMQ-5391

Allow for an anonymous relay using a configurable node name when
creating the new link, default is $relay.  Message's that arrive without
a to field set are rejected as this is required for a relay. 

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/78cb1120
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/78cb1120
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/78cb1120

Branch: refs/heads/trunk
Commit: 78cb1120b7b93957c36e0abc12e1d22f0f0d7390
Parents: 27833d0
Author: Timothy Bish <ta...@gmail.com>
Authored: Tue Oct 14 17:32:23 2014 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Tue Oct 14 17:32:23 2014 -0400

----------------------------------------------------------------------
 .../transport/amqp/AmqpProtocolConverter.java   | 57 ++++++++++++++++++--
 .../activemq/transport/amqp/AmqpWireFormat.java |  9 ++++
 2 files changed, 62 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/78cb1120/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
index fb275b8..472aeb9 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
@@ -26,6 +26,7 @@ import java.util.LinkedList;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
+import javax.jms.Destination;
 import javax.jms.InvalidClientIDException;
 import javax.jms.InvalidSelectorException;
 
@@ -111,10 +112,12 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
     private static final Logger TRACE_FRAMES = AmqpTransportFilter.TRACE_FRAMES;
     private static final Logger LOG = LoggerFactory.getLogger(AmqpProtocolConverter.class);
     private static final byte[] EMPTY_BYTE_ARRAY = new byte[] {};
+    private static final int CHANNEL_MAX = 32767;
     private final AmqpTransport amqpTransport;
     private static final Symbol COPY = Symbol.getSymbol("copy");
     private static final Symbol JMS_SELECTOR = Symbol.valueOf("jms-selector");
     private static final Symbol NO_LOCAL = Symbol.valueOf("no-local");
+    private static final Symbol ANONYMOUS_RELAY = Symbol.valueOf("x-opt-anonymous-relay");
     private static final Symbol DURABLE_SUBSCRIPTION_ENDED = Symbol.getSymbol("DURABLE_SUBSCRIPTION_ENDED");
 
     protected int prefetch;
@@ -132,10 +135,33 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
         }
 
         this.protonTransport.bind(this.protonConnection);
+
+        // NOTE: QPid JMS client has a bug where the channel max is stored as a
+        //       short value in the Connection class which means that if we allow
+        //       the default channel max of 65535 to be sent then no new sessions
+        //       can be created because the value would be -1 when checked.
+        this.protonTransport.setChannelMax(CHANNEL_MAX);
+
         this.protonConnection.collect(eventCollector);
+        this.protonConnection.setProperties(getConnectionProperties());
+
         updateTracer();
     }
 
+    /**
+     * Load and return a <code>Map<Symbol, Object></code> that contains the connection
+     * properties which will allow the client to better communicate with this broker.
+     *
+     * @return the properties that are sent to new clients on connect.
+     */
+    protected Map<Symbol, Object> getConnectionProperties() {
+        Map<Symbol, Object> properties = new HashMap<Symbol, Object>();
+
+        properties.put(ANONYMOUS_RELAY, amqpTransport.getWireFormat().getAnonymousNodeName());
+
+        return properties;
+    }
+
     @Override
     public void updateTracer() {
         if (amqpTransport.isTrace()) {
@@ -559,10 +585,12 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
         private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator();
         private final ActiveMQDestination destination;
         private boolean closed;
+        private final boolean anonymous;
 
-        public ProducerContext(ProducerId producerId, ActiveMQDestination destination) {
+        public ProducerContext(ProducerId producerId, ActiveMQDestination destination, boolean anonymous) {
             this.producerId = producerId;
             this.destination = destination;
+            this.anonymous = anonymous;
         }
 
         @Override
@@ -581,6 +609,17 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
 
                 if (destination != null) {
                     message.setJMSDestination(destination);
+                } else if (isAnonymous()) {
+                    Destination toDestination = message.getJMSDestination();
+                    if (toDestination == null || !(toDestination instanceof ActiveMQDestination)) {
+                        Rejected rejected = new Rejected();
+                        ErrorCondition condition = new ErrorCondition();
+                        condition.setCondition(Symbol.valueOf("failed"));
+                        condition.setDescription("Missing to field for message sent to an anonymous producer");
+                        rejected.setError(condition);
+                        delivery.disposition(rejected);
+                        return;
+                    }
                 }
                 message.setProducerId(producerId);
 
@@ -673,6 +712,10 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
                 sendToActiveMQ(new RemoveInfo(producerId), null);
             }
         }
+
+        public boolean isAnonymous() {
+            return anonymous;
+        }
     }
 
     long nextTransactionId = 1;
@@ -795,8 +838,12 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
             } else {
                 Target target = (Target) remoteTarget;
                 ProducerId producerId = new ProducerId(sessionContext.sessionId, sessionContext.nextProducerId++);
-                ActiveMQDestination dest;
-                if (target.getDynamic()) {
+                ActiveMQDestination dest = null;
+                boolean anonymous = false;
+
+                if (target.getAddress().equals(amqpTransport.getWireFormat().getAnonymousNodeName())) {
+                    anonymous = true;
+                } else if (target.getDynamic()) {
                     dest = createTempQueue();
                     Target actualTarget = new Target();
                     actualTarget.setAddress(dest.getQualifiedName());
@@ -806,10 +853,11 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
                     dest = createDestination(remoteTarget);
                 }
 
-                ProducerContext producerContext = new ProducerContext(producerId, dest);
+                ProducerContext producerContext = new ProducerContext(producerId, dest, anonymous);
 
                 receiver.setContext(producerContext);
                 receiver.flow(flow);
+
                 ProducerInfo producerInfo = new ProducerInfo(producerId);
                 producerInfo.setDestination(dest);
                 sendToActiveMQ(producerInfo, new ResponseHandler() {
@@ -1383,6 +1431,7 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
         return condition;
     }
 
+    @Override
     public void setPrefetch(int prefetch) {
         this.prefetch = prefetch;
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/78cb1120/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormat.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormat.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormat.java
index 779cb65..f6c2880 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormat.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWireFormat.java
@@ -40,6 +40,7 @@ public class AmqpWireFormat implements WireFormat {
     private int version = 1;
     private long maxFrameSize = DEFAULT_MAX_FRAME_SIZE;
     private int maxAmqpFrameSize = NO_AMQP_MAX_FRAME_SIZE;
+    private String anonymousNodeName = "$relay";
 
     @Override
     public ByteSequence marshal(Object command) throws IOException {
@@ -126,4 +127,12 @@ public class AmqpWireFormat implements WireFormat {
     public void setMaxAmqpFrameSize(int maxAmqpFrameSize) {
         this.maxAmqpFrameSize = maxAmqpFrameSize;
     }
+
+    public String getAnonymousNodeName() {
+        return anonymousNodeName;
+    }
+
+    public void setAnonymousNodeName(String anonymousNodeName) {
+        this.anonymousNodeName = anonymousNodeName;
+    }
 }