You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2016/12/09 19:49:07 UTC
[23/50] [abbrv] activemq-artemis git commit: ARTEMIS-789 OpenWire
queue auto-creation failure
ARTEMIS-789 OpenWire queue auto-creation failure
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/796550d1
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/796550d1
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/796550d1
Branch: refs/heads/master
Commit: 796550d16b30d9033ff02b9520b19ee042e7cb15
Parents: ec8f061
Author: Howard Gao <ho...@gmail.com>
Authored: Fri Nov 11 19:40:36 2016 +0800
Committer: Martyn Taylor <mt...@redhat.com>
Committed: Fri Dec 9 18:43:15 2016 +0000
----------------------------------------------------------------------
.../protocol/openwire/OpenWireConnection.java | 19 ++++++++--
.../core/protocol/openwire/amq/AMQConsumer.java | 2 +-
.../core/protocol/openwire/amq/AMQSession.java | 40 ++++++++++++++++----
.../protocol/openwire/util/OpenWireUtil.java | 17 ---------
4 files changed, 48 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/796550d1/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
index 725733c..e823d0e 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
@@ -74,6 +74,7 @@ import org.apache.activemq.artemis.spi.core.protocol.AbstractRemotingConnection;
import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.remoting.Connection;
+import org.apache.activemq.artemis.utils.ConcurrentHashSet;
import org.apache.activemq.artemis.utils.UUIDGenerator;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
@@ -179,6 +180,8 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
private boolean useKeepAlive;
private long maxInactivityDuration;
+ private final Set<SimpleString> knownDestinations = new ConcurrentHashSet<>();
+
// TODO-NOW: check on why there are two connections created for every createConnection on the client.
public OpenWireConnection(Connection connection,
ActiveMQServer server,
@@ -708,7 +711,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
public void addDestination(DestinationInfo info) throws Exception {
ActiveMQDestination dest = info.getDestination();
if (dest.isQueue()) {
- SimpleString qName = OpenWireUtil.toCoreAddress(dest);
+ SimpleString qName = new SimpleString(dest.getPhysicalName());
QueueBinding binding = (QueueBinding) server.getPostOffice().getBinding(qName);
if (binding == null) {
if (dest.isTemporary()) {
@@ -790,6 +793,14 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
checkInactivity();
}
+ public void addKnownDestination(final SimpleString address) {
+ knownDestinations.add(address);
+ }
+
+ public boolean containsKnownDestination(final SimpleString address) {
+ return knownDestinations.contains(address);
+ }
+
@Override
public void tempQueueDeleted(SimpleString bindingName) {
ActiveMQDestination dest = new ActiveMQTempQueue(bindingName.toString());
@@ -862,7 +873,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
public void removeDestination(ActiveMQDestination dest) throws Exception {
if (dest.isQueue()) {
try {
- server.destroyQueue(OpenWireUtil.toCoreAddress(dest));
+ server.destroyQueue(new SimpleString(dest.getPhysicalName()));
} catch (ActiveMQNonExistentQueueException neq) {
//this is ok, ActiveMQ 5 allows this and will actually do it quite often
ActiveMQServerLogger.LOGGER.debug("queue never existed");
@@ -870,7 +881,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
} else {
- Bindings bindings = server.getPostOffice().getBindingsForAddress(OpenWireUtil.toCoreAddress(dest));
+ Bindings bindings = server.getPostOffice().getBindingsForAddress(new SimpleString(dest.getPhysicalName()));
for (Binding binding : bindings.getBindings()) {
Queue b = (Queue) binding.getBindable();
@@ -900,7 +911,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
*/
private void validateDestination(ActiveMQDestination destination) throws Exception {
if (destination.isQueue()) {
- SimpleString physicalName = OpenWireUtil.toCoreAddress(destination);
+ SimpleString physicalName = new SimpleString(destination.getPhysicalName());
BindingQueryResult result = server.bindingQuery(physicalName);
if (!result.isExists() && !result.isAutoCreateJmsQueues()) {
throw ActiveMQMessageBundle.BUNDLE.noSuchQueue(physicalName);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/796550d1/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
index 7f7907e..d5d65a9 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
@@ -95,7 +95,7 @@ public class AMQConsumer {
serverConsumer = session.getCoreSession().createConsumer(nativeId, queueName, null, info.isBrowser(), false, -1);
serverConsumer.setlowConsumerDetection(slowConsumerDetectionListener);
} else {
- SimpleString queueName = OpenWireUtil.toCoreAddress(openwireDestination);
+ SimpleString queueName = new SimpleString(openwireDestination.getPhysicalName());
try {
session.getCoreServer().createQueue(queueName, queueName, null, true, false);
} catch (ActiveMQQueueExistsException e) {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/796550d1/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
index 5cab686..35fd733 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
@@ -23,16 +23,16 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.postoffice.RoutingStatus;
import org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection;
import org.apache.activemq.artemis.core.protocol.openwire.OpenWireMessageConverter;
-import org.apache.activemq.artemis.core.protocol.openwire.util.OpenWireUtil;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
+import org.apache.activemq.artemis.core.server.BindingQueryResult;
import org.apache.activemq.artemis.core.server.MessageReference;
+import org.apache.activemq.artemis.core.server.QueueQueryResult;
import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.server.ServerSession;
@@ -145,11 +145,10 @@ public class AMQSession implements SessionCallback {
for (ActiveMQDestination openWireDest : dests) {
if (openWireDest.isQueue()) {
- SimpleString queueName = OpenWireUtil.toCoreAddress(openWireDest);
- try {
- getCoreServer().createQueue(queueName, queueName, null, true, false);
- } catch (ActiveMQQueueExistsException e) {
- // ignore
+ SimpleString queueName = new SimpleString(openWireDest.getPhysicalName());
+
+ if (!checkAutoCreateQueue(queueName, openWireDest.isTemporary())) {
+ throw new InvalidDestinationException("Destination doesn't exist: " + queueName);
}
}
AMQConsumer consumer = new AMQConsumer(this, openWireDest, info, scheduledPool);
@@ -162,6 +161,27 @@ public class AMQSession implements SessionCallback {
return consumersList;
}
+ private boolean checkAutoCreateQueue(SimpleString queueName, boolean isTemporary) throws Exception {
+ boolean hasQueue = true;
+ if (!connection.containsKnownDestination(queueName)) {
+
+ BindingQueryResult bindingQuery = server.bindingQuery(queueName);
+ QueueQueryResult queueBinding = server.queueQuery(queueName);
+
+ boolean isAutoCreate = bindingQuery.isExists() ? bindingQuery.isAutoCreateJmsQueues() : true;
+
+ if (!queueBinding.isExists()) {
+ if (isAutoCreate) {
+ server.createQueue(queueName, queueName, null, true, isTemporary);
+ connection.addKnownDestination(queueName);
+ } else {
+ hasQueue = false;
+ }
+ }
+ }
+ return hasQueue;
+ }
+
public void start() {
coreSession.start();
@@ -338,7 +358,7 @@ public class AMQSession implements SessionCallback {
// We fillup addresses, pagingStores and we will throw failure if that's the case
for (int i = 0; i < actualDestinations.length; i++) {
ActiveMQDestination dest = actualDestinations[i];
- addresses[i] = OpenWireUtil.toCoreAddress(dest);
+ addresses[i] = new SimpleString(dest.getPhysicalName());
pagingStores[i] = server.getPagingManager().getPageStore(addresses[i]);
if (pagingStores[i].getAddressFullMessagePolicy() == AddressFullMessagePolicy.FAIL && pagingStores[i].isFull()) {
throw new ResourceAllocationException("Queue is full");
@@ -357,6 +377,10 @@ public class AMQSession implements SessionCallback {
connection.getTransportConnection().setAutoRead(false);
}
+ if (actualDestinations[i].isQueue()) {
+ checkAutoCreateQueue(new SimpleString(actualDestinations[i].getPhysicalName()), actualDestinations[i].isTemporary());
+ }
+
RoutingStatus result = getCoreSession().send(coreMsg, false, actualDestinations[i].isTemporary());
if (result == RoutingStatus.NO_BINDINGS && actualDestinations[i].isQueue()) {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/796550d1/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/util/OpenWireUtil.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/util/OpenWireUtil.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/util/OpenWireUtil.java
index a6e7292..04bd6a3 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/util/OpenWireUtil.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/util/OpenWireUtil.java
@@ -18,7 +18,6 @@ package org.apache.activemq.artemis.core.protocol.openwire.util;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
-import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.transaction.impl.XidImpl;
import org.apache.activemq.command.ActiveMQDestination;
@@ -37,22 +36,6 @@ public class OpenWireUtil {
return buffer;
}
- public static SimpleString toCoreAddress(ActiveMQDestination dest) {
- if (dest.isQueue()) {
- if (dest.isTemporary()) {
- return new SimpleString(dest.getPhysicalName());
- } else {
- return new SimpleString(dest.getPhysicalName());
- }
- } else {
- if (dest.isTemporary()) {
- return new SimpleString(dest.getPhysicalName());
- } else {
- return new SimpleString(dest.getPhysicalName());
- }
- }
- }
-
/**
* We convert the core address to an ActiveMQ Destination. We use the actual address on the message rather than the
* destination set on the consumer because it maybe different and the JMS spec says that it should be what ever was