You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2016/11/07 16:36:43 UTC

[23/50] [abbrv] activemq-artemis git commit: Add Queue Meta Data for DeleteOnNoConsumers, MaxConsumers

Add Queue Meta Data for DeleteOnNoConsumers,MaxConsumers


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/47f47e85
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/47f47e85
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/47f47e85

Branch: refs/heads/ARTEMIS-780
Commit: 47f47e85b870382effd3f4a47234b94756b094b7
Parents: d70bf3b
Author: Martyn Taylor <mt...@redhat.com>
Authored: Mon Oct 31 13:19:02 2016 +0000
Committer: Clebert Suconic <cl...@apache.org>
Committed: Mon Nov 7 11:28:07 2016 -0500

----------------------------------------------------------------------
 .../impl/ActiveMQServerControlImpl.java         |  1 -
 .../core/persistence/QueueBindingInfo.java      |  7 ++
 .../codec/PersistentQueueBindingEncoding.java   | 43 ++++++++++-
 .../artemis/core/server/ActiveMQServer.java     |  9 +++
 .../activemq/artemis/core/server/Queue.java     |  4 ++
 .../artemis/core/server/QueueConfig.java        | 51 ++++++++++++-
 .../core/server/impl/ActiveMQServerImpl.java    | 76 +++++++++++++++++---
 .../artemis/core/server/impl/AddressInfo.java   |  5 +-
 .../server/impl/PostOfficeJournalLoader.java    |  8 ++-
 9 files changed, 185 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/47f47e85/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
index b0e8b9b..fcbf15c 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
@@ -573,7 +573,6 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
       SimpleString filter = filterStr == null ? null : new SimpleString(filterStr);
       clearIO();
       try {
-
          server.deployQueue(SimpleString.toSimpleString(address), new SimpleString(name), filter, durable, false);
       } finally {
          blockOnIO();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/47f47e85/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/QueueBindingInfo.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/QueueBindingInfo.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/QueueBindingInfo.java
index 8c80a8a..4d435c6 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/QueueBindingInfo.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/QueueBindingInfo.java
@@ -46,4 +46,11 @@ public interface QueueBindingInfo {
 
    List<QueueStatusEncoding> getQueueStatusEncodings();
 
+   int getMaxConsumers();
+
+   void setMaxConsumers(int maxConsumers);
+
+   boolean isDeleteOnNoConsumers();
+
+   void setDeleteOnNoConsumers();
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/47f47e85/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentQueueBindingEncoding.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentQueueBindingEncoding.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentQueueBindingEncoding.java
index 039460c..78a81ea 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentQueueBindingEncoding.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentQueueBindingEncoding.java
@@ -41,6 +41,10 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
 
    public List<QueueStatusEncoding> queueStatusEncodings;
 
+   public int maxConsumers;
+
+   public boolean deleteOnNoConsumers;
+
    public PersistentQueueBindingEncoding() {
    }
 
@@ -57,6 +61,10 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
          user +
          ", autoCreated=" +
          autoCreated +
+         ", maxConsumers=" +
+         maxConsumers +
+         ", deleteOnNoConsumers=" +
+         deleteOnNoConsumers +
          "]";
    }
 
@@ -125,6 +133,26 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
    }
 
    @Override
+   public int getMaxConsumers() {
+      return 0;
+   }
+
+   @Override
+   public void setMaxConsumers(int maxConsumers) {
+
+   }
+
+   @Override
+   public boolean isDeleteOnNoConsumers() {
+      return false;
+   }
+
+   @Override
+   public void setDeleteOnNoConsumers() {
+
+   }
+
+   @Override
    public void decode(final ActiveMQBuffer buffer) {
       name = buffer.readSimpleString();
       address = buffer.readSimpleString();
@@ -144,6 +172,15 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
       }
 
       autoCreated = buffer.readBoolean();
+
+      if (buffer.readableBytes() > 0) {
+         maxConsumers = buffer.readInt();
+         deleteOnNoConsumers = buffer.readBoolean();
+      }
+      else {
+         maxConsumers = -1;
+         deleteOnNoConsumers = false;
+      }
    }
 
    @Override
@@ -153,13 +190,17 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
       buffer.writeNullableSimpleString(filterString);
       buffer.writeNullableSimpleString(createMetadata());
       buffer.writeBoolean(autoCreated);
