Posted to dev@activemq.apache.org by michaelandrepearce <gi...@git.apache.org> on 2019/01/03 10:52:30 UTC

[GitHub] activemq-artemis pull request #2488: ARTEMIS-196 Implement Consumer Priority

GitHub user michaelandrepearce opened a pull request:

    https://github.com/apache/activemq-artemis/pull/2488

    ARTEMIS-196 Implement Consumer Priority

    Add consumer priority support
    Includes a refactor of consumer iteration in QueueImpl into its own class, so that consumer priority can be implemented.
    Add OpenWire JMS Test - taken from ActiveMQ5
    Add Core JMS Test
    Add AMQP Test
    Add Docs

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/michaelandrepearce/activemq-artemis ARTEMIS-196

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/activemq-artemis/pull/2488.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2488
    
----
commit 61a91701f3d424d31a83d9942f7786c90ac81559
Author: Michael André Pearce <mi...@...>
Date:   2018-12-31T13:22:02Z

    ARTEMIS-196 Implement Consumer Priority
    
    Add consumer priority support
    Includes a refactor of consumer iteration in QueueImpl into its own class, so that consumer priority can be implemented.
    Add OpenWire JMS Test - taken from ActiveMQ5
    Add Core JMS Test
    Add AMQP Test
    Add Docs

----


---

[GitHub] activemq-artemis pull request #2488: ARTEMIS-196 Implement Consumer Priority

Posted by michaelandrepearce <gi...@git.apache.org>.
Github user michaelandrepearce commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/2488#discussion_r245029745
  
    --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java ---
    @@ -2497,45 +2494,36 @@ private void deliver() {
     
     
                       handled++;
    -
    +                  consumers.reset();
                       continue;
                    }
     
                    if (logger.isTraceEnabled()) {
                       logger.trace("Queue " + this.getName() + " is delivering reference " + ref);
                    }
     
    -               // If a group id is set, then this overrides the consumer chosen round-robin
    +               final SimpleString groupID = extractGroupID(ref);
    +               groupConsumer = getGroupConsumer(groupConsumer, groupID);
     
    -               SimpleString groupID = extractGroupID(ref);
    -
    -               if (groupID != null) {
    -                  groupConsumer = groups.get(groupID);
    -
    -                  if (groupConsumer != null) {
    -                     consumer = groupConsumer;
    -                  }
    -               }
    -
    -               if (exclusive && redistributor == null) {
    -                  consumer = consumerList.get(0).consumer;
    +               if (groupConsumer != null) {
    +                  consumer = groupConsumer;
                    }
     
                    HandleStatus status = handle(ref, consumer);
     
                    if (status == HandleStatus.HANDLED) {
     
    -                  deliveriesInTransit.countUp();
    -
    -                  handledconsumer = consumer;
    -
    -                  removeMessageReference(holder, ref);
    -
                       if (redistributor == null) {
                          handleMessageGroup(ref, consumer, groupConsumer, groupID);
                       }
     
    +                  deliveriesInTransit.countUp();
    +
    +
    +                  removeMessageReference(holder, ref);
    +                  handledconsumer = consumer;
                       handled++;
    +                  consumers.reset();
    --- End diff --
    
    Resolving, as discussed elsewhere.


---

[GitHub] activemq-artemis pull request #2488: ARTEMIS-196 Implement Consumer Priority

Posted by franz1981 <gi...@git.apache.org>.
Github user franz1981 commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/2488#discussion_r244979227
  
    --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java ---
    @@ -0,0 +1,648 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.activemq.artemis.core.server.impl;
    +
    +import org.apache.activemq.artemis.core.server.PriorityAware;
    +
    +import java.lang.reflect.Array;
    +import java.util.AbstractCollection;
    +import java.util.Arrays;
    +import java.util.Collection;
    +import java.util.Iterator;
    +import java.util.Objects;
    +import java.util.Set;
    +import java.util.concurrent.locks.ReentrantLock;
    +import java.util.function.Consumer;
    +import java.util.stream.Collectors;
    +
    +/**
    + * This class's purpose is to hold the consumers, it models around multi getPriority (getPriority) varient of
    + * java.util.concurrent.CopyOnWriteArrayList, so that reads are concurrent safe and non blocking.
    + *
    + * N.b. we could have made Level extend CopyOnWriteArrayList but due to the need to access the internal Array structure,
    + * which is privileged to package java.util.concurrent. As such much of Level is is taken from here.
    + *
    + * Modifications like in CopyOnWriteArrayList are single threaded via a single re-entrant lock.
    + *
    + * Iterators iterate over a snapshot of the internal array structure, so will not see mutations.
    + *
    + * There can only be one resettable iterable view, this is exposed at the top getPriority,
    + *     and is intended for use in QueueImpl only.
    + * All other iterators are not reset-able and are created on calling iterator().
    + *
    + * Methods getArray, setArray MUST never be exposed, and all array modifications must go through these.
    + *
    + * @param <T> The type this class may hold, this is generic as can be anything that extends PriorityAware,
    + *         but intent is this is the QueueImpl:ConsumerHolder.
    + */
    +public class QueueConsumersImpl<T extends PriorityAware> extends AbstractCollection<T> implements QueueConsumers<T> {
    +
    +   private final QueueConsumersIterator<T> iterator = new QueueConsumersIterator<>(this, true);
    +
    +   private volatile Level<T>[] levels;
    +   private volatile int size;
    +   private volatile T first;
    +
    +   private void setArray(Level<T>[] array) {
    +      this.levels = array;
    +   }
    +
    +   private Level<T>[] getArray() {
    +      return levels;
    +   }
    +
    +
    +   public QueueConsumersImpl() {
    +      levels = newLevelArrayInstance(0);
    +   }
    +
    +   @SuppressWarnings("unchecked")
    +   private static <T> Level<T>[] newLevelArrayInstance(int length) {
    +      return (Level<T>[]) Array.newInstance(Level.class, length);
    +   }
    +
    +   @Override
    +   public int size() {
    +      return size;
    +   }
    +
    +   @Override
    +   public boolean isEmpty() {
    +      return size() == 0;
    +   }
    +
    +   @Override
    +   public Set<Integer> getPriorites() {
    +      Level<T>[] levels = getArray();
    +      return Arrays.stream(levels).map(Level::level).collect(Collectors.toSet());
    +   }
    +
    +   @Override
    +   public Iterator<T> iterator() {
    +      return new QueueConsumersIterator<>(this, false);
    +   }
    +
    +   @Override
    +   public boolean hasNext() {
    +      return iterator.hasNext();
    +   }
    +
    +   @Override
    +   public T next() {
    +      return iterator.next();
    +   }
    +
    +   @Override
    +   public QueueConsumers<T> reset() {
    +      iterator.reset();
    +      return this;
    +   }
    +
    +   @Override
    +   public void forEach(Consumer<? super T> action) {
    +      Objects.requireNonNull(action);
    +      Level<T>[] current = getArray();
    +      int len = current.length;
    +      for (int i = 0; i < len; ++i) {
    +         current[i].forEach(action);
    +      }
    +   }
    +
    +   private Level<T> getLevel(int level, boolean createIfMissing) {
    +      Level<T>[] current = getArray();
    +      int low = 0;
    +      int high = current.length - 1;
    +
    +      while (low <= high) {
    +         int mid = (low + high) >>> 1;
    +         Level<T> midVal = current[mid];
    +
    +         if (midVal.level() > level)
    +            low = mid + 1;
    +         else if (midVal.level() < level)
    +            high = mid - 1;
    +         else
    +            return midVal; //key found
    +      }
    +
    +      if (createIfMissing) {
    +         Level<T>[] newLevels = newLevelArrayInstance(current.length + 1);
    +         if (low > 0) {
    +            System.arraycopy(current, 0, newLevels, 0, low);
    +         }
    +         if (current.length - low > 0) {
    +            System.arraycopy(current, low, newLevels, low + 1, current.length - low);
    +         }
    +         newLevels[low] = new Level<T>(level);
    +         setArray(newLevels);
    +         return newLevels[low];
    +      }
    +      return null;
    +   }
    +
    +   @Override
    +   public synchronized boolean add(T t) {
    --- End diff --
    
    Could it happen that many threads access this method concurrently?


---

[GitHub] activemq-artemis issue #2488: ARTEMIS-196 Implement Consumer Priority

Posted by franz1981 <gi...@git.apache.org>.
Github user franz1981 commented on the issue:

    https://github.com/apache/activemq-artemis/pull/2488
  
    @michaelandrepearce Consider that calling invokeExact on a static final MethodHandle performs similarly to a direct call, not to a reflective call: http://mail.openjdk.java.net/pipermail/jigsaw-dev/2017-January/010894.html
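
For readers following the thread, here is a minimal sketch of the pattern being described, not code from the PR. The class name `StaticHandleDemo` and the helper `lengthOf` are hypothetical, and the handle targets the public `String.length()` so no access exception is needed. `java.lang.invoke.MethodHandle` has been part of the JDK since Java 7.

```java
import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;

// Hypothetical demo class; not part of the Artemis code base.
final class StaticHandleDemo {

   // Keeping the handle in a static final field lets the JIT treat it as a constant.
   private static final MethodHandle LENGTH;

   static {
      try {
         // Handle type is (String)int: the receiver becomes the first argument.
         LENGTH = MethodHandles.lookup()
            .findVirtual(String.class, "length", MethodType.methodType(int.class));
      } catch (ReflectiveOperationException e) {
         throw new ExceptionInInitializerError(e);
      }
   }

   static int lengthOf(String s) {
      try {
         // invokeExact requires the call-site signature to match (String)int exactly,
         // hence the String-typed argument and the explicit cast to int.
         return (int) LENGTH.invokeExact(s);
      } catch (Throwable t) {
         throw new IllegalStateException(t);
      }
   }

   public static void main(String[] args) {
      System.out.println(lengthOf("artemis"));
   }
}
```

Reaching a private member such as the internal array of `CopyOnWriteArrayList` would need additional access rights, which is where the security manager concern raised later in the thread comes in.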


---

[GitHub] activemq-artemis pull request #2488: ARTEMIS-196 Implement Consumer Priority

Posted by michaelandrepearce <gi...@git.apache.org>.
Github user michaelandrepearce commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/2488#discussion_r245006519
  
    --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumers.java ---
    @@ -0,0 +1,34 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.activemq.artemis.core.server.impl;
    +
    +import org.apache.activemq.artemis.core.server.PriorityAware;
    +
    +import java.util.Collection;
    +import java.util.Set;
    +
    +public interface QueueConsumers<T extends PriorityAware> extends Collection<T> {
    +
    +   Set<Integer> getPriorites();
    --- End diff --
    
    Priority is of Integer type, not byte.


---

[GitHub] activemq-artemis issue #2488: ARTEMIS-196 Implement Consumer Priority

Posted by franz1981 <gi...@git.apache.org>.
Github user franz1981 commented on the issue:

    https://github.com/apache/activemq-artemis/pull/2488
  
    Yep, running with a security manager is a common scenario, so I would prefer not to force users to add an exception to the security manager just to allow these calls to work.


---

[GitHub] activemq-artemis issue #2488: ARTEMIS-196 Implement Consumer Priority

Posted by michaelandrepearce <gi...@git.apache.org>.
Github user michaelandrepearce commented on the issue:

    https://github.com/apache/activemq-artemis/pull/2488
  
    @franz1981 Is that available in Java 8? I thought it was Java 9+.


---

[GitHub] activemq-artemis pull request #2488: ARTEMIS-196 Implement Consumer Priority

Posted by franz1981 <gi...@git.apache.org>.
Github user franz1981 commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/2488#discussion_r244982460
  
    --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java ---
    @@ -0,0 +1,648 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.activemq.artemis.core.server.impl;
    +
    +import org.apache.activemq.artemis.core.server.PriorityAware;
    +
    +import java.lang.reflect.Array;
    +import java.util.AbstractCollection;
    +import java.util.Arrays;
    +import java.util.Collection;
    +import java.util.Iterator;
    +import java.util.Objects;
    +import java.util.Set;
    +import java.util.concurrent.locks.ReentrantLock;
    +import java.util.function.Consumer;
    +import java.util.stream.Collectors;
    +
    +/**
    + * This class's purpose is to hold the consumers, it models around multi getPriority (getPriority) varient of
    + * java.util.concurrent.CopyOnWriteArrayList, so that reads are concurrent safe and non blocking.
    + *
    + * N.b. we could have made Level extend CopyOnWriteArrayList but due to the need to access the internal Array structure,
    + * which is privileged to package java.util.concurrent. As such much of Level is is taken from here.
    + *
    + * Modifications like in CopyOnWriteArrayList are single threaded via a single re-entrant lock.
    + *
    + * Iterators iterate over a snapshot of the internal array structure, so will not see mutations.
    + *
    + * There can only be one resettable iterable view, this is exposed at the top getPriority,
    + *     and is intended for use in QueueImpl only.
    + * All other iterators are not reset-able and are created on calling iterator().
    + *
    + * Methods getArray, setArray MUST never be exposed, and all array modifications must go through these.
    + *
    + * @param <T> The type this class may hold, this is generic as can be anything that extends PriorityAware,
    + *         but intent is this is the QueueImpl:ConsumerHolder.
    + */
    +public class QueueConsumersImpl<T extends PriorityAware> extends AbstractCollection<T> implements QueueConsumers<T> {
    +
    +   private final QueueConsumersIterator<T> iterator = new QueueConsumersIterator<>(this, true);
    +
    +   private volatile Level<T>[] levels;
    +   private volatile int size;
    +   private volatile T first;
    +
    +   private void setArray(Level<T>[] array) {
    +      this.levels = array;
    +   }
    +
    +   private Level<T>[] getArray() {
    +      return levels;
    +   }
    +
    +
    +   public QueueConsumersImpl() {
    +      levels = newLevelArrayInstance(0);
    +   }
    +
    +   @SuppressWarnings("unchecked")
    +   private static <T> Level<T>[] newLevelArrayInstance(int length) {
    +      return (Level<T>[]) Array.newInstance(Level.class, length);
    +   }
    +
    +   @Override
    +   public int size() {
    +      return size;
    +   }
    +
    +   @Override
    +   public boolean isEmpty() {
    +      return size() == 0;
    +   }
    +
    +   @Override
    +   public Set<Integer> getPriorites() {
    +      Level<T>[] levels = getArray();
    +      return Arrays.stream(levels).map(Level::level).collect(Collectors.toSet());
    +   }
    +
    +   @Override
    +   public Iterator<T> iterator() {
    +      return new QueueConsumersIterator<>(this, false);
    +   }
    +
    +   @Override
    +   public boolean hasNext() {
    +      return iterator.hasNext();
    +   }
    +
    +   @Override
    +   public T next() {
    +      return iterator.next();
    +   }
    +
    +   @Override
    +   public QueueConsumers<T> reset() {
    +      iterator.reset();
    +      return this;
    +   }
    +
    +   @Override
    +   public void forEach(Consumer<? super T> action) {
    +      Objects.requireNonNull(action);
    +      Level<T>[] current = getArray();
    +      int len = current.length;
    +      for (int i = 0; i < len; ++i) {
    +         current[i].forEach(action);
    +      }
    +   }
    +
    +   private Level<T> getLevel(int level, boolean createIfMissing) {
    +      Level<T>[] current = getArray();
    +      int low = 0;
    +      int high = current.length - 1;
    +
    +      while (low <= high) {
    +         int mid = (low + high) >>> 1;
    +         Level<T> midVal = current[mid];
    +
    +         if (midVal.level() > level)
    +            low = mid + 1;
    +         else if (midVal.level() < level)
    +            high = mid - 1;
    +         else
    +            return midVal; //key found
    +      }
    +
    +      if (createIfMissing) {
    +         Level<T>[] newLevels = newLevelArrayInstance(current.length + 1);
    +         if (low > 0) {
    +            System.arraycopy(current, 0, newLevels, 0, low);
    +         }
    +         if (current.length - low > 0) {
    +            System.arraycopy(current, low, newLevels, low + 1, current.length - low);
    +         }
    +         newLevels[low] = new Level<T>(level);
    +         setArray(newLevels);
    +         return newLevels[low];
    +      }
    +      return null;
    +   }
    +
    +   @Override
    +   public synchronized boolean add(T t) {
    +      boolean result = addInternal(t);
    +      calcSize();
    +      return result;
    +   }
    +
    +   private boolean addInternal(T t) {
    +      if (t == null) return false;
    +      Level<T> level = getLevel(t.getPriority(), true);
    +      return level.add(t);
    +   }
    +
    +   @Override
    +   public boolean remove(Object o) {
    +      return o instanceof PriorityAware && remove((PriorityAware) o);
    +   }
    +
    +   public synchronized boolean remove(PriorityAware priorityAware) {
    +      boolean result = removeInternal(priorityAware);
    +      calcSize();
    +      return result;
    +   }
    +
    +   private boolean removeInternal(PriorityAware priorityAware) {
    +      if ( priorityAware == null) return false;
    +      Level<T> level = getLevel(priorityAware.getPriority(), false);
    +      boolean result = level != null && level.remove(priorityAware);
    +      if (level != null && level.size() == 0) {
    +         removeLevel(level.level);
    +      }
    +      return result;
    +   }
    +
    +   private Level<T> removeLevel(int level) {
    +      Level<T>[] current = getArray();
    +      int len = current.length;
    +      int low = 0;
    +      int high = len - 1;
    +
    +      while (low <= high) {
    +         int mid = (low + high) >>> 1;
    +         Level<T> midVal = current[mid];
    +
    +         if (midVal.level() > level)
    +            low = mid + 1;
    +         else if (midVal.level() < level)
    +            high = mid - 1;
    +         else {
    +            Level<T>[] newLevels = newLevelArrayInstance(len - 1);
    +            System.arraycopy(current, 0, newLevels, 0, mid);
    +            System.arraycopy(current, mid + 1, newLevels, mid, len - mid - 1);
    +            setArray(newLevels);
    +            return midVal; //key found
    +         }
    +      }
    +      return null;
    +   }
    +
    +   @Override
    +   public boolean containsAll(Collection<?> c) {
    +      Objects.requireNonNull(c);
    +      for (Object e : c)
    +         if (!contains(e))
    +            return false;
    +      return true;
    +   }
    +
    +   @Override
    +   public synchronized boolean addAll(Collection<? extends T> c) {
    +      Objects.requireNonNull(c);
    +      boolean modified = false;
    +      for (T e : c)
    +         if (addInternal(e))
    +            modified = true;
    +      calcSize();
    +      return modified;
    +   }
    +
    +   @Override
    +   public synchronized boolean removeAll(Collection<?> c) {
    +      Objects.requireNonNull(c);
    +      boolean modified = false;
    +      for (Object o : c) {
    +         if (remove(o)) {
    +            modified = true;
    +         }
    +      }
    +      calcSize();
    +      return modified;
    +   }
    +
    +   @Override
    +   public synchronized boolean retainAll(Collection<?> c) {
    +      Objects.requireNonNull(c);
    +      boolean modified = false;
    +      Level<T>[] levels = getArray();
    +      for (Level<T> level : levels) {
    +         if (level.retainAll(c)) {
    +            modified = true;
    +         }
    +      }
    +      calcSize();
    +      return modified;
    +   }
    +
    +   @Override
    +   public synchronized void clear() {
    +      Level<T>[] levels = getArray();
    +      for (Level<T> level : levels) {
    +         level.clear();
    +      }
    +      calcSize();
    +   }
    +
    +
    +
    +   @Override
    +   public boolean contains(Object o) {
    +      return o instanceof PriorityAware && contains((PriorityAware) o);
    +   }
    +
    +   public boolean contains(PriorityAware priorityAware) {
    +      if (priorityAware == null) return false;
    +      Level<T> level = getLevel(priorityAware.getPriority(), false);
    +      return level != null && level.contains(priorityAware);
    +   }
    +
    +   private void calcSize() {
    +      Level<T>[] current = getArray();
    +      int size = 0;
    +      for (Level<T> level : current) {
    +         size += level.size();
    +      }
    +      this.size = size;
    +   }
    +
    +   private static class QueueConsumersIterator<T extends PriorityAware> implements ResetableIterator<T> {
    +
    +      private final QueueConsumersImpl<T> queueConsumers;
    +      private final boolean resetable;
    +      private Level<T>[] levels;
    +      int level = -1;
    +      private ResetableIterator<T> currentIterator;
    +
    +      private QueueConsumersIterator(QueueConsumersImpl<T> queueConsumers, boolean resetable) {
    +         this.queueConsumers = queueConsumers;
    +         this.levels = queueConsumers.getArray();
    +         this.resetable = resetable;
    +
    +      }
    +
    +      @Override
    +      public boolean hasNext() {
    +         while (true) {
    +            if (currentIterator != null) {
    +               if (currentIterator.hasNext()) {
    +                  return true;
    +               }
    +            }
    +            int nextLevel = level + 1;
    +            if (levels != null && nextLevel < levels.length) {
    +               moveToLevel(nextLevel);
    +            } else {
    +               return false;
    +            }
    +         }
    +      }
    +
    +      @Override
    +      public T next() {
    +         while (true) {
    +            if (currentIterator != null) {
    +               if (currentIterator.hasNext()) {
    +                  return currentIterator.next();
    +               }
    +            }
    +            int nextLevel = level + 1;
    +            if (levels != null && nextLevel < levels.length) {
    +               moveToLevel(nextLevel);
    +            } else {
    +               return null;
    +            }
    +         }
    +      }
    +
    +      private void moveToLevel(int level) {
    +         Level<T> level0 = levels[level];
    +         if (resetable) {
    +            currentIterator = level0.resetableIterator().reset();
    +         } else {
    +            currentIterator = level0.iterator();
    +         }
    +         this.level = level;
    +      }
    +
    +      @Override
    +      public ResetableIterator<T> reset() {
    +         if (!resetable) {
    +            throw new IllegalStateException("Iterator is not resetable");
    +         }
    +         levels = queueConsumers.getArray();
    +         level = -1;
    +         currentIterator = null;
    +         return this;
    +      }
    +   }
    +
    +   /**
    +    * This is represents a getPriority and is modeled on CopyOnArrayList.
    +    *
    +    * @param <E>
    +    */
    +   private static class Level<E> {
    --- End diff --
    
    I'm not a big fan of multiple layers of abstraction, but I admit that `Level` shares a lot with `CopyOnWriteArrayList`. Can't we just reuse it somehow? Or is that not possible because of the `setArray` call?
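
To make the reuse question concrete, here is a rough sketch of what composing one `CopyOnWriteArrayList` per priority could look like. This is not what the PR does. The names `PriorityLevelsSketch`, `add`, `remove` and `snapshot` are hypothetical, and the sketch drops the resettable iterator and never removes empty levels, both of which the PR's `Level` class handles through direct array access.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical sketch; not part of the Artemis code base.
final class PriorityLevelsSketch<T> {

   // Keys sorted in descending order, so the highest priority level comes first.
   private final ConcurrentSkipListMap<Integer, CopyOnWriteArrayList<T>> levels =
      new ConcurrentSkipListMap<>(Comparator.reverseOrder());

   void add(int priority, T consumer) {
      // Create the level on first use, then append to its copy-on-write list.
      levels.computeIfAbsent(priority, p -> new CopyOnWriteArrayList<>()).add(consumer);
   }

   boolean remove(int priority, T consumer) {
      CopyOnWriteArrayList<T> level = levels.get(priority);
      return level != null && level.remove(consumer);
   }

   // Flattened view: highest priority first, insertion order within a level.
   List<T> snapshot() {
      List<T> out = new ArrayList<>();
      for (CopyOnWriteArrayList<T> level : levels.values()) {
         out.addAll(level);
      }
      return out;
   }
}
```

The trade-off is that the standard list does not expose its backing array, so a resettable cursor over a stable snapshot would have to be built on top of its iterators.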


---

[GitHub] activemq-artemis issue #2488: ARTEMIS-196 Implement Consumer Priority

Posted by michaelandrepearce <gi...@git.apache.org>.
Github user michaelandrepearce commented on the issue:

    https://github.com/apache/activemq-artemis/pull/2488
  
    @franz1981 Can you check your private email? I have a few queries about MethodHandle ;)


---

[GitHub] activemq-artemis pull request #2488: ARTEMIS-196 Implement Consumer Priority

Posted by michaelandrepearce <gi...@git.apache.org>.
Github user michaelandrepearce commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/2488#discussion_r245006326
  
    --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Consumer.java ---
    @@ -21,7 +21,7 @@
     import org.apache.activemq.artemis.core.filter.Filter;
     import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
     
    -public interface Consumer {
    +public interface Consumer extends PriorityAware {
    --- End diff --
    
    Correct, but it is kept separate to make testing of the QueueConsumersImpl logic easier and self-contained. It also makes it easier to reuse the new QueueConsumersImpl for other PriorityAware needs.
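
As an illustration of the testing argument, here is a minimal sketch under stated assumptions. `PriorityAwareSketch` is a hypothetical stand-in for the real `PriorityAware` interface (assumed here to expose only `int getPriority()`, as the diff's `t.getPriority()` calls suggest), and `StubConsumer` and `PriorityOrderingDemo` are invented for the example.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical stand-in for org.apache.activemq.artemis.core.server.PriorityAware.
interface PriorityAwareSketch {
   int getPriority();
}

// A trivial stub: no sessions, filters or callbacks are needed to exercise ordering logic.
final class StubConsumer implements PriorityAwareSketch {
   private final String name;
   private final int priority;

   StubConsumer(String name, int priority) {
      this.name = name;
      this.priority = priority;
   }

   String name() {
      return name;
   }

   @Override
   public int getPriority() {
      return priority;
   }
}

final class PriorityOrderingDemo {

   // Returns names ordered highest priority first; List.sort is stable, so ties keep input order.
   static List<String> namesByPriority(List<StubConsumer> consumers) {
      List<StubConsumer> sorted = new ArrayList<>(consumers);
      sorted.sort(Comparator.comparingInt(StubConsumer::getPriority).reversed());
      List<String> names = new ArrayList<>();
      for (StubConsumer c : sorted) {
         names.add(c.name());
      }
      return names;
   }
}
```

Because the collection only depends on the narrow interface, a test can use stubs like this instead of constructing full `Consumer` implementations.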


---

[GitHub] activemq-artemis pull request #2488: ARTEMIS-196 Implement Consumer Priority

Posted by franz1981 <gi...@git.apache.org>.
Github user franz1981 commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/2488#discussion_r245010184
  
    --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java ---
    @@ -0,0 +1,648 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.activemq.artemis.core.server.impl;
    +
    +import org.apache.activemq.artemis.core.server.PriorityAware;
    +
    +import java.lang.reflect.Array;
    +import java.util.AbstractCollection;
    +import java.util.Arrays;
    +import java.util.Collection;
    +import java.util.Iterator;
    +import java.util.Objects;
    +import java.util.Set;
    +import java.util.concurrent.locks.ReentrantLock;
    +import java.util.function.Consumer;
    +import java.util.stream.Collectors;
    +
    +/**
    + * This class's purpose is to hold the consumers, it models around multi getPriority (getPriority) varient of
    + * java.util.concurrent.CopyOnWriteArrayList, so that reads are concurrent safe and non blocking.
    + *
    + * N.b. we could have made Level extend CopyOnWriteArrayList but due to the need to access the internal Array structure,
    + * which is privileged to package java.util.concurrent. As such much of Level is is taken from here.
    + *
    + * Modifications like in CopyOnWriteArrayList are single threaded via a single re-entrant lock.
    + *
    + * Iterators iterate over a snapshot of the internal array structure, so will not see mutations.
    + *
    + * There can only be one resettable iterable view, this is exposed at the top getPriority,
    + *     and is intended for use in QueueImpl only.
    + * All other iterators are not reset-able and are created on calling iterator().
    + *
    + * Methods getArray, setArray MUST never be exposed, and all array modifications must go through these.
    + *
    + * @param <T> The type this class may hold, this is generic as can be anything that extends PriorityAware,
    + *         but intent is this is the QueueImpl:ConsumerHolder.
    + */
    +public class QueueConsumersImpl<T extends PriorityAware> extends AbstractCollection<T> implements QueueConsumers<T> {
    +
    +   private final QueueConsumersIterator<T> iterator = new QueueConsumersIterator<>(this, true);
    +
    +   private volatile Level<T>[] levels;
    +   private volatile int size;
    +   private volatile T first;
    +
    +   private void setArray(Level<T>[] array) {
    +      this.levels = array;
    +   }
    +
    +   private Level<T>[] getArray() {
    +      return levels;
    +   }
    +
    +
    +   public QueueConsumersImpl() {
    +      levels = newLevelArrayInstance(0);
    +   }
    +
    +   @SuppressWarnings("unchecked")
    +   private static <T> Level<T>[] newLevelArrayInstance(int length) {
    +      return (Level<T>[]) Array.newInstance(Level.class, length);
    +   }
    +
    +   @Override
    +   public int size() {
    +      return size;
    +   }
    +
    +   @Override
    +   public boolean isEmpty() {
    +      return size() == 0;
    +   }
    +
    +   @Override
    +   public Set<Integer> getPriorites() {
    +      Level<T>[] levels = getArray();
    +      return Arrays.stream(levels).map(Level::level).collect(Collectors.toSet());
    +   }
    +
    +   @Override
    +   public Iterator<T> iterator() {
    +      return new QueueConsumersIterator<>(this, false);
    +   }
    +
    +   @Override
    +   public boolean hasNext() {
    +      return iterator.hasNext();
    +   }
    +
    +   @Override
    +   public T next() {
    +      return iterator.next();
    +   }
    +
    +   @Override
    +   public QueueConsumers<T> reset() {
    +      iterator.reset();
    +      return this;
    +   }
    +
    +   @Override
    +   public void forEach(Consumer<? super T> action) {
    +      Objects.requireNonNull(action);
    +      Level<T>[] current = getArray();
    +      int len = current.length;
    +      for (int i = 0; i < len; ++i) {
    +         current[i].forEach(action);
    +      }
    +   }
    +
    +   private Level<T> getLevel(int level, boolean createIfMissing) {
    +      Level<T>[] current = getArray();
    +      int low = 0;
    +      int high = current.length - 1;
    +
    +      while (low <= high) {
    +         int mid = (low + high) >>> 1;
    +         Level<T> midVal = current[mid];
    +
    +         if (midVal.level() > level)
    +            low = mid + 1;
    +         else if (midVal.level() < level)
    +            high = mid - 1;
    +         else
    +            return midVal; //key found
    +      }
    +
    +      if (createIfMissing) {
    +         Level<T>[] newLevels = newLevelArrayInstance(current.length + 1);
    +         if (low > 0) {
    +            System.arraycopy(current, 0, newLevels, 0, low);
    +         }
    +         if (current.length - low > 0) {
    +            System.arraycopy(current, low, newLevels, low + 1, current.length - low);
    +         }
    +         newLevels[low] = new Level<T>(level);
    +         setArray(newLevels);
    +         return newLevels[low];
    +      }
    +      return null;
    +   }
    +
    +   @Override
    +   public synchronized boolean add(T t) {
    --- End diff --
    
    I understand it, but my point is more an idea to improve/simplify things: I understand that `CopyOnWriteArrayList` is the original starting point, but here we are doing something similar yet different.
    If we use just a few of its methods, I would instead implement only what we need, and avoid having to test/provide behaviours for more complex things that we are not using from it. The same can be said of the concurrency behaviour: if this class is designed to be used in a single-writer fashion, which makes the code simpler (and faster most of the time), I would do that instead.
    We are not implementing a new CopyOnWriteArrayList here, but a domain-specific class, so I suppose it won't hurt to make it simpler (if possible).
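
For illustration only, a minimal sketch of what a single-writer copy-on-write holder could look like. The class name `SingleWriterHolderSketch` and its methods are hypothetical and not part of the PR; the sketch assumes exactly one thread ever calls `add`/`remove`, which is why no lock is taken, while readers on any thread see a consistent snapshot through the volatile field.

```java
import java.util.Arrays;
import java.util.Objects;
import java.util.function.Consumer;

// Hypothetical sketch: copy-on-write storage that is only safe with ONE writer thread.
final class SingleWriterHolderSketch<E> {

   // Readers always see a fully built array because the reference is volatile.
   private volatile Object[] elements = new Object[0];

   // Writer thread only: copy, append, then publish the new snapshot.
   void add(E e) {
      Object[] current = elements;
      Object[] next = Arrays.copyOf(current, current.length + 1);
      next[current.length] = e;
      elements = next;
   }

   // Writer thread only: copy everything except the first matching element.
   boolean remove(Object e) {
      Object[] current = elements;
      for (int i = 0; i < current.length; i++) {
         if (Objects.equals(current[i], e)) {
            Object[] next = new Object[current.length - 1];
            System.arraycopy(current, 0, next, 0, i);
            System.arraycopy(current, i + 1, next, i, current.length - i - 1);
            elements = next;
            return true;
         }
      }
      return false;
   }

   // Any thread: iterate over the snapshot taken at call time.
   @SuppressWarnings("unchecked")
   void forEach(Consumer<? super E> action) {
      Object[] snapshot = elements;
      for (Object o : snapshot) {
         action.accept((E) o);
      }
   }

   int size() {
      return elements.length;
   }
}
```

If more than one thread may mutate the collection, this shortcut does not apply and the writers need to be serialized, for example with `synchronized` or a lock.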


---

[GitHub] activemq-artemis pull request #2488: ARTEMIS-196 Implement Consumer Priority

Posted by franz1981 <gi...@git.apache.org>.
Github user franz1981 commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/2488#discussion_r244979032
  
    --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java ---
    @@ -0,0 +1,648 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.activemq.artemis.core.server.impl;
    +
    +import org.apache.activemq.artemis.core.server.PriorityAware;
    +
    +import java.lang.reflect.Array;
    +import java.util.AbstractCollection;
    +import java.util.Arrays;
    +import java.util.Collection;
    +import java.util.Iterator;
    +import java.util.Objects;
    +import java.util.Set;
    +import java.util.concurrent.locks.ReentrantLock;
    +import java.util.function.Consumer;
    +import java.util.stream.Collectors;
    +
    +/**
    + * This class's purpose is to hold the consumers, it models around multi getPriority (getPriority) varient of
    + * java.util.concurrent.CopyOnWriteArrayList, so that reads are concurrent safe and non blocking.
    + *
    + * N.b. we could have made Level extend CopyOnWriteArrayList but due to the need to access the internal Array structure,
    + * which is privileged to package java.util.concurrent. As such much of Level is is taken from here.
    + *
    + * Modifications like in CopyOnWriteArrayList are single threaded via a single re-entrant lock.
    + *
    + * Iterators iterate over a snapshot of the internal array structure, so will not see mutations.
    + *
    + * There can only be one resettable iterable view, this is exposed at the top getPriority,
    + *     and is intended for use in QueueImpl only.
    + * All other iterators are not reset-able and are created on calling iterator().
    + *
    + * Methods getArray, setArray MUST never be exposed, and all array modifications must go through these.
    + *
    + * @param <T> The type this class may hold, this is generic as can be anything that extends PriorityAware,
    + *         but intent is this is the QueueImpl:ConsumerHolder.
    + */
    +public class QueueConsumersImpl<T extends PriorityAware> extends AbstractCollection<T> implements QueueConsumers<T> {
    +
    +   private final QueueConsumersIterator<T> iterator = new QueueConsumersIterator<>(this, true);
    +
    +   private volatile Level<T>[] levels;
    +   private volatile int size;
    +   private volatile T first;
    +
    +   private void setArray(Level<T>[] array) {
    +      this.levels = array;
    +   }
    +
    +   private Level<T>[] getArray() {
    +      return levels;
    +   }
    +
    +
    +   public QueueConsumersImpl() {
    +      levels = newLevelArrayInstance(0);
    +   }
    +
    +   @SuppressWarnings("unchecked")
    +   private static <T> Level<T>[] newLevelArrayInstance(int length) {
    +      return (Level<T>[]) Array.newInstance(Level.class, length);
    +   }
    +
    +   @Override
    +   public int size() {
    +      return size;
    +   }
    +
    +   @Override
    +   public boolean isEmpty() {
    +      return size() == 0;
    +   }
    +
    +   @Override
    +   public Set<Integer> getPriorites() {
    +      Level<T>[] levels = getArray();
    +      return Arrays.stream(levels).map(Level::level).collect(Collectors.toSet());
    +   }
    +
    +   @Override
    +   public Iterator<T> iterator() {
    +      return new QueueConsumersIterator<>(this, false);
    +   }
    +
    +   @Override
    +   public boolean hasNext() {
    +      return iterator.hasNext();
    +   }
    +
    +   @Override
    +   public T next() {
    +      return iterator.next();
    +   }
    +
    +   @Override
    +   public QueueConsumers<T> reset() {
    +      iterator.reset();
    +      return this;
    +   }
    +
    +   @Override
    +   public void forEach(Consumer<? super T> action) {
    +      Objects.requireNonNull(action);
    +      Level<T>[] current = getArray();
    +      int len = current.length;
    +      for (int i = 0; i < len; ++i) {
    +         current[i].forEach(action);
    +      }
    +   }
    +
    +   private Level<T> getLevel(int level, boolean createIfMissing) {
    +      Level<T>[] current = getArray();
    +      int low = 0;
    +      int high = current.length - 1;
    +
    +      while (low <= high) {
    +         int mid = (low + high) >>> 1;
    +         Level<T> midVal = current[mid];
    +
    +         if (midVal.level() > level)
    +            low = mid + 1;
    +         else if (midVal.level() < level)
    +            high = mid - 1;
    +         else
    +            return midVal; //key found
    +      }
    +
    +      if (createIfMissing) {
    +         Level<T>[] newLevels = newLevelArrayInstance(current.length + 1);
    --- End diff --
    
    I would pack this one into a separate method (if possible)
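
As a rough sketch of that extraction, the copy-and-insert step could live in a small helper along these lines. `ArrayInsertSketch` and `insertAt` are hypothetical names, and the helper uses `Arrays.copyOf` rather than the PR's `newLevelArrayInstance`, so treat it as an assumption about shape, not the actual change.

```java
import java.util.Arrays;

// Hypothetical helper, not part of the PR: isolates the copy-and-insert step.
final class ArrayInsertSketch {

   // Returns a new array with 'element' placed at 'index'; 'source' is left untouched.
   static <E> E[] insertAt(E[] source, int index, E element) {
      E[] result = Arrays.copyOf(source, source.length + 1);
      // Shift the tail one slot to the right to open a gap at 'index'.
      System.arraycopy(source, index, result, index + 1, source.length - index);
      result[index] = element;
      return result;
   }

   public static void main(String[] args) {
      Integer[] levels = {9, 5, 1};
      // prints [9, 7, 5, 1]
      System.out.println(Arrays.toString(insertAt(levels, 1, 7)));
   }
}
```

With such a helper, `getLevel` would only do the binary search and then call the helper with the insertion index `low`.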


---

[GitHub] activemq-artemis pull request #2488: ARTEMIS-196 Implement Consumer Priority

Posted by franz1981 <gi...@git.apache.org>.
Github user franz1981 commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/2488#discussion_r244980096
  
    --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java ---
    @@ -0,0 +1,648 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.activemq.artemis.core.server.impl;
    +
    +import org.apache.activemq.artemis.core.server.PriorityAware;
    +
    +import java.lang.reflect.Array;
    +import java.util.AbstractCollection;
    +import java.util.Arrays;
    +import java.util.Collection;
    +import java.util.Iterator;
    +import java.util.Objects;
    +import java.util.Set;
    +import java.util.concurrent.locks.ReentrantLock;
    +import java.util.function.Consumer;
    +import java.util.stream.Collectors;
    +
    +/**
    + * This class's purpose is to hold the consumers, it models around multi getPriority (getPriority) varient of
    + * java.util.concurrent.CopyOnWriteArrayList, so that reads are concurrent safe and non blocking.
    + *
    + * N.b. we could have made Level extend CopyOnWriteArrayList but due to the need to access the internal Array structure,
    + * which is privileged to package java.util.concurrent. As such much of Level is is taken from here.
    + *
    + * Modifications like in CopyOnWriteArrayList are single threaded via a single re-entrant lock.
    + *
    + * Iterators iterate over a snapshot of the internal array structure, so will not see mutations.
    + *
    + * There can only be one resettable iterable view, this is exposed at the top getPriority,
    + *     and is intended for use in QueueImpl only.
    + * All other iterators are not reset-able and are created on calling iterator().
    + *
    + * Methods getArray, setArray MUST never be exposed, and all array modifications must go through these.
    + *
    + * @param <T> The type this class may hold, this is generic as can be anything that extends PriorityAware,
    + *         but intent is this is the QueueImpl:ConsumerHolder.
    + */
    +public class QueueConsumersImpl<T extends PriorityAware> extends AbstractCollection<T> implements QueueConsumers<T> {
    +
    +   private final QueueConsumersIterator<T> iterator = new QueueConsumersIterator<>(this, true);
    +
    +   private volatile Level<T>[] levels;
    +   private volatile int size;
    +   private volatile T first;
    +
    +   private void setArray(Level<T>[] array) {
    +      this.levels = array;
    +   }
    +
    +   private Level<T>[] getArray() {
    +      return levels;
    +   }
    +
    +
    +   public QueueConsumersImpl() {
    +      levels = newLevelArrayInstance(0);
    +   }
    +
    +   @SuppressWarnings("unchecked")
    +   private static <T> Level<T>[] newLevelArrayInstance(int length) {
    +      return (Level<T>[]) Array.newInstance(Level.class, length);
    +   }
    +
    +   @Override
    +   public int size() {
    +      return size;
    +   }
    +
    +   @Override
    +   public boolean isEmpty() {
    +      return size() == 0;
    +   }
    +
    +   @Override
    +   public Set<Integer> getPriorites() {
    +      Level<T>[] levels = getArray();
    +      return Arrays.stream(levels).map(Level::level).collect(Collectors.toSet());
    +   }
    +
    +   @Override
    +   public Iterator<T> iterator() {
    +      return new QueueConsumersIterator<>(this, false);
    +   }
    +
    +   @Override
    +   public boolean hasNext() {
    +      return iterator.hasNext();
    +   }
    +
    +   @Override
    +   public T next() {
    +      return iterator.next();
    +   }
    +
    +   @Override
    +   public QueueConsumers<T> reset() {
    +      iterator.reset();
    +      return this;
    +   }
    +
    +   @Override
    +   public void forEach(Consumer<? super T> action) {
    +      Objects.requireNonNull(action);
    +      Level<T>[] current = getArray();
    +      int len = current.length;
    +      for (int i = 0; i < len; ++i) {
    +         current[i].forEach(action);
    +      }
    +   }
    +
    +   private Level<T> getLevel(int level, boolean createIfMissing) {
    +      Level<T>[] current = getArray();
    +      int low = 0;
    +      int high = current.length - 1;
    +
    +      while (low <= high) {
    +         int mid = (low + high) >>> 1;
    +         Level<T> midVal = current[mid];
    +
    +         if (midVal.level() > level)
    +            low = mid + 1;
    +         else if (midVal.level() < level)
    +            high = mid - 1;
    +         else
    +            return midVal; //key found
    +      }
    +
    +      if (createIfMissing) {
    +         Level<T>[] newLevels = newLevelArrayInstance(current.length + 1);
    +         if (low > 0) {
    +            System.arraycopy(current, 0, newLevels, 0, low);
    +         }
    +         if (current.length - low > 0) {
    +            System.arraycopy(current, low, newLevels, low + 1, current.length - low);
    +         }
    +         newLevels[low] = new Level<T>(level);
    +         setArray(newLevels);
    +         return newLevels[low];
    +      }
    +      return null;
    +   }
    +
    +   @Override
    +   public synchronized boolean add(T t) {
    +      boolean result = addInternal(t);
    +      calcSize();
    +      return result;
    +   }
    +
    +   private boolean addInternal(T t) {
    +      if (t == null) return false;
    +      Level<T> level = getLevel(t.getPriority(), true);
    +      return level.add(t);
    +   }
    +
    +   @Override
    +   public boolean remove(Object o) {
    +      return o instanceof PriorityAware && remove((PriorityAware) o);
    +   }
    +
    +   public synchronized boolean remove(PriorityAware priorityAware) {
    +      boolean result = removeInternal(priorityAware);
    +      calcSize();
    +      return result;
    +   }
    +
    +   private boolean removeInternal(PriorityAware priorityAware) {
    +      if ( priorityAware == null) return false;
    +      Level<T> level = getLevel(priorityAware.getPriority(), false);
    +      boolean result = level != null && level.remove(priorityAware);
    +      if (level != null && level.size() == 0) {
    +         removeLevel(level.level);
    +      }
    +      return result;
    +   }
    +
    +   private Level<T> removeLevel(int level) {
    +      Level<T>[] current = getArray();
    +      int len = current.length;
    +      int low = 0;
    +      int high = len - 1;
    +
    +      while (low <= high) {
    +         int mid = (low + high) >>> 1;
    +         Level<T> midVal = current[mid];
    +
    +         if (midVal.level() > level)
    +            low = mid + 1;
    +         else if (midVal.level() < level)
    +            high = mid - 1;
    +         else {
    +            Level<T>[] newLevels = newLevelArrayInstance(len - 1);
    +            System.arraycopy(current, 0, newLevels, 0, mid);
    +            System.arraycopy(current, mid + 1, newLevels, mid, len - mid - 1);
    +            setArray(newLevels);
    +            return midVal; //key found
    +         }
    +      }
    +      return null;
    +   }
    +
    +   @Override
    +   public boolean containsAll(Collection<?> c) {
    +      Objects.requireNonNull(c);
    +      for (Object e : c)
    +         if (!contains(e))
    +            return false;
    +      return true;
    +   }
    +
    +   @Override
    +   public synchronized boolean addAll(Collection<? extends T> c) {
    +      Objects.requireNonNull(c);
    +      boolean modified = false;
    +      for (T e : c)
    +         if (addInternal(e))
    +            modified = true;
    +      calcSize();
    +      return modified;
    +   }
    +
    +   @Override
    +   public synchronized boolean removeAll(Collection<?> c) {
    +      Objects.requireNonNull(c);
    +      boolean modified = false;
    +      for (Object o : c) {
    +         if (remove(o)) {
    +            modified = true;
    +         }
    +      }
    +      calcSize();
    +      return modified;
    +   }
    +
    +   @Override
    +   public synchronized boolean retainAll(Collection<?> c) {
    +      Objects.requireNonNull(c);
    +      boolean modified = false;
    +      Level<T>[] levels = getArray();
    +      for (Level<T> level : levels) {
    +         if (level.retainAll(c)) {
    +            modified = true;
    +         }
    +      }
    +      calcSize();
    +      return modified;
    +   }
    +
    +   @Override
    +   public synchronized void clear() {
    +      Level<T>[] levels = getArray();
    +      for (Level<T> level : levels) {
    +         level.clear();
    +      }
    +      calcSize();
    +   }
    +
    +
    +
    +   @Override
    +   public boolean contains(Object o) {
    +      return o instanceof PriorityAware && contains((PriorityAware) o);
    +   }
    +
    +   public boolean contains(PriorityAware priorityAware) {
    +      if (priorityAware == null) return false;
    +      Level<T> level = getLevel(priorityAware.getPriority(), false);
    +      return level != null && level.contains(priorityAware);
    +   }
    +
    +   private void calcSize() {
    +      Level<T>[] current = getArray();
    +      int size = 0;
    +      for (Level<T> level : current) {
    +         size += level.size();
    +      }
    +      this.size = size;
    +   }
    +
    +   private static class QueueConsumersIterator<T extends PriorityAware> implements ResetableIterator<T> {
    +
    +      private final QueueConsumersImpl<T> queueConsumers;
    +      private final boolean resetable;
    +      private Level<T>[] levels;
    +      int level = -1;
    +      private ResetableIterator<T> currentIterator;
    +
    +      private QueueConsumersIterator(QueueConsumersImpl<T> queueConsumers, boolean resetable) {
    +         this.queueConsumers = queueConsumers;
    +         this.levels = queueConsumers.getArray();
    +         this.resetable = resetable;
    +
    +      }
    +
    +      @Override
    +      public boolean hasNext() {
    +         while (true) {
    +            if (currentIterator != null) {
    +               if (currentIterator.hasNext()) {
    +                  return true;
    +               }
    +            }
    +            int nextLevel = level + 1;
    +            if (levels != null && nextLevel < levels.length) {
    +               moveToLevel(nextLevel);
    +            } else {
    +               return false;
    +            }
    +         }
    +      }
    +
    +      @Override
    +      public T next() {
    +         while (true) {
    +            if (currentIterator != null) {
    +               if (currentIterator.hasNext()) {
    +                  return currentIterator.next();
    +               }
    +            }
    +            int nextLevel = level + 1;
    +            if (levels != null && nextLevel < levels.length) {
    +               moveToLevel(nextLevel);
    +            } else {
    +               return null;
    +            }
    +         }
    +      }
    +
    +      private void moveToLevel(int level) {
    +         Level<T> level0 = levels[level];
    +         if (resetable) {
    +            currentIterator = level0.resetableIterator().reset();
    +         } else {
    +            currentIterator = level0.iterator();
    +         }
    +         this.level = level;
    +      }
    +
    +      @Override
    +      public ResetableIterator<T> reset() {
    +         if (!resetable) {
    +            throw new IllegalStateException("Iterator is not resetable");
    +         }
    +         levels = queueConsumers.getArray();
    +         level = -1;
    +         currentIterator = null;
    +         return this;
    +      }
    +   }
    +
    +   /**
    +    * This is represents a getPriority and is modeled on CopyOnArrayList.
    --- End diff --
    
    `{@link CopyOnWriteArrayList}`


---

[GitHub] activemq-artemis pull request #2488: ARTEMIS-196 Implement Consumer Priority

Posted by michaelandrepearce <gi...@git.apache.org>.
Github user michaelandrepearce commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/2488#discussion_r245013090
  
    --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java ---
    @@ -0,0 +1,648 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.activemq.artemis.core.server.impl;
    +
    +import org.apache.activemq.artemis.core.server.PriorityAware;
    +
    +import java.lang.reflect.Array;
    +import java.util.AbstractCollection;
    +import java.util.Arrays;
    +import java.util.Collection;
    +import java.util.Iterator;
    +import java.util.Objects;
    +import java.util.Set;
    +import java.util.concurrent.locks.ReentrantLock;
    +import java.util.function.Consumer;
    +import java.util.stream.Collectors;
    +
    +/**
    + * This class's purpose is to hold the consumers, it models around multi getPriority (getPriority) varient of
    + * java.util.concurrent.CopyOnWriteArrayList, so that reads are concurrent safe and non blocking.
    + *
    + * N.b. we could have made Level extend CopyOnWriteArrayList but due to the need to access the internal Array structure,
    + * which is privileged to package java.util.concurrent. As such much of Level is is taken from here.
    + *
    + * Modifications like in CopyOnWriteArrayList are single threaded via a single re-entrant lock.
    + *
    + * Iterators iterate over a snapshot of the internal array structure, so will not see mutations.
    + *
    + * There can only be one resettable iterable view, this is exposed at the top getPriority,
    + *     and is intended for use in QueueImpl only.
    + * All other iterators are not reset-able and are created on calling iterator().
    + *
    + * Methods getArray, setArray MUST never be exposed, and all array modifications must go through these.
    + *
    + * @param <T> The type this class may hold, this is generic as can be anything that extends PriorityAware,
    + *         but intent is this is the QueueImpl:ConsumerHolder.
    + */
    +public class QueueConsumersImpl<T extends PriorityAware> extends AbstractCollection<T> implements QueueConsumers<T> {
    +
    +   private final QueueConsumersIterator<T> iterator = new QueueConsumersIterator<>(this, true);
    +
    +   private volatile Level<T>[] levels;
    +   private volatile int size;
    +   private volatile T first;
    +
    +   private void setArray(Level<T>[] array) {
    +      this.levels = array;
    +   }
    +
    +   private Level<T>[] getArray() {
    +      return levels;
    +   }
    +
    +
    +   public QueueConsumersImpl() {
    +      levels = newLevelArrayInstance(0);
    +   }
    +
    +   @SuppressWarnings("unchecked")
    +   private static <T> Level<T>[] newLevelArrayInstance(int length) {
    +      return (Level<T>[]) Array.newInstance(Level.class, length);
    +   }
    +
    +   @Override
    +   public int size() {
    +      return size;
    +   }
    +
    +   @Override
    +   public boolean isEmpty() {
    +      return size() == 0;
    +   }
    +
    +   @Override
    +   public Set<Integer> getPriorites() {
    +      Level<T>[] levels = getArray();
    +      return Arrays.stream(levels).map(Level::level).collect(Collectors.toSet());
    +   }
    +
    +   @Override
    +   public Iterator<T> iterator() {
    +      return new QueueConsumersIterator<>(this, false);
    +   }
    +
    +   @Override
    +   public boolean hasNext() {
    +      return iterator.hasNext();
    +   }
    +
    +   @Override
    +   public T next() {
    +      return iterator.next();
    +   }
    +
    +   @Override
    +   public QueueConsumers<T> reset() {
    +      iterator.reset();
    +      return this;
    +   }
    +
    +   @Override
    +   public void forEach(Consumer<? super T> action) {
    +      Objects.requireNonNull(action);
    +      Level<T>[] current = getArray();
    +      int len = current.length;
    +      for (int i = 0; i < len; ++i) {
    +         current[i].forEach(action);
    +      }
    +   }
    +
    +   private Level<T> getLevel(int level, boolean createIfMissing) {
    +      Level<T>[] current = getArray();
    +      int low = 0;
    +      int high = current.length - 1;
    +
    +      while (low <= high) {
    +         int mid = (low + high) >>> 1;
    +         Level<T> midVal = current[mid];
    +
    +         if (midVal.level() > level)
    +            low = mid + 1;
    +         else if (midVal.level() < level)
    +            high = mid - 1;
    +         else
    +            return midVal; //key found
    +      }
    +
    +      if (createIfMissing) {
    +         Level<T>[] newLevels = newLevelArrayInstance(current.length + 1);
    +         if (low > 0) {
    +            System.arraycopy(current, 0, newLevels, 0, low);
    +         }
    +         if (current.length - low > 0) {
    +            System.arraycopy(current, low, newLevels, low + 1, current.length - low);
    +         }
    +         newLevels[low] = new Level<T>(level);
    +         setArray(newLevels);
    +         return newLevels[low];
    +      }
    +      return null;
    +   }
    +
    +   @Override
    +   public synchronized boolean add(T t) {
    --- End diff --
    
    We are using all the methods that were added. It is not used in a single-writer fashion. This class essentially replaces the CopyOnWriteArrayList we previously used to hold the consumers, which supported only simple round robin; that is why most of its design is based on it.
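
To make the difference from a flat list concrete, here is a small sketch of grouping consumers by priority so that higher levels are visited first and insertion order is kept within a level. It uses a `TreeMap` for brevity and is not the array-based, copy-on-write structure in the PR; `PriorityLevelsSketch` and its methods are hypothetical names, and it is not thread-safe.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: consumers grouped by priority, highest level first.
final class PriorityLevelsSketch {

   // Reverse ordering so iteration starts at the highest priority.
   private final Map<Integer, List<String>> levels = new TreeMap<>(Comparator.reverseOrder());

   void add(String consumer, int priority) {
      levels.computeIfAbsent(priority, p -> new ArrayList<>()).add(consumer);
   }

   // Flattens the levels into the order in which consumers would be tried.
   List<String> deliveryOrder() {
      List<String> order = new ArrayList<>();
      levels.values().forEach(order::addAll);
      return order;
   }
}
```

Adding consumers `a` (priority 0), `b` (priority 5) and `c` (priority 0) yields the order `b, a, c`, whereas a single flat list would simply keep `a, b, c`.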


---

[GitHub] activemq-artemis pull request #2488: ARTEMIS-196 Implement Consumer Priority

Posted by franz1981 <gi...@git.apache.org>.
Github user franz1981 commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/2488#discussion_r244986539
  
    --- Diff: artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionCreateConsumerMessage.java ---
    @@ -95,6 +109,7 @@ public void encodeRest(final ActiveMQBuffer buffer) {
           buffer.writeNullableSimpleString(filterString);
           buffer.writeBoolean(browseOnly);
           buffer.writeBoolean(requiresResponse);
    +      buffer.writeInt(priority);
    --- End diff --
    
    I would write it as a byte if we don't plan to support more than 127 priorities on the consumer side, but as I've said, it is a negligible saving of space on the wire.
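
A minimal sketch of that idea, assuming a plain `java.nio.ByteBuffer` in place of `ActiveMQBuffer` so the example stays self-contained. `PriorityEncodingSketch`, `writePriority` and `readPriority` are hypothetical names; the range check is there because narrowing an out-of-range `int` to `byte` would silently wrap.

```java
import java.nio.ByteBuffer;

// Hypothetical sketch: encode a consumer priority in one byte instead of four.
final class PriorityEncodingSketch {

   static void writePriority(ByteBuffer buffer, int priority) {
      // Reject values that do not fit in a signed byte instead of truncating them.
      if (priority < Byte.MIN_VALUE || priority > Byte.MAX_VALUE) {
         throw new IllegalArgumentException("priority out of byte range: " + priority);
      }
      buffer.put((byte) priority);
   }

   static int readPriority(ByteBuffer buffer) {
      // The signed byte is widened back to int, so negative priorities survive.
      return buffer.get();
   }
}
```

This saves three bytes per create-consumer message, at the cost of fixing the supported range to -128..127 in the wire format.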


---

[GitHub] activemq-artemis pull request #2488: ARTEMIS-196 Implement Consumer Priority

Posted by franz1981 <gi...@git.apache.org>.
Github user franz1981 commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/2488#discussion_r244986350
  
    --- Diff: artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionCreateConsumerMessage.java ---
    @@ -25,20 +26,24 @@
     
        private SimpleString filterString;
     
    +   private int priority;
    --- End diff --
    
    I would change it to `byte` if we are not planning to support priorities > 127, i.e. `Byte.MAX_VALUE`.
    For small messages it *could* save some space (TBH, this needs to be verified with the JOL tool).


---

[GitHub] activemq-artemis pull request #2488: ARTEMIS-196 Implement Consumer Priority

Posted by michaelandrepearce <gi...@git.apache.org>.
Github user michaelandrepearce commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/2488#discussion_r245009864
  
    --- Diff: artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java ---
    @@ -233,6 +239,11 @@ public Object createSender(ProtonServerSenderContext protonSender,
           return consumer;
        }
     
    +   private int getPriority(Map<Symbol, Object> properties) {
    +      Integer value = properties == null ? null : (Integer) properties.get(PRIORITY);
    --- End diff --
    
    Users in AMQP land will expect Integer.
    
    Qpid: https://qpid.apache.org/releases/qpid-broker-j-7.0.6/book/Java-Broker-Runtime-Consumers.html#Java-Broker-Runtime-Consumers-Prioirty
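
For reference, a self-contained sketch of reading an Integer priority from the link properties. It assumes `String` keys in place of proton-j's `Symbol`, a property name of `priority`, and a default of 0 when the property is absent; all three are assumptions for illustration, as the rest of the method is not shown in the diff. Unlike the direct cast in the diff, the sketch falls back to the default when the value is not an `Integer`.

```java
import java.util.Map;

// Hypothetical sketch: look up an Integer consumer priority in link properties.
final class AmqpPrioritySketch {

   static final String PRIORITY = "priority"; // assumed property name
   static final int DEFAULT_PRIORITY = 0;     // assumed default

   static int getPriority(Map<String, Object> properties) {
      Object value = properties == null ? null : properties.get(PRIORITY);
      // Only an Integer is accepted; anything else falls back to the default.
      return value instanceof Integer ? (Integer) value : DEFAULT_PRIORITY;
   }
}
```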


---

[GitHub] activemq-artemis pull request #2488: ARTEMIS-196 Implement Consumer Priority

Posted by michaelandrepearce <gi...@git.apache.org>.
Github user michaelandrepearce commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/2488#discussion_r245029912
  
    --- Diff: artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionCreateConsumerMessage.java ---
    @@ -25,20 +26,24 @@
     
        private SimpleString filterString;
     
    +   private int priority;
    --- End diff --
    
    marking resolved.


---

[GitHub] activemq-artemis issue #2488: ARTEMIS-196 Implement Consumer Priority

Posted by franz1981 <gi...@git.apache.org>.
Github user franz1981 commented on the issue:

    https://github.com/apache/activemq-artemis/pull/2488
  
    @michaelandrepearce It was introduced in Java 8 to support lambdas as they are now; Java 9 introduced VarHandle, which has a similar name and overlapping functionality :P


---

[GitHub] activemq-artemis pull request #2488: ARTEMIS-196 Implement Consumer Priority

Posted by michaelandrepearce <gi...@git.apache.org>.
Github user michaelandrepearce commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/2488#discussion_r245012214
  
    --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java ---
    @@ -0,0 +1,648 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.activemq.artemis.core.server.impl;
    +
    +import org.apache.activemq.artemis.core.server.PriorityAware;
    +
    +import java.lang.reflect.Array;
    +import java.util.AbstractCollection;
    +import java.util.Arrays;
    +import java.util.Collection;
    +import java.util.Iterator;
    +import java.util.Objects;
    +import java.util.Set;
    +import java.util.concurrent.locks.ReentrantLock;
    +import java.util.function.Consumer;
    +import java.util.stream.Collectors;
    +
    +/**
    + * This class's purpose is to hold the consumers, it models around multi getPriority (getPriority) varient of
    + * java.util.concurrent.CopyOnWriteArrayList, so that reads are concurrent safe and non blocking.
    + *
    + * N.b. we could have made Level extend CopyOnWriteArrayList but due to the need to access the internal Array structure,
    + * which is privileged to package java.util.concurrent. As such much of Level is is taken from here.
    + *
    + * Modifications like in CopyOnWriteArrayList are single threaded via a single re-entrant lock.
    + *
    + * Iterators iterate over a snapshot of the internal array structure, so will not see mutations.
    + *
    + * There can only be one resettable iterable view, this is exposed at the top getPriority,
    + *     and is intended for use in QueueImpl only.
    + * All other iterators are not reset-able and are created on calling iterator().
    + *
    + * Methods getArray, setArray MUST never be exposed, and all array modifications must go through these.
    + *
    + * @param <T> The type this class may hold, this is generic as can be anything that extends PriorityAware,
    + *         but intent is this is the QueueImpl:ConsumerHolder.
    + */
    +public class QueueConsumersImpl<T extends PriorityAware> extends AbstractCollection<T> implements QueueConsumers<T> {
    +
    +   private final QueueConsumersIterator<T> iterator = new QueueConsumersIterator<>(this, true);
    +
    +   private volatile Level<T>[] levels;
    +   private volatile int size;
    +   private volatile T first;
    +
    +   private void setArray(Level<T>[] array) {
    +      this.levels = array;
    +   }
    +
    +   private Level<T>[] getArray() {
    +      return levels;
    +   }
    +
    +
    +   public QueueConsumersImpl() {
    +      levels = newLevelArrayInstance(0);
    +   }
    +
    +   @SuppressWarnings("unchecked")
    +   private static <T> Level<T>[] newLevelArrayInstance(int length) {
    +      return (Level<T>[]) Array.newInstance(Level.class, length);
    +   }
    +
    +   @Override
    +   public int size() {
    +      return size;
    +   }
    +
    +   @Override
    +   public boolean isEmpty() {
    +      return size() == 0;
    +   }
    +
    +   @Override
    +   public Set<Integer> getPriorites() {
    +      Level<T>[] levels = getArray();
    +      return Arrays.stream(levels).map(Level::level).collect(Collectors.toSet());
    +   }
    +
    +   @Override
    +   public Iterator<T> iterator() {
    +      return new QueueConsumersIterator<>(this, false);
    +   }
    +
    +   @Override
    +   public boolean hasNext() {
    +      return iterator.hasNext();
    +   }
    +
    +   @Override
    +   public T next() {
    +      return iterator.next();
    +   }
    +
    +   @Override
    +   public QueueConsumers<T> reset() {
    +      iterator.reset();
    +      return this;
    +   }
    +
    +   @Override
    +   public void forEach(Consumer<? super T> action) {
    +      Objects.requireNonNull(action);
    +      Level<T>[] current = getArray();
    +      int len = current.length;
    +      for (int i = 0; i < len; ++i) {
    +         current[i].forEach(action);
    +      }
    +   }
    +
    +   private Level<T> getLevel(int level, boolean createIfMissing) {
    +      Level<T>[] current = getArray();
    +      int low = 0;
    +      int high = current.length - 1;
    +
    +      while (low <= high) {
    +         int mid = (low + high) >>> 1;
    +         Level<T> midVal = current[mid];
    +
    +         if (midVal.level() > level)
    +            low = mid + 1;
    +         else if (midVal.level() < level)
    +            high = mid - 1;
    +         else
    +            return midVal; //key found
    +      }
    +
    +      if (createIfMissing) {
    +         Level<T>[] newLevels = newLevelArrayInstance(current.length + 1);
    +         if (low > 0) {
    +            System.arraycopy(current, 0, newLevels, 0, low);
    +         }
    +         if (current.length - low > 0) {
    +            System.arraycopy(current, low, newLevels, low + 1, current.length - low);
    +         }
    +         newLevels[low] = new Level<T>(level);
    +         setArray(newLevels);
    +         return newLevels[low];
    +      }
    +      return null;
    +   }
    +
    +   @Override
    +   public synchronized boolean add(T t) {
    +      boolean result = addInternal(t);
    +      calcSize();
    +      return result;
    +   }
    +
    +   private boolean addInternal(T t) {
    +      if (t == null) return false;
    +      Level<T> level = getLevel(t.getPriority(), true);
    +      return level.add(t);
    +   }
    +
    +   @Override
    +   public boolean remove(Object o) {
    +      return o instanceof PriorityAware && remove((PriorityAware) o);
    +   }
    +
    +   public synchronized boolean remove(PriorityAware priorityAware) {
    +      boolean result = removeInternal(priorityAware);
    +      calcSize();
    +      return result;
    +   }
    +
    +   private boolean removeInternal(PriorityAware priorityAware) {
    +      if ( priorityAware == null) return false;
    +      Level<T> level = getLevel(priorityAware.getPriority(), false);
    +      boolean result = level != null && level.remove(priorityAware);
    +      if (level != null && level.size() == 0) {
    +         removeLevel(level.level);
    +      }
    +      return result;
    +   }
    +
    +   private Level<T> removeLevel(int level) {
    +      Level<T>[] current = getArray();
    +      int len = current.length;
    +      int low = 0;
    +      int high = len - 1;
    +
    +      while (low <= high) {
    +         int mid = (low + high) >>> 1;
    +         Level<T> midVal = current[mid];
    +
    +         if (midVal.level() > level)
    +            low = mid + 1;
    +         else if (midVal.level() < level)
    +            high = mid - 1;
    +         else {
    +            Level<T>[] newLevels = newLevelArrayInstance(len - 1);
    +            System.arraycopy(current, 0, newLevels, 0, mid);
    +            System.arraycopy(current, mid + 1, newLevels, mid, len - mid - 1);
    +            setArray(newLevels);
    +            return midVal; //key found
    +         }
    +      }
    +      return null;
    +   }
    +
    +   @Override
    +   public boolean containsAll(Collection<?> c) {
    +      Objects.requireNonNull(c);
    +      for (Object e : c)
    +         if (!contains(e))
    +            return false;
    +      return true;
    +   }
    +
    +   @Override
    +   public synchronized boolean addAll(Collection<? extends T> c) {
    +      Objects.requireNonNull(c);
    +      boolean modified = false;
    +      for (T e : c)
    +         if (addInternal(e))
    +            modified = true;
    +      calcSize();
    +      return modified;
    +   }
    +
    +   @Override
    +   public synchronized boolean removeAll(Collection<?> c) {
    +      Objects.requireNonNull(c);
    +      boolean modified = false;
    +      for (Object o : c) {
    +         if (remove(o)) {
    +            modified = true;
    +         }
    +      }
    +      calcSize();
    +      return modified;
    +   }
    +
    +   @Override
    +   public synchronized boolean retainAll(Collection<?> c) {
    +      Objects.requireNonNull(c);
    +      boolean modified = false;
    +      Level<T>[] levels = getArray();
    +      for (Level<T> level : levels) {
    +         if (level.retainAll(c)) {
    +            modified = true;
    +         }
    +      }
    +      calcSize();
    +      return modified;
    +   }
    +
    +   @Override
    +   public synchronized void clear() {
    +      Level<T>[] levels = getArray();
    +      for (Level<T> level : levels) {
    +         level.clear();
    +      }
    +      calcSize();
    +   }
    +
    +
    +
    +   @Override
    +   public boolean contains(Object o) {
    +      return o instanceof PriorityAware && contains((PriorityAware) o);
    +   }
    +
    +   public boolean contains(PriorityAware priorityAware) {
    +      if (priorityAware == null) return false;
    +      Level<T> level = getLevel(priorityAware.getPriority(), false);
    +      return level != null && level.contains(priorityAware);
    +   }
    +
    +   private void calcSize() {
    +      Level<T>[] current = getArray();
    +      int size = 0;
    +      for (Level<T> level : current) {
    +         size += level.size();
    +      }
    +      this.size = size;
    +   }
    +
    +   private static class QueueConsumersIterator<T extends PriorityAware> implements ResetableIterator<T> {
    +
    +      private final QueueConsumersImpl<T> queueConsumers;
    +      private final boolean resetable;
    +      private Level<T>[] levels;
    +      int level = -1;
    +      private ResetableIterator<T> currentIterator;
    +
    +      private QueueConsumersIterator(QueueConsumersImpl<T> queueConsumers, boolean resetable) {
    +         this.queueConsumers = queueConsumers;
    +         this.levels = queueConsumers.getArray();
    +         this.resetable = resetable;
    +
    +      }
    +
    +      @Override
    +      public boolean hasNext() {
    +         while (true) {
    +            if (currentIterator != null) {
    +               if (currentIterator.hasNext()) {
    +                  return true;
    +               }
    +            }
    +            int nextLevel = level + 1;
    +            if (levels != null && nextLevel < levels.length) {
    +               moveToLevel(nextLevel);
    +            } else {
    +               return false;
    +            }
    +         }
    +      }
    +
    +      @Override
    +      public T next() {
    +         while (true) {
    +            if (currentIterator != null) {
    +               if (currentIterator.hasNext()) {
    +                  return currentIterator.next();
    +               }
    +            }
    +            int nextLevel = level + 1;
    +            if (levels != null && nextLevel < levels.length) {
    +               moveToLevel(nextLevel);
    +            } else {
    +               return null;
    +            }
    +         }
    +      }
    +
    +      private void moveToLevel(int level) {
    +         Level<T> level0 = levels[level];
    +         if (resetable) {
    +            currentIterator = level0.resetableIterator().reset();
    +         } else {
    +            currentIterator = level0.iterator();
    +         }
    +         this.level = level;
    +      }
    +
    +      @Override
    +      public ResetableIterator<T> reset() {
    +         if (!resetable) {
    +            throw new IllegalStateException("Iterator is not resetable");
    +         }
    +         levels = queueConsumers.getArray();
    +         level = -1;
    +         currentIterator = null;
    +         return this;
    +      }
    +   }
    +
    +   /**
    +    * This is represents a getPriority and is modeled on CopyOnArrayList.
    --- End diff --
    
    Will do.


---

[GitHub] activemq-artemis pull request #2488: ARTEMIS-196 Implement Consumer Priority

Posted by michaelandrepearce <gi...@git.apache.org>.
Github user michaelandrepearce commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/2488#discussion_r245011975
  
    --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ResetableIterator.java ---
    @@ -0,0 +1,29 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.activemq.artemis.core.server.impl;
    +
    +import java.util.Iterator;
    +
    +public interface ResetableIterator<T> extends Iterator<T> {
    +
    +   /**
    +    * Resets the iterator so you can re-iterate over all elements.
    +    *
    +    * @return itself, this is just for convenience.
    +    */
    +   ResetableIterator<T> reset();
    --- End diff --
    
    -1. We are not closing the iterator, nor would this go in a try-with-resources block. We are simply resetting the iterator so that it marks endPos = startPos, which lets us continue to round-robin after successfully handling a message.
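    
    To make the reset semantics concrete, here is a minimal sketch of a round-robin iterator over a fixed array. `RoundRobinIterator` and its fields are hypothetical names for illustration; the PR's own iterator works over per-priority levels and is not reproduced here.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;

// Hypothetical illustration only; not the PR's implementation.
final class RoundRobinIterator<T> implements Iterator<T> {
   private final T[] items;   // fixed snapshot to cycle over
   private int pos;           // index of the next element to hand out
   private int endPos = -1;   // where the current pass stops; -1 means "pass not started"

   RoundRobinIterator(T[] items) {
      this.items = items;
   }

   @Override
   public boolean hasNext() {
      return items.length > 0 && pos != endPos;
   }

   @Override
   public T next() {
      if (!hasNext()) throw new NoSuchElementException();
      if (endPos == -1) endPos = pos;   // first call of a pass: remember the start
      T item = items[pos];
      pos = (pos + 1) % items.length;   // wrap around to keep cycling
      return item;
   }

   // Starts a new full pass from the current position, so the element after
   // the last one handed out is tried first.
   RoundRobinIterator<T> reset() {
      endPos = -1;
      return this;
   }
}
```

    Calling `reset()` after a message is handled means the next message is offered first to the following element, and a pass ends once every element has been offered exactly once.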


---

[GitHub] activemq-artemis pull request #2488: ARTEMIS-196 Implement Consumer Priority

Posted by michaelandrepearce <gi...@git.apache.org>.
Github user michaelandrepearce commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/2488#discussion_r245007052
  
    --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java ---
    @@ -0,0 +1,648 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.activemq.artemis.core.server.impl;
    +
    +import org.apache.activemq.artemis.core.server.PriorityAware;
    +
    +import java.lang.reflect.Array;
    +import java.util.AbstractCollection;
    +import java.util.Arrays;
    +import java.util.Collection;
    +import java.util.Iterator;
    +import java.util.Objects;
    +import java.util.Set;
    +import java.util.concurrent.locks.ReentrantLock;
    +import java.util.function.Consumer;
    +import java.util.stream.Collectors;
    +
    +/**
    + * This class's purpose is to hold the consumers, it models around multi getPriority (getPriority) varient of
    + * java.util.concurrent.CopyOnWriteArrayList, so that reads are concurrent safe and non blocking.
    + *
    + * N.b. we could have made Level extend CopyOnWriteArrayList but due to the need to access the internal Array structure,
    + * which is privileged to package java.util.concurrent. As such much of Level is is taken from here.
    + *
    + * Modifications like in CopyOnWriteArrayList are single threaded via a single re-entrant lock.
    + *
    + * Iterators iterate over a snapshot of the internal array structure, so will not see mutations.
    + *
    + * There can only be one resettable iterable view, this is exposed at the top getPriority,
    + *     and is intended for use in QueueImpl only.
    + * All other iterators are not reset-able and are created on calling iterator().
    + *
    + * Methods getArray, setArray MUST never be exposed, and all array modifications must go through these.
    + *
    + * @param <T> The type this class may hold, this is generic as can be anything that extends PriorityAware,
    + *         but intent is this is the QueueImpl:ConsumerHolder.
    + */
    +public class QueueConsumersImpl<T extends PriorityAware> extends AbstractCollection<T> implements QueueConsumers<T> {
    +
    +   private final QueueConsumersIterator<T> iterator = new QueueConsumersIterator<>(this, true);
    +
    +   private volatile Level<T>[] levels;
    +   private volatile int size;
    +   private volatile T first;
    +
    +   private void setArray(Level<T>[] array) {
    +      this.levels = array;
    +   }
    +
    +   private Level<T>[] getArray() {
    +      return levels;
    +   }
    +
    +
    +   public QueueConsumersImpl() {
    +      levels = newLevelArrayInstance(0);
    +   }
    +
    +   @SuppressWarnings("unchecked")
    +   private static <T> Level<T>[] newLevelArrayInstance(int length) {
    +      return (Level<T>[]) Array.newInstance(Level.class, length);
    +   }
    +
    +   @Override
    +   public int size() {
    +      return size;
    +   }
    +
    +   @Override
    +   public boolean isEmpty() {
    +      return size() == 0;
    +   }
    +
    +   @Override
    +   public Set<Integer> getPriorites() {
    +      Level<T>[] levels = getArray();
    +      return Arrays.stream(levels).map(Level::level).collect(Collectors.toSet());
    +   }
    +
    +   @Override
    +   public Iterator<T> iterator() {
    +      return new QueueConsumersIterator<>(this, false);
    +   }
    +
    +   @Override
    +   public boolean hasNext() {
    +      return iterator.hasNext();
    +   }
    +
    +   @Override
    +   public T next() {
    +      return iterator.next();
    +   }
    +
    +   @Override
    +   public QueueConsumers<T> reset() {
    +      iterator.reset();
    +      return this;
    +   }
    +
    +   @Override
    +   public void forEach(Consumer<? super T> action) {
    +      Objects.requireNonNull(action);
    +      Level<T>[] current = getArray();
    +      int len = current.length;
    +      for (int i = 0; i < len; ++i) {
    +         current[i].forEach(action);
    +      }
    +   }
    +
    +   private Level<T> getLevel(int level, boolean createIfMissing) {
    +      Level<T>[] current = getArray();
    +      int low = 0;
    +      int high = current.length - 1;
    +
    +      while (low <= high) {
    +         int mid = (low + high) >>> 1;
    +         Level<T> midVal = current[mid];
    +
    +         if (midVal.level() > level)
    +            low = mid + 1;
    +         else if (midVal.level() < level)
    +            high = mid - 1;
    +         else
    +            return midVal; //key found
    +      }
    +
    +      if (createIfMissing) {
    +         Level<T>[] newLevels = newLevelArrayInstance(current.length + 1);
    +         if (low > 0) {
    +            System.arraycopy(current, 0, newLevels, 0, low);
    +         }
    +         if (current.length - low > 0) {
    +            System.arraycopy(current, low, newLevels, low + 1, current.length - low);
    +         }
    +         newLevels[low] = new Level<T>(level);
    +         setArray(newLevels);
    +         return newLevels[low];
    +      }
    +      return null;
    +   }
    +
    +   @Override
    +   public synchronized boolean add(T t) {
    --- End diff --
    
    Yes, it follows the design of CopyOnWriteArrayList, but uses synchronized methods rather than a ReentrantLock, as the expected concurrency is low.
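    
    The pattern described above can be sketched as follows: writers are serialized by `synchronized`, while readers take a lock-free snapshot through a volatile array reference. `CopyOnWriteHolder` is a hypothetical, simplified class for illustration and is not the PR's `Level` implementation.

```java
import java.util.Arrays;
import java.util.Objects;

// Hypothetical sketch; class and method names are illustrative only.
final class CopyOnWriteHolder<T> {
   // Readers load this reference without locking; writers replace it wholesale.
   private volatile Object[] array = new Object[0];

   // Writers are serialized by the intrinsic lock and never mutate the old array.
   synchronized boolean add(T item) {
      Object[] current = array;
      Object[] next = Arrays.copyOf(current, current.length + 1);
      next[current.length] = item;
      array = next;
      return true;
   }

   synchronized boolean remove(Object item) {
      Object[] current = array;
      for (int i = 0; i < current.length; i++) {
         if (Objects.equals(current[i], item)) {
            Object[] next = new Object[current.length - 1];
            System.arraycopy(current, 0, next, 0, i);
            System.arraycopy(current, i + 1, next, i, current.length - i - 1);
            array = next;
            return true;
         }
      }
      return false;
   }

   // Lock-free read: returns the array current at the time of the call.
   // Callers must treat it as read-only.
   Object[] snapshot() {
      return array;
   }
}
```

    A snapshot taken before a write keeps its old contents, which is why iterators built on it do not see later mutations.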


---

[GitHub] activemq-artemis pull request #2488: ARTEMIS-196 Implement Consumer Priority

Posted by franz1981 <gi...@git.apache.org>.
Github user franz1981 commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/2488#discussion_r244976790
  
    --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Consumer.java ---
    @@ -21,7 +21,7 @@
     import org.apache.activemq.artemis.core.filter.Filter;
     import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
     
    -public interface Consumer {
    +public interface Consumer extends PriorityAware {
    --- End diff --
    
    Just a design question: why use a specific `PriorityAware` interface?
    I'm assuming that we can no longer have a `Consumer` that doesn't provide a `default int getPriority()`.


---

[GitHub] activemq-artemis pull request #2488: ARTEMIS-196 Implement Consumer Priority

Posted by franz1981 <gi...@git.apache.org>.
Github user franz1981 commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/2488#discussion_r244978399
  
    --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumers.java ---
    @@ -0,0 +1,34 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.activemq.artemis.core.server.impl;
    +
    +import org.apache.activemq.artemis.core.server.PriorityAware;
    +
    +import java.util.Collection;
    +import java.util.Set;
    +
    +public interface QueueConsumers<T extends PriorityAware> extends Collection<T> {
    +
    +   Set<Integer> getPriorites();
    --- End diff --
    
    I know that's just a test method, so feel free to ignore me, but I would just use a byte[] here :P


---

[GitHub] activemq-artemis pull request #2488: ARTEMIS-196 Implement Consumer Priority

Posted by franz1981 <gi...@git.apache.org>.
Github user franz1981 commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/2488#discussion_r244984864
  
    --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java ---
    @@ -3080,45 +3053,20 @@ private boolean deliverDirect(final MessageReference ref) {
                 return true;
              }
     
    -         int startPos = pos;
    -
    -         int size = consumerList.size();
    +         consumers.reset();
     
    -         while (true) {
    -            ConsumerHolder<? extends Consumer> holder;
    -            if (redistributor == null) {
    -               holder = consumerList.get(pos);
    -            } else {
    -               holder = redistributor;
    -            }
    +         while (consumers.hasNext() || redistributor != null) {
    --- End diff --
    
    Just thinking out loud: given that `consumers::hasNext` is mostly followed by a `next` call, why not provide a single `pollNext` method that returns the consumer, or `null` if there isn't one?
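    
    A minimal sketch of what such a method could look like, assuming it is layered on top of `hasNext`/`next` as a default method. `PollableIterator` and `ArrayPollableIterator` are hypothetical names; only the idea of `pollNext` comes from the comment above.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;

// Hypothetical interface; not part of the PR.
interface PollableIterator<T> extends Iterator<T> {
   // Combines hasNext() and next(): returns the next element, or null when exhausted.
   // Note: this cannot distinguish "exhausted" from a stored null element.
   default T pollNext() {
      return hasNext() ? next() : null;
   }
}

// Tiny array-backed implementation used only to exercise the default method.
final class ArrayPollableIterator<T> implements PollableIterator<T> {
   private final T[] items;
   private int cursor;

   ArrayPollableIterator(T[] items) {
      this.items = items;
   }

   @Override
   public boolean hasNext() {
      return cursor < items.length;
   }

   @Override
   public T next() {
      if (!hasNext()) throw new NoSuchElementException();
      return items[cursor++];
   }
}
```

    A caller could then loop with `while ((c = it.pollNext()) != null)`, which is only safe when null elements are never stored.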


---

[GitHub] activemq-artemis issue #2488: ARTEMIS-196 Implement Consumer Priority

Posted by michaelandrepearce <gi...@git.apache.org>.
Github user michaelandrepearce commented on the issue:

    https://github.com/apache/activemq-artemis/pull/2488
  
    Cool, so I will leave it as is then. Thanks for the review!


---

[GitHub] activemq-artemis issue #2488: ARTEMIS-196 Implement Consumer Priority

Posted by michaelandrepearce <gi...@git.apache.org>.
Github user michaelandrepearce commented on the issue:

    https://github.com/apache/activemq-artemis/pull/2488
  
    Reflection would not be good here: getArray is on the hot path, which is the reason I need it.


---

[GitHub] activemq-artemis pull request #2488: ARTEMIS-196 Implement Consumer Priority

Posted by franz1981 <gi...@git.apache.org>.
Github user franz1981 commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/2488#discussion_r244984274
  
    --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java ---
    @@ -2497,45 +2494,36 @@ private void deliver() {
     
     
                       handled++;
    -
    +                  consumers.reset();
                       continue;
                    }
     
                    if (logger.isTraceEnabled()) {
                       logger.trace("Queue " + this.getName() + " is delivering reference " + ref);
                    }
     
    -               // If a group id is set, then this overrides the consumer chosen round-robin
    +               final SimpleString groupID = extractGroupID(ref);
    +               groupConsumer = getGroupConsumer(groupConsumer, groupID);
     
    -               SimpleString groupID = extractGroupID(ref);
    -
    -               if (groupID != null) {
    -                  groupConsumer = groups.get(groupID);
    -
    -                  if (groupConsumer != null) {
    -                     consumer = groupConsumer;
    -                  }
    -               }
    -
    -               if (exclusive && redistributor == null) {
    -                  consumer = consumerList.get(0).consumer;
    +               if (groupConsumer != null) {
    +                  consumer = groupConsumer;
                    }
     
                    HandleStatus status = handle(ref, consumer);
     
                    if (status == HandleStatus.HANDLED) {
     
    -                  deliveriesInTransit.countUp();
    -
    -                  handledconsumer = consumer;
    -
    -                  removeMessageReference(holder, ref);
    -
                       if (redistributor == null) {
                          handleMessageGroup(ref, consumer, groupConsumer, groupID);
                       }
     
    +                  deliveriesInTransit.countUp();
    +
    +
    +                  removeMessageReference(holder, ref);
    +                  handledconsumer = consumer;
                       handled++;
    +                  consumers.reset();
    --- End diff --
    
    I would try to put the `consumers.reset` call in a `try..finally` block, or wrap the reset call in an `AutoCloseable::close` method to force `try-with-resources` usage too.


---

[GitHub] activemq-artemis pull request #2488: ARTEMIS-196 Implement Consumer Priority

Posted by michaelandrepearce <gi...@git.apache.org>.
Github user michaelandrepearce commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/2488#discussion_r245009338
  
    --- Diff: artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionCreateConsumerMessage.java ---
    @@ -25,20 +26,24 @@
     
        private SimpleString filterString;
     
    +   private int priority;
    --- End diff --
    
    We should support int, as AMQP users can use -2^31 to 2^31-1.
    
    This has no effect on message size and raises no space concerns; it applies only at consumer creation.


---

[GitHub] activemq-artemis issue #2488: ARTEMIS-196 Implement Consumer Priority

Posted by michaelandrepearce <gi...@git.apache.org>.
Github user michaelandrepearce commented on the issue:

    https://github.com/apache/activemq-artemis/pull/2488
  
    Ready for review.
    
    I'll look to merge within the week.
    
    As noted in the Jira ticket, this adds a feature that is in ActiveMQ5 (http://activemq.apache.org/consumer-priority.html), so it is part of bringing feature parity. Note that the feature is also in RabbitMQ.
    
    I think this might be the lowest Jira ticket number I've worked on :)


---

[GitHub] activemq-artemis pull request #2488: ARTEMIS-196 Implement Consumer Priority

Posted by franz1981 <gi...@git.apache.org>.
Github user franz1981 commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/2488#discussion_r244985399
  
    --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ResetableIterator.java ---
    @@ -0,0 +1,29 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.activemq.artemis.core.server.impl;
    +
    +import java.util.Iterator;
    +
    +public interface ResetableIterator<T> extends Iterator<T> {
    +
    +   /**
    +    * Resets the iterator so you can re-iterate over all elements.
    +    *
    +    * @return itself, this is just for convenience.
    +    */
    +   ResetableIterator<T> reset();
    --- End diff --
    
    We could extend `AutoCloseable` too and override `close` so that it calls `reset` by default.
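    
    The suggestion above could be sketched roughly as below. This is only an illustration of the proposal, not something the PR adopts: `ClosingResetableIterator` and `CountingIterator` are hypothetical names, and the interface is redeclared here so the example stands alone.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;

// Hypothetical variant of a resettable iterator that also works with try-with-resources.
interface ClosingResetableIterator<T> extends Iterator<T>, AutoCloseable {
   ClosingResetableIterator<T> reset();

   // Narrows AutoCloseable.close() so it throws no checked exception
   // and simply delegates to reset().
   @Override
   default void close() {
      reset();
   }
}

// Minimal implementation that counts from 0 up to (but excluding) a limit.
final class CountingIterator implements ClosingResetableIterator<Integer> {
   private final int limit;
   private int cursor;

   CountingIterator(int limit) {
      this.limit = limit;
   }

   @Override
   public boolean hasNext() {
      return cursor < limit;
   }

   @Override
   public Integer next() {
      if (!hasNext()) throw new NoSuchElementException();
      return cursor++;
   }

   @Override
   public CountingIterator reset() {
      cursor = 0;
      return this;
   }
}
```

    With this shape, leaving a `try (...)` block resets the iterator even on an early return or exception. The trade-off is that `close` then means "reset" rather than "release", which may surprise readers.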


---

[GitHub] activemq-artemis issue #2488: ARTEMIS-196 Implement Consumer Priority

Posted by michaelandrepearce <gi...@git.apache.org>.
Github user michaelandrepearce commented on the issue:

    https://github.com/apache/activemq-artemis/pull/2488
  
    @franz1981 A lot of the comments were about why integer rather than byte. Whilst legacy ActiveMQ only supported 0-127 in OpenWire, many other brokers support integer for this feature, e.g. Qpid for AMQP supports -2^31 to 2^31-1, and likewise RabbitMQ.
    
    This is set on the consumer (it's not per message), so size isn't an issue. It makes sense to support int and not constrain ourselves unnecessarily, which makes migration easier for people coming from AMQP.


---

[GitHub] activemq-artemis pull request #2488: ARTEMIS-196 Implement Consumer Priority

Posted by michaelandrepearce <gi...@git.apache.org>.
Github user michaelandrepearce commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/2488#discussion_r245030347
  
    --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java ---
    @@ -0,0 +1,648 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.activemq.artemis.core.server.impl;
    +
    +import org.apache.activemq.artemis.core.server.PriorityAware;
    +
    +import java.lang.reflect.Array;
    +import java.util.AbstractCollection;
    +import java.util.Arrays;
    +import java.util.Collection;
    +import java.util.Iterator;
    +import java.util.Objects;
    +import java.util.Set;
    +import java.util.concurrent.locks.ReentrantLock;
    +import java.util.function.Consumer;
    +import java.util.stream.Collectors;
    +
    +/**
    + * This class's purpose is to hold the consumers, it models around multi getPriority (getPriority) varient of
    + * java.util.concurrent.CopyOnWriteArrayList, so that reads are concurrent safe and non blocking.
    + *
    + * N.b. we could have made Level extend CopyOnWriteArrayList but due to the need to access the internal Array structure,
    + * which is privileged to package java.util.concurrent. As such much of Level is is taken from here.
    + *
    + * Modifications like in CopyOnWriteArrayList are single threaded via a single re-entrant lock.
    + *
    + * Iterators iterate over a snapshot of the internal array structure, so will not see mutations.
    + *
    + * There can only be one resettable iterable view, this is exposed at the top getPriority,
    + *     and is intended for use in QueueImpl only.
    + * All other iterators are not reset-able and are created on calling iterator().
    + *
    + * Methods getArray, setArray MUST never be exposed, and all array modifications must go through these.
    + *
    + * @param <T> The type this class may hold, this is generic as can be anything that extends PriorityAware,
    + *         but intent is this is the QueueImpl:ConsumerHolder.
    + */
    +public class QueueConsumersImpl<T extends PriorityAware> extends AbstractCollection<T> implements QueueConsumers<T> {
    +
    +   private final QueueConsumersIterator<T> iterator = new QueueConsumersIterator<>(this, true);
    +
    +   private volatile Level<T>[] levels;
    +   private volatile int size;
    +   private volatile T first;
    +
    +   private void setArray(Level<T>[] array) {
    +      this.levels = array;
    +   }
    +
    +   private Level<T>[] getArray() {
    +      return levels;
    +   }
    +
    +
    +   public QueueConsumersImpl() {
    +      levels = newLevelArrayInstance(0);
    +   }
    +
    +   @SuppressWarnings("unchecked")
    +   private static <T> Level<T>[] newLevelArrayInstance(int length) {
    +      return (Level<T>[]) Array.newInstance(Level.class, length);
    +   }
    +
    +   @Override
    +   public int size() {
    +      return size;
    +   }
    +
    +   @Override
    +   public boolean isEmpty() {
    +      return size() == 0;
    +   }
    +
    +   @Override
    +   public Set<Integer> getPriorites() {
    +      Level<T>[] levels = getArray();
    +      return Arrays.stream(levels).map(Level::level).collect(Collectors.toSet());
    +   }
    +
    +   @Override
    +   public Iterator<T> iterator() {
    +      return new QueueConsumersIterator<>(this, false);
    +   }
    +
    +   @Override
    +   public boolean hasNext() {
    +      return iterator.hasNext();
    +   }
    +
    +   @Override
    +   public T next() {
    +      return iterator.next();
    +   }
    +
    +   @Override
    +   public QueueConsumers<T> reset() {
    +      iterator.reset();
    +      return this;
    +   }
    +
    +   @Override
    +   public void forEach(Consumer<? super T> action) {
    +      Objects.requireNonNull(action);
    +      Level<T>[] current = getArray();
    +      int len = current.length;
    +      for (int i = 0; i < len; ++i) {
    +         current[i].forEach(action);
    +      }
    +   }
    +
    +   private Level<T> getLevel(int level, boolean createIfMissing) {
    +      Level<T>[] current = getArray();
    +      int low = 0;
    +      int high = current.length - 1;
    +
    +      while (low <= high) {
    +         int mid = (low + high) >>> 1;
    +         Level<T> midVal = current[mid];
    +
    +         if (midVal.level() > level)
    +            low = mid + 1;
    +         else if (midVal.level() < level)
    +            high = mid - 1;
    +         else
    +            return midVal; //key found
    +      }
    +
    +      if (createIfMissing) {
    +         Level<T>[] newLevels = newLevelArrayInstance(current.length + 1);
    +         if (low > 0) {
    +            System.arraycopy(current, 0, newLevels, 0, low);
    +         }
    +         if (current.length - low > 0) {
    +            System.arraycopy(current, low, newLevels, low + 1, current.length - low);
    +         }
    +         newLevels[low] = new Level<T>(level);
    +         setArray(newLevels);
    +         return newLevels[low];
    +      }
    +      return null;
    +   }
    +
    +   @Override
    +   public synchronized boolean add(T t) {
    --- End diff --
    
    I assume the thumbs up means your query is resolved. I'll mark this as resolved so I can keep track of the open things I need to address.


---

[GitHub] activemq-artemis pull request #2488: ARTEMIS-196 Implement Consumer Priority

Posted by michaelandrepearce <gi...@git.apache.org>.
Github user michaelandrepearce commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/2488#discussion_r245029528
  
    --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java ---
    @@ -0,0 +1,648 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.activemq.artemis.core.server.impl;
    +
    +import org.apache.activemq.artemis.core.server.PriorityAware;
    +
    +import java.lang.reflect.Array;
    +import java.util.AbstractCollection;
    +import java.util.Arrays;
    +import java.util.Collection;
    +import java.util.Iterator;
    +import java.util.Objects;
    +import java.util.Set;
    +import java.util.concurrent.locks.ReentrantLock;
    +import java.util.function.Consumer;
    +import java.util.stream.Collectors;
    +
    +/**
    + * This class's purpose is to hold the consumers, it models around multi getPriority (getPriority) varient of
    + * java.util.concurrent.CopyOnWriteArrayList, so that reads are concurrent safe and non blocking.
    + *
    + * N.b. we could have made Level extend CopyOnWriteArrayList but due to the need to access the internal Array structure,
    + * which is privileged to package java.util.concurrent. As such much of Level is is taken from here.
    + *
    + * Modifications like in CopyOnWriteArrayList are single threaded via a single re-entrant lock.
    + *
    + * Iterators iterate over a snapshot of the internal array structure, so will not see mutations.
    + *
    + * There can only be one resettable iterable view, this is exposed at the top getPriority,
    + *     and is intended for use in QueueImpl only.
    + * All other iterators are not reset-able and are created on calling iterator().
    + *
    + * Methods getArray, setArray MUST never be exposed, and all array modifications must go through these.
    + *
    + * @param <T> The type this class may hold, this is generic as can be anything that extends PriorityAware,
    + *         but intent is this is the QueueImpl:ConsumerHolder.
    + */
    +public class QueueConsumersImpl<T extends PriorityAware> extends AbstractCollection<T> implements QueueConsumers<T> {
    +
    +   private final QueueConsumersIterator<T> iterator = new QueueConsumersIterator<>(this, true);
    +
    +   private volatile Level<T>[] levels;
    +   private volatile int size;
    +   private volatile T first;
    +
    +   private void setArray(Level<T>[] array) {
    +      this.levels = array;
    +   }
    +
    +   private Level<T>[] getArray() {
    +      return levels;
    +   }
    +
    +
    +   public QueueConsumersImpl() {
    +      levels = newLevelArrayInstance(0);
    +   }
    +
    +   @SuppressWarnings("unchecked")
    +   private static <T> Level<T>[] newLevelArrayInstance(int length) {
    +      return (Level<T>[]) Array.newInstance(Level.class, length);
    +   }
    +
    +   @Override
    +   public int size() {
    +      return size;
    +   }
    +
    +   @Override
    +   public boolean isEmpty() {
    +      return size() == 0;
    +   }
    +
    +   @Override
    +   public Set<Integer> getPriorites() {
    +      Level<T>[] levels = getArray();
    +      return Arrays.stream(levels).map(Level::level).collect(Collectors.toSet());
    +   }
    +
    +   @Override
    +   public Iterator<T> iterator() {
    +      return new QueueConsumersIterator<>(this, false);
    +   }
    +
    +   @Override
    +   public boolean hasNext() {
    +      return iterator.hasNext();
    +   }
    +
    +   @Override
    +   public T next() {
    +      return iterator.next();
    +   }
    +
    +   @Override
    +   public QueueConsumers<T> reset() {
    +      iterator.reset();
    +      return this;
    +   }
    +
    +   @Override
    +   public void forEach(Consumer<? super T> action) {
    +      Objects.requireNonNull(action);
    +      Level<T>[] current = getArray();
    +      int len = current.length;
    +      for (int i = 0; i < len; ++i) {
    +         current[i].forEach(action);
    +      }
    +   }
    +
    +   private Level<T> getLevel(int level, boolean createIfMissing) {
    +      Level<T>[] current = getArray();
    +      int low = 0;
    +      int high = current.length - 1;
    +
    +      while (low <= high) {
    +         int mid = (low + high) >>> 1;
    +         Level<T> midVal = current[mid];
    +
    +         if (midVal.level() > level)
    +            low = mid + 1;
    +         else if (midVal.level() < level)
    +            high = mid - 1;
    +         else
    +            return midVal; //key found
    +      }
    +
    +      if (createIfMissing) {
    +         Level<T>[] newLevels = newLevelArrayInstance(current.length + 1);
    +         if (low > 0) {
    +            System.arraycopy(current, 0, newLevels, 0, low);
    +         }
    +         if (current.length - low > 0) {
    +            System.arraycopy(current, low, newLevels, low + 1, current.length - low);
    +         }
    +         newLevels[low] = new Level<T>(level);
    +         setArray(newLevels);
    +         return newLevels[low];
    +      }
    +      return null;
    +   }
    +
    +   @Override
    +   public synchronized boolean add(T t) {
    +      boolean result = addInternal(t);
    +      calcSize();
    +      return result;
    +   }
    +
    +   private boolean addInternal(T t) {
    +      if (t == null) return false;
    +      Level<T> level = getLevel(t.getPriority(), true);
    +      return level.add(t);
    +   }
    +
    +   @Override
    +   public boolean remove(Object o) {
    +      return o instanceof PriorityAware && remove((PriorityAware) o);
    +   }
    +
    +   public synchronized boolean remove(PriorityAware priorityAware) {
    +      boolean result = removeInternal(priorityAware);
    +      calcSize();
    +      return result;
    +   }
    +
    +   private boolean removeInternal(PriorityAware priorityAware) {
    +      if ( priorityAware == null) return false;
    +      Level<T> level = getLevel(priorityAware.getPriority(), false);
    +      boolean result = level != null && level.remove(priorityAware);
    +      if (level != null && level.size() == 0) {
    +         removeLevel(level.level);
    +      }
    +      return result;
    +   }
    +
    +   private Level<T> removeLevel(int level) {
    +      Level<T>[] current = getArray();
    +      int len = current.length;
    +      int low = 0;
    +      int high = len - 1;
    +
    +      while (low <= high) {
    +         int mid = (low + high) >>> 1;
    +         Level<T> midVal = current[mid];
    +
    +         if (midVal.level() > level)
    +            low = mid + 1;
    +         else if (midVal.level() < level)
    +            high = mid - 1;
    +         else {
    +            Level<T>[] newLevels = newLevelArrayInstance(len - 1);
    +            System.arraycopy(current, 0, newLevels, 0, mid);
    +            System.arraycopy(current, mid + 1, newLevels, mid, len - mid - 1);
    +            setArray(newLevels);
    +            return midVal; //key found
    +         }
    +      }
    +      return null;
    +   }
    +
    +   @Override
    +   public boolean containsAll(Collection<?> c) {
    +      Objects.requireNonNull(c);
    +      for (Object e : c)
    +         if (!contains(e))
    +            return false;
    +      return true;
    +   }
    +
    +   @Override
    +   public synchronized boolean addAll(Collection<? extends T> c) {
    +      Objects.requireNonNull(c);
    +      boolean modified = false;
    +      for (T e : c)
    +         if (addInternal(e))
    +            modified = true;
    +      calcSize();
    +      return modified;
    +   }
    +
    +   @Override
    +   public synchronized boolean removeAll(Collection<?> c) {
    +      Objects.requireNonNull(c);
    +      boolean modified = false;
    +      for (Object o : c) {
    +         if (remove(o)) {
    +            modified = true;
    +         }
    +      }
    +      calcSize();
    +      return modified;
    +   }
    +
    +   @Override
    +   public synchronized boolean retainAll(Collection<?> c) {
    +      Objects.requireNonNull(c);
    +      boolean modified = false;
    +      Level<T>[] levels = getArray();
    +      for (Level<T> level : levels) {
    +         if (level.retainAll(c)) {
    +            modified = true;
    +         }
    +      }
    +      calcSize();
    +      return modified;
    +   }
    +
    +   @Override
    +   public synchronized void clear() {
    +      Level<T>[] levels = getArray();
    +      for (Level<T> level : levels) {
    +         level.clear();
    +      }
    +      calcSize();
    +   }
    +
    +
    +
    +   @Override
    +   public boolean contains(Object o) {
    +      return o instanceof PriorityAware && contains((PriorityAware) o);
    +   }
    +
    +   public boolean contains(PriorityAware priorityAware) {
    +      if (priorityAware == null) return false;
    +      Level<T> level = getLevel(priorityAware.getPriority(), false);
    +      return level != null && level.contains(priorityAware);
    +   }
    +
    +   private void calcSize() {
    +      Level<T>[] current = getArray();
    +      int size = 0;
    +      for (Level<T> level : current) {
    +         size += level.size();
    +      }
    +      this.size = size;
    +   }
    +
    +   private static class QueueConsumersIterator<T extends PriorityAware> implements ResetableIterator<T> {
    +
    +      private final QueueConsumersImpl<T> queueConsumers;
    +      private final boolean resetable;
    +      private Level<T>[] levels;
    +      int level = -1;
    +      private ResetableIterator<T> currentIterator;
    +
    +      private QueueConsumersIterator(QueueConsumersImpl<T> queueConsumers, boolean resetable) {
    +         this.queueConsumers = queueConsumers;
    +         this.levels = queueConsumers.getArray();
    +         this.resetable = resetable;
    +
    +      }
    +
    +      @Override
    +      public boolean hasNext() {
    +         while (true) {
    +            if (currentIterator != null) {
    +               if (currentIterator.hasNext()) {
    +                  return true;
    +               }
    +            }
    +            int nextLevel = level + 1;
    +            if (levels != null && nextLevel < levels.length) {
    +               moveToLevel(nextLevel);
    +            } else {
    +               return false;
    +            }
    +         }
    +      }
    +
    +      @Override
    +      public T next() {
    +         while (true) {
    +            if (currentIterator != null) {
    +               if (currentIterator.hasNext()) {
    +                  return currentIterator.next();
    +               }
    +            }
    +            int nextLevel = level + 1;
    +            if (levels != null && nextLevel < levels.length) {
    +               moveToLevel(nextLevel);
    +            } else {
    +               return null;
    +            }
    +         }
    +      }
    +
    +      private void moveToLevel(int level) {
    +         Level<T> level0 = levels[level];
    +         if (resetable) {
    +            currentIterator = level0.resetableIterator().reset();
    +         } else {
    +            currentIterator = level0.iterator();
    +         }
    +         this.level = level;
    +      }
    +
    +      @Override
    +      public ResetableIterator<T> reset() {
    +         if (!resetable) {
    +            throw new IllegalStateException("Iterator is not resetable");
    +         }
    +         levels = queueConsumers.getArray();
    +         level = -1;
    +         currentIterator = null;
    +         return this;
    +      }
    +   }
    +
    +   /**
    +    * This is represents a getPriority and is modeled on CopyOnArrayList.
    --- End diff --
    
    Done - resolving


---

[GitHub] activemq-artemis pull request #2488: ARTEMIS-196 Implement Consumer Priority

Posted by michaelandrepearce <gi...@git.apache.org>.
Github user michaelandrepearce commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/2488#discussion_r245008000
  
    --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java ---
    @@ -2497,45 +2494,36 @@ private void deliver() {
     
     
                       handled++;
    -
    +                  consumers.reset();
                       continue;
                    }
     
                    if (logger.isTraceEnabled()) {
                       logger.trace("Queue " + this.getName() + " is delivering reference " + ref);
                    }
     
    -               // If a group id is set, then this overrides the consumer chosen round-robin
    +               final SimpleString groupID = extractGroupID(ref);
    +               groupConsumer = getGroupConsumer(groupConsumer, groupID);
     
    -               SimpleString groupID = extractGroupID(ref);
    -
    -               if (groupID != null) {
    -                  groupConsumer = groups.get(groupID);
    -
    -                  if (groupConsumer != null) {
    -                     consumer = groupConsumer;
    -                  }
    -               }
    -
    -               if (exclusive && redistributor == null) {
    -                  consumer = consumerList.get(0).consumer;
    +               if (groupConsumer != null) {
    +                  consumer = groupConsumer;
                    }
     
                    HandleStatus status = handle(ref, consumer);
     
                    if (status == HandleStatus.HANDLED) {
     
    -                  deliveriesInTransit.countUp();
    -
    -                  handledconsumer = consumer;
    -
    -                  removeMessageReference(holder, ref);
    -
                       if (redistributor == null) {
                          handleMessageGroup(ref, consumer, groupConsumer, groupID);
                       }
     
    +                  deliveriesInTransit.countUp();
    +
    +
    +                  removeMessageReference(holder, ref);
    +                  handledconsumer = consumer;
                       handled++;
    +                  consumers.reset();
    --- End diff --
    
    That is not the intent. The intent of reset is just to move the iterator markers; it is not a resource, and releasing one is not its purpose.
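
    To illustrate the point, here is a deliberately simplified sketch of "reset only moves the marker". `RewindableCursor` and `DeliverySketch` are hypothetical names, and the loop does not attempt to reproduce the PR's per-level iteration or the real `deliver()` logic.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative cursor over a fixed list; reset() only rewinds the position
// and holds nothing that needs to be released.
final class RewindableCursor<T> {
   private final List<T> snapshot;
   private int position;

   RewindableCursor(List<T> snapshot) {
      this.snapshot = snapshot;
   }

   boolean hasNext() {
      return position < snapshot.size();
   }

   T next() {
      return snapshot.get(position++);
   }

   RewindableCursor<T> reset() {
      position = 0;
      return this;
   }
}

final class DeliverySketch {
   // Offers each message to consumers in priority order. A consumer accepts unless
   // its name is in the busy list. Returns "message->consumer" pairs.
   static List<String> deliver(List<String> messages, List<String> consumersByPriority, List<String> busy) {
      RewindableCursor<String> consumers = new RewindableCursor<>(consumersByPriority);
      List<String> deliveries = new ArrayList<>();
      for (String message : messages) {
         while (consumers.hasNext()) {
            String consumer = consumers.next();
            if (!busy.contains(consumer)) {
               deliveries.add(message + "->" + consumer);
               break;
            }
         }
         // Rewind so the next message is offered to the highest-priority consumer first.
         consumers.reset();
      }
      return deliveries;
   }
}
```

    In this sketch the rewind after each message is what keeps the highest-priority consumer first in line; nothing is opened or closed along the way.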


---

[GitHub] activemq-artemis pull request #2488: ARTEMIS-196 Implement Consumer Priority

Posted by michaelandrepearce <gi...@git.apache.org>.
Github user michaelandrepearce commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/2488#discussion_r245007330
  
    --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java ---
    @@ -0,0 +1,648 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.activemq.artemis.core.server.impl;
    +
    +import org.apache.activemq.artemis.core.server.PriorityAware;
    +
    +import java.lang.reflect.Array;
    +import java.util.AbstractCollection;
    +import java.util.Arrays;
    +import java.util.Collection;
    +import java.util.Iterator;
    +import java.util.Objects;
    +import java.util.Set;
    +import java.util.concurrent.locks.ReentrantLock;
    +import java.util.function.Consumer;
    +import java.util.stream.Collectors;
    +
    +/**
    + * This class's purpose is to hold the consumers, it models around multi getPriority (getPriority) varient of
    + * java.util.concurrent.CopyOnWriteArrayList, so that reads are concurrent safe and non blocking.
    + *
    + * N.b. we could have made Level extend CopyOnWriteArrayList but due to the need to access the internal Array structure,
    + * which is privileged to package java.util.concurrent. As such much of Level is is taken from here.
    + *
    + * Modifications like in CopyOnWriteArrayList are single threaded via a single re-entrant lock.
    + *
    + * Iterators iterate over a snapshot of the internal array structure, so will not see mutations.
    + *
    + * There can only be one resettable iterable view, this is exposed at the top getPriority,
    + *     and is intended for use in QueueImpl only.
    + * All other iterators are not reset-able and are created on calling iterator().
    + *
    + * Methods getArray, setArray MUST never be exposed, and all array modifications must go through these.
    + *
    + * @param <T> The type this class may hold, this is generic as can be anything that extends PriorityAware,
    + *         but intent is this is the QueueImpl:ConsumerHolder.
    + */
    +public class QueueConsumersImpl<T extends PriorityAware> extends AbstractCollection<T> implements QueueConsumers<T> {
    +
    +   private final QueueConsumersIterator<T> iterator = new QueueConsumersIterator<>(this, true);
    +
    +   private volatile Level<T>[] levels;
    +   private volatile int size;
    +   private volatile T first;
    +
    +   private void setArray(Level<T>[] array) {
    +      this.levels = array;
    +   }
    +
    +   private Level<T>[] getArray() {
    +      return levels;
    +   }
    +
    +
    +   public QueueConsumersImpl() {
    +      levels = newLevelArrayInstance(0);
    +   }
    +
    +   @SuppressWarnings("unchecked")
    +   private static <T> Level<T>[] newLevelArrayInstance(int length) {
    +      return (Level<T>[]) Array.newInstance(Level.class, length);
    +   }
    +
    +   @Override
    +   public int size() {
    +      return size;
    +   }
    +
    +   @Override
    +   public boolean isEmpty() {
    +      return size() == 0;
    +   }
    +
    +   @Override
    +   public Set<Integer> getPriorites() {
    +      Level<T>[] levels = getArray();
    +      return Arrays.stream(levels).map(Level::level).collect(Collectors.toSet());
    +   }
    +
    +   @Override
    +   public Iterator<T> iterator() {
    +      return new QueueConsumersIterator<>(this, false);
    +   }
    +
    +   @Override
    +   public boolean hasNext() {
    +      return iterator.hasNext();
    +   }
    +
    +   @Override
    +   public T next() {
    +      return iterator.next();
    +   }
    +
    +   @Override
    +   public QueueConsumers<T> reset() {
    +      iterator.reset();
    +      return this;
    +   }
    +
    +   @Override
    +   public void forEach(Consumer<? super T> action) {
    +      Objects.requireNonNull(action);
    +      Level<T>[] current = getArray();
    +      int len = current.length;
    +      for (int i = 0; i < len; ++i) {
    +         current[i].forEach(action);
    +      }
    +   }
    +
    +   private Level<T> getLevel(int level, boolean createIfMissing) {
    +      Level<T>[] current = getArray();
    +      int low = 0;
    +      int high = current.length - 1;
    +
    +      while (low <= high) {
    +         int mid = (low + high) >>> 1;
    +         Level<T> midVal = current[mid];
    +
    +         if (midVal.level() > level)
    +            low = mid + 1;
    +         else if (midVal.level() < level)
    +            high = mid - 1;
    +         else
    +            return midVal; //key found
    +      }
    +
    +      if (createIfMissing) {
    +         Level<T>[] newLevels = newLevelArrayInstance(current.length + 1);
    +         if (low > 0) {
    +            System.arraycopy(current, 0, newLevels, 0, low);
    +         }
    +         if (current.length - low > 0) {
    +            System.arraycopy(current, low, newLevels, low + 1, current.length - low);
    +         }
    +         newLevels[low] = new Level<T>(level);
    +         setArray(newLevels);
    +         return newLevels[low];
    +      }
    +      return null;
    +   }
    +
    +   @Override
    +   public synchronized boolean add(T t) {
    +      boolean result = addInternal(t);
    +      calcSize();
    +      return result;
    +   }
    +
    +   private boolean addInternal(T t) {
    +      if (t == null) return false;
    +      Level<T> level = getLevel(t.getPriority(), true);
    +      return level.add(t);
    +   }
    +
    +   @Override
    +   public boolean remove(Object o) {
    +      return o instanceof PriorityAware && remove((PriorityAware) o);
    +   }
    +
    +   public synchronized boolean remove(PriorityAware priorityAware) {
    +      boolean result = removeInternal(priorityAware);
    +      calcSize();
    +      return result;
    +   }
    +
    +   private boolean removeInternal(PriorityAware priorityAware) {
    +      if ( priorityAware == null) return false;
    +      Level<T> level = getLevel(priorityAware.getPriority(), false);
    +      boolean result = level != null && level.remove(priorityAware);
    +      if (level != null && level.size() == 0) {
    +         removeLevel(level.level);
    +      }
    +      return result;
    +   }
    +
    +   private Level<T> removeLevel(int level) {
    +      Level<T>[] current = getArray();
    +      int len = current.length;
    +      int low = 0;
    +      int high = len - 1;
    +
    +      while (low <= high) {
    +         int mid = (low + high) >>> 1;
    +         Level<T> midVal = current[mid];
    +
    +         if (midVal.level() > level)
    +            low = mid + 1;
    +         else if (midVal.level() < level)
    +            high = mid - 1;
    +         else {
    +            Level<T>[] newLevels = newLevelArrayInstance(len - 1);
    +            System.arraycopy(current, 0, newLevels, 0, mid);
    +            System.arraycopy(current, mid + 1, newLevels, mid, len - mid - 1);
    +            setArray(newLevels);
    +            return midVal; //key found
    +         }
    +      }
    +      return null;
    +   }
    +
    +   @Override
    +   public boolean containsAll(Collection<?> c) {
    +      Objects.requireNonNull(c);
    +      for (Object e : c)
    +         if (!contains(e))
    +            return false;
    +      return true;
    +   }
    +
    +   @Override
    +   public synchronized boolean addAll(Collection<? extends T> c) {
    +      Objects.requireNonNull(c);
    +      boolean modified = false;
    +      for (T e : c)
    +         if (addInternal(e))
    +            modified = true;
    +      calcSize();
    +      return modified;
    +   }
    +
    +   @Override
    +   public synchronized boolean removeAll(Collection<?> c) {
    +      Objects.requireNonNull(c);
    +      boolean modified = false;
    +      for (Object o : c) {
    +         if (remove(o)) {
    +            modified = true;
    +         }
    +      }
    +      calcSize();
    +      return modified;
    +   }
    +
    +   @Override
    +   public synchronized boolean retainAll(Collection<?> c) {
    +      Objects.requireNonNull(c);
    +      boolean modified = false;
    +      Level<T>[] levels = getArray();
    +      for (Level<T> level : levels) {
    +         if (level.retainAll(c)) {
    +            modified = true;
    +         }
    +      }
    +      calcSize();
    +      return modified;
    +   }
    +
    +   @Override
    +   public synchronized void clear() {
    +      Level<T>[] levels = getArray();
    +      for (Level<T> level : levels) {
    +         level.clear();
    +      }
    +      calcSize();
    +   }
    +
    +
    +
    +   @Override
    +   public boolean contains(Object o) {
    +      return o instanceof PriorityAware && contains((PriorityAware) o);
    +   }
    +
    +   public boolean contains(PriorityAware priorityAware) {
    +      if (priorityAware == null) return false;
    +      Level<T> level = getLevel(priorityAware.getPriority(), false);
    +      return level != null && level.contains(priorityAware);
    +   }
    +
    +   private void calcSize() {
    +      Level<T>[] current = getArray();
    +      int size = 0;
    +      for (Level<T> level : current) {
    +         size += level.size();
    +      }
    +      this.size = size;
    +   }
    +
    +   private static class QueueConsumersIterator<T extends PriorityAware> implements ResetableIterator<T> {
    +
    +      private final QueueConsumersImpl<T> queueConsumers;
    +      private final boolean resetable;
    +      private Level<T>[] levels;
    +      int level = -1;
    +      private ResetableIterator<T> currentIterator;
    +
    +      private QueueConsumersIterator(QueueConsumersImpl<T> queueConsumers, boolean resetable) {
    +         this.queueConsumers = queueConsumers;
    +         this.levels = queueConsumers.getArray();
    +         this.resetable = resetable;
    +
    +      }
    +
    +      @Override
    +      public boolean hasNext() {
    +         while (true) {
    +            if (currentIterator != null) {
    +               if (currentIterator.hasNext()) {
    +                  return true;
    +               }
    +            }
    +            int nextLevel = level + 1;
    +            if (levels != null && nextLevel < levels.length) {
    +               moveToLevel(nextLevel);
    +            } else {
    +               return false;
    +            }
    +         }
    +      }
    +
    +      @Override
    +      public T next() {
    +         while (true) {
    +            if (currentIterator != null) {
    +               if (currentIterator.hasNext()) {
    +                  return currentIterator.next();
    +               }
    +            }
    +            int nextLevel = level + 1;
    +            if (levels != null && nextLevel < levels.length) {
    +               moveToLevel(nextLevel);
    +            } else {
    +               return null;
    +            }
    +         }
    +      }
    +
    +      private void moveToLevel(int level) {
    +         Level<T> level0 = levels[level];
    +         if (resetable) {
    +            currentIterator = level0.resetableIterator().reset();
    +         } else {
    +            currentIterator = level0.iterator();
    +         }
    +         this.level = level;
    +      }
    +
    +      @Override
    +      public ResetableIterator<T> reset() {
    +         if (!resetable) {
    +            throw new IllegalStateException("Iterator is not resetable");
    +         }
    +         levels = queueConsumers.getArray();
    +         level = -1;
    +         currentIterator = null;
    +         return this;
    +      }
    +   }
    +
    +   /**
    +    * This represents a priority level and is modeled on CopyOnWriteArrayList.
    +    *
    +    * @param <E>
    +    */
    +   private static class Level<E> {
    --- End diff --
    
    No, we can't, as we need access to getArray() without a copy, and that method is package-private in CopyOnWriteArrayList.
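
To illustrate why reading the backing array without a copy matters, here is a minimal sketch of a copy-on-write holder. The class `SnapshotList` and its methods are hypothetical and much simpler than the PR's `Level`; the only point is that a reader grabs the current array once and iterates it, while writers publish a fresh array.

```java
import java.util.Arrays;
import java.util.function.Consumer;

/** Hypothetical, simplified copy-on-write holder; not the PR's Level class. */
final class SnapshotList<E> {

   // Writers replace this array wholesale; readers never see a partially updated array.
   private volatile Object[] array = new Object[0];

   // Returns the current backing array itself; no copy is made. Kept private on purpose.
   private Object[] getArray() {
      return array;
   }

   // Mutations are serialized and always publish a new array.
   synchronized void add(E e) {
      Object[] current = getArray();
      Object[] next = Arrays.copyOf(current, current.length + 1);
      next[current.length] = e;
      array = next;
   }

   int size() {
      return getArray().length;
   }

   // Iterates the snapshot taken on entry; elements added during iteration are not visited.
   @SuppressWarnings("unchecked")
   void forEach(Consumer<? super E> action) {
      Object[] snapshot = getArray();
      for (Object o : snapshot) {
         action.accept((E) o);
      }
   }
}
```

Adding elements from inside `forEach` is safe in this sketch: the loop keeps walking the array it captured on entry, and the additions only become visible to the next reader.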


---

[GitHub] activemq-artemis pull request #2488: ARTEMIS-196 Implement Consumer Priority

Posted by franz1981 <gi...@git.apache.org>.
Github user franz1981 commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/2488#discussion_r245014300
  
    --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ResetableIterator.java ---
    @@ -0,0 +1,29 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.activemq.artemis.core.server.impl;
    +
    +import java.util.Iterator;
    +
    +public interface ResetableIterator<T> extends Iterator<T> {
    +
    +   /**
    +    * Resets the iterator so you can re-iterate over all elements.
    +    *
    +    * @return itself, this is just for convenience.
    +    */
    +   ResetableIterator<T> reset();
    --- End diff --
    
    Got it, thanks!


---

[GitHub] activemq-artemis issue #2488: ARTEMIS-196 Implement Consumer Priority

Posted by franz1981 <gi...@git.apache.org>.
Github user franz1981 commented on the issue:

    https://github.com/apache/activemq-artemis/pull/2488
  
    @michaelandrepearce Yep, I wrote those comments as I went along reading the code, mate, sorry :)
    Yes, I do understand the bits related to byte vs int (and I agree) and those related to `Iterator::reset` too (I agree there too).
    I'm just concerned about keeping the code complexity as low as possible, given that we have "inherited" a custom "CopyOnWriteArrayList".
    I can't say whether it is better to simplify it as much as possible to get just what we really need, or to keep this custom version of it as we are doing now.
    Using a static final `MethodHandle` to steal the array when we need it might not be such a bad solution either: I'm not a fan of `reflection-magic-like` things, but I admit that it could avoid copying an entire implementation just for 1 or 2 missing methods.
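
For readers unfamiliar with the `MethodHandle` idea, a rough sketch follows. It is an assumption-heavy illustration, not something the PR does: the class `CowArrayAccess` is hypothetical, it relies on the non-public `getArray()` method of OpenJDK's `CopyOnWriteArrayList`, and newer JDKs with strong encapsulation may refuse the reflective access, in which case the sketch falls back to the copying `toArray()`.

```java
import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodHandles;
import java.lang.reflect.Method;
import java.util.concurrent.CopyOnWriteArrayList;

/** Hypothetical helper; depends on a JDK-internal method and may be blocked at runtime. */
final class CowArrayAccess {

   // Null when reflective access is refused (for example by module encapsulation).
   private static final MethodHandle GET_ARRAY = lookupGetArray();

   private static MethodHandle lookupGetArray() {
      try {
         Method m = CopyOnWriteArrayList.class.getDeclaredMethod("getArray");
         m.setAccessible(true); // may throw a RuntimeException subtype on newer JDKs
         return MethodHandles.lookup().unreflect(m);
      } catch (ReflectiveOperationException | RuntimeException e) {
         return null;
      }
   }

   /**
    * Returns the list's elements as an array. When the handle is available this is the
    * backing array itself and MUST NOT be modified; otherwise it is a copy.
    */
   static Object[] snapshot(CopyOnWriteArrayList<?> list) {
      if (GET_ARRAY != null) {
         try {
            return (Object[]) GET_ARRAY.invoke(list);
         } catch (Throwable t) {
            // Fall through to the copying path below.
         }
      }
      return list.toArray();
   }
}
```

The trade-off is the one raised in the comment: this avoids duplicating the list implementation, at the price of depending on internals that are not part of the public API.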


---

[GitHub] activemq-artemis pull request #2488: ARTEMIS-196 Implement Consumer Priority

Posted by franz1981 <gi...@git.apache.org>.
Github user franz1981 commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/2488#discussion_r244987340
  
    --- Diff: artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java ---
    @@ -233,6 +239,11 @@ public Object createSender(ProtonServerSenderContext protonSender,
           return consumer;
        }
     
    +   private int getPriority(Map<Symbol, Object> properties) {
    +      Integer value = properties == null ? null : (Integer) properties.get(PRIORITY);
    --- End diff --
    
    I would use a cast to `Number` and call `Number::byteValue`
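
A small sketch of the `Number`-based lookup suggested here. To stay self-contained it uses a `String` key instead of the AMQP `Symbol` type, and the key name, the class `PriorityProperty`, and the default of 0 are all assumptions. It calls `intValue()` rather than `byteValue()`, since elsewhere in this thread the author says the plan is to support int.

```java
import java.util.Map;

/** Hypothetical stand-in for the property lookup in AMQPSessionCallback. */
final class PriorityProperty {

   static final String PRIORITY = "priority"; // assumed key name
   static final int DEFAULT_PRIORITY = 0;     // assumed default

   // Accepts any numeric encoding (Byte, Short, Integer, Long, ...) instead of
   // casting straight to Integer, which would throw ClassCastException for the others.
   static int getPriority(Map<String, Object> properties) {
      Object value = properties == null ? null : properties.get(PRIORITY);
      return value instanceof Number ? ((Number) value).intValue() : DEFAULT_PRIORITY;
   }
}
```

A client that encodes the property as a byte or a long is then handled the same way as one that sends an int.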


---

[GitHub] activemq-artemis pull request #2488: ARTEMIS-196 Implement Consumer Priority

Posted by michaelandrepearce <gi...@git.apache.org>.
Github user michaelandrepearce commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/2488#discussion_r245029854
  
    --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ResetableIterator.java ---
    @@ -0,0 +1,29 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.activemq.artemis.core.server.impl;
    +
    +import java.util.Iterator;
    +
    +public interface ResetableIterator<T> extends Iterator<T> {
    +
    +   /**
    +    * Resets the iterator so you can re-iterate over all elements.
    +    *
    +    * @return itself, this is just for convenience.
    +    */
    +   ResetableIterator<T> reset();
    --- End diff --
    
    I'll mark it resolved then.


---

[GitHub] activemq-artemis pull request #2488: ARTEMIS-196 Implement Consumer Priority

Posted by michaelandrepearce <gi...@git.apache.org>.
Github user michaelandrepearce commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/2488#discussion_r245008661
  
    --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java ---
    @@ -3080,45 +3053,20 @@ private boolean deliverDirect(final MessageReference ref) {
                 return true;
              }
     
    -         int startPos = pos;
    -
    -         int size = consumerList.size();
    +         consumers.reset();
     
    -         while (true) {
    -            ConsumerHolder<? extends Consumer> holder;
    -            if (redistributor == null) {
    -               holder = consumerList.get(pos);
    -            } else {
    -               holder = redistributor;
    -            }
    +         while (consumers.hasNext() || redistributor != null) {
    --- End diff --
    
    I'm trying to keep to standard interfaces; these are the standard Iterator methods. Also, at this point in the while loop we do not want to actually get the next element: there are some timeouts and other checks that need to be done first, so next() is called a little later on.
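
The loop shape described above can be sketched roughly as follows. All names are hypothetical (`Resettable` mirrors the PR's `ResetableIterator`; `maxAttempts` is just a stand-in for the timeouts and other checks in `QueueImpl`), so treat this as an illustration of the control flow only.

```java
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.Predicate;

/** Hypothetical mirror of the PR's ResetableIterator. */
interface Resettable<T> extends Iterator<T> {
   Resettable<T> reset();
}

/** Simple resettable cursor over a list, for illustration only. */
final class ListCursor<T> implements Resettable<T> {
   private final List<T> items;
   private int pos;

   ListCursor(List<T> items) {
      this.items = items;
   }

   @Override
   public boolean hasNext() {
      return pos < items.size();
   }

   @Override
   public T next() {
      if (!hasNext()) {
         throw new NoSuchElementException();
      }
      return items.get(pos++);
   }

   @Override
   public Resettable<T> reset() {
      pos = 0;
      return this;
   }
}

final class DeliveryLoopSketch {
   // Returns the first consumer that accepts, or null if none does or the budget runs out.
   static <T> T firstAccepting(Resettable<T> consumers, Predicate<T> accepts, int maxAttempts) {
      consumers.reset();
      int attempts = 0;
      while (consumers.hasNext()) {
         // Checks that must run before a consumer is actually taken.
         if (attempts++ >= maxAttempts) {
            return null;
         }
         T candidate = consumers.next(); // only now advance the iterator
         if (accepts.test(candidate)) {
            return candidate;
         }
      }
      return null;
   }
}
```

Keeping `hasNext()` in the loop condition and calling `next()` later lets the pre-delivery checks bail out without consuming an element.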


---

[GitHub] activemq-artemis issue #2488: ARTEMIS-196 Implement Consumer Priority

Posted by franz1981 <gi...@git.apache.org>.
Github user franz1981 commented on the issue:

    https://github.com/apache/activemq-artemis/pull/2488
  
    I have taken a quick look and already written down some comments: it looks OK to me, but I need to look further into the `QueueConsumersImpl` details.
    I'm happy about the nice abstraction over the consumers that simplifies the message handling in `QueueImpl`, and I suppose that keeping the refactoring and this feature (consumer priorities) separate isn't easy (nor am I sure it makes sense, TBH), so I won't bother asking for it. Well done! :+1:


---

[GitHub] activemq-artemis pull request #2488: ARTEMIS-196 Implement Consumer Priority

Posted by michaelandrepearce <gi...@git.apache.org>.
Github user michaelandrepearce commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/2488#discussion_r245009418
  
    --- Diff: artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionCreateConsumerMessage.java ---
    @@ -95,6 +109,7 @@ public void encodeRest(final ActiveMQBuffer buffer) {
           buffer.writeNullableSimpleString(filterString);
           buffer.writeBoolean(browseOnly);
           buffer.writeBoolean(requiresResponse);
    +      buffer.writeInt(priority);
    --- End diff --
    
    We plan to support int.


---

[GitHub] activemq-artemis pull request #2488: ARTEMIS-196 Implement Consumer Priority

Posted by michaelandrepearce <gi...@git.apache.org>.
Github user michaelandrepearce commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/2488#discussion_r245006670
  
    --- Diff: artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueConsumersImpl.java ---
    @@ -0,0 +1,648 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.activemq.artemis.core.server.impl;
    +
    +import org.apache.activemq.artemis.core.server.PriorityAware;
    +
    +import java.lang.reflect.Array;
    +import java.util.AbstractCollection;
    +import java.util.Arrays;
    +import java.util.Collection;
    +import java.util.Iterator;
    +import java.util.Objects;
    +import java.util.Set;
    +import java.util.concurrent.locks.ReentrantLock;
    +import java.util.function.Consumer;
    +import java.util.stream.Collectors;
    +
    +/**
    + * This class holds the consumers. It is modeled on a multi-level (priority) variant of
    + * java.util.concurrent.CopyOnWriteArrayList, so that reads are concurrency-safe and non-blocking.
    + *
    + * N.b. Level could not simply extend CopyOnWriteArrayList, because it needs access to the internal array structure,
    + * which is restricted to package java.util.concurrent. As such, much of Level is taken from there.
    + *
    + * Modifications like in CopyOnWriteArrayList are single threaded via a single re-entrant lock.
    + *
    + * Iterators iterate over a snapshot of the internal array structure, so will not see mutations.
    + *
    + * There can only be one resettable iterable view; this is exposed at the top level,
    + *     and is intended for use in QueueImpl only.
    + * All other iterators are not reset-able and are created on calling iterator().
    + *
    + * Methods getArray, setArray MUST never be exposed, and all array modifications must go through these.
    + *
    + * @param <T> The type this class may hold, this is generic as can be anything that extends PriorityAware,
    + *         but intent is this is the QueueImpl:ConsumerHolder.
    + */
    +public class QueueConsumersImpl<T extends PriorityAware> extends AbstractCollection<T> implements QueueConsumers<T> {
    +
    +   private final QueueConsumersIterator<T> iterator = new QueueConsumersIterator<>(this, true);
    +
    +   private volatile Level<T>[] levels;
    +   private volatile int size;
    +   private volatile T first;
    +
    +   private void setArray(Level<T>[] array) {
    +      this.levels = array;
    +   }
    +
    +   private Level<T>[] getArray() {
    +      return levels;
    +   }
    +
    +
    +   public QueueConsumersImpl() {
    +      levels = newLevelArrayInstance(0);
    +   }
    +
    +   @SuppressWarnings("unchecked")
    +   private static <T> Level<T>[] newLevelArrayInstance(int length) {
    +      return (Level<T>[]) Array.newInstance(Level.class, length);
    +   }
    +
    +   @Override
    +   public int size() {
    +      return size;
    +   }
    +
    +   @Override
    +   public boolean isEmpty() {
    +      return size() == 0;
    +   }
    +
    +   @Override
    +   public Set<Integer> getPriorites() {
    +      Level<T>[] levels = getArray();
    +      return Arrays.stream(levels).map(Level::level).collect(Collectors.toSet());
    +   }
    +
    +   @Override
    +   public Iterator<T> iterator() {
    +      return new QueueConsumersIterator<>(this, false);
    +   }
    +
    +   @Override
    +   public boolean hasNext() {
    +      return iterator.hasNext();
    +   }
    +
    +   @Override
    +   public T next() {
    +      return iterator.next();
    +   }
    +
    +   @Override
    +   public QueueConsumers<T> reset() {
    +      iterator.reset();
    +      return this;
    +   }
    +
    +   @Override
    +   public void forEach(Consumer<? super T> action) {
    +      Objects.requireNonNull(action);
    +      Level<T>[] current = getArray();
    +      int len = current.length;
    +      for (int i = 0; i < len; ++i) {
    +         current[i].forEach(action);
    +      }
    +   }
    +
    +   private Level<T> getLevel(int level, boolean createIfMissing) {
    +      Level<T>[] current = getArray();
    +      int low = 0;
    +      int high = current.length - 1;
    +
    +      while (low <= high) {
    +         int mid = (low + high) >>> 1;
    +         Level<T> midVal = current[mid];
    +
    +         if (midVal.level() > level)
    +            low = mid + 1;
    +         else if (midVal.level() < level)
    +            high = mid - 1;
    +         else
    +            return midVal; //key found
    +      }
    +
    +      if (createIfMissing) {
    +         Level<T>[] newLevels = newLevelArrayInstance(current.length + 1);
    --- End diff --
    
    I tried... it's not, or at least I couldn't ;)
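
For context on the quoted `getLevel` code, the search runs over levels kept in descending priority order, which is why the comparisons look inverted relative to a textbook binary search. The sketch below shows that lookup, plus a copy-on-write insertion, on a plain `int[]`. It is a hypothetical illustration: the class and method names are made up, and it is not a reconstruction of the truncated part of the diff.

```java
/** Hypothetical illustration of searching and inserting into a descending-sorted array. */
final class DescendingLevels {

   // Index of priority in a descending-sorted array, or -(insertionPoint + 1) if absent.
   static int indexOf(int[] levels, int priority) {
      int low = 0;
      int high = levels.length - 1;
      while (low <= high) {
         int mid = (low + high) >>> 1;
         if (levels[mid] > priority) {
            low = mid + 1;   // higher priorities sit to the left, so look right
         } else if (levels[mid] < priority) {
            high = mid - 1;  // lower priorities sit to the right, so look left
         } else {
            return mid;
         }
      }
      return -(low + 1);
   }

   // Returns an array that contains priority, preserving descending order.
   // The input array is never modified (copy-on-write style).
   static int[] withLevel(int[] levels, int priority) {
      int idx = indexOf(levels, priority);
      if (idx >= 0) {
         return levels;
      }
      int at = -(idx + 1);
      int[] next = new int[levels.length + 1];
      System.arraycopy(levels, 0, next, 0, at);
      next[at] = priority;
      System.arraycopy(levels, at, next, at + 1, levels.length - at);
      return next;
   }
}
```

Because the highest priority always ends up at index 0, a plain front-to-back iteration over the array visits the highest-priority level first.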


---