You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ma...@apache.org on 2015/04/22 13:22:27 UTC

[1/3] activemq-6 git commit: Queue Auto-create fixes on OpenWire

Repository: activemq-6
Updated Branches:
  refs/heads/master aa638197c -> 47edcd401


Queue Auto-create fixes on OpenWire

This basically addresses a performance issue on OpenWire by moving the queue auto-create into the PostOffice,
where it is only attempted after a message could not be routed.

The core protocol stays the same in regard to the auto-create since the exceptions are happening after the queueQuery


Project: http://git-wip-us.apache.org/repos/asf/activemq-6/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-6/commit/95b63289
Tree: http://git-wip-us.apache.org/repos/asf/activemq-6/tree/95b63289
Diff: http://git-wip-us.apache.org/repos/asf/activemq-6/diff/95b63289

Branch: refs/heads/master
Commit: 95b6328993b8215262cb49e0bc1e5999d3537e12
Parents: aa63819
Author: Clebert Suconic <cl...@apache.org>
Authored: Tue Apr 21 18:00:05 2015 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Tue Apr 21 18:00:07 2015 -0400

----------------------------------------------------------------------
 .../activemq/api/core/client/ClientSession.java | 19 ++++++++
 .../activemq/jms/client/ActiveMQConnection.java |  6 +--
 .../jms/client/ActiveMQMessageProducer.java     | 14 +++---
 .../management/impl/JMSServerControlImpl.java   |  9 ++--
 .../jms/server/impl/JMSServerManagerImpl.java   | 41 ++++++++++++++++-
 .../plug/ProtonSessionIntegrationCallback.java  |  3 +-
 .../protocol/openwire/OpenWireConnection.java   | 47 +++++++-------------
 .../protocol/openwire/amq/AMQServerSession.java |  7 ++-
 .../openwire/amq/AMQServerSessionFactory.java   |  5 ++-
 .../core/protocol/openwire/amq/AMQSession.java  | 13 +++---
 .../protocol/stomp/StompProtocolManager.java    |  4 +-
 .../activemq/ra/inflow/ActiveMQActivation.java  |  4 +-
 .../activemq/core/postoffice/PostOffice.java    | 11 ++---
 .../core/postoffice/impl/PostOfficeImpl.java    | 35 +++++++++++----
 .../core/impl/ActiveMQPacketHandler.java        |  2 +-
 .../activemq/core/server/ActiveMQServer.java    | 16 ++++++-
 .../activemq/core/server/QueueCreator.java      | 32 +++++++++++++
 .../activemq/core/server/ServerSession.java     |  2 +
 .../core/server/ServerSessionFactory.java       |  2 +-
 .../core/server/impl/ActiveMQServerImpl.java    | 38 ++++++++++++++--
 .../activemq/core/server/impl/DivertImpl.java   |  2 +-
 .../activemq/core/server/impl/QueueImpl.java    |  4 +-
 .../core/server/impl/ServerSessionImpl.java     | 44 +++++++++++++-----
 .../management/impl/ManagementServiceImpl.java  |  2 +-
 .../vertx/IncomingVertxEventHandler.java        |  2 +-
 .../client/AutoCreateJmsQueueTest.java          | 32 +++++++++++++
 .../integration/client/HangConsumerTest.java    |  5 ++-
 .../integration/openwire/OpenWireTestBase.java  |  8 ++--
 .../openwire/SimpleOpenWireTest.java            | 27 +++++++++++
 .../openwire/amq/ProducerFlowControlTest.java   | 13 +++---
 .../core/server/impl/fakes/FakePostOffice.java  | 30 ++++---------
 31 files changed, 343 insertions(+), 136 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-6/blob/95b63289/activemq-core-client/src/main/java/org/apache/activemq/api/core/client/ClientSession.java
