You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2016/12/09 19:49:07 UTC

[23/50] [abbrv] activemq-artemis git commit: ARTEMIS-789 OpenWire queue auto-creation failure

ARTEMIS-789 OpenWire queue auto-creation failure


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/796550d1
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/796550d1
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/796550d1

Branch: refs/heads/master
Commit: 796550d16b30d9033ff02b9520b19ee042e7cb15
Parents: ec8f061
Author: Howard Gao <ho...@gmail.com>
Authored: Fri Nov 11 19:40:36 2016 +0800
Committer: Martyn Taylor <mt...@redhat.com>
Committed: Fri Dec 9 18:43:15 2016 +0000

----------------------------------------------------------------------
 .../protocol/openwire/OpenWireConnection.java   | 19 ++++++++--
 .../core/protocol/openwire/amq/AMQConsumer.java |  2 +-
 .../core/protocol/openwire/amq/AMQSession.java  | 40 ++++++++++++++++----
 .../protocol/openwire/util/OpenWireUtil.java    | 17 ---------
 4 files changed, 48 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/796550d1/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
index 725733c..e823d0e 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
@@ -74,6 +74,7 @@ import org.apache.activemq.artemis.spi.core.protocol.AbstractRemotingConnection;
 import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry;
 import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
 import org.apache.activemq.artemis.spi.core.remoting.Connection;
+import org.apache.activemq.artemis.utils.ConcurrentHashSet;
 import org.apache.activemq.artemis.utils.UUIDGenerator;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQMessage;
@@ -179,6 +180,8 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
    private boolean useKeepAlive;
    private long maxInactivityDuration;
 
