You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2016/03/19 06:08:55 UTC

[49/67] [abbrv] activemq-artemis git commit: more refactorings on producers

more refactorings on producers


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/b2ad7fa9
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/b2ad7fa9
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/b2ad7fa9

Branch: refs/heads/refactor-openwire
Commit: b2ad7fa949b8425045adbee5f6ffe1a6f07c265a
Parents: 9262390
Author: Clebert Suconic <cl...@apache.org>
Authored: Thu Feb 25 18:10:18 2016 -0500
Committer: Clebert Suconic <cl...@apache.org>
Committed: Sat Mar 19 01:07:37 2016 -0400

----------------------------------------------------------------------
 .../protocol/openwire/OpenWireConnection.java   | 190 +++++++++----------
 .../core/protocol/openwire/OpenWireUtil.java    |  23 +--
 .../artemis/core/server/ActiveMQServer.java     |   4 +
 .../core/server/impl/ActiveMQServerImpl.java    |  71 +++++++
 .../core/server/impl/ServerSessionImpl.java     |  56 +-----
 .../InvestigationOpenwireTest.java              |  17 +-
 6 files changed, 181 insertions(+), 180 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b2ad7fa9/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
index dc2a8a6..6839259 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
@@ -51,7 +51,9 @@ import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQSingleConsumerB
 import org.apache.activemq.artemis.core.remoting.FailureListener;
 import org.apache.activemq.artemis.core.security.CheckType;
 import org.apache.activemq.artemis.core.security.SecurityAuth;
+import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
+import org.apache.activemq.artemis.core.server.BindingQueryResult;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.ServerConsumer;
 import org.apache.activemq.artemis.core.server.SlowConsumerDetectionListener;
@@ -146,6 +148,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
 
    private String defaultSocketURIString;
 
+   // TODO-NOW: check on why there are two connections created for every createConnection on the client.
    public OpenWireConnection(Connection connection,
                              Executor executor,
                              OpenWireProtocolManager openWireProtocolManager,
@@ -267,13 +270,25 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
 
          }
       }
-      catch (IOException e) {
+      catch (Exception e) {
+         ActiveMQServerLogger.LOGGER.debug(e);
 
-         // TODO-NOW: send errors
-         ActiveMQServerLogger.LOGGER.error("error decoding", e);
-      }
-      catch (Throwable t) {
-         ActiveMQServerLogger.LOGGER.error("error decoding", t);
+         Response resp;
+         if (e instanceof ActiveMQSecurityException) {
+            resp = new ExceptionResponse(new JMSSecurityException(e.getMessage()));
+         }
+         else if (e instanceof ActiveMQNonExistentQueueException) {
+            resp = new ExceptionResponse(new InvalidDestinationException(e.getMessage()));
+         }
+         else {
+            resp = new ExceptionResponse(e);
+         }
+         try {
+            dispatch(resp);
+         }
+         catch (IOException e2) {
+            ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e2);
+         }
       }
    }
 
@@ -861,6 +876,22 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
       }
    }
 
