You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2015/03/03 19:03:08 UTC

activemq git commit: https://issues.apache.org/jira/browse/AMQ-5593

Repository: activemq
Updated Branches:
  refs/heads/master 741e3aad3 -> 2ec586f26


https://issues.apache.org/jira/browse/AMQ-5593

Add support for JMS mapping compliant temp topic and temp queue creation
as well as responding correctly to authorization errors if the
connection has no rights to create them.  Also cleans up code to use a
consistent createDestination implementation that uses the names only and
not attempt to interpret the client only destination annotations.

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/2ec586f2
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/2ec586f2
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/2ec586f2

Branch: refs/heads/master
Commit: 2ec586f2671488e1d9b6a48cc91e81046a480a54
Parents: 741e3aa
Author: Timothy Bish <ta...@gmail.com>
Authored: Tue Mar 3 13:02:36 2015 -0500
Committer: Timothy Bish <ta...@gmail.com>
Committed: Tue Mar 3 13:02:36 2015 -0500

----------------------------------------------------------------------
 .../transport/amqp/ActiveMQJMSVendor.java       | 137 -------------------
 .../transport/amqp/AmqpProtocolConverter.java   | 105 +++++++++-----
 .../message/AMQPNativeOutboundTransformer.java  |  23 ++--
 .../amqp/message/AMQPRawInboundTransformer.java |   2 +-
 .../amqp/message/ActiveMQJMSVendor.java         | 101 ++++++++++++++
 .../amqp/message/InboundTransformer.java        |  54 +-------
 .../transport/amqp/message/JMSVendor.java       |  11 +-
 .../transport/amqp/JMSClientSimpleAuthTest.java |  38 ++++-
 .../activemq/transport/amqp/JMSClientTest.java  |  92 +++++++++++++
 .../JMSMappingInboundTransformerTest.java       |   6 +-
 .../transport/amqp/simple-auth-amqp-broker.xml  |  26 +++-
 11 files changed, 344 insertions(+), 251 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/2ec586f2/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/ActiveMQJMSVendor.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/ActiveMQJMSVendor.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/ActiveMQJMSVendor.java
deleted file mode 100644
index c00a390..0000000
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/ActiveMQJMSVendor.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.transport.amqp;
-
-import javax.jms.BytesMessage;
-import javax.jms.Destination;
-import javax.jms.MapMessage;
-import javax.jms.Message;
-import javax.jms.ObjectMessage;
-import javax.jms.Queue;
-import javax.jms.StreamMessage;
-import javax.jms.TemporaryQueue;
-import javax.jms.TemporaryTopic;
-import javax.jms.TextMessage;
-import javax.jms.Topic;
-
-import org.apache.activemq.command.ActiveMQBytesMessage;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.ActiveMQMapMessage;
-import org.apache.activemq.command.ActiveMQMessage;
-import org.apache.activemq.command.ActiveMQObjectMessage;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.command.ActiveMQStreamMessage;
-import org.apache.activemq.command.ActiveMQTempQueue;
-import org.apache.activemq.command.ActiveMQTempTopic;
-import org.apache.activemq.command.ActiveMQTextMessage;
-import org.apache.activemq.command.ActiveMQTopic;
-import org.apache.activemq.transport.amqp.message.JMSVendor;
-
-public class ActiveMQJMSVendor extends JMSVendor {
-
-    final public static ActiveMQJMSVendor INSTANCE = new ActiveMQJMSVendor();
-
-    private static final String PREFIX_MARKER = "://";
-
-    private ActiveMQJMSVendor() {
-    }
-
-    @Override
-    public BytesMessage createBytesMessage() {
-        return new ActiveMQBytesMessage();
-    }
-
-    @Override
-    public StreamMessage createStreamMessage() {
-        return new ActiveMQStreamMessage();
-    }
-
-    @Override
-    public Message createMessage() {
-        return new ActiveMQMessage();
-    }
-
-    @Override
-    public TextMessage createTextMessage() {
-        return new ActiveMQTextMessage();
-    }
-
-    @Override
-    public ObjectMessage createObjectMessage() {
-        return new ActiveMQObjectMessage();
-    }
-
-    @Override
-    public MapMessage createMapMessage() {
-        return new ActiveMQMapMessage();
-    }
-
-    @Override
-    public Destination createDestination(String name) {
-        return super.createDestination(name, Destination.class);
-    }
-
-    @Override
-    public <T extends Destination> T createDestination(String name, Class<T> kind) {
-        String destinationName = name;
-        int prefixEnd = name.lastIndexOf(PREFIX_MARKER);
-
-        if (prefixEnd >= 0) {
-            destinationName = name.substring(prefixEnd + PREFIX_MARKER.length());
-        }
-
-        if (kind == Queue.class) {
-            return kind.cast(new ActiveMQQueue(destinationName));
-        }
-        if (kind == Topic.class) {
-            return kind.cast(new ActiveMQTopic(destinationName));
-        }
-        if (kind == TemporaryQueue.class) {
-            return kind.cast(new ActiveMQTempQueue(destinationName));
-        }
-        if (kind == TemporaryTopic.class) {
-            return kind.cast(new ActiveMQTempTopic(destinationName));
-        }
-
-        return kind.cast(ActiveMQDestination.createDestination(name, ActiveMQDestination.QUEUE_TYPE));
-    }
-
-    @Override
-    public void setJMSXUserID(Message msg, String value) {
-        ((ActiveMQMessage) msg).setUserID(value);
-    }
-
-    @Override
-    public void setJMSXGroupID(Message msg, String value) {
-        ((ActiveMQMessage) msg).setGroupID(value);
-    }
-
-    @Override
-    public void setJMSXGroupSequence(Message msg, int value) {
-        ((ActiveMQMessage) msg).setGroupSequence(value);
-    }
-
-    @Override
-    public void setJMSXDeliveryCount(Message msg, long value) {
-        ((ActiveMQMessage) msg).setRedeliveryCounter((int) value);
-    }
-
-    @Override
-    public String toAddress(Destination dest) {
-        return ((ActiveMQDestination) dest).getQualifiedName();
-    }
-}