----------------------------------------------------------------------
diff --git a/activemq-core-client/src/main/java/org/apache/activemq/api/core/client/ClientSession.java b/activemq-core-client/src/main/java/org/apache/activemq/api/core/client/ClientSession.java
index f746915..8bf7a9a 100644
--- a/activemq-core-client/src/main/java/org/apache/activemq/api/core/client/ClientSession.java
+++ b/activemq-core-client/src/main/java/org/apache/activemq/api/core/client/ClientSession.java
@@ -27,6 +27,25 @@ import org.apache.activemq.api.core.SimpleString;
  */
 public interface ClientSession extends XAResource, AutoCloseable
 {
+
+   /**
+    * This is used to identify a ClientSession as used by the JMS Layer
+    * The JMS Layer will add this through Meta-data, so the server or management layers
+    * can identify session created over core API purely or through the JMS Layer
+    */
+   String JMS_SESSION_IDENTIFIER_PROPERTY = "jms-session";
+
+
+   /**
+    * Just like {@link org.apache.activemq.api.core.client.ClientSession#JMS_SESSION_IDENTIFIER_PROPERTY} this is
+    * used to identify the ClientID over JMS Session.
+    * However this is only used when the JMS Session.clientID is set (which is optional).
+    * With this property management tools and the server can identify the jms-client-id used over JMS
+    */
+   String JMS_SESSION_CLIENT_ID_PROPERTY = "jms-client-id";
+
+
+
    /**
     * Information returned by a binding query
     *

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/95b63289/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQConnection.java
----------------------------------------------------------------------
diff --git a/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQConnection.java b/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQConnection.java
index 2017d08..49fdd47 100644
--- a/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQConnection.java
+++ b/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQConnection.java
@@ -230,7 +230,7 @@ public class ActiveMQConnection extends ActiveMQConnectionForContextImpl impleme
 
       try
       {
-         initialSession.addUniqueMetaData("jms-client-id", clientID);
+         initialSession.addUniqueMetaData(ClientSession.JMS_SESSION_CLIENT_ID_PROPERTY, clientID);
       }
       catch (ActiveMQException e)
       {
@@ -732,10 +732,10 @@ public class ActiveMQConnection extends ActiveMQConnectionForContextImpl impleme
 
    private void addSessionMetaData(ClientSession session) throws ActiveMQException
    {
-      session.addMetaData("jms-session", "");
+      session.addMetaData(ClientSession.JMS_SESSION_IDENTIFIER_PROPERTY, "");
       if (clientID != null)
       {
-         session.addMetaData("jms-client-id", clientID);
+         session.addMetaData(ClientSession.JMS_SESSION_CLIENT_ID_PROPERTY, clientID);
       }
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/95b63289/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQMessageProducer.java
----------------------------------------------------------------------
diff --git a/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQMessageProducer.java b/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQMessageProducer.java
index 5f13aaf..94ff1c7 100644
--- a/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQMessageProducer.java
+++ b/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQMessageProducer.java
@@ -409,16 +409,12 @@ public class ActiveMQMessageProducer implements MessageProducer, QueueSender, To
             try
             {
                ClientSession.AddressQuery query = clientSession.addressQuery(address);
-               if (!query.isExists())
+
+               // if autoCreateJmsQueues is set we let PostOffice.route execute the creation on the server side,
+               // as that's a more efficient path for such an operation
+               if (!query.isExists() && !query.isAutoCreateJmsQueues())
                {
-                  if (query.isAutoCreateJmsQueues())
-                  {
-                     clientSession.createQueue(address, address, true);
-                  }
-                  else
-                  {
-                     throw new InvalidDestinationException("Destination " + address + " does not exist");
-                  }
+                  throw new InvalidDestinationException("Destination " + address + " does not exist");
                }
                else
                {

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/95b63289/activemq-jms-server/src/main/java/org/apache/activemq/jms/management/impl/JMSServerControlImpl.java
----------------------------------------------------------------------
diff --git a/activemq-jms-server/src/main/java/org/apache/activemq/jms/management/impl/JMSServerControlImpl.java b/activemq-jms-server/src/main/java/org/apache/activemq/jms/management/impl/JMSServerControlImpl.java
index 2d8b46e..8f16d76 100644
--- a/activemq-jms-server/src/main/java/org/apache/activemq/jms/management/impl/JMSServerControlImpl.java
+++ b/activemq-jms-server/src/main/java/org/apache/activemq/jms/management/impl/JMSServerControlImpl.java
@@ -33,6 +33,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.activemq.api.core.client.ClientSession;
 import org.apache.activemq.api.core.management.Parameter;
 import org.apache.activemq.api.jms.JMSFactoryType;
 import org.apache.activemq.api.jms.management.ConnectionFactoryControl;
@@ -737,9 +738,10 @@ public class JMSServerControlImpl extends AbstractControl implements JMSServerCo
 
          Map<Object, ServerSession> jmsSessions = new HashMap<Object, ServerSession>();
 
+         // First separate the real jms sessions, after all we are only interested in those here on the *jms* server controller
          for (ServerSession session : sessions)
          {
-            if (session.getMetaData("jms-session") != null)
+            if (session.getMetaData(ClientSession.JMS_SESSION_IDENTIFIER_PROPERTY) != null)
             {
                jmsSessions.put(session.getConnectionID(), session);
             }
@@ -754,7 +756,8 @@ public class JMSServerControlImpl extends AbstractControl implements JMSServerCo
                obj.put("connectionID", connection.getID().toString());
                obj.put("clientAddress", connection.getRemoteAddress());
                obj.put("creationTime", connection.getCreationTime());
-               obj.put("clientID", session.getMetaData("jms-client-id"));
+               // Notice: this will be null when the user hasn't set the client-id
+               obj.put("clientID", session.getMetaData(ClientSession.JMS_SESSION_CLIENT_ID_PROPERTY));
                obj.put("principal", session.getUsername());
                array.put(obj);
             }
@@ -986,7 +989,7 @@ public class JMSServerControlImpl extends AbstractControl implements JMSServerCo
 
    public String closeConnectionWithClientID(final String clientID) throws Exception
    {
-      return server.getActiveMQServer().destroyConnectionWithSessionMetadata("jms-client-id", clientID);
+      return server.getActiveMQServer().destroyConnectionWithSessionMetadata(ClientSession.JMS_SESSION_CLIENT_ID_PROPERTY, clientID);
    }
 
    private JSONObject toJSONObject(ServerConsumer consumer) throws Exception

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/95b63289/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/impl/JMSServerManagerImpl.java
----------------------------------------------------------------------
diff --git a/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/impl/JMSServerManagerImpl.java b/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/impl/JMSServerManagerImpl.java
index e0cff5f..ad74b12 100644
--- a/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/impl/JMSServerManagerImpl.java
+++ b/activemq-jms-server/src/main/java/org/apache/activemq/jms/server/impl/JMSServerManagerImpl.java
@@ -18,7 +18,6 @@ package org.apache.activemq.jms.server.impl;
 
 import javax.naming.NamingException;
 import javax.transaction.xa.Xid;
-
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
@@ -50,6 +49,7 @@ import org.apache.activemq.core.security.Role;
 import org.apache.activemq.core.server.ActivateCallback;
 import org.apache.activemq.core.server.ActiveMQServer;
 import org.apache.activemq.core.server.Queue;
+import org.apache.activemq.core.server.QueueCreator;
 import org.apache.activemq.core.server.impl.ActiveMQServerImpl;
 import org.apache.activemq.core.server.management.Notification;
 import org.apache.activemq.core.settings.impl.AddressSettings;
@@ -62,9 +62,9 @@ import org.apache.activemq.jms.client.ActiveMQQueue;
 import org.apache.activemq.jms.client.ActiveMQTopic;
 import org.apache.activemq.jms.client.SelectorTranslator;
 import org.apache.activemq.jms.persistence.JMSStorageManager;
+import org.apache.activemq.jms.persistence.config.PersistedBindings;
 import org.apache.activemq.jms.persistence.config.PersistedConnectionFactory;
 import org.apache.activemq.jms.persistence.config.PersistedDestination;
-import org.apache.activemq.jms.persistence.config.PersistedBindings;
 import org.apache.activemq.jms.persistence.config.PersistedType;
 import org.apache.activemq.jms.persistence.impl.journal.JMSJournalStorageManagerImpl;
 import org.apache.activemq.jms.persistence.impl.nullpm.NullJMSStorageManagerImpl;
@@ -400,6 +400,8 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback
          return;
       }
 
+      server.setJMSQueueCreator(new JMSQueueCreator());
+
       server.registerActivateCallback(this);
       /**
        * See this method's javadoc.
@@ -491,6 +493,16 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback
                                            final boolean durable,
                                            final String... bindings) throws Exception
    {
+      return internalCreateJMSQueue(storeConfig, queueName, selectorString, durable, false, bindings);
+   }
+
+   protected boolean internalCreateJMSQueue(final boolean storeConfig,
+                                         final String queueName,
+                                         final String selectorString,
+                                         final boolean durable,
+                                         final boolean autoCreated,
+                                         final String... bindings) throws Exception
+   {
 
       if (active && queues.get(queueName) != null)
       {
@@ -1881,4 +1893,29 @@ public class JMSServerManagerImpl implements JMSServerManager, ActivateCallback
       }
    }
 
+
+
+
+   class JMSQueueCreator implements QueueCreator
+   {
+      private final SimpleString PREFIX = SimpleString.toSimpleString("jms.queue");
+      @Override
+      public boolean create(SimpleString address) throws Exception
+      {
+         AddressSettings settings = server.getAddressSettingsRepository().getMatch(address.toString());
+         if (address.startsWith(PREFIX) && settings.isAutoCreateJmsQueues())
+         {
+            // strip the prefix (and the character after it) to get the queue name, then create it as a durable, auto-created JMS queue
+            JMSServerManagerImpl.this.internalCreateJMSQueue(false, address.toString().substring(PREFIX.toString().length() + 1), null, true, true);
+            return true;
+         }
+         else
+         {
+            return false;
+         }
+      }
+   }
+
+
+
 }

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/95b63289/activemq-protocols/activemq-amqp-protocol/src/main/java/org/apache/activemq/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java
----------------------------------------------------------------------
diff --git a/activemq-protocols/activemq-amqp-protocol/src/main/java/org/apache/activemq/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java b/activemq-protocols/activemq-amqp-protocol/src/main/java/org/apache/activemq/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java
index e049175..000cc25 100644
--- a/activemq-protocols/activemq-amqp-protocol/src/main/java/org/apache/activemq/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java
+++ b/activemq-protocols/activemq-amqp-protocol/src/main/java/org/apache/activemq/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java
@@ -105,7 +105,8 @@ public class ProtonSessionIntegrationCallback implements AMQPSessionCallback, Se
                                                         true, //boolean xa,
                                                         (String) null,
                                                         this,
-                                                        null);
+                                                        null,
+                                                        true);
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/95b63289/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/OpenWireConnection.java
----------------------------------------------------------------------
diff --git a/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/OpenWireConnection.java b/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/OpenWireConnection.java
index 4130d6e..b096381 100644
--- a/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/OpenWireConnection.java
+++ b/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/OpenWireConnection.java
@@ -16,6 +16,9 @@
  */
 package org.apache.activemq.core.protocol.openwire;
 
+import javax.jms.InvalidDestinationException;
+import javax.jms.JMSSecurityException;
+import javax.jms.ResourceAllocationException;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -31,15 +34,12 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
-import javax.jms.InvalidDestinationException;
-import javax.jms.JMSSecurityException;
-import javax.jms.ResourceAllocationException;
-
 import org.apache.activemq.api.core.ActiveMQBuffer;
 import org.apache.activemq.api.core.ActiveMQBuffers;
 import org.apache.activemq.api.core.ActiveMQException;
 import org.apache.activemq.api.core.ActiveMQNonExistentQueueException;
 import org.apache.activemq.api.core.ActiveMQSecurityException;
+import org.apache.activemq.api.core.SimpleString;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQMessage;
 import org.apache.activemq.command.BrokerInfo;
@@ -74,19 +74,6 @@ import org.apache.activemq.command.ShutdownInfo;
 import org.apache.activemq.command.TransactionId;
 import org.apache.activemq.command.TransactionInfo;
 import org.apache.activemq.command.WireFormatInfo;
-import org.apache.activemq.core.server.QueueQueryResult;
-import org.apache.activemq.openwire.OpenWireFormat;
-import org.apache.activemq.state.CommandVisitor;
-import org.apache.activemq.state.ConnectionState;
-import org.apache.activemq.state.ConsumerState;
-import org.apache.activemq.state.ProducerState;
-import org.apache.activemq.state.SessionState;
-import org.apache.activemq.thread.TaskRunner;
-import org.apache.activemq.thread.TaskRunnerFactory;
-import org.apache.activemq.transport.TransmitCallback;
-import org.apache.activemq.util.ByteSequence;
-import org.apache.activemq.wireformat.WireFormat;
-import org.apache.activemq.api.core.SimpleString;
 import org.apache.activemq.core.protocol.openwire.amq.AMQBrokerStoppedException;
 import org.apache.activemq.core.protocol.openwire.amq.AMQConnectionContext;
 import org.apache.activemq.core.protocol.openwire.amq.AMQConsumerBrokerExchange;
@@ -101,10 +88,21 @@ import org.apache.activemq.core.protocol.openwire.amq.AMQTransportConnectionStat
 import org.apache.activemq.core.remoting.CloseListener;
 import org.apache.activemq.core.remoting.FailureListener;
 import org.apache.activemq.core.server.ActiveMQServerLogger;
+import org.apache.activemq.openwire.OpenWireFormat;
 import org.apache.activemq.spi.core.protocol.RemotingConnection;
 import org.apache.activemq.spi.core.remoting.Acceptor;
 import org.apache.activemq.spi.core.remoting.Connection;
+import org.apache.activemq.state.CommandVisitor;
+import org.apache.activemq.state.ConnectionState;
+import org.apache.activemq.state.ConsumerState;
+import org.apache.activemq.state.ProducerState;
+import org.apache.activemq.state.SessionState;
+import org.apache.activemq.thread.TaskRunner;
+import org.apache.activemq.thread.TaskRunnerFactory;
+import org.apache.activemq.transport.TransmitCallback;
+import org.apache.activemq.util.ByteSequence;
 import org.apache.activemq.utils.ConcurrentHashSet;
+import org.apache.activemq.wireformat.WireFormat;
 
 /**
  * Represents an activemq connection.
@@ -1403,12 +1401,6 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor
 
          if (producerExchange.canDispatch(messageSend))
          {
-            if (messageSend.getDestination().isQueue())
-            {
-               SimpleString queueName = OpenWireUtil.toCoreAddress(messageSend.getDestination());
-               autoCreateQueueIfPossible(queueName, session);
-            }
-
             SendingResult result = session.send(producerExchange, messageSend, sendProducerAck);
             if (result.isBlockNextSend())
             {
@@ -1458,15 +1450,6 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor
       return resp;
    }
 
-   public void autoCreateQueueIfPossible(SimpleString queueName, AMQSession session) throws Exception
-   {
-      QueueQueryResult result = session.getCoreSession().executeQueueQuery(queueName);
-      if (result.isAutoCreateJmsQueues() && !result.isExists())
-      {
-         session.getCoreServer().createQueue(queueName, queueName, null, false, false, true);
-      }
-   }
-
    private AMQProducerBrokerExchange getProducerBrokerExchange(ProducerId id) throws IOException
    {
       AMQProducerBrokerExchange result = producerExchanges.get(id);

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/95b63289/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/amq/AMQServerSession.java
----------------------------------------------------------------------
diff --git a/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/amq/AMQServerSession.java b/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/amq/AMQServerSession.java
index 2ea2286..171034c 100644
--- a/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/amq/AMQServerSession.java
+++ b/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/amq/AMQServerSession.java
@@ -44,6 +44,7 @@ import org.apache.activemq.core.server.ActiveMQMessageBundle;
 import org.apache.activemq.core.server.ActiveMQServerLogger;
 import org.apache.activemq.core.server.MessageReference;
 import org.apache.activemq.core.server.Queue;
+import org.apache.activemq.core.server.QueueCreator;
 import org.apache.activemq.core.server.ServerConsumer;
 import org.apache.activemq.core.server.ServerMessage;
 import org.apache.activemq.core.server.impl.ActiveMQServerImpl;
@@ -72,6 +73,7 @@ public class AMQServerSession extends ServerSessionImpl
          SecurityStore securityStore, ManagementService managementService,
          ActiveMQServerImpl activeMQServerImpl, SimpleString managementAddress,
          SimpleString simpleString, SessionCallback callback,
+         QueueCreator queueCreator,
          OperationContext context) throws Exception
    {
       super(name, username, password,
@@ -83,7 +85,8 @@ public class AMQServerSession extends ServerSessionImpl
          securityStore, managementService,
          activeMQServerImpl, managementAddress,
          simpleString, callback,
-         context, new AMQTransactionFactory());
+         context, new AMQTransactionFactory(),
+         queueCreator);
    }
 
    //create a fake session just for security check
@@ -387,7 +390,7 @@ public class AMQServerSession extends ServerSessionImpl
 
       try
       {
-         postOffice.route(msg, routingContext, direct);
+         postOffice.route(msg, getQueueCreator(), routingContext, direct);
 
          Pair<UUID, AtomicLong> value = targetAddressInfos.get(msg.getAddress());
 

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/95b63289/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/amq/AMQServerSessionFactory.java
----------------------------------------------------------------------
diff --git a/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/amq/AMQServerSessionFactory.java b/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/amq/AMQServerSessionFactory.java
index 088db65..908eded 100644
--- a/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/amq/AMQServerSessionFactory.java
+++ b/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/amq/AMQServerSessionFactory.java
@@ -21,6 +21,7 @@ import org.apache.activemq.core.persistence.OperationContext;
 import org.apache.activemq.core.persistence.StorageManager;
 import org.apache.activemq.core.postoffice.PostOffice;
 import org.apache.activemq.core.security.SecurityStore;
+import org.apache.activemq.core.server.QueueCreator;
 import org.apache.activemq.core.server.ServerSessionFactory;
 import org.apache.activemq.core.server.impl.ActiveMQServerImpl;
 import org.apache.activemq.core.server.impl.ServerSessionImpl;
@@ -41,13 +42,13 @@ public class AMQServerSessionFactory implements ServerSessionFactory
          PostOffice postOffice, ResourceManager resourceManager,
          SecurityStore securityStore, ManagementService managementService,
          ActiveMQServerImpl activeMQServerImpl, SimpleString managementAddress,
-         SimpleString simpleString, SessionCallback callback,
+         SimpleString simpleString, SessionCallback callback, QueueCreator queueCreator,
          OperationContext context) throws Exception
    {
       return new AMQServerSession(name, username, password, minLargeMessageSize, autoCommitSends,
             autoCommitAcks, preAcknowledge, persistDeliveryCountBeforeDelivery, xa,
             connection, storageManager, postOffice, resourceManager, securityStore,
-            managementService, activeMQServerImpl, managementAddress, simpleString, callback,
+            managementService, activeMQServerImpl, managementAddress, simpleString, callback, queueCreator,
             context);
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/95b63289/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/amq/AMQSession.java
----------------------------------------------------------------------
diff --git a/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/amq/AMQSession.java b/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/amq/AMQSession.java
index 54dc8cb..b692183 100644
--- a/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/amq/AMQSession.java
+++ b/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/amq/AMQSession.java
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.core.protocol.openwire.amq;
 
+import javax.transaction.xa.Xid;
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -28,8 +29,7 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import javax.transaction.xa.Xid;
-
+import org.apache.activemq.api.core.SimpleString;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ConnectionInfo;
 import org.apache.activemq.command.ConsumerId;
@@ -46,9 +46,6 @@ import org.apache.activemq.command.SessionInfo;
 import org.apache.activemq.command.TransactionId;
 import org.apache.activemq.command.TransactionInfo;
 import org.apache.activemq.command.XATransactionId;
-import org.apache.activemq.core.server.ActiveMQServerLogger;
-import org.apache.activemq.wireformat.WireFormat;
-import org.apache.activemq.api.core.SimpleString;
 import org.apache.activemq.core.paging.impl.PagingStoreImpl;
 import org.apache.activemq.core.protocol.openwire.OpenWireConnection;
 import org.apache.activemq.core.protocol.openwire.OpenWireMessageConverter;
@@ -56,12 +53,14 @@ import org.apache.activemq.core.protocol.openwire.OpenWireProtocolManager;
 import org.apache.activemq.core.protocol.openwire.OpenWireUtil;
 import org.apache.activemq.core.protocol.openwire.SendingResult;
 import org.apache.activemq.core.server.ActiveMQServer;
+import org.apache.activemq.core.server.ActiveMQServerLogger;
 import org.apache.activemq.core.server.ServerConsumer;
 import org.apache.activemq.core.server.ServerMessage;
 import org.apache.activemq.core.server.impl.ServerMessageImpl;
 import org.apache.activemq.core.transaction.impl.XidImpl;
 import org.apache.activemq.spi.core.protocol.SessionCallback;
 import org.apache.activemq.spi.core.remoting.ReadyListener;
+import org.apache.activemq.wireformat.WireFormat;
 
 public class AMQSession implements SessionCallback
 {
@@ -109,7 +108,7 @@ public class AMQSession implements SessionCallback
       {
          coreSession = (AMQServerSession) server.createSession(name, username, password,
                minLargeMessageSize, connection, true, false, false, false,
-               null, this, new AMQServerSessionFactory());
+               null, this, new AMQServerSessionFactory(), true);
 
          long sessionId = sessInfo.getSessionId().getValue();
          if (sessionId == -1)
@@ -143,7 +142,7 @@ public class AMQSession implements SessionCallback
          if (d.isQueue())
          {
             SimpleString queueName = OpenWireUtil.toCoreAddress(d);
-            connection.autoCreateQueueIfPossible(queueName, this);
+            getCoreServer().getJMSQueueCreator().create(queueName);
          }
          AMQConsumer consumer = new AMQConsumer(this, d, info);
          consumer.init();

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/95b63289/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/StompProtocolManager.java
----------------------------------------------------------------------
diff --git a/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/StompProtocolManager.java b/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/StompProtocolManager.java
index 3780180..a2a8b44 100644
--- a/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/StompProtocolManager.java
+++ b/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/StompProtocolManager.java
@@ -274,7 +274,7 @@ class StompProtocolManager implements ProtocolManager<StompFrameInterceptor>, No
                                                       false,
                                                       false,
                                                       null,
-                                                      stompSession, null);
+                                                      stompSession, null, true);
          stompSession.setServerSession(session);
          sessions.put(connection.getID(), stompSession);
       }
@@ -299,7 +299,7 @@ class StompProtocolManager implements ProtocolManager<StompFrameInterceptor>, No
                                                       false,
                                                       false,
                                                       null,
-                                                      stompSession, null);
+                                                      stompSession, null, true);
          stompSession.setServerSession(session);
          transactedSessions.put(txID, stompSession);
       }

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/95b63289/activemq-ra/src/main/java/org/apache/activemq/ra/inflow/ActiveMQActivation.java
----------------------------------------------------------------------
diff --git a/activemq-ra/src/main/java/org/apache/activemq/ra/inflow/ActiveMQActivation.java b/activemq-ra/src/main/java/org/apache/activemq/ra/inflow/ActiveMQActivation.java
index 3c83209..80811c0 100644
--- a/activemq-ra/src/main/java/org/apache/activemq/ra/inflow/ActiveMQActivation.java
+++ b/activemq-ra/src/main/java/org/apache/activemq/ra/inflow/ActiveMQActivation.java
@@ -561,11 +561,11 @@ public class ActiveMQActivation
                                    spec.getTransactionTimeout());
 
          result.addMetaData("resource-adapter", "inbound");
-         result.addMetaData("jms-session", "");
+         result.addMetaData(ClientSession.JMS_SESSION_IDENTIFIER_PROPERTY, "");
          String clientID = ra.getClientID() == null ? spec.getClientID() : ra.getClientID();
          if (clientID != null)
          {
-            result.addMetaData("jms-client-id", clientID);
+            result.addMetaData(ClientSession.JMS_SESSION_CLIENT_ID_PROPERTY, clientID);
          }
 
          ActiveMQRALogger.LOGGER.debug("Using queue connection " + result);

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/95b63289/activemq-server/src/main/java/org/apache/activemq/core/postoffice/PostOffice.java
----------------------------------------------------------------------
diff --git a/activemq-server/src/main/java/org/apache/activemq/core/postoffice/PostOffice.java b/activemq-server/src/main/java/org/apache/activemq/core/postoffice/PostOffice.java
index 761e77f..fdc8044 100644
--- a/activemq-server/src/main/java/org/apache/activemq/core/postoffice/PostOffice.java
+++ b/activemq-server/src/main/java/org/apache/activemq/core/postoffice/PostOffice.java
@@ -24,6 +24,7 @@ import org.apache.activemq.api.core.SimpleString;
 import org.apache.activemq.core.server.ActiveMQComponent;
 import org.apache.activemq.core.server.MessageReference;
 import org.apache.activemq.core.server.Queue;
+import org.apache.activemq.core.server.QueueCreator;
 import org.apache.activemq.core.server.RoutingContext;
 import org.apache.activemq.core.server.ServerMessage;
 import org.apache.activemq.core.transaction.Transaction;
@@ -66,15 +67,15 @@ public interface PostOffice extends ActiveMQComponent
 
    Map<SimpleString, Binding> getAllBindings();
 
-   void route(ServerMessage message, boolean direct) throws Exception;
+   void route(ServerMessage message, QueueCreator queueCreator, boolean direct) throws Exception;
 
-   void route(ServerMessage message, Transaction tx, boolean direct) throws Exception;
+   void route(ServerMessage message, QueueCreator queueCreator, Transaction tx, boolean direct) throws Exception;
 
-   void route(ServerMessage message, Transaction tx, boolean direct, boolean rejectDuplicates) throws Exception;
+   void route(ServerMessage message, QueueCreator queueCreator, Transaction tx, boolean direct, boolean rejectDuplicates) throws Exception;
 
-   void route(ServerMessage message, RoutingContext context, boolean direct) throws Exception;
+   void route(ServerMessage message, QueueCreator queueCreator, RoutingContext context, boolean direct) throws Exception;
 
-   void route(ServerMessage message, RoutingContext context, boolean direct, boolean rejectDuplicates) throws Exception;
+   void route(ServerMessage message, QueueCreator queueCreator, RoutingContext context, boolean direct, boolean rejectDuplicates) throws Exception;
 
    MessageReference reroute(ServerMessage message, Queue queue, Transaction tx) throws Exception;
 

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/95b63289/activemq-server/src/main/java/org/apache/activemq/core/postoffice/impl/PostOfficeImpl.java
----------------------------------------------------------------------
diff --git a/activemq-server/src/main/java/org/apache/activemq/core/postoffice/impl/PostOfficeImpl.java b/activemq-server/src/main/java/org/apache/activemq/core/postoffice/impl/PostOfficeImpl.java
index 1c8c811..1294a39 100644
--- a/activemq-server/src/main/java/org/apache/activemq/core/postoffice/impl/PostOfficeImpl.java
+++ b/activemq-server/src/main/java/org/apache/activemq/core/postoffice/impl/PostOfficeImpl.java
@@ -61,6 +61,7 @@ import org.apache.activemq.core.server.ActiveMQServerLogger;
 import org.apache.activemq.core.server.LargeServerMessage;
 import org.apache.activemq.core.server.MessageReference;
 import org.apache.activemq.core.server.Queue;
+import org.apache.activemq.core.server.QueueCreator;
 import org.apache.activemq.core.server.QueueFactory;
 import org.apache.activemq.core.server.RouteContextList;
 import org.apache.activemq.core.server.RoutingContext;
@@ -81,6 +82,10 @@ import org.apache.activemq.core.transaction.impl.TransactionImpl;
 import org.apache.activemq.utils.TypedProperties;
 import org.apache.activemq.utils.UUIDGenerator;
 
+/**
+ * This is the class that will do the routing to Queues and decide which consumer will get the messages.
+ * It is the queue component responsible for distributing the messages.
+ */
 public class PostOfficeImpl implements PostOffice, NotificationListener, BindingsFactory
 {
    private static final boolean isTrace = ActiveMQServerLogger.LOGGER.isTraceEnabled();
@@ -605,30 +610,32 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
       return addressManager.getBindings();
    }
 
-   public void route(final ServerMessage message, final boolean direct) throws Exception
+   public void route(final ServerMessage message, QueueCreator queueCreator, final boolean direct) throws Exception
    {
-      route(message, (Transaction) null, direct);
+      route(message, queueCreator, (Transaction) null, direct);
    }
 
-   public void route(final ServerMessage message, final Transaction tx, final boolean direct) throws Exception
+   public void route(final ServerMessage message, QueueCreator queueCreator, final Transaction tx, final boolean direct) throws Exception
    {
-      route(message, new RoutingContextImpl(tx), direct);
+      route(message, queueCreator, new RoutingContextImpl(tx), direct);
    }
 
    public void route(final ServerMessage message,
+                     final QueueCreator queueCreator,
                      final Transaction tx,
                      final boolean direct,
                      final boolean rejectDuplicates) throws Exception
    {
-      route(message, new RoutingContextImpl(tx), direct, rejectDuplicates);
+      route(message, queueCreator, new RoutingContextImpl(tx), direct, rejectDuplicates);
    }
 
-   public void route(final ServerMessage message, final RoutingContext context, final boolean direct) throws Exception
+   public void route(final ServerMessage message, final QueueCreator queueCreator, final RoutingContext context, final boolean direct) throws Exception
    {
-      route(message, context, direct, true);
+      route(message, queueCreator, context, direct, true);
    }
 
    public void route(final ServerMessage message,
+                     final QueueCreator queueCreator,
                      final RoutingContext context,
                      final boolean direct,
                      boolean rejectDuplicates) throws Exception
@@ -661,6 +668,18 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
 
       Bindings bindings = addressManager.getBindingsForRoutingAddress(address);
 
+      // first check for the auto-queue creation thing
+      if (bindings == null && queueCreator != null)
+      {
+         // There is no queue with this address, we will check if it needs to be created
+         if (queueCreator.create(address))
+         {
+            // TODO: this is not working!!!!
+            // reassign bindings if it was created
+            bindings = addressManager.getBindingsForRoutingAddress(address);
+         }
+      }
+
       if (bindings != null)
       {
          bindings.route(message, context);
@@ -708,7 +727,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
 
                message.setAddress(dlaAddress);
 
-               route(message, context.getTransaction(), false);
+               route(message, null, context.getTransaction(), false);
             }
          }
          else

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/95b63289/activemq-server/src/main/java/org/apache/activemq/core/protocol/core/impl/ActiveMQPacketHandler.java
----------------------------------------------------------------------
diff --git a/activemq-server/src/main/java/org/apache/activemq/core/protocol/core/impl/ActiveMQPacketHandler.java b/activemq-server/src/main/java/org/apache/activemq/core/protocol/core/impl/ActiveMQPacketHandler.java
index fa3330e..e6ae69f 100644
--- a/activemq-server/src/main/java/org/apache/activemq/core/protocol/core/impl/ActiveMQPacketHandler.java
+++ b/activemq-server/src/main/java/org/apache/activemq/core/protocol/core/impl/ActiveMQPacketHandler.java
@@ -177,7 +177,7 @@ public class ActiveMQPacketHandler implements ChannelHandler
                                                       request.getDefaultAddress(),
                                                       new CoreSessionCallback(request.getName(),
                                                                               protocolManager,
-                                                                              channel), null);
+                                                                              channel), null, true);
 
          ServerSessionPacketHandler handler = new ServerSessionPacketHandler(session,
                                                                              server.getStorageManager(),

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/95b63289/activemq-server/src/main/java/org/apache/activemq/core/server/ActiveMQServer.java
----------------------------------------------------------------------
diff --git a/activemq-server/src/main/java/org/apache/activemq/core/server/ActiveMQServer.java b/activemq-server/src/main/java/org/apache/activemq/core/server/ActiveMQServer.java
index 8888588..fde78d4 100644
--- a/activemq-server/src/main/java/org/apache/activemq/core/server/ActiveMQServer.java
+++ b/activemq-server/src/main/java/org/apache/activemq/core/server/ActiveMQServer.java
@@ -111,7 +111,8 @@ public interface ActiveMQServer extends ActiveMQComponent
                                boolean xa,
                                String defaultAddress,
                                SessionCallback callback,
-                               ServerSessionFactory sessionFactory) throws Exception;
+                               ServerSessionFactory sessionFactory,
+                               boolean autoCreateQueues) throws Exception;
 
    SecurityStore getSecurityStore();
 
