You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2015/08/28 20:20:11 UTC

[1/2] activemq-artemis git commit: ARTEMIS-191 Refactor RemoveDestinationTest - Use the core API to inspect queue status - Catch command visit() exceptions in order to pass them back to the client - Correct destination add/remove handling

Repository: activemq-artemis
Updated Branches:
  refs/heads/master 40f318f08 -> b91b5a1aa


ARTEMIS-191 Refactor RemoveDestinationTest
  - Use the core API to inspect queue status
  - Catch command visit() exceptions in order to
    pass them back to the client.
  - Correct destination add/remove handling


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/be9959e0
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/be9959e0
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/be9959e0

Branch: refs/heads/master
Commit: be9959e0bc5d4f46a058c0b02dbbdb1060546301
Parents: 40f318f
Author: Howard Gao <ho...@gmail.com>
Authored: Fri Aug 28 20:33:38 2015 +0800
Committer: Howard Gao <ho...@gmail.com>
Committed: Fri Aug 28 20:33:38 2015 +0800

----------------------------------------------------------------------
 .../protocol/openwire/OpenWireConnection.java   | 29 +++++----
 .../openwire/OpenWireProtocolManager.java       | 64 ++++++++++++++++----
 .../artemiswrapper/ArtemisBrokerBase.java       |  5 ++
 .../artemiswrapper/ArtemisBrokerWrapper.java    |  3 +-
 .../apache/activemq/RemoveDestinationTest.java  | 33 +++++-----
 5 files changed, 93 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/be9959e0/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
index c5644dd..3155794 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
@@ -39,7 +39,6 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.ActiveMQNonExistentQueueException;
 import org.apache.activemq.artemis.api.core.ActiveMQSecurityException;
-import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQBrokerStoppedException;
 import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQCompositeConsumerBrokerExchange;
 import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConnectionContext;