+   private final Set<SimpleString> knownDestinations = new ConcurrentHashSet<>();
+
    // TODO-NOW: check on why there are two connections created for every createConnection on the client.
    public OpenWireConnection(Connection connection,
                              ActiveMQServer server,
@@ -708,7 +711,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
    public void addDestination(DestinationInfo info) throws Exception {
       ActiveMQDestination dest = info.getDestination();
       if (dest.isQueue()) {
-         SimpleString qName = OpenWireUtil.toCoreAddress(dest);
+         SimpleString qName = new SimpleString(dest.getPhysicalName());
          QueueBinding binding = (QueueBinding) server.getPostOffice().getBinding(qName);
          if (binding == null) {
             if (dest.isTemporary()) {
@@ -790,6 +793,14 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
       checkInactivity();
    }
 
+   public void addKnownDestination(final SimpleString address) {
+      knownDestinations.add(address);
+   }
+
+   public boolean containsKnownDestination(final SimpleString address) {
+      return knownDestinations.contains(address);
+   }
+
    @Override
    public void tempQueueDeleted(SimpleString bindingName) {
       ActiveMQDestination dest = new ActiveMQTempQueue(bindingName.toString());
@@ -862,7 +873,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
    public void removeDestination(ActiveMQDestination dest) throws Exception {
       if (dest.isQueue()) {
          try {
-            server.destroyQueue(OpenWireUtil.toCoreAddress(dest));
+            server.destroyQueue(new SimpleString(dest.getPhysicalName()));
          } catch (ActiveMQNonExistentQueueException neq) {
             //this is ok, ActiveMQ 5 allows this and will actually do it quite often
             ActiveMQServerLogger.LOGGER.debug("queue never existed");
@@ -870,7 +881,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
 
 
       } else {
-         Bindings bindings = server.getPostOffice().getBindingsForAddress(OpenWireUtil.toCoreAddress(dest));
+         Bindings bindings = server.getPostOffice().getBindingsForAddress(new SimpleString(dest.getPhysicalName()));
 
          for (Binding binding : bindings.getBindings()) {
             Queue b = (Queue) binding.getBindable();
@@ -900,7 +911,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
     */
    private void validateDestination(ActiveMQDestination destination) throws Exception {
       if (destination.isQueue()) {
-         SimpleString physicalName = OpenWireUtil.toCoreAddress(destination);
+         SimpleString physicalName = new SimpleString(destination.getPhysicalName());
          BindingQueryResult result = server.bindingQuery(physicalName);
          if (!result.isExists() && !result.isAutoCreateJmsQueues()) {
             throw ActiveMQMessageBundle.BUNDLE.noSuchQueue(physicalName);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/796550d1/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
index 7f7907e..d5d65a9 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
@@ -95,7 +95,7 @@ public class AMQConsumer {
          serverConsumer = session.getCoreSession().createConsumer(nativeId, queueName, null, info.isBrowser(), false, -1);
          serverConsumer.setlowConsumerDetection(slowConsumerDetectionListener);
       } else {
-         SimpleString queueName = OpenWireUtil.toCoreAddress(openwireDestination);
+         SimpleString queueName = new SimpleString(openwireDestination.getPhysicalName());
          try {
             session.getCoreServer().createQueue(queueName, queueName, null, true, false);
          } catch (ActiveMQQueueExistsException e) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/796550d1/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
index 5cab686..35fd733 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
@@ -23,16 +23,16 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.paging.PagingStore;
 import org.apache.activemq.artemis.core.postoffice.RoutingStatus;
 import org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection;
 import org.apache.activemq.artemis.core.protocol.openwire.OpenWireMessageConverter;
-import org.apache.activemq.artemis.core.protocol.openwire.util.OpenWireUtil;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
+import org.apache.activemq.artemis.core.server.BindingQueryResult;
 import org.apache.activemq.artemis.core.server.MessageReference;
+import org.apache.activemq.artemis.core.server.QueueQueryResult;
 import org.apache.activemq.artemis.core.server.ServerConsumer;
 import org.apache.activemq.artemis.core.server.ServerMessage;
 import org.apache.activemq.artemis.core.server.ServerSession;
@@ -145,11 +145,10 @@ public class AMQSession implements SessionCallback {
 
       for (ActiveMQDestination openWireDest : dests) {
          if (openWireDest.isQueue()) {
-            SimpleString queueName = OpenWireUtil.toCoreAddress(openWireDest);
-            try {
-               getCoreServer().createQueue(queueName, queueName, null, true, false);
-            } catch (ActiveMQQueueExistsException e) {
-               // ignore
+            SimpleString queueName = new SimpleString(openWireDest.getPhysicalName());
+
+            if (!checkAutoCreateQueue(queueName, openWireDest.isTemporary())) {
+               throw new InvalidDestinationException("Destination doesn't exist: " + queueName);
             }
          }
          AMQConsumer consumer = new AMQConsumer(this, openWireDest, info, scheduledPool);
@@ -162,6 +161,27 @@ public class AMQSession implements SessionCallback {
       return consumersList;
    }
 
+   private boolean checkAutoCreateQueue(SimpleString queueName, boolean isTemporary) throws Exception {
+      boolean hasQueue = true;
+      if (!connection.containsKnownDestination(queueName)) {
+
+         BindingQueryResult bindingQuery = server.bindingQuery(queueName);
+         QueueQueryResult queueBinding = server.queueQuery(queueName);
+
+         boolean isAutoCreate = bindingQuery.isExists() ? bindingQuery.isAutoCreateJmsQueues() : true;
+
+         if (!queueBinding.isExists()) {
+            if (isAutoCreate) {
+               server.createQueue(queueName, queueName, null, true, isTemporary);
+               connection.addKnownDestination(queueName);
+            } else {
+               hasQueue = false;
+            }
+         }
+      }
+      return hasQueue;
+   }
+
    public void start() {
 
       coreSession.start();
@@ -338,7 +358,7 @@ public class AMQSession implements SessionCallback {
       // We fillup addresses, pagingStores and we will throw failure if that's the case
       for (int i = 0; i < actualDestinations.length; i++) {
          ActiveMQDestination dest = actualDestinations[i];
-         addresses[i] = OpenWireUtil.toCoreAddress(dest);
+         addresses[i] = new SimpleString(dest.getPhysicalName());
          pagingStores[i] = server.getPagingManager().getPageStore(addresses[i]);
          if (pagingStores[i].getAddressFullMessagePolicy() == AddressFullMessagePolicy.FAIL && pagingStores[i].isFull()) {
             throw new ResourceAllocationException("Queue is full");
@@ -357,6 +377,10 @@ public class AMQSession implements SessionCallback {
             connection.getTransportConnection().setAutoRead(false);
          }
 
+         if (actualDestinations[i].isQueue()) {
+            checkAutoCreateQueue(new SimpleString(actualDestinations[i].getPhysicalName()), actualDestinations[i].isTemporary());
+         }
+
          RoutingStatus result = getCoreSession().send(coreMsg, false, actualDestinations[i].isTemporary());
 
          if (result == RoutingStatus.NO_BINDINGS && actualDestinations[i].isQueue()) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/796550d1/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/util/OpenWireUtil.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/util/OpenWireUtil.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/util/OpenWireUtil.java
index a6e7292..04bd6a3 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/util/OpenWireUtil.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/util/OpenWireUtil.java
@@ -18,7 +18,6 @@ package org.apache.activemq.artemis.core.protocol.openwire.util;
 
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
-import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.server.ServerMessage;
 import org.apache.activemq.artemis.core.transaction.impl.XidImpl;
 import org.apache.activemq.command.ActiveMQDestination;
@@ -37,22 +36,6 @@ public class OpenWireUtil {
       return buffer;
    }
 
-   public static SimpleString toCoreAddress(ActiveMQDestination dest) {
-      if (dest.isQueue()) {
-         if (dest.isTemporary()) {
-            return new SimpleString(dest.getPhysicalName());
-         } else {
-            return new SimpleString(dest.getPhysicalName());
-         }
-      } else {
-         if (dest.isTemporary()) {
-            return new SimpleString(dest.getPhysicalName());
-         } else {
-            return new SimpleString(dest.getPhysicalName());
-         }
-      }
-   }
-
    /**
     * We convert the core address to an ActiveMQ Destination. We use the actual address on the message rather than the
     * destination set on the consumer because it maybe different and the JMS spec says that it should be what ever was