+      buffer.writeInt(maxConsumers);
+      buffer.writeBoolean(deleteOnNoConsumers);
    }
 
    @Override
    public int getEncodeSize() {
       return SimpleString.sizeofString(name) + SimpleString.sizeofString(address) +
          SimpleString.sizeofNullableString(filterString) + DataConstants.SIZE_BOOLEAN +
-         SimpleString.sizeofNullableString(createMetadata());
+         SimpleString.sizeofNullableString(createMetadata()) +
+         DataConstants.SIZE_INT +
+         DataConstants.SIZE_BOOLEAN;
    }
 
    private SimpleString createMetadata() {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/47f47e85/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
index a6256d8..ed45645 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
@@ -333,6 +333,15 @@ public interface ActiveMQServer extends ActiveMQComponent {
 
    QueueQueryResult queueQuery(SimpleString name) throws Exception;
 
+   Queue deployQueue(SimpleString address,
+                     SimpleString resourceName,
+                     SimpleString filterString,
+                     boolean durable,
+                     boolean temporary,
+                     boolean autoCreated,
+                     Integer maxConsumers,
+                     Boolean deleteOnNoConsumers) throws Exception;
+
    void destroyQueue(SimpleString queueName) throws Exception;
 
    void destroyQueue(SimpleString queueName, SecurityAuth session) throws Exception;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/47f47e85/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
index 52cd2f0..270e0cd 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
@@ -46,6 +46,10 @@ public interface Queue extends Bindable {
 
    boolean isAutoCreated();
 
+   boolean isDeleteOnNoConsumers();
+
+   boolean getMaxConsumers();
+
    void addConsumer(Consumer consumer) throws Exception;
 
    void removeConsumer(Consumer consumer);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/47f47e85/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueConfig.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueConfig.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueConfig.java
index f750f6c..3b7ed71 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueConfig.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueConfig.java
@@ -33,6 +33,8 @@ public final class QueueConfig {
    private final boolean durable;
    private final boolean temporary;
    private final boolean autoCreated;
+   private final int maxConsumers;
+   private final boolean deleteOnNoConsumers;
 
    public static final class Builder {
 
@@ -45,6 +47,8 @@ public final class QueueConfig {
       private boolean durable;
       private boolean temporary;
       private boolean autoCreated;
+      private int maxConsumers;
+      private boolean deleteOnNoConsumers;
 
       private Builder(final long id, final SimpleString name) {
          this(id, name, name);
@@ -60,6 +64,8 @@ public final class QueueConfig {
          this.durable = true;
          this.temporary = false;
          this.autoCreated = true;
+         this.maxConsumers = -1;
+         this.deleteOnNoConsumers = false;
          validateState();
       }
 
@@ -106,6 +112,16 @@ public final class QueueConfig {
          return this;
       }
 
+      public Builder maxConsumers(final int maxConsumers) {
+         this.maxConsumers = maxConsumers;
+         return this;
+      }
+
+      public Builder deleteOnNoConsumers(final boolean deleteOnNoConsumers) {
+         this.deleteOnNoConsumers = deleteOnNoConsumers;
+         return this;
+      }
+
       /**
        * Returns a new {@link QueueConfig} using the parameters configured on the {@link Builder}.
        * <br>
@@ -127,7 +143,7 @@ public final class QueueConfig {
          } else {
             pageSubscription = null;
          }
-         return new QueueConfig(id, address, name, filter, pageSubscription, user, durable, temporary, autoCreated);
+         return new QueueConfig(id, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, maxConsumers, deleteOnNoConsumers);
       }
 
    }
@@ -168,7 +184,9 @@ public final class QueueConfig {
                        final SimpleString user,
                        final boolean durable,
                        final boolean temporary,
-                       final boolean autoCreated) {
+                       final boolean autoCreated,
+                       final int maxConsumers,
+                       final boolean deleteOnNoConsumers) {
       this.id = id;
       this.address = address;
       this.name = name;
@@ -178,6 +196,8 @@ public final class QueueConfig {
       this.durable = durable;
       this.temporary = temporary;
       this.autoCreated = autoCreated;
+      this.deleteOnNoConsumers = deleteOnNoConsumers;
+      this.maxConsumers = maxConsumers;
    }
 
    public long id() {
@@ -216,6 +236,14 @@ public final class QueueConfig {
       return autoCreated;
    }
 
+   public boolean isDeleteOnNoConsumers() {
+      return deleteOnNoConsumers;
+   }
+
+   public int maxConsumers() {
+      return maxConsumers;
+   }
+
    @Override
    public boolean equals(Object o) {
       if (this == o)
@@ -241,6 +269,10 @@ public final class QueueConfig {
          return false;
       if (pageSubscription != null ? !pageSubscription.equals(that.pageSubscription) : that.pageSubscription != null)
          return false;
+      if (maxConsumers != that.maxConsumers)
+         return false;
+      if (deleteOnNoConsumers != that.deleteOnNoConsumers)
+         return false;
       return user != null ? user.equals(that.user) : that.user == null;
 
    }
@@ -256,11 +288,24 @@ public final class QueueConfig {
       result = 31 * result + (durable ? 1 : 0);
       result = 31 * result + (temporary ? 1 : 0);
       result = 31 * result + (autoCreated ? 1 : 0);
+      result = 31 * result + maxConsumers;
+      result = 31 * result + (deleteOnNoConsumers ? 1 : 0);
       return result;
    }
 
    @Override
    public String toString() {
-      return "QueueConfig{" + "id=" + id + ", address=" + address + ", name=" + name + ", filter=" + filter + ", pageSubscription=" + pageSubscription + ", user=" + user + ", durable=" + durable + ", temporary=" + temporary + ", autoCreated=" + autoCreated + '}';
+      return "QueueConfig{"
+         + "id=" + id
+         + ", address=" + address
+         + ", name=" + name
+         + ", filter=" + filter
+         + ", pageSubscription=" + pageSubscription
+         + ", user=" + user
+         + ", durable=" + durable
+         + ", temporary=" + temporary
+         + ", autoCreated=" + autoCreated
+         + ", maxConsumers=" + maxConsumers
+         + ", deleteOnNoConsumers=" + deleteOnNoConsumers + '}';
    }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/47f47e85/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
index cce81c5..9421df3 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
@@ -1509,6 +1509,18 @@ public class ActiveMQServerImpl implements ActiveMQServer {
                             final boolean durable,
                             final boolean temporary,
                             final boolean autoCreated) throws Exception {
+      return deployQueue(address, resourceName, filterString, durable, temporary, autoCreated, null, null);
+   }
+
+   @Override
+   public Queue deployQueue(final SimpleString address,
+                            final SimpleString resourceName,
+                            final SimpleString filterString,
+                            final boolean durable,
+                            final boolean temporary,
+                            final boolean autoCreated,
+                            final Integer maxConsumers,
+                            final Boolean deleteOnNoConsumers) throws Exception {
 
       if (resourceName.toString().toLowerCase().startsWith("jms.topic")) {
          ActiveMQServerLogger.LOGGER.deployTopic(resourceName);
@@ -1516,7 +1528,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
          ActiveMQServerLogger.LOGGER.deployQueue(resourceName);
       }
 
-      return createQueue(address, resourceName, filterString, null, durable, temporary, true, false, autoCreated);
+      return createQueue(address, resourceName, filterString, null, durable, temporary, true, false, autoCreated, maxConsumers, deleteOnNoConsumers);
    }
 
    @Override
@@ -2102,9 +2114,17 @@ public class ActiveMQServerImpl implements ActiveMQServer {
 
    private void deployQueuesFromListCoreQueueConfiguration(List<CoreQueueConfiguration> queues) throws Exception {
       for (CoreQueueConfiguration config : queues) {
-         deployQueue(SimpleString.toSimpleString(config.getAddress()), SimpleString.toSimpleString(config.getName()), SimpleString.toSimpleString(config.getFilterString()), config.isDurable(), false);
+         deployQueue(SimpleString.toSimpleString(config.getAddress()),
+                     SimpleString.toSimpleString(config.getName()),
+                     SimpleString.toSimpleString(config.getFilterString()),
+                     config.isDurable(),
+                     false,
+                     false,
+                     config.getMaxConsumers(),
+                     config.getDeleteOnNoConsumers());
       }
    }
+
    private void deployQueuesFromConfiguration() throws Exception {
       deployQueuesFromListCoreQueueConfiguration(configuration.getQueueConfigurations());
    }
@@ -2228,6 +2248,30 @@ public class ActiveMQServerImpl implements ActiveMQServer {
                              final boolean ignoreIfExists,
                              final boolean transientQueue,
                              final boolean autoCreated) throws Exception {
+      return createQueue(addressName,
+                         queueName,
+                         filterString,
+                         user,
+                         durable,
+                         temporary,
+                         ignoreIfExists,
+                         transientQueue,
+                         autoCreated,
+                         null,
+                         null);
+   }
+
+   private Queue createQueue(final SimpleString addressName,
+                             final SimpleString queueName,
+                             final SimpleString filterString,
+                             final SimpleString user,
+                             final boolean durable,
+                             final boolean temporary,
+                             final boolean ignoreIfExists,
+                             final boolean transientQueue,
+                             final boolean autoCreated,
+                             final Integer maxConsumers,
+                             final Boolean deleteOnNoConsumers) throws Exception {
 
       final QueueBinding binding = (QueueBinding) postOffice.getBinding(queueName);
       if (binding != null) {
@@ -2244,21 +2288,31 @@ public class ActiveMQServerImpl implements ActiveMQServer {
       final long queueID = storageManager.generateID();
 
       final QueueConfig.Builder queueConfigBuilder;
+      final SimpleString address;
       if (addressName == null) {
          queueConfigBuilder = QueueConfig.builderWith(queueID, queueName);
+         address = queueName;
       } else {
          queueConfigBuilder = QueueConfig.builderWith(queueID, queueName, addressName);
+         address = addressName;
       }
-      final QueueConfig queueConfig = queueConfigBuilder.filter(filter).pagingManager(pagingManager).user(user).durable(durable).temporary(temporary).autoCreated(autoCreated).build();
-      final Queue queue = queueFactory.createQueueWith(queueConfig);
 
-      boolean addressAlreadyExists = true;
+      // FIXME This boils down to a putIfAbsent (avoids race).  This should be reflected in the API.
+      AddressInfo info = postOffice.addAddressInfo(new AddressInfo(address));
 
-      if (postOffice.getAddressInfo(queue.getAddress()) == null) {
-         postOffice.addAddressInfo(new AddressInfo(queue.getAddress())
-                           .setRoutingType(AddressInfo.RoutingType.MULTICAST));
-         addressAlreadyExists = false;
-      }
+      final boolean isDeleteOnNoConsumers = deleteOnNoConsumers == null ? info.isDefaultDeleteOnNoConsumers() : deleteOnNoConsumers;
+      final int noMaxConsumers = maxConsumers == null ? info.getDefaultMaxConsumers() : maxConsumers;
+
+      final QueueConfig queueConfig = queueConfigBuilder.filter(filter)
+         .pagingManager(pagingManager)
+         .user(user)
+         .durable(durable)
+         .temporary(temporary)
+         .autoCreated(autoCreated)
+         .deleteOnNoConsumers(isDeleteOnNoConsumers)
+         .maxConsumers(noMaxConsumers)
+         .build();
+      final Queue queue = queueFactory.createQueueWith(queueConfig);
 
       if (transientQueue) {
          queue.setConsumersRefCount(new TransientQueueManagerImpl(this, queue.getName()));
@@ -2270,7 +2324,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
 
       if (queue.isDurable()) {
          storageManager.addQueueBinding(txID, localQueueBinding);
-         if (!addressAlreadyExists) {
+         if (info == null) {
             storageManager.addAddressBinding(txID, getAddressInfo(queue.getAddress()));
          }
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/47f47e85/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java
index 4e982c4..7c71c1f 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/AddressInfo.java
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.artemis.core.server.impl;
 
+import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
 import org.apache.activemq.artemis.api.core.SimpleString;
 
 public class AddressInfo {
@@ -24,9 +25,9 @@ public class AddressInfo {
 
    private RoutingType routingType = RoutingType.MULTICAST;
 
-   private boolean defaultDeleteOnNoConsumers;
+   private boolean defaultDeleteOnNoConsumers = ActiveMQDefaultConfiguration.getDefaultDeleteQueueOnNoConsumers();
 
-   private int defaultMaxConsumers;
+   private int defaultMaxConsumers = ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers();
 
    public AddressInfo(SimpleString name) {
       this.name = name;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/47f47e85/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java
index 4e89e8a..9bd14f0 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java
@@ -143,7 +143,13 @@ public class PostOfficeJournalLoader implements JournalLoader {
          } else {
             queueConfigBuilder = QueueConfig.builderWith(queueBindingInfo.getId(), queueBindingInfo.getQueueName(), queueBindingInfo.getAddress());
          }
-         queueConfigBuilder.filter(filter).pagingManager(pagingManager).user(queueBindingInfo.getUser()).durable(true).temporary(false).autoCreated(queueBindingInfo.isAutoCreated());
+         queueConfigBuilder.filter(filter).pagingManager(pagingManager)
+            .user(queueBindingInfo.getUser())
+            .durable(true)
+            .temporary(false)
+            .autoCreated(queueBindingInfo.isAutoCreated())
+            .de
+            );
          final Queue queue = queueFactory.createQueueWith(queueConfigBuilder.build());
          if (queue.isAutoCreated()) {
             queue.setConsumersRefCount(new AutoCreatedQueueManagerImpl(((PostOfficeImpl) postOffice).getServer().getJMSQueueDeleter(), queueBindingInfo.getQueueName()));