+   /**
+    * Checks to see if this destination exists.  If it does not throw an invalid destination exception.
+    *
+    * @param destination
+    */
+   private void validateDestination(ActiveMQDestination destination) throws Exception {
+      if (destination.isQueue()) {
+         SimpleString physicalName = OpenWireUtil.toCoreAddress(destination);
+         BindingQueryResult result = protocolManager.getServer().bindingQuery(physicalName);
+         if (!result.isExists() && !result.isAutoCreateJmsQueues()) {
+            throw ActiveMQMessageBundle.BUNDLE.noSuchQueue(physicalName);
+         }
+      }
+   }
+
+
    // This will listen for commands throught the protocolmanager
    public class CommandProcessor implements CommandVisitor {
 
@@ -892,69 +923,40 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
 
       @Override
       public Response processAddProducer(ProducerInfo info) throws Exception {
-         Response resp = null;
-         try {
-            SessionId sessionId = info.getProducerId().getParentId();
-            ConnectionId connectionId = sessionId.getParentId();
-            ConnectionState cs = getState();
-            if (cs == null) {
-               throw new IllegalStateException("Cannot add a producer to a connection that had not been registered: " + connectionId);
-            }
-            SessionState ss = cs.getSessionState(sessionId);
-            if (ss == null) {
-               throw new IllegalStateException("Cannot add a producer to a session that had not been registered: " + sessionId);
-            }
-            // Avoid replaying dup commands
-            if (!ss.getProducerIds().contains(info.getProducerId())) {
+         SessionId sessionId = info.getProducerId().getParentId();
+         ConnectionState cs = getState();
 
-               AMQSession amqSession = sessions.get(sessionId);
-               if (amqSession == null) {
-                  throw new IllegalStateException("Session not exist! : " + sessionId);
-               }
+         if (cs == null) {
+            throw new IllegalStateException("Cannot add a producer to a connection that had not been registered: " + sessionId.getParentId());
+         }
 
-               ActiveMQDestination destination = info.getDestination();
-               if (destination != null && !AdvisorySupport.isAdvisoryTopic(destination)) {
-                  if (destination.isQueue()) {
-                     OpenWireUtil.validateDestination(destination, amqSession);
-                  }
-                  DestinationInfo destInfo = new DestinationInfo(getContext().getConnectionId(), DestinationInfo.ADD_OPERATION_TYPE, destination);
-                  OpenWireConnection.this.addDestination(destInfo);
-               }
+         SessionState ss = cs.getSessionState(sessionId);
+         if (ss == null) {
+            throw new IllegalStateException("Cannot add a producer to a session that had not been registered: " + sessionId);
+         }
 
-               ss.addProducer(info);
+         // Avoid replaying dup commands
+         if (!ss.getProducerIds().contains(info.getProducerId())) {
+            ActiveMQDestination destination = info.getDestination();
 
+            if (destination != null && !AdvisorySupport.isAdvisoryTopic(destination)) {
+               if (destination.isQueue()) {
+                  OpenWireConnection.this.validateDestination(destination);
+               }
+               DestinationInfo destInfo = new DestinationInfo(getContext().getConnectionId(), DestinationInfo.ADD_OPERATION_TYPE, destination);
+               OpenWireConnection.this.addDestination(destInfo);
             }
+
+            ss.addProducer(info);
+
          }
-         catch (Exception e) {
-            if (e instanceof ActiveMQSecurityException) {
-               resp = new ExceptionResponse(new JMSSecurityException(e.getMessage()));
-            }
-            else if (e instanceof ActiveMQNonExistentQueueException) {
-               resp = new ExceptionResponse(new InvalidDestinationException(e.getMessage()));
-            }
-            else {
-               resp = new ExceptionResponse(e);
-            }
-         }
-         return resp;
+         return null;
       }
 
       @Override
-      public Response processAddConsumer(ConsumerInfo info) {
-         Response resp = null;
-         try {
-            addConsumer(info);
-         }
-         catch (Exception e) {
-            e.printStackTrace();
-            if (e instanceof ActiveMQSecurityException) {
-               resp = new ExceptionResponse(new JMSSecurityException(e.getMessage()));
-            }
-            else {
-               resp = new ExceptionResponse(e);
-            }
-         }
-         return resp;
+      public Response processAddConsumer(ConsumerInfo info) throws Exception {
+         addConsumer(info);
+         return null;
       }
 
       @Override
@@ -1146,50 +1148,40 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
       }
 
       @Override
-      public Response processMessage(Message messageSend) {
-         Response resp = null;
-         try {
-            ProducerId producerId = messageSend.getProducerId();
-            AMQProducerBrokerExchange producerExchange = getProducerBrokerExchange(producerId);
-            final AMQConnectionContext pcontext = producerExchange.getConnectionContext();
-            final ProducerInfo producerInfo = producerExchange.getProducerState().getInfo();
-            boolean sendProducerAck = !messageSend.isResponseRequired() && producerInfo.getWindowSize() > 0 && !pcontext.isInRecoveryMode();
-
-            AMQSession session = getSession(producerId.getParentId());
-
-            SendingResult result = session.send(producerExchange, messageSend, sendProducerAck);
-            if (result.isBlockNextSend()) {
-               if (!context.isNetworkConnection() && result.isSendFailIfNoSpace()) {
-                  // TODO see logging
-                  throw new ResourceAllocationException("Usage Manager Memory Limit reached. Stopping producer (" + producerId + ") to prevent flooding " + result.getBlockingAddress() + "." + " See http://activemq.apache.org/producer-flow-control.html for more info");
-               }
-
-               if (producerInfo.getWindowSize() > 0 || messageSend.isResponseRequired()) {
-                  //in that case don't send the response
-                  //this will force the client to wait until
-                  //the response is got.
-                  context.setDontSendReponse(true);
-               }
-               else {
-                  //hang the connection until the space is available
-                  session.blockingWaitForSpace(producerExchange, result);
-               }
+      public Response processMessage(Message messageSend) throws Exception {
+         ProducerId producerId = messageSend.getProducerId();
+         AMQProducerBrokerExchange producerExchange = getProducerBrokerExchange(producerId);
+         final AMQConnectionContext pcontext = producerExchange.getConnectionContext();
+         final ProducerInfo producerInfo = producerExchange.getProducerState().getInfo();
+         boolean sendProducerAck = !messageSend.isResponseRequired() && producerInfo.getWindowSize() > 0 && !pcontext.isInRecoveryMode();
+
+         AMQSession session = getSession(producerId.getParentId());
+
+         SendingResult result = session.send(producerExchange, messageSend, sendProducerAck);
+         if (result.isBlockNextSend()) {
+            if (!context.isNetworkConnection() && result.isSendFailIfNoSpace()) {
+               // TODO see logging
+               throw new ResourceAllocationException("Usage Manager Memory Limit reached. Stopping producer (" + producerId + ") to prevent flooding " + result.getBlockingAddress() + "." + " See http://activemq.apache.org/producer-flow-control.html for more info");
             }
-            else if (sendProducerAck) {
-               // TODO-now: send through OperationContext
-               ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), messageSend.getSize());
-               OpenWireConnection.this.dispatchAsync(ack);
-            }
-         }
-         catch (Throwable e) {
-            if (e instanceof ActiveMQSecurityException) {
-               resp = new ExceptionResponse(new JMSSecurityException(e.getMessage()));
+
+            if (producerInfo.getWindowSize() > 0 || messageSend.isResponseRequired()) {
+               //in that case don't send the response
+               //this will force the client to wait until
+               //the response is got.
+               context.setDontSendReponse(true);
             }
             else {
-               resp = new ExceptionResponse(e);
+               //hang the connection until the space is available
+               session.blockingWaitForSpace(producerExchange, result);
             }
          }
-         return resp;
+         else if (sendProducerAck) {
+            // TODO-now: send through OperationContext
+            ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), messageSend.getSize());
+            OpenWireConnection.this.dispatchAsync(ack);
+         }
+
+         return null;
       }
 
       @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b2ad7fa9/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireUtil.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireUtil.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireUtil.java
index d684761..4513eb3 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireUtil.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireUtil.java
@@ -18,16 +18,12 @@ package org.apache.activemq.artemis.core.protocol.openwire;
 
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
+import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.server.ServerMessage;
 import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQServerSession;
-import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQSession;
-import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
-import org.apache.activemq.artemis.core.server.BindingQueryResult;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.util.ByteSequence;
-import org.apache.activemq.artemis.api.core.SimpleString;
 
 public class OpenWireUtil {
 
@@ -64,23 +60,6 @@ public class OpenWireUtil {
       }
    }
 
-   /**
-    * Checks to see if this destination exists.  If it does not throw an invalid destination exception.
-    *
-    * @param destination
-    * @param amqSession
-    */
-   public static void validateDestination(ActiveMQDestination destination, AMQSession amqSession) throws Exception {
-      if (destination.isQueue()) {
-         AMQServerSession coreSession = amqSession.getCoreSession();
-         SimpleString physicalName = OpenWireUtil.toCoreAddress(destination);
-         BindingQueryResult result = coreSession.executeBindingQuery(physicalName);
-         if (!result.isExists() && !result.isAutoCreateJmsQueues()) {
-            throw ActiveMQMessageBundle.BUNDLE.noSuchQueue(physicalName);
-         }
-      }
-   }
-
    /*
     *This util converts amq wildcards to compatible core wildcards
     *The conversion is like this:

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b2ad7fa9/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
index e3c1b2a..64633bb 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
@@ -243,6 +243,10 @@ public interface ActiveMQServer extends ActiveMQComponent {
 
    Queue locateQueue(SimpleString queueName);
 
+   BindingQueryResult bindingQuery(SimpleString address) throws Exception;
+
+   QueueQueryResult queueQuery(SimpleString name) throws Exception;
+
    void destroyQueue(SimpleString queueName) throws Exception;
 
    void destroyQueue(SimpleString queueName, SecurityAuth session) throws Exception;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b2ad7fa9/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
index 7554127..13a1283 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
@@ -45,6 +45,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
 import org.apache.activemq.artemis.api.core.Pair;
 import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.management.ResourceNames;
 import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl;
 import org.apache.activemq.artemis.core.config.BridgeConfiguration;
 import org.apache.activemq.artemis.core.config.Configuration;
@@ -76,6 +77,8 @@ import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageM
 import org.apache.activemq.artemis.core.persistence.impl.journal.OperationContextImpl;
 import org.apache.activemq.artemis.core.persistence.impl.nullpm.NullStorageManager;
 import org.apache.activemq.artemis.core.postoffice.Binding;
+import org.apache.activemq.artemis.core.postoffice.BindingType;
+import org.apache.activemq.artemis.core.postoffice.Bindings;
 import org.apache.activemq.artemis.core.postoffice.PostOffice;
 import org.apache.activemq.artemis.core.postoffice.QueueBinding;
 import org.apache.activemq.artemis.core.postoffice.impl.DivertBinding;
@@ -97,6 +100,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.artemis.core.server.Bindable;
+import org.apache.activemq.artemis.core.server.BindingQueryResult;
 import org.apache.activemq.artemis.core.server.Divert;
 import org.apache.activemq.artemis.core.server.JournalType;
 import org.apache.activemq.artemis.core.server.LargeServerMessage;
@@ -105,6 +109,7 @@ import org.apache.activemq.artemis.core.server.NodeManager;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.QueueCreator;
 import org.apache.activemq.artemis.core.server.QueueFactory;
+import org.apache.activemq.artemis.core.server.QueueQueryResult;
 import org.apache.activemq.artemis.core.server.SecuritySettingPlugin;
 import org.apache.activemq.artemis.core.server.ServerSession;
 import org.apache.activemq.artemis.core.server.ServerSessionFactory;
@@ -545,6 +550,72 @@ public class ActiveMQServerImpl implements ActiveMQServer {
    }
 
    @Override
+   public BindingQueryResult bindingQuery(SimpleString address) throws Exception {
+      if (address == null) {
+         throw ActiveMQMessageBundle.BUNDLE.addressIsNull();
+      }
+
+      boolean autoCreateJmsQueues = address.toString().startsWith(ResourceNames.JMS_QUEUE) && getAddressSettingsRepository().getMatch(address.toString()).isAutoCreateJmsQueues();
+
+      List<SimpleString> names = new ArrayList<>();
+
+      // make an exception for the management address (see HORNETQ-29)
+      ManagementService managementService = getManagementService();
+      if (managementService != null) {
+         if (address.equals(managementService.getManagementAddress())) {
+            return new BindingQueryResult(true, names, autoCreateJmsQueues);
+         }
+      }
+
+      Bindings bindings = getPostOffice().getMatchingBindings(address);
+
+      for (Binding binding : bindings.getBindings()) {
+         if (binding.getType() == BindingType.LOCAL_QUEUE || binding.getType() == BindingType.REMOTE_QUEUE) {
+            names.add(binding.getUniqueName());
+         }
+      }
+
+      return new BindingQueryResult(!names.isEmpty(), names, autoCreateJmsQueues);
+   }
+
+   @Override
+   public QueueQueryResult queueQuery(SimpleString name) {
+      if (name == null) {
+         throw ActiveMQMessageBundle.BUNDLE.queueNameIsNull();
+      }
+
+      boolean autoCreateJmsQueues = name.toString().startsWith(ResourceNames.JMS_QUEUE) && getAddressSettingsRepository().getMatch(name.toString()).isAutoCreateJmsQueues();
+
+      QueueQueryResult response;
+
+      Binding binding = getPostOffice().getBinding(name);
+
+      SimpleString managementAddress = getManagementService() != null ? getManagementService().getManagementAddress() : null;
+
+      if (binding != null && binding.getType() == BindingType.LOCAL_QUEUE) {
+         Queue queue = (Queue) binding.getBindable();
+
+         Filter filter = queue.getFilter();
+
+         SimpleString filterString = filter == null ? null : filter.getFilterString();
+
+         response = new QueueQueryResult(name, binding.getAddress(), queue.isDurable(), queue.isTemporary(), filterString, queue.getConsumerCount(), queue.getMessageCount(), autoCreateJmsQueues);
+      }
+      // make an exception for the management address (see HORNETQ-29)
+      else if (name.equals(managementAddress)) {
+         response = new QueueQueryResult(name, managementAddress, true, false, null, -1, -1, autoCreateJmsQueues);
+      }
+      else if (autoCreateJmsQueues) {
+         response = new QueueQueryResult(name, name, true, false, null, 0, 0, true, false);
+      }
+      else {
+         response = new QueueQueryResult(null, null, false, false, null, 0, 0, false, false);
+      }
+
+      return response;
+   }
+
+   @Override
    public void threadDump() {
       StringWriter str = new StringWriter();
       PrintWriter out = new PrintWriter(str);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b2ad7fa9/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
index d628bde..77705fa 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
@@ -50,7 +50,6 @@ import org.apache.activemq.artemis.core.persistence.OperationContext;
 import org.apache.activemq.artemis.core.persistence.StorageManager;
 import org.apache.activemq.artemis.core.postoffice.Binding;
 import org.apache.activemq.artemis.core.postoffice.BindingType;
-import org.apache.activemq.artemis.core.postoffice.Bindings;
 import org.apache.activemq.artemis.core.postoffice.PostOffice;
 import org.apache.activemq.artemis.core.postoffice.QueueBinding;
 import org.apache.activemq.artemis.core.remoting.CloseListener;
@@ -623,63 +622,12 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
 
    @Override
    public QueueQueryResult executeQueueQuery(final SimpleString name) throws Exception {
-      if (name == null) {
-         throw ActiveMQMessageBundle.BUNDLE.queueNameIsNull();
-      }
-
-      boolean autoCreateJmsQueues = name.toString().startsWith(ResourceNames.JMS_QUEUE) && server.getAddressSettingsRepository().getMatch(name.toString()).isAutoCreateJmsQueues();
-
-      QueueQueryResult response;
-
-      Binding binding = postOffice.getBinding(name);
-
-      if (binding != null && binding.getType() == BindingType.LOCAL_QUEUE) {
-         Queue queue = (Queue) binding.getBindable();
-
-         Filter filter = queue.getFilter();
-
-         SimpleString filterString = filter == null ? null : filter.getFilterString();
-
-         response = new QueueQueryResult(name, binding.getAddress(), queue.isDurable(), queue.isTemporary(), filterString, queue.getConsumerCount(), queue.getMessageCount(), autoCreateJmsQueues);
-      }
-      // make an exception for the management address (see HORNETQ-29)
-      else if (name.equals(managementAddress)) {
-         response = new QueueQueryResult(name, managementAddress, true, false, null, -1, -1, autoCreateJmsQueues);
-      }
-      else if (autoCreateJmsQueues) {
-         response = new QueueQueryResult(name, name, true, false, null, 0, 0, true, false);
-      }
-      else {
-         response = new QueueQueryResult(null, null, false, false, null, 0, 0, false, false);
-      }
-
-      return response;
+      return server.queueQuery(name);
    }
 
    @Override
    public BindingQueryResult executeBindingQuery(final SimpleString address) throws Exception {
-      if (address == null) {
-         throw ActiveMQMessageBundle.BUNDLE.addressIsNull();
-      }
-
-      boolean autoCreateJmsQueues = address.toString().startsWith(ResourceNames.JMS_QUEUE) && server.getAddressSettingsRepository().getMatch(address.toString()).isAutoCreateJmsQueues();
-
-      List<SimpleString> names = new ArrayList<>();
-
-      // make an exception for the management address (see HORNETQ-29)
-      if (address.equals(managementAddress)) {
-         return new BindingQueryResult(true, names, autoCreateJmsQueues);
-      }
-
-      Bindings bindings = postOffice.getMatchingBindings(address);
-
-      for (Binding binding : bindings.getBindings()) {
-         if (binding.getType() == BindingType.LOCAL_QUEUE || binding.getType() == BindingType.REMOTE_QUEUE) {
-            names.add(binding.getUniqueName());
-         }
-      }
-
-      return new BindingQueryResult(!names.isEmpty(), names, autoCreateJmsQueues);
+      return server.bindingQuery(address);
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b2ad7fa9/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/investigations/InvestigationOpenwireTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/investigations/InvestigationOpenwireTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/investigations/InvestigationOpenwireTest.java
index 3614b9a..914a8e1 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/investigations/InvestigationOpenwireTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/investigations/InvestigationOpenwireTest.java
@@ -17,15 +17,22 @@
 
 package org.apache.activemq.artemis.tests.integration.openwire.investigations;
 
-import org.apache.activemq.artemis.tests.integration.openwire.BasicOpenWireTest;
-import org.junit.Assert;
-import org.junit.Test;
-
-import javax.jms.*;
+import javax.jms.Connection;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.XAConnection;
+import javax.jms.XASession;
 import javax.transaction.xa.XAResource;
 import java.util.Collection;
 import java.util.LinkedList;
 
+import org.apache.activemq.artemis.tests.integration.openwire.BasicOpenWireTest;
+import org.junit.Assert;
+import org.junit.Test;
+
 public class InvestigationOpenwireTest extends BasicOpenWireTest {
 
    @Test