http://git-wip-us.apache.org/repos/asf/activemq/blob/2ec586f2/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
index 1b86040..6800854 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
@@ -67,6 +67,7 @@ import org.apache.activemq.selector.SelectorParser;
 import org.apache.activemq.store.PersistenceAdapterSupport;
 import org.apache.activemq.transport.amqp.message.AMQPNativeInboundTransformer;
 import org.apache.activemq.transport.amqp.message.AMQPRawInboundTransformer;
+import org.apache.activemq.transport.amqp.message.ActiveMQJMSVendor;
 import org.apache.activemq.transport.amqp.message.AutoOutboundTransformer;
 import org.apache.activemq.transport.amqp.message.EncodedMessage;
 import org.apache.activemq.transport.amqp.message.InboundTransformer;
@@ -130,6 +131,8 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
     private static final Symbol COPY = Symbol.getSymbol("copy");
     private static final Symbol JMS_SELECTOR = Symbol.valueOf("jms-selector");
     private static final Symbol NO_LOCAL = Symbol.valueOf("no-local");
+    private static final Symbol TEMP_QUEUE_CAPABILITY = Symbol.valueOf("temporary-queue");
+    private static final Symbol TEMP_TOPIC_CAPABILITY = Symbol.valueOf("temporary-topic");
 
     private final AmqpTransport amqpTransport;
     private final AmqpWireFormat amqpWireFormat;