@@ -143,6 +144,19 @@ public interface ActiveMQServer extends ActiveMQComponent
    boolean isActive();
 
    /**
+    * This is the queue creator responsible for JMS Queue creations.
+    * @param queueCreator
+    */
+   void setJMSQueueCreator(QueueCreator queueCreator);
+
+   /**
+    * @see org.apache.activemq.core.server.ActiveMQServer#setJMSQueueCreator(QueueCreator)
+    *
+    * @return
+    */
+   QueueCreator getJMSQueueCreator();
+
+   /**
     * Wait for server initialization.
     * @param timeout
     * @param unit

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/95b63289/activemq-server/src/main/java/org/apache/activemq/core/server/QueueCreator.java
----------------------------------------------------------------------
diff --git a/activemq-server/src/main/java/org/apache/activemq/core/server/QueueCreator.java b/activemq-server/src/main/java/org/apache/activemq/core/server/QueueCreator.java
new file mode 100644
index 0000000..c1c272d
--- /dev/null
+++ b/activemq-server/src/main/java/org/apache/activemq/core/server/QueueCreator.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.core.server;
+
+import org.apache.activemq.api.core.SimpleString;
+
+public interface QueueCreator
+{
+   /**
+    * Attempts to create a queue for the given address.
+    * You should return true even if you tried to create the queue and the queue was already there,
+    * as the callers of this method will use that as an indicator that they should re-route the messages.
+    *
+    * @return true if a queue was created, or if it was already there when the creation was attempted.
+    */
+   boolean create(SimpleString address) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/95b63289/activemq-server/src/main/java/org/apache/activemq/core/server/ServerSession.java
----------------------------------------------------------------------
diff --git a/activemq-server/src/main/java/org/apache/activemq/core/server/ServerSession.java b/activemq-server/src/main/java/org/apache/activemq/core/server/ServerSession.java
index 363385f..b599562 100644
--- a/activemq-server/src/main/java/org/apache/activemq/core/server/ServerSession.java
+++ b/activemq-server/src/main/java/org/apache/activemq/core/server/ServerSession.java
@@ -75,6 +75,8 @@ public interface ServerSession
 
    void xaSuspend() throws Exception;
 
