You are viewing a plain text version of this content. The canonical link for it is here.
Posted to gitbox@activemq.apache.org by GitBox <gi...@apache.org> on 2019/08/23 13:04:09 UTC

[GitHub] [activemq-artemis] wy96f commented on a change in pull request #2802: ARTEMIS-2457 implement ring queue

wy96f commented on a change in pull request #2802: ARTEMIS-2457 implement ring queue
URL: https://github.com/apache/activemq-artemis/pull/2802#discussion_r317120705
 
 

 ##########
 File path: artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RingQueue.java
 ##########
 @@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.server.impl;
+
+import java.util.concurrent.ScheduledExecutorService;
+
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.filter.Filter;
+import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
+import org.apache.activemq.artemis.core.persistence.StorageManager;
+import org.apache.activemq.artemis.core.postoffice.PostOffice;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
+import org.apache.activemq.artemis.core.server.MessageReference;
+import org.apache.activemq.artemis.core.server.QueueFactory;
+import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
+import org.jboss.logging.Logger;
+
+public class RingQueue extends QueueImpl {
+
+   private static final Logger logger = Logger.getLogger(RingQueue.class);
+   private volatile long ringSize;
+
+   public RingQueue(final long persistenceID,
+                    final SimpleString address,
+                    final SimpleString name,
+                    final Filter filter,
+                    final PageSubscription pageSubscription,
+                    final SimpleString user,
+                    final boolean durable,
+                    final boolean temporary,
+                    final boolean autoCreated,
+                    final RoutingType routingType,
+                    final Integer maxConsumers,
+                    final Boolean exclusive,
+                    final Boolean groupRebalance,
+                    final Integer groupBuckets,
+                    final SimpleString groupFirstKey,
+                    final Integer consumersBeforeDispatch,
+                    final Long delayBeforeDispatch,
+                    final Boolean purgeOnNoConsumers,
+                    final Long ringSize,
+                    final Boolean nonDestructive,
+                    final Boolean autoDelete,
+                    final Long autoDeleteDelay,
+                    final Long autoDeleteMessageCount,
+                    final boolean configurationManaged,
+                    final ScheduledExecutorService scheduledExecutor,
+                    final PostOffice postOffice,
+                    final StorageManager storageManager,
+                    final HierarchicalRepository<AddressSettings> addressSettingsRepository,
+                    final ArtemisExecutor executor,
+                    final ActiveMQServer server,
+                    final QueueFactory factory) {
+      super(persistenceID, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, routingType, maxConsumers, exclusive, groupRebalance, groupBuckets, groupFirstKey, nonDestructive, consumersBeforeDispatch, delayBeforeDispatch, purgeOnNoConsumers, autoDelete, autoDeleteDelay, autoDeleteMessageCount, configurationManaged, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executor, server, factory);
+      this.ringSize = ringSize;
+   }
+
+   @Override
+   public synchronized void addTail(final MessageReference ref, final boolean direct) {
+      enforceRing();
+
+      super.addTail(ref, direct);
+   }
+
+   @Override
+   public synchronized void addHead(final MessageReference ref, boolean scheduling) {
+      enforceRing(ref, scheduling);
+
+      if (!ref.isAlreadyAcked()) {
+         super.addHead(ref, scheduling);
+      }
+   }
+
+   @Override
+   public boolean allowsReferenceCallback() {
+      return false;
+   }
+
+   private void enforceRing() {
+      enforceRing(null, false);
+   }
+
+   private void enforceRing(MessageReference refToAck, boolean scheduling) {
+      if (getMessageCountForRing() >= ringSize) {
 
 Review comment:
   I think we should judge whether ref is already acked first.
   
   E.g. for non destructive queue, suppose ring size is 1, m1 is sent and delivered to consumer c1. Then m2 is sent and delivered to consumer c2. Thus m1 is replace acked and m2 resides in "messageReferences"(message count is now 1). Now consumer c1 is closed without acking m1, m1 is rollbacked and replace acked again that would throw IllegalStateException.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services