You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2016/03/01 17:38:39 UTC
[43/52] [abbrv] activemq-artemis git commit: more refactorings on
producers
more refactorings on producers
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/4a4b682f
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/4a4b682f
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/4a4b682f
Branch: refs/heads/refactor-openwire
Commit: 4a4b682f2d88ba2b9b783160493fed93b849e4a8
Parents: 6e36d48
Author: Clebert Suconic <cl...@apache.org>
Authored: Thu Feb 25 18:10:18 2016 -0500
Committer: Clebert Suconic <cl...@apache.org>
Committed: Tue Mar 1 11:37:32 2016 -0500
----------------------------------------------------------------------
.../protocol/openwire/OpenWireConnection.java | 190 +++++++++----------
.../core/protocol/openwire/OpenWireUtil.java | 23 +--
.../artemis/core/server/ActiveMQServer.java | 4 +
.../core/server/impl/ActiveMQServerImpl.java | 71 +++++++
.../core/server/impl/ServerSessionImpl.java | 56 +-----
.../InvestigationOpenwireTest.java | 17 +-
6 files changed, 181 insertions(+), 180 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4a4b682f/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
index dc2a8a6..6839259 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
@@ -51,7 +51,9 @@ import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQSingleConsumerB
import org.apache.activemq.artemis.core.remoting.FailureListener;
import org.apache.activemq.artemis.core.security.CheckType;
import org.apache.activemq.artemis.core.security.SecurityAuth;
+import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
+import org.apache.activemq.artemis.core.server.BindingQueryResult;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.SlowConsumerDetectionListener;
@@ -146,6 +148,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
private String defaultSocketURIString;
+ // TODO-NOW: check on why there are two connections created for every createConnection on the client.
public OpenWireConnection(Connection connection,
Executor executor,
OpenWireProtocolManager openWireProtocolManager,
@@ -267,13 +270,25 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
}
}
- catch (IOException e) {
+ catch (Exception e) {
+ ActiveMQServerLogger.LOGGER.debug(e);
- // TODO-NOW: send errors
- ActiveMQServerLogger.LOGGER.error("error decoding", e);
- }
- catch (Throwable t) {
- ActiveMQServerLogger.LOGGER.error("error decoding", t);
+ Response resp;
+ if (e instanceof ActiveMQSecurityException) {
+ resp = new ExceptionResponse(new JMSSecurityException(e.getMessage()));
+ }
+ else if (e instanceof ActiveMQNonExistentQueueException) {
+ resp = new ExceptionResponse(new InvalidDestinationException(e.getMessage()));
+ }
+ else {
+ resp = new ExceptionResponse(e);
+ }
+ try {
+ dispatch(resp);
+ }
+ catch (IOException e2) {
+ ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e2);
+ }
}
}
@@ -861,6 +876,22 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
}
}
+ /**
+ * Checks to see if this destination exists. If it does not throw an invalid destination exception.
+ *
+ * @param destination
+ */
+ private void validateDestination(ActiveMQDestination destination) throws Exception {
+ if (destination.isQueue()) {
+ SimpleString physicalName = OpenWireUtil.toCoreAddress(destination);
+ BindingQueryResult result = protocolManager.getServer().bindingQuery(physicalName);
+ if (!result.isExists() && !result.isAutoCreateJmsQueues()) {
+ throw ActiveMQMessageBundle.BUNDLE.noSuchQueue(physicalName);
+ }
+ }
+ }
+
+
// This will listen for commands throught the protocolmanager
public class CommandProcessor implements CommandVisitor {
@@ -892,69 +923,40 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
@Override
public Response processAddProducer(ProducerInfo info) throws Exception {
- Response resp = null;
- try {
- SessionId sessionId = info.getProducerId().getParentId();
- ConnectionId connectionId = sessionId.getParentId();
- ConnectionState cs = getState();
- if (cs == null) {
- throw new IllegalStateException("Cannot add a producer to a connection that had not been registered: " + connectionId);
- }
- SessionState ss = cs.getSessionState(sessionId);
- if (ss == null) {
- throw new IllegalStateException("Cannot add a producer to a session that had not been registered: " + sessionId);
- }
- // Avoid replaying dup commands
- if (!ss.getProducerIds().contains(info.getProducerId())) {
+ SessionId sessionId = info.getProducerId().getParentId();
+ ConnectionState cs = getState();
- AMQSession amqSession = sessions.get(sessionId);
- if (amqSession == null) {
- throw new IllegalStateException("Session not exist! : " + sessionId);
- }
+ if (cs == null) {
+ throw new IllegalStateException("Cannot add a producer to a connection that had not been registered: " + sessionId.getParentId());
+ }
- ActiveMQDestination destination = info.getDestination();
- if (destination != null && !AdvisorySupport.isAdvisoryTopic(destination)) {
- if (destination.isQueue()) {
- OpenWireUtil.validateDestination(destination, amqSession);
- }
- DestinationInfo destInfo = new DestinationInfo(getContext().getConnectionId(), DestinationInfo.ADD_OPERATION_TYPE, destination);
- OpenWireConnection.this.addDestination(destInfo);
- }
+ SessionState ss = cs.getSessionState(sessionId);
+ if (ss == null) {
+ throw new IllegalStateException("Cannot add a producer to a session that had not been registered: " + sessionId);
+ }
- ss.addProducer(info);
+ // Avoid replaying dup commands
+ if (!ss.getProducerIds().contains(info.getProducerId())) {
+ ActiveMQDestination destination = info.getDestination();
+ if (destination != null && !AdvisorySupport.isAdvisoryTopic(destination)) {
+ if (destination.isQueue()) {
+ OpenWireConnection.this.validateDestination(destination);
+ }
+ DestinationInfo destInfo = new DestinationInfo(getContext().getConnectionId(), DestinationInfo.ADD_OPERATION_TYPE, destination);
+ OpenWireConnection.this.addDestination(destInfo);
}
+
+ ss.addProducer(info);
+
}
- catch (Exception e) {
- if (e instanceof ActiveMQSecurityException) {
- resp = new ExceptionResponse(new JMSSecurityException(e.getMessage()));
- }
- else if (e instanceof ActiveMQNonExistentQueueException) {
- resp = new ExceptionResponse(new InvalidDestinationException(e.getMessage()));
- }
- else {
- resp = new ExceptionResponse(e);
- }
- }
- return resp;
+ return null;
}
@Override
- public Response processAddConsumer(ConsumerInfo info) {
- Response resp = null;
- try {
- addConsumer(info);
- }
- catch (Exception e) {
- e.printStackTrace();
- if (e instanceof ActiveMQSecurityException) {
- resp = new ExceptionResponse(new JMSSecurityException(e.getMessage()));
- }
- else {
- resp = new ExceptionResponse(e);
- }
- }
- return resp;
+ public Response processAddConsumer(ConsumerInfo info) throws Exception {
+ addConsumer(info);
+ return null;
}
@Override
@@ -1146,50 +1148,40 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
}
@Override
- public Response processMessage(Message messageSend) {
- Response resp = null;
- try {
- ProducerId producerId = messageSend.getProducerId();
- AMQProducerBrokerExchange producerExchange = getProducerBrokerExchange(producerId);
- final AMQConnectionContext pcontext = producerExchange.getConnectionContext();
- final ProducerInfo producerInfo = producerExchange.getProducerState().getInfo();
- boolean sendProducerAck = !messageSend.isResponseRequired() && producerInfo.getWindowSize() > 0 && !pcontext.isInRecoveryMode();
-
- AMQSession session = getSession(producerId.getParentId());
-
- SendingResult result = session.send(producerExchange, messageSend, sendProducerAck);
- if (result.isBlockNextSend()) {
- if (!context.isNetworkConnection() && result.isSendFailIfNoSpace()) {
- // TODO see logging
- throw new ResourceAllocationException("Usage Manager Memory Limit reached. Stopping producer (" + producerId + ") to prevent flooding " + result.getBlockingAddress() + "." + " See http://activemq.apache.org/producer-flow-control.html for more info");
- }
-
- if (producerInfo.getWindowSize() > 0 || messageSend.isResponseRequired()) {
- //in that case don't send the response
- //this will force the client to wait until
- //the response is got.
- context.setDontSendReponse(true);
- }
- else {
- //hang the connection until the space is available
- session.blockingWaitForSpace(producerExchange, result);
- }
+ public Response processMessage(Message messageSend) throws Exception {
+ ProducerId producerId = messageSend.getProducerId();
+ AMQProducerBrokerExchange producerExchange = getProducerBrokerExchange(producerId);
+ final AMQConnectionContext pcontext = producerExchange.getConnectionContext();
+ final ProducerInfo producerInfo = producerExchange.getProducerState().getInfo();
+ boolean sendProducerAck = !messageSend.isResponseRequired() && producerInfo.getWindowSize() > 0 && !pcontext.isInRecoveryMode();
+
+ AMQSession session = getSession(producerId.getParentId());
+
+ SendingResult result = session.send(producerExchange, messageSend, sendProducerAck);
+ if (result.isBlockNextSend()) {
+ if (!context.isNetworkConnection() && result.isSendFailIfNoSpace()) {
+ // TODO see logging
+ throw new ResourceAllocationException("Usage Manager Memory Limit reached. Stopping producer (" + producerId + ") to prevent flooding " + result.getBlockingAddress() + "." + " See http://activemq.apache.org/producer-flow-control.html for more info");
}
- else if (sendProducerAck) {
- // TODO-now: send through OperationContext
- ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), messageSend.getSize());
- OpenWireConnection.this.dispatchAsync(ack);
- }
- }
- catch (Throwable e) {
- if (e instanceof ActiveMQSecurityException) {
- resp = new ExceptionResponse(new JMSSecurityException(e.getMessage()));
+
+ if (producerInfo.getWindowSize() > 0 || messageSend.isResponseRequired()) {
+ //in that case don't send the response
+ //this will force the client to wait until
+ //the response is got.
+ context.setDontSendReponse(true);
}
else {
- resp = new ExceptionResponse(e);
+ //hang the connection until the space is available
+ session.blockingWaitForSpace(producerExchange, result);
}
}
- return resp;
+ else if (sendProducerAck) {
+ // TODO-now: send through OperationContext
+ ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), messageSend.getSize());
+ OpenWireConnection.this.dispatchAsync(ack);
+ }
+
+ return null;
}
@Override
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4a4b682f/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireUtil.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireUtil.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireUtil.java
index d684761..4513eb3 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireUtil.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireUtil.java
@@ -18,16 +18,12 @@ package org.apache.activemq.artemis.core.protocol.openwire;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
+import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQServerSession;
-import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQSession;
-import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
-import org.apache.activemq.artemis.core.server.BindingQueryResult;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.util.ByteSequence;
-import org.apache.activemq.artemis.api.core.SimpleString;
public class OpenWireUtil {
@@ -64,23 +60,6 @@ public class OpenWireUtil {
}
}
- /**
- * Checks to see if this destination exists. If it does not throw an invalid destination exception.
- *
- * @param destination
- * @param amqSession
- */
- public static void validateDestination(ActiveMQDestination destination, AMQSession amqSession) throws Exception {
- if (destination.isQueue()) {
- AMQServerSession coreSession = amqSession.getCoreSession();
- SimpleString physicalName = OpenWireUtil.toCoreAddress(destination);
- BindingQueryResult result = coreSession.executeBindingQuery(physicalName);
- if (!result.isExists() && !result.isAutoCreateJmsQueues()) {
- throw ActiveMQMessageBundle.BUNDLE.noSuchQueue(physicalName);
- }
- }
- }
-
/*
*This util converts amq wildcards to compatible core wildcards
*The conversion is like this:
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4a4b682f/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
index e3c1b2a..64633bb 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
@@ -243,6 +243,10 @@ public interface ActiveMQServer extends ActiveMQComponent {
Queue locateQueue(SimpleString queueName);
+ BindingQueryResult bindingQuery(SimpleString address) throws Exception;
+
+ QueueQueryResult queueQuery(SimpleString name) throws Exception;
+
void destroyQueue(SimpleString queueName) throws Exception;
void destroyQueue(SimpleString queueName, SecurityAuth session) throws Exception;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4a4b682f/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
index 7554127..13a1283 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
@@ -45,6 +45,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.management.ResourceNames;
import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl;
import org.apache.activemq.artemis.core.config.BridgeConfiguration;
import org.apache.activemq.artemis.core.config.Configuration;
@@ -76,6 +77,8 @@ import org.apache.activemq.artemis.core.persistence.impl.journal.JournalStorageM
import org.apache.activemq.artemis.core.persistence.impl.journal.OperationContextImpl;
import org.apache.activemq.artemis.core.persistence.impl.nullpm.NullStorageManager;
import org.apache.activemq.artemis.core.postoffice.Binding;
+import org.apache.activemq.artemis.core.postoffice.BindingType;
+import org.apache.activemq.artemis.core.postoffice.Bindings;
import org.apache.activemq.artemis.core.postoffice.PostOffice;
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
import org.apache.activemq.artemis.core.postoffice.impl.DivertBinding;
@@ -97,6 +100,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.Bindable;
+import org.apache.activemq.artemis.core.server.BindingQueryResult;
import org.apache.activemq.artemis.core.server.Divert;
import org.apache.activemq.artemis.core.server.JournalType;
import org.apache.activemq.artemis.core.server.LargeServerMessage;
@@ -105,6 +109,7 @@ import org.apache.activemq.artemis.core.server.NodeManager;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.QueueCreator;
import org.apache.activemq.artemis.core.server.QueueFactory;
+import org.apache.activemq.artemis.core.server.QueueQueryResult;
import org.apache.activemq.artemis.core.server.SecuritySettingPlugin;
import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.core.server.ServerSessionFactory;
@@ -545,6 +550,72 @@ public class ActiveMQServerImpl implements ActiveMQServer {
}
@Override
+ public BindingQueryResult bindingQuery(SimpleString address) throws Exception {
+ if (address == null) {
+ throw ActiveMQMessageBundle.BUNDLE.addressIsNull();
+ }
+
+ boolean autoCreateJmsQueues = address.toString().startsWith(ResourceNames.JMS_QUEUE) && getAddressSettingsRepository().getMatch(address.toString()).isAutoCreateJmsQueues();
+
+ List<SimpleString> names = new ArrayList<>();
+
+ // make an exception for the management address (see HORNETQ-29)
+ ManagementService managementService = getManagementService();
+ if (managementService != null) {
+ if (address.equals(managementService.getManagementAddress())) {
+ return new BindingQueryResult(true, names, autoCreateJmsQueues);
+ }
+ }
+
+ Bindings bindings = getPostOffice().getMatchingBindings(address);
+
+ for (Binding binding : bindings.getBindings()) {
+ if (binding.getType() == BindingType.LOCAL_QUEUE || binding.getType() == BindingType.REMOTE_QUEUE) {
+ names.add(binding.getUniqueName());
+ }
+ }
+
+ return new BindingQueryResult(!names.isEmpty(), names, autoCreateJmsQueues);
+ }
+
+ @Override
+ public QueueQueryResult queueQuery(SimpleString name) {
+ if (name == null) {
+ throw ActiveMQMessageBundle.BUNDLE.queueNameIsNull();
+ }
+
+ boolean autoCreateJmsQueues = name.toString().startsWith(ResourceNames.JMS_QUEUE) && getAddressSettingsRepository().getMatch(name.toString()).isAutoCreateJmsQueues();
+
+ QueueQueryResult response;
+
+ Binding binding = getPostOffice().getBinding(name);
+
+ SimpleString managementAddress = getManagementService() != null ? getManagementService().getManagementAddress() : null;
+
+ if (binding != null && binding.getType() == BindingType.LOCAL_QUEUE) {
+ Queue queue = (Queue) binding.getBindable();
+
+ Filter filter = queue.getFilter();
+
+ SimpleString filterString = filter == null ? null : filter.getFilterString();
+
+ response = new QueueQueryResult(name, binding.getAddress(), queue.isDurable(), queue.isTemporary(), filterString, queue.getConsumerCount(), queue.getMessageCount(), autoCreateJmsQueues);
+ }
+ // make an exception for the management address (see HORNETQ-29)
+ else if (name.equals(managementAddress)) {
+ response = new QueueQueryResult(name, managementAddress, true, false, null, -1, -1, autoCreateJmsQueues);
+ }
+ else if (autoCreateJmsQueues) {
+ response = new QueueQueryResult(name, name, true, false, null, 0, 0, true, false);
+ }
+ else {
+ response = new QueueQueryResult(null, null, false, false, null, 0, 0, false, false);
+ }
+
+ return response;
+ }
+
+ @Override
public void threadDump() {
StringWriter str = new StringWriter();
PrintWriter out = new PrintWriter(str);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4a4b682f/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
index c3a979b..fbd91c2 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
@@ -50,7 +50,6 @@ import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.BindingType;
-import org.apache.activemq.artemis.core.postoffice.Bindings;
import org.apache.activemq.artemis.core.postoffice.PostOffice;
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
import org.apache.activemq.artemis.core.remoting.CloseListener;
@@ -623,63 +622,12 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
@Override
public QueueQueryResult executeQueueQuery(final SimpleString name) throws Exception {
- if (name == null) {
- throw ActiveMQMessageBundle.BUNDLE.queueNameIsNull();
- }
-
- boolean autoCreateJmsQueues = name.toString().startsWith(ResourceNames.JMS_QUEUE) && server.getAddressSettingsRepository().getMatch(name.toString()).isAutoCreateJmsQueues();
-
- QueueQueryResult response;
-
- Binding binding = postOffice.getBinding(name);
-
- if (binding != null && binding.getType() == BindingType.LOCAL_QUEUE) {
- Queue queue = (Queue) binding.getBindable();
-
- Filter filter = queue.getFilter();
-
- SimpleString filterString = filter == null ? null : filter.getFilterString();
-
- response = new QueueQueryResult(name, binding.getAddress(), queue.isDurable(), queue.isTemporary(), filterString, queue.getConsumerCount(), queue.getMessageCount(), autoCreateJmsQueues);
- }
- // make an exception for the management address (see HORNETQ-29)
- else if (name.equals(managementAddress)) {
- response = new QueueQueryResult(name, managementAddress, true, false, null, -1, -1, autoCreateJmsQueues);
- }
- else if (autoCreateJmsQueues) {
- response = new QueueQueryResult(name, name, true, false, null, 0, 0, true, false);
- }
- else {
- response = new QueueQueryResult(null, null, false, false, null, 0, 0, false, false);
- }
-
- return response;
+ return server.queueQuery(name);
}
@Override
public BindingQueryResult executeBindingQuery(final SimpleString address) throws Exception {
- if (address == null) {
- throw ActiveMQMessageBundle.BUNDLE.addressIsNull();
- }
-
- boolean autoCreateJmsQueues = address.toString().startsWith(ResourceNames.JMS_QUEUE) && server.getAddressSettingsRepository().getMatch(address.toString()).isAutoCreateJmsQueues();
-
- List<SimpleString> names = new ArrayList<>();
-
- // make an exception for the management address (see HORNETQ-29)
- if (address.equals(managementAddress)) {
- return new BindingQueryResult(true, names, autoCreateJmsQueues);
- }
-
- Bindings bindings = postOffice.getMatchingBindings(address);
-
- for (Binding binding : bindings.getBindings()) {
- if (binding.getType() == BindingType.LOCAL_QUEUE || binding.getType() == BindingType.REMOTE_QUEUE) {
- names.add(binding.getUniqueName());
- }
- }
-
- return new BindingQueryResult(!names.isEmpty(), names, autoCreateJmsQueues);
+ return server.bindingQuery(address);
}
@Override
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4a4b682f/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/investigations/InvestigationOpenwireTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/investigations/InvestigationOpenwireTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/investigations/InvestigationOpenwireTest.java
index 3614b9a..914a8e1 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/investigations/InvestigationOpenwireTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/investigations/InvestigationOpenwireTest.java
@@ -17,15 +17,22 @@
package org.apache.activemq.artemis.tests.integration.openwire.investigations;
-import org.apache.activemq.artemis.tests.integration.openwire.BasicOpenWireTest;
-import org.junit.Assert;
-import org.junit.Test;
-
-import javax.jms.*;
+import javax.jms.Connection;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.XAConnection;
+import javax.jms.XASession;
import javax.transaction.xa.XAResource;
import java.util.Collection;
import java.util.LinkedList;
+import org.apache.activemq.artemis.tests.integration.openwire.BasicOpenWireTest;
+import org.junit.Assert;
+import org.junit.Test;
+
public class InvestigationOpenwireTest extends BasicOpenWireTest {
@Test