+   QueueCreator getQueueCreator();
+
    List<Xid> xaGetInDoubtXids();
 
    int xaGetTimeout();

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/95b63289/activemq-server/src/main/java/org/apache/activemq/core/server/ServerSessionFactory.java
----------------------------------------------------------------------
diff --git a/activemq-server/src/main/java/org/apache/activemq/core/server/ServerSessionFactory.java b/activemq-server/src/main/java/org/apache/activemq/core/server/ServerSessionFactory.java
index 322458c..4efe63d 100644
--- a/activemq-server/src/main/java/org/apache/activemq/core/server/ServerSessionFactory.java
+++ b/activemq-server/src/main/java/org/apache/activemq/core/server/ServerSessionFactory.java
@@ -40,6 +40,6 @@ public interface ServerSessionFactory
          SecurityStore securityStore, ManagementService managementService,
          ActiveMQServerImpl activeMQServerImpl, SimpleString managementAddress,
          SimpleString simpleString, SessionCallback callback,
-         OperationContext context) throws Exception;
+         QueueCreator queueCreator, OperationContext context) throws Exception;
 
 }

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/95b63289/activemq-server/src/main/java/org/apache/activemq/core/server/impl/ActiveMQServerImpl.java
----------------------------------------------------------------------
diff --git a/activemq-server/src/main/java/org/apache/activemq/core/server/impl/ActiveMQServerImpl.java b/activemq-server/src/main/java/org/apache/activemq/core/server/impl/ActiveMQServerImpl.java
index 973f3e1..4efb8f7 100644
--- a/activemq-server/src/main/java/org/apache/activemq/core/server/impl/ActiveMQServerImpl.java
+++ b/activemq-server/src/main/java/org/apache/activemq/core/server/impl/ActiveMQServerImpl.java
@@ -99,6 +99,7 @@ import org.apache.activemq.core.server.LargeServerMessage;
 import org.apache.activemq.core.server.MemoryManager;
 import org.apache.activemq.core.server.NodeManager;
 import org.apache.activemq.core.server.Queue;