@@ -174,7 +173,7 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor {
 
    private ConnectionState state;
 
-   private final Set<String> tempQueues = new ConcurrentHashSet<String>();
+   private final Set<ActiveMQDestination> tempQueues = new ConcurrentHashSet<ActiveMQDestination>();
 
    private Map<TransactionId, TransactionInfo> txMap = new ConcurrentHashMap<TransactionId, TransactionInfo>();
 
@@ -227,7 +226,14 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor {
                response = new ExceptionResponse(this.stopError);
             }
             else {
-               response = ((Command) command).visit(this);
+               try {
+                  response = ((Command) command).visit(this);
+               }
+               catch (Exception e) {
+                  if (responseRequired) {
+                     response = new ExceptionResponse(e);
+                  }
+               }
 
                if (response instanceof ExceptionResponse) {
                   if (!responseRequired) {
@@ -409,10 +415,10 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor {
    }
 
    private void deleteTempQueues() throws Exception {
-      Iterator<String> queueNames = tempQueues.iterator();
-      while (queueNames.hasNext()) {
-         String q = queueNames.next();
-         protocolManager.deleteQueue(q);
+      Iterator<ActiveMQDestination> tmpQs = tempQueues.iterator();
+      while (tmpQs.hasNext()) {
+         ActiveMQDestination q = tmpQs.next();
+         protocolManager.removeDestination(this, q);
       }
    }
 
@@ -1230,10 +1236,7 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor {
    @Override
    public Response processRemoveDestination(DestinationInfo info) throws Exception {
       ActiveMQDestination dest = info.getDestination();
-      if (dest.isQueue()) {
-         String qName = "jms.queue." + dest.getPhysicalName();
-         protocolManager.deleteQueue(qName);
-      }
+      protocolManager.removeDestination(this, dest);
       return null;
    }
 
@@ -1320,8 +1323,8 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor {
       return this.wireFormat;
    }
 
-   public void registerTempQueue(SimpleString qName) {
-      tempQueues.add(qName.toString());
+   public void registerTempQueue(ActiveMQDestination queue) {
+      tempQueues.add(queue);
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/be9959e0/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
index 2554ce1..98b41ab 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
@@ -40,6 +40,9 @@ import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.management.CoreNotificationType;
 import org.apache.activemq.artemis.api.core.management.ManagementHelper;
 import org.apache.activemq.artemis.core.io.IOCallback;
+import org.apache.activemq.artemis.core.postoffice.Binding;
+import org.apache.activemq.artemis.core.postoffice.Bindings;
+import org.apache.activemq.artemis.core.postoffice.QueueBinding;
 import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConnectionContext;
 import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConsumer;
 import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQPersistenceAdapter;
@@ -50,6 +53,7 @@ import org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnectio
 import org.apache.activemq.artemis.core.security.CheckType;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
+import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
 import org.apache.activemq.artemis.core.server.management.ManagementService;
 import org.apache.activemq.artemis.core.server.management.Notification;
@@ -136,7 +140,6 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
 
    private final ScheduledExecutorService scheduledPool;
 
-
    public OpenWireProtocolManager(OpenWireProtocolManagerFactory factory, ActiveMQServer server) {
       this.factory = factory;
       this.server = server;
@@ -429,17 +432,24 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
       }
       // Avoid replaying dup commands
       if (!ss.getProducerIds().contains(info.getProducerId())) {
+
+         AMQSession amqSession = sessions.get(sessionId);
+         if (amqSession == null) {
+            throw new IllegalStateException("Session not exist! : " + sessionId);
+         }
+
          ActiveMQDestination destination = info.getDestination();
          if (destination != null && !AdvisorySupport.isAdvisoryTopic(destination)) {
             if (theConn.getProducerCount() >= theConn.getMaximumProducersAllowedPerConnection()) {
                throw new IllegalStateException("Can't add producer on connection " + connectionId + ": at maximum limit: " + theConn.getMaximumProducersAllowedPerConnection());
             }
+            if (destination.isQueue()) {
+               OpenWireUtil.validateDestination(destination, amqSession);
+            }
+            DestinationInfo destInfo = new DestinationInfo(theConn.getConext().getConnectionId(), DestinationInfo.ADD_OPERATION_TYPE, destination);
+            this.addDestination(theConn, destInfo);
          }
 
-         AMQSession amqSession = sessions.get(sessionId);
-         if (amqSession == null) {
-            throw new IllegalStateException("Session not exist! : " + sessionId);
-         }
 
          amqSession.createProducer(info);
 
@@ -539,10 +549,40 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
       return sessions.get(sessionId);
    }
 
+   public void removeDestination(OpenWireConnection connection, ActiveMQDestination dest) throws Exception {
+      if (dest.isQueue()) {
+         SimpleString qName = new SimpleString("jms.queue." + dest.getPhysicalName());
+         this.server.destroyQueue(qName);
+      }
+      else {
+         Bindings bindings = this.server.getPostOffice().getBindingsForAddress(SimpleString.toSimpleString("jms.topic." + dest.getPhysicalName()));
+         Iterator<Binding> iterator = bindings.getBindings().iterator();
+
+         while (iterator.hasNext()) {
+            Queue b = (Queue) iterator.next().getBindable();
+            if (b.getConsumerCount() > 0) {
+               throw new Exception("Destination still has an active subscription: " + dest.getPhysicalName());
+            }
+            if (b.isDurable()) {
+               throw new Exception("Destination still has durable subscription: " + dest.getPhysicalName());
+            }
+            b.deleteQueue();
+         }
+      }
+
+      if (!AdvisorySupport.isAdvisoryTopic(dest)) {
+         AMQConnectionContext context = connection.getConext();
+         DestinationInfo advInfo = new DestinationInfo(context.getConnectionId(), DestinationInfo.REMOVE_OPERATION_TYPE, dest);
+
+         ActiveMQTopic topic = AdvisorySupport.getDestinationAdvisoryTopic(dest);
+         fireAdvisory(context, topic, advInfo);
+      }
+   }
+
    public void addDestination(OpenWireConnection connection, DestinationInfo info) throws Exception {
       ActiveMQDestination dest = info.getDestination();
       if (dest.isQueue()) {
-         SimpleString qName = new SimpleString("jms.queue." + dest.getPhysicalName());
+         SimpleString qName = OpenWireUtil.toCoreAddress(dest);
          ConnectionState state = connection.getState();
          ConnectionInfo connInfo = state.getInfo();
          if (connInfo != null) {
@@ -555,9 +595,13 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
 
             ((ActiveMQServerImpl) server).checkQueueCreationLimit(user);
          }
-         this.server.createQueue(qName, qName, null, connInfo == null ? null : SimpleString.toSimpleString(connInfo.getUserName()), false, true);
+
+         QueueBinding binding = (QueueBinding) server.getPostOffice().getBinding(qName);
+         if (binding == null) {
+            this.server.createQueue(qName, qName, null, connInfo == null ? null : SimpleString.toSimpleString(connInfo.getUserName()), false, dest.isTemporary());
+         }
          if (dest.isTemporary()) {
-            connection.registerTempQueue(qName);
+            connection.registerTempQueue(dest);
          }
       }
 
@@ -570,10 +614,6 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
       }
    }
 
-   public void deleteQueue(String q) throws Exception {
-      server.destroyQueue(new SimpleString(q));
-   }
-
    public void endTransaction(TransactionInfo info) throws Exception {
       AMQSession txSession = transactions.get(info.getTransactionId());
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/be9959e0/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerBase.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerBase.java b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerBase.java
index 6f2fff6..5c052a6 100644
--- a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerBase.java
+++ b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerBase.java
@@ -614,4 +614,9 @@ public abstract class ArtemisBrokerBase implements Broker {
       return directory.delete();
    }
 
+   public ActiveMQServer getServer()
+   {
+      return server;
+   }
+
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/be9959e0/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerWrapper.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerWrapper.java b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerWrapper.java
index 723529f..14b93c6 100644
--- a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerWrapper.java
+++ b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerWrapper.java
@@ -16,7 +16,6 @@
  */
 package org.apache.activemq.broker.artemiswrapper;
 
-import java.net.URI;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -35,7 +34,6 @@ import org.apache.activemq.artemis.core.security.Role;
 import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy;
-import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
 import org.apache.activemq.artemis.jms.server.impl.JMSServerManagerImpl;
 import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManagerImpl;
 import org.apache.activemq.artemiswrapper.ArtemisBrokerHelper;
@@ -82,6 +80,7 @@ public class ArtemisBrokerWrapper extends ArtemisBrokerBase {
       }
       SimpleString dla = new SimpleString("jms.queue.ActiveMQ.DLQ");
       commonSettings.setDeadLetterAddress(dla);
+      commonSettings.setAutoCreateJmsQueues(true);
 
       serverConfig.getAcceptorConfigurations().add(transportConfiguration);
       if (this.bservice.enableSsl()) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/be9959e0/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/RemoveDestinationTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/RemoveDestinationTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/RemoveDestinationTest.java
index 5429723..894abe3 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/RemoveDestinationTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/RemoveDestinationTest.java
@@ -22,6 +22,8 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import java.net.URI;
+import java.util.Iterator;
+import java.util.Set;
 
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
@@ -31,12 +33,13 @@ import javax.jms.MessageProducer;
 import javax.jms.Session;
 import javax.jms.TextMessage;
 import javax.jms.Topic;
-import javax.management.ObjectName;
 
 import org.apache.activemq.advisory.DestinationSource;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.postoffice.PostOffice;
 import org.apache.activemq.broker.BrokerFactory;
 import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.jmx.DestinationViewMBean;
+import org.apache.activemq.broker.artemiswrapper.ArtemisBrokerWrapper;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQTopic;
 import org.junit.After;
@@ -45,8 +48,8 @@ import org.junit.Test;
 
 public class RemoveDestinationTest {
 
-   private static final String VM_BROKER_URL = "vm://localhost?create=false";
-   private static final String BROKER_URL = "broker:vm://localhost?broker.persistent=false&broker.useJmx=true";
+   private static final String TCP_BROKER_URL = "tcp://localhost:61616?create=false";
+   private static final String BROKER_URL = "broker:tcp://localhost:61616?broker.persistent=false&broker.useJmx=true";
 
    BrokerService broker;
 
@@ -65,7 +68,7 @@ public class RemoveDestinationTest {
    }
 
    private Connection createConnection(final boolean start) throws JMSException {
-      ConnectionFactory cf = new ActiveMQConnectionFactory(VM_BROKER_URL);
+      ConnectionFactory cf = new ActiveMQConnectionFactory(TCP_BROKER_URL);
       Connection conn = cf.createConnection();
       if (start) {
          conn.start();
@@ -118,7 +121,7 @@ public class RemoveDestinationTest {
 
       ActiveMQTopic amqTopic = (ActiveMQTopic) topic;
 
-      assertTrue(destinationPresentInAdminView(broker, amqTopic));
+      assertTrue(destinationPresentInAdminView(amqTopic));
       assertTrue(destinationSource.getTopics().contains(amqTopic));
 
       // This line generates a broker error since the consumer is still active.
@@ -133,7 +136,7 @@ public class RemoveDestinationTest {
       Thread.sleep(3000);
 
       assertTrue(destinationSource.getTopics().contains(amqTopic));
-      assertTrue(destinationPresentInAdminView(broker, amqTopic));
+      assertTrue(destinationPresentInAdminView(amqTopic));
 
       consumer.close();
       producer.close();
@@ -146,16 +149,18 @@ public class RemoveDestinationTest {
       amqConnection.destroyDestination(amqTopic);
       Thread.sleep(3000);
       assertFalse(destinationSource.getTopics().contains(amqTopic));
-      assertFalse(destinationPresentInAdminView(broker, amqTopic));
+      assertFalse(destinationPresentInAdminView(amqTopic));
    }
 
-   private boolean destinationPresentInAdminView(BrokerService broker2, ActiveMQTopic amqTopic) throws Exception {
+   private boolean destinationPresentInAdminView(ActiveMQTopic amqTopic) throws Exception {
       boolean found = false;
-      for (ObjectName name : broker.getAdminView().getTopics()) {
-
-         DestinationViewMBean proxy = (DestinationViewMBean) broker.getManagementContext().newProxyInstance(name, DestinationViewMBean.class, true);
-
-         if (proxy.getName().equals(amqTopic.getPhysicalName())) {
+      ArtemisBrokerWrapper wrapper = (ArtemisBrokerWrapper) broker.getBroker();
+      PostOffice po = wrapper.getServer().getPostOffice();
+      Set<SimpleString> addressSet = po.getAddresses();
+      Iterator<SimpleString> iter = addressSet.iterator();
+      String addressToFind = "jms.topic." + amqTopic.getPhysicalName();
+      while (iter.hasNext()) {
+         if (addressToFind.equals(iter.next().toString())) {
             found = true;
             break;
          }


[2/2] activemq-artemis git commit: This closes #142 Remove destination

Posted by cl...@apache.org.
This closes #142 Remove destination


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/b91b5a1a
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/b91b5a1a
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/b91b5a1a

Branch: refs/heads/master
Commit: b91b5a1aae3b4f51814029d3abe4ea4201d4538d
Parents: 40f318f be9959e
Author: Clebert Suconic <cl...@apache.org>
Authored: Fri Aug 28 14:19:57 2015 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Fri Aug 28 14:19:57 2015 -0400

----------------------------------------------------------------------
 .../protocol/openwire/OpenWireConnection.java   | 29 +++++----
 .../openwire/OpenWireProtocolManager.java       | 64 ++++++++++++++++----
 .../artemiswrapper/ArtemisBrokerBase.java       |  5 ++
 .../artemiswrapper/ArtemisBrokerWrapper.java    |  3 +-
 .../apache/activemq/RemoveDestinationTest.java  | 33 +++++-----
 5 files changed, 93 insertions(+), 41 deletions(-)
----------------------------------------------------------------------