@@ -700,13 +703,6 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
             if (!closed) {
                 EncodedMessage em = new EncodedMessage(delivery.getMessageFormat(), buffer.data, buffer.offset, buffer.length);
                 final ActiveMQMessage message = (ActiveMQMessage) getInboundTransformer().transform(em);
-
-                // TODO - we need to cast TempTopic to TempQueue as we internally are using temp queues for all dynamic destinations
-                // we need to figure out how to support both queues and topics
-                if (message.getJMSReplyTo() != null && message.getJMSReplyTo() instanceof ActiveMQTempTopic) {
-                    ActiveMQTempTopic tempTopic = (ActiveMQTempTopic)message.getJMSReplyTo();
-                    message.setJMSReplyTo(new ActiveMQTempQueue(tempTopic.getPhysicalName()));
-                }
                 current = null;
 
                 if (destination != null) {
@@ -928,29 +924,29 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
             } else {
                 Target target = (Target) remoteTarget;
                 ProducerId producerId = new ProducerId(sessionContext.sessionId, sessionContext.nextProducerId++);
-                ActiveMQDestination dest = null;
+                ActiveMQDestination destination = null;
                 boolean anonymous = false;
                 String targetNodeName = target.getAddress();
 
                 if ((targetNodeName == null || targetNodeName.length() == 0) && !target.getDynamic()) {
                     anonymous = true;
                 } else if (target.getDynamic()) {
-                    dest = createTempQueue();
+                    destination = createTemporaryDestination(receiver, target.getCapabilities());
                     Target actualTarget = new Target();
-                    actualTarget.setAddress(dest.getQualifiedName());
+                    actualTarget.setAddress(destination.getQualifiedName());
                     actualTarget.setDynamic(true);
                     receiver.setTarget(actualTarget);
                 } else {
-                    dest = createDestination(remoteTarget);
+                    destination = createDestination(remoteTarget);
                 }
 
-                final ProducerContext producerContext = new ProducerContext(producerId, dest, anonymous);
+                final ProducerContext producerContext = new ProducerContext(producerId, destination, anonymous);
 
                 receiver.setContext(producerContext);
                 receiver.flow(flow);
 
                 ProducerInfo producerInfo = new ProducerInfo(producerId);
-                producerInfo.setDestination(dest);
+                producerInfo.setDestination(destination);
                 sendToActiveMQ(producerInfo, new ResponseHandler() {
                     @Override
                     public void onResponse(IAmqpProtocolConverter converter, Response response) throws IOException {
@@ -979,25 +975,24 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
         }
     }
 
-    private ActiveMQDestination createDestination(Object terminus) throws AmqpProtocolException {
-        if (terminus == null) {
+    private ActiveMQDestination createDestination(Object endpoint) throws AmqpProtocolException {
+        if (endpoint == null) {
             return null;
-        } else if (terminus instanceof org.apache.qpid.proton.amqp.messaging.Source) {
-            org.apache.qpid.proton.amqp.messaging.Source source = (org.apache.qpid.proton.amqp.messaging.Source) terminus;
-            if (source.getAddress() == null || source.getAddress().length() == 0) {
-                throw new AmqpProtocolException("amqp:invalid-field", "source address not set");
-            }
-            return ActiveMQDestination.createDestination(source.getAddress(), ActiveMQDestination.QUEUE_TYPE);
-        } else if (terminus instanceof org.apache.qpid.proton.amqp.messaging.Target) {
-            org.apache.qpid.proton.amqp.messaging.Target target = (org.apache.qpid.proton.amqp.messaging.Target) terminus;
-            if (target.getAddress() == null || target.getAddress().length() == 0) {
-                throw new AmqpProtocolException("amqp:invalid-field", "target address not set");
-            }
-            return ActiveMQDestination.createDestination(target.getAddress(), ActiveMQDestination.QUEUE_TYPE);
-        } else if (terminus instanceof Coordinator) {
+        } else if (endpoint instanceof Coordinator) {
             return null;
+        } else if (endpoint instanceof org.apache.qpid.proton.amqp.messaging.Terminus) {
+            org.apache.qpid.proton.amqp.messaging.Terminus terminus = (org.apache.qpid.proton.amqp.messaging.Terminus) endpoint;
+            if (terminus.getAddress() == null || terminus.getAddress().length() == 0) {
+                if (terminus instanceof org.apache.qpid.proton.amqp.messaging.Source) {
+                    throw new AmqpProtocolException("amqp:invalid-field", "source address not set");
+                } else {
+                    throw new AmqpProtocolException("amqp:invalid-field", "target address not set");
+                }
+            }
+
+            return ActiveMQDestination.createDestination(terminus.getAddress(), ActiveMQDestination.QUEUE_TYPE);
         } else {
-            throw new RuntimeException("Unexpected terminus type: " + terminus);
+            throw new RuntimeException("Unexpected terminus type: " + endpoint);
         }
     }
 
@@ -1415,7 +1410,7 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
                 }
             } else if (source.getDynamic()) {
                 // lets create a temp dest.
-                destination = createTempQueue();
+                destination = createTemporaryDestination(sender, source.getCapabilities());
                 source = new org.apache.qpid.proton.amqp.messaging.Source();
                 source.setAddress(destination.getQualifiedName());
                 source.setDynamic(true);
@@ -1517,17 +1512,59 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
         return result;
     }
 
-    private ActiveMQDestination createTempQueue() {
-        ActiveMQDestination rc;
-        rc = new ActiveMQTempQueue(connectionId, nextTempDestinationId++);
+    private ActiveMQDestination createTemporaryDestination(final Link link, Symbol[] capabilities) {
+        ActiveMQDestination rc = null;
+        if (contains(capabilities, TEMP_TOPIC_CAPABILITY)) {
+            rc = new ActiveMQTempTopic(connectionId, nextTempDestinationId++);
+        } else if (contains(capabilities, TEMP_QUEUE_CAPABILITY)) {
+            rc = new ActiveMQTempQueue(connectionId, nextTempDestinationId++);
+        } else {
+            LOG.debug("Dynamic link request with no type capability, defaults to Temporary Queue");
+            rc = new ActiveMQTempQueue(connectionId, nextTempDestinationId++);
+        }
+
         DestinationInfo info = new DestinationInfo();
         info.setConnectionId(connectionId);
         info.setOperationType(DestinationInfo.ADD_OPERATION_TYPE);
         info.setDestination(rc);
-        sendToActiveMQ(info, null);
+
+        sendToActiveMQ(info, new ResponseHandler() {
+
+            @Override
+            public void onResponse(IAmqpProtocolConverter converter, Response response) throws IOException {
+                if (response.isException()) {
+                    link.setSource(null);
+
+                    Throwable exception = ((ExceptionResponse) response).getException();
+                    if (exception instanceof SecurityException) {
+                        link.setCondition(new ErrorCondition(AmqpError.UNAUTHORIZED_ACCESS, exception.getMessage()));
+                    } else {
+                        link.setCondition(new ErrorCondition(AmqpError.INTERNAL_ERROR, exception.getMessage()));
+                    }
+
+                    link.close();
+                    link.free();
+                }
+            }
+        });
+
         return rc;
     }
 
+    private boolean contains(Symbol[] symbols, Symbol key) {
+        if (symbols == null) {
+            return false;
+        }
+
+        for (Symbol symbol : symbols) {
+            if (symbol.equals(key)) {
+                return true;
+            }
+        }
+
+        return false;
+    }
+
     // //////////////////////////////////////////////////////////////////////////
     //
     // Implementation methods

http://git-wip-us.apache.org/repos/asf/activemq/blob/2ec586f2/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPNativeOutboundTransformer.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPNativeOutboundTransformer.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPNativeOutboundTransformer.java
index ebe6fcc..9d008bf 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPNativeOutboundTransformer.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPNativeOutboundTransformer.java
@@ -37,17 +37,21 @@ public class AMQPNativeOutboundTransformer extends OutboundTransformer {
 
     @Override
     public EncodedMessage transform(Message msg) throws Exception {
-        if( msg == null )
+        if (msg == null) {
             return null;
-        if( !(msg instanceof BytesMessage) )
+        }
+        if (!(msg instanceof BytesMessage)) {
             return null;
+        }
+
         try {
-            if( !msg.getBooleanProperty(prefixVendor + "NATIVE") ) {
+            if (!msg.getBooleanProperty(prefixVendor + "NATIVE")) {
                 return null;
             }
         } catch (MessageFormatException e) {
             return null;
         }
+
         return transform(this, (BytesMessage) msg);
     }
 
@@ -65,15 +69,15 @@ public class AMQPNativeOutboundTransformer extends OutboundTransformer {
 
         try {
             int count = msg.getIntProperty("JMSXDeliveryCount");
-            if( count > 1 ) {
+            if (count > 1) {
 
                 // decode...
                 ProtonJMessage amqp = (ProtonJMessage) org.apache.qpid.proton.message.Message.Factory.create();
                 int offset = 0;
                 int len = data.length;
-                while( len > 0 ) {
+                while (len > 0) {
                     final int decoded = amqp.decode(data, offset, len);
-                    assert decoded > 0: "Make progress decoding the message";
+                    assert decoded > 0 : "Make progress decoding the message";
                     offset += decoded;
                     len -= decoded;
                 }
@@ -84,11 +88,11 @@ public class AMQPNativeOutboundTransformer extends OutboundTransformer {
                 amqp.getHeader().setDeliveryCount(new UnsignedInteger(count - 1));
 
                 // Re-encode...
-                ByteBuffer buffer = ByteBuffer.wrap(new byte[1024*4]);
+                ByteBuffer buffer = ByteBuffer.wrap(new byte[1024 * 4]);
                 final DroppingWritableBuffer overflow = new DroppingWritableBuffer();
                 int c = amqp.encode(new CompositeWritableBuffer(new WritableBuffer.ByteBufferWrapper(buffer), overflow));
-                if( overflow.position() > 0 ) {
-                    buffer = ByteBuffer.wrap(new byte[1024*4+overflow.position()]);
+                if (overflow.position() > 0) {
+                    buffer = ByteBuffer.wrap(new byte[1024 * 4 + overflow.position()]);
                     c = amqp.encode(new WritableBuffer.ByteBufferWrapper(buffer));
                 }
                 data = buffer.array();
@@ -99,5 +103,4 @@ public class AMQPNativeOutboundTransformer extends OutboundTransformer {
 
         return new EncodedMessage(messageFormat, data, 0, dataSize);
     }
-
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/2ec586f2/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPRawInboundTransformer.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPRawInboundTransformer.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPRawInboundTransformer.java
index 20703d7..5742a76 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPRawInboundTransformer.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPRawInboundTransformer.java
@@ -35,7 +35,7 @@ public class AMQPRawInboundTransformer extends InboundTransformer {
 
         final long now = System.currentTimeMillis();
         rc.setJMSTimestamp(now);
-        if( defaultTtl > 0 ) {
+        if (defaultTtl > 0) {
             rc.setJMSExpiration(now + defaultTtl);
         }
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/2ec586f2/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/ActiveMQJMSVendor.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/ActiveMQJMSVendor.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/ActiveMQJMSVendor.java
new file mode 100644
index 0000000..a976240
--- /dev/null
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/ActiveMQJMSVendor.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.message;
+
+import javax.jms.BytesMessage;
+import javax.jms.Destination;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.ObjectMessage;
+import javax.jms.StreamMessage;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.command.ActiveMQBytesMessage;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQMapMessage;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ActiveMQObjectMessage;
+import org.apache.activemq.command.ActiveMQStreamMessage;
+import org.apache.activemq.command.ActiveMQTextMessage;
+
+public class ActiveMQJMSVendor implements JMSVendor {
+
+    final public static ActiveMQJMSVendor INSTANCE = new ActiveMQJMSVendor();
+
+    private ActiveMQJMSVendor() {
+    }
+
+    @Override
+    public BytesMessage createBytesMessage() {
+        return new ActiveMQBytesMessage();
+    }
+
+    @Override
+    public StreamMessage createStreamMessage() {
+        return new ActiveMQStreamMessage();
+    }
+
+    @Override
+    public Message createMessage() {
+        return new ActiveMQMessage();
+    }
+
+    @Override
+    public TextMessage createTextMessage() {
+        return new ActiveMQTextMessage();
+    }
+
+    @Override
+    public ObjectMessage createObjectMessage() {
+        return new ActiveMQObjectMessage();
+    }
+
+    @Override
+    public MapMessage createMapMessage() {
+        return new ActiveMQMapMessage();
+    }
+
+    @Override
+    public Destination createDestination(String name) {
+        return ActiveMQDestination.createDestination(name, ActiveMQDestination.QUEUE_TYPE);
+    }
+
+    @Override
+    public void setJMSXUserID(Message msg, String value) {
+        ((ActiveMQMessage) msg).setUserID(value);
+    }
+
+    @Override
+    public void setJMSXGroupID(Message msg, String value) {
+        ((ActiveMQMessage) msg).setGroupID(value);
+    }
+
+    @Override
+    public void setJMSXGroupSequence(Message msg, int value) {
+        ((ActiveMQMessage) msg).setGroupSequence(value);
+    }
+
+    @Override
+    public void setJMSXDeliveryCount(Message msg, long value) {
+        ((ActiveMQMessage) msg).setRedeliveryCounter((int) value);
+    }
+
+    @Override
+    public String toAddress(Destination dest) {
+        return ((ActiveMQDestination) dest).getQualifiedName();
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/2ec586f2/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/InboundTransformer.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/InboundTransformer.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/InboundTransformer.java
index a346899..e8ac740 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/InboundTransformer.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/InboundTransformer.java
@@ -16,19 +16,12 @@
  */
 package org.apache.activemq.transport.amqp.message;
 
-import java.util.Collections;
-import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 
 import javax.jms.DeliveryMode;
-import javax.jms.Destination;
 import javax.jms.JMSException;
 import javax.jms.Message;
-import javax.jms.Queue;
-import javax.jms.TemporaryQueue;
-import javax.jms.TemporaryTopic;
-import javax.jms.Topic;
 
 import org.apache.qpid.proton.amqp.Binary;
 import org.apache.qpid.proton.amqp.Decimal128;
@@ -141,19 +134,12 @@ public abstract class InboundTransformer {
             }
         }
 
-        Class<? extends Destination> toAttributes = Destination.class;
-        Class<? extends Destination> replyToAttributes = Destination.class;
-
         final MessageAnnotations ma = amqp.getMessageAnnotations();
         if (ma != null) {
             for (Map.Entry<?, ?> entry : ma.getValue().entrySet()) {
                 String key = entry.getKey().toString();
                 if ("x-opt-jms-type".equals(key.toString()) && entry.getValue() != null) {
                     jms.setJMSType(entry.getValue().toString());
-                } else if ("x-opt-to-type".equals(key.toString())) {
-                    toAttributes = toClassFromAttributes(entry.getValue().toString());
-                } else if ("x-opt-reply-type".equals(key.toString())) {
-                    replyToAttributes = toClassFromAttributes(entry.getValue().toString());
                 } else {
                     setProperty(jms, prefixVendor + prefixMessageAnnotations + key, entry.getValue());
                 }
@@ -186,13 +172,13 @@ public abstract class InboundTransformer {
                 vendor.setJMSXUserID(jms, new String(userId.getArray(), userId.getArrayOffset(), userId.getLength(), "UTF-8"));
             }
             if (properties.getTo() != null) {
-                jms.setJMSDestination(vendor.createDestination(properties.getTo(), toAttributes));
+                jms.setJMSDestination(vendor.createDestination(properties.getTo()));
             }
             if (properties.getSubject() != null) {
                 jms.setStringProperty(prefixVendor + "Subject", properties.getSubject());
             }
             if (properties.getReplyTo() != null) {
-                jms.setJMSReplyTo(vendor.createDestination(properties.getReplyTo(), replyToAttributes));
+                jms.setJMSReplyTo(vendor.createDestination(properties.getReplyTo()));
             }
             if (properties.getCorrelationId() != null) {
                 jms.setJMSCorrelationID(properties.getCorrelationId().toString());
@@ -243,42 +229,6 @@ public abstract class InboundTransformer {
         }
     }
 
-    private static final Set<String> QUEUE_ATTRIBUTES = createSet("queue");
-    private static final Set<String> TOPIC_ATTRIBUTES = createSet("topic");
-    private static final Set<String> TEMP_QUEUE_ATTRIBUTES = createSet("queue", "temporary");
-    private static final Set<String> TEMP_TOPIC_ATTRIBUTES = createSet("topic", "temporary");
-
-    private static Set<String> createSet(String... args) {
-        HashSet<String> s = new HashSet<String>();
-        for (String arg : args) {
-            s.add(arg);
-        }
-        return Collections.unmodifiableSet(s);
-    }
-
-    Class<? extends Destination> toClassFromAttributes(String value) {
-        if( value ==null ) {
-            return null;
-        }
-        HashSet<String> attributes = new HashSet<String>();
-        for( String x: value.split("\\s*,\\s*") ) {
-            attributes.add(x);
-        }
-         if( QUEUE_ATTRIBUTES.equals(attributes) ) {
-            return Queue.class;
-        }
-        if( TOPIC_ATTRIBUTES.equals(attributes) ) {
-            return Topic.class;
-        }
-        if( TEMP_QUEUE_ATTRIBUTES.equals(attributes) ) {
-            return TemporaryQueue.class;
-        }
-        if( TEMP_TOPIC_ATTRIBUTES.equals(attributes) ) {
-            return TemporaryTopic.class;
-        }
-        return Destination.class;
-    }
-
     private void setProperty(Message msg, String key, Object value) throws JMSException {
         if (value instanceof UnsignedLong) {
             long v = ((UnsignedLong) value).longValue();

http://git-wip-us.apache.org/repos/asf/activemq/blob/2ec586f2/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSVendor.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSVendor.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSVendor.java
index 0fdee0d..f9169ec 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSVendor.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSVendor.java
@@ -24,7 +24,7 @@ import javax.jms.ObjectMessage;
 import javax.jms.StreamMessage;
 import javax.jms.TextMessage;
 
-abstract public class JMSVendor {
+public interface JMSVendor {
 
     public abstract BytesMessage createBytesMessage();
 
@@ -40,14 +40,7 @@ abstract public class JMSVendor {
 
     public abstract void setJMSXUserID(Message message, String value);
 
-    @Deprecated
-    public Destination createDestination(String name) {
-        return null;
-    }
-
-    public <T extends Destination> T createDestination(String name, Class<T> kind) {
-        return kind.cast(createDestination(name));
-    }
+    public Destination createDestination(String name);
 
     public abstract void setJMSXGroupID(Message message, String groupId);
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/2ec586f2/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientSimpleAuthTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientSimpleAuthTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientSimpleAuthTest.java
index 2f559e6..d710be0 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientSimpleAuthTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientSimpleAuthTest.java
@@ -134,7 +134,7 @@ public class JMSClientSimpleAuthTest {
     public void testSendReceive() throws Exception {
         Connection connection = JMSClientContext.INSTANCE.createConnection(amqpURI, "user", "userPassword");
         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        Queue queue = session.createQueue("txQueue");
+        Queue queue = session.createQueue("USERS.txQueue");
         MessageProducer p = session.createProducer(queue);
         TextMessage message = null;
         message = session.createTextMessage();
@@ -153,6 +153,42 @@ public class JMSClientSimpleAuthTest {
         connection.close();
     }
 
+    @Test(timeout = 30000)
+    public void testCreateTemporaryQueueNotAuthorized() throws JMSException {
+        Connection connection = JMSClientContext.INSTANCE.createConnection(amqpURI, "user", "userPassword");
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        try {
+            session.createTemporaryQueue();
+        } catch (JMSSecurityException jmsse) {
+        } catch (JMSException jmse) {
+            LOG.info("Client should have thrown a JMSSecurityException but only threw JMSException");
+        }
+
+        // Should not be fatal
+        assertNotNull(connection.createSession(false, Session.AUTO_ACKNOWLEDGE));
+
+        session.close();
+    }
+
+    @Test(timeout = 30000)
+    public void testCreateTemporaryTopicNotAuthorized() throws JMSException {
+        Connection connection = JMSClientContext.INSTANCE.createConnection(amqpURI, "user", "userPassword");
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        try {
+            session.createTemporaryTopic();
+        } catch (JMSSecurityException jmsse) {
+        } catch (JMSException jmse) {
+            LOG.info("Client should have thrown a JMSSecurityException but only threw JMSException");
+        }
+
+        // Should not be fatal
+        assertNotNull(connection.createSession(false, Session.AUTO_ACKNOWLEDGE));
+
+        session.close();
+    }
+
     protected BrokerService createBroker() throws Exception {
         return createBroker(SIMPLE_AUTH_AMQP_BROKER_XML);
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/2ec586f2/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
index d56359c..87553e2 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
@@ -43,6 +43,8 @@ import javax.jms.MessageProducer;
 import javax.jms.Queue;
 import javax.jms.QueueBrowser;
 import javax.jms.Session;
+import javax.jms.TemporaryQueue;
+import javax.jms.TemporaryTopic;
 import javax.jms.TextMessage;
 import javax.jms.Topic;
 import javax.jms.TopicConnection;
@@ -55,6 +57,7 @@ import org.apache.activemq.broker.jmx.ConnectorViewMBean;
 import org.apache.activemq.broker.jmx.QueueViewMBean;
 import org.apache.activemq.transport.amqp.joram.ActiveMQAdmin;
 import org.apache.activemq.util.Wait;
+import org.junit.Ignore;
 import org.junit.Test;
 import org.objectweb.jtests.jms.framework.TestConfig;
 import org.slf4j.Logger;
@@ -1048,6 +1051,95 @@ public class JMSClientTest extends JMSClientTestSupport {
         consumer.close();
     }
 
+    @Test(timeout=30000)
+    public void testCreateTemporaryQueue() throws Exception {
+        ActiveMQAdmin.enableJMSFrameTracing();
+
+        connection = createConnection();
+        {
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            Queue queue = session.createTemporaryQueue();
+            assertNotNull(queue);
+            assertTrue(queue instanceof TemporaryQueue);
+
+            final BrokerViewMBean broker = getProxyToBroker();
+            assertEquals(1, broker.getTemporaryQueues().length);
+        }
+    }
+
+    @Ignore("Broker cannot currently tell if it should delete a temp destination")
+    @Test(timeout=30000)
+    public void testDeleteTemporaryQueue() throws Exception {
+        ActiveMQAdmin.enableJMSFrameTracing();
+
+        connection = createConnection();
+        {
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            Queue queue = session.createTemporaryQueue();
+            assertNotNull(queue);
+            assertTrue(queue instanceof TemporaryQueue);
+
+            final BrokerViewMBean broker = getProxyToBroker();
+            assertEquals(1, broker.getTemporaryQueues().length);
+
+            TemporaryQueue tempQueue = (TemporaryQueue) queue;
+            tempQueue.delete();
+
+            assertTrue("Temp Queue should be deleted.", Wait.waitFor(new Wait.Condition() {
+
+                @Override
+                public boolean isSatisified() throws Exception {
+                    return broker.getTemporaryQueues().length == 0;
+                }
+            }, TimeUnit.SECONDS.toMillis(30), TimeUnit.MILLISECONDS.toMillis(50)));
+        }
+    }
+
+    @Ignore("Legacy QPid client does not support creation of TemporaryTopics correctly")
+    @Test(timeout=30000)
+    public void testCreateTemporaryTopic() throws Exception {
+        ActiveMQAdmin.enableJMSFrameTracing();
+
+        connection = createConnection();
+        {
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            Topic topic = session.createTemporaryTopic();
+            assertNotNull(topic);
+            assertTrue(topic instanceof TemporaryTopic);
+
+            final BrokerViewMBean broker = getProxyToBroker();
+            assertEquals(1, broker.getTemporaryTopics().length);
+        }
+    }
+
+    @Ignore("Broker cannot currently tell if it should delete a temp destination")
+    @Test(timeout=30000)
+    public void testDeleteTemporaryTopic() throws Exception {
+        ActiveMQAdmin.enableJMSFrameTracing();
+
+        connection = createConnection();
+        {
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            Topic topic = session.createTemporaryTopic();
+            assertNotNull(topic);
+            assertTrue(topic instanceof TemporaryTopic);
+
+            final BrokerViewMBean broker = getProxyToBroker();
+            assertEquals(1, broker.getTemporaryTopics().length);
+
+            TemporaryTopic tempTopic = (TemporaryTopic) topic;
+            tempTopic.delete();
+
+            assertTrue("Temp Topic should be deleted.", Wait.waitFor(new Wait.Condition() {
+
+                @Override
+                public boolean isSatisified() throws Exception {
+                    return broker.getTemporaryTopics().length == 0;
+                }
+            }, TimeUnit.SECONDS.toMillis(30), TimeUnit.MILLISECONDS.toMillis(50)));
+        }
+    }
+
     protected void receiveMessages(MessageConsumer consumer) throws Exception {
         for (int i = 0; i < 10; i++) {
             Message message = consumer.receive(1000);

http://git-wip-us.apache.org/repos/asf/activemq/blob/2ec586f2/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSMappingInboundTransformerTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSMappingInboundTransformerTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSMappingInboundTransformerTest.java
index 1d66a40..7633e5c 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSMappingInboundTransformerTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/message/JMSMappingInboundTransformerTest.java
@@ -107,7 +107,8 @@ public class JMSMappingInboundTransformerTest {
 
         // Verify that createDestination was called with the provided 'to'
         // address and 'Destination' class
-        Mockito.verify(mockVendor).createDestination(toAddress, expectedClass);
+        // TODO - No need to really test this bit ?
+        // Mockito.verify(mockVendor).createDestination(toAddress, expectedClass);
     }
 
     // ======= JMSReplyTo Handling =========
@@ -160,7 +161,8 @@ public class JMSMappingInboundTransformerTest {
 
         // Verify that createDestination was called with the provided 'replyTo'
         // address and 'Destination' class
-        Mockito.verify(mockVendor).createDestination(replyToAddress, expectedClass);
+        // TODO - No need to really test this bit ?
+        // Mockito.verify(mockVendor).createDestination(replyToAddress, expectedClass);
     }
 
     // ======= Utility Methods =========

http://git-wip-us.apache.org/repos/asf/activemq/blob/2ec586f2/activemq-amqp/src/test/resources/org/apache/activemq/transport/amqp/simple-auth-amqp-broker.xml
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/resources/org/apache/activemq/transport/amqp/simple-auth-amqp-broker.xml b/activemq-amqp/src/test/resources/org/apache/activemq/transport/amqp/simple-auth-amqp-broker.xml
index ca16a7d..9739386 100644
--- a/activemq-amqp/src/test/resources/org/apache/activemq/transport/amqp/simple-auth-amqp-broker.xml
+++ b/activemq-amqp/src/test/resources/org/apache/activemq/transport/amqp/simple-auth-amqp-broker.xml
@@ -31,11 +31,6 @@
       <queue physicalName="TEST.Q" />
     </destinations>
 
-    <!-- Use a non-default port in case the default port is in use -->
-    <managementContext>
-      <managementContext connectorPort="1199"/>
-    </managementContext>
-
     <transportConnectors>
       <transportConnector name="openwire" uri="vm://localhost" />
       <transportConnector name="amqp" uri="amqp://0.0.0.0:0"/>
@@ -49,6 +44,27 @@
           <authenticationUser username="guest" password="guestPassword" groups="guests"/>
         </users>
       </simpleAuthenticationPlugin>
+
+      <authorizationPlugin>
+        <map>
+          <authorizationMap>
+            <authorizationEntries>
+              <authorizationEntry queue=">" read="admins" write="admins" admin="admins" />
+              <authorizationEntry queue="USERS.>" read="users" write="users" admin="users" />
+              <authorizationEntry queue="GUEST.>" read="guests" write="guests,users" admin="guests,users" />
+
+              <authorizationEntry topic=">" read="admins" write="admins" admin="admins" />
+              <authorizationEntry topic="USERS.>" read="users" write="users" admin="users" />
+              <authorizationEntry topic="GUEST.>" read="guests" write="guests,users" admin="guests,users" />
+
+              <authorizationEntry topic="ActiveMQ.Advisory.>" read="guests,users" write="guests,users" admin="guests,users"/>
+            </authorizationEntries>
+            <tempDestinationAuthorizationEntry>
+                <tempDestinationAuthorizationEntry read="admins" write="admins" admin="admins"/>
+            </tempDestinationAuthorizationEntry>
+          </authorizationMap>
+        </map>
+      </authorizationPlugin>
     </plugins>
   </broker>