+import org.apache.activemq.core.server.QueueCreator;
 import org.apache.activemq.core.server.QueueFactory;
 import org.apache.activemq.core.server.ServerSession;
 import org.apache.activemq.core.server.ServerSessionFactory;
@@ -222,6 +223,11 @@ public class ActiveMQServerImpl implements ActiveMQServer
 
    private MemoryManager memoryManager;
 
+   /**
+    * This will be set by the JMS Queue Manager.
+    */
+   private QueueCreator jmsQueueCreator;
+
    private final Map<String, ServerSession> sessions = new ConcurrentHashMap<String, ServerSession>();
 
    /**
@@ -593,6 +599,18 @@ public class ActiveMQServerImpl implements ActiveMQServer
       stop(failoverOnServerShutdown, false, false);
    }
 
+   @Override
+   public QueueCreator getJMSQueueCreator()
+   {
+      return jmsQueueCreator;
+   }
+
+   @Override
+   public void setJMSQueueCreator(QueueCreator jmsQueueCreator)
+   {
+      this.jmsQueueCreator = jmsQueueCreator;
+   }
+
    /**
     * Stops the server
     * @param criticalIOError          whether we have encountered an IO error with the journal etc
@@ -1007,6 +1025,7 @@ public class ActiveMQServerImpl implements ActiveMQServer
       return backupManager;
    }
 
+   @Override
    public ServerSession createSession(final String name,
                                       final String username,
                                       final String password,
@@ -1018,7 +1037,8 @@ public class ActiveMQServerImpl implements ActiveMQServer
                                       final boolean xa,
                                       final String defaultAddress,
                                       final SessionCallback callback,
-                                      final ServerSessionFactory sessionFactory) throws Exception
+                                      final ServerSessionFactory sessionFactory,
+                                      final boolean autoCreateQueues) throws Exception
    {
 
       if (securityStore != null)
@@ -1026,14 +1046,22 @@ public class ActiveMQServerImpl implements ActiveMQServer
          securityStore.authenticate(username, password);
       }
       final OperationContext context = storageManager.newContext(getExecutorFactory().getExecutor());
-      final ServerSessionImpl session = internalCreateSession(name, username, password, minLargeMessageSize, connection, autoCommitSends, autoCommitAcks, preAcknowledge, xa, defaultAddress, callback, context, sessionFactory);
+      final ServerSessionImpl session = internalCreateSession(name, username, password, minLargeMessageSize,
+                                                              connection, autoCommitSends, autoCommitAcks, preAcknowledge,
+                                                              xa, defaultAddress, callback, context, sessionFactory, autoCreateQueues);
 
       sessions.put(name, session);
 
       return session;
    }
 
-   protected ServerSessionImpl internalCreateSession(String name, String username, String password, int minLargeMessageSize, RemotingConnection connection, boolean autoCommitSends, boolean autoCommitAcks, boolean preAcknowledge, boolean xa, String defaultAddress, SessionCallback callback, OperationContext context, ServerSessionFactory sessionFactory) throws Exception
+   protected ServerSessionImpl internalCreateSession(String name, String username,
+                                                     String password, int minLargeMessageSize,
+                                                     RemotingConnection connection, boolean autoCommitSends,
+                                                     boolean autoCommitAcks, boolean preAcknowledge, boolean xa,
+                                                     String defaultAddress, SessionCallback callback,
+                                                     OperationContext context, ServerSessionFactory sessionFactory,
+                                                     boolean autoCreateJMSQueues) throws Exception
    {
       if (sessionFactory == null)
       {
@@ -1057,7 +1085,8 @@ public class ActiveMQServerImpl implements ActiveMQServer
                                    defaultAddress == null ? null
                                       : new SimpleString(defaultAddress),
                                    callback,
-                                   context);
+                                   context,
+                                   autoCreateJMSQueues ? jmsQueueCreator : null);
       }
       else
       {
@@ -1081,6 +1110,7 @@ public class ActiveMQServerImpl implements ActiveMQServer
                                    defaultAddress == null ? null
                                       : new SimpleString(defaultAddress),
                                    callback,
+                                   jmsQueueCreator,
                                    context);
       }
    }

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/95b63289/activemq-server/src/main/java/org/apache/activemq/core/server/impl/DivertImpl.java
----------------------------------------------------------------------
diff --git a/activemq-server/src/main/java/org/apache/activemq/core/server/impl/DivertImpl.java b/activemq-server/src/main/java/org/apache/activemq/core/server/impl/DivertImpl.java
index c0068f3..b080351 100644
--- a/activemq-server/src/main/java/org/apache/activemq/core/server/impl/DivertImpl.java
+++ b/activemq-server/src/main/java/org/apache/activemq/core/server/impl/DivertImpl.java
@@ -110,7 +110,7 @@ public class DivertImpl implements Divert
          copy = message;
       }
 
-      postOffice.route(copy, context.getTransaction(), false);
+      postOffice.route(copy, null, context.getTransaction(), false);
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/95b63289/activemq-server/src/main/java/org/apache/activemq/core/server/impl/QueueImpl.java
----------------------------------------------------------------------
diff --git a/activemq-server/src/main/java/org/apache/activemq/core/server/impl/QueueImpl.java b/activemq-server/src/main/java/org/apache/activemq/core/server/impl/QueueImpl.java
index b0e4289..c6f94d6 100644
--- a/activemq-server/src/main/java/org/apache/activemq/core/server/impl/QueueImpl.java
+++ b/activemq-server/src/main/java/org/apache/activemq/core/server/impl/QueueImpl.java
@@ -2449,7 +2449,7 @@ public class QueueImpl implements Queue
 
       copyMessage.setAddress(toAddress);
 
-      postOffice.route(copyMessage, tx, false, rejectDuplicate);
+      postOffice.route(copyMessage, null, tx, false, rejectDuplicate);
 
       acknowledge(tx, ref);
    }
@@ -2673,7 +2673,7 @@ public class QueueImpl implements Queue
 
       copyMessage.setAddress(address);
 
-      postOffice.route(copyMessage, tx, false, rejectDuplicate);
+      postOffice.route(copyMessage, null, tx, false, rejectDuplicate);
 
       acknowledge(tx, ref);
 

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/95b63289/activemq-server/src/main/java/org/apache/activemq/core/server/impl/ServerSessionImpl.java
----------------------------------------------------------------------
diff --git a/activemq-server/src/main/java/org/apache/activemq/core/server/impl/ServerSessionImpl.java b/activemq-server/src/main/java/org/apache/activemq/core/server/impl/ServerSessionImpl.java
index 4cfc022..6b4939e 100644
--- a/activemq-server/src/main/java/org/apache/activemq/core/server/impl/ServerSessionImpl.java
+++ b/activemq-server/src/main/java/org/apache/activemq/core/server/impl/ServerSessionImpl.java
@@ -35,6 +35,7 @@ import org.apache.activemq.api.core.ActiveMQNonExistentQueueException;
 import org.apache.activemq.api.core.Message;
 import org.apache.activemq.api.core.Pair;
 import org.apache.activemq.api.core.SimpleString;
+import org.apache.activemq.api.core.client.ClientSession;
 import org.apache.activemq.api.core.management.CoreNotificationType;
 import org.apache.activemq.api.core.management.ManagementHelper;
 import org.apache.activemq.api.core.management.ResourceNames;
@@ -63,6 +64,7 @@ import org.apache.activemq.core.server.ActiveMQServer;
 import org.apache.activemq.core.server.LargeServerMessage;
 import org.apache.activemq.core.server.MessageReference;
 import org.apache.activemq.core.server.Queue;
+import org.apache.activemq.core.server.QueueCreator;
 import org.apache.activemq.core.server.QueueQueryResult;
 import org.apache.activemq.core.server.RoutingContext;
 import org.apache.activemq.core.server.ServerConsumer;
@@ -154,6 +156,8 @@ public class ServerSessionImpl implements ServerSession, FailureListener
 
    private final OperationContext context;
 
+   private QueueCreator queueCreator;
+
    // Session's usage should be by definition single threaded, hence it's not needed to use a concurrentHashMap here
    protected final Map<SimpleString, Pair<UUID, AtomicLong>> targetAddressInfos = new HashMap<SimpleString, Pair<UUID, AtomicLong>>();
 
@@ -169,8 +173,6 @@ public class ServerSessionImpl implements ServerSession, FailureListener
 
    private final TransactionFactory transactionFactory;
 
-   // Constructors ---------------------------------------------------------------------------------
-
    //create an 'empty' session. Only used by AMQServerSession
    //in order to check username and password
    protected ServerSessionImpl(String username, String password)
@@ -193,6 +195,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener
       this.managementAddress = null;
       this.context = null;
       this.callback = null;
+      this.queueCreator = null;
    }
 
    public ServerSessionImpl(final String name,
@@ -214,14 +217,15 @@ public class ServerSessionImpl implements ServerSession, FailureListener
                             final SimpleString managementAddress,
                             final SimpleString defaultAddress,
                             final SessionCallback callback,
-                            final OperationContext context) throws Exception
+                            final OperationContext context,
+                            final QueueCreator queueCreator) throws Exception
    {
       this(name, username, password, minLargeMessageSize,
          autoCommitSends, autoCommitAcks, preAcknowledge,
          strictUpdateDeliveryCount, xa, remotingConnection,
          storageManager, postOffice, resourceManager, securityStore,
          managementService, server, managementAddress, defaultAddress,
-         callback, context, null);
+         callback, context, null, queueCreator);
    }
 
    public ServerSessionImpl(final String name,
@@ -244,7 +248,8 @@ public class ServerSessionImpl implements ServerSession, FailureListener
                             final SimpleString defaultAddress,
                             final SessionCallback callback,
                             final OperationContext context,
-                            TransactionFactory transactionFactory) throws Exception
+                            TransactionFactory transactionFactory,
+                            final QueueCreator queueCreator) throws Exception
    {
       this.username = username;
 
@@ -288,6 +293,8 @@ public class ServerSessionImpl implements ServerSession, FailureListener
       remotingConnection.addFailureListener(this);
       this.context = context;
 
+      this.queueCreator = queueCreator;
+
       if (transactionFactory == null)
       {
          this.transactionFactory = new DefaultTransactionFactory();
@@ -421,6 +428,13 @@ public class ServerSessionImpl implements ServerSession, FailureListener
       }
    }
 
+
+
+   public QueueCreator getQueueCreator()
+   {
+      return queueCreator;
+   }
+
    public ServerConsumer createConsumer(final long consumerID,
                                         final SimpleString queueName,
                                         final SimpleString filterString,
@@ -1596,6 +1610,12 @@ public class ServerSessionImpl implements ServerSession, FailureListener
       {
          data = metaData.get(key);
       }
+
+      if (key.equals(ClientSession.JMS_SESSION_CLIENT_ID_PROPERTY))
+      {
+         // we know it's a JMS Session, we now install JMS Hooks of any kind
+         installJMSHooks();
+      }
       return data;
    }
 
@@ -1709,16 +1729,18 @@ public class ServerSessionImpl implements ServerSession, FailureListener
       connectionFailed(me, failedOver);
    }
 
-   // Public
-   // ----------------------------------------------------------------------------
-
    public void clearLargeMessage()
    {
       currentLargeMessage = null;
    }
 
-   // Private
-   // ----------------------------------------------------------------------------
+
+
+   private void installJMSHooks()
+   {
+      this.queueCreator = server.getJMSQueueCreator();
+   }
+
 
    private Map<SimpleString, Pair<UUID, AtomicLong>> cloneTargetAddresses()
    {
@@ -1846,7 +1868,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener
 
       try
       {
-         postOffice.route(msg, routingContext, direct);
+         postOffice.route(msg, queueCreator, routingContext, direct);
 
          Pair<UUID, AtomicLong> value = targetAddressInfos.get(msg.getAddress());
 

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/95b63289/activemq-server/src/main/java/org/apache/activemq/core/server/management/impl/ManagementServiceImpl.java
----------------------------------------------------------------------
diff --git a/activemq-server/src/main/java/org/apache/activemq/core/server/management/impl/ManagementServiceImpl.java b/activemq-server/src/main/java/org/apache/activemq/core/server/management/impl/ManagementServiceImpl.java
index bef361c..6a54438 100644
--- a/activemq-server/src/main/java/org/apache/activemq/core/server/management/impl/ManagementServiceImpl.java
+++ b/activemq-server/src/main/java/org/apache/activemq/core/server/management/impl/ManagementServiceImpl.java
@@ -728,7 +728,7 @@ public class ManagementServiceImpl implements ManagementService
                                                         new SimpleString(notification.getUID()));
                }
 
-               postOffice.route(notificationMessage, false);
+               postOffice.route(notificationMessage, null, false);
             }
          }
       }

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/95b63289/integration/activemq-vertx-integration/src/main/java/org/apache/activemq/integration/vertx/IncomingVertxEventHandler.java
----------------------------------------------------------------------
diff --git a/integration/activemq-vertx-integration/src/main/java/org/apache/activemq/integration/vertx/IncomingVertxEventHandler.java b/integration/activemq-vertx-integration/src/main/java/org/apache/activemq/integration/vertx/IncomingVertxEventHandler.java
index 4c91e77..549f60c 100644
--- a/integration/activemq-vertx-integration/src/main/java/org/apache/activemq/integration/vertx/IncomingVertxEventHandler.java
+++ b/integration/activemq-vertx-integration/src/main/java/org/apache/activemq/integration/vertx/IncomingVertxEventHandler.java
@@ -175,7 +175,7 @@ public class IncomingVertxEventHandler implements ConnectorService
 
          try
          {
-            postOffice.route(msg, false);
+            postOffice.route(msg, null, false);
          }
          catch (Exception e)
          {

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/95b63289/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/client/AutoCreateJmsQueueTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/client/AutoCreateJmsQueueTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/client/AutoCreateJmsQueueTest.java
index 53372d1..d785ceb 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/client/AutoCreateJmsQueueTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/client/AutoCreateJmsQueueTest.java
@@ -73,6 +73,38 @@ public class AutoCreateJmsQueueTest extends JMSTestBase
    }
 
    @Test
+   public void testAutoCreateOnSendToQueueAnonymousProducer() throws Exception
+   {
+      Connection connection = cf.createConnection();
+      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+      javax.jms.Queue queue = ActiveMQJMSClient.createQueue("test");
+
+      MessageProducer producer = session.createProducer(null);
+
+      final int numMessages = 100;
+
+      for (int i = 0; i < numMessages; i++)
+      {
+         TextMessage mess = session.createTextMessage("msg" + i);
+         producer.send(queue, mess);
+      }
+
+      producer.close();
+
+      MessageConsumer messageConsumer = session.createConsumer(queue);
+      connection.start();
+
+      for (int i = 0; i < numMessages; i++)
+      {
+         Message m = messageConsumer.receive(5000);
+         Assert.assertNotNull(m);
+      }
+
+      connection.close();
+   }
+
+   @Test
    public void testAutoCreateOnSendToQueueSecurity() throws Exception
    {
       ((ActiveMQSecurityManagerImpl)server.getSecurityManager()).getConfiguration().addUser("guest", "guest");

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/95b63289/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/client/HangConsumerTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/client/HangConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/client/HangConsumerTest.java
index 1b6349a..bc96752 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/client/HangConsumerTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/client/HangConsumerTest.java
@@ -661,7 +661,7 @@ public class HangConsumerTest extends ServiceTestBase
       }
 
       @Override
-      protected ServerSessionImpl internalCreateSession(String name, String username, String password, int minLargeMessageSize, RemotingConnection connection, boolean autoCommitSends, boolean autoCommitAcks, boolean preAcknowledge, boolean xa, String defaultAddress, SessionCallback callback, OperationContext context, ServerSessionFactory sessionFactory) throws Exception
+      protected ServerSessionImpl internalCreateSession(String name, String username, String password, int minLargeMessageSize, RemotingConnection connection, boolean autoCommitSends, boolean autoCommitAcks, boolean preAcknowledge, boolean xa, String defaultAddress, SessionCallback callback, OperationContext context, ServerSessionFactory sessionFactory, boolean autoCreateQueue) throws Exception
       {
          return new ServerSessionImpl(name,
             username,
@@ -683,7 +683,8 @@ public class HangConsumerTest extends ServiceTestBase
             defaultAddress == null ? null
                : new SimpleString(defaultAddress),
             new MyCallback(callback),
-            context);
+            context,
+            null);
       }
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/95b63289/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/openwire/OpenWireTestBase.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/openwire/OpenWireTestBase.java b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/openwire/OpenWireTestBase.java
index 0032e62..172958f 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/openwire/OpenWireTestBase.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/openwire/OpenWireTestBase.java
@@ -16,6 +16,9 @@
  */
 package org.apache.activemq.tests.integration.openwire;
 
+import javax.jms.ConnectionFactory;
+import javax.management.MBeanServer;
+import javax.management.MBeanServerFactory;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -23,10 +26,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import javax.jms.ConnectionFactory;
-import javax.management.MBeanServer;
-import javax.management.MBeanServerFactory;
-
 import org.apache.activemq.api.core.SimpleString;
 import org.apache.activemq.api.core.TransportConfiguration;
 import org.apache.activemq.api.jms.management.JMSServerControl;
@@ -75,6 +74,7 @@ public class OpenWireTestBase extends ServiceTestBase
       Map<String, AddressSettings> addressSettings = serverConfig.getAddressesSettings();
       String match = "jms.queue.#";
       AddressSettings dlaSettings = new AddressSettings();
+      dlaSettings.setAutoCreateJmsQueues(false);
       SimpleString dla = new SimpleString("jms.queue.ActiveMQ.DLQ");
       dlaSettings.setDeadLetterAddress(dla);
       addressSettings.put(match, dlaSettings);

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/95b63289/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/openwire/SimpleOpenWireTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/openwire/SimpleOpenWireTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/openwire/SimpleOpenWireTest.java
index 6b88682..588291e 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/openwire/SimpleOpenWireTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/openwire/SimpleOpenWireTest.java
@@ -32,6 +32,7 @@ import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.core.settings.impl.AddressSettings;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -293,6 +294,32 @@ public class SimpleOpenWireTest extends BasicOpenWireTest
       assertTrue(message1.getText().equals(message.getText()));
    }
 
+   @Test
+   public void testAutoDestinationNoCreationOnConsumer() throws JMSException
+   {
+      AddressSettings addressSetting = new AddressSettings();
+      addressSetting.setAutoCreateJmsQueues(false);
+
+      String address = "foo";
+      server.getAddressSettingsRepository().addMatch("jms.queue." + address, addressSetting);
+
+      connection.start();
+      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+      TextMessage message = session.createTextMessage("bar");
+      Queue queue = new ActiveMQQueue(address);
+
+      try
+      {
+         MessageConsumer consumer = session.createConsumer(queue);
+         Assert.fail("supposed to throw an exception here");
+      }
+      catch (JMSException e)
+      {
+
+      }
+   }
+
    /**
     * This is the example shipped with the distribution
     *

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/95b63289/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/openwire/amq/ProducerFlowControlTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/openwire/amq/ProducerFlowControlTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/openwire/amq/ProducerFlowControlTest.java
index 7141372..b47e20d 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/openwire/amq/ProducerFlowControlTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/openwire/amq/ProducerFlowControlTest.java
@@ -16,26 +16,25 @@
  */
 package org.apache.activemq.tests.integration.openwire.amq;
 
-import java.io.IOException;
-import java.util.Map;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
 import javax.jms.DeliveryMode;
 import javax.jms.JMSException;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
 import javax.jms.Session;
 import javax.jms.TextMessage;
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.activemq.ActiveMQConnection;
 import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.transport.tcp.TcpTransport;
 import org.apache.activemq.core.config.Configuration;
 import org.apache.activemq.core.settings.impl.AddressFullMessagePolicy;
 import org.apache.activemq.core.settings.impl.AddressSettings;
 import org.apache.activemq.tests.integration.openwire.BasicOpenWireTest;
+import org.apache.activemq.transport.tcp.TcpTransport;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/95b63289/tests/unit-tests/src/test/java/org/apache/activemq/tests/unit/core/server/impl/fakes/FakePostOffice.java
----------------------------------------------------------------------
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/tests/unit/core/server/impl/fakes/FakePostOffice.java b/tests/unit-tests/src/test/java/org/apache/activemq/tests/unit/core/server/impl/fakes/FakePostOffice.java
index 70cdb7f..401d958 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/tests/unit/core/server/impl/fakes/FakePostOffice.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/tests/unit/core/server/impl/fakes/FakePostOffice.java
@@ -29,6 +29,7 @@ import org.apache.activemq.core.postoffice.PostOffice;
 import org.apache.activemq.core.postoffice.impl.DuplicateIDCacheImpl;
 import org.apache.activemq.core.server.MessageReference;
 import org.apache.activemq.core.server.Queue;
+import org.apache.activemq.core.server.QueueCreator;
 import org.apache.activemq.core.server.RoutingContext;
 import org.apache.activemq.core.server.ServerMessage;
 import org.apache.activemq.core.server.impl.MessageReferenceImpl;
@@ -153,55 +154,40 @@ public class FakePostOffice implements PostOffice
       return new MessageReferenceImpl();
    }
 
-   public void route(final ServerMessage message, final Transaction tx) throws Exception
+   public void route(ServerMessage message, QueueCreator creator, RoutingContext context, boolean direct) throws Exception
    {
 
 
    }
 
-   public void route(final ServerMessage message, final RoutingContext context) throws Exception
+   public void route(ServerMessage message, QueueCreator creator, Transaction tx, boolean direct) throws Exception
    {
 
 
    }
 
-   public void route(ServerMessage message, boolean direct) throws Exception
-   {
-
-
-   }
-
-   public void route(ServerMessage message, RoutingContext context, boolean direct) throws Exception
-   {
-
-
-   }
-
-   public void route(ServerMessage message, Transaction tx, boolean direct) throws Exception
+   @Override
+   public void route(ServerMessage message, QueueCreator creator, RoutingContext context, boolean direct, boolean rejectDuplicates) throws Exception
    {
 
 
    }
 
    @Override
-   public void route(ServerMessage message, RoutingContext context, boolean direct, boolean rejectDuplicates) throws Exception
+   public void route(ServerMessage message, QueueCreator creator, Transaction tx, boolean direct, boolean rejectDuplicates) throws Exception
    {
 
 
    }
 
    @Override
-   public void route(ServerMessage message, Transaction tx, boolean direct, boolean rejectDuplicates) throws Exception
+   public void processRoute(ServerMessage message, RoutingContext context, boolean direct) throws Exception
    {
-
-
    }
 
    @Override
-   public void processRoute(ServerMessage message, RoutingContext context, boolean direct) throws Exception
+   public void route(ServerMessage message, QueueCreator queueCreator, boolean direct) throws Exception
    {
 
-
    }
-
 }
\ No newline at end of file


[3/3] activemq-6 git commit: Merge branch #209

Posted by ma...@apache.org.
Merge branch #209


Project: http://git-wip-us.apache.org/repos/asf/activemq-6/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-6/commit/47edcd40
Tree: http://git-wip-us.apache.org/repos/asf/activemq-6/tree/47edcd40
Diff: http://git-wip-us.apache.org/repos/asf/activemq-6/diff/47edcd40

Branch: refs/heads/master
Commit: 47edcd40149b2c2b84739f83b488962d7759740d
Parents: aa63819 81a7613
Author: Martyn Taylor <mt...@redhat.com>
Authored: Wed Apr 22 12:21:46 2015 +0100
Committer: Martyn Taylor <mt...@redhat.com>
Committed: Wed Apr 22 12:21:46 2015 +0100

----------------------------------------------------------------------
 .../activemq/api/core/client/ClientSession.java | 19 ++++++++
 .../activemq/jms/client/ActiveMQConnection.java |  6 +--
 .../jms/client/ActiveMQMessageProducer.java     | 14 +++---
 .../management/impl/JMSServerControlImpl.java   |  9 ++--
 .../jms/server/impl/JMSServerManagerImpl.java   | 41 ++++++++++++++++-
 .../plug/ProtonSessionIntegrationCallback.java  |  3 +-
 .../protocol/openwire/OpenWireConnection.java   | 47 +++++++-------------
 .../protocol/openwire/amq/AMQServerSession.java |  7 ++-
 .../openwire/amq/AMQServerSessionFactory.java   |  5 ++-
 .../core/protocol/openwire/amq/AMQSession.java  | 13 +++---
 .../protocol/stomp/StompProtocolManager.java    |  4 +-
 .../activemq/ra/inflow/ActiveMQActivation.java  |  4 +-
 .../activemq/core/postoffice/PostOffice.java    | 11 ++---
 .../core/postoffice/impl/PostOfficeImpl.java    | 35 +++++++++++----
 .../core/impl/ActiveMQPacketHandler.java        |  2 +-
 .../activemq/core/server/ActiveMQServer.java    | 16 ++++++-
 .../activemq/core/server/QueueCreator.java      | 32 +++++++++++++
 .../activemq/core/server/ServerSession.java     |  2 +
 .../core/server/ServerSessionFactory.java       |  2 +-
 .../core/server/impl/ActiveMQServerImpl.java    | 38 ++++++++++++++--
 .../activemq/core/server/impl/DivertImpl.java   |  2 +-
 .../activemq/core/server/impl/QueueImpl.java    |  4 +-
 .../core/server/impl/ServerSessionImpl.java     | 44 +++++++++++++-----
 .../management/impl/ManagementServiceImpl.java  |  2 +-
 .../vertx/IncomingVertxEventHandler.java        |  2 +-
 .../tests/extras/jms/bridge/JMSBridgeTest.java  |  6 +--
 .../client/AutoCreateJmsQueueTest.java          | 32 +++++++++++++
 .../integration/client/HangConsumerTest.java    |  5 ++-
 .../integration/openwire/OpenWireTestBase.java  |  8 ++--
 .../openwire/SimpleOpenWireTest.java            | 27 +++++++++++
 .../openwire/amq/ProducerFlowControlTest.java   | 13 +++---
 .../core/server/impl/fakes/FakePostOffice.java  | 30 ++++---------
 32 files changed, 346 insertions(+), 139 deletions(-)
----------------------------------------------------------------------



[2/3] activemq-6 git commit: ACTIVEMQ6-97 - fixing a left-over from the renaming

Posted by ma...@apache.org.
ACTIVEMQ6-97 - fixing a left-over from the renaming


Project: http://git-wip-us.apache.org/repos/asf/activemq-6/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-6/commit/81a7613b
Tree: http://git-wip-us.apache.org/repos/asf/activemq-6/tree/81a7613b
Diff: http://git-wip-us.apache.org/repos/asf/activemq-6/diff/81a7613b

Branch: refs/heads/master
Commit: 81a7613b386551488a66f278dcf9c5bb7f341567
Parents: 95b6328
Author: Clebert Suconic <cl...@apache.org>
Authored: Tue Apr 21 21:54:08 2015 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Tue Apr 21 21:54:10 2015 -0400

----------------------------------------------------------------------
 .../apache/activemq/tests/extras/jms/bridge/JMSBridgeTest.java | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-6/blob/81a7613b/tests/extra-tests/src/test/java/org/apache/activemq/tests/extras/jms/bridge/JMSBridgeTest.java
----------------------------------------------------------------------
diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/tests/extras/jms/bridge/JMSBridgeTest.java b/tests/extra-tests/src/test/java/org/apache/activemq/tests/extras/jms/bridge/JMSBridgeTest.java
index 50931ac..ceddca6 100644
--- a/tests/extra-tests/src/test/java/org/apache/activemq/tests/extras/jms/bridge/JMSBridgeTest.java
+++ b/tests/extra-tests/src/test/java/org/apache/activemq/tests/extras/jms/bridge/JMSBridgeTest.java
@@ -1399,7 +1399,7 @@ public class JMSBridgeTest extends BridgeTestBase
 
             if (on)
             {
-               String header = tm.getStringProperty(ActiveMQJMSConstants.JBOSS_MESSAGING_BRIDGE_MESSAGE_ID_LIST);
+               String header = tm.getStringProperty(ActiveMQJMSConstants.AMQ_MESSAGING_BRIDGE_MESSAGE_ID_LIST);
 
                Assert.assertNotNull(header);
 
@@ -1449,7 +1449,7 @@ public class JMSBridgeTest extends BridgeTestBase
 
                Assert.assertEquals("mygroup543", tm.getStringProperty("JMSXGroupID"));
 
-               String header = tm.getStringProperty(ActiveMQJMSConstants.JBOSS_MESSAGING_BRIDGE_MESSAGE_ID_LIST);
+               String header = tm.getStringProperty(ActiveMQJMSConstants.AMQ_MESSAGING_BRIDGE_MESSAGE_ID_LIST);
 
                Assert.assertNotNull(header);
 
@@ -1749,7 +1749,7 @@ public class JMSBridgeTest extends BridgeTestBase
             Assert.assertTrue(tm.getBooleanProperty("cheese"));
             Assert.assertEquals(23, tm.getIntProperty("Sausages"));
 
-            String header = tm.getStringProperty(ActiveMQJMSConstants.JBOSS_MESSAGING_BRIDGE_MESSAGE_ID_LIST);
+            String header = tm.getStringProperty(ActiveMQJMSConstants.AMQ_MESSAGING_BRIDGE_MESSAGE_ID_LIST);
 
             Assert.assertNull